]> arthur.barton.de Git - bup.git/blob - lib/bup/client.py
8a83e38e40a3fef958bbadf07f8687a7abfc4120
[bup.git] / lib / bup / client.py
1 import re, struct, errno, select
2 from bup import git, ssh
3 from bup.helpers import *
4
5
6 class ClientError(Exception):
7     pass
8
9
10 class Client:
11     def __init__(self, remote, create=False):
12         self._busy = self.conn = self.p = None
13         rs = remote.split(':', 1)
14         if len(rs) == 1:
15             (host, dir) = (None, remote)
16         else:
17             (host, dir) = rs
18         (self.host, self.dir) = (host, dir)
19         self.cachedir = git.repo('index-cache/%s'
20                                  % re.sub(r'[^@\w]', '_', 
21                                           "%s:%s" % (host, dir)))
22         try:
23             self.p = ssh.connect(host, 'server')
24         except OSError, e:
25             raise ClientError, 'exec %r: %s' % (argv[0], e), sys.exc_info()[2]
26         self.conn = Conn(self.p.stdout, self.p.stdin)
27         if dir:
28             dir = re.sub(r'[\r\n]', ' ', dir)
29             if create:
30                 self.conn.write('init-dir %s\n' % dir)
31             else:
32                 self.conn.write('set-dir %s\n' % dir)
33             self.check_ok()
34         self.sync_indexes_del()
35
36     def __del__(self):
37         try:
38             self.close()
39         except IOError, e:
40             if e.errno == errno.EPIPE:
41                 pass
42             else:
43                 raise
44
45     def close(self):
46         if self.conn and not self._busy:
47             self.conn.write('quit\n')
48         if self.p:
49             self.p.stdin.close()
50             while self.p.stdout.read(65536):
51                 pass
52             self.p.stdout.close()
53             self.p.wait()
54             rv = self.p.wait()
55             if rv:
56                 raise ClientError('server tunnel returned exit code %d' % rv)
57         self.conn = None
58         self.p = None
59
60     def check_ok(self):
61         rv = self.p.poll()
62         if rv != None:
63             raise ClientError('server exited unexpectedly with code %r' % rv)
64         try:
65             return self.conn.check_ok()
66         except Exception, e:
67             raise ClientError, e, sys.exc_info()[2]
68
69     def check_busy(self):
70         if self._busy:
71             raise ClientError('already busy with command %r' % self._busy)
72         
73     def ensure_busy(self):
74         if not self._busy:
75             raise ClientError('expected to be busy, but not busy?!')
76         
77     def _not_busy(self):
78         self._busy = None
79
80     def sync_indexes_del(self):
81         self.check_busy()
82         conn = self.conn
83         conn.write('list-indexes\n')
84         packdir = git.repo('objects/pack')
85         all = {}
86         needed = {}
87         for line in linereader(conn):
88             if not line:
89                 break
90             all[line] = 1
91             assert(line.find('/') < 0)
92             if not os.path.exists(os.path.join(self.cachedir, line)):
93                 needed[line] = 1
94         self.check_ok()
95
96         mkdirp(self.cachedir)
97         for f in os.listdir(self.cachedir):
98             if f.endswith('.idx') and not f in all:
99                 log('pruning old index: %r\n' % f)
100                 os.unlink(os.path.join(self.cachedir, f))
101
102     def sync_index(self, name):
103         #log('requesting %r\n' % name)
104         self.check_busy()
105         mkdirp(self.cachedir)
106         self.conn.write('send-index %s\n' % name)
107         n = struct.unpack('!I', self.conn.read(4))[0]
108         assert(n)
109         fn = os.path.join(self.cachedir, name)
110         f = open(fn + '.tmp', 'w')
111         count = 0
112         progress('Receiving index: %d/%d\r' % (count, n))
113         for b in chunkyreader(self.conn, n):
114             f.write(b)
115             count += len(b)
116             progress('Receiving index: %d/%d\r' % (count, n))
117         progress('Receiving index: %d/%d, done.\n' % (count, n))
118         self.check_ok()
119         f.close()
120         os.rename(fn + '.tmp', fn)
121
122     def _make_objcache(self):
123         ob = self._busy
124         self._busy = None
125         #self.sync_indexes()
126         self._busy = ob
127         return git.PackIdxList(self.cachedir)
128
129     def _suggest_pack(self, indexname):
130         log('received index suggestion: %s\n' % indexname)
131         ob = self._busy
132         if ob:
133             assert(ob == 'receive-objects')
134             self.conn.write('\xff\xff\xff\xff')  # suspend receive-objects
135             self._busy = None
136             self.conn.drain_and_check_ok()
137         self.sync_index(indexname)
138         if ob:
139             self._busy = ob
140             self.conn.write('receive-objects\n')
141
142     def new_packwriter(self):
143         self.check_busy()
144         def _set_busy():
145             self._busy = 'receive-objects'
146             self.conn.write('receive-objects\n')
147         return PackWriter_Remote(self.conn,
148                                  objcache_maker = self._make_objcache,
149                                  suggest_pack = self._suggest_pack,
150                                  onopen = _set_busy,
151                                  onclose = self._not_busy,
152                                  ensure_busy = self.ensure_busy)
153
154     def read_ref(self, refname):
155         self.check_busy()
156         self.conn.write('read-ref %s\n' % refname)
157         r = self.conn.readline().strip()
158         self.check_ok()
159         if r:
160             assert(len(r) == 40)   # hexified sha
161             return r.decode('hex')
162         else:
163             return None   # nonexistent ref
164
165     def update_ref(self, refname, newval, oldval):
166         self.check_busy()
167         self.conn.write('update-ref %s\n%s\n%s\n' 
168                         % (refname, newval.encode('hex'),
169                            (oldval or '').encode('hex')))
170         self.check_ok()
171
172     def cat(self, id):
173         self.check_busy()
174         self._busy = 'cat'
175         self.conn.write('cat %s\n' % re.sub(r'[\n\r]', '_', id))
176         while 1:
177             sz = struct.unpack('!I', self.conn.read(4))[0]
178             if not sz: break
179             yield self.conn.read(sz)
180         e = self.check_ok()
181         self._not_busy()
182         if e:
183             raise KeyError(str(e))
184
185
186 class PackWriter_Remote(git.PackWriter):
187     def __init__(self, conn, objcache_maker, suggest_pack,
188                  onopen, onclose,
189                  ensure_busy):
190         git.PackWriter.__init__(self, objcache_maker)
191         self.file = conn
192         self.filename = 'remote socket'
193         self.suggest_pack = suggest_pack
194         self.onopen = onopen
195         self.onclose = onclose
196         self.ensure_busy = ensure_busy
197         self._packopen = False
198
199     def _open(self):
200         if not self._packopen:
201             self._make_objcache()
202             if self.onopen:
203                 self.onopen()
204             self._packopen = True
205
206     def _end(self):
207         if self._packopen and self.file:
208             self.file.write('\0\0\0\0')
209             self._packopen = False
210             while True:
211                 line = self.file.readline().strip()
212                 if line.startswith('index '):
213                     pass
214                 else:
215                     break
216             id = line
217             self.file.check_ok()
218             self.objcache = None
219             if self.onclose:
220                 self.onclose()
221             if id and self.suggest_pack:
222                 self.suggest_pack(id)
223             return id
224
225     def close(self):
226         id = self._end()
227         self.file = None
228         return id
229
230     def abort(self):
231         raise GitError("don't know how to abort remote pack writing")
232
233     def _raw_write(self, datalist):
234         assert(self.file)
235         if not self._packopen:
236             self._open()
237         if self.ensure_busy:
238             self.ensure_busy()
239         data = ''.join(datalist)
240         assert(len(data))
241         self.file.write(struct.pack('!I', len(data)) + data)
242         self.outbytes += len(data)
243         self.count += 1
244
245         if self.file.has_input():
246             line = self.file.readline().strip()
247             assert(line.startswith('index '))
248             idxname = line[6:]
249             if self.suggest_pack:
250                 self.suggest_pack(idxname)
251                 self.objcache.refresh()