]> arthur.barton.de Git - bup.git/blob - lib/bup/client.py
Rename PackIndex->PackIdx and MultiPackIndex->PackIdxList.
[bup.git] / lib / bup / client.py
1 import re, struct, errno, select
2 from bup import git
3 from bup.helpers import *
4 from subprocess import Popen, PIPE
5
6
7 class ClientError(Exception):
8     pass
9
10
11 class Client:
12     def __init__(self, remote, create=False):
13         self._busy = None
14         self.p = None
15         self.conn = None
16         rs = remote.split(':', 1)
17         nicedir = os.path.split(os.path.abspath(sys.argv[0]))[0]
18         nicedir = re.sub(r':', "_", nicedir)
19         if len(rs) == 1:
20             (host, dir) = ('NONE', remote)
21             def fixenv():
22                 os.environ['PATH'] = ':'.join([nicedir,
23                                                os.environ.get('PATH', '')])
24             argv = ['bup', 'server']
25         else:
26             (host, dir) = rs
27             fixenv = None
28             # WARNING: shell quoting security holes are possible here, so we
29             # have to be super careful.  We have to use 'sh -c' because
30             # csh-derived shells can't handle PATH= notation.  We can't
31             # set PATH in advance, because ssh probably replaces it.  We
32             # can't exec *safely* using argv, because *both* ssh and 'sh -c'
33             # allow shellquoting.  So we end up having to double-shellquote
34             # stuff here.
35             escapedir = re.sub(r'([^\w/])', r'\\\\\\\1', nicedir)
36             cmd = r"""
37                        sh -c PATH=%s:'$PATH bup server'
38                    """ % escapedir
39             argv = ['ssh', host, '--', cmd.strip()]
40             #log('argv is: %r\n' % argv)
41         (self.host, self.dir) = (host, dir)
42         self.cachedir = git.repo('index-cache/%s'
43                                  % re.sub(r'[^@\w]', '_', 
44                                           "%s:%s" % (host, dir)))
45         try:
46             self.p = p = Popen(argv, stdin=PIPE, stdout=PIPE, preexec_fn=fixenv)
47         except OSError, e:
48             raise ClientError, 'exec %r: %s' % (argv[0], e), sys.exc_info()[2]
49         self.conn = conn = Conn(p.stdout, p.stdin)
50         if dir:
51             dir = re.sub(r'[\r\n]', ' ', dir)
52             if create:
53                 conn.write('init-dir %s\n' % dir)
54             else:
55                 conn.write('set-dir %s\n' % dir)
56             self.check_ok()
57         self.sync_indexes_del()
58
59     def __del__(self):
60         try:
61             self.close()
62         except IOError, e:
63             if e.errno == errno.EPIPE:
64                 pass
65             else:
66                 raise
67
68     def close(self):
69         if self.conn and not self._busy:
70             self.conn.write('quit\n')
71         if self.p:
72             self.p.stdin.close()
73             while self.p.stdout.read(65536):
74                 pass
75             self.p.stdout.close()
76             self.p.wait()
77             rv = self.p.wait()
78             if rv:
79                 raise ClientError('server tunnel returned exit code %d' % rv)
80         self.conn = None
81         self.p = None
82
83     def check_ok(self):
84         rv = self.p.poll()
85         if rv != None:
86             raise ClientError('server exited unexpectedly with code %r' % rv)
87         try:
88             return self.conn.check_ok()
89         except Exception, e:
90             raise ClientError, e, sys.exc_info()[2]
91
92     def check_busy(self):
93         if self._busy:
94             raise ClientError('already busy with command %r' % self._busy)
95         
96     def _not_busy(self):
97         self._busy = None
98
99     def sync_indexes_del(self):
100         self.check_busy()
101         conn = self.conn
102         conn.write('list-indexes\n')
103         packdir = git.repo('objects/pack')
104         all = {}
105         needed = {}
106         for line in linereader(conn):
107             if not line:
108                 break
109             all[line] = 1
110             assert(line.find('/') < 0)
111             if not os.path.exists(os.path.join(self.cachedir, line)):
112                 needed[line] = 1
113         self.check_ok()
114
115         mkdirp(self.cachedir)
116         for f in os.listdir(self.cachedir):
117             if f.endswith('.idx') and not f in all:
118                 log('pruning old index: %r\n' % f)
119                 os.unlink(os.path.join(self.cachedir, f))
120
121     def sync_index(self, name):
122         #log('requesting %r\n' % name)
123         mkdirp(self.cachedir)
124         self.conn.write('send-index %s\n' % name)
125         n = struct.unpack('!I', self.conn.read(4))[0]
126         assert(n)
127         fn = os.path.join(self.cachedir, name)
128         f = open(fn + '.tmp', 'w')
129         count = 0
130         progress('Receiving index: %d/%d\r' % (count, n))
131         for b in chunkyreader(self.conn, n):
132             f.write(b)
133             count += len(b)
134             progress('Receiving index: %d/%d\r' % (count, n))
135         progress('Receiving index: %d/%d, done.\n' % (count, n))
136         self.check_ok()
137         f.close()
138         os.rename(fn + '.tmp', fn)
139
140     def _make_objcache(self):
141         ob = self._busy
142         self._busy = None
143         #self.sync_indexes()
144         self._busy = ob
145         return git.PackIdxList(self.cachedir)
146
147     def _suggest_pack(self, indexname):
148         log('received index suggestion: %s\n' % indexname)
149         ob = self._busy
150         if ob:
151             assert(ob == 'receive-objects')
152             self._busy = None
153             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects
154             self.conn.drain_and_check_ok()
155         self.sync_index(indexname)
156         if ob:
157             self.conn.write('receive-objects\n')
158             self._busy = ob
159
160     def new_packwriter(self):
161         self.check_busy()
162         self._busy = 'receive-objects'
163         return PackWriter_Remote(self.conn,
164                                  objcache_maker = self._make_objcache,
165                                  suggest_pack = self._suggest_pack,
166                                  onclose = self._not_busy)
167
168     def read_ref(self, refname):
169         self.check_busy()
170         self.conn.write('read-ref %s\n' % refname)
171         r = self.conn.readline().strip()
172         self.check_ok()
173         if r:
174             assert(len(r) == 40)   # hexified sha
175             return r.decode('hex')
176         else:
177             return None   # nonexistent ref
178
179     def update_ref(self, refname, newval, oldval):
180         self.check_busy()
181         self.conn.write('update-ref %s\n%s\n%s\n' 
182                         % (refname, newval.encode('hex'),
183                            (oldval or '').encode('hex')))
184         self.check_ok()
185
186     def cat(self, id):
187         self.check_busy()
188         self._busy = 'cat'
189         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
190         while 1:
191             sz = struct.unpack('!I', self.conn.read(4))[0]
192             if not sz: break
193             yield self.conn.read(sz)
194         e = self.check_ok()
195         self._not_busy()
196         if e:
197             raise KeyError(str(e))
198
199
200 class PackWriter_Remote(git.PackWriter):
201     def __init__(self, conn, objcache_maker, suggest_pack, onclose):
202         git.PackWriter.__init__(self, objcache_maker)
203         self.file = conn
204         self.filename = 'remote socket'
205         self.suggest_pack = suggest_pack
206         self.onclose = onclose
207         self._packopen = False
208
209     def _open(self):
210         if not self._packopen:
211             self._make_objcache()
212             self.file.write('receive-objects\n')
213             self._packopen = True
214
215     def _end(self):
216         if self._packopen and self.file:
217             self.file.write('\0\0\0\0')
218             self._packopen = False
219             while True:
220                 line = self.file.readline().strip()
221                 if line.startswith('index '):
222                     pass
223                 else:
224                     break
225             id = line
226             self.file.check_ok()
227             self.objcache = None
228             if self.onclose:
229                 self.onclose()
230             if self.suggest_pack:
231                 self.suggest_pack(id)
232             return id
233
234     def close(self):
235         id = self._end()
236         self.file = None
237         return id
238
239     def abort(self):
240         raise GitError("don't know how to abort remote pack writing")
241
242     def _raw_write(self, datalist):
243         assert(self.file)
244         if not self._packopen:
245             self._open()
246         data = ''.join(datalist)
247         assert(len(data))
248         self.file.write(struct.pack('!I', len(data)) + data)
249         self.outbytes += len(data)
250         self.count += 1
251
252         if self.file.has_input():
253             line = self.file.readline().strip()
254             assert(line.startswith('index '))
255             idxname = line[6:]
256             if self.suggest_pack:
257                 self.suggest_pack(idxname)
258                 self.objcache.refresh()