]> arthur.barton.de Git - bup.git/blob - lib/bup/cmd/server.py
357442847e1872265393381e887fb2c9c64b08f6
[bup.git] / lib / bup / cmd / server.py
1
2 from __future__ import absolute_import
3 from binascii import hexlify, unhexlify
4 import os, struct, subprocess, sys
5
6 from bup import options, git, vfs, vint
7 from bup.compat import environ, hexstr
8 from bup.helpers import (Conn, debug1, debug2, linereader, lines_until_sentinel,
9                          log)
10 from bup.io import byte_stream, path_msg
11 from bup.repo import LocalRepo
12
13
14 suspended_w = None
15 dumb_server_mode = False
16 repo = None
17
18
19 def do_help(conn, junk):
20     conn.write(b'Commands:\n    %s\n' % b'\n    '.join(sorted(commands)))
21     conn.ok()
22
23
24 def _set_mode():
25     global dumb_server_mode
26     dumb_server_mode = os.path.exists(git.repo(b'bup-dumb-server'))
27     debug1('bup server: serving in %s mode\n'
28            % (dumb_server_mode and 'dumb' or 'smart'))
29
30
31 def _init_session(reinit_with_new_repopath=None):
32     global repo
33     if reinit_with_new_repopath is None and git.repodir:
34         if not repo:
35             repo = LocalRepo()
36         return
37     git.check_repo_or_die(reinit_with_new_repopath)
38     if repo:
39         repo.close()
40     repo = LocalRepo()
41     # OK. we now know the path is a proper repository. Record this path in the
42     # environment so that subprocesses inherit it and know where to operate.
43     environ[b'BUP_DIR'] = git.repodir
44     debug1('bup server: bupdir is %s\n' % path_msg(git.repodir))
45     _set_mode()
46
47
48 def init_dir(conn, arg):
49     git.init_repo(arg)
50     debug1('bup server: bupdir initialized: %s\n' % path_msg(git.repodir))
51     _init_session(arg)
52     conn.ok()
53
54
55 def set_dir(conn, arg):
56     _init_session(arg)
57     conn.ok()
58
59
60 def list_indexes(conn, junk):
61     _init_session()
62     suffix = b''
63     if dumb_server_mode:
64         suffix = b' load'
65     for f in os.listdir(git.repo(b'objects/pack')):
66         if f.endswith(b'.idx'):
67             conn.write(b'%s%s\n' % (f, suffix))
68     conn.ok()
69
70
71 def send_index(conn, name):
72     _init_session()
73     assert name.find(b'/') < 0
74     assert name.endswith(b'.idx')
75     idx = git.open_idx(git.repo(b'objects/pack/%s' % name))
76     conn.write(struct.pack('!I', len(idx.map)))
77     conn.write(idx.map)
78     conn.ok()
79
80
81 def receive_objects_v2(conn, junk):
82     global suspended_w
83     _init_session()
84     suggested = set()
85     if suspended_w:
86         w = suspended_w
87         suspended_w = None
88     else:
89         if dumb_server_mode:
90             w = git.PackWriter(objcache_maker=None)
91         else:
92             w = git.PackWriter()
93     while 1:
94         ns = conn.read(4)
95         if not ns:
96             w.abort()
97             raise Exception('object read: expected length header, got EOF\n')
98         n = struct.unpack('!I', ns)[0]
99         #debug2('expecting %d bytes\n' % n)
100         if not n:
101             debug1('bup server: received %d object%s.\n'
102                 % (w.count, w.count!=1 and "s" or ''))
103             fullpath = w.close(run_midx=not dumb_server_mode)
104             if fullpath:
105                 (dir, name) = os.path.split(fullpath)
106                 conn.write(b'%s.idx\n' % name)
107             conn.ok()
108             return
109         elif n == 0xffffffff:
110             debug2('bup server: receive-objects suspended.\n')
111             suspended_w = w
112             conn.ok()
113             return
114
115         shar = conn.read(20)
116         crcr = struct.unpack('!I', conn.read(4))[0]
117         n -= 20 + 4
118         buf = conn.read(n)  # object sizes in bup are reasonably small
119         #debug2('read %d bytes\n' % n)
120         _check(w, n, len(buf), 'object read: expected %d bytes, got %d\n')
121         if not dumb_server_mode:
122             oldpack = w.exists(shar, want_source=True)
123             if oldpack:
124                 assert(not oldpack == True)
125                 assert(oldpack.endswith(b'.idx'))
126                 (dir,name) = os.path.split(oldpack)
127                 if not (name in suggested):
128                     debug1("bup server: suggesting index %s\n"
129                            % git.shorten_hash(name).decode('ascii'))
130                     debug1("bup server:   because of object %s\n"
131                            % hexstr(shar))
132                     conn.write(b'index %s\n' % name)
133                     suggested.add(name)
134                 continue
135         nw, crc = w._raw_write((buf,), sha=shar)
136         _check(w, crcr, crc, 'object read: expected crc %d, got %d\n')
137     # NOTREACHED
138
139
140 def _check(w, expected, actual, msg):
141     if expected != actual:
142         w.abort()
143         raise Exception(msg % (expected, actual))
144
145
146 def read_ref(conn, refname):
147     _init_session()
148     r = git.read_ref(refname)
149     conn.write(b'%s\n' % hexlify(r) if r else b'')
150     conn.ok()
151
152
153 def update_ref(conn, refname):
154     _init_session()
155     newval = conn.readline().strip()
156     oldval = conn.readline().strip()
157     git.update_ref(refname, unhexlify(newval), unhexlify(oldval))
158     conn.ok()
159
160 def join(conn, id):
161     _init_session()
162     try:
163         for blob in git.cp().join(id):
164             conn.write(struct.pack('!I', len(blob)))
165             conn.write(blob)
166     except KeyError as e:
167         log('server: error: %s\n' % e)
168         conn.write(b'\0\0\0\0')
169         conn.error(e)
170     else:
171         conn.write(b'\0\0\0\0')
172         conn.ok()
173
174 def cat_batch(conn, dummy):
175     _init_session()
176     cat_pipe = git.cp()
177     # For now, avoid potential deadlock by just reading them all
178     for ref in tuple(lines_until_sentinel(conn, b'\n', Exception)):
179         ref = ref[:-1]
180         it = cat_pipe.get(ref)
181         info = next(it)
182         if not info[0]:
183             conn.write(b'missing\n')
184             continue
185         conn.write(b'%s %s %d\n' % info)
186         for buf in it:
187             conn.write(buf)
188     conn.ok()
189
190 def refs(conn, args):
191     limit_to_heads, limit_to_tags = args.split()
192     assert limit_to_heads in (b'0', b'1')
193     assert limit_to_tags in (b'0', b'1')
194     limit_to_heads = int(limit_to_heads)
195     limit_to_tags = int(limit_to_tags)
196     _init_session()
197     patterns = tuple(x[:-1] for x in lines_until_sentinel(conn, b'\n', Exception))
198     for name, oid in git.list_refs(patterns=patterns,
199                                    limit_to_heads=limit_to_heads,
200                                    limit_to_tags=limit_to_tags):
201         assert b'\n' not in name
202         conn.write(b'%s %s\n' % (hexlify(oid), name))
203     conn.write(b'\n')
204     conn.ok()
205
206 def rev_list(conn, _):
207     _init_session()
208     count = conn.readline()
209     if not count:
210         raise Exception('Unexpected EOF while reading rev-list count')
211     assert count == b'\n'
212     count = None
213     fmt = conn.readline()
214     if not fmt:
215         raise Exception('Unexpected EOF while reading rev-list format')
216     fmt = None if fmt == b'\n' else fmt[:-1]
217     refs = tuple(x[:-1] for x in lines_until_sentinel(conn, b'\n', Exception))
218     args = git.rev_list_invocation(refs, format=fmt)
219     p = subprocess.Popen(args, env=git._gitenv(git.repodir),
220                          stdout=subprocess.PIPE)
221     while True:
222         out = p.stdout.read(64 * 1024)
223         if not out:
224             break
225         conn.write(out)
226     conn.write(b'\n')
227     rv = p.wait()  # not fatal
228     if rv:
229         msg = 'git rev-list returned error %d' % rv
230         conn.error(msg)
231         raise git.GitError(msg)
232     conn.ok()
233
234 def resolve(conn, args):
235     _init_session()
236     (flags,) = args.split()
237     flags = int(flags)
238     want_meta = bool(flags & 1)
239     follow = bool(flags & 2)
240     have_parent = bool(flags & 4)
241     parent = vfs.read_resolution(conn) if have_parent else None
242     path = vint.read_bvec(conn)
243     if not len(path):
244         raise Exception('Empty resolve path')
245     try:
246         res = list(vfs.resolve(repo, path, parent=parent, want_meta=want_meta,
247                                follow=follow))
248     except vfs.IOError as ex:
249         res = ex
250     if isinstance(res, vfs.IOError):
251         conn.write(b'\x00')  # error
252         vfs.write_ioerror(conn, res)
253     else:
254         conn.write(b'\x01')  # success
255         vfs.write_resolution(conn, res)
256     conn.ok()
257
258 optspec = """
259 bup server
260 """
261
262 commands = {
263     b'quit': None,
264     b'help': do_help,
265     b'init-dir': init_dir,
266     b'set-dir': set_dir,
267     b'list-indexes': list_indexes,
268     b'send-index': send_index,
269     b'receive-objects-v2': receive_objects_v2,
270     b'read-ref': read_ref,
271     b'update-ref': update_ref,
272     b'join': join,
273     b'cat': join,  # apocryphal alias
274     b'cat-batch' : cat_batch,
275     b'refs': refs,
276     b'rev-list': rev_list,
277     b'resolve': resolve
278 }
279
280 def main(argv):
281     o = options.Options(optspec)
282     opt, flags, extra = o.parse_bytes(argv[1:])
283
284     if extra:
285         o.fatal('no arguments expected')
286
287     debug2('bup server: reading from stdin.\n')
288
289     # FIXME: this protocol is totally lame and not at all future-proof.
290     # (Especially since we abort completely as soon as *anything* bad happens)
291     sys.stdout.flush()
292     conn = Conn(byte_stream(sys.stdin), byte_stream(sys.stdout))
293     lr = linereader(conn)
294     for _line in lr:
295         line = _line.strip()
296         if not line:
297             continue
298         debug1('bup server: command: %r\n' % line)
299         words = line.split(b' ', 1)
300         cmd = words[0]
301         rest = len(words)>1 and words[1] or b''
302         if cmd == b'quit':
303             break
304         else:
305             cmd = commands.get(cmd)
306             if cmd:
307                 cmd(conn, rest)
308             else:
309                 raise Exception('unknown server command: %r\n' % line)
310
311     debug1('bup server: done\n')