import io, math, os

from bup import _helpers, helpers
from bup.helpers import sc_page_size

_fmincore = getattr(helpers, 'fmincore', None)

BLOB_MAX = 8192*4   # 8192 is the "typical" blob size for bupsplit
BLOB_READ_SIZE = 1024*1024
MAX_PER_TREE = 256
progress_callback = None
fanout = 16

GIT_MODE_FILE = 0100644
GIT_MODE_TREE = 040000
GIT_MODE_SYMLINK = 0120000
assert(GIT_MODE_TREE != 40000)  # 0xxx should be treated as octal

# The purpose of this type of buffer is to avoid copying on peek(), get(),
# and eat().  We do copy the buffer contents on put(), but that should
# be ok if we always only put() large amounts of data at a time.
class Buf:
    def __init__(self):
        self.data = ''
        self.start = 0

    def put(self, s):
        if s:
            self.data = buffer(self.data, self.start) + s
            self.start = 0

    def peek(self, count):
        return buffer(self.data, self.start, count)

    def eat(self, count):
        self.start += count

    def get(self, count):
        v = buffer(self.data, self.start, count)
        self.start += count
        return v

    def used(self):
        return len(self.data) - self.start


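# Illustrative sketch (not part of the original bup source): how Buf is
# meant to be driven by the splitter below -- put() whole read() blocks,
# then peek()/eat()/get() chunks without further copies.  Assumes the
# Python 2 buffer() semantics used throughout this file.
def _example_buf_usage():
    b = Buf()
    b.put('abcdefgh')                # the only copy happens here
    assert str(b.peek(4)) == 'abcd'  # peek() returns a zero-copy view
    b.eat(2)                         # drop 'ab' by advancing the start offset
    assert str(b.get(2)) == 'cd'     # get() is peek() plus eat()
    assert b.used() == 4             # 'efgh' still buffered

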
def _fadvise_pages_done(fd, first_page, count):
    assert(first_page >= 0)
    assert(count >= 0)
    if count > 0:
        _helpers.fadvise_done(fd,
                              first_page * sc_page_size,
                              count * sc_page_size)


def _nonresident_page_regions(status_bytes, incore_mask, max_region_len=None):
    """Return (start_page, count) pairs in ascending start_page order for
    each contiguous region of nonresident pages indicated by the
    mincore() status_bytes.  Limit the number of pages in each region
    to max_region_len."""
    assert(max_region_len is None or max_region_len > 0)
    start = None
    for i, x in enumerate(status_bytes):
        in_core = x & incore_mask
        if start is None:
            if not in_core:
                start = i
        else:
            count = i - start
            if in_core:
                yield (start, count)
                start = None
            elif max_region_len and count >= max_region_len:
                yield (start, count)
                start = i
    if start is not None:
        yield (start, len(status_bytes) - start)


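# Illustrative sketch (not part of the original bup source): what
# _nonresident_page_regions() yields for a hypothetical mincore() result.
# Here 1 stands in for helpers.MINCORE_INCORE, each list entry is one page's
# status byte, and 0 means the page is not resident.
def _example_nonresident_page_regions():
    status = [1, 0, 0, 1, 0, 0, 0]   # pages 1-2 and 4-6 are nonresident
    assert list(_nonresident_page_regions(status, 1)) == [(1, 2), (4, 3)]
    # max_region_len caps each yielded run, so the 3-page run is split up
    assert (list(_nonresident_page_regions(status, 1, max_region_len=2))
            == [(1, 2), (4, 2), (6, 1)])

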
def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
    """Uncache the pages of fd indicated by first_region and
    remaining_regions that are before offset, where each region is a
    (start_page, count) pair.  The final region must have a start_page
    of None."""
    rstart, rlen = first_region
    while rstart is not None and (rstart + rlen) * sc_page_size <= offset:
        _fadvise_pages_done(fd, rstart, rlen)
        rstart, rlen = next(remaining_regions, (None, None))
    return (rstart, rlen)


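# Illustrative sketch (not part of the original bup source):
# _uncache_ours_upto() only flushes regions that end at or before the given
# byte offset; with offset 0 nothing qualifies and the first region is
# simply handed back for a later call.  fd is never touched in that case,
# so None is fine here.
def _example_uncache_ours_upto():
    regions = iter([(None, None)])
    assert _uncache_ours_upto(None, 0, (4, 2), regions) == (4, 2)

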
def readfile_iter(files, progress=None):
    for filenum,f in enumerate(files):
        ofs = 0
        b = ''
        fd = rpr = rstart = rlen = None
        if _fmincore and hasattr(f, 'fileno'):
            try:
                fd = f.fileno()
            except io.UnsupportedOperation:
                pass
            if fd:
                mcore = _fmincore(fd)
                if mcore:
                    max_chunk = max(1, (8 * 1024 * 1024) / sc_page_size)
                    rpr = _nonresident_page_regions(mcore, helpers.MINCORE_INCORE,
                                                    max_chunk)
                    rstart, rlen = next(rpr, (None, None))
        while 1:
            if progress:
                progress(filenum, len(b))
            b = f.read(BLOB_READ_SIZE)
            ofs += len(b)
            if rpr:
                rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
            if not b:
                break
            yield b
        if rpr:
            rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)


def _splitbuf(buf, basebits, fanbits):
    while 1:
        b = buf.peek(buf.used())
        (ofs, bits) = _helpers.splitbuf(b)
        if ofs:
            if ofs > BLOB_MAX:
                ofs = BLOB_MAX
                level = 0
            else:
                level = (bits-basebits)//fanbits  # integer division
            buf.eat(ofs)
            yield buffer(b, 0, ofs), level
        else:
            break
    while buf.used() >= BLOB_MAX:
        # limit max blob size
        yield buf.get(BLOB_MAX), 0


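# Illustrative sketch (not part of the original bup source): how _splitbuf()
# turns the number of matching rolling-checksum bits into a tree level.
# basebits=13 and fanbits=4 are assumed example values (13 corresponds to the
# "typical" 8192-byte blob, 4 to the default fanout of 16); the real values
# come from _helpers.blobbits() and math.log(fanout, 2) in _hashsplit_iter().
def _example_split_level(bits, basebits=13, fanbits=4):
    return (bits - basebits) // fanbits
# e.g. a split point with 21 matching bits gets level (21-13)//4 == 2, so it
# closes off two levels of the tree being assembled by _squish() further down.

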
def _hashsplit_iter(files, progress):
    assert(BLOB_READ_SIZE > BLOB_MAX)
    basebits = _helpers.blobbits()
    fanbits = int(math.log(fanout or 128, 2))
    buf = Buf()
    for inblock in readfile_iter(files, progress):
        buf.put(inblock)
        for buf_and_level in _splitbuf(buf, basebits, fanbits):
            yield buf_and_level
    if buf.used():
        yield buf.get(buf.used()), 0


def _hashsplit_iter_keep_boundaries(files, progress):
    for real_filenum,f in enumerate(files):
        if progress:
            def prog(filenum, nbytes):
                # the inner _hashsplit_iter doesn't know the real file count,
                # so we'll replace it here.
                return progress(real_filenum, nbytes)
        else:
            prog = None
        for buf_and_level in _hashsplit_iter([f], progress=prog):
            yield buf_and_level


def hashsplit_iter(files, keep_boundaries, progress):
    if keep_boundaries:
        return _hashsplit_iter_keep_boundaries(files, progress)
    else:
        return _hashsplit_iter(files, progress)


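# Illustrative sketch (not part of the original bup source): splitting an
# in-memory file with hashsplit_iter().  io.BytesIO stands in for a real
# file; its fileno() raises io.UnsupportedOperation, which readfile_iter()
# handles by skipping the page-cache bookkeeping.  Every input byte comes
# back out exactly once, whatever split points are found.
def _example_hashsplit_iter():
    data = 'x' * (3 * BLOB_MAX)
    out = 0
    for blob, level in hashsplit_iter([io.BytesIO(data)],
                                      keep_boundaries=False, progress=None):
        out += len(blob)
    assert out == len(data)

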
total_split = 0
def split_to_blobs(makeblob, files, keep_boundaries, progress):
    global total_split
    for (blob, level) in hashsplit_iter(files, keep_boundaries, progress):
        sha = makeblob(blob)
        total_split += len(blob)
        if progress_callback:
            progress_callback(len(blob))
        yield (sha, len(blob), level)


def _make_shalist(l):
    ofs = 0
    l = list(l)
    total = sum(size for mode,sha,size, in l)
    vlen = len('%x' % total)
    shalist = []
    for (mode, sha, size) in l:
        shalist.append((mode, '%0*x' % (vlen,ofs), sha))
        ofs += size
    assert(ofs == total)
    return (shalist, total)


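# Illustrative sketch (not part of the original bup source): _make_shalist()
# names each tree entry by its zero-padded hex byte offset into the
# reassembled file, padded to the width of the total size.  The sha values
# here are just placeholder strings.
def _example_make_shalist():
    pieces = [(GIT_MODE_FILE, 'sha-a', 0x100), (GIT_MODE_FILE, 'sha-b', 0x80)]
    shalist, total = _make_shalist(pieces)
    assert total == 0x180
    assert shalist == [(GIT_MODE_FILE, '000', 'sha-a'),
                       (GIT_MODE_FILE, '100', 'sha-b')]

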
def _squish(maketree, stacks, n):
    i = 0
    while i < n or len(stacks[i]) >= MAX_PER_TREE:
        while len(stacks) <= i+1:
            stacks.append([])
        if len(stacks[i]) == 1:
            stacks[i+1] += stacks[i]
        elif stacks[i]:
            (shalist, size) = _make_shalist(stacks[i])
            tree = maketree(shalist)
            stacks[i+1].append((GIT_MODE_TREE, tree, size))
        stacks[i] = []
        i += 1


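# Illustrative sketch (not part of the original bup source): _squish() with a
# stub maketree callback.  Everything on stack levels below n is folded into
# a subtree one level up, so three blobs on level 0 become a single
# (GIT_MODE_TREE, tree, size) entry on level 1.
def _example_squish():
    made = []
    def maketree(shalist):
        made.append(shalist)
        return 'tree%d' % len(made)
    stacks = [[(GIT_MODE_FILE, 'sha%d' % i, 10) for i in range(3)], []]
    _squish(maketree, stacks, 1)
    assert stacks[0] == []
    assert stacks[1] == [(GIT_MODE_TREE, 'tree1', 30)]

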
def split_to_shalist(makeblob, maketree, files,
                     keep_boundaries, progress=None):
    sl = split_to_blobs(makeblob, files, keep_boundaries, progress)
    assert(fanout != 0)
    if not fanout:
        shal = []
        for (sha,size,level) in sl:
            shal.append((GIT_MODE_FILE, sha, size))
        return _make_shalist(shal)[0]
    else:
        stacks = [[]]
        for (sha,size,level) in sl:
            stacks[0].append((GIT_MODE_FILE, sha, size))
            _squish(maketree, stacks, level)
        #log('stacks: %r\n' % [len(i) for i in stacks])
        _squish(maketree, stacks, len(stacks)-1)
        #log('stacks: %r\n' % [len(i) for i in stacks])
        return _make_shalist(stacks[-1])[0]


def split_to_blob_or_tree(makeblob, maketree, files,
                          keep_boundaries, progress=None):
    shalist = list(split_to_shalist(makeblob, maketree,
                                    files, keep_boundaries, progress))
    if len(shalist) == 1:
        return (shalist[0][0], shalist[0][2])
    elif len(shalist) == 0:
        return (GIT_MODE_FILE, makeblob(''))
    else:
        return (GIT_MODE_TREE, maketree(shalist))


def open_noatime(name):
    fd = _helpers.open_noatime(name)
    try:
        return os.fdopen(fd, 'rb', 1024*1024)
    except:
        try:
            os.close(fd)
        except:
            pass
        raise