arthur.barton.de Git - bup.git/blob - client.py
Refactored client stuff into client.py; now cmd-save and cmd-init use it too.
[bup.git] / client.py
1 import re, struct
2 import git
3 from helpers import *
4 from subprocess import Popen, PIPE
5
class ClientError(Exception):
    """Error raised by Client for tunnel failures and misuse while busy."""
8
class Client:
    """Client side of a connection to a 'bup server' subprocess.

    The server is started either directly or through ssh, and is driven by
    writing newline-terminated commands to its stdin and reading the replies
    from its stdout.
    """

    # Class-level defaults: close() (and therefore __del__) must not fail
    # with AttributeError when __init__ raised before these were assigned.
    conn = None
    p = None
    _busy = None

    def __init__(self, remote, create=False):
        """Start the server process and select the remote repository.

        remote -- 'host:dir' to run the server on host through ssh, or just
                  'dir' to run 'bup server' as a local subprocess (the host
                  is then recorded as 'NONE').
        create -- if true, send 'init-dir' rather than 'set-dir'.

        No directory command is sent at all when the dir part is empty.
        """
        self._busy = None  # name of the command in progress, if any
        self._indexes_synced = 0
        rs = remote.split(':', 1)
        if len(rs) == 1:
            (host, path) = ('NONE', remote)
            argv = ['bup', 'server']
        else:
            (host, path) = rs
            argv = ['ssh', host, '--', 'bup', 'server']
        (self.host, self.dir) = (host, path)
        # One cache directory per remote; anything other than word
        # characters, '@' and ':' is replaced by '_' in its name.
        self.cachedir = git.repo('index-cache/%s'
                                 % re.sub(r'[^@:\w]', '_',
                                          "%s:%s" % (host, path)))
        self.p = p = Popen(argv, stdin=PIPE, stdout=PIPE)
        self.conn = conn = Conn(p.stdout, p.stdin)
        if path:
            # Commands are newline-terminated, so a line break inside the
            # path would be read by the server as the end of the command.
            path = re.sub(r'[\r\n]', ' ', path)
            if create:
                conn.write('init-dir %s\n' % path)
            else:
                conn.write('set-dir %s\n' % path)
            conn.check_ok()

    def __del__(self):
        self.close()

    def close(self):
        """Ask the server to quit and wait for its process to exit.

        Calling it again after the first call is a no-op.  Raises
        ClientError if the server process exits with a non-zero status.
        """
        conn, p = self.conn, self.p
        # Drop both handles up front: if anything below raises, a later
        # close() (for instance from __del__) must not touch the closed
        # pipes a second time.
        self.conn = self.p = None
        try:
            # 'quit' is not sent while a command is still in progress.
            if conn and not self._busy:
                conn.write('quit\n')
        finally:
            if p:
                p.stdin.close()
                # Read the server's remaining output up to EOF before
                # closing our end of the pipe.
                while p.stdout.read(65536):
                    pass
                p.stdout.close()
                rv = p.wait()
                if rv:
                    raise ClientError('server tunnel returned exit code %d'
                                      % rv)

    def check_busy(self):
        """Raise ClientError if a command is still in progress."""
        if self._busy:
            raise ClientError('already busy with command %r' % self._busy)

    def _not_busy(self):
        """Clear the busy marker (passed as the pack writer's onclose)."""
        self._busy = None

    def sync_indexes(self):
        """Bring the local index cache in line with the server's index list.

        Sends 'list-indexes', removes cached '.idx' files that the server
        did not list, then downloads with 'send-index' every listed index
        found neither in the cache directory nor in the local pack
        directory.

        Raises ClientError if a command is in progress, if the server lists
        a name containing '/', or if it announces a zero-length index.
        """
        self.check_busy()
        conn = self.conn
        conn.write('list-indexes\n')
        packdir = git.repo('objects/pack')
        mkdirp(self.cachedir)
        listed = set()
        needed = []
        for line in linereader(conn):
            if not line:
                break
            # The name is joined into local paths below, so it must not be
            # able to point outside the cache directory.  This is a real
            # check rather than an assert because asserts vanish under -O.
            if '/' in line:
                raise ClientError('invalid index name from server: %r'
                                  % line)
            if line in listed:
                continue
            listed.add(line)
            if not (os.path.exists(os.path.join(self.cachedir, line)) or
                    os.path.exists(os.path.join(packdir, line))):
                needed.append(line)
        conn.check_ok()

        for f in os.listdir(self.cachedir):
            if f.endswith('.idx') and f not in listed:
                log('pruning old index: %r\n' % f)
                os.unlink(os.path.join(self.cachedir, f))

        # FIXME this should be pipelined: request multiple indexes at a time, or
        # we waste lots of network turnarounds.
        for name in needed:
            log('requesting %r\n' % name)
            conn.write('send-index %s\n' % name)
            # The reply starts with a 4-byte big-endian length.
            n = struct.unpack('!I', conn.read(4))[0]
            if not n:
                raise ClientError('server sent empty index %r' % name)
            log('   expect %d bytes\n' % n)
            fn = os.path.join(self.cachedir, name)
            # Index data is binary; write to a temporary name so the final
            # name only ever refers to a completely received file.
            f = open(fn + '.tmp', 'wb')
            try:
                for b in chunkyreader(conn, n):
                    f.write(b)
            finally:
                f.close()
            conn.check_ok()
            os.rename(fn + '.tmp', fn)

        self._indexes_synced = 1

    def new_packwriter(self):
        """Send 'receive-objects' and return a remote pack writer.

        Requires sync_indexes() to have completed first.  The client is
        marked busy; the returned git.PackWriter_Remote is given _not_busy
        as its onclose callback, which clears the marker again.
        """
        assert(self._indexes_synced)
        self.check_busy()
        self._busy = 'receive-objects'
        self.conn.write('receive-objects\n')
        objcache = git.MultiPackIndex(self.cachedir)
        return git.PackWriter_Remote(self.conn, objcache = objcache,
                                     onclose = self._not_busy)