arthur.barton.de Git - bup.git/blob - lib/bup/t/tgit.py
git: accommodate python 3 and enable tests
[bup.git] / lib / bup / t / tgit.py
1
2 from __future__ import absolute_import, print_function
3 from binascii import hexlify, unhexlify
4 from subprocess import check_call
5 import struct, os, time
6
7 from wvtest import *
8
9 from bup import git, path
10 from bup.compat import bytes_from_byte, environ, range
11 from bup.helpers import localtime, log, mkdirp, readpipe
12 from buptest import no_lingering_errors, test_tempdir
13
14
15 bup_exe = path.exe()
16
17
def exc(*cmd):
    """Echo *cmd* to stderr, then run it and insist on a zero exit status.

    The positional arguments are passed to subprocess.check_call() as one
    argv sequence, so a non-zero exit raises CalledProcessError.
    """
    # Imported here because this module has no top-level "import sys"; the
    # name would otherwise only resolve if "from wvtest import *" happened
    # to re-export it.
    import sys
    print(repr(cmd), file=sys.stderr)
    check_call(cmd)
21
22
def exo(*cmd):
    """Echo *cmd* to stderr, then return the result of readpipe(cmd).

    The positional arguments are passed to readpipe() as one argv sequence.
    """
    # Imported here because this module has no top-level "import sys"; the
    # name would otherwise only resolve if "from wvtest import *" happened
    # to re-export it.
    import sys
    print(repr(cmd), file=sys.stderr)
    return readpipe(cmd)
26
27
@wvtest
def testmangle():
    """Exercise git.mangle_name() and git.demangle_name() on sample names."""
    # Mode fixtures; each *_alt value differs from its sibling only in the
    # permission bits.
    file_mode = 0o100644
    file_mode_alt = 0o100770
    link_mode = 0o120000
    dir_mode = 0o040000
    dir_mode_alt = 0o040777
    mangle = git.mangle_name
    demangle = git.demangle_name
    with no_lingering_errors():
        WVPASSEQ(mangle(b'a', dir_mode_alt, dir_mode), b'a')
        WVPASSEQ(mangle(b'.bup', dir_mode_alt, dir_mode), b'.bup.bupl')
        WVPASSEQ(mangle(b'a.bupa', dir_mode_alt, dir_mode), b'a.bupa.bupl')
        WVPASSEQ(mangle(b'b.bup', link_mode, link_mode), b'b.bup.bupl')
        WVPASSEQ(mangle(b'b.bu', link_mode, link_mode), b'b.bu')
        WVPASSEQ(mangle(b'f', file_mode, file_mode_alt), b'f')
        WVPASSEQ(mangle(b'f.bup', file_mode, file_mode_alt), b'f.bup.bupl')
        WVPASSEQ(mangle(b'f.bup', file_mode, dir_mode), b'f.bup.bup')
        WVPASSEQ(mangle(b'f', file_mode, dir_mode), b'f.bup')

        WVPASSEQ(demangle(b'f.bup', file_mode), (b'f', git.BUP_CHUNKED))
        WVPASSEQ(demangle(b'f.bupl', file_mode), (b'f', git.BUP_NORMAL))
        WVPASSEQ(demangle(b'f.bup.bupl', file_mode),
                 (b'f.bup', git.BUP_NORMAL))

        WVPASSEQ(demangle(b'.bupm', file_mode), (b'', git.BUP_NORMAL))
        WVPASSEQ(demangle(b'.bupm', dir_mode), (b'', git.BUP_CHUNKED))

        # Unrecognized .bup? suffixes are left alone for safety: a future
        # version might give a .bup[a-z] extension a meaning other than
        # BUP_NORMAL.
        WVPASSEQ(demangle(b'f.bupa', file_mode),
                 (b'f.bupa', git.BUP_NORMAL))
