1 import re, struct, errno, time, zlib
2 from bup import git, ssh
3 from bup.helpers import *
# Exception type raised by this module: the visible raise sites use it for
# connection failures, unexpected server exit codes and busy-state
# violations.
# NOTE(review): the class body is not visible in this listing.
8 class ClientError(Exception):
# Write buf to f and return an updated (bwcount, bwtime) pair; the caller
# feeds that pair back in on its next call.
# NOTE(review): this listing omits some original lines (the embedded
# numbering skips), so the condition that chooses between the two return
# paths below, and the definitions of `now`, `sub` and `bwlimit`, are not
# visible here.
12 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
# First return path: reports the full buffer length and the current time.
15 return (len(buf), time.time())
17 # We want to write in reasonably large blocks, but not so large that
18 # they're likely to overflow a router's queue. So our bwlimit timing
19 # has to be pretty granular. Also, if it takes too long from one
20 # transmit to the next, we can't just make up for lost time to bring
21 # the average back up to bwlimit - that will risk overflowing the
22 # outbound queue, which defeats the purpose. So if we fall behind
23 # by more than one block delay, we shouldn't ever try to catch up.
# Step through the buffer in slices of at most 4096 bytes.
24 for i in xrange(0,len(buf),4096):
# Earliest moment the next slice may go out: never earlier than `now`, and
# never earlier than the previous send time plus bwcount/bwlimit
# (presumably bwlimit is in bytes per second - TODO confirm).
# NOTE(review): `next` shadows the builtin of the same name.
26 next = max(now, bwtime + 1.0*bwcount/bwlimit)
30 bwcount = len(sub) # might be less than 4096
# Second return path: hands back the size of the last slice and bwtime.
32 return (bwcount, bwtime)
# Connect to a bup server for `remote` and tell it which directory to use.
#   remote: string split on the first ':' into host and dir (see below).
#   create: defaults to False; its use is not visible in this listing
#           (presumably it selects 'init-dir' over 'set-dir' - TODO confirm).
# Raises ClientError (with the original traceback) on a connect failure.
# NOTE(review): this listing omits some original lines, so the conditions
# guarding the alternatives below are not shown. `os` and `sys` are not
# among the visible imports; presumably they arrive via
# `from bup.helpers import *` - verify.
36 def __init__(self, remote, create=False):
# All connection handles start out unset.
37 self._busy = self.conn = self.p = self.pout = self.pin = None
38 is_reverse = os.environ.get('BUP_SERVER_REVERSE')
# Rewrites remote to '<BUP_SERVER_REVERSE value>:' (guard not visible).
41 remote = '%s:' % is_reverse
# Split only on the first ':' so the dir part may itself contain colons.
42 rs = remote.split(':', 1)
# Alternative with no host: the whole string is taken as the dir.
44 (host, dir) = (None, remote)
47 (self.host, self.dir) = (host, dir)
# Cache directory name: 'host:dir' with every character other than '@' and
# word characters replaced by '_', placed under 'index-cache/'.
48 self.cachedir = git.repo('index-cache/%s'
49 % re.sub(r'[^@\w]', '_',
50 "%s:%s" % (host, dir)))
# One transport: pre-opened file descriptors 3 (read) and 4 (write).
# Presumably this is the BUP_SERVER_REVERSE case - TODO confirm.
52 self.pout = os.fdopen(3, 'rb')
53 self.pin = os.fdopen(4, 'wb')
# Other transport: run 'server' via ssh.connect and use its stdout/stdin.
56 self.p = ssh.connect(host, 'server')
57 self.pout = self.p.stdout
58 self.pin = self.p.stdin
# Python 2 three-expression raise: re-raises as ClientError while keeping
# the traceback of the exception currently being handled.
60 raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
61 self.conn = Conn(self.pout, self.pin)
# CR and LF are replaced with spaces so dir fits in a one-line command.
63 dir = re.sub(r'[\r\n]', ' ', dir)
65 self.conn.write('init-dir %s\n' % dir)
67 self.conn.write('set-dir %s\n' % dir)
75 if e.errno == errno.EPIPE:
81 if self.conn and not self._busy:
82 self.conn.write('quit\n')
83 if self.pin and self.pout:
85 while self.pout.read(65536):
92 raise ClientError('server tunnel returned exit code %d' % rv)
94 self.p = self.pin = self.pout = None
100 raise ClientError('server exited unexpectedly with code %r'
103 return self.conn.check_ok()
105 raise ClientError, e, sys.exc_info()[2]
# Raises ClientError naming the command stored in self._busy.
# NOTE(review): the guarding condition is not visible in this listing;
# presumably it fires only when self._busy is set - TODO confirm.
107 def check_busy(self):
109 raise ClientError('already busy with command %r' % self._busy)
# Raises ClientError when the client was expected to be busy but is not.
# NOTE(review): the guarding condition is not visible in this listing;
# presumably it tests self._busy - TODO confirm.
111 def ensure_busy(self):
113 raise ClientError('expected to be busy, but not busy?!')
# Reconcile the local index cache with the server's 'list-indexes' reply:
# cached .idx files the server does not mention are deleted, and indexes
# the server asks to be loaded are collected in `needed`.
# NOTE(review): this listing omits some original lines, including where
# `conn`, `extra`, `needed` and `idx` are assigned and what is finally done
# with `needed`.
118 def sync_indexes(self):
121 mkdirp(self.cachedir)
122 # All cached idxs are extra until proven otherwise
124 for f in os.listdir(self.cachedir):
126 if f.endswith('.idx'):
129 conn.write('list-indexes\n')
130 for line in linereader(conn):
# Names containing '/' are rejected; the name is later joined onto
# self.cachedir for os.unlink.
# NOTE(review): assert statements are skipped under `python -O`, so this
# check on server-supplied data would vanish there - consider raising.
133 assert(line.find('/') < 0)
# A reply line is a name, optionally followed by the word 'load'.
134 parts = line.split(' ')
136 if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
137 # If the server requests that we load an idx and we don't
138 # already have a copy of it, it is needed
140 # Any idx that the server has heard of is proven not extra
144 debug1('client: removing extra indexes: %s\n' % extra)
146 os.unlink(os.path.join(self.cachedir, idx))
147 debug1('client: server requested load of: %s\n' % needed)
# Fetch the index called `name` from the server into self.cachedir.
# The data is written to '<name>.tmp' and then renamed into place, so a
# partly received file never appears under the final name.
# NOTE(review): this listing omits some original lines, including the
# writes to `f`, its close, and the updates to `count`.
151 def sync_index(self, name):
152 #debug1('requesting %r\n' % name)
154 mkdirp(self.cachedir)
155 self.conn.write('send-index %s\n' % name)
# The reply starts with a 4-byte big-endian unsigned length.
156 n = struct.unpack('!I', self.conn.read(4))[0]
158 fn = os.path.join(self.cachedir, name)
# NOTE(review): opened in text mode 'w'; the payload is presumably binary
# index data, so 'wb' looks safer on platforms that translate newlines -
# confirm.
159 f = open(fn + '.tmp', 'w')
161 progress('Receiving index from server: %d/%d\r' % (count, n))
# Read the n announced bytes in chunks.
162 for b in chunkyreader(self.conn, n):
165 progress('Receiving index from server: %d/%d\r' % (count, n))
166 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
169 os.rename(fn + '.tmp', fn)
170 git.auto_midx(self.cachedir)
# Build the object cache: a git.PackIdxList over self.cachedir.
# new_packwriter passes this method as objcache_maker.
# NOTE(review): original lines between the def and the return are not
# visible in this listing.
172 def _make_objcache(self):
177 return git.PackIdxList(self.cachedir)
# Handle the server suggesting an index while objects are being sent:
# pause the receive-objects command, fetch that index with sync_index,
# then restart receive-objects. new_packwriter passes this method as the
# suggest_pack callback.
# NOTE(review): this listing omits some original lines, including where
# `ob` is assigned (presumably from self._busy - TODO confirm).
179 def _suggest_pack(self, indexname):
180 debug1('client: received index suggestion: %s\n' % indexname)
# Only valid while a 'receive-objects-v2' command is in progress.
183 assert(ob == 'receive-objects-v2')
184 self.conn.write('\xff\xff\xff\xff') # suspend receive-objects
186 self.conn.drain_and_check_ok()
187 self.sync_index(indexname)
# Resume sending objects.
190 self.conn.write('receive-objects-v2\n')
# Mark the client busy with 'receive-objects-v2', send that command, and
# return a PackWriter_Remote wired to this connection and its callbacks.
# NOTE(review): this listing omits some original lines; one keyword
# argument between suggest_pack and onclose is not visible.
192 def new_packwriter(self):
195 self._busy = 'receive-objects-v2'
196 self.conn.write('receive-objects-v2\n')
197 return PackWriter_Remote(self.conn,
198 objcache_maker = self._make_objcache,
199 suggest_pack = self._suggest_pack,
201 onclose = self._not_busy,
202 ensure_busy = self.ensure_busy)
# Ask the server for the value of `refname`.
# Returns the 20-byte binary sha (the 40-character hex reply decoded), or
# None for a nonexistent ref.
# NOTE(review): this listing omits some original lines, including the
# condition that separates the two return paths.
204 def read_ref(self, refname):
206 self.conn.write('read-ref %s\n' % refname)
207 r = self.conn.readline().strip()
210 assert(len(r) == 40) # hexified sha
# Python 2 str.decode('hex'): hex text to raw bytes.
211 return r.decode('hex')
213 return None # nonexistent ref
# Send an 'update-ref' command: the ref name, the new value and the old
# value, one per line, with both values hex-encoded.
# A false oldval is sent as an empty line.
# NOTE(review): original lines after the write are not visible in this
# listing.
215 def update_ref(self, refname, newval, oldval):
217 self.conn.write('update-ref %s\n%s\n%s\n'
218 % (refname, newval.encode('hex'),
219 (oldval or '').encode('hex')))
225 self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
227 sz = struct.unpack('!I', self.conn.read(4))[0]
229 yield self.conn.read(sz)
233 raise KeyError(str(e))
# git.PackWriter subclass that sends objects over the server connection
# instead of writing a local pack file.
# NOTE(review): this listing omits many original lines. Only the `def`
# lines of __init__ and _raw_write are visible; the statements between
# them belong to other methods whose headers are not shown.
236 class PackWriter_Remote(git.PackWriter):
# Stores the callbacks supplied by the caller and starts with no pack open.
237 def __init__(self, conn, objcache_maker, suggest_pack,
240 git.PackWriter.__init__(self, objcache_maker)
242 self.filename = 'remote socket'
243 self.suggest_pack = suggest_pack
245 self.onclose = onclose
246 self.ensure_busy = ensure_busy
247 self._packopen = False
249 self._bwtime = time.time()
# (method header not visible) Creates the object cache and marks the pack
# open, if it is not open already.
252 if not self._packopen:
253 self._make_objcache()
256 self._packopen = True
# (method header not visible) Ends an open pack by writing four zero bytes,
# then reads a reply line that may start with 'index '.
259 if self._packopen and self.file:
260 self.file.write('\0\0\0\0')
261 self._packopen = False
263 line = self.file.readline().strip()
264 if line.startswith('index '):
# (method header not visible) Passes a non-empty id to suggest_pack.
273 if id and self.suggest_pack:
274 self.suggest_pack(id)
# (method header not visible) Aborting a remote pack is unsupported.
283 raise GitError("don't know how to abort remote pack writing")
# Send one object: the pieces in datalist are joined, framed with a length
# header and a crc, and written through _raw_write_bwlimit.
285 def _raw_write(self, datalist, sha):
287 if not self._packopen:
291 data = ''.join(datalist)
# Mask to an unsigned 32-bit value so struct.pack('!I', ...) accepts it;
# zlib.crc32 can return a negative int on Python 2.
294 crc = zlib.crc32(data) & 0xffffffff
# Frame: 4-byte big-endian length of data + 20 + 4, then (among fields not
# all visible here) the 4-byte big-endian crc.
295 outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
297 struct.pack('!I', crc),
# Carry the throttle state (_bwcount, _bwtime) from one write to the next.
299 (self._bwcount, self._bwtime) = \
300 _raw_write_bwlimit(self.file, outbuf, self._bwcount, self._bwtime)
# NOTE(review): the length header above adds 20 + 4 to len(data), so data
# itself carries neither sha nor crc; subtracting 24 here looks like it
# undercounts the payload - confirm the intended accounting.
301 self.outbytes += len(data) - 20 - 4 # Don't count sha1+crc
# If the server has sent something, it must be an 'index ' line; the index
# it names is passed to suggest_pack and the object cache is refreshed.
304 if self.file.has_input():
305 line = self.file.readline().strip()
306 assert(line.startswith('index '))
308 if self.suggest_pack:
309 self.suggest_pack(idxname)
310 self.objcache.refresh()