]> arthur.barton.de Git - bup.git/blob - lib/bup/client.py
bup.client: fix freeze when suggest-index after finishing a full pack.
[bup.git] / lib / bup / client.py
1 import re, struct, errno, select
2 from bup import git
3 from bup.helpers import *
4 from subprocess import Popen, PIPE
5
6
7 class ClientError(Exception):
8     pass
9
10
11 class Client:
12     def __init__(self, remote, create=False):
13         self._busy = None
14         self.p = None
15         self.conn = None
16         rs = remote.split(':', 1)
17         main_exe = os.environ.get('BUP_MAIN_EXE') or sys.argv[0]
18         nicedir = os.path.split(os.path.abspath(main_exe))[0]
19         nicedir = re.sub(r':', "_", nicedir)
20         if len(rs) == 1:
21             (host, dir) = ('NONE', remote)
22             def fixenv():
23                 os.environ['PATH'] = ':'.join([nicedir,
24                                                os.environ.get('PATH', '')])
25             argv = ['bup', 'server']
26         else:
27             (host, dir) = rs
28             fixenv = None
29             # WARNING: shell quoting security holes are possible here, so we
30             # have to be super careful.  We have to use 'sh -c' because
31             # csh-derived shells can't handle PATH= notation.  We can't
32             # set PATH in advance, because ssh probably replaces it.  We
33             # can't exec *safely* using argv, because *both* ssh and 'sh -c'
34             # allow shellquoting.  So we end up having to double-shellquote
35             # stuff here.
36             escapedir = re.sub(r'([^\w/])', r'\\\\\\\1', nicedir)
37             cmd = r"""
38                        sh -c PATH=%s:'$PATH bup server'
39                    """ % escapedir
40             argv = ['ssh', host, '--', cmd.strip()]
41             #log('argv is: %r\n' % argv)
42         def setup():
43             if fixenv:
44                 fixenv()
45             os.setsid()
46         (self.host, self.dir) = (host, dir)
47         self.cachedir = git.repo('index-cache/%s'
48                                  % re.sub(r'[^@\w]', '_', 
49                                           "%s:%s" % (host, dir)))
50         try:
51             self.p = p = Popen(argv, stdin=PIPE, stdout=PIPE, preexec_fn=setup)
52         except OSError, e:
53             raise ClientError, 'exec %r: %s' % (argv[0], e), sys.exc_info()[2]
54         self.conn = conn = Conn(p.stdout, p.stdin)
55         if dir:
56             dir = re.sub(r'[\r\n]', ' ', dir)
57             if create:
58                 conn.write('init-dir %s\n' % dir)
59             else:
60                 conn.write('set-dir %s\n' % dir)
61             self.check_ok()
62         self.sync_indexes_del()
63
64     def __del__(self):
65         try:
66             self.close()
67         except IOError, e:
68             if e.errno == errno.EPIPE:
69                 pass
70             else:
71                 raise
72
73     def close(self):
74         if self.conn and not self._busy:
75             self.conn.write('quit\n')
76         if self.p:
77             self.p.stdin.close()
78             while self.p.stdout.read(65536):
79                 pass
80             self.p.stdout.close()
81             self.p.wait()
82             rv = self.p.wait()
83             if rv:
84                 raise ClientError('server tunnel returned exit code %d' % rv)
85         self.conn = None
86         self.p = None
87
88     def check_ok(self):
89         rv = self.p.poll()
90         if rv != None:
91             raise ClientError('server exited unexpectedly with code %r' % rv)
92         try:
93             return self.conn.check_ok()
94         except Exception, e:
95             raise ClientError, e, sys.exc_info()[2]
96
97     def check_busy(self):
98         if self._busy:
99             raise ClientError('already busy with command %r' % self._busy)
100         
101     def ensure_busy(self):
102         if not self._busy:
103             raise ClientError('expected to be busy, but not busy?!')
104         
105     def _not_busy(self):
106         self._busy = None
107
108     def sync_indexes_del(self):
109         self.check_busy()
110         conn = self.conn
111         conn.write('list-indexes\n')
112         packdir = git.repo('objects/pack')
113         all = {}
114         needed = {}
115         for line in linereader(conn):
116             if not line:
117                 break
118             all[line] = 1
119             assert(line.find('/') < 0)
120             if not os.path.exists(os.path.join(self.cachedir, line)):
121                 needed[line] = 1
122         self.check_ok()
123
124         mkdirp(self.cachedir)
125         for f in os.listdir(self.cachedir):
126             if f.endswith('.idx') and not f in all:
127                 log('pruning old index: %r\n' % f)
128                 os.unlink(os.path.join(self.cachedir, f))
129
130     def sync_index(self, name):
131         #log('requesting %r\n' % name)
132         self.check_busy()
133         mkdirp(self.cachedir)
134         self.conn.write('send-index %s\n' % name)
135         n = struct.unpack('!I', self.conn.read(4))[0]
136         assert(n)
137         fn = os.path.join(self.cachedir, name)
138         f = open(fn + '.tmp', 'w')
139         count = 0
140         progress('Receiving index: %d/%d\r' % (count, n))
141         for b in chunkyreader(self.conn, n):
142             f.write(b)
143             count += len(b)
144             progress('Receiving index: %d/%d\r' % (count, n))
145         progress('Receiving index: %d/%d, done.\n' % (count, n))
146         self.check_ok()
147         f.close()
148         os.rename(fn + '.tmp', fn)
149
150     def _make_objcache(self):
151         ob = self._busy
152         self._busy = None
153         #self.sync_indexes()
154         self._busy = ob
155         return git.PackIdxList(self.cachedir)
156
157     def _suggest_pack(self, indexname):
158         log('received index suggestion: %s\n' % indexname)
159         ob = self._busy
160         if ob:
161             assert(ob == 'receive-objects')
162             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects
163             self._busy = None
164             self.conn.drain_and_check_ok()
165         self.sync_index(indexname)
166         if ob:
167             self._busy = ob
168             self.conn.write('receive-objects\n')
169
170     def new_packwriter(self):
171         self.check_busy()
172         def _set_busy():
173             self._busy = 'receive-objects'
174             self.conn.write('receive-objects\n')
175         return PackWriter_Remote(self.conn,
176                                  objcache_maker = self._make_objcache,
177                                  suggest_pack = self._suggest_pack,
178                                  onopen = _set_busy,
179                                  onclose = self._not_busy,
180                                  ensure_busy = self.ensure_busy)
181
182     def read_ref(self, refname):
183         self.check_busy()
184         self.conn.write('read-ref %s\n' % refname)
185         r = self.conn.readline().strip()
186         self.check_ok()
187         if r:
188             assert(len(r) == 40)   # hexified sha
189             return r.decode('hex')
190         else:
191             return None   # nonexistent ref
192
193     def update_ref(self, refname, newval, oldval):
194         self.check_busy()
195         self.conn.write('update-ref %s\n%s\n%s\n' 
196                         % (refname, newval.encode('hex'),
197                            (oldval or '').encode('hex')))
198         self.check_ok()
199
200     def cat(self, id):
201         self.check_busy()
202         self._busy = 'cat'
203         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
204         while 1:
205             sz = struct.unpack('!I', self.conn.read(4))[0]
206             if not sz: break
207             yield self.conn.read(sz)
208         e = self.check_ok()
209         self._not_busy()
210         if e:
211             raise KeyError(str(e))
212
213
214 class PackWriter_Remote(git.PackWriter):
215     def __init__(self, conn, objcache_maker, suggest_pack,
216                  onopen, onclose,
217                  ensure_busy):
218         git.PackWriter.__init__(self, objcache_maker)
219         self.file = conn
220         self.filename = 'remote socket'
221         self.suggest_pack = suggest_pack
222         self.onopen = onopen
223         self.onclose = onclose
224         self.ensure_busy = ensure_busy
225         self._packopen = False
226
227     def _open(self):
228         if not self._packopen:
229             self._make_objcache()
230             if self.onopen:
231                 self.onopen()
232             self._packopen = True
233
234     def _end(self):
235         if self._packopen and self.file:
236             self.file.write('\0\0\0\0')
237             self._packopen = False
238             while True:
239                 line = self.file.readline().strip()
240                 if line.startswith('index '):
241                     pass
242                 else:
243                     break
244             id = line
245             self.file.check_ok()
246             self.objcache = None
247             if self.onclose:
248                 self.onclose()
249             if id and self.suggest_pack:
250                 self.suggest_pack(id)
251             return id
252
253     def close(self):
254         id = self._end()
255         self.file = None
256         return id
257
258     def abort(self):
259         raise GitError("don't know how to abort remote pack writing")
260
261     def _raw_write(self, datalist):
262         assert(self.file)
263         if not self._packopen:
264             self._open()
265         if self.ensure_busy:
266             self.ensure_busy()
267         data = ''.join(datalist)
268         assert(len(data))
269         self.file.write(struct.pack('!I', len(data)) + data)
270         self.outbytes += len(data)
271         self.count += 1
272
273         if self.file.has_input():
274             line = self.file.readline().strip()
275             assert(line.startswith('index '))
276             idxname = line[6:]
277             if self.suggest_pack:
278                 self.suggest_pack(idxname)
279                 self.objcache.refresh()