57
58
@wvtest
def testencode():
    """Round-trip loose and pack object encodings and probe compression levels."""
    with no_lingering_errors():
        payload = b'hello world'
        kinds = (b'blob', b'tree', b'commit')
        # Encode every kind in loose form first, then in pack form.
        loose = dict((kind, b''.join(git._encode_looseobj(kind, payload)))
                     for kind in kinds)
        packed = dict((kind, b''.join(git._encode_packobj(kind, payload)))
                      for kind in kinds)
        WVPASSEQ(git._decode_looseobj(loose[b'blob']), (b'blob', payload))
        WVPASSEQ(git._decode_looseobj(loose[b'tree']), (b'tree', payload))
        WVPASSEQ(git._decode_looseobj(loose[b'commit']), (b'commit', payload))
        WVPASSEQ(git._decode_packobj(packed[b'blob']), (b'blob', payload))
        WVPASSEQ(git._decode_packobj(packed[b'tree']), (b'tree', payload))
        WVPASSEQ(git._decode_packobj(packed[b'commit']), (b'commit', payload))

        # Levels 0 through 9 are all expected to be accepted.
        for level in range(10):
            WVPASS(git._encode_looseobj(b'blob', payload,
                                        compression_level=level))

        def pack_blob_at(level):
            # Join the pieces so the encoder is actually run to completion.
            pieces = git._encode_packobj(b'blob', payload,
                                         compression_level=level)
            return b''.join(pieces)

        # Out-of-range or non-numeric levels must raise ValueError.
        WVEXCEPT(ValueError, pack_blob_at, -1)
        WVEXCEPT(ValueError, pack_blob_at, 10)
        WVEXCEPT(ValueError, pack_blob_at, b'x')
82
83
@wvtest
def testpacks():
    """Write a pack of small blobs and look them up through its index."""
    with no_lingering_errors():
        with test_tempdir(b'bup-tgit-') as tmpdir:
            bupdir = tmpdir + b'/bup'
            environ[b'BUP_DIR'] = bupdir
            git.init_repo(bupdir)
            git.verbose = 1
            null_oid = b'\0' * 20

            # A writer that is abandoned after receiving a couple of blobs.
            discarded = git.PackWriter()
            discarded.new_blob(os.urandom(100))
            discarded.new_blob(os.urandom(100))
            discarded.abort()

            writer = git.PackWriter()
            nobj = 1000
            hashes = [writer.new_blob(b'%d' % n) for n in range(nobj)]
            log('\n')
            nameprefix = writer.close()
            print(repr(nameprefix))
            WVPASS(os.path.exists(nameprefix + b'.pack'))
            WVPASS(os.path.exists(nameprefix + b'.idx'))

            idx = git.open_idx(nameprefix + b'.idx')
            print(repr(idx.fanout))

            for oid in hashes:
                WVPASS(idx.find_offset(oid) > 0)
            WVPASS(idx.exists(hashes[99]))
            WVFAIL(idx.exists(null_oid))

            # Iterating the index must yield the ids in sorted order.
            idx_iter = iter(idx)
            for expected in sorted(hashes):
                WVPASSEQ(hexlify(next(idx_iter)), hexlify(expected))

            WVFAIL(idx.find_offset(null_oid))

            # The same lookups through the list covering the pack directory.
            idx_list = git.PackIdxList(bupdir + b'/objects/pack')
            WVPASS(idx_list.exists(hashes[5]))
            WVPASS(idx_list.exists(hashes[6]))
            WVFAIL(idx_list.exists(null_oid))
126
127
@wvtest
def test_pack_name_lookup():
    """Check that exists(..., want_source=True) names the right .idx file."""
    blobs_per_pack = 2
    blob_total = 28
    with no_lingering_errors():
        with test_tempdir(b'bup-tgit-') as tmpdir:
            bupdir = tmpdir + b'/bup'
            environ[b'BUP_DIR'] = bupdir
            git.init_repo(bupdir)
            git.verbose = 1
            packdir = git.repo(b'objects/pack')

            idxnames = []
            hashes = []

            # One pack per consecutive pair of numbered blobs.
            for first in range(0, blob_total, blobs_per_pack):
                writer = git.PackWriter()
                hashes.extend([writer.new_blob(b'%d' % n)
                               for n in range(first, first + blobs_per_pack)])
                log('\n')
                prefix = writer.close()
                idxnames.append(os.path.basename(prefix + b'.idx'))

            idx_list = git.PackIdxList(packdir)
            WVPASSEQ(len(idx_list.packs), 2)
            for pack_no, idxname in enumerate(idxnames):
                lo = pack_no * blobs_per_pack
                for oid in hashes[lo:lo + blobs_per_pack]:
                    WVPASSEQ(idxname, idx_list.exists(oid, want_source=True))
