1 import re, struct, errno, time, zlib
2 from bup import git, ssh
3 from bup.helpers import *
# Exception type this module raises for client-side failures; the visible
# uses cover connection errors, unexpected server exit codes and busy-state
# violations.
# NOTE(review): the class body is not visible in this extract.
8 class ClientError(Exception):
# Write buf to f, optionally throttled to a bandwidth limit, and return an
# updated (bwcount, bwtime) pair for the caller to feed into the next call.
# NOTE(review): `bwlimit`, `now` and `sub` are not assigned on any line
# visible in this extract, and the condition choosing between the two return
# paths is also out of view -- presumably `bwlimit` is a module-level
# setting; confirm.
12 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
# First return path: report the full buffer length and the current time.
15 return (len(buf), time.time())
17 # We want to write in reasonably large blocks, but not so large that
18 # they're likely to overflow a router's queue. So our bwlimit timing
19 # has to be pretty granular. Also, if it takes too long from one
20 # transmit to the next, we can't just make up for lost time to bring
21 # the average back up to bwlimit - that will risk overflowing the
22 # outbound queue, which defeats the purpose. So if we fall behind
23 # by more than one block delay, we shouldn't ever try to catch up.
# Step through buf in 4096-byte strides.
24 for i in xrange(0,len(buf),4096):
# Earliest allowed send time: the previous send time plus however long the
# previous bwcount bytes take at bwlimit. Taking max() with `now` means
# time already lost is never made up for, as the comment above explains.
26 next = max(now, bwtime + 1.0*bwcount/bwlimit)
30 bwcount = len(sub) # might be less than 4096
# Second return path: the size and (presumably) send time of the last
# block; the line updating bwtime is not visible here.
32 return (bwcount, bwtime)
# Split a remote specification into a (protocol, host, port, path) tuple.
# Two forms are handled: a URL (protocol://, then an optional host+port,
# then a path) and, failing that, a colon-separated "host:path" form.
# NOTE(review): the `port` and `path` patterns, the re.match() call that
# produces `url_match`, and the branch conditions are on lines not visible
# in this extract.
35 def parse_remote(remote):
36 protocol = r'([a-z]+)://'
# Host is either a bracketed run of hex digits and colons (the named group
# `sb` records the opening bracket and then requires the closing one) or a
# bare name containing neither ':' nor '/'.
37 host = r'(?P<sb>\[)?((?(sb)[0-9a-f:]+|[^:/]+))(?(sb)\])'
# Matching is case-insensitive (re.I).
41 '%s(?:%s%s)?%s' % (protocol, host, port, path), remote, re.I)
# Only these three URL schemes are accepted.
# NOTE(review): as an assert, this check disappears under `python -O`.
43 assert(url_match.group(1) in ('ssh', 'bup', 'file'))
# Group 2 is the `sb` bracket marker, so it is left out of the result.
44 return url_match.group(1,3,4,5)
# Non-URL form: split on the first ':' only.
46 rs = remote.split(':', 1)
# No colon at all, or an empty / '-' prefix, is treated as a 'file' remote
# whose path is the last piece.
47 if len(rs) == 1 or rs[0] in ('', '-'):
48 return 'file', None, None, rs[-1]
# Otherwise the part before the colon is taken as an ssh host.
50 return 'ssh', rs[0], None, rs[1]
# Set up the transport to the server named by `remote`, wrap it in a Conn,
# and send either an init-dir or a set-dir command for the parsed directory.
# NOTE(review): several branch conditions and try/except lines of this
# method are not visible in this extract; the comments below hedge
# accordingly. `os`, `sys`, `socket` and `Conn` are not imported on any
# visible line -- presumably they arrive via `from bup.helpers import *`;
# confirm.
54 def __init__(self, remote, create=False):
55 self._busy = self.conn = None
56 self.sock = self.p = self.pout = self.pin = None
57 is_reverse = os.environ.get('BUP_SERVER_REVERSE')
# Presumably only applied when is_reverse is set (guard not visible): the
# environment value, followed by ':', replaces `remote`.
60 remote = '%s:' % is_reverse
61 (self.protocol, self.host, self.port, self.dir) = parse_remote(remote)
# Index cache directory name derived from "host:dir"; every character other
# than '@' and word characters becomes '_'.
62 self.cachedir = git.repo('index-cache/%s'
63 % re.sub(r'[^@\w]', '_',
64 "%s:%s" % (self.host, self.dir)))
# Use already-open file descriptors 3 and 4 as the server pipes --
# presumably the reverse-mode branch; the condition is not visible.
66 self.pout = os.fdopen(3, 'rb')
67 self.pin = os.fdopen(4, 'wb')
69 if self.protocol in ('ssh', 'file'):
71 # FIXME: ssh and file shouldn't use the same module
72 self.p = ssh.connect(self.host, self.port, 'server')
73 self.pout = self.p.stdout
74 self.pin = self.p.stdin
# Python 2 three-argument raise: rethrow as ClientError while keeping the
# original traceback. `e` is bound by an except clause not visible here.
76 raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
77 elif self.protocol == 'bup':
# Direct TCP connection; port 1982 is used when none was parsed.
# NOTE(review): when present, self.port comes from a regex group in
# parse_remote and so is a string, while socket connect() expects an
# integer port -- confirm whether a conversion is needed here.
78 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
79 self.sock.connect((self.host, self.port or 1982))
80 self.pout = self.sock.makefile('rb')
81 self.pin = self.sock.makefile('wb')
82 self.conn = Conn(self.pout, self.pin)
# The directory is sent inside a newline-terminated command below, so CR
# and LF in it are replaced with spaces.
84 self.dir = re.sub(r'[\r\n]', ' ', self.dir)
# init-dir vs set-dir: presumably chosen by `create` (condition not
# visible in this extract).
86 self.conn.write('init-dir %s\n' % self.dir)
88 self.conn.write('set-dir %s\n' % self.dir)
# NOTE(review): isolated fragment -- the enclosing def and the except
# clause that binds `e` are not visible in this extract. The test singles
# out EPIPE (broken pipe) errors; what is done with them is not shown.
96 if e.errno == errno.EPIPE:
# NOTE(review): the def line for this fragment is not visible in this
# extract; from the statements shown, it shuts the connection down and
# clears the transport handles.
# Only send 'quit' when a connection exists and no command is in flight.
102 if self.conn and not self._busy:
103 self.conn.write('quit\n')
104 if self.pin and self.pout:
# Keep reading from the server in 64 KiB chunks until read() returns an
# empty result; the loop body is not visible here.
106 while self.pout.read(65536):
# `rv` is assigned on a line not shown -- presumably the exit status of the
# tunnel process; confirm.
115 raise ClientError('server tunnel returned exit code %d' % rv)
# Drop every transport handle.
117 self.sock = self.p = self.pin = self.pout = None
# NOTE(review): the def line, the condition guarding the first raise, the
# continuation of that raise statement, and the try/except structure around
# the remaining lines are not visible in this extract.
123 raise ClientError('server exited unexpectedly with code %r'
# Delegate the protocol-level status check to the connection object.
126 return self.conn.check_ok()
# Python 2 three-argument raise: convert the caught exception `e` into a
# ClientError while keeping the original traceback.
128 raise ClientError, e, sys.exc_info()[2]
# Raise ClientError naming the command recorded in self._busy.
# NOTE(review): the condition guarding the raise is not visible in this
# extract -- presumably it fires only when self._busy is set.
130 def check_busy(self):
132 raise ClientError('already busy with command %r' % self._busy)
# Counterpart of check_busy: raise ClientError when a command was expected
# to be in progress but is not.
# NOTE(review): the condition guarding the raise is not visible in this
# extract -- presumably it fires only when self._busy is unset.
134 def ensure_busy(self):
136 raise ClientError('expected to be busy, but not busy?!')
# Reconcile the local index cache directory with the server's index list:
# cached .idx files the server does not mention are deleted, and indexes the
# server asks to have loaded but which are missing locally are collected.
# NOTE(review): `conn`, `idx`, `extra` and `needed` are assigned on lines
# not visible in this extract, as is whatever is done with `needed` after
# the final debug message.
141 def sync_indexes(self):
144 mkdirp(self.cachedir)
145 # All cached idxs are extra until proven otherwise
147 for f in os.listdir(self.cachedir):
149 if f.endswith('.idx'):
# Ask the server for its index list and read the reply line by line.
152 conn.write('list-indexes\n')
153 for line in linereader(conn):
# Reject any line containing a path separator.
# NOTE(review): this validates server-supplied data with an assert, which
# is stripped under `python -O`.
156 assert(line.find('/') < 0)
# Each line is split on spaces; a second field of 'load' marks an index
# the server wants the client to have.
157 parts = line.split(' ')
159 if len(parts) == 2 and parts[1] == 'load' and idx not in extra:
160 # If the server requests that we load an idx and we don't
161 # already have a copy of it, it is needed
163 # Any idx that the server has heard of is proven not extra
167 debug1('client: removing extra indexes: %s\n' % extra)
169 os.unlink(os.path.join(self.cachedir, idx))
170 debug1('client: server requested load of: %s\n' % needed)
# Download the index called `name` from the server into the cache
# directory, via a '.tmp' file that is renamed into place at the end.
# NOTE(review): the lines that initialise `count`, write each chunk to `f`,
# update `count` and close `f` are not visible in this extract.
174 def sync_index(self, name):
175 #debug1('requesting %r\n' % name)
177 mkdirp(self.cachedir)
178 self.conn.write('send-index %s\n' % name)
# The reply starts with a 4-byte network-order unsigned length.
179 n = struct.unpack('!I', self.conn.read(4))[0]
181 fn = os.path.join(self.cachedir, name)
# NOTE(review): the file is opened in text mode ('w') although the payload
# is presumably binary index data; on platforms that translate newlines in
# text mode 'wb' would be needed -- confirm.
182 f = open(fn + '.tmp', 'w')
184 progress('Receiving index from server: %d/%d\r' % (count, n))
# Read the n-byte payload in chunks, reporting progress after each one.
185 for b in chunkyreader(self.conn, n):
188 progress('Receiving index from server: %d/%d\r' % (count, n))
189 progress('Receiving index from server: %d/%d, done.\n' % (count, n))
# Rename the finished download into place under its final name.
192 os.rename(fn + '.tmp', fn)
193 git.auto_midx(self.cachedir)
# Build the object cache from the index files in this client's cache
# directory.
# NOTE(review): the lines between the def and the return are not visible in
# this extract.
195 def _make_objcache(self):
200 return git.PackIdxList(self.cachedir)
# Handle the server suggesting an index during an upload: suspend the
# running receive-objects-v2 command, fetch the index, then restart the
# command.
# NOTE(review): `ob` is assigned on a line not visible in this extract --
# presumably the saved value of self._busy; the lines that clear and restore
# the busy state are likewise out of view.
202 def _suggest_pack(self, indexname):
203 debug1('client: received index suggestion: %s\n' % indexname)
# This is only expected to happen while an upload is in progress.
206 assert(ob == 'receive-objects-v2')
207 self.conn.write('\xff\xff\xff\xff') # suspend receive-objects
209 self.conn.drain_and_check_ok()
210 self.sync_index(indexname)
# Re-issue the upload command so the interrupted stream can continue.
213 self.conn.write('receive-objects-v2\n')
# Start a receive-objects-v2 command on the server and return a
# PackWriter_Remote bound to this connection.
# NOTE(review): two lines after the def and one keyword argument of the
# constructor call are not visible in this extract.
215 def new_packwriter(self):
# Record the in-flight command before sending it.
218 self._busy = 'receive-objects-v2'
219 self.conn.write('receive-objects-v2\n')
# The writer is handed callbacks into this client for building the object
# cache, handling index suggestions, and tracking the busy state.
220 return PackWriter_Remote(self.conn,
221 objcache_maker = self._make_objcache,
222 suggest_pack = self._suggest_pack,
224 onclose = self._not_busy,
225 ensure_busy = self.ensure_busy)
# Ask the server for the value of `refname`. Returns the sha as a binary
# string decoded from 40 hex characters, or None for a nonexistent ref.
# NOTE(review): the condition selecting between the two returns is not
# visible in this extract -- presumably a test on whether the reply line is
# empty.
227 def read_ref(self, refname):
229 self.conn.write('read-ref %s\n' % refname)
230 r = self.conn.readline().strip()
233 assert(len(r) == 40) # hexified sha
# Python 2 'hex' codec: turns the 40 hex characters into raw bytes.
234 return r.decode('hex')
236 return None # nonexistent ref
# Send an update-ref command: the ref name, the new value and the old value
# go on three separate lines, with both values hex-encoded.
# NOTE(review): the lines following the write are not visible in this
# extract.
238 def update_ref(self, refname, newval, oldval):
240 self.conn.write('update-ref %s\n%s\n%s\n'
241 % (refname, newval.encode('hex'),
# A falsy oldval is sent as an empty string.
242 (oldval or '').encode('hex')))
# NOTE(review): the def line for this fragment is not visible in this
# extract, nor are the loop and try/except lines around these statements.
# The `yield` shows that the enclosing function is a generator.
# CR and LF in the id are replaced with '_' because the command is
# newline-terminated.
248 self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
# Each piece of the reply is prefixed by a 4-byte network-order unsigned
# length, followed by that many bytes.
250 sz = struct.unpack('!I', self.conn.read(4))[0]
252 yield self.conn.read(sz)
# A failure bound to `e` (except clause not visible) is reported to the
# caller as a KeyError.
256 raise KeyError(str(e))
# PackWriter variant that writes object frames to `self.file` using the
# server's receive-objects protocol; `filename` is set to 'remote socket'.
# NOTE(review): this extract omits several def lines inside the class (only
# __init__ and _raw_write headers are visible), the rest of the __init__
# signature, the assignment of self.file (presumably the `conn` argument),
# and various other statements. Comments below describe only what the
# visible lines establish.
259 class PackWriter_Remote(git.PackWriter):
260 def __init__(self, conn, objcache_maker, suggest_pack,
263 git.PackWriter.__init__(self, objcache_maker)
265 self.filename = 'remote socket'
# Callbacks supplied by the caller.
266 self.suggest_pack = suggest_pack
268 self.onclose = onclose
269 self.ensure_busy = ensure_busy
# Tracks whether a pack stream is currently open.
270 self._packopen = False
# Starting timestamp for the bandwidth limiter state passed to
# _raw_write_bwlimit.
272 self._bwtime = time.time()
# -- fragment: def line not visible; opens the pack stream on first use --
275 if not self._packopen:
276 self._make_objcache()
279 self._packopen = True
# -- fragment: def line not visible; ends the pack stream --
282 if self._packopen and self.file:
# Four zero bytes; object frames begin with a 4-byte length (see
# _raw_write), so this is presumably a zero-length end-of-stream marker --
# confirm against the server.
283 self.file.write('\0\0\0\0')
284 self._packopen = False
# Read the server's reply line; a reply starting with 'index ' is handled
# specially on lines not visible here.
286 line = self.file.readline().strip()
287 if line.startswith('index '):
# `id` is assigned on a line not shown; when set, it is passed to the
# suggest_pack callback.
296 if id and self.suggest_pack:
297 self.suggest_pack(id)
# -- fragment: def line not visible; aborting is not supported remotely --
306 raise GitError("don't know how to abort remote pack writing")
# Send one object to the server as a single length-prefixed frame.
308 def _raw_write(self, datalist, sha):
310 if not self._packopen:
314 data = ''.join(datalist)
# Mask to get an unsigned 32-bit CRC regardless of platform sign handling.
317 crc = zlib.crc32(data) & 0xffffffff
# Frame layout starts with a 4-byte network-order length that counts the
# data plus 20 + 4 extra bytes (per the comment below, sha1 and crc); one
# element of this tuple is on a line not visible here.
318 outbuf = ''.join((struct.pack('!I', len(data) + 20 + 4),
320 struct.pack('!I', crc),
# Write through the bandwidth limiter and keep its updated state.
322 (self._bwcount, self._bwtime) = \
323 _raw_write_bwlimit(self.file, outbuf, self._bwcount, self._bwtime)
# NOTE(review): `data` is just the joined datalist; the sha and crc are
# packed into outbuf separately, so subtracting 24 from len(data) looks
# like it undercounts -- confirm the intended accounting.
324 self.outbytes += len(data) - 20 - 4 # Don't count sha1+crc
# If the server has sent something mid-stream, it must be an index
# suggestion; `idxname` is derived from the line on a line not shown.
327 if self.file.has_input():
328 line = self.file.readline().strip()
329 assert(line.startswith('index '))
331 if self.suggest_pack:
332 self.suggest_pack(idxname)
# Reload the object cache after handling the suggestion.
333 self.objcache.refresh()