]> arthur.barton.de Git - bup.git/blob - client.py
Wrap mmap calls to help with portability.
[bup.git] / client.py
1 import re, struct, errno, select
2 import git
3 from helpers import *
4 from subprocess import Popen, PIPE
5
6
7 class ClientError(Exception):
8     pass
9
10
11 class Client:
12     def __init__(self, remote, create=False):
13         self._busy = None
14         self.p = None
15         self.conn = None
16         rs = remote.split(':', 1)
17         nicedir = os.path.split(os.path.abspath(sys.argv[0]))[0]
18         nicedir = re.sub(r':', "_", nicedir)
19         if len(rs) == 1:
20             (host, dir) = ('NONE', remote)
21             def fixenv():
22                 os.environ['PATH'] = ':'.join([nicedir,
23                                                os.environ.get('PATH', '')])
24             argv = ['bup', 'server']
25         else:
26             (host, dir) = rs
27             fixenv = None
28             # WARNING: shell quoting security holes are possible here, so we
29             # have to be super careful.  We have to use 'sh -c' because
30             # csh-derived shells can't handle PATH= notation.  We can't
31             # set PATH in advance, because ssh probably replaces it.  We
32             # can't exec *safely* using argv, because *both* ssh and 'sh -c'
33             # allow shellquoting.  So we end up having to double-shellquote
34             # stuff here.
35             escapedir = re.sub(r'([^\w/])', r'\\\\\\\1', nicedir)
36             cmd = r"""
37                        sh -c PATH=%s:'$PATH bup server'
38                    """ % escapedir
39             argv = ['ssh', host, '--', cmd.strip()]
40             #log('argv is: %r\n' % argv)
41         (self.host, self.dir) = (host, dir)
42         self.cachedir = git.repo('index-cache/%s'
43                                  % re.sub(r'[^@\w]', '_', 
44                                           "%s:%s" % (host, dir)))
45         try:
46             self.p = p = Popen(argv, stdin=PIPE, stdout=PIPE, preexec_fn=fixenv)
47         except OSError, e:
48             raise ClientError, 'exec %r: %s' % (argv[0], e), sys.exc_info()[2]
49         self.conn = conn = Conn(p.stdout, p.stdin)
50         if dir:
51             dir = re.sub(r'[\r\n]', ' ', dir)
52             if create:
53                 conn.write('init-dir %s\n' % dir)
54             else:
55                 conn.write('set-dir %s\n' % dir)
56             self.check_ok()
57         self.sync_indexes_del()
58
59     def __del__(self):
60         try:
61             self.close()
62         except IOError, e:
63             if e.errno == errno.EPIPE:
64                 pass
65             else:
66                 raise
67
68     def close(self):
69         if self.conn and not self._busy:
70             self.conn.write('quit\n')
71         if self.p:
72             self.p.stdin.close()
73             while self.p.stdout.read(65536):
74                 pass
75             self.p.stdout.close()
76             self.p.wait()
77             rv = self.p.wait()
78             if rv:
79                 raise ClientError('server tunnel returned exit code %d' % rv)
80         self.conn = None
81         self.p = None
82
83     def check_ok(self):
84         rv = self.p.poll()
85         if rv != None:
86             raise ClientError('server exited unexpectedly with code %r' % rv)
87         try:
88             self.conn.check_ok()
89         except Exception, e:
90             raise ClientError, e, sys.exc_info()[2]
91
92     def check_busy(self):
93         if self._busy:
94             raise ClientError('already busy with command %r' % self._busy)
95         
96     def _not_busy(self):
97         self._busy = None
98
99     def sync_indexes_del(self):
100         self.check_busy()
101         conn = self.conn
102         conn.write('list-indexes\n')
103         packdir = git.repo('objects/pack')
104         all = {}
105         needed = {}
106         for line in linereader(conn):
107             if not line:
108                 break
109             all[line] = 1
110             assert(line.find('/') < 0)
111             if not os.path.exists(os.path.join(self.cachedir, line)):
112                 needed[line] = 1
113         self.check_ok()
114
115         mkdirp(self.cachedir)
116         for f in os.listdir(self.cachedir):
117             if f.endswith('.idx') and not f in all:
118                 log('pruning old index: %r\n' % f)
119                 os.unlink(os.path.join(self.cachedir, f))
120
121     def sync_index(self, name):
122         #log('requesting %r\n' % name)
123         mkdirp(self.cachedir)
124         self.conn.write('send-index %s\n' % name)
125         n = struct.unpack('!I', self.conn.read(4))[0]
126         assert(n)
127         log('   expect %d bytes\n' % n)
128         fn = os.path.join(self.cachedir, name)
129         f = open(fn + '.tmp', 'w')
130         for b in chunkyreader(self.conn, n):
131             f.write(b)
132         self.check_ok()
133         f.close()
134         os.rename(fn + '.tmp', fn)
135
136     def _make_objcache(self):
137         ob = self._busy
138         self._busy = None
139         #self.sync_indexes()
140         self._busy = ob
141         return git.MultiPackIndex(self.cachedir)
142
143     def _suggest_pack(self, indexname):
144         log('received index suggestion: %s\n' % indexname)
145         ob = self._busy
146         if ob:
147             assert(ob == 'receive-objects')
148             self._busy = None
149             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects
150             self.conn.drain_and_check_ok()
151         self.sync_index(indexname)
152         if ob:
153             self.conn.write('receive-objects\n')
154             self._busy = ob
155
156     def new_packwriter(self):
157         self.check_busy()
158         self._busy = 'receive-objects'
159         return PackWriter_Remote(self.conn,
160                                  objcache_maker = self._make_objcache,
161                                  suggest_pack = self._suggest_pack,
162                                  onclose = self._not_busy)
163
164     def read_ref(self, refname):
165         self.check_busy()
166         self.conn.write('read-ref %s\n' % refname)
167         r = self.conn.readline().strip()
168         self.check_ok()
169         if r:
170             assert(len(r) == 40)   # hexified sha
171             return r.decode('hex')
172         else:
173             return None   # nonexistent ref
174
175     def update_ref(self, refname, newval, oldval):
176         self.check_busy()
177         self.conn.write('update-ref %s\n%s\n%s\n' 
178                         % (refname, newval.encode('hex'),
179                            (oldval or '').encode('hex')))
180         self.check_ok()
181
182     def cat(self, id):
183         self.check_busy()
184         self._busy = 'cat'
185         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
186         while 1:
187             sz = struct.unpack('!I', self.conn.read(4))[0]
188             if not sz: break
189             yield self.conn.read(sz)
190         self.check_ok()
191         self._not_busy()
192
193
194 class PackWriter_Remote(git.PackWriter):
195     def __init__(self, conn, objcache_maker, suggest_pack, onclose):
196         git.PackWriter.__init__(self, objcache_maker)
197         self.file = conn
198         self.filename = 'remote socket'
199         self.suggest_pack = suggest_pack
200         self.onclose = onclose
201         self._packopen = False
202
203     def _open(self):
204         if not self._packopen:
205             self._make_objcache()
206             self.file.write('receive-objects\n')
207             self._packopen = True
208
209     def _end(self):
210         if self._packopen and self.file:
211             self.file.write('\0\0\0\0')
212             self._packopen = False
213             id = self.file.readline().strip()
214             self.file.check_ok()
215             self.objcache = None
216             if self.onclose:
217                 self.onclose()
218             if self.suggest_pack:
219                 self.suggest_pack(id)
220             return id
221
222     def close(self):
223         id = self._end()
224         self.file = None
225         return id
226
227     def abort(self):
228         raise GitError("don't know how to abort remote pack writing")
229
230     def _raw_write(self, datalist):
231         assert(self.file)
232         if not self._packopen:
233             self._open()
234         data = ''.join(datalist)
235         assert(len(data))
236         self.file.write(struct.pack('!I', len(data)) + data)
237         self.outbytes += len(data)
238         self.count += 1
239
240         if self.file.has_input():
241             line = self.file.readline().strip()
242             assert(line.startswith('index '))
243             idxname = line[6:]
244             if self.suggest_pack:
245                 self.suggest_pack(idxname)
246                 self.objcache.refresh()