# lib/bup/hashsplit.py  (bup.git, commit dc3a538fe52d329537a3021c289dca16c9692de7)

from __future__ import absolute_import
import io, math, os

from bup import _helpers, compat, helpers
from bup.compat import buffer, join_bytes
from bup.helpers import sc_page_size


_fmincore = getattr(helpers, 'fmincore', None)

BLOB_MAX = 8192*4   # 8192 is the "typical" blob size for bupsplit
BLOB_READ_SIZE = 1024*1024
MAX_PER_TREE = 256
progress_callback = None
fanout = 16

GIT_MODE_FILE = 0o100644
GIT_MODE_TREE = 0o40000
GIT_MODE_SYMLINK = 0o120000

# The purpose of this type of buffer is to avoid copying on peek(), get(),
# and eat().  We do copy the buffer contents on put(), but that should
# be ok if we always only put() large amounts of data at a time.
class Buf:
    def __init__(self):
        self.data = b''
        self.start = 0

    def put(self, s):
        if s:
            self.data = join_bytes(buffer(self.data, self.start), s)
            self.start = 0

    def peek(self, count):
        return buffer(self.data, self.start, count)

    def eat(self, count):
        self.start += count

    def get(self, count):
        if count <= 256:
            v = self.data[self.start : self.start + count]
        else:
            v = buffer(self.data, self.start, count)
        self.start += count
        return v

    def used(self):
        return len(self.data) - self.start


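# A small, hypothetical illustration (not part of bup) of how Buf behaves;
# peek() and large get()s return buffer/memoryview slices into the stored
# data rather than copies:
#
#   b = Buf()
#   b.put(b'hello world')
#   bytes(b.peek(5))    # => b'hello'
#   b.eat(6)            # discard b'hello '
#   b.get(5)            # => b'world' (counts <= 256 are copied out as bytes)
#   b.used()            # => 0
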
def _fadvise_pages_done(fd, first_page, count):
    assert(first_page >= 0)
    assert(count >= 0)
    if count > 0:
        _helpers.fadvise_done(fd,
                              first_page * sc_page_size,
                              count * sc_page_size)


def _nonresident_page_regions(status_bytes, incore_mask, max_region_len=None):
    """Return (start_page, count) pairs in ascending start_page order for
    each contiguous region of nonresident pages indicated by the
    mincore() status_bytes.  Limit the number of pages in each region
    to max_region_len."""
    assert(max_region_len is None or max_region_len > 0)
    start = None
    for i, x in enumerate(status_bytes):
        in_core = x & incore_mask
        if start is None:
            if not in_core:
                start = i
        else:
            count = i - start
            if in_core:
                yield (start, count)
                start = None
            elif max_region_len and count >= max_region_len:
                yield (start, count)
                start = i
    if start is not None:
        yield (start, len(status_bytes) - start)


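# For example (hypothetical values, assuming one status byte per page and an
# incore_mask of 1), pages 1-2 and page 4 are nonresident here, so we'd get
# two regions back:
#
#   list(_nonresident_page_regions(bytearray([1, 0, 0, 1, 0]), 1))
#   # => [(1, 2), (4, 1)]
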
def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
    """Uncache the pages of fd indicated by first_region and
    remaining_regions that are before offset, where each region is a
    (start_page, count) pair.  The final region must have a start_page
    of None."""
    rstart, rlen = first_region
    while rstart is not None and (rstart + rlen) * sc_page_size <= offset:
        _fadvise_pages_done(fd, rstart, rlen)
        rstart, rlen = next(remaining_regions, (None, None))
    return (rstart, rlen)


def readfile_iter(files, progress=None):
    """Yield the content of each file in BLOB_READ_SIZE chunks.  When
    fmincore is available and the file has a usable fileno(), tell the
    kernel (via fadvise) that we're done with any pages we've read that
    weren't already resident before we started, so bup's reads don't
    push other data out of the OS cache."""
    for filenum,f in enumerate(files):
        ofs = 0
        b = b''
        fd = rpr = rstart = rlen = None
        if _fmincore and hasattr(f, 'fileno'):
            try:
                fd = f.fileno()
            except io.UnsupportedOperation:
                pass
            if fd:
                mcore = _fmincore(fd)
                if mcore:
                    max_chunk = max(1, (8 * 1024 * 1024) // sc_page_size)
                    rpr = _nonresident_page_regions(mcore, helpers.MINCORE_INCORE,
                                                    max_chunk)
                    rstart, rlen = next(rpr, (None, None))
        while 1:
            if progress:
                progress(filenum, len(b))
            b = f.read(BLOB_READ_SIZE)
            ofs += len(b)
            if rpr:
                rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
            if not b:
                break
            yield b
        if rpr:
            rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)


def _splitbuf(buf, basebits, fanbits):
    """Yield (data, level) pairs for each chunk that can be split off of
    buf right now.  data is a buffer into buf's storage, and level is
    how many extra fanbits-sized groups of checksum bits (beyond the
    basebits needed for an ordinary split) matched at the split point;
    chunks that had to be cut at BLOB_MAX get level 0."""
    while 1:
        b = buf.peek(buf.used())
        (ofs, bits) = _helpers.splitbuf(b)
        if ofs:
            if ofs > BLOB_MAX:
                ofs = BLOB_MAX
                level = 0
            else:
                level = (bits-basebits)//fanbits  # integer division
            buf.eat(ofs)
            yield buffer(b, 0, ofs), level
        else:
            break
    while buf.used() >= BLOB_MAX:
        # limit max blob size
        yield buf.get(BLOB_MAX), 0


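# As a rough illustration (hypothetical numbers): with the default fanout of
# 16, fanbits is 4, and if _helpers.blobbits() is 13 (matching the 8192-byte
# "typical" blob size above), a split point whose checksum matched 13-16 low
# bits yields level 0, 17-20 bits yields level 1, 21-24 bits level 2, etc.
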
def _hashsplit_iter(files, progress):
    """Yield (data, level) pairs for the concatenated content of files,
    splitting at content-defined boundaries via _splitbuf."""
    assert(BLOB_READ_SIZE > BLOB_MAX)
    basebits = _helpers.blobbits()
    fanbits = int(math.log(fanout or 128, 2))
    buf = Buf()
    for inblock in readfile_iter(files, progress):
        buf.put(inblock)
        for buf_and_level in _splitbuf(buf, basebits, fanbits):
            yield buf_and_level
    if buf.used():
        yield buf.get(buf.used()), 0


def _hashsplit_iter_keep_boundaries(files, progress):
    for real_filenum,f in enumerate(files):
        if progress:
            def prog(filenum, nbytes):
                # the inner _hashsplit_iter doesn't know the real file count,
                # so we'll replace it here.
                return progress(real_filenum, nbytes)
        else:
            prog = None
        for buf_and_level in _hashsplit_iter([f], progress=prog):
            yield buf_and_level


def hashsplit_iter(files, keep_boundaries, progress):
    """Return an iterator of (data, level) pairs for files.  If
    keep_boundaries is true, each file is split independently, so no
    chunk spans a file boundary."""
    if keep_boundaries:
        return _hashsplit_iter_keep_boundaries(files, progress)
    else:
        return _hashsplit_iter(files, progress)


total_split = 0
def split_to_blobs(makeblob, files, keep_boundaries, progress):
    """Hashsplit files, passing each chunk to makeblob() and yielding
    (sha, size, level) for each resulting blob."""
    global total_split
    for (blob, level) in hashsplit_iter(files, keep_boundaries, progress):
        sha = makeblob(blob)
        total_split += len(blob)
        if progress_callback:
            progress_callback(len(blob))
        yield (sha, len(blob), level)


def _make_shalist(l):
    """Return (shalist, total) for the (mode, sha, size) entries in l,
    where each shalist entry is (mode, name, sha) and the name is the
    entry's starting byte offset within the whole, as zero-padded hex."""
    ofs = 0
    l = list(l)
    total = sum(size for mode,sha,size in l)
    vlen = len(b'%x' % total)
    shalist = []
    for (mode, sha, size) in l:
        shalist.append((mode, b'%0*x' % (vlen,ofs), sha))
        ofs += size
    assert(ofs == total)
    return (shalist, total)


def _squish(maketree, stacks, n):
    """Collapse every stack below level n (and any stack that has grown
    to MAX_PER_TREE entries) into a single git tree entry appended to
    the next stack up; a stack containing only one entry is moved up
    as-is instead of being wrapped in a tree."""
    i = 0
    while i < n or len(stacks[i]) >= MAX_PER_TREE:
        while len(stacks) <= i+1:
            stacks.append([])
        if len(stacks[i]) == 1:
            stacks[i+1] += stacks[i]
        elif stacks[i]:
            (shalist, size) = _make_shalist(stacks[i])
            tree = maketree(shalist)
            stacks[i+1].append((GIT_MODE_TREE, tree, size))
        stacks[i] = []
        i += 1


def split_to_shalist(makeblob, maketree, files,
                     keep_boundaries, progress=None):
    sl = split_to_blobs(makeblob, files, keep_boundaries, progress)
    assert(fanout != 0)
    if not fanout:
        shal = []
        for (sha,size,level) in sl:
            shal.append((GIT_MODE_FILE, sha, size))
        return _make_shalist(shal)[0]
    else:
        stacks = [[]]
        for (sha,size,level) in sl:
            stacks[0].append((GIT_MODE_FILE, sha, size))
            _squish(maketree, stacks, level)
        #log('stacks: %r\n' % [len(i) for i in stacks])
        _squish(maketree, stacks, len(stacks)-1)
        #log('stacks: %r\n' % [len(i) for i in stacks])
        return _make_shalist(stacks[-1])[0]


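# The callbacks above are the only coupling to the object store: makeblob()
# takes a bytes-like chunk and must return its id, and maketree() takes a
# git-style shalist of (mode, name, sha) entries and must return the id of
# the resulting tree.  A minimal hypothetical sketch (not bup's real
# callbacks) that just hashes instead of storing anything:
#
#   import hashlib
#   def fake_blob(data):
#       return hashlib.sha1(bytes(data)).digest()
#   def fake_tree(shalist):
#       return hashlib.sha1(b''.join(sha for mode, name, sha in shalist)).digest()
#   shalist = split_to_shalist(fake_blob, fake_tree,
#                              [open_noatime(b'/some/path')],
#                              keep_boundaries=False)
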
def split_to_blob_or_tree(makeblob, maketree, files,
                          keep_boundaries, progress=None):
    shalist = list(split_to_shalist(makeblob, maketree,
                                    files, keep_boundaries, progress))
    if len(shalist) == 1:
        return (shalist[0][0], shalist[0][2])
    elif len(shalist) == 0:
        return (GIT_MODE_FILE, makeblob(b''))
    else:
        return (GIT_MODE_TREE, maketree(shalist))


def open_noatime(name):
    """Open name for reading without updating its access time (where the
    platform allows it) and return a buffered binary file object; if the
    wrapping fails, close the underlying descriptor before re-raising."""
    fd = _helpers.open_noatime(name)
    try:
        return os.fdopen(fd, 'rb', 1024*1024)
    except:
        try:
            os.close(fd)
        except:
            pass
        raise
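

# A hypothetical end-to-end sketch (not present in upstream bup): split the
# files named on the command line and print the top-level object each would
# produce, using plain SHA-1 hashing in place of bup's real repository
# callbacks.
if __name__ == '__main__':
    import binascii, hashlib, sys

    def _demo_blob(data):
        # data may be a buffer/memoryview; copy to bytes before hashing
        return hashlib.sha1(bytes(data)).digest()

    def _demo_tree(shalist):
        # hash the concatenated child ids as a stand-in for a real git tree
        return hashlib.sha1(b''.join(sha for mode, name, sha in shalist)).digest()

    for name in sys.argv[1:]:
        f = open_noatime(name if isinstance(name, bytes) else name.encode())
        try:
            mode, sha = split_to_blob_or_tree(_demo_blob, _demo_tree, [f],
                                              keep_boundaries=True)
            print('%06o %s %s' % (mode, binascii.hexlify(sha).decode(), name))
        finally:
            f.close()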