152
153
@wvtest
def test_long_index():
    """Write a v2 pack index by hand and read back offsets wider than 32 bits."""

    def twenty_bytes(*words):
        # Five big-endian 32-bit words make up one 20-byte id.
        return struct.pack('!IIIII', *words)

    # Two offsets that do not fit in 32 bits and one that does.
    wide_ofs = 0xfffffffff
    wider_ofs = 0xffffffffff
    small_ofs = 0xff

    with no_lingering_errors():
        with test_tempdir(b'bup-tgit-') as tmpdir:
            bupdir = tmpdir + b'/bup'
            environ[b'BUP_DIR'] = bupdir
            git.init_repo(bupdir)
            writer = git.PackWriter()
            obj_bin = twenty_bytes(0x00112233, 0x44556677, 0x88990011,
                                   0x22334455, 0x66778899)
            obj2_bin = twenty_bytes(0x11223344, 0x55667788, 0x99001122,
                                    0x33445566, 0x77889900)
            obj3_bin = twenty_bytes(0x22334455, 0x66778899, 0x00112233,
                                    0x44556677, 0x88990011)
            pack_bin = twenty_bytes(0x99887766, 0x55443322, 0x11009988,
                                    0x77665544, 0x33221100)
            # 256 buckets; each entry goes in the bucket numbered by the
            # first byte of its id.
            buckets = [[] for _ in range(256)]
            buckets[0].append((obj_bin, 1, wide_ofs))
            buckets[0x11].append((obj2_bin, 2, wider_ofs))
            buckets[0x22].append((obj3_bin, 3, small_ofs))
            writer.count = 3
            idx_path = tmpdir + b'/tmp.idx'
            writer._write_pack_idx_v2(idx_path, buckets, pack_bin)
            reader = git.PackIdxV2(idx_path, open(idx_path, 'rb'))
            WVPASSEQ(reader.find_offset(obj_bin), wide_ofs)
            WVPASSEQ(reader.find_offset(obj2_bin), wider_ofs)
            WVPASSEQ(reader.find_offset(obj3_bin), small_ofs)
180
181
@wvtest
def test_check_repo_or_die():
    """Check git.check_repo_or_die() on a good, a damaged and a missing repo."""
    with no_lingering_errors():
        with test_tempdir(b'bup-tgit-') as tmpdir:
            bupdir = tmpdir + b'/bup'
            environ[b'BUP_DIR'] = bupdir
            packdir = bupdir + b'/objects/pack'
            stashed_packdir = bupdir + b'/objects/pack.tmp'
            start_dir = os.getcwd()
            try:
                os.chdir(tmpdir)
                git.init_repo(bupdir)
                git.check_repo_or_die()
                # Getting this far means the fresh repository was accepted.
                WVPASS('check_repo_or_die')

                # Put a plain file where the pack directory belongs and
                # expect exit status 14.
                os.rename(packdir, stashed_packdir)
                with open(packdir, 'w'):
                    pass
                try:
                    git.check_repo_or_die()
                except SystemExit as ex:
                    WVPASSEQ(ex.code, 14)
                else:
                    WVFAIL()
                # Undo the damage before the next check.
                os.unlink(packdir)
                os.rename(stashed_packdir, packdir)

                # A path that does not exist is expected to give status 15.
                try:
                    git.check_repo_or_die(b'nonexistantbup.tmp')
                except SystemExit as ex:
                    WVPASSEQ(ex.code, 15)
                else:
                    WVFAIL()
            finally:
                os.chdir(start_dir)
