]> arthur.barton.de Git - bup.git/blob - lib/bup/client.py
client.py: raising a particular rare exception caused a syntax error.
[bup.git] / lib / bup / client.py
1 import re, struct, errno, select, time
2 from bup import git, ssh
3 from bup.helpers import *
4
5 bwlimit = None
6
7
8 class ClientError(Exception):
9     pass
10
11
12 def _raw_write_bwlimit(f, buf, bwcount, bwtime):
13     if not bwlimit:
14         f.write(buf)
15         return (len(buf), time.time())
16     else:
17         # We want to write in reasonably large blocks, but not so large that
18         # they're likely to overflow a router's queue.  So our bwlimit timing
19         # has to be pretty granular.  Also, if it takes too long from one
20         # transmit to the next, we can't just make up for lost time to bring
21         # the average back up to bwlimit - that will risk overflowing the
22         # outbound queue, which defeats the purpose.  So if we fall behind
23         # by more than one block delay, we shouldn't ever try to catch up.
24         for i in xrange(0,len(buf),4096):
25             now = time.time()
26             next = max(now, bwtime + 1.0*bwcount/bwlimit)
27             time.sleep(next-now)
28             sub = buf[i:i+4096]
29             f.write(sub)
30             bwcount = len(sub)  # might be less than 4096
31             bwtime = next
32         return (bwcount, bwtime)
33                        
34
35 class Client:
36     def __init__(self, remote, create=False):
37         self._busy = self.conn = self.p = self.pout = self.pin = None
38         is_reverse = os.environ.get('BUP_SERVER_REVERSE')
39         if is_reverse:
40             assert(not remote)
41             remote = '%s:' % is_reverse
42         rs = remote.split(':', 1)
43         if len(rs) == 1:
44             (host, dir) = (None, remote)
45         else:
46             (host, dir) = rs
47         (self.host, self.dir) = (host, dir)
48         self.cachedir = git.repo('index-cache/%s'
49                                  % re.sub(r'[^@\w]', '_', 
50                                           "%s:%s" % (host, dir)))
51         if is_reverse:
52             self.pout = os.fdopen(3, 'rb')
53             self.pin = os.fdopen(4, 'wb')
54         else:
55             try:
56                 self.p = ssh.connect(host, 'server')
57                 self.pout = self.p.stdout
58                 self.pin = self.p.stdin
59             except OSError, e:
60                 raise ClientError, 'connect: %s' % e, sys.exc_info()[2]
61         self.conn = Conn(self.pout, self.pin)
62         if dir:
63             dir = re.sub(r'[\r\n]', ' ', dir)
64             if create:
65                 self.conn.write('init-dir %s\n' % dir)
66             else:
67                 self.conn.write('set-dir %s\n' % dir)
68             self.check_ok()
69         self.sync_indexes_del()
70
71     def __del__(self):
72         try:
73             self.close()
74         except IOError, e:
75             if e.errno == errno.EPIPE:
76                 pass
77             else:
78                 raise
79
80     def close(self):
81         if self.conn and not self._busy:
82             self.conn.write('quit\n')
83         if self.pin and self.pout:
84             self.pin.close()
85             while self.pout.read(65536):
86                 pass
87             self.pout.close()
88         if self.p:
89             self.p.wait()
90             rv = self.p.wait()
91             if rv:
92                 raise ClientError('server tunnel returned exit code %d' % rv)
93         self.conn = None
94         self.p = self.pin = self.pout = None
95
96     def check_ok(self):
97         if self.p:
98             rv = self.p.poll()
99             if rv != None:
100                 raise ClientError('server exited unexpectedly with code %r'
101                                   % rv)
102         try:
103             return self.conn.check_ok()
104         except Exception, e:
105             raise ClientError, e, sys.exc_info()[2]
106
107     def check_busy(self):
108         if self._busy:
109             raise ClientError('already busy with command %r' % self._busy)
110         
111     def ensure_busy(self):
112         if not self._busy:
113             raise ClientError('expected to be busy, but not busy?!')
114         
115     def _not_busy(self):
116         self._busy = None
117
118     def sync_indexes_del(self):
119         self.check_busy()
120         conn = self.conn
121         conn.write('list-indexes\n')
122         packdir = git.repo('objects/pack')
123         all = {}
124         needed = {}
125         for line in linereader(conn):
126             if not line:
127                 break
128             all[line] = 1
129             assert(line.find('/') < 0)
130             if not os.path.exists(os.path.join(self.cachedir, line)):
131                 needed[line] = 1
132         self.check_ok()
133
134         mkdirp(self.cachedir)
135         for f in os.listdir(self.cachedir):
136             if f.endswith('.idx') and not f in all:
137                 log('pruning old index: %r\n' % f)
138                 os.unlink(os.path.join(self.cachedir, f))
139
140     def sync_index(self, name):
141         #log('requesting %r\n' % name)
142         self.check_busy()
143         mkdirp(self.cachedir)
144         self.conn.write('send-index %s\n' % name)
145         n = struct.unpack('!I', self.conn.read(4))[0]
146         assert(n)
147         fn = os.path.join(self.cachedir, name)
148         f = open(fn + '.tmp', 'w')
149         count = 0
150         progress('Receiving index: %d/%d\r' % (count, n))
151         for b in chunkyreader(self.conn, n):
152             f.write(b)
153             count += len(b)
154             progress('Receiving index: %d/%d\r' % (count, n))
155         progress('Receiving index: %d/%d, done.\n' % (count, n))
156         self.check_ok()
157         f.close()
158         os.rename(fn + '.tmp', fn)
159
160     def _make_objcache(self):
161         ob = self._busy
162         self._busy = None
163         #self.sync_indexes()
164         self._busy = ob
165         return git.PackIdxList(self.cachedir)
166
167     def _suggest_pack(self, indexname):
168         log('received index suggestion: %s\n' % indexname)
169         ob = self._busy
170         if ob:
171             assert(ob == 'receive-objects')
172             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects
173             self._busy = None
174             self.conn.drain_and_check_ok()
175         self.sync_index(indexname)
176         if ob:
177             self._busy = ob
178             self.conn.write('receive-objects\n')
179
180     def new_packwriter(self):
181         self.check_busy()
182         def _set_busy():
183             self._busy = 'receive-objects'
184             self.conn.write('receive-objects\n')
185         return PackWriter_Remote(self.conn,
186                                  objcache_maker = self._make_objcache,
187                                  suggest_pack = self._suggest_pack,
188                                  onopen = _set_busy,
189                                  onclose = self._not_busy,
190                                  ensure_busy = self.ensure_busy)
191
192     def read_ref(self, refname):
193         self.check_busy()
194         self.conn.write('read-ref %s\n' % refname)
195         r = self.conn.readline().strip()
196         self.check_ok()
197         if r:
198             assert(len(r) == 40)   # hexified sha
199             return r.decode('hex')
200         else:
201             return None   # nonexistent ref
202
203     def update_ref(self, refname, newval, oldval):
204         self.check_busy()
205         self.conn.write('update-ref %s\n%s\n%s\n' 
206                         % (refname, newval.encode('hex'),
207                            (oldval or '').encode('hex')))
208         self.check_ok()
209
210     def cat(self, id):
211         self.check_busy()
212         self._busy = 'cat'
213         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
214         while 1:
215             sz = struct.unpack('!I', self.conn.read(4))[0]
216             if not sz: break
217             yield self.conn.read(sz)
218         e = self.check_ok()
219         self._not_busy()
220         if e:
221             raise KeyError(str(e))
222
223
224 class PackWriter_Remote(git.PackWriter):
225     def __init__(self, conn, objcache_maker, suggest_pack,
226                  onopen, onclose,
227                  ensure_busy):
228         git.PackWriter.__init__(self, objcache_maker)
229         self.file = conn
230         self.filename = 'remote socket'
231         self.suggest_pack = suggest_pack
232         self.onopen = onopen
233         self.onclose = onclose
234         self.ensure_busy = ensure_busy
235         self._packopen = False
236         self._bwcount = 0
237         self._bwtime = time.time()
238
239     def _open(self):
240         if not self._packopen:
241             self._make_objcache()
242             if self.onopen:
243                 self.onopen()
244             self._packopen = True
245
246     def _end(self):
247         if self._packopen and self.file:
248             self.file.write('\0\0\0\0')
249             self._packopen = False
250             while True:
251                 line = self.file.readline().strip()
252                 if line.startswith('index '):
253                     pass
254                 else:
255                     break
256             id = line
257             self.file.check_ok()
258             self.objcache = None
259             if self.onclose:
260                 self.onclose()
261             if id and self.suggest_pack:
262                 self.suggest_pack(id)
263             return id
264
265     def close(self):
266         id = self._end()
267         self.file = None
268         return id
269
270     def abort(self):
271         raise GitError("don't know how to abort remote pack writing")
272
273     def _raw_write(self, datalist):
274         assert(self.file)
275         if not self._packopen:
276             self._open()
277         if self.ensure_busy:
278             self.ensure_busy()
279         data = ''.join(datalist)
280         assert(len(data))
281         outbuf = struct.pack('!I', len(data)) + data
282         (self._bwcount, self._bwtime) = \
283             _raw_write_bwlimit(self.file, outbuf, self._bwcount, self._bwtime)
284         self.outbytes += len(data)
285         self.count += 1
286
287         if self.file.has_input():
288             line = self.file.readline().strip()
289             assert(line.startswith('index '))
290             idxname = line[6:]
291             if self.suggest_pack:
292                 self.suggest_pack(idxname)
293                 self.objcache.refresh()