]> arthur.barton.de Git - bup.git/blob - lib/bup/hashsplit.py
94d93ed36746ebb31e9dfa4bdf11ca1a205af80c
[bup.git] / lib / bup / hashsplit.py
1
2 from __future__ import absolute_import
3 import io, math, os
4
5 from bup import _helpers, compat, helpers
6 from bup._helpers import cat_bytes
7 from bup.compat import buffer, py_maj
8 from bup.helpers import sc_page_size
9
10
# helpers.fmincore when the helpers module provides one, otherwise None.
_fmincore = getattr(helpers, 'fmincore', None)

# Upper bound on the size of a single blob; 8192 is the "typical" blob
# size for bupsplit.
BLOB_MAX = 4 * 8192
# Number of bytes requested from each file per read() in readfile_iter().
BLOB_READ_SIZE = 8 * 1024 ** 2
# _squish() folds a stack level into a tree once it holds this many entries.
MAX_PER_TREE = 256
# When set, split_to_blobs() calls this with the length of every blob.
progress_callback = None
# _hashsplit_iter() derives the number of hash bits per tree level from
# log2 of this value.
fanout = 16

# Mode values stored in the (mode, name, sha) entries built by this module.
GIT_MODE_FILE = 0o100644
GIT_MODE_TREE = 0o40000
GIT_MODE_SYMLINK = 0o120000
22
# This buffer type exists so that peek(), get(), and eat() can avoid
# copying the buffered data.  put() does copy the contents, which should
# be fine provided callers only ever put() large amounts of data at once.
class Buf:
    """Byte queue that is appended to with put() and consumed from the
    front by advancing a start offset."""

    def __init__(self):
        self.data = b''
        self.start = 0

    def put(self, s):
        """Append s after the bytes that have not been consumed yet."""
        if self.data:
            if s:
                # Rebuild the backing store from the unread tail plus s.
                unread = len(self.data) - self.start
                self.data = cat_bytes(self.data, self.start, unread,
                                      s, 0, len(s))
                self.start = 0
        else:
            # Nothing stored at all: adopt s directly as the backing store.
            self.data = s
            self.start = 0

    def peek(self, count):
        """Return the next count bytes without consuming them.  Requests
        of at most 256 bytes come back as a slice, larger ones as a
        buffer() over the stored data."""
        if count > 256:
            return buffer(self.data, self.start, count)
        first = self.start
        return self.data[first : first + count]

    def eat(self, count):
        """Consume count bytes without returning them."""
        self.start = self.start + count

    def get(self, count):
        """Return the next count bytes (as peek() would) and consume them."""
        v = self.peek(count)
        self.start += count
        return v

    def used(self):
        """Return the number of bytes that have not been consumed."""
        return len(self.data) - self.start