216
217
@wvtest
def test_commit_parsing():
    """Compare git.get_commit_items() with what `git show` reports.

    Two commits are made in a scratch work tree with the git command line
    tool; the parsed fields of the first and the parent list of the second
    are then checked.
    """

    def restore_env_var(name, val):
        # Put an environment variable back to its saved value; None means
        # it was not set before the test started.
        if val is None:
            del environ[name]
        else:
            environ[name] = val

    def showval(commit, val):
        # Ask git for a single --pretty=format field of the given commit.
        return readpipe([b'git', b'show', b'-s',
                         b'--pretty=format:%s' % val, commit]).strip()

    with no_lingering_errors():
        with test_tempdir(b'bup-tgit-') as tmpdir:
            orig_cwd = os.getcwd()
            workdir = tmpdir + b'/work'
            repodir = workdir + b'/.git'
            orig_author_name = environ.get(b'GIT_AUTHOR_NAME')
            orig_author_email = environ.get(b'GIT_AUTHOR_EMAIL')
            orig_committer_name = environ.get(b'GIT_COMMITTER_NAME')
            orig_committer_email = environ.get(b'GIT_COMMITTER_EMAIL')
            environ[b'GIT_AUTHOR_NAME'] = b'bup test'
            environ[b'GIT_COMMITTER_NAME'] = environ[b'GIT_AUTHOR_NAME']
            environ[b'GIT_AUTHOR_EMAIL'] = b'bup@a425bc70a02811e49bdf73ee56450e6f'
            environ[b'GIT_COMMITTER_EMAIL'] = environ[b'GIT_AUTHOR_EMAIL']
            try:
                readpipe([b'git', b'init', workdir])
                # NOTE(review): GIT_DIR and BUP_DIR are not put back by the
                # finally clause below, so they keep pointing at this
                # scratch repository after the test returns.
                environ[b'GIT_DIR'] = environ[b'BUP_DIR'] = repodir
                git.check_repo_or_die(repodir)
                os.chdir(workdir)
                with open('foo', 'w') as f:
                    print('bar', file=f)
                readpipe([b'git', b'add', b'.'])
                # The author and date are given explicitly so the expected
                # author fields below are fixed values.
                readpipe([b'git', b'commit', b'-am', b'Do something',
                          b'--author', b'Someone <someone@somewhere>',
                          b'--date', b'Sat Oct 3 19:48:49 2009 -0400'])
                commit = readpipe([b'git', b'show-ref', b'-s', b'master']).strip()
                tree = showval(commit, b'%T')
                cname = showval(commit, b'%cn')
                cmail = showval(commit, b'%ce')
                cdate = showval(commit, b'%ct')
                # Keep only the trailing five characters of the %ci output
                # (sign, two hour digits, two minute digits) and convert
                # them to a signed number of seconds.
                coffs = showval(commit, b'%ci')
                coffs = coffs[-5:]
                coff = (int(coffs[-4:-2]) * 60 * 60) + (int(coffs[-2:]) * 60)
                if bytes_from_byte(coffs[-5]) == b'-':
                    coff = - coff
                commit_items = git.get_commit_items(commit, git.cp())
                WVPASSEQ(commit_items.parents, [])
                WVPASSEQ(commit_items.tree, tree)
                WVPASSEQ(commit_items.author_name, b'Someone')
                WVPASSEQ(commit_items.author_mail, b'someone@somewhere')
                WVPASSEQ(commit_items.author_sec, 1254613729)
                WVPASSEQ(commit_items.author_offset, -(4 * 60 * 60))
                WVPASSEQ(commit_items.committer_name, cname)
                WVPASSEQ(commit_items.committer_mail, cmail)
                WVPASSEQ(commit_items.committer_sec, int(cdate))
                WVPASSEQ(commit_items.committer_offset, coff)
                WVPASSEQ(commit_items.message, b'Do something\n')
                with open(b'bar', 'wb') as f:
                    f.write(b'baz\n')
                # Bytes argument, matching the first "git add" above and
                # every other argv element in this test.
                readpipe([b'git', b'add', b'.'])
                readpipe([b'git', b'commit', b'-am', b'Do something else'])
                child = readpipe([b'git', b'show-ref', b'-s', b'master']).strip()
                commit_items = git.get_commit_items(child, git.cp())
                WVPASSEQ(commit_items.parents, [commit])
            finally:
                os.chdir(orig_cwd)
                restore_env_var(b'GIT_AUTHOR_NAME', orig_author_name)
                restore_env_var(b'GIT_AUTHOR_EMAIL', orig_author_email)
                restore_env_var(b'GIT_COMMITTER_NAME', orig_committer_name)
                restore_env_var(b'GIT_COMMITTER_EMAIL', orig_committer_email)
