]> arthur.barton.de Git - bup.git/blob - lib/bup/client.py
Merge branch 'bl/dumbserver' into next
[bup.git] / lib / bup / client.py
1 import re, struct, errno, time, zlib
2 from bup import git, ssh
3 from bup.helpers import *
4
# Upload bandwidth cap, in bytes per second, honoured by
# _raw_write_bwlimit().  A false value (None or 0) disables throttling.
bwlimit = None
6
7
class ClientError(Exception):
    """Error raised by the bup client (connection, protocol or state)."""
10
11
12 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
13     if not bwlimit:
14         f.write(buf)
15         return (len(buf), time.time())
16     else:
17         # We want to write in reasonably large blocks, but not so large that
18         # they're likely to overflow a router's queue.  So our bwlimit timing
19         # has to be pretty granular.  Also, if it takes too long from one
20         # transmit to the next, we can't just make up for lost time to bring
21         # the average back up to bwlimit - that will risk overflowing the
22         # outbound queue, which defeats the purpose.  So if we fall behind
23         # by more than one block delay, we shouldn't ever try to catch up.
24         for i in xrange(0,len(buf),4096):
25             now = time.time()
26             next = max(now, bwtime + 1.0*bwcount/bwlimit)
27             time.sleep(next-now)
28             sub = buf[i:i+4096]
29             f.write(sub)
30             bwcount = len(sub)  # might be less than 4096
31             bwtime = next
32         return (bwcount, bwtime)
33
34
def parse_remote(remote):
    """Split a remote specification into (protocol, host, port, path).

    Two forms are understood:

    - URL style, 'proto://[host[:port]][/path]', where proto is one of
      ssh, bup or file (matched case-insensitively and returned in lower
      case) and host may be a bracketed IPv6 literal.  port is returned
      as a string of digits, or None when absent.
    - rsync style, '[host:]path'.  Without a colon, or with an empty or
      '-' host, the result is a 'file' remote; otherwise an 'ssh' one.

    Raises ClientError for a URL whose protocol is not supported.
    """
    protocol = r'([a-z]+)://'
    host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
    port = r'(?::(\d+))?'
    path = r'(/.*)?'
    url_match = re.match(
            '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
    if url_match:
        # The pattern is matched with re.I, so normalise the scheme before
        # comparing it; callers compare against lower-case names.
        scheme = url_match.group(1).lower()
        if scheme not in ('ssh', 'bup', 'file'):
            # An explicit raise (rather than assert) keeps the check
            # active when Python runs with -O.
            raise ClientError('unexpected protocol: %s' % scheme)
        return (scheme,) + url_match.group(3, 4, 5)

    rs = remote.split(':', 1)
    if len(rs) == 1 or rs[0] in ('', '-'):
        return 'file', None, None, rs[-1]
    return 'ssh', rs[0], None, rs[1]
51
52
53 class Client:
54     def __init__(self, remote, create=False):
55         self._busy = self.conn = None
56         self.sock = self.p = self.pout = self.pin = None
57         is_reverse = os.environ.get('BUP_SERVER_REVERSE')
58         if is_reverse:
59             assert(not remote)
60             remote = '%s:' % is_reverse
61         (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
62         self.cachedir = git.repo('index-cache/%s'
63                                  % re.sub(r'[^@\w]', '_', 
64                                           "%s:%s" % (self.host, self.dir)))
65         if is_reverse:
66             self.pout = os.fdopen(3, 'rb')
67             self.pin = os.fdopen(4, 'wb')
68         else:
69             if self.protocol in ('ssh', 'file'):
70                 try:
71                     # FIXME: ssh and file shouldn't use the same module
72                     self.p = ssh.connect(self.host, self.port, 'server')
73                     self.pout = self.p.stdout
74                     self.pin = self.p.stdin
75                 except OSError, e:
76                     raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
77             elif self.protocol == 'bup':
78                 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
79                 self.sock.connect((self.host, self.port or 1982))
80                 self.pout = self.sock.makefile('rb')
81                 self.pin = self.sock.makefile('wb')
82         self.conn = Conn(self.pout, self.pin)
83         if self.dir:
84             self.dir = re.sub(r'[\r\n]', ' ', self.dir)
85             if create:
86                 self.conn.write('init-dir %s\n' % self.dir)
87             else:
88                 self.conn.write('set-dir %s\n' % self.dir)
89             self.check_ok()
90         self.sync_indexes()
91
92     def __del__(self):
93         try:
94             self.close()
95         except IOError, e:
96             if e.errno == errno.EPIPE:
97                 pass
98             else:
99                 raise
100
101     def close(self):
102         if self.conn and not self._busy:
103             self.conn.write('quit\n')
104         if self.pin and self.pout:
105             self.pin.close()
106             while self.pout.read(65536):
107                 pass
108             self.pout.close()
109         if self.sock:
110             self.sock.close()
111         if self.p:
112             self.p.wait()
113             rv = self.p.wait()
114             if rv:
115                 raise ClientError('server tunnel returned exit code %d' % rv)
116         self.conn = None
117         self.sock = self.p = self.pin = self.pout = None
118
119     def check_ok(self):
120         if self.p:
121             rv = self.p.poll()
122             if rv != None:
123                 raise ClientError('server exited unexpectedly with code %r'
124                                   % rv)
125         try:
126             return self.conn.check_ok()
127         except Exception, e:
128             raise ClientError, e, sys.exc_info()[2]
129
130     def check_busy(self):
131         if self._busy:
132             raise ClientError('already busy with command %r' % self._busy)
133         
134     def ensure_busy(self):
135         if not self._busy:
136             raise ClientError('expected to be busy, but not busy?!')
137         
138     def _not_busy(self):
139         self._busy = None
140
141     def sync_indexes(self):
142         self.check_busy()
143         conn = self.conn
144         mkdirp(self.cachedir)
145         # All cached idxs are extra until proven otherwise
146         extra = set()
147         for f in os.listdir(self.cachedir):
148             debug1('%s\n' % f)
149             if f.endswith('.idx'):
150                 extra.add(f)
151         needed = set()
152         conn.write('list-indexes\n')
153         for line in linereader(conn):
154             if not line:
155                 break
156             assert(line.find('/') < 0)
157             parts = line.split(' ')
158             idx = parts[0]
159             if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
160                 # If the server requests that we load an idx and we don't
161                 # already have a copy of it, it is needed
162                 needed.add(idx)
163             # Any idx that the server has heard of is proven not extra
164             extra.discard(idx)
165
166         self.check_ok()
167         debug1('client: removing extra indexes: %s\n' % extra)
168         for idx in extra:
169             os.unlink(os.path.join(self.cachedir, idx))
170         debug1('client: server requested load of: %s\n' % needed)
171         for idx in needed:
172             self.sync_index(idx)
173
174     def sync_index(self, name):
175         #debug1('requesting %r\n' % name)
176         self.check_busy()
177         mkdirp(self.cachedir)
178         self.conn.write('send-index %s\n' % name)
179         n = struct.unpack('!I', self.conn.read(4))[0]
180         assert(n)
181         fn = os.path.join(self.cachedir, name)
182         f = open(fn + '.tmp', 'w')
183         count = 0
184         progress('Receiving index from server: %d/%d\r' % (count, n))
185         for b in chunkyreader(self.conn, n):
186             f.write(b)
187             count += len(b)
188             progress('Receiving index from server: %d/%d\r' % (count, n))
189         progress('Receiving index from server: %d/%d, done.\n' % (count, n))
190         self.check_ok()
191         f.close()
192         os.rename(fn + '.tmp', fn)
193         git.auto_midx(self.cachedir)
194
195     def _make_objcache(self):
196         ob = self._busy
197         self._busy = None
198         #self.sync_indexes()
199         self._busy = ob
200         return git.PackIdxList(self.cachedir)
201
202     def _suggest_pack(self, indexname):
203         debug1('client: received index suggestion: %s\n' % indexname)
204         ob = self._busy
205         if ob:
206             assert(ob == 'receive-objects-v2')
207             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects
208             self._busy = None
209             self.conn.drain_and_check_ok()
210         self.sync_index(indexname)
211         if ob:
212             self._busy = ob
213             self.conn.write('receive-objects-v2\n')
214
215     def new_packwriter(self):
216         self.check_busy()
217         def _set_busy():
218             self._busy = 'receive-objects-v2'
219             self.conn.write('receive-objects-v2\n')
220         return PackWriter_Remote(self.conn,
221                                  objcache_maker = self._make_objcache,
222                                  suggest_pack = self._suggest_pack,
223                                  onopen = _set_busy,
224                                  onclose = self._not_busy,
225                                  ensure_busy = self.ensure_busy)
226
227     def read_ref(self, refname):
228         self.check_busy()
229         self.conn.write('read-ref %s\n' % refname)
230         r = self.conn.readline().strip()
231         self.check_ok()
232         if r:
233             assert(len(r) == 40)   # hexified sha
234             return r.decode('hex')
235         else:
236             return None   # nonexistent ref
237
238     def update_ref(self, refname, newval, oldval):
239         self.check_busy()
240         self.conn.write('update-ref %s\n%s\n%s\n' 
241                         % (refname, newval.encode('hex'),
242                            (oldval or '').encode('hex')))
243         self.check_ok()
244
245     def cat(self, id):
246         self.check_busy()
247         self._busy = 'cat'
248         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
249         while 1:
250             sz = struct.unpack('!I', self.conn.read(4))[0]
251             if not sz: break
252             yield self.conn.read(sz)
253         e = self.check_ok()
254         self._not_busy()
255         if e:
256             raise KeyError(str(e))
257
258
class PackWriter_Remote(git.PackWriter):
    """A git.PackWriter that streams objects to a server over a connection.

    conn is the connection the objects are written to.  The remaining
    constructor arguments are callbacks (each may be false to disable
    it):

    - objcache_maker: passed on to git.PackWriter.
    - suggest_pack(name): called with an index name the server suggests,
      and with the id returned when the pack is finished.
    - onopen(): called once, just before the first object is sent.
    - onclose(): called after the pack has been finished.
    - ensure_busy(): called before every object is sent.
    """

    def __init__(self, conn, objcache_maker, suggest_pack,
                 onopen, onclose,
                 ensure_busy):
        git.PackWriter.__init__(self, objcache_maker)
        self.file = conn
        self.filename = 'remote socket'
        self.suggest_pack = suggest_pack
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        # State threaded through _raw_write_bwlimit() between writes.
        self._bwcount = 0
        self._bwtime = time.time()

    def _open(self):
        """Prepare the object cache and notify onopen, once per pack."""
        if not self._packopen:
            self._make_objcache()
            if self.onopen:
                self.onopen()
            self._packopen = True

    def _end(self):
        """Finish the current pack and return the id the server reports.

        Returns None when no pack is open or the writer has been closed.
        """
        if self._packopen and self.file:
            self.file.write('\0\0\0\0')
            self._packopen = False
            # Skip any pending 'index ...' suggestions; the first other
            # line is taken as the pack id.
            line = self.file.readline().strip()
            while line.startswith('index '):
                line = self.file.readline().strip()
            pack_id = line
            self.file.check_ok()
            self.objcache = None
            if self.onclose:
                self.onclose()
            if pack_id and self.suggest_pack:
                self.suggest_pack(pack_id)
            return pack_id

    def close(self):
        """Finish the pack, detach from the connection, return the id."""
        pack_id = self._end()
        self.file = None
        return pack_id

    def abort(self):
        """Not supported for remote packs; always raises ClientError."""
        # NOTE(review): this used to raise GitError, a name this file
        # neither defines nor imports explicitly (assumed not to come from
        # the star import); ClientError is defined in this module.
        raise ClientError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist, sha):
        """Send one object to the server and return (sha, crc).

        datalist is a sequence of strings forming the object data.  The
        record on the wire is a 4-byte network-order length, the sha, the
        CRC32 of the data, and the data itself.
        """
        assert(self.file)
        if not self._packopen:
            self._open()
        if self.ensure_busy:
            self.ensure_busy()
        data = ''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
                          sha,
                          struct.pack('!I', crc),
                          data))
        (self._bwcount, self._bwtime) = \
            _raw_write_bwlimit(self.file, outbuf, self._bwcount, self._bwtime)
        # data never contained the sha1+crc framing (that is only part of
        # outbuf), so its length is already the payload size.
        self.outbytes += len(data)
        self.count += 1

        if self.file.has_input():
            # The server may interject an index suggestion between objects.
            line = self.file.readline().strip()
            assert(line.startswith('index '))
            idxname = line[6:]
            if self.suggest_pack:
                self.suggest_pack(idxname)
                self.objcache.refresh()

        return sha, crc