arthur.barton.de Git - bup.git/blob - lib/bup/client.py
Correctly pass along SIGINT to child processes.
[bup.git] / lib / bup / client.py
1 import re, struct, errno, select
2 from bup import git
3 from bup.helpers import *
4 from subprocess import Popen, PIPE
5
6
class ClientError(Exception):
    """Error raised by Client for failures talking to the server process."""
9
10
11 class Client:
12     def __init__(self, remote, create=False):
13         self._busy = None
14         self.p = None
15         self.conn = None
16         rs = remote.split(':', 1)
17         main_exe = os.environ.get('BUP_MAIN_EXE') or sys.argv[0]
18         nicedir = os.path.split(os.path.abspath(main_exe))[0]
19         nicedir = re.sub(r':', "_", nicedir)
20         if len(rs) == 1:
21             (host, dir) = ('NONE', remote)
22             def fixenv():
23                 os.environ['PATH'] = ':'.join([nicedir,
24                                                os.environ.get('PATH', '')])
25             argv = ['bup', 'server']
26         else:
27             (host, dir) = rs
28             fixenv = None
29             # WARNING: shell quoting security holes are possible here, so we
30             # have to be super careful.  We have to use 'sh -c' because
31             # csh-derived shells can't handle PATH= notation.  We can't
32             # set PATH in advance, because ssh probably replaces it.  We
33             # can't exec *safely* using argv, because *both* ssh and 'sh -c'
34             # allow shellquoting.  So we end up having to double-shellquote
35             # stuff here.
36             escapedir = re.sub(r'([^\w/])', r'\\\\\\\1', nicedir)
37             cmd = r"""
38                        sh -c PATH=%s:'$PATH bup server'
39                    """ % escapedir
40             argv = ['ssh', host, '--', cmd.strip()]
41             #log('argv is: %r\n' % argv)
42         def setup():
43             if fixenv:
44                 fixenv()
45             os.setsid()
46         (self.host, self.dir) = (host, dir)
47         self.cachedir = git.repo('index-cache/%s'
48                                  % re.sub(r'[^@\w]', '_', 
49                                           "%s:%s" % (host, dir)))
50         try:
51             self.p = p = Popen(argv, stdin=PIPE, stdout=PIPE, preexec_fn=setup)
52         except OSError, e:
53             raise ClientError, 'exec %r: %s' % (argv[0], e), sys.exc_info()[2]
54         self.conn = conn = Conn(p.stdout, p.stdin)
55         if dir:
56             dir = re.sub(r'[\r\n]', ' ', dir)
57             if create:
58                 conn.write('init-dir %s\n' % dir)
59             else:
60                 conn.write('set-dir %s\n' % dir)
61             self.check_ok()
62         self.sync_indexes_del()
63
64     def __del__(self):
65         try:
66             self.close()
67         except IOError, e:
68             if e.errno == errno.EPIPE:
69                 pass
70             else:
71                 raise
72
73     def close(self):
74         if self.conn and not self._busy:
75             self.conn.write('quit\n')
76         if self.p:
77             self.p.stdin.close()
78             while self.p.stdout.read(65536):
79                 pass
80             self.p.stdout.close()
81             self.p.wait()
82             rv = self.p.wait()
83             if rv:
84                 raise ClientError('server tunnel returned exit code %d' % rv)
85         self.conn = None
86         self.p = None
87
88     def check_ok(self):
89         rv = self.p.poll()
90         if rv != None:
91             raise ClientError('server exited unexpectedly with code %r' % rv)
92         try:
93             return self.conn.check_ok()
94         except Exception, e:
95             raise ClientError, e, sys.exc_info()[2]
96
97     def check_busy(self):
98         if self._busy:
99             raise ClientError('already busy with command %r' % self._busy)
100         
101     def _not_busy(self):
102         self._busy = None
103
104     def sync_indexes_del(self):
105         self.check_busy()
106         conn = self.conn
107         conn.write('list-indexes\n')
108         packdir = git.repo('objects/pack')
109         all = {}
110         needed = {}
111         for line in linereader(conn):
112             if not line:
113                 break
114             all[line] = 1
115             assert(line.find('/') < 0)
116             if not os.path.exists(os.path.join(self.cachedir, line)):
117                 needed[line] = 1
118         self.check_ok()
119
120         mkdirp(self.cachedir)
121         for f in os.listdir(self.cachedir):
122             if f.endswith('.idx') and not f in all:
123                 log('pruning old index: %r\n' % f)
124                 os.unlink(os.path.join(self.cachedir, f))
125
126     def sync_index(self, name):
127         #log('requesting %r\n' % name)
128         mkdirp(self.cachedir)
129         self.conn.write('send-index %s\n' % name)
130         n = struct.unpack('!I', self.conn.read(4))[0]
131         assert(n)
132         fn = os.path.join(self.cachedir, name)
133         f = open(fn + '.tmp', 'w')
134         count = 0
135         progress('Receiving index: %d/%d\r' % (count, n))
136         for b in chunkyreader(self.conn, n):
137             f.write(b)
138             count += len(b)
139             progress('Receiving index: %d/%d\r' % (count, n))
140         progress('Receiving index: %d/%d, done.\n' % (count, n))
141         self.check_ok()
142         f.close()
143         os.rename(fn + '.tmp', fn)
144
145     def _make_objcache(self):
146         ob = self._busy
147         self._busy = None
148         #self.sync_indexes()
149         self._busy = ob
150         return git.PackIdxList(self.cachedir)
151
152     def _suggest_pack(self, indexname):
153         log('received index suggestion: %s\n' % indexname)
154         ob = self._busy
155         if ob:
156             assert(ob == 'receive-objects')
157             self._busy = None
158             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects
159             self.conn.drain_and_check_ok()
160         self.sync_index(indexname)
161         if ob:
162             self.conn.write('receive-objects\n')
163             self._busy = ob
164
165     def new_packwriter(self):
166         self.check_busy()
167         self._busy = 'receive-objects'
168         return PackWriter_Remote(self.conn,
169                                  objcache_maker = self._make_objcache,
170                                  suggest_pack = self._suggest_pack,
171                                  onclose = self._not_busy)
172
173     def read_ref(self, refname):
174         self.check_busy()
175         self.conn.write('read-ref %s\n' % refname)
176         r = self.conn.readline().strip()
177         self.check_ok()
178         if r:
179             assert(len(r) == 40)   # hexified sha
180             return r.decode('hex')
181         else:
182             return None   # nonexistent ref
183
184     def update_ref(self, refname, newval, oldval):
185         self.check_busy()
186         self.conn.write('update-ref %s\n%s\n%s\n' 
187                         % (refname, newval.encode('hex'),
188                            (oldval or '').encode('hex')))
189         self.check_ok()
190
191     def cat(self, id):
192         self.check_busy()
193         self._busy = 'cat'
194         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
195         while 1:
196             sz = struct.unpack('!I', self.conn.read(4))[0]
197             if not sz: break
198             yield self.conn.read(sz)
199         e = self.check_ok()
200         self._not_busy()
201         if e:
202             raise KeyError(str(e))
203
204
class PackWriter_Remote(git.PackWriter):
    """A git.PackWriter that sends its objects over a server connection.

    conn           -- connection object the pack data is written to.
    objcache_maker -- passed through to git.PackWriter.__init__.
    suggest_pack   -- callable given an index name (or, when the pack is
                      finished, the pack id); may be false to disable.
    onclose        -- callable invoked once the pack has been finished; may
                      be false to disable.
    """

    def __init__(self, conn, objcache_maker, suggest_pack, onclose):
        git.PackWriter.__init__(self, objcache_maker)
        self._packopen = False
        self.file = conn
        self.filename = 'remote socket'
        self.onclose = onclose
        self.suggest_pack = suggest_pack

    def _open(self):
        """Send 'receive-objects' unless a pack is already open."""
        if self._packopen:
            return
        self._make_objcache()
        self.file.write('receive-objects\n')
        self._packopen = True

    def _end(self):
        """Finish the open pack and return the id line sent by the server.

        Returns None when no pack is open or the writer has no connection.
        """
        if not (self._packopen and self.file):
            return None
        # A zero length tells the server that no more objects follow.
        self.file.write('\0\0\0\0')
        self._packopen = False
        # Skip any 'index ...' lines; the first other line is the pack id.
        reply = self.file.readline().strip()
        while reply.startswith('index '):
            reply = self.file.readline().strip()
        pack_id = reply
        self.file.check_ok()
        self.objcache = None
        if self.onclose:
            self.onclose()
        if self.suggest_pack:
            self.suggest_pack(pack_id)
        return pack_id

    def close(self):
        """Finish the pack, drop the connection and return the pack id."""
        pack_id = self._end()
        self.file = None
        return pack_id

    def abort(self):
        """Not supported for remote packs; always raises."""
        # NOTE(review): GitError is not defined in this file; presumably it
        # arrives through the star-import from bup.helpers -- confirm.
        raise GitError("don't know how to abort remote pack writing")

    def _raw_write(self, datalist):
        """Send the strings in datalist to the server as one object.

        The joined data is prefixed with its length as a 4-byte
        network-order unsigned integer.  If the server has sent something
        in the meantime, it must be an 'index <name>' line, which is handed
        to suggest_pack before the object cache is refreshed.
        """
        assert(self.file)
        if not self._packopen:
            self._open()
        payload = ''.join(datalist)
        assert(len(payload))
        size_prefix = struct.pack('!I', len(payload))
        self.file.write(size_prefix + payload)
        self.outbytes += len(payload)
        self.count += 1

        if not self.file.has_input():
            return
        reply = self.file.readline().strip()
        assert(reply.startswith('index '))
        if self.suggest_pack:
            # Strip the leading 'index ' to get the index name.
            self.suggest_pack(reply[6:])
            self.objcache.refresh()
261             if self.suggest_pack:
262                 self.suggest_pack(idxname)
263                 self.objcache.refresh()