292
293
@wvtest
def test_new_commit():
    """Create two commits with PackWriter.new_commit() and parse them back."""
    with no_lingering_errors():
        with test_tempdir(b'bup-tgit-') as tmpdir:
            bupdir = tmpdir + b'/bup'
            environ[b'BUP_DIR'] = bupdir
            git.init_repo(bupdir)
            git.verbose = 1

            writer = git.PackWriter()
            # Random 20-byte ids stand in for the tree and the parent.
            tree = os.urandom(20)
            parent = os.urandom(20)
            author_name = b'Author'
            author_mail = b'author@somewhere'
            adate_sec = 1439657836
            cdate_sec = adate_sec + 1
            committer_name = b'Committer'
            committer_mail = b'committer@somewhere'
            author_ident = b'%s <%s>' % (author_name, author_mail)
            committer_ident = b'%s <%s>' % (committer_name, committer_mail)
            message = b'There is a small mailbox here'

            # First commit: no offsets supplied (None for both).
            commit = writer.new_commit(tree, parent,
                                       author_ident, adate_sec, None,
                                       committer_ident, cdate_sec, None,
                                       message)
            # Second commit: explicit, differing offsets.
            adate_tz_sec = -60 * 60
            cdate_tz_sec = 120 * 60
            commit_off = writer.new_commit(tree, parent,
                                           author_ident,
                                           adate_sec, adate_tz_sec,
                                           committer_ident,
                                           cdate_sec, cdate_tz_sec,
                                           message)
            writer.close()

            # With no offset given, the local zone's offset at each
            # timestamp is what should come back.
            parsed = git.get_commit_items(hexlify(commit), git.cp())
            local_author_offset = localtime(adate_sec).tm_gmtoff
            local_committer_offset = localtime(cdate_sec).tm_gmtoff
            WVPASSEQ(tree, unhexlify(parsed.tree))
            WVPASSEQ(1, len(parsed.parents))
            WVPASSEQ(parent, unhexlify(parsed.parents[0]))
            WVPASSEQ(author_name, parsed.author_name)
            WVPASSEQ(author_mail, parsed.author_mail)
            WVPASSEQ(adate_sec, parsed.author_sec)
            WVPASSEQ(local_author_offset, parsed.author_offset)
            WVPASSEQ(committer_name, parsed.committer_name)
            WVPASSEQ(committer_mail, parsed.committer_mail)
            WVPASSEQ(cdate_sec, parsed.committer_sec)
            WVPASSEQ(local_committer_offset, parsed.committer_offset)

            # With explicit offsets, those exact values should come back.
            parsed = git.get_commit_items(hexlify(commit_off), git.cp())
            WVPASSEQ(tree, unhexlify(parsed.tree))
            WVPASSEQ(1, len(parsed.parents))
            WVPASSEQ(parent, unhexlify(parsed.parents[0]))
            WVPASSEQ(author_name, parsed.author_name)
            WVPASSEQ(author_mail, parsed.author_mail)
            WVPASSEQ(adate_sec, parsed.author_sec)
            WVPASSEQ(adate_tz_sec, parsed.author_offset)
            WVPASSEQ(committer_name, parsed.committer_name)
            WVPASSEQ(committer_mail, parsed.committer_mail)
            WVPASSEQ(cdate_sec, parsed.committer_sec)
            WVPASSEQ(cdate_tz_sec, parsed.committer_offset)
