arthur.barton.de Git - bup.git/blob - lib/bup/client.py
Add optional dumb-server mode
[bup.git] / lib / bup / client.py
1 import re, struct, errno, time, zlib
2 from bup import git, ssh
3 from bup.helpers import *
4
# Outbound bandwidth cap, in bytes per second, consulted by
# _raw_write_bwlimit().  A falsy value (the default) disables throttling.
bwlimit = None
6
7
class ClientError(Exception):
    """Error raised by the client code in this module."""
10
11
def _raw_write_bwlimit(f, buf, bwcount, bwtime):
    """Write buf to f, pacing the output when a bandwidth limit is set.

    When the module-level bwlimit is falsy, buf is written with a single
    f.write() call.  Otherwise it is written in blocks of at most 4096
    bytes, sleeping before each block until the previously written block
    has had time to go out at bwlimit bytes per second.

    bwcount and bwtime are the size and scheduled send time of the most
    recently written block.  The updated (bwcount, bwtime) pair is returned
    so the caller can hand it back on the next call.
    """
    if bwlimit:
        # Blocks should be reasonably large, yet small enough that they are
        # unlikely to overflow a router's queue, so the pacing is done per
        # block.  If more than one block delay has already elapsed since the
        # last transmit, we deliberately do not try to catch up: bursting to
        # restore the average would risk overflowing the outbound queue,
        # which is exactly what the limit is meant to prevent.
        total = len(buf)
        offset = 0
        while offset < total:
            now = time.time()
            send_at = max(now, bwtime + 1.0*bwcount/bwlimit)
            time.sleep(send_at - now)
            block = buf[offset:offset+4096]
            f.write(block)
            bwcount = len(block)  # the final block may be shorter than 4096
            bwtime = send_at
            offset += 4096
        return (bwcount, bwtime)

    # No limit configured: send everything at once.
    f.write(buf)
    return (len(buf), time.time())
