# lib/bup/client.py
import errno, os, re, socket, struct, sys, time, zlib
from bup import git, ssh
from bup.helpers import *

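# Bandwidth limit for writes to the server, in bytes per second.  None (the
# default) means unthrottled; callers are expected to set this before sending
# data (in bup this is typically driven by a --bwlimit style option).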
bwlimit = None


class ClientError(Exception):
    pass


def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    if not bwlimit:
        f.write(buf)
        return (len(buf), time.time())
    else:
        # We want to write in reasonably large blocks, but not so large that
        # they're likely to overflow a router's queue.  So our bwlimit timing
        # has to be pretty granular.  Also, if it takes too long from one
        # transmit to the next, we can't just make up for lost time to bring
        # the average back up to bwlimit - that will risk overflowing the
        # outbound queue, which defeats the purpose.  So if we fall behind
        # by more than one block delay, we shouldn't ever try to catch up.
        for i in xrange(0, len(buf), 4096):
            now = time.time()
            next = max(now, bwtime + 1.0*bwcount/bwlimit)
            time.sleep(next-now)
            sub = buf[i:i+4096]
            f.write(sub)
            bwcount = len(sub)  # might be less than 4096
            bwtime = next
        return (bwcount, bwtime)


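# parse_remote() splits a remote spec into a (protocol, host, port, path)
# tuple; each element is a string or None.  Roughly, for example:
#   'bup://host:1982/repo'  ->  ('bup', 'host', '1982', '/repo')
#   'host:/some/path'       ->  ('ssh', 'host', None, '/some/path')
#   '/local/path'           ->  ('file', None, None, '/local/path')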
def parse_remote(remote):
    protocol = r'([a-z]+)://'
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
    port = r'(?::(\d+))?'
    path = r'(/.*)?'
    url_match = re.match(
            '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if url_match:
        if url_match.group(1) not in ('ssh', 'bup', 'file'):
            raise ClientError('unexpected protocol: %s' % url_match.group(1))
        return url_match.group(1,3,4,5)
    else:
        rs = remote.split(':', 1)
        if len(rs) == 1 or rs[0] in ('', '-'):
            return 'file', None, None, rs[-1]
        else:
            return 'ssh', rs[0], None, rs[1]


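# A Client talks the bup server protocol over one of three transports: a pair
# of inherited pipes (fds 3 and 4) when BUP_SERVER_REVERSE is set, a
# subprocess running 'bup server' (via ssh for ssh:// remotes, locally for
# file:// ones), or a TCP socket (default port 1982) for bup:// remotes.
# Rough usage sketch (hypothetical names; see bup's save/split commands for
# real callers):
#   cli = Client('host:/path/to/repo')
#   w = cli.new_packwriter()
#   ... write objects via w, then w.close() ...
#   cli.update_ref('refs/heads/something', newval, oldval)
#   cli.close()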
class Client:
    def __init__(self, remote, create=False):
        self._busy = self.conn = None
        self.sock = self.p = self.pout = self.pin = None
        is_reverse = os.environ.get('BUP_SERVER_REVERSE')
        if is_reverse:
            assert(not remote)
            remote = '%s:' % is_reverse
        (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
        self.cachedir = git.repo('index-cache/%s'
                                 % re.sub(r'[^@\w]', '_',
                                          "%s:%s" % (self.host, self.dir)))
        if is_reverse:
            self.pout = os.fdopen(3, 'rb')
            self.pin = os.fdopen(4, 'wb')
            self.conn = Conn(self.pout, self.pin)
        else:
            if self.protocol in ('ssh', 'file'):
                try:
                    # FIXME: ssh and file shouldn't use the same module
                    self.p = ssh.connect(self.host, self.port, 'server')
                    self.pout = self.p.stdout
                    self.pin = self.p.stdin
                    self.conn = Conn(self.pout, self.pin)
                except OSError as e:
                    raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
            elif self.protocol == 'bup':
                self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self.sock.connect((self.host, atoi(self.port) or 1982))
                self.sockw = self.sock.makefile('wb')
                self.conn = DemuxConn(self.sock.fileno(), self.sockw)
        if self.dir:
            self.dir = re.sub(r'[\r\n]', ' ', self.dir)
            if create:
                self.conn.write('init-dir %s\n' % self.dir)
            else:
                self.conn.write('set-dir %s\n' % self.dir)
            self.check_ok()
        self.sync_indexes()

    def __del__(self):
        try:
            self.close()
        except IOError as e:
            if e.errno == errno.EPIPE:
                pass
            else:
                raise

    def close(self):
        if self.conn and not self._busy:
            self.conn.write('quit\n')
        if self.pin:
            self.pin.close()
        if self.sock and self.sockw:
            self.sockw.close()
            self.sock.shutdown(socket.SHUT_WR)
        if self.conn:
            self.conn.close()
        if self.pout:
            self.pout.close()
        if self.sock:
            self.sock.close()
        if self.p:
            rv = self.p.wait()
            if rv:
                raise ClientError('server tunnel returned exit code %d' % rv)
        self.conn = None
        self.sock = self.p = self.pin = self.pout = None

    def check_ok(self):
        if self.p:
            rv = self.p.poll()
            if rv is not None:
                raise ClientError('server exited unexpectedly with code %r'
                                  % rv)
        try:
            return self.conn.check_ok()
        except Exception as e:
            raise ClientError, e, sys.exc_info()[2]

    def check_busy(self):
        if self._busy:
            raise ClientError('already busy with command %r' % self._busy)

    def ensure_busy(self):
        if not self._busy:
            raise ClientError('expected to be busy, but not busy?!')

    def _not_busy(self):
        self._busy = None

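    # sync_indexes() keeps the local index-cache in step with the server: it
    # asks for 'list-indexes', deletes any cached .idx files the server no
    # longer mentions, and downloads the ones the server marks with 'load'
    # that we don't have yet.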
    def sync_indexes(self):
        self.check_busy()
        conn = self.conn
        mkdirp(self.cachedir)
        # All cached idxs are extra until proven otherwise
        extra = set()
        for f in os.listdir(self.cachedir):
            debug1('%s\n' % f)
            if f.endswith('.idx'):
                extra.add(f)
        needed = set()
        conn.write('list-indexes\n')
        for line in linereader(conn):
            if not line:
                break
            assert(line.find('/') < 0)
            parts = line.split(' ')
            idx = parts[0]
            if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
                # If the server requests that we load an idx and we don't
                # already have a copy of it, it is needed
                needed.add(idx)
            # Any idx that the server has heard of is proven not extra
            extra.discard(idx)

        self.check_ok()
        debug1('client: removing extra indexes: %s\n' % extra)
        for idx in extra:
            os.unlink(os.path.join(self.cachedir, idx))
        debug1('client: server requested load of: %s\n' % needed)
        for idx in needed:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)

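    # sync_index() fetches a single .idx file via 'send-index': the server
    # replies with a 4-byte big-endian length followed by that many bytes of
    # index data, which we write atomically into the cache directory.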
    def sync_index(self, name):
        #debug1('requesting %r\n' % name)
        self.check_busy()
        mkdirp(self.cachedir)
        fn = os.path.join(self.cachedir, name)
        if os.path.exists(fn):
            msg = "won't request existing .idx, try `bup bloom --check %s`" % fn
            raise ClientError(msg)
        self.conn.write('send-index %s\n' % name)
        n = struct.unpack('!I', self.conn.read(4))[0]
        assert(n)
        with atomically_replaced_file(fn, 'w') as f:
            count = 0
            progress('Receiving index from server: %d/%d\r' % (count, n))
            for b in chunkyreader(self.conn, n):
                f.write(b)
                count += len(b)
                qprogress('Receiving index from server: %d/%d\r' % (count, n))
            progress('Receiving index from server: %d/%d, done.\n' % (count, n))
            self.check_ok()

    def _make_objcache(self):
        return git.PackIdxList(self.cachedir)

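    # _suggest_packs() drains the server's index suggestions.  If we're in
    # the middle of receive-objects-v2, it first writes the 0xffffffff marker
    # to suspend that command, reads suggested .idx names until a blank line,
    # syncs each of them into the cache, and then restarts the command.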
    def _suggest_packs(self):
        ob = self._busy
        if ob:
            assert(ob == 'receive-objects-v2')
            self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects-v2
        suggested = []
        for line in linereader(self.conn):
            if not line:
                break
            debug2('%s\n' % line)
            if line.startswith('index '):
                idx = line[6:]
                debug1('client: received index suggestion: %s\n'
                       % git.shorten_hash(idx))
                suggested.append(idx)
            else:
                assert(line.endswith('.idx'))
                debug1('client: completed writing pack, idx: %s\n'
                       % git.shorten_hash(line))
                suggested.append(line)
        self.check_ok()
        if ob:
            self._busy = None
        idx = None
        for idx in suggested:
            self.sync_index(idx)
        git.auto_midx(self.cachedir)
        if ob:
            self._busy = ob
            self.conn.write('%s\n' % ob)
        return idx

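    # new_packwriter() hands back a PackWriter_Remote wired to this
    # connection; the server only enters receive-objects-v2 lazily, the first
    # time the writer actually has an object to send (see _set_busy below).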
    def new_packwriter(self, compression_level = 1):
        self.check_busy()
        def _set_busy():
            self._busy = 'receive-objects-v2'
            self.conn.write('receive-objects-v2\n')
        return PackWriter_Remote(self.conn,
                                 objcache_maker = self._make_objcache,
                                 suggest_packs = self._suggest_packs,
                                 onopen = _set_busy,
                                 onclose = self._not_busy,
                                 ensure_busy = self.ensure_busy,
                                 compression_level = compression_level)

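    # read_ref()/update_ref() are thin wrappers around the 'read-ref' and
    # 'update-ref' commands; hashes cross the wire as 40-character hex and
    # are converted to/from 20-byte binary SHAs here.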
    def read_ref(self, refname):
        self.check_busy()
        self.conn.write('read-ref %s\n' % refname)
        r = self.conn.readline().strip()
        self.check_ok()
        if r:
            assert(len(r) == 40)   # hexified sha
            return r.decode('hex')
        else:
            return None   # nonexistent ref

    def update_ref(self, refname, newval, oldval):
        self.check_busy()
        self.conn.write('update-ref %s\n%s\n%s\n'
                        % (refname, newval.encode('hex'),
                           (oldval or '').encode('hex')))
        self.check_ok()

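    # cat() streams an object from the server: each chunk is prefixed with a
    # 4-byte big-endian length, and a zero-length chunk marks the end.  A
    # trailing error from check_ok() (e.g. an unknown id) becomes a KeyError.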
    def cat(self, id):
        self.check_busy()
        self._busy = 'cat'
        self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
        while 1:
            sz = struct.unpack('!I', self.conn.read(4))[0]
            if not sz: break
            yield self.conn.read(sz)
        e = self.check_ok()
        self._not_busy()
        if e:
            raise KeyError(str(e))


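# PackWriter_Remote behaves like git.PackWriter, but instead of writing a
# local packfile it frames each object and sends it over the connection as
# part of an in-progress receive-objects-v2 command.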
class PackWriter_Remote(git.PackWriter):
    def __init__(self, conn, objcache_maker, suggest_packs,
                 onopen, onclose,
                 ensure_busy,
                 compression_level=1):
        git.PackWriter.__init__(self, objcache_maker)
        self.file = conn
        self.filename = 'remote socket'
        self.suggest_packs = suggest_packs
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        self._bwcount = 0
        self._bwtime = time.time()

    def _open(self):
        if not self._packopen:
            self.onopen()
            self._packopen = True

    def _end(self):
        if self._packopen and self.file:
            self.file.write('\0\0\0\0')
            self._packopen = False
            self.onclose() # Unbusy
            self.objcache = None
            return self.suggest_packs() # Returns last idx received

    def close(self):
        id = self._end()
        self.file = None
        return id

    def abort(self):
        raise ClientError("don't know how to abort remote pack writing")

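    # _raw_write() sends one object record: a 4-byte big-endian length
    # covering sha+crc+data, the 20-byte SHA, the 4-byte CRC32 of the data,
    # then the object data itself, throttled through _raw_write_bwlimit().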
    def _raw_write(self, datalist, sha):
        assert(self.file)
        if not self._packopen:
            self._open()
        self.ensure_busy()
        data = ''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
                          sha,
                          struct.pack('!I', crc),
                          data))
        try:
            (self._bwcount, self._bwtime) = _raw_write_bwlimit(
                    self.file, outbuf, self._bwcount, self._bwtime)
        except IOError as e:
            raise ClientError, e, sys.exc_info()[2]
        self.outbytes += len(data)
        self.count += 1

        if self.file.has_input():
            self.suggest_packs()
            self.objcache.refresh()

        return sha, crc