355
356
@wvtest
def test_list_refs():
    """Check git.list_refs() and its head/tag filters as refs come and go."""
    with no_lingering_errors():
        with test_tempdir(b'bup-tgit-') as tmpdir:
            bupdir = tmpdir + b'/bup'
            environ[b'BUP_DIR'] = bupdir
            src = tmpdir + b'/src'

            def rev_parse(rev):
                # Output lines of "git rev-parse REV" run against bupdir.
                out = exo(b'git', b'--git-dir', bupdir, b'rev-parse', rev)
                return out.strip().split(b'\n')

            def add_tag(name, target):
                exc(b'git', b'--git-dir', bupdir, b'tag', name, target)

            mkdirp(src)
            for suffix, content in ((b'/1', b'something\n'),
                                    (b'/2', b'something else\n')):
                with open(src + suffix, 'wb+') as f:
                    f.write(content)
            git.init_repo(bupdir)

            # A freshly initialized repository has no refs of any kind.
            emptyset = frozenset()
            WVPASSEQ(frozenset(git.list_refs()), emptyset)
            WVPASSEQ(frozenset(git.list_refs(limit_to_tags=True)), emptyset)
            WVPASSEQ(frozenset(git.list_refs(limit_to_heads=True)), emptyset)

            exc(bup_exe, b'index', src)
            exc(bup_exe, b'save', b'-n', b'src', b'--strip', src)
            src_lines = rev_parse(b'src')
            assert(len(src_lines) == 1)
            src_hash = unhexlify(src_lines[0])
            tree_hash = unhexlify(rev_parse(b'src:')[0])
            blob_hash = unhexlify(rev_parse(b'src:1')[0])

            # After the save there is exactly one head and still no tags.
            head_ref = (b'refs/heads/src', src_hash)
            WVPASSEQ(frozenset(git.list_refs()), frozenset([head_ref]))
            WVPASSEQ(frozenset(git.list_refs(limit_to_tags=True)), emptyset)
            WVPASSEQ(frozenset(git.list_refs(limit_to_heads=True)),
                     frozenset([head_ref]))

            add_tag(b'commit-tag', b'src')
            commit_tag_ref = (b'refs/tags/commit-tag', src_hash)
            WVPASSEQ(frozenset(git.list_refs()),
                     frozenset([head_ref, commit_tag_ref]))
            WVPASSEQ(frozenset(git.list_refs(limit_to_tags=True)),
                     frozenset([commit_tag_ref]))
            WVPASSEQ(frozenset(git.list_refs(limit_to_heads=True)),
                     frozenset([head_ref]))

            # Tag a tree and a blob too, then drop the only head.
            add_tag(b'tree-tag', b'src:')
            add_tag(b'blob-tag', b'src:1')
            os.unlink(bupdir + b'/refs/heads/src')
            expected_tags = frozenset([commit_tag_ref,
                                       (b'refs/tags/tree-tag', tree_hash),
                                       (b'refs/tags/blob-tag', blob_hash)])
            WVPASSEQ(frozenset(git.list_refs()), expected_tags)
            WVPASSEQ(frozenset(git.list_refs(limit_to_heads=True)),
                     frozenset([]))
            WVPASSEQ(frozenset(git.list_refs(limit_to_tags=True)),
                     expected_tags)
407
408
@wvtest
def test__git_date_str():
    """Check git._git_date_str() for zero, negative and positive offsets."""
    ninety_minutes = 90 * 60  # offset expressed in seconds
    with no_lingering_errors():
        WVPASSEQ(b'0 +0000', git._git_date_str(0, 0))
        WVPASSEQ(b'0 -0130', git._git_date_str(0, -ninety_minutes))
        WVPASSEQ(b'0 +0130', git._git_date_str(0, ninety_minutes))
415
416
@wvtest
def test_cat_pipe():
    """Compare the first item from git.cp().get() with git cat-file output."""
    with no_lingering_errors():
        with test_tempdir(b'bup-tgit-') as tmpdir:
            bupdir = tmpdir + b'/bup'
            environ[b'BUP_DIR'] = bupdir
            src = tmpdir + b'/src'

            def cat_file(flag):
                # Raw output of "git cat-file FLAG src" run against bupdir.
                return exo(b'git', b'--git-dir', bupdir,
                           b'cat-file', flag, b'src')

            mkdirp(src)
            for suffix, content in ((b'/1', b'something\n'),
                                    (b'/2', b'something else\n')):
                with open(src + suffix, 'wb+') as f:
                    f.write(content)
            git.init_repo(bupdir)
            exc(bup_exe, b'index', src)
            # The stripped output of "bup save -c" is used as the expected
            # object id.
            save_out = exo(bup_exe, b'save', b'-cn', b'src', b'--strip', src)
            oidx = save_out.strip()
            typ = cat_file(b'-t').strip()
            size = int(cat_file(b'-s'))
            stream = git.cp().get(b'src')
            get_info = next(stream)
            # NOTE(review): this walks the elements of the single item
            # returned by next(stream), not the remaining items of the
            # iterator itself -- confirm that is what was intended.
            for _ in next(stream):
                pass
            WVPASSEQ((oidx, typ, size), get_info)