33                        
34
35 class Client:
36     def __init__(self, remote, create=False):
37         self._busy = self.conn = self.p = self.pout = self.pin = None
38         is_reverse = os.environ.get('BUP_SERVER_REVERSE')
39         if is_reverse:
40             assert(not remote)
41             remote = '%s:' % is_reverse
42         rs = remote.split(':', 1)
43         if len(rs) == 1:
44             (host, dir) = (None, remote)
45         else:
46             (host, dir) = rs
47         (self.host, self.dir) = (host, dir)
48         self.cachedir = git.repo('index-cache/%s'
49                                  % re.sub(r'[^@\w]', '_', 
50                                           "%s:%s" % (host, dir)))
51         if is_reverse:
52             self.pout = os.fdopen(3, 'rb')
53             self.pin = os.fdopen(4, 'wb')
54         else:
55             try:
56                 self.p = ssh.connect(host, 'server')
57                 self.pout = self.p.stdout
58                 self.pin = self.p.stdin
59             except OSError, e:
60                 raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
61         self.conn = Conn(self.pout, self.pin)
62         if dir:
63             dir = re.sub(r'[\r\n]', ' ', dir)
64             if create:
65                 self.conn.write('init-dir %s\n' % dir)
66             else:
67                 self.conn.write('set-dir %s\n' % dir)
68             self.check_ok()
69         self.sync_indexes()
70
71     def __del__(self):
72         try:
73             self.close()
74         except IOError, e:
75             if e.errno == errno.EPIPE:
76                 pass
77             else:
78                 raise
79
80     def close(self):
81         if self.conn and not self._busy:
82             self.conn.write('quit\n')
83         if self.pin and self.pout:
84             self.pin.close()
85             while self.pout.read(65536):
86                 pass
87             self.pout.close()
88         if self.p:
89             self.p.wait()
90             rv = self.p.wait()
91             if rv:
92                 raise ClientError('server tunnel returned exit code %d' % rv)
93         self.conn = None
94         self.p = self.pin = self.pout = None
95
96     def check_ok(self):
97         if self.p:
98             rv = self.p.poll()
99             if rv != None:
100                 raise ClientError('server exited unexpectedly with code %r'
101                                   % rv)
102         try:
103             return self.conn.check_ok()
104         except Exception, e:
105             raise ClientError, e, sys.exc_info()[2]
106
107     def check_busy(self):
108         if self._busy:
109             raise ClientError('already busy with command %r' % self._busy)
110         
111     def ensure_busy(self):
112         if not self._busy:
113             raise ClientError('expected to be busy, but not busy?!')
114         
115     def _not_busy(self):
116         self._busy = None
117
118     def sync_indexes(self):
119         self.check_busy()
120         conn = self.conn
121         mkdirp(self.cachedir)
122         # All cached idxs are extra until proven otherwise
123         extra = set()
124         for f in os.listdir(self.cachedir):
125             debug1('%s\n' % f)
126             if f.endswith('.idx'):
127                 extra.add(f)
128         needed = set()
129         conn.write('list-indexes\n')
130         for line in linereader(conn):
131             if not line:
132                 break
133             assert(line.find('/') < 0)
134             parts = line.split(' ')
135             idx = parts[0]
136             if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
137                 # If the server requests that we load an idx and we don't
138                 # already have a copy of it, it is needed
139                 needed.add(idx)
140             # Any idx that the server has heard of is proven not extra
141             extra.discard(idx)
142
143         self.check_ok()
144         debug1('client: removing extra indexes: %s\n' % extra)
145         for idx in extra:
146             os.unlink(os.path.join(self.cachedir, idx))
147         debug1('client: server requested load of: %s\n' % needed)
148         for idx in needed:
149             self.sync_index(idx)
150
151     def sync_index(self, name):
152         #debug1('requesting %r\n' % name)
153         self.check_busy()
154         mkdirp(self.cachedir)
155         self.conn.write('send-index %s\n' % name)
156         n = struct.unpack('!I', self.conn.read(4))[0]
157         assert(n)
158         fn = os.path.join(self.cachedir, name)
159         f = open(fn + '.tmp', 'w')
160         count = 0
161         progress('Receiving index from server: %d/%d\r' % (count, n))
162         for b in chunkyreader(self.conn, n):
163             f.write(b)
164             count += len(b)
165             progress('Receiving index from server: %d/%d\r' % (count, n))
166         progress('Receiving index from server: %d/%d, done.\n' % (count, n))
167         self.check_ok()
168         f.close()
169         os.rename(fn + '.tmp', fn)
170         git.auto_midx(self.cachedir)
171
172     def _make_objcache(self):
173         ob = self._busy
174         self._busy = None
175         #self.sync_indexes()
176         self._busy = ob
177         return git.PackIdxList(self.cachedir)
178
179     def _suggest_pack(self, indexname):
180         debug1('client: received index suggestion: %s\n' % indexname)
181         ob = self._busy
182         if ob:
183             assert(ob == 'receive-objects-v2')
184             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects
185             self._busy = None
186             self.conn.drain_and_check_ok()
187         self.sync_index(indexname)
188         if ob:
189             self._busy = ob
190             self.conn.write('receive-objects-v2\n')
191
192     def new_packwriter(self):
193         self.check_busy()
194         def _set_busy():
195             self._busy = 'receive-objects-v2'
196             self.conn.write('receive-objects-v2\n')
197         return PackWriter_Remote(self.conn,
198                                  objcache_maker = self._make_objcache,
199                                  suggest_pack = self._suggest_pack,
200                                  onopen = _set_busy,
201                                  onclose = self._not_busy,
202                                  ensure_busy = self.ensure_busy)
203
204     def read_ref(self, refname):
205         self.check_busy()
206         self.conn.write('read-ref %s\n' % refname)
207         r = self.conn.readline().strip()
208         self.check_ok()
209         if r:
210             assert(len(r) == 40)   # hexified sha
211             return r.decode('hex')
212         else:
213             return None   # nonexistent ref
214
215     def update_ref(self, refname, newval, oldval):
216         self.check_busy()
217         self.conn.write('update-ref %s\n%s\n%s\n' 
218                         % (refname, newval.encode('hex'),
219                            (oldval or '').encode('hex')))
220         self.check_ok()
221
222     def cat(self, id):
223         self.check_busy()
224         self._busy = 'cat'
225         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
226         while 1:
227             sz = struct.unpack('!I', self.conn.read(4))[0]
228             if not sz: break
229             yield self.conn.read(sz)
230         e = self.check_ok()
231         self._not_busy()
232         if e:
233             raise KeyError(str(e))
234
235
class PackWriter_Remote(git.PackWriter):
    """A git.PackWriter that streams objects to a server over a connection.

    Instead of writing a local pack file, every object is framed and sent
    over conn.  The callbacks supplied by the owner keep its connection
    state in step with the writer.
    """

    def __init__(self, conn, objcache_maker, suggest_pack,
                 onopen, onclose,
                 ensure_busy):
        """Set up a writer on top of conn.

        objcache_maker is passed on to git.PackWriter.  suggest_pack(name)
        is called when the server names an index.  onopen and onclose are
        called when the remote pack is opened and finished.  ensure_busy is
        called before each object is sent.  Any of the callbacks that are
        checked before use (suggest_pack, onopen, onclose, ensure_busy) may
        be falsy.
        """
        git.PackWriter.__init__(self, objcache_maker)
        self.file = conn
        self.filename = 'remote socket'
        self.suggest_pack = suggest_pack
        self.onopen = onopen
        self.onclose = onclose
        self.ensure_busy = ensure_busy
        self._packopen = False
        # State carried between calls to _raw_write_bwlimit().
        self._bwcount = 0
        self._bwtime = time.time()

    def _open(self):
        """Open the remote pack if it is not open yet."""
        if not self._packopen:
            self._make_objcache()
            if self.onopen:
                self.onopen()
            self._packopen = True

    def _end(self):
        """Finish the remote pack and return the id line sent by the server.

        Returns None if no pack is open or the writer has been closed.
        """
        if self._packopen and self.file:
            # A zero length word tells the server there are no more objects.
            self.file.write('\0\0\0\0')
            self._packopen = False
            # Skip any pending 'index ...' lines; the first other line is
            # the server's final answer.
            line = self.file.readline().strip()
            while line.startswith('index '):
                line = self.file.readline().strip()
            pack_id = line
            self.file.check_ok()
            self.objcache = None
            if self.onclose:
                self.onclose()
            if pack_id and self.suggest_pack:
                self.suggest_pack(pack_id)
            return pack_id

    def close(self):
        """Finish the pack, detach from the connection and return the id."""
        pack_id = self._end()
        self.file = None
        return pack_id

    def abort(self):
        """Not supported for remote packs; always raises git.GitError."""
        # GitError is referenced through the git module, which this file
        # imports explicitly.
        raise git.GitError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist, sha):
        """Send one object to the server and return (sha, crc).

        datalist is a sequence of strings that are joined to form the object
        data; sha is its binary sha.  The frame sent is a 4-byte
        network-order length (data plus 20 bytes of sha plus 4 bytes of
        crc), followed by the sha, the crc32 of the data and the data
        itself.
        """
        assert(self.file)
        if not self._packopen:
            self._open()
        if self.ensure_busy:
            self.ensure_busy()
        data = ''.join(datalist)
        assert(data)
        assert(sha)
        crc = zlib.crc32(data) & 0xffffffff
        outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
                          sha,
                          struct.pack('!I', crc),
                          data))
        (self._bwcount, self._bwtime) = \
            _raw_write_bwlimit(self.file, outbuf, self._bwcount, self._bwtime)
        # data holds the payload only (the sha and crc are added separately
        # in outbuf above), so it is counted in full.
        self.outbytes += len(data)
        self.count += 1

        if self.file.has_input():
            # The server may interrupt the upload with 'index <name>' to
            # point us at an index.
            line = self.file.readline().strip()
            assert(line.startswith('index '))
            idxname = line[6:]
            if self.suggest_pack:
                self.suggest_pack(idxname)
                self.objcache.refresh()

        return sha, crc