]> arthur.barton.de Git - bup.git/blob - lib/bup/hashsplit.py
hashsplit: don't import * from bup.helpers
[bup.git] / lib / bup / hashsplit.py
1 import math, os
2
3 from bup import _helpers
4
5 try:
6     _fmincore = _helpers.fmincore
7 except AttributeError, e:
8     _fmincore = None
9
10 _page_size = os.sysconf("SC_PAGE_SIZE")
11
12 BLOB_MAX = 8192*4   # 8192 is the "typical" blob size for bupsplit
13 BLOB_READ_SIZE = 1024*1024
14 MAX_PER_TREE = 256
15 progress_callback = None
16 fanout = 16
17
18 GIT_MODE_FILE = 0100644
19 GIT_MODE_TREE = 040000
20 GIT_MODE_SYMLINK = 0120000
21 assert(GIT_MODE_TREE != 40000)  # 0xxx should be treated as octal
22
23 # The purpose of this type of buffer is to avoid copying on peek(), get(),
24 # and eat().  We do copy the buffer contents on put(), but that should
25 # be ok if we always only put() large amounts of data at a time.
26 class Buf:
27     def __init__(self):
28         self.data = ''
29         self.start = 0
30
31     def put(self, s):
32         if s:
33             self.data = buffer(self.data, self.start) + s
34             self.start = 0
35             
36     def peek(self, count):
37         return buffer(self.data, self.start, count)
38     
39     def eat(self, count):
40         self.start += count
41
42     def get(self, count):
43         v = buffer(self.data, self.start, count)
44         self.start += count
45         return v
46
47     def used(self):
48         return len(self.data) - self.start
49
50
51 def _fadvise_pages_done(fd, first_page, count):
52     assert(first_page >= 0)
53     assert(count >= 0)
54     if count > 0:
55         _helpers.fadvise_done(fd, first_page * _page_size, count * _page_size)
56
57
58 def _nonresident_page_regions(status_bytes, max_region_len=None):
59     """Return (start_page, count) pairs in ascending start_page order for
60     each contiguous region of nonresident pages indicated by the
61     mincore() status_bytes.  Limit the number of pages in each region
62     to max_region_len."""
63     assert(max_region_len is None or max_region_len > 0)
64     start = None
65     for i, x in enumerate(status_bytes):
66         in_core = ord(x) & 1
67         if start is None:
68             if not in_core:
69                 start = i
70         else:
71             count = i - start
72             if in_core:
73                 yield (start, count)
74                 start = None
75             elif max_region_len and count >= max_region_len:
76                 yield (start, count)
77                 start = i
78     if start is not None:
79         yield (start, len(status_bytes) - start)
80
81
82 def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
83     """Uncache the pages of fd indicated by first_region and
84     remaining_regions that are before offset, where each region is a
85     (start_page, count) pair.  The final region must have a start_page
86     of None."""
87     rstart, rlen = first_region
88     while rstart is not None and (rstart + rlen) * _page_size <= offset:
89         _fadvise_pages_done(fd, rstart, rlen)
90         rstart, rlen = next(remaining_regions, (None, None))
91     return (rstart, rlen)
92
93
94 def readfile_iter(files, progress=None):
95     for filenum,f in enumerate(files):
96         ofs = 0
97         b = ''
98         fd = rpr = rstart = rlen = None
99         if _fmincore and hasattr(f, 'fileno'):
100             fd = f.fileno()
101             max_chunk = max(1, (8 * 1024 * 1024) / _page_size)
102             rpr = _nonresident_page_regions(_helpers.fmincore(fd), max_chunk)
103             rstart, rlen = next(rpr, (None, None))
104         while 1:
105             if progress:
106                 progress(filenum, len(b))
107             b = f.read(BLOB_READ_SIZE)
108             ofs += len(b)
109             if rpr:
110                 rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
111             if not b:
112                 break
113             yield b
114         if rpr:
115             rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
116
117
118 def _splitbuf(buf, basebits, fanbits):
119     while 1:
120         b = buf.peek(buf.used())
121         (ofs, bits) = _helpers.splitbuf(b)
122         if ofs:
123             if ofs > BLOB_MAX:
124                 ofs = BLOB_MAX
125                 level = 0
126             else:
127                 level = (bits-basebits)//fanbits  # integer division
128             buf.eat(ofs)
129             yield buffer(b, 0, ofs), level
130         else:
131             break
132     while buf.used() >= BLOB_MAX:
133         # limit max blob size
134         yield buf.get(BLOB_MAX), 0
135
136
137 def _hashsplit_iter(files, progress):
138     assert(BLOB_READ_SIZE > BLOB_MAX)
139     basebits = _helpers.blobbits()
140     fanbits = int(math.log(fanout or 128, 2))
141     buf = Buf()
142     for inblock in readfile_iter(files, progress):
143         buf.put(inblock)
144         for buf_and_level in _splitbuf(buf, basebits, fanbits):
145             yield buf_and_level
146     if buf.used():
147         yield buf.get(buf.used()), 0
148
149
150 def _hashsplit_iter_keep_boundaries(files, progress):
151     for real_filenum,f in enumerate(files):
152         if progress:
153             def prog(filenum, nbytes):
154                 # the inner _hashsplit_iter doesn't know the real file count,
155                 # so we'll replace it here.
156                 return progress(real_filenum, nbytes)
157         else:
158             prog = None
159         for buf_and_level in _hashsplit_iter([f], progress=prog):
160             yield buf_and_level
161
162
163 def hashsplit_iter(files, keep_boundaries, progress):
164     if keep_boundaries:
165         return _hashsplit_iter_keep_boundaries(files, progress)
166     else:
167         return _hashsplit_iter(files, progress)
168
169
170 total_split = 0
171 def split_to_blobs(makeblob, files, keep_boundaries, progress):
172     global total_split
173     for (blob, level) in hashsplit_iter(files, keep_boundaries, progress):
174         sha = makeblob(blob)
175         total_split += len(blob)
176         if progress_callback:
177             progress_callback(len(blob))
178         yield (sha, len(blob), level)
179
180
181 def _make_shalist(l):
182     ofs = 0
183     l = list(l)
184     total = sum(size for mode,sha,size, in l)
185     vlen = len('%x' % total)
186     shalist = []
187     for (mode, sha, size) in l:
188         shalist.append((mode, '%0*x' % (vlen,ofs), sha))
189         ofs += size
190     assert(ofs == total)
191     return (shalist, total)
192
193
194 def _squish(maketree, stacks, n):
195     i = 0
196     while i < n or len(stacks[i]) >= MAX_PER_TREE:
197         while len(stacks) <= i+1:
198             stacks.append([])
199         if len(stacks[i]) == 1:
200             stacks[i+1] += stacks[i]
201         elif stacks[i]:
202             (shalist, size) = _make_shalist(stacks[i])
203             tree = maketree(shalist)
204             stacks[i+1].append((GIT_MODE_TREE, tree, size))
205         stacks[i] = []
206         i += 1
207
208
209 def split_to_shalist(makeblob, maketree, files,
210                      keep_boundaries, progress=None):
211     sl = split_to_blobs(makeblob, files, keep_boundaries, progress)
212     assert(fanout != 0)
213     if not fanout:
214         shal = []
215         for (sha,size,level) in sl:
216             shal.append((GIT_MODE_FILE, sha, size))
217         return _make_shalist(shal)[0]
218     else:
219         stacks = [[]]
220         for (sha,size,level) in sl:
221             stacks[0].append((GIT_MODE_FILE, sha, size))
222             _squish(maketree, stacks, level)
223         #log('stacks: %r\n' % [len(i) for i in stacks])
224         _squish(maketree, stacks, len(stacks)-1)
225         #log('stacks: %r\n' % [len(i) for i in stacks])
226         return _make_shalist(stacks[-1])[0]
227
228
229 def split_to_blob_or_tree(makeblob, maketree, files,
230                           keep_boundaries, progress=None):
231     shalist = list(split_to_shalist(makeblob, maketree,
232                                     files, keep_boundaries, progress))
233     if len(shalist) == 1:
234         return (shalist[0][0], shalist[0][2])
235     elif len(shalist) == 0:
236         return (GIT_MODE_FILE, makeblob(''))
237     else:
238         return (GIT_MODE_TREE, maketree(shalist))
239
240
241 def open_noatime(name):
242     fd = _helpers.open_noatime(name)
243     try:
244         return os.fdopen(fd, 'rb', 1024*1024)
245     except:
246         try:
247             os.close(fd)
248         except:
249             pass
250         raise