# lib/bup/hashsplit.py
import io, math, os

from bup import _helpers, helpers
from bup.helpers import sc_page_size

_fmincore = getattr(helpers, 'fmincore', None)

BLOB_MAX = 8192*4   # 8192 is the "typical" blob size for bupsplit
BLOB_READ_SIZE = 1024*1024
MAX_PER_TREE = 256
progress_callback = None
fanout = 16

GIT_MODE_FILE = 0o100644
GIT_MODE_TREE = 0o40000
GIT_MODE_SYMLINK = 0o120000

# The purpose of this type of buffer is to avoid copying on peek(), get(),
# and eat().  We do copy the buffer contents on put(), but that should
# be ok as long as we only put() large amounts of data at a time.
class Buf:
    def __init__(self):
        self.data = ''
        self.start = 0

    def put(self, s):
        if s:
            self.data = buffer(self.data, self.start) + s
            self.start = 0

    def peek(self, count):
        return buffer(self.data, self.start, count)

    def eat(self, count):
        self.start += count

    def get(self, count):
        v = buffer(self.data, self.start, count)
        self.start += count
        return v

    def used(self):
        return len(self.data) - self.start


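# Illustrative sketch (not part of the original module): one way a caller
# might drive Buf.  Assumes Python 2, where the stored data is a str and
# peek()/get() hand back zero-copy buffer() views.
def _example_buf_usage():
    b = Buf()
    b.put('a' * 1000)          # put() copies, so callers batch large writes
    head = str(b.peek(10))     # zero-copy view of the first 10 bytes
    b.eat(10)                  # discard them without another copy
    rest = b.get(b.used())     # consume whatever is left
    return head, len(rest)

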
def _fadvise_pages_done(fd, first_page, count):
    assert(first_page >= 0)
    assert(count >= 0)
    if count > 0:
        _helpers.fadvise_done(fd,
                              first_page * sc_page_size,
                              count * sc_page_size)


def _nonresident_page_regions(status_bytes, incore_mask, max_region_len=None):
    """Return (start_page, count) pairs in ascending start_page order for
    each contiguous region of nonresident pages indicated by the
    mincore() status_bytes.  Limit the number of pages in each region
    to max_region_len."""
    assert(max_region_len is None or max_region_len > 0)
    start = None
    for i, x in enumerate(status_bytes):
        in_core = x & incore_mask
        if start is None:
            if not in_core:
                start = i
        else:
            count = i - start
            if in_core:
                yield (start, count)
                start = None
            elif max_region_len and count >= max_region_len:
                yield (start, count)
                start = i
    if start is not None:
        yield (start, len(status_bytes) - start)


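# Illustrative sketch (not part of the original module): the status bytes come
# from mincore(); here a made-up vector shows how nonresident pages collapse
# into (start_page, count) runs.  The mask 1 stands in for helpers.MINCORE_INCORE.
def _example_nonresident_regions():
    status = [1, 1, 0, 0, 0, 1, 0]   # pages 0-1 and 5 resident, the rest not
    regions = list(_nonresident_page_regions(status, 1, max_region_len=2))
    assert regions == [(2, 2), (4, 1), (6, 1)]
    return regions

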
def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
    """Uncache the pages of fd indicated by first_region and
    remaining_regions that are before offset, where each region is a
    (start_page, count) pair.  The final region must have a start_page
    of None."""
    rstart, rlen = first_region
    while rstart is not None and (rstart + rlen) * sc_page_size <= offset:
        _fadvise_pages_done(fd, rstart, rlen)
        rstart, rlen = next(remaining_regions, (None, None))
    return (rstart, rlen)


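# Illustrative sketch (not part of the original module): readfile_iter (below)
# advances through the nonresident regions like this as its read offset grows,
# so only pages this process faulted in get dropped.  Note that this really
# issues fadvise on fd, so only pass a descriptor you own.
def _example_uncache_walk(fd):
    regions = iter([(0, 4), (10, 2)])      # made-up (start_page, count) pairs
    rstart, rlen = next(regions, (None, None))
    offset = 64 * 1024                     # pretend we've read this far
    return _uncache_ours_upto(fd, offset, (rstart, rlen), regions)

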
def readfile_iter(files, progress=None):
    for filenum,f in enumerate(files):
        ofs = 0
        b = ''
        fd = rpr = rstart = rlen = None
        if _fmincore and hasattr(f, 'fileno'):
            try:
                fd = f.fileno()
            except io.UnsupportedOperation:
                pass
            if fd:
                mcore = _fmincore(fd)
                if mcore:
                    max_chunk = max(1, (8 * 1024 * 1024) / sc_page_size)
                    rpr = _nonresident_page_regions(mcore, helpers.MINCORE_INCORE,
                                                    max_chunk)
                    rstart, rlen = next(rpr, (None, None))
        while 1:
            if progress:
                progress(filenum, len(b))
            b = f.read(BLOB_READ_SIZE)
            ofs += len(b)
            if rpr:
                rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
            if not b:
                break
            yield b
        if rpr:
            rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)


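# Illustrative sketch (not part of the original module): readfile_iter yields
# successive BLOB_READ_SIZE chunks from each file in turn, dropping pages it
# has already consumed from the OS cache when fmincore is available.
def _example_readfile_total(paths):
    files = [open_noatime(p) for p in paths]
    return sum(len(block) for block in readfile_iter(files))

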
def _splitbuf(buf, basebits, fanbits):
    while 1:
        b = buf.peek(buf.used())
        (ofs, bits) = _helpers.splitbuf(b)
        if ofs:
            if ofs > BLOB_MAX:
                ofs = BLOB_MAX
                level = 0
            else:
                level = (bits-basebits)//fanbits  # integer division
            buf.eat(ofs)
            yield buffer(b, 0, ofs), level
        else:
            break
    while buf.used() >= BLOB_MAX:
        # limit max blob size
        yield buf.get(BLOB_MAX), 0


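# Illustrative sketch (not part of the original module): _splitbuf drains a Buf
# into (chunk, level) pairs, where level counts how many groups of fanbits the
# rolling checksum matched beyond the base split bits; with the default fanout
# of 16, fanbits is log2(16) == 4.  Whatever is left below BLOB_MAX stays in
# the Buf for the next round.
def _example_splitbuf_sizes(data):
    buf = Buf()
    buf.put(data)
    basebits = _helpers.blobbits()
    fanbits = int(math.log(fanout or 128, 2))
    sizes = [(len(chunk), level) for chunk, level in
             _splitbuf(buf, basebits, fanbits)]
    return sizes, buf.used()

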
def _hashsplit_iter(files, progress):
    assert(BLOB_READ_SIZE > BLOB_MAX)
    basebits = _helpers.blobbits()
    fanbits = int(math.log(fanout or 128, 2))
    buf = Buf()
    for inblock in readfile_iter(files, progress):
        buf.put(inblock)
        for buf_and_level in _splitbuf(buf, basebits, fanbits):
            yield buf_and_level
    if buf.used():
        yield buf.get(buf.used()), 0


def _hashsplit_iter_keep_boundaries(files, progress):
    for real_filenum,f in enumerate(files):
        if progress:
            def prog(filenum, nbytes):
                # the inner _hashsplit_iter doesn't know the real file number,
                # so we'll replace it here.
                return progress(real_filenum, nbytes)
        else:
            prog = None
        for buf_and_level in _hashsplit_iter([f], progress=prog):
            yield buf_and_level


def hashsplit_iter(files, keep_boundaries, progress):
    if keep_boundaries:
        return _hashsplit_iter_keep_boundaries(files, progress)
    else:
        return _hashsplit_iter(files, progress)


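# Illustrative sketch (not part of the original module): with
# keep_boundaries=True each file is split independently, so a given file always
# produces the same chunk sequence no matter what precedes it in the list.
def _example_count_chunks(paths, keep_boundaries=True):
    files = [open_noatime(p) for p in paths]
    sizes = [len(blob) for blob, level in
             hashsplit_iter(files, keep_boundaries, progress=None)]
    return len(sizes), sum(sizes)

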
total_split = 0
def split_to_blobs(makeblob, files, keep_boundaries, progress):
    global total_split
    for (blob, level) in hashsplit_iter(files, keep_boundaries, progress):
        sha = makeblob(blob)
        total_split += len(blob)
        if progress_callback:
            progress_callback(len(blob))
        yield (sha, len(blob), level)


def _make_shalist(l):
    ofs = 0
    l = list(l)
    total = sum(size for mode,sha,size, in l)
    vlen = len('%x' % total)
    shalist = []
    for (mode, sha, size) in l:
        shalist.append((mode, '%0*x' % (vlen,ofs), sha))
        ofs += size
    assert(ofs == total)
    return (shalist, total)


def _squish(maketree, stacks, n):
    i = 0
    while i < n or len(stacks[i]) >= MAX_PER_TREE:
        while len(stacks) <= i+1:
            stacks.append([])
        if len(stacks[i]) == 1:
            stacks[i+1] += stacks[i]
        elif stacks[i]:
            (shalist, size) = _make_shalist(stacks[i])
            tree = maketree(shalist)
            stacks[i+1].append((GIT_MODE_TREE, tree, size))
        stacks[i] = []
        i += 1


def split_to_shalist(makeblob, maketree, files,
                     keep_boundaries, progress=None):
    sl = split_to_blobs(makeblob, files, keep_boundaries, progress)
    assert(fanout != 0)
    if not fanout:
        shal = []
        for (sha,size,level) in sl:
            shal.append((GIT_MODE_FILE, sha, size))
        return _make_shalist(shal)[0]
    else:
        stacks = [[]]
        for (sha,size,level) in sl:
            stacks[0].append((GIT_MODE_FILE, sha, size))
            _squish(maketree, stacks, level)
        #log('stacks: %r\n' % [len(i) for i in stacks])
        _squish(maketree, stacks, len(stacks)-1)
        #log('stacks: %r\n' % [len(i) for i in stacks])
        return _make_shalist(stacks[-1])[0]


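# Illustrative sketch (not part of the original module): how a caller might
# plug in the blob/tree writers.  The callbacks below just hash in memory; in
# bup itself they would normally be a pack writer's new_blob/new_tree methods.
def _example_split_to_shalist(paths):
    import hashlib
    def makeblob(blob):
        return hashlib.sha1(str(blob)).hexdigest()   # blob is a buffer in py2
    def maketree(shalist):
        return hashlib.sha1(repr(shalist)).hexdigest()
    files = [open_noatime(p) for p in paths]
    return split_to_shalist(makeblob, maketree, files, keep_boundaries=False)

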
def split_to_blob_or_tree(makeblob, maketree, files,
                          keep_boundaries, progress=None):
    shalist = list(split_to_shalist(makeblob, maketree,
                                    files, keep_boundaries, progress))
    if len(shalist) == 1:
        return (shalist[0][0], shalist[0][2])
    elif len(shalist) == 0:
        return (GIT_MODE_FILE, makeblob(''))
    else:
        return (GIT_MODE_TREE, maketree(shalist))


def open_noatime(name):
    fd = _helpers.open_noatime(name)
    try:
        return os.fdopen(fd, 'rb', 1024*1024)
    except:
        try:
            os.close(fd)
        except:
            pass
        raise