# lib/bup/client.py (bup) — commit: "Don't import * from helpers"
1
import errno, os, re, socket, struct, sys, time, zlib

from bup import git, ssh
from bup.helpers import (Conn, DemuxConn, atoi, atomically_replaced_file,
                         chunkyreader, debug1, debug2, linereader, mkdirp,
                         progress, qprogress)
7
8
9 bwlimit = None
10
11
12 class ClientError(Exception):
13     pass
14
15
16 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
17     if not bwlimit:
18         f.write(buf)
19         return (len(buf), time.time())
20     else:
21         # We want to write in reasonably large blocks, but not so large that
22         # they're likely to overflow a router's queue.  So our bwlimit timing
23         # has to be pretty granular.  Also, if it takes too long from one
24         # transmit to the next, we can't just make up for lost time to bring
25         # the average back up to bwlimit - that will risk overflowing the
26         # outbound queue, which defeats the purpose.  So if we fall behind
27         # by more than one block delay, we shouldn't ever try to catch up.
28         for i in xrange(0,len(buf),4096):
29             now = time.time()
30             next = max(now, bwtime + 1.0*bwcount/bwlimit)
31             time.sleep(next-now)
32             sub = buf[i:i+4096]
33             f.write(sub)
34             bwcount = len(sub)  # might be less than 4096
35             bwtime = next
36         return (bwcount, bwtime)
37
38
def parse_remote(remote):
    """Split a remote spec into a (protocol, host, port, path) tuple.

    Accepts URL forms such as ssh://host:port/path, bup://host:port/path,
    and file:///path (IPv6 hosts may be bracketed, e.g. bup://[::1]/x),
    plus the scp-style 'host:path' shorthand (-> ssh) and a bare path or
    ':path' (-> file).  Components that don't apply come back as None.

    Raises ClientError for a URL with an unsupported protocol.
    """
    protocol = r'([a-z]+)://'
    # Optionally []-bracketed host; the (?(sb)...) conditionals make the
    # closing bracket mandatory exactly when the opening one matched.
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
    port = r'(?::(\d+))?'
    path = r'(/.*)?'
    url_match = re.match(
            '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if url_match:
        if not url_match.group(1) in ('ssh', 'bup', 'file'):
            # Call-form raise, consistent with the rest of this file (and
            # valid under Python 3, unlike the old two-expression form).
            raise ClientError('unexpected protocol: %s' % url_match.group(1))
        return url_match.group(1,3,4,5)
    else:
        # Not a URL: either a bare local path or scp-style host:path.
        rs = remote.split(':', 1)
        if len(rs) == 1 or rs[0] in ('', '-'):
            return 'file', None, None, rs[-1]
        else:
            return 'ssh', rs[0], None, rs[1]
56
57
58 class Client:
59     def __init__(self, remote, create=False):
60         self._busy = self.conn = None
61         self.sock = self.p = self.pout = self.pin = None
62         is_reverse = os.environ.get('BUP_SERVER_REVERSE')
63         if is_reverse:
64             assert(not remote)
65             remote = '%s:' % is_reverse
66         (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
67         self.cachedir = git.repo('index-cache/%s'
68                                  % re.sub(r'[^@\w]', '_', 
69                                           "%s:%s" % (self.host, self.dir)))
70         if is_reverse:
71             self.pout = os.fdopen(3, 'rb')
72             self.pin = os.fdopen(4, 'wb')
73             self.conn = Conn(self.pout, self.pin)
74         else:
75             if self.protocol in ('ssh', 'file'):
76                 try:
77                     # FIXME: ssh and file shouldn't use the same module
78                     self.p = ssh.connect(self.host, self.port, 'server')
79                     self.pout = self.p.stdout
80                     self.pin = self.p.stdin
81                     self.conn = Conn(self.pout, self.pin)
82                 except OSError as e:
83                     raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
84             elif self.protocol == 'bup':
85                 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
86                 self.sock.connect((self.host, atoi(self.port) or 1982))
87                 self.sockw = self.sock.makefile('wb')
88                 self.conn = DemuxConn(self.sock.fileno(), self.sockw)
89         if self.dir:
90             self.dir = re.sub(r'[\r\n]', ' ', self.dir)
91             if create:
92                 self.conn.write('init-dir %s\n' % self.dir)
93             else:
94                 self.conn.write('set-dir %s\n' % self.dir)
95             self.check_ok()
96         self.sync_indexes()
97
98     def __del__(self):
99         try:
100             self.close()
101         except IOError as e:
102             if e.errno == errno.EPIPE:
103                 pass
104             else:
105                 raise
106
107     def close(self):
108         if self.conn and not self._busy:
109             self.conn.write('quit\n')
110         if self.pin:
111             self.pin.close()
112         if self.sock and self.sockw:
113             self.sockw.close()
114             self.sock.shutdown(socket.SHUT_WR)
115         if self.conn:
116             self.conn.close()
117         if self.pout:
118             self.pout.close()
119         if self.sock:
120             self.sock.close()
121         if self.p:
122             self.p.wait()
123             rv = self.p.wait()
124             if rv:
125                 raise ClientError('server tunnel returned exit code %d' % rv)
126         self.conn = None
127         self.sock = self.p = self.pin = self.pout = None
128
129     def check_ok(self):
130         if self.p:
131             rv = self.p.poll()
132             if rv != None:
133                 raise ClientError('server exited unexpectedly with code %r'
134                                   % rv)
135         try:
136             return self.conn.check_ok()
137         except Exception as e:
138             raise ClientError, e, sys.exc_info()[2]
139
140     def check_busy(self):
141         if self._busy:
142             raise ClientError('already busy with command %r' % self._busy)
143         
144     def ensure_busy(self):
145         if not self._busy:
146             raise ClientError('expected to be busy, but not busy?!')
147         
148     def _not_busy(self):
149         self._busy = None
150
151     def sync_indexes(self):
152         self.check_busy()
153         conn = self.conn
154         mkdirp(self.cachedir)
155         # All cached idxs are extra until proven otherwise
156         extra = set()
157         for f in os.listdir(self.cachedir):
158             debug1('%s\n' % f)
159             if f.endswith('.idx'):
160                 extra.add(f)
161         needed = set()
162         conn.write('list-indexes\n')
163         for line in linereader(conn):
164             if not line:
165                 break
166             assert(line.find('/') < 0)
167             parts = line.split(' ')
168             idx = parts[0]
169             if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
170                 # If the server requests that we load an idx and we don't
171                 # already have a copy of it, it is needed
172                 needed.add(idx)
173             # Any idx that the server has heard of is proven not extra
174             extra.discard(idx)
175
176         self.check_ok()
177         debug1('client: removing extra indexes: %s\n' % extra)
178         for idx in extra:
179             os.unlink(os.path.join(self.cachedir, idx))
180         debug1('client: server requested load of: %s\n' % needed)
181         for idx in needed:
182             self.sync_index(idx)
183         git.auto_midx(self.cachedir)
184
185     def sync_index(self, name):
186         #debug1('requesting %r\n' % name)
187         self.check_busy()
188         mkdirp(self.cachedir)
189         fn = os.path.join(self.cachedir, name)
190         if os.path.exists(fn):
191             msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
192             raise ClientError(msg)
193         self.conn.write('send-index %s\n' % name)
194         n = struct.unpack('!I', self.conn.read(4))[0]
195         assert(n)
196         with atomically_replaced_file(fn, 'w') as f:
197             count = 0
198             progress('Receiving index from server: %d/%d\r' % (count, n))
199             for b in chunkyreader(self.conn, n):
200                 f.write(b)
201                 count += len(b)
202                 qprogress('Receiving index from server: %d/%d\r' % (count, n))
203             progress('Receiving index from server: %d/%d, done.\n' % (count, n))
204             self.check_ok()
205
206     def _make_objcache(self):
207         return git.PackIdxList(self.cachedir)
208
209     def _suggest_packs(self):
210         ob = self._busy
211         if ob:
212             assert(ob == 'receive-objects-v2')
213             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects-v2
214         suggested = []
215         for line in linereader(self.conn):
216             if not line:
217                 break
218             debug2('%s\n' % line)
219             if line.startswith('index '):
220                 idx = line[6:]
221                 debug1('client: received index suggestion: %s\n'
222                        % git.shorten_hash(idx))
223                 suggested.append(idx)
224             else:
225                 assert(line.endswith('.idx'))
226                 debug1('client: completed writing pack, idx: %s\n'
227                        % git.shorten_hash(line))
228                 suggested.append(line)
229         self.check_ok()
230         if ob:
231             self._busy = None
232         idx = None
233         for idx in suggested:
234             self.sync_index(idx)
235         git.auto_midx(self.cachedir)
236         if ob:
237             self._busy = ob
238             self.conn.write('%s\n' % ob)
239         return idx
240
241     def new_packwriter(self, compression_level = 1):
242         self.check_busy()
243         def _set_busy():
244             self._busy = 'receive-objects-v2'
245             self.conn.write('receive-objects-v2\n')
246         return PackWriter_Remote(self.conn,
247                                  objcache_maker = self._make_objcache,
248                                  suggest_packs = self._suggest_packs,
249                                  onopen = _set_busy,
250                                  onclose = self._not_busy,
251                                  ensure_busy = self.ensure_busy,
252                                  compression_level = compression_level)
253
254     def read_ref(self, refname):
255         self.check_busy()
256         self.conn.write('read-ref %s\n' % refname)
257         r = self.conn.readline().strip()
258         self.check_ok()
259         if r:
260             assert(len(r) == 40)   # hexified sha
261             return r.decode('hex')
262         else:
263             return None   # nonexistent ref
264
265     def update_ref(self, refname, newval, oldval):
266         self.check_busy()
267         self.conn.write('update-ref %s\n%s\n%s\n' 
268                         % (refname, newval.encode('hex'),
269                            (oldval or '').encode('hex')))
270         self.check_ok()
271
272     def cat(self, id):
273         self.check_busy()
274         self._busy = 'cat'
275         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
276         while 1:
277             sz = struct.unpack('!I', self.conn.read(4))[0]
278             if not sz: break
279             yield self.conn.read(sz)
280         e = self.check_ok()
281         self._not_busy()
282         if e:
283             raise KeyError(str(e))
284
285
class PackWriter_Remote(git.PackWriter):
    """A git.PackWriter that streams its pack data to a bup server.

    Instead of writing a local packfile, each object is framed and sent
    over an established Conn as part of a receive-objects-v2 command; the
    server stores the pack and suggests .idx files to cache locally.
    """
    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1):
        # conn: Conn to the server; also used directly as the byte sink.
        # objcache_maker: callable returning the local PackIdxList used
        #   by the base class for deduplication.
        # suggest_packs: callable that drains the server's index
        #   suggestions and returns the last idx name received.
        # onopen/onclose: callbacks marking the owning Client busy/idle
        #   around the receive-objects-v2 command.
        # ensure_busy: sanity callback invoked before each raw write.
        # NOTE(review): compression_level is accepted but not stored or
        # passed to git.PackWriter.__init__ here — presumably handled (or
        # ignored) elsewhere; confirm against the base class.
        git.PackWriter.__init__(self, objcache_maker)
        self.file = conn
        self.filename = 'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False       # is a receive-objects-v2 stream open?
        self._bwcount = 0            # bytes sent in the last bwlimit block
        self._bwtime = time.time()   # timestamp of the last bwlimit block

    def _open(self):
        # Start the server-side receive-objects-v2 stream (idempotent).
        if not self._packopen:
            self.onopen()
            self._packopen = True

    def _end(self):
        # Finish an open pack stream: a zero-length frame tells the server
        # we're done, then any suggested indexes are fetched.
        if self._packopen and self.file:
            self.file.write('\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            self.objcache = None
            return self.suggest_packs() # Returns last idx received

    def close(self):
        # Returns the last idx name received, or None if nothing was open.
        id = self._end()
        self.file = None
        return id

    def abort(self):
        # Remote pack streams can't be rolled back once bytes are sent.
        raise ClientError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist, sha):
        # Send one object frame: !I total length (data + 20-byte sha +
        # 4-byte crc), then sha, crc32, and the raw object data.
        assert(self.file)
        # NOTE(review): this guard is redundant — _open() already checks
        # _packopen itself — but harmless.
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = ''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
                          sha,
                          struct.pack('!I', crc),
                          data))
        try:
            # Throttled write; thread the bwlimit accounting through.
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            raise ClientError, e, sys.exc_info()[2]
        self.outbytes += len(data)
        self.count += 1

        # If the server has sent data back mid-stream, it is suggesting
        # indexes; sync them now and refresh the dedup cache.
        if self.file.has_input():
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc