
from __future__ import absolute_import
import io, math, os

from bup import _helpers, helpers
from bup.helpers import sc_page_size

_fmincore = getattr(helpers, 'fmincore', None)

BLOB_MAX = 8192*4   # 8192 is the "typical" blob size for bupsplit
BLOB_READ_SIZE = 1024*1024
MAX_PER_TREE = 256
progress_callback = None
fanout = 16

GIT_MODE_FILE = 0o100644
GIT_MODE_TREE = 0o40000
GIT_MODE_SYMLINK = 0o120000

# The purpose of this type of buffer is to avoid copying on peek(), get(),
# and eat().  We do copy the buffer contents on put(), but that should
# be ok if we always only put() large amounts of data at a time.
class Buf:
    def __init__(self):
        self.data = ''
        self.start = 0

    def put(self, s):
        if s:
            self.data = buffer(self.data, self.start) + s
            self.start = 0

    def peek(self, count):
        return buffer(self.data, self.start, count)

    def eat(self, count):
        self.start += count

    def get(self, count):
        v = buffer(self.data, self.start, count)
        self.start += count
        return v

    def used(self):
        return len(self.data) - self.start

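# A minimal usage sketch of Buf (illustrative only, not part of bup): put()
# appends a large chunk (one copy), while peek()/get() hand back Python 2
# buffer() views and eat() merely advances self.start, so consuming data
# never copies it.  Here f is assumed to be an open binary file object:
#
#   buf = Buf()
#   buf.put(f.read(BLOB_READ_SIZE))   # one copy, appended to the backing data
#   view = buf.peek(buf.used())       # zero-copy view of the unread bytes
#   buf.eat(4096)                     # discard 4096 bytes without copying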

def _fadvise_pages_done(fd, first_page, count):
    assert(first_page >= 0)
    assert(count >= 0)
    if count > 0:
        _helpers.fadvise_done(fd,
                              first_page * sc_page_size,
                              count * sc_page_size)


def _nonresident_page_regions(status_bytes, incore_mask, max_region_len=None):
    """Return (start_page, count) pairs in ascending start_page order for
    each contiguous region of nonresident pages indicated by the
    mincore() status_bytes.  Limit the number of pages in each region
    to max_region_len."""
    assert(max_region_len is None or max_region_len > 0)
    start = None
    for i, x in enumerate(status_bytes):
        in_core = x & incore_mask
        if start is None:
            if not in_core:
                start = i
        else:
            count = i - start
            if in_core:
                yield (start, count)
                start = None
            elif max_region_len and count >= max_region_len:
                yield (start, count)
                start = i
    if start is not None:
        yield (start, len(status_bytes) - start)

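# Illustrative example (values assumed, not from bup): with incore_mask == 1
# and status_bytes == [1, 0, 0, 1, 0], pages 1-2 and page 4 are nonresident,
# so _nonresident_page_regions() yields (1, 2) and then (4, 1).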

def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
    """Uncache the pages of fd indicated by first_region and
    remaining_regions that are before offset, where each region is a
    (start_page, count) pair.  The final region must have a start_page
    of None."""
    rstart, rlen = first_region
    while rstart is not None and (rstart + rlen) * sc_page_size <= offset:
        _fadvise_pages_done(fd, rstart, rlen)
        rstart, rlen = next(remaining_regions, (None, None))
    return (rstart, rlen)


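# Yield the contents of each file in BLOB_READ_SIZE chunks.  When fmincore is
# available and the file has a usable descriptor, pages that were not already
# resident before the read are dropped from the OS cache (via fadvise) once
# the read offset has moved past them, so bulk reads don't evict other data.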
def readfile_iter(files, progress=None):
    for filenum,f in enumerate(files):
        ofs = 0
        b = ''
        fd = rpr = rstart = rlen = None
        if _fmincore and hasattr(f, 'fileno'):
            try:
                fd = f.fileno()
            except io.UnsupportedOperation:
                pass
            if fd is not None:  # fd may legitimately be 0 (e.g. stdin)
                mcore = _fmincore(fd)
                if mcore:
                    max_chunk = max(1, (8 * 1024 * 1024) // sc_page_size)
                    rpr = _nonresident_page_regions(mcore, helpers.MINCORE_INCORE,
                                                    max_chunk)
                    rstart, rlen = next(rpr, (None, None))
        while 1:
            if progress:
                progress(filenum, len(b))
            b = f.read(BLOB_READ_SIZE)
            ofs += len(b)
            if rpr:
                rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
            if not b:
                break
            yield b
        if rpr:
            rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)


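# Yield (buffer, level) pairs split out of buf at rolling-checksum boundaries.
# _helpers.splitbuf() returns the offset of the next split point and how many
# checksum bits matched there; level is the number of "extra" bits beyond the
# base split size, in units of fanbits.  Chunks longer than BLOB_MAX are
# truncated to BLOB_MAX at level 0, and when no split point is found, any
# remaining data beyond BLOB_MAX is emitted in BLOB_MAX-sized pieces at level 0.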
def _splitbuf(buf, basebits, fanbits):
    while 1:
        b = buf.peek(buf.used())
        (ofs, bits) = _helpers.splitbuf(b)
        if ofs:
            if ofs > BLOB_MAX:
                ofs = BLOB_MAX
                level = 0
            else:
                level = (bits-basebits)//fanbits  # integer division
            buf.eat(ofs)
            yield buffer(b, 0, ofs), level
        else:
            break
    while buf.used() >= BLOB_MAX:
        # limit max blob size
        yield buf.get(BLOB_MAX), 0


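# With the default fanout of 16, fanbits is int(log2(16)) == 4, so a chunk
# must match 4 extra checksum bits beyond the base split size to climb one
# tree level.  (The "or 128" fallback only matters if fanout is set to 0.)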
def _hashsplit_iter(files, progress):
    assert(BLOB_READ_SIZE > BLOB_MAX)
    basebits = _helpers.blobbits()
    fanbits = int(math.log(fanout or 128, 2))
    buf = Buf()
    for inblock in readfile_iter(files, progress):
        buf.put(inblock)
        for buf_and_level in _splitbuf(buf, basebits, fanbits):
            yield buf_and_level
    if buf.used():
        yield buf.get(buf.used()), 0


def _hashsplit_iter_keep_boundaries(files, progress):
    for real_filenum,f in enumerate(files):
        if progress:
            def prog(filenum, nbytes):
                # the inner _hashsplit_iter doesn't know the real file count,
                # so we'll replace it here.
                return progress(real_filenum, nbytes)
        else:
            prog = None
        for buf_and_level in _hashsplit_iter([f], progress=prog):
            yield buf_and_level


def hashsplit_iter(files, keep_boundaries, progress):
    if keep_boundaries:
        return _hashsplit_iter_keep_boundaries(files, progress)
    else:
        return _hashsplit_iter(files, progress)


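# total_split tracks the cumulative number of bytes split so far.
# split_to_blobs() stores each chunk as a blob via makeblob() and yields
# (sha, size, level), updating total_split and calling progress_callback
# (if set) with each blob's size.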
total_split = 0
def split_to_blobs(makeblob, files, keep_boundaries, progress):
    global total_split
    for (blob, level) in hashsplit_iter(files, keep_boundaries, progress):
        sha = makeblob(blob)
        total_split += len(blob)
        if progress_callback:
            progress_callback(len(blob))
        yield (sha, len(blob), level)


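# Build a git shalist naming each chunk by its starting byte offset in hex,
# zero-padded to the width of the total size.  Illustrative example (values
# assumed): chunks of sizes 0x1000, 0x2000, and 0x3000 total 0x6000 bytes,
# so vlen is 4 and the entries are named '0000', '1000', and '3000'.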
def _make_shalist(l):
    ofs = 0
    l = list(l)
    total = sum(size for mode,sha,size in l)
    vlen = len('%x' % total)
    shalist = []
    for (mode, sha, size) in l:
        shalist.append((mode, '%0*x' % (vlen,ofs), sha))
        ofs += size
    assert(ofs == total)
    return (shalist, total)


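# Fold the per-level stacks into git trees: every level below n (and any
# level holding MAX_PER_TREE or more entries) is turned into a tree via
# maketree() and pushed onto the next level up; a single leftover entry is
# just promoted without wrapping it in a tree of its own.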
def _squish(maketree, stacks, n):
    i = 0
    while i < n or len(stacks[i]) >= MAX_PER_TREE:
        while len(stacks) <= i+1:
            stacks.append([])
        if len(stacks[i]) == 1:
            stacks[i+1] += stacks[i]
        elif stacks[i]:
            (shalist, size) = _make_shalist(stacks[i])
            tree = maketree(shalist)
            stacks[i+1].append((GIT_MODE_TREE, tree, size))
        stacks[i] = []
        i += 1


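# Split the input into blobs and assemble them into a tree-structured shalist:
# each (sha, size, level) from split_to_blobs() is pushed onto level 0 and
# _squish() folds the stacks up to that chunk's level; a final _squish() over
# all levels leaves the top-level shalist, which is returned.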
def split_to_shalist(makeblob, maketree, files,
                     keep_boundaries, progress=None):
    sl = split_to_blobs(makeblob, files, keep_boundaries, progress)
    assert(fanout != 0)
    if not fanout:
        shal = []
        for (sha,size,level) in sl:
            shal.append((GIT_MODE_FILE, sha, size))
        return _make_shalist(shal)[0]
    else:
        stacks = [[]]
        for (sha,size,level) in sl:
            stacks[0].append((GIT_MODE_FILE, sha, size))
            _squish(maketree, stacks, level)
        #log('stacks: %r\n' % [len(i) for i in stacks])
        _squish(maketree, stacks, len(stacks)-1)
        #log('stacks: %r\n' % [len(i) for i in stacks])
        return _make_shalist(stacks[-1])[0]


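# Convenience wrapper: split the input and return a single (mode, sha) pair;
# a blob if everything fit in one chunk, an empty blob for empty input, or a
# tree covering the whole shalist otherwise.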
def split_to_blob_or_tree(makeblob, maketree, files,
                          keep_boundaries, progress=None):
    shalist = list(split_to_shalist(makeblob, maketree,
                                    files, keep_boundaries, progress))
    if len(shalist) == 1:
        return (shalist[0][0], shalist[0][2])
    elif len(shalist) == 0:
        return (GIT_MODE_FILE, makeblob(''))
    else:
        return (GIT_MODE_TREE, maketree(shalist))


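# Open name for reading via _helpers.open_noatime(), making sure the raw
# descriptor is closed again if wrapping it in a buffered file object fails.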
def open_noatime(name):
    fd = _helpers.open_noatime(name)
    try:
        return os.fdopen(fd, 'rb', 1024*1024)
    except:
        try:
            os.close(fd)
        except:
            pass
        raise