59
60
61 def _fadvise_pages_done(fd, first_page, count):
62     assert(first_page >= 0)
63     assert(count >= 0)
64     if count > 0:
65         _helpers.fadvise_done(fd,
66                               first_page * sc_page_size,
67                               count * sc_page_size)
68
69
70 def _nonresident_page_regions(status_bytes, incore_mask, max_region_len=None):
71     """Return (start_page, count) pairs in ascending start_page order for
72     each contiguous region of nonresident pages indicated by the
73     mincore() status_bytes.  Limit the number of pages in each region
74     to max_region_len."""
75     assert(max_region_len is None or max_region_len > 0)
76     start = None
77     for i, x in enumerate(status_bytes):
78         in_core = x & incore_mask
79         if start is None:
80             if not in_core:
81                 start = i
82         else:
83             count = i - start
84             if in_core:
85                 yield (start, count)
86                 start = None
87             elif max_region_len and count >= max_region_len:
88                 yield (start, count)
89                 start = i
90     if start is not None:
91         yield (start, len(status_bytes) - start)
92
93
94 def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
95     """Uncache the pages of fd indicated by first_region and
96     remaining_regions that are before offset, where each region is a
97     (start_page, count) pair.  The final region must have a start_page
98     of None."""
99     rstart, rlen = first_region
100     while rstart is not None and (rstart + rlen) * sc_page_size <= offset:
101         _fadvise_pages_done(fd, rstart, rlen)
102         rstart, rlen = next(remaining_regions, (None, None))
103     return (rstart, rlen)
104
105
def readfile_iter(files, progress=None):
    """Yield the contents of each file in files as a series of blocks.

    Every block is whatever f.read(BLOB_READ_SIZE) returns; reading of a
    file stops at its first empty read.  If progress is given, it is
    called as progress(filenum, nbytes) before each read, where filenum
    is the index of the file within files and nbytes is the length of
    the block returned by the previous read (0 before the first read of
    a file).

    When helpers provides fmincore and the file has a usable fileno(),
    the page regions that fmincore reported as nonresident before
    reading began are handed to _uncache_ours_upto() as the read offset
    advances past them.
    """
    for filenum, f in enumerate(files):
        ofs = 0
        b = b''  # bytes, like every block read below
        fd = rpr = rstart = rlen = None
        if _fmincore and hasattr(f, 'fileno'):
            try:
                fd = f.fileno()
            except io.UnsupportedOperation:
                pass
            # NOTE(review): this truthiness test also skips fd 0 (stdin);
            # confirm whether that is intentional before changing it.
            if fd:
                mcore = _fmincore(fd)
                if mcore:
                    # Cap each region at 8 MiB worth of pages.  Floor
                    # division keeps the page count an integer.
                    max_chunk = max(1, (8 * 1024 * 1024) // sc_page_size)
                    rpr = _nonresident_page_regions(mcore,
                                                    helpers.MINCORE_INCORE,
                                                    max_chunk)
                    rstart, rlen = next(rpr, (None, None))
        while True:
            if progress:
                progress(filenum, len(b))
            b = f.read(BLOB_READ_SIZE)
            ofs += len(b)
            if rpr:
                rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
            if not b:
                break
            yield b
        if rpr:
            rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
135
136
def _splitbuf(buf, basebits, fanbits):
    """Yield (chunk, level) pairs cut from the front of buf.

    While _helpers.splitbuf() reports a nonzero offset for the unread
    data, that many bytes are consumed and yielded with level
    (bits - basebits) // fanbits; an offset beyond BLOB_MAX is clamped
    to BLOB_MAX and yielded at level 0.  Afterwards, BLOB_MAX sized
    chunks are yielded at level 0 for as long as buf still holds at
    least BLOB_MAX bytes."""
    while True:
        window = buf.peek(buf.used())
        (cut, bits) = _helpers.splitbuf(window)
        if not cut:
            break
        if cut > BLOB_MAX:
            cut, level = BLOB_MAX, 0
        else:
            level = (bits - basebits) // fanbits  # integer division
        buf.eat(cut)
        yield buffer(window, 0, cut), level
    while buf.used() >= BLOB_MAX:
        # limit max blob size
        yield buf.get(BLOB_MAX), 0
154
155
def _hashsplit_iter(files, progress):
    """Yield (chunk, level) pairs for the concatenated contents of files.

    Blocks from readfile_iter() are accumulated in one Buf and split by
    _splitbuf(); whatever is left in the Buf after the last block is
    yielded as a final chunk at level 0."""
    assert(BLOB_READ_SIZE > BLOB_MAX)
    basebits = _helpers.blobbits()
    fanbits = int(math.log(fanout or 128, 2))
    pending = Buf()
    for block in readfile_iter(files, progress):
        pending.put(block)
        for chunk_and_level in _splitbuf(pending, basebits, fanbits):
            yield chunk_and_level
    leftover = pending.used()
    if leftover:
        yield pending.get(leftover), 0
167
168
169 def _hashsplit_iter_keep_boundaries(files, progress):
170     for real_filenum,f in enumerate(files):
171         if progress:
172             def prog(filenum, nbytes):
173                 # the inner _hashsplit_iter doesn't know the real file count,
174                 # so we'll replace it here.
175                 return progress(real_filenum, nbytes)
176         else:
177             prog = None
178         for buf_and_level in _hashsplit_iter([f], progress=prog):
179             yield buf_and_level
180
181
def hashsplit_iter(files, keep_boundaries, progress):
    """Return an iterator of (chunk, level) pairs for files, using
    _hashsplit_iter_keep_boundaries() when keep_boundaries is true and
    _hashsplit_iter() otherwise."""
    if keep_boundaries:
        splitter = _hashsplit_iter_keep_boundaries
    else:
        splitter = _hashsplit_iter
    return splitter(files, progress)
187
188
# Running total of the blob bytes produced by split_to_blobs().
total_split = 0
def split_to_blobs(makeblob, files, keep_boundaries, progress):
    """Yield (sha, size, level) for each chunk from hashsplit_iter(),
    where sha is the result of makeblob(chunk).  Adds every chunk's size
    to the module-level total_split and, if progress_callback is set,
    calls it with that size."""
    global total_split
    for (blob, level) in hashsplit_iter(files, keep_boundaries, progress):
        sha = makeblob(blob)
        nbytes = len(blob)
        total_split += nbytes
        if progress_callback:
            progress_callback(nbytes)
        yield (sha, nbytes, level)
198
199
200 def _make_shalist(l):
201     ofs = 0
202     l = list(l)
203     total = sum(size for mode,sha,size, in l)
204     vlen = len(b'%x' % total)
205     shalist = []
206     for (mode, sha, size) in l:
207         shalist.append((mode, b'%0*x' % (vlen,ofs), sha))
208         ofs += size
209     assert(ofs == total)
210     return (shalist, total)
211
212
def _squish(maketree, stacks, n):
    """Fold the lower levels of stacks upward, in place.

    Starting at level 0, each level is emptied into the level above it
    for as long as the level index is below n or the level holds at
    least MAX_PER_TREE entries.  A lone entry is moved up unchanged;
    several entries are turned into a tree with maketree() and pushed up
    as a single GIT_MODE_TREE entry carrying their combined size."""
    level = 0
    while level < n or len(stacks[level]) >= MAX_PER_TREE:
        parent = level + 1
        while len(stacks) <= parent:
            stacks.append([])
        entries = stacks[level]
        if len(entries) == 1:
            stacks[parent] += entries
        elif entries:
            (shalist, size) = _make_shalist(entries)
            tree = maketree(shalist)
            stacks[parent].append((GIT_MODE_TREE, tree, size))
        stacks[level] = []
        level = parent
226
227
def split_to_shalist(makeblob, maketree, files,
                     keep_boundaries, progress=None):
    """Split files into blobs and return the (mode, name, sha) list
    describing them.

    With a truthy fanout, blobs are pushed onto level 0 of a list of
    stacks and folded upward by _squish() according to each blob's
    level; the result is built from the topmost stack.  Otherwise the
    result is a flat list with one GIT_MODE_FILE entry per blob."""
    blobs = split_to_blobs(makeblob, files, keep_boundaries, progress)
    assert(fanout != 0)
    if fanout:
        stacks = [[]]
        for (sha, size, level) in blobs:
            stacks[0].append((GIT_MODE_FILE, sha, size))
            _squish(maketree, stacks, level)
        # Collapse everything that remains into the topmost stack.
        _squish(maketree, stacks, len(stacks) - 1)
        return _make_shalist(stacks[-1])[0]
    flat = [(GIT_MODE_FILE, sha, size) for (sha, size, _level) in blobs]
    return _make_shalist(flat)[0]
246
247
def split_to_blob_or_tree(makeblob, maketree, files,
                          keep_boundaries, progress=None):
    """Split files and return a (mode, sha) pair for the result.

    No entries: an empty blob made with makeblob(b'').  Exactly one
    entry: that entry's own mode and sha.  Anything else: a tree made
    from all of the entries with maketree()."""
    shalist = list(split_to_shalist(makeblob, maketree,
                                    files, keep_boundaries, progress))
    entry_count = len(shalist)
    if entry_count == 0:
        return (GIT_MODE_FILE, makeblob(b''))
    if entry_count == 1:
        (mode, _name, sha) = shalist[0][0], shalist[0][1], shalist[0][2]
        return (mode, sha)
    return (GIT_MODE_TREE, maketree(shalist))
258
259
def open_noatime(name):
    """Open name with _helpers.open_noatime() and return it wrapped as a
    binary read-mode file object with a 1 MiB buffer.

    If wrapping the descriptor fails, the descriptor is closed (best
    effort) and the original exception is re-raised."""
    fd = _helpers.open_noatime(name)
    try:
        return os.fdopen(fd, 'rb', 1024*1024)
    except BaseException:
        # Don't leak the descriptor if fdopen() failed.  Only a failure
        # of the close itself is ignored, so that an interrupt arriving
        # during cleanup is not silently discarded.
        try:
            os.close(fd)
        except OSError:
            pass
        raise