]> arthur.barton.de Git - bup.git/blob - lib/bup/hashsplit.py
Move _page_size to helpers.sc_page_size
[bup.git] / lib / bup / hashsplit.py
1 import math, os
2
3 from bup import _helpers
4 from bup.helpers import sc_page_size
5
6 try:
7     _fmincore = _helpers.fmincore
8 except AttributeError, e:
9     _fmincore = None
10
11 BLOB_MAX = 8192*4   # 8192 is the "typical" blob size for bupsplit
12 BLOB_READ_SIZE = 1024*1024
13 MAX_PER_TREE = 256
14 progress_callback = None
15 fanout = 16
16
17 GIT_MODE_FILE = 0100644
18 GIT_MODE_TREE = 040000
19 GIT_MODE_SYMLINK = 0120000
20 assert(GIT_MODE_TREE != 40000)  # 0xxx should be treated as octal
21
22 # The purpose of this type of buffer is to avoid copying on peek(), get(),
23 # and eat().  We do copy the buffer contents on put(), but that should
24 # be ok if we always only put() large amounts of data at a time.
25 class Buf:
26     def __init__(self):
27         self.data = ''
28         self.start = 0
29
30     def put(self, s):
31         if s:
32             self.data = buffer(self.data, self.start) + s
33             self.start = 0
34             
35     def peek(self, count):
36         return buffer(self.data, self.start, count)
37     
38     def eat(self, count):
39         self.start += count
40
41     def get(self, count):
42         v = buffer(self.data, self.start, count)
43         self.start += count
44         return v
45
46     def used(self):
47         return len(self.data) - self.start
48
49
50 def _fadvise_pages_done(fd, first_page, count):
51     assert(first_page >= 0)
52     assert(count >= 0)
53     if count > 0:
54         _helpers.fadvise_done(fd,
55                               first_page * sc_page_size,
56                               count * sc_page_size)
57
58
59 def _nonresident_page_regions(status_bytes, max_region_len=None):
60     """Return (start_page, count) pairs in ascending start_page order for
61     each contiguous region of nonresident pages indicated by the
62     mincore() status_bytes.  Limit the number of pages in each region
63     to max_region_len."""
64     assert(max_region_len is None or max_region_len > 0)
65     start = None
66     for i, x in enumerate(status_bytes):
67         in_core = ord(x) & 1
68         if start is None:
69             if not in_core:
70                 start = i
71         else:
72             count = i - start
73             if in_core:
74                 yield (start, count)
75                 start = None
76             elif max_region_len and count >= max_region_len:
77                 yield (start, count)
78                 start = i
79     if start is not None:
80         yield (start, len(status_bytes) - start)
81
82
83 def _uncache_ours_upto(fd, offset, first_region, remaining_regions):
84     """Uncache the pages of fd indicated by first_region and
85     remaining_regions that are before offset, where each region is a
86     (start_page, count) pair.  The final region must have a start_page
87     of None."""
88     rstart, rlen = first_region
89     while rstart is not None and (rstart + rlen) * sc_page_size <= offset:
90         _fadvise_pages_done(fd, rstart, rlen)
91         rstart, rlen = next(remaining_regions, (None, None))
92     return (rstart, rlen)
93
94
95 def readfile_iter(files, progress=None):
96     for filenum,f in enumerate(files):
97         ofs = 0
98         b = ''
99         fd = rpr = rstart = rlen = None
100         if _fmincore and hasattr(f, 'fileno'):
101             fd = f.fileno()
102             max_chunk = max(1, (8 * 1024 * 1024) / sc_page_size)
103             rpr = _nonresident_page_regions(_helpers.fmincore(fd), max_chunk)
104             rstart, rlen = next(rpr, (None, None))
105         while 1:
106             if progress:
107                 progress(filenum, len(b))
108             b = f.read(BLOB_READ_SIZE)
109             ofs += len(b)
110             if rpr:
111                 rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
112             if not b:
113                 break
114             yield b
115         if rpr:
116             rstart, rlen = _uncache_ours_upto(fd, ofs, (rstart, rlen), rpr)
117
118
119 def _splitbuf(buf, basebits, fanbits):
120     while 1:
121         b = buf.peek(buf.used())
122         (ofs, bits) = _helpers.splitbuf(b)
123         if ofs:
124             if ofs > BLOB_MAX:
125                 ofs = BLOB_MAX
126                 level = 0
127             else:
128                 level = (bits-basebits)//fanbits  # integer division
129             buf.eat(ofs)
130             yield buffer(b, 0, ofs), level
131         else:
132             break
133     while buf.used() >= BLOB_MAX:
134         # limit max blob size
135         yield buf.get(BLOB_MAX), 0
136
137
138 def _hashsplit_iter(files, progress):
139     assert(BLOB_READ_SIZE > BLOB_MAX)
140     basebits = _helpers.blobbits()
141     fanbits = int(math.log(fanout or 128, 2))
142     buf = Buf()
143     for inblock in readfile_iter(files, progress):
144         buf.put(inblock)
145         for buf_and_level in _splitbuf(buf, basebits, fanbits):
146             yield buf_and_level
147     if buf.used():
148         yield buf.get(buf.used()), 0
149
150
151 def _hashsplit_iter_keep_boundaries(files, progress):
152     for real_filenum,f in enumerate(files):
153         if progress:
154             def prog(filenum, nbytes):
155                 # the inner _hashsplit_iter doesn't know the real file count,
156                 # so we'll replace it here.
157                 return progress(real_filenum, nbytes)
158         else:
159             prog = None
160         for buf_and_level in _hashsplit_iter([f], progress=prog):
161             yield buf_and_level
162
163
164 def hashsplit_iter(files, keep_boundaries, progress):
165     if keep_boundaries:
166         return _hashsplit_iter_keep_boundaries(files, progress)
167     else:
168         return _hashsplit_iter(files, progress)
169
170
171 total_split = 0
172 def split_to_blobs(makeblob, files, keep_boundaries, progress):
173     global total_split
174     for (blob, level) in hashsplit_iter(files, keep_boundaries, progress):
175         sha = makeblob(blob)
176         total_split += len(blob)
177         if progress_callback:
178             progress_callback(len(blob))
179         yield (sha, len(blob), level)
180
181
182 def _make_shalist(l):
183     ofs = 0
184     l = list(l)
185     total = sum(size for mode,sha,size, in l)
186     vlen = len('%x' % total)
187     shalist = []
188     for (mode, sha, size) in l:
189         shalist.append((mode, '%0*x' % (vlen,ofs), sha))
190         ofs += size
191     assert(ofs == total)
192     return (shalist, total)
193
194
195 def _squish(maketree, stacks, n):
196     i = 0
197     while i < n or len(stacks[i]) >= MAX_PER_TREE:
198         while len(stacks) <= i+1:
199             stacks.append([])
200         if len(stacks[i]) == 1:
201             stacks[i+1] += stacks[i]
202         elif stacks[i]:
203             (shalist, size) = _make_shalist(stacks[i])
204             tree = maketree(shalist)
205             stacks[i+1].append((GIT_MODE_TREE, tree, size))
206         stacks[i] = []
207         i += 1
208
209
210 def split_to_shalist(makeblob, maketree, files,
211                      keep_boundaries, progress=None):
212     sl = split_to_blobs(makeblob, files, keep_boundaries, progress)
213     assert(fanout != 0)
214     if not fanout:
215         shal = []
216         for (sha,size,level) in sl:
217             shal.append((GIT_MODE_FILE, sha, size))
218         return _make_shalist(shal)[0]
219     else:
220         stacks = [[]]
221         for (sha,size,level) in sl:
222             stacks[0].append((GIT_MODE_FILE, sha, size))
223             _squish(maketree, stacks, level)
224         #log('stacks: %r\n' % [len(i) for i in stacks])
225         _squish(maketree, stacks, len(stacks)-1)
226         #log('stacks: %r\n' % [len(i) for i in stacks])
227         return _make_shalist(stacks[-1])[0]
228
229
230 def split_to_blob_or_tree(makeblob, maketree, files,
231                           keep_boundaries, progress=None):
232     shalist = list(split_to_shalist(makeblob, maketree,
233                                     files, keep_boundaries, progress))
234     if len(shalist) == 1:
235         return (shalist[0][0], shalist[0][2])
236     elif len(shalist) == 0:
237         return (GIT_MODE_FILE, makeblob(''))
238     else:
239         return (GIT_MODE_TREE, maketree(shalist))
240
241
242 def open_noatime(name):
243     fd = _helpers.open_noatime(name)
244     try:
245         return os.fdopen(fd, 'rb', 1024*1024)
246     except:
247         try:
248             os.close(fd)
249         except:
250             pass
251         raise