1 import re, struct, errno, select
2 from bup import git, ssh
3 from bup.helpers import *
6 class ClientError(Exception):
11 def __init__(self, remote, create=False):
12 self._busy = self.conn = self.p = None
13 rs = remote.split(':', 1)
15 (host, dir) = (None, remote)
18 (self.host, self.dir) = (host, dir)
19 self.cachedir = git.repo('index-cache/%s'
20 % re.sub(r'[^@\w]', '_',
21 "%s:%s" % (host, dir)))
23 self.p = ssh.connect(host, 'server')
25 raise ClientError, 'exec %r: %s' % (argv[0], e), sys.exc_info()[2]
26 self.conn = Conn(self.p.stdout, self.p.stdin)
28 dir = re.sub(r'[\r\n]', ' ', dir)
30 self.conn.write('init-dir %s\n' % dir)
32 self.conn.write('set-dir %s\n' % dir)
34 self.sync_indexes_del()
40 if e.errno == errno.EPIPE:
46 if self.conn and not self._busy:
47 self.conn.write('quit\n')
50 while self.p.stdout.read(65536):
56 raise ClientError('server tunnel returned exit code %d' % rv)
63 raise ClientError('server exited unexpectedly with code %r' % rv)
65 return self.conn.check_ok()
67 raise ClientError, e, sys.exc_info()[2]
71 raise ClientError('already busy with command %r' % self._busy)
73 def ensure_busy(self):
75 raise ClientError('expected to be busy, but not busy?!')
80 def sync_indexes_del(self):
83 conn.write('list-indexes\n')
84 packdir = git.repo('objects/pack')
87 for line in linereader(conn):
91 assert(line.find('/') < 0)
92 if not os.path.exists(os.path.join(self.cachedir, line)):
97 for f in os.listdir(self.cachedir):
98 if f.endswith('.idx') and not f in all:
99 log('pruning old index: %r\n' % f)
100 os.unlink(os.path.join(self.cachedir, f))
102 def sync_index(self, name):
103 #log('requesting %r\n' % name)
105 mkdirp(self.cachedir)
106 self.conn.write('send-index %s\n' % name)
107 n = struct.unpack('!I', self.conn.read(4))[0]
109 fn = os.path.join(self.cachedir, name)
110 f = open(fn + '.tmp', 'w')
112 progress('Receiving index: %d/%d\r' % (count, n))
113 for b in chunkyreader(self.conn, n):
116 progress('Receiving index: %d/%d\r' % (count, n))
117 progress('Receiving index: %d/%d, done.\n' % (count, n))
120 os.rename(fn + '.tmp', fn)
122 def _make_objcache(self):
127 return git.PackIdxList(self.cachedir)
129 def _suggest_pack(self, indexname):
130 log('received index suggestion: %s\n' % indexname)
133 assert(ob == 'receive-objects')
134 self.conn.write('\xff\xff\xff\xff') # suspend receive-objects
136 self.conn.drain_and_check_ok()
137 self.sync_index(indexname)
140 self.conn.write('receive-objects\n')
142 def new_packwriter(self):
145 self._busy = 'receive-objects'
146 self.conn.write('receive-objects\n')
147 return PackWriter_Remote(self.conn,
148 objcache_maker = self._make_objcache,
149 suggest_pack = self._suggest_pack,
151 onclose = self._not_busy,
152 ensure_busy = self.ensure_busy)
154 def read_ref(self, refname):
156 self.conn.write('read-ref %s\n' % refname)
157 r = self.conn.readline().strip()
160 assert(len(r) == 40) # hexified sha
161 return r.decode('hex')
163 return None # nonexistent ref
165 def update_ref(self, refname, newval, oldval):
167 self.conn.write('update-ref %s\n%s\n%s\n'
168 % (refname, newval.encode('hex'),
169 (oldval or '').encode('hex')))
175 self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
177 sz = struct.unpack('!I', self.conn.read(4))[0]
179 yield self.conn.read(sz)
183 raise KeyError(str(e))
186 class PackWriter_Remote(git.PackWriter):
187 def __init__(self, conn, objcache_maker, suggest_pack,
190 git.PackWriter.__init__(self, objcache_maker)
192 self.filename = 'remote socket'
193 self.suggest_pack = suggest_pack
195 self.onclose = onclose
196 self.ensure_busy = ensure_busy
197 self._packopen = False
200 if not self._packopen:
201 self._make_objcache()
204 self._packopen = True
207 if self._packopen and self.file:
208 self.file.write('\0\0\0\0')
209 self._packopen = False
211 line = self.file.readline().strip()
212 if line.startswith('index '):
221 if id and self.suggest_pack:
222 self.suggest_pack(id)
231 raise GitError("don't know how to abort remote pack writing")
233 def _raw_write(self, datalist):
235 if not self._packopen:
239 data = ''.join(datalist)
241 self.file.write(struct.pack('!I', len(data)) + data)
242 self.outbytes += len(data)
245 if self.file.has_input():
246 line = self.file.readline().strip()
247 assert(line.startswith('index '))
249 if self.suggest_pack:
250 self.suggest_pack(idxname)
251 self.objcache.refresh()