]> arthur.barton.de Git - netatalk.git/blob - libatalk/tdb/tdb.c
70255d454c0a2e84a995ebd132a259f9a8cf2004
[netatalk.git] / libatalk / tdb / tdb.c
1  /* 
2    Unix SMB/CIFS implementation.
3    Samba database functions
4    Copyright (C) Andrew Tridgell              1999-2000
5    Copyright (C) Luke Kenneth Casson Leighton      2000
6    Copyright (C) Paul `Rusty' Russell              2000
7    Copyright (C) Jeremy Allison                    2000-2003
8    
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 2 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22 */
23
24 #define STANDALONE 1
25
26 /* NOTE: If you use tdbs under valgrind, and in particular if you run
27  * tdbtorture, you may get spurious "uninitialized value" warnings.  I
28  * think this is because valgrind doesn't understand that the mmap'd
29  * area may be written to by other processes.  Memory can, from the
30  * point of view of the grinded process, spontaneously become
31  * initialized.
32  *
33  * I can think of a few solutions.  [mbp 20030311]
34  *
35  * 1 - Write suppressions for Valgrind so that it doesn't complain
36  * about this.  Probably the most reasonable but people need to
37  * remember to use them.
38  *
39  * 2 - Use IO not mmap when running under valgrind.  Not so nice.
40  *
41  * 3 - Use the special valgrind macros to mark memory as valid at the
42  * right time.  Probably too hard -- the process just doesn't know.
43  */ 
44
45 #ifdef STANDALONE
46 #if HAVE_CONFIG_H
47 #include <config.h>
48 #endif
49
50 #define _XOPEN_SOURCE 500
51 #include <unistd.h>
52 #include <stdlib.h>
53 #include <stdio.h>
54 #include <fcntl.h>
55 #include <unistd.h>
56 #include <string.h>
57 #include <fcntl.h>
58 #include <errno.h>
59 #include <sys/mman.h>
60 #include <sys/stat.h>
61 #include <signal.h>
62 #include "spinlock.h"
63 #else
64 #include "includes.h"
65 #endif
66
67 #define TDB_MAGIC_FOOD "TDB file\n"
68 #define TDB_VERSION (0x26011967 + 6)
69 #define TDB_MAGIC (0x26011999U)
70 #define TDB_FREE_MAGIC (~TDB_MAGIC)
71 #define TDB_DEAD_MAGIC (0xFEE1DEAD)
72 #define TDB_ALIGNMENT 4
73 #define MIN_REC_SIZE (2*sizeof(struct list_struct) + TDB_ALIGNMENT)
74 #define DEFAULT_HASH_SIZE 131
75 #define TDB_PAGE_SIZE 0x2000
76 #define FREELIST_TOP (sizeof(struct tdb_header))
77 #define TDB_ALIGN(x,a) (((x) + (a)-1) & ~((a)-1))
78 #define TDB_BYTEREV(x) (((((x)&0xff)<<24)|((x)&0xFF00)<<8)|(((x)>>8)&0xFF00)|((x)>>24))
79 #define TDB_DEAD(r) ((r)->magic == TDB_DEAD_MAGIC)
80 #define TDB_BAD_MAGIC(r) ((r)->magic != TDB_MAGIC && !TDB_DEAD(r))
81 #define TDB_HASH_TOP(hash) (FREELIST_TOP + (BUCKET(hash)+1)*sizeof(tdb_off))
82
83 /* NB assumes there is a local variable called "tdb" that is the
84  * current context, also takes doubly-parenthesized print-style
85  * argument. */
86 #define TDB_LOG(x) (tdb->log_fn?((tdb->log_fn x),0) : 0)
87
88 /* lock offsets */
89 #define GLOBAL_LOCK 0
90 #define ACTIVE_LOCK 4
91
92 #ifndef MAP_FILE
93 #define MAP_FILE 0
94 #endif
95
96 #ifndef MAP_FAILED
97 #define MAP_FAILED ((void *)-1)
98 #endif
99
100 /* free memory if the pointer is valid and zero the pointer */
101 #ifndef SAFE_FREE
102 #define SAFE_FREE(x) do { if ((x) != NULL) {free((x)); (x)=NULL;} } while(0)
103 #endif
104
105 #define BUCKET(hash) ((hash) % tdb->header.hash_size)
106 TDB_DATA tdb_null;
107
108 /* all contexts, to ensure no double-opens (fcntl locks don't nest!) */
109 static TDB_CONTEXT *tdbs = NULL;
110
111 static int tdb_munmap(TDB_CONTEXT *tdb)
112 {
113         if (tdb->flags & TDB_INTERNAL)
114                 return 0;
115
116 #ifdef HAVE_MMAP
117         if (tdb->map_ptr) {
118                 int ret = munmap(tdb->map_ptr, tdb->map_size);
119                 if (ret != 0)
120                         return ret;
121         }
122 #endif
123         tdb->map_ptr = NULL;
124         return 0;
125 }
126
127 static void tdb_mmap(TDB_CONTEXT *tdb)
128 {
129         if (tdb->flags & TDB_INTERNAL)
130                 return;
131
132 #ifdef HAVE_MMAP
133         if (!(tdb->flags & TDB_NOMMAP)) {
134                 tdb->map_ptr = mmap(NULL, tdb->map_size, 
135                                     PROT_READ|(tdb->read_only? 0:PROT_WRITE), 
136                                     MAP_SHARED|MAP_FILE, tdb->fd, 0);
137
138                 /*
139                  * NB. When mmap fails it returns MAP_FAILED *NOT* NULL !!!!
140                  */
141
142                 if (tdb->map_ptr == MAP_FAILED) {
143                         tdb->map_ptr = NULL;
144                         TDB_LOG((tdb, 2, "tdb_mmap failed for size %d (%s)\n", 
145                                  tdb->map_size, strerror(errno)));
146                 }
147         } else {
148                 tdb->map_ptr = NULL;
149         }
150 #else
151         tdb->map_ptr = NULL;
152 #endif
153 }
154
155 /* Endian conversion: we only ever deal with 4 byte quantities */
156 static void *convert(void *buf, u32 size)
157 {
158         u32 i, *p = buf;
159         for (i = 0; i < size / 4; i++)
160                 p[i] = TDB_BYTEREV(p[i]);
161         return buf;
162 }
163 #define DOCONV() (tdb->flags & TDB_CONVERT)
164 #define CONVERT(x) (DOCONV() ? convert(&x, sizeof(x)) : &x)
165
166 /* the body of the database is made of one list_struct for the free space
167    plus a separate data list for each hash value */
168 struct list_struct {
169         tdb_off next; /* offset of the next record in the list */
170         tdb_len rec_len; /* total byte length of record */
171         tdb_len key_len; /* byte length of key */
172         tdb_len data_len; /* byte length of data */
173         u32 full_hash; /* the full 32 bit hash of the key */
174         u32 magic;   /* try to catch errors */
175         /* the following union is implied:
176                 union {
177                         char record[rec_len];
178                         struct {
179                                 char key[key_len];
180                                 char data[data_len];
181                         }
182                         u32 totalsize; (tailer)
183                 }
184         */
185 };
186
187 /***************************************************************
188  Allow a caller to set a "alarm" flag that tdb can check to abort
189  a blocking lock on SIGALRM.
190 ***************************************************************/
191
192 static sig_atomic_t *palarm_fired;
193
194 void tdb_set_lock_alarm(sig_atomic_t *palarm)
195 {
196         palarm_fired = palarm;
197 }
198
199 /* a byte range locking function - return 0 on success
200    this functions locks/unlocks 1 byte at the specified offset.
201
202    On error, errno is also set so that errors are passed back properly
203    through tdb_open(). */
204 static int tdb_brlock(TDB_CONTEXT *tdb, tdb_off offset, 
205                       int rw_type, int lck_type, int probe)
206 {
207         struct flock fl;
208         int ret;
209
210         if (tdb->flags & TDB_NOLOCK)
211                 return 0;
212         if ((rw_type == F_WRLCK) && (tdb->read_only)) {
213                 errno = EACCES;
214                 return -1;
215         }
216
217         fl.l_type = rw_type;
218         fl.l_whence = SEEK_SET;
219         fl.l_start = offset;
220         fl.l_len = 1;
221         fl.l_pid = 0;
222
223         do {
224                 ret = fcntl(tdb->fd,lck_type,&fl);
225                 if (ret == -1 && errno == EINTR && palarm_fired && *palarm_fired)
226                         break;
227         } while (ret == -1 && errno == EINTR);
228
229         if (ret == -1) {
230                 if (!probe && lck_type != F_SETLK) {
231                         /* Ensure error code is set for log fun to examine. */
232                         if (errno == EINTR && palarm_fired && *palarm_fired)
233                                 tdb->ecode = TDB_ERR_LOCK_TIMEOUT;
234                         else
235                                 tdb->ecode = TDB_ERR_LOCK;
236                         TDB_LOG((tdb, 5,"tdb_brlock failed (fd=%d) at offset %d rw_type=%d lck_type=%d\n", 
237                                  tdb->fd, offset, rw_type, lck_type));
238                 }
239                 /* Was it an alarm timeout ? */
240                 if (errno == EINTR && palarm_fired && *palarm_fired)
241                         return TDB_ERRCODE(TDB_ERR_LOCK_TIMEOUT, -1);
242                 /* Otherwise - generic lock error. */
243                 /* errno set by fcntl */
244                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
245         }
246         return 0;
247 }
248
249 /* lock a list in the database. list -1 is the alloc list */
250 static int tdb_lock(TDB_CONTEXT *tdb, int list, int ltype)
251 {
252         if (list < -1 || list >= (int)tdb->header.hash_size) {
253                 TDB_LOG((tdb, 0,"tdb_lock: invalid list %d for ltype=%d\n", 
254                            list, ltype));
255                 return -1;
256         }
257         if (tdb->flags & TDB_NOLOCK)
258                 return 0;
259
260         /* Since fcntl locks don't nest, we do a lock for the first one,
261            and simply bump the count for future ones */
262         if (tdb->locked[list+1].count == 0) {
263                 if (!tdb->read_only && tdb->header.rwlocks) {
264                         if (tdb_spinlock(tdb, list, ltype)) {
265                                 TDB_LOG((tdb, 0, "tdb_lock spinlock failed on list ltype=%d\n", 
266                                            list, ltype));
267                                 return -1;
268                         }
269                 } else if (tdb_brlock(tdb,FREELIST_TOP+4*list,ltype,F_SETLKW, 0)) {
270                         TDB_LOG((tdb, 0,"tdb_lock failed on list %d ltype=%d (%s)\n", 
271                                            list, ltype, strerror(errno)));
272                         return -1;
273                 }
274                 tdb->locked[list+1].ltype = ltype;
275         }
276         tdb->locked[list+1].count++;
277         return 0;
278 }
279
280 /* unlock the database: returns void because it's too late for errors. */
281         /* changed to return int it may be interesting to know there
282            has been an error  --simo */
283 static int tdb_unlock(TDB_CONTEXT *tdb, int list, int ltype)
284 {
285         int ret = -1;
286
287         if (tdb->flags & TDB_NOLOCK)
288                 return 0;
289
290         /* Sanity checks */
291         if (list < -1 || list >= (int)tdb->header.hash_size) {
292                 TDB_LOG((tdb, 0, "tdb_unlock: list %d invalid (%d)\n", list, tdb->header.hash_size));
293                 return ret;
294         }
295
296         if (tdb->locked[list+1].count==0) {
297                 TDB_LOG((tdb, 0, "tdb_unlock: count is 0\n"));
298                 return ret;
299         }
300
301         if (tdb->locked[list+1].count == 1) {
302                 /* Down to last nested lock: unlock underneath */
303                 if (!tdb->read_only && tdb->header.rwlocks) {
304                         ret = tdb_spinunlock(tdb, list, ltype);
305                 } else {
306                         ret = tdb_brlock(tdb, FREELIST_TOP+4*list, F_UNLCK, F_SETLKW, 0);
307                 }
308         } else {
309                 ret = 0;
310         }
311         tdb->locked[list+1].count--;
312
313         if (ret)
314                 TDB_LOG((tdb, 0,"tdb_unlock: An error occurred unlocking!\n")); 
315         return ret;
316 }
317
318 /* This is based on the hash algorithm from gdbm */
319 static u32 tdb_hash(TDB_DATA *key)
320 {
321         u32 value;      /* Used to compute the hash value.  */
322         u32   i;        /* Used to cycle through random values. */
323
324         /* Set the initial value from the key size. */
325         for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
326                 value = (value + (key->dptr[i] << (i*5 % 24)));
327
328         return (1103515243 * value + 12345);  
329 }
330
331 /* check for an out of bounds access - if it is out of bounds then
332    see if the database has been expanded by someone else and expand
333    if necessary 
334    note that "len" is the minimum length needed for the db
335 */
336 static int tdb_oob(TDB_CONTEXT *tdb, tdb_off len, int probe)
337 {
338         struct stat st;
339         if (len <= tdb->map_size)
340                 return 0;
341         if (tdb->flags & TDB_INTERNAL) {
342                 if (!probe) {
343                         /* Ensure ecode is set for log fn. */
344                         tdb->ecode = TDB_ERR_IO;
345                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond internal malloc size %d\n",
346                                  (int)len, (int)tdb->map_size));
347                 }
348                 return TDB_ERRCODE(TDB_ERR_IO, -1);
349         }
350
351         if (fstat(tdb->fd, &st) == -1)
352                 return TDB_ERRCODE(TDB_ERR_IO, -1);
353
354         if (st.st_size < (size_t)len) {
355                 if (!probe) {
356                         /* Ensure ecode is set for log fn. */
357                         tdb->ecode = TDB_ERR_IO;
358                         TDB_LOG((tdb, 0,"tdb_oob len %d beyond eof at %d\n",
359                                  (int)len, (int)st.st_size));
360                 }
361                 return TDB_ERRCODE(TDB_ERR_IO, -1);
362         }
363
364         /* Unmap, update size, remap */
365         if (tdb_munmap(tdb) == -1)
366                 return TDB_ERRCODE(TDB_ERR_IO, -1);
367         tdb->map_size = st.st_size;
368         tdb_mmap(tdb);
369         return 0;
370 }
371
372 /* write a lump of data at a specified offset */
373 static int tdb_write(TDB_CONTEXT *tdb, tdb_off off, void *buf, tdb_len len)
374 {
375         if (tdb_oob(tdb, off + len, 0) != 0)
376                 return -1;
377
378         if (tdb->map_ptr)
379                 memcpy(off + (char *)tdb->map_ptr, buf, len);
380 #ifdef HAVE_PWRITE
381         else if (pwrite(tdb->fd, buf, len, off) != (ssize_t)len) {
382 #else
383         else if (lseek(tdb->fd, off, SEEK_SET) != off
384                  || write(tdb->fd, buf, len) != (ssize_t)len) {
385 #endif
386                 /* Ensure ecode is set for log fn. */
387                 tdb->ecode = TDB_ERR_IO;
388                 TDB_LOG((tdb, 0,"tdb_write failed at %d len=%d (%s)\n",
389                            off, len, strerror(errno)));
390                 return TDB_ERRCODE(TDB_ERR_IO, -1);
391         }
392         return 0;
393 }
394
395 /* read a lump of data at a specified offset, maybe convert */
396 static int tdb_read(TDB_CONTEXT *tdb,tdb_off off,void *buf,tdb_len len,int cv)
397 {
398         if (tdb_oob(tdb, off + len, 0) != 0)
399                 return -1;
400
401         if (tdb->map_ptr)
402                 memcpy(buf, off + (char *)tdb->map_ptr, len);
403 #ifdef HAVE_PREAD
404         else if (pread(tdb->fd, buf, len, off) != (ssize_t)len) {
405 #else
406         else if (lseek(tdb->fd, off, SEEK_SET) != off
407                  || read(tdb->fd, buf, len) != (ssize_t)len) {
408 #endif
409                 /* Ensure ecode is set for log fn. */
410                 tdb->ecode = TDB_ERR_IO;
411                 TDB_LOG((tdb, 0,"tdb_read failed at %d len=%d (%s)\n",
412                            off, len, strerror(errno)));
413                 return TDB_ERRCODE(TDB_ERR_IO, -1);
414         }
415         if (cv)
416                 convert(buf, len);
417         return 0;
418 }
419
420 /* read a lump of data, allocating the space for it */
421 static char *tdb_alloc_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_len len)
422 {
423         char *buf;
424
425         if (!(buf = malloc(len))) {
426                 /* Ensure ecode is set for log fn. */
427                 tdb->ecode = TDB_ERR_OOM;
428                 TDB_LOG((tdb, 0,"tdb_alloc_read malloc failed len=%d (%s)\n",
429                            len, strerror(errno)));
430                 return TDB_ERRCODE(TDB_ERR_OOM, buf);
431         }
432         if (tdb_read(tdb, offset, buf, len, 0) == -1) {
433                 SAFE_FREE(buf);
434                 return NULL;
435         }
436         return buf;
437 }
438
439 /* read/write a tdb_off */
440 static int ofs_read(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
441 {
442         return tdb_read(tdb, offset, (char*)d, sizeof(*d), DOCONV());
443 }
444 static int ofs_write(TDB_CONTEXT *tdb, tdb_off offset, tdb_off *d)
445 {
446         tdb_off off = *d;
447         return tdb_write(tdb, offset, CONVERT(off), sizeof(*d));
448 }
449
450 /* read/write a record */
451 static int rec_read(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
452 {
453         if (tdb_read(tdb, offset, rec, sizeof(*rec),DOCONV()) == -1)
454                 return -1;
455         if (TDB_BAD_MAGIC(rec)) {
456                 /* Ensure ecode is set for log fn. */
457                 tdb->ecode = TDB_ERR_CORRUPT;
458                 TDB_LOG((tdb, 0,"rec_read bad magic 0x%x at offset=%d\n", rec->magic, offset));
459                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
460         }
461         return tdb_oob(tdb, rec->next+sizeof(*rec), 0);
462 }
463 static int rec_write(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
464 {
465         struct list_struct r = *rec;
466         return tdb_write(tdb, offset, CONVERT(r), sizeof(r));
467 }
468
469 /* read a freelist record and check for simple errors */
470 static int rec_free_read(TDB_CONTEXT *tdb, tdb_off off, struct list_struct *rec)
471 {
472         if (tdb_read(tdb, off, rec, sizeof(*rec),DOCONV()) == -1)
473                 return -1;
474
475         if (rec->magic == TDB_MAGIC) {
476                 /* this happens when a app is showdown while deleting a record - we should
477                    not completely fail when this happens */
478                 TDB_LOG((tdb, 0,"rec_free_read non-free magic at offset=%d - fixing\n", 
479                          rec->magic, off));
480                 rec->magic = TDB_FREE_MAGIC;
481                 if (tdb_write(tdb, off, rec, sizeof(*rec)) == -1)
482                         return -1;
483         }
484
485         if (rec->magic != TDB_FREE_MAGIC) {
486                 /* Ensure ecode is set for log fn. */
487                 tdb->ecode = TDB_ERR_CORRUPT;
488                 TDB_LOG((tdb, 0,"rec_free_read bad magic 0x%x at offset=%d\n", 
489                            rec->magic, off));
490                 return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
491         }
492         if (tdb_oob(tdb, rec->next+sizeof(*rec), 0) != 0)
493                 return -1;
494         return 0;
495 }
496
497 /* update a record tailer (must hold allocation lock) */
498 static int update_tailer(TDB_CONTEXT *tdb, tdb_off offset,
499                          const struct list_struct *rec)
500 {
501         tdb_off totalsize;
502
503         /* Offset of tailer from record header */
504         totalsize = sizeof(*rec) + rec->rec_len;
505         return ofs_write(tdb, offset + totalsize - sizeof(tdb_off),
506                          &totalsize);
507 }
508
509 static tdb_off tdb_dump_record(TDB_CONTEXT *tdb, tdb_off offset)
510 {
511         struct list_struct rec;
512         tdb_off tailer_ofs, tailer;
513
514         if (tdb_read(tdb, offset, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
515                 printf("ERROR: failed to read record at %u\n", offset);
516                 return 0;
517         }
518
519         printf(" rec: offset=%u next=%d rec_len=%d key_len=%d data_len=%d full_hash=0x%x magic=0x%x\n",
520                offset, rec.next, rec.rec_len, rec.key_len, rec.data_len, rec.full_hash, rec.magic);
521
522         tailer_ofs = offset + sizeof(rec) + rec.rec_len - sizeof(tdb_off);
523         if (ofs_read(tdb, tailer_ofs, &tailer) == -1) {
524                 printf("ERROR: failed to read tailer at %u\n", tailer_ofs);
525                 return rec.next;
526         }
527
528         if (tailer != rec.rec_len + sizeof(rec)) {
529                 printf("ERROR: tailer does not match record! tailer=%u totalsize=%u\n",
530                                 (unsigned)tailer, (unsigned)(rec.rec_len + sizeof(rec)));
531         }
532         return rec.next;
533 }
534
535 static int tdb_dump_chain(TDB_CONTEXT *tdb, int i)
536 {
537         tdb_off rec_ptr, top;
538
539         top = TDB_HASH_TOP(i);
540
541         if (tdb_lock(tdb, i, F_WRLCK) != 0)
542                 return -1;
543
544         if (ofs_read(tdb, top, &rec_ptr) == -1)
545                 return tdb_unlock(tdb, i, F_WRLCK);
546
547         if (rec_ptr)
548                 printf("hash=%d\n", i);
549
550         while (rec_ptr) {
551                 rec_ptr = tdb_dump_record(tdb, rec_ptr);
552         }
553
554         return tdb_unlock(tdb, i, F_WRLCK);
555 }
556
557 void tdb_dump_all(TDB_CONTEXT *tdb)
558 {
559         unsigned int i;
560         for (i=0;i<tdb->header.hash_size;i++) {
561                 tdb_dump_chain(tdb, i);
562         }
563         printf("freelist:\n");
564         tdb_dump_chain(tdb, -1);
565 }
566
567 int tdb_printfreelist(TDB_CONTEXT *tdb)
568 {
569         int ret;
570         long total_free = 0;
571         tdb_off offset, rec_ptr;
572         struct list_struct rec;
573
574         if ((ret = tdb_lock(tdb, -1, F_WRLCK)) != 0)
575                 return ret;
576
577         offset = FREELIST_TOP;
578
579         /* read in the freelist top */
580         if (ofs_read(tdb, offset, &rec_ptr) == -1) {
581                 tdb_unlock(tdb, -1, F_WRLCK);
582                 return 0;
583         }
584
585         printf("freelist top=[0x%08x]\n", rec_ptr );
586         while (rec_ptr) {
587                 if (tdb_read(tdb, rec_ptr, (char *)&rec, sizeof(rec), DOCONV()) == -1) {
588                         tdb_unlock(tdb, -1, F_WRLCK);
589                         return -1;
590                 }
591
592                 if (rec.magic != TDB_FREE_MAGIC) {
593                         printf("bad magic 0x%08x in free list\n", rec.magic);
594                         tdb_unlock(tdb, -1, F_WRLCK);
595                         return -1;
596                 }
597
598                 printf("entry offset=[0x%08x], rec.rec_len = [0x%08x (%d)]\n", rec.next, rec.rec_len, rec.rec_len );
599                 total_free += rec.rec_len;
600
601                 /* move to the next record */
602                 rec_ptr = rec.next;
603         }
604         printf("total rec_len = [0x%08x (%d)]\n", (int)total_free, 
605                (int)total_free);
606
607         return tdb_unlock(tdb, -1, F_WRLCK);
608 }
609
610 /* Remove an element from the freelist.  Must have alloc lock. */
611 static int remove_from_freelist(TDB_CONTEXT *tdb, tdb_off off, tdb_off next)
612 {
613         tdb_off last_ptr, i;
614
615         /* read in the freelist top */
616         last_ptr = FREELIST_TOP;
617         while (ofs_read(tdb, last_ptr, &i) != -1 && i != 0) {
618                 if (i == off) {
619                         /* We've found it! */
620                         return ofs_write(tdb, last_ptr, &next);
621                 }
622                 /* Follow chain (next offset is at start of record) */
623                 last_ptr = i;
624         }
625         TDB_LOG((tdb, 0,"remove_from_freelist: not on list at off=%d\n", off));
626         return TDB_ERRCODE(TDB_ERR_CORRUPT, -1);
627 }
628
629 /* Add an element into the freelist. Merge adjacent records if
630    neccessary. */
631 static int tdb_free(TDB_CONTEXT *tdb, tdb_off offset, struct list_struct *rec)
632 {
633         tdb_off right, left;
634
635         /* Allocation and tailer lock */
636         if (tdb_lock(tdb, -1, F_WRLCK) != 0)
637                 return -1;
638
639         /* set an initial tailer, so if we fail we don't leave a bogus record */
640         if (update_tailer(tdb, offset, rec) != 0) {
641                 TDB_LOG((tdb, 0, "tdb_free: upfate_tailer failed!\n"));
642                 goto fail;
643         }
644
645         /* Look right first (I'm an Australian, dammit) */
646         right = offset + sizeof(*rec) + rec->rec_len;
647         if (right + sizeof(*rec) <= tdb->map_size) {
648                 struct list_struct r;
649
650                 if (tdb_read(tdb, right, &r, sizeof(r), DOCONV()) == -1) {
651                         TDB_LOG((tdb, 0, "tdb_free: right read failed at %u\n", right));
652                         goto left;
653                 }
654
655                 /* If it's free, expand to include it. */
656                 if (r.magic == TDB_FREE_MAGIC) {
657                         if (remove_from_freelist(tdb, right, r.next) == -1) {
658                                 TDB_LOG((tdb, 0, "tdb_free: right free failed at %u\n", right));
659                                 goto left;
660                         }
661                         rec->rec_len += sizeof(r) + r.rec_len;
662                 }
663         }
664
665 left:
666         /* Look left */
667         left = offset - sizeof(tdb_off);
668         if (left > TDB_HASH_TOP(tdb->header.hash_size-1)) {
669                 struct list_struct l;
670                 tdb_off leftsize;
671
672                 /* Read in tailer and jump back to header */
673                 if (ofs_read(tdb, left, &leftsize) == -1) {
674                         TDB_LOG((tdb, 0, "tdb_free: left offset read failed at %u\n", left));
675                         goto update;
676                 }
677                 left = offset - leftsize;
678
679                 /* Now read in record */
680                 if (tdb_read(tdb, left, &l, sizeof(l), DOCONV()) == -1) {
681                         TDB_LOG((tdb, 0, "tdb_free: left read failed at %u (%u)\n", left, leftsize));
682                         goto update;
683                 }
684
685                 /* If it's free, expand to include it. */
686                 if (l.magic == TDB_FREE_MAGIC) {
687                         if (remove_from_freelist(tdb, left, l.next) == -1) {
688                                 TDB_LOG((tdb, 0, "tdb_free: left free failed at %u\n", left));
689                                 goto update;
690                         } else {
691                                 offset = left;
692                                 rec->rec_len += leftsize;
693                         }
694                 }
695         }
696
697 update:
698         if (update_tailer(tdb, offset, rec) == -1) {
699                 TDB_LOG((tdb, 0, "tdb_free: update_tailer failed at %u\n", offset));
700                 goto fail;
701         }
702
703         /* Now, prepend to free list */
704         rec->magic = TDB_FREE_MAGIC;
705
706         if (ofs_read(tdb, FREELIST_TOP, &rec->next) == -1 ||
707             rec_write(tdb, offset, rec) == -1 ||
708             ofs_write(tdb, FREELIST_TOP, &offset) == -1) {
709                 TDB_LOG((tdb, 0, "tdb_free record write failed at offset=%d\n", offset));
710                 goto fail;
711         }
712
713         /* And we're done. */
714         tdb_unlock(tdb, -1, F_WRLCK);
715         return 0;
716
717  fail:
718         tdb_unlock(tdb, -1, F_WRLCK);
719         return -1;
720 }
721
722
723 /* expand a file.  we prefer to use ftruncate, as that is what posix
724   says to use for mmap expansion */
725 static int expand_file(TDB_CONTEXT *tdb, tdb_off size, tdb_off addition)
726 {
727         char buf[1024];
728 #if HAVE_FTRUNCATE_EXTEND
729         if (ftruncate(tdb->fd, size+addition) != 0) {
730                 TDB_LOG((tdb, 0, "expand_file ftruncate to %d failed (%s)\n", 
731                            size+addition, strerror(errno)));
732                 return -1;
733         }
734 #else
735         char b = 0;
736
737 #ifdef HAVE_PWRITE
738         if (pwrite(tdb->fd,  &b, 1, (size+addition) - 1) != 1) {
739 #else
740         if (lseek(tdb->fd, (size+addition) - 1, SEEK_SET) != (size+addition) - 1 || 
741             write(tdb->fd, &b, 1) != 1) {
742 #endif
743                 TDB_LOG((tdb, 0, "expand_file to %d failed (%s)\n", 
744                            size+addition, strerror(errno)));
745                 return -1;
746         }
747 #endif
748
749         /* now fill the file with something. This ensures that the file isn't sparse, which would be
750            very bad if we ran out of disk. This must be done with write, not via mmap */
751         memset(buf, 0x42, sizeof(buf));
752         while (addition) {
753                 int n = addition>sizeof(buf)?sizeof(buf):addition;
754 #ifdef HAVE_PWRITE
755                 int ret = pwrite(tdb->fd, buf, n, size);
756 #else
757                 int ret;
758                 if (lseek(tdb->fd, size, SEEK_SET) != size)
759                         return -1;
760                 ret = write(tdb->fd, buf, n);
761 #endif
762                 if (ret != n) {
763                         TDB_LOG((tdb, 0, "expand_file write of %d failed (%s)\n", 
764                                    n, strerror(errno)));
765                         return -1;
766                 }
767                 addition -= n;
768                 size += n;
769         }
770         return 0;
771 }
772
773
774 /* expand the database at least size bytes by expanding the underlying
775    file and doing the mmap again if necessary */
776 static int tdb_expand(TDB_CONTEXT *tdb, tdb_off size)
777 {
778         struct list_struct rec;
779         tdb_off offset;
780
781         if (tdb_lock(tdb, -1, F_WRLCK) == -1) {
782                 TDB_LOG((tdb, 0, "lock failed in tdb_expand\n"));
783                 return -1;
784         }
785
786         /* must know about any previous expansions by another process */
787         tdb_oob(tdb, tdb->map_size + 1, 1);
788
789         /* always make room for at least 10 more records, and round
790            the database up to a multiple of TDB_PAGE_SIZE */
791         size = TDB_ALIGN(tdb->map_size + size*10, TDB_PAGE_SIZE) - tdb->map_size;
792
793         if (!(tdb->flags & TDB_INTERNAL))
794                 tdb_munmap(tdb);
795
796         /*
797          * We must ensure the file is unmapped before doing this
798          * to ensure consistency with systems like OpenBSD where
799          * writes and mmaps are not consistent.
800          */
801
802         /* expand the file itself */
803         if (!(tdb->flags & TDB_INTERNAL)) {
804                 if (expand_file(tdb, tdb->map_size, size) != 0)
805                         goto fail;
806         }
807
808         tdb->map_size += size;
809
810         if (tdb->flags & TDB_INTERNAL)
811                 tdb->map_ptr = realloc(tdb->map_ptr, tdb->map_size);
812         else {
813                 /*
814                  * We must ensure the file is remapped before adding the space
815                  * to ensure consistency with systems like OpenBSD where
816                  * writes and mmaps are not consistent.
817                  */
818
819                 /* We're ok if the mmap fails as we'll fallback to read/write */
820                 tdb_mmap(tdb);
821         }
822
823         /* form a new freelist record */
824         memset(&rec,'\0',sizeof(rec));
825         rec.rec_len = size - sizeof(rec);
826
827         /* link it into the free list */
828         offset = tdb->map_size - size;
829         if (tdb_free(tdb, offset, &rec) == -1)
830                 goto fail;
831
832         tdb_unlock(tdb, -1, F_WRLCK);
833         return 0;
834  fail:
835         tdb_unlock(tdb, -1, F_WRLCK);
836         return -1;
837 }
838
839 /* allocate some space from the free list. The offset returned points
840    to a unconnected list_struct within the database with room for at
841    least length bytes of total data
842
843    0 is returned if the space could not be allocated
844  */
845 static tdb_off tdb_allocate(TDB_CONTEXT *tdb, tdb_len length,
846                             struct list_struct *rec)
847 {
848         tdb_off rec_ptr, last_ptr, newrec_ptr;
849         struct list_struct newrec;
850
851         if (tdb_lock(tdb, -1, F_WRLCK) == -1)
852                 return 0;
853
854         /* Extra bytes required for tailer */
855         length += sizeof(tdb_off);
856
857  again:
858         last_ptr = FREELIST_TOP;
859
860         /* read in the freelist top */
861         if (ofs_read(tdb, FREELIST_TOP, &rec_ptr) == -1)
862                 goto fail;
863
864         /* keep looking until we find a freelist record big enough */
865         while (rec_ptr) {
866                 if (rec_free_read(tdb, rec_ptr, rec) == -1)
867                         goto fail;
868
869                 if (rec->rec_len >= length) {
870                         /* found it - now possibly split it up  */
871                         if (rec->rec_len > length + MIN_REC_SIZE) {
872                                 /* Length of left piece */
873                                 length = TDB_ALIGN(length, TDB_ALIGNMENT);
874
875                                 /* Right piece to go on free list */
876                                 newrec.rec_len = rec->rec_len
877                                         - (sizeof(*rec) + length);
878                                 newrec_ptr = rec_ptr + sizeof(*rec) + length;
879
880                                 /* And left record is shortened */
881                                 rec->rec_len = length;
882                         } else
883                                 newrec_ptr = 0;
884
885                         /* Remove allocated record from the free list */
886                         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
887                                 goto fail;
888
889                         /* Update header: do this before we drop alloc
890                            lock, otherwise tdb_free() might try to
891                            merge with us, thinking we're free.
892                            (Thanks Jeremy Allison). */
893                         rec->magic = TDB_MAGIC;
894                         if (rec_write(tdb, rec_ptr, rec) == -1)
895                                 goto fail;
896
897                         /* Did we create new block? */
898                         if (newrec_ptr) {
899                                 /* Update allocated record tailer (we
900                                    shortened it). */
901                                 if (update_tailer(tdb, rec_ptr, rec) == -1)
902                                         goto fail;
903
904                                 /* Free new record */
905                                 if (tdb_free(tdb, newrec_ptr, &newrec) == -1)
906                                         goto fail;
907                         }
908
909                         /* all done - return the new record offset */
910                         tdb_unlock(tdb, -1, F_WRLCK);
911                         return rec_ptr;
912                 }
913                 /* move to the next record */
914                 last_ptr = rec_ptr;
915                 rec_ptr = rec->next;
916         }
917         /* we didn't find enough space. See if we can expand the
918            database and if we can then try again */
919         if (tdb_expand(tdb, length + sizeof(*rec)) == 0)
920                 goto again;
921  fail:
922         tdb_unlock(tdb, -1, F_WRLCK);
923         return 0;
924 }
925
926 /* initialise a new database with a specified hash size */
927 static int tdb_new_database(TDB_CONTEXT *tdb, int hash_size)
928 {
929         struct tdb_header *newdb;
930         int size, ret = -1;
931
932         /* We make it up in memory, then write it out if not internal */
933         size = sizeof(struct tdb_header) + (hash_size+1)*sizeof(tdb_off);
934         if (!(newdb = calloc(size, 1)))
935                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
936
937         /* Fill in the header */
938         newdb->version = TDB_VERSION;
939         newdb->hash_size = hash_size;
940 #ifdef USE_SPINLOCKS
941         newdb->rwlocks = size;
942 #endif
943         if (tdb->flags & TDB_INTERNAL) {
944                 tdb->map_size = size;
945                 tdb->map_ptr = (char *)newdb;
946                 memcpy(&tdb->header, newdb, sizeof(tdb->header));
947                 /* Convert the `ondisk' version if asked. */
948                 CONVERT(*newdb);
949                 return 0;
950         }
951         if (lseek(tdb->fd, 0, SEEK_SET) == -1)
952                 goto fail;
953
954         if (ftruncate(tdb->fd, 0) == -1)
955                 goto fail;
956
957         /* This creates an endian-converted header, as if read from disk */
958         CONVERT(*newdb);
959         memcpy(&tdb->header, newdb, sizeof(tdb->header));
960         /* Don't endian-convert the magic food! */
961         memcpy(newdb->magic_food, TDB_MAGIC_FOOD, strlen(TDB_MAGIC_FOOD)+1);
962         if (write(tdb->fd, newdb, size) != size)
963                 ret = -1;
964         else
965                 ret = tdb_create_rwlocks(tdb->fd, hash_size);
966
967   fail:
968         SAFE_FREE(newdb);
969         return ret;
970 }
971
972 /* Returns 0 on fail.  On success, return offset of record, and fills
973    in rec */
974 static tdb_off tdb_find(TDB_CONTEXT *tdb, TDB_DATA key, u32 hash,
975                         struct list_struct *r)
976 {
977         tdb_off rec_ptr;
978         
979         /* read in the hash top */
980         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1)
981                 return 0;
982
983         /* keep looking until we find the right record */
984         while (rec_ptr) {
985                 if (rec_read(tdb, rec_ptr, r) == -1)
986                         return 0;
987
988                 if (!TDB_DEAD(r) && hash==r->full_hash && key.dsize==r->key_len) {
989                         char *k;
990                         /* a very likely hit - read the key */
991                         k = tdb_alloc_read(tdb, rec_ptr + sizeof(*r), 
992                                            r->key_len);
993                         if (!k)
994                                 return 0;
995
996                         if (memcmp(key.dptr, k, key.dsize) == 0) {
997                                 SAFE_FREE(k);
998                                 return rec_ptr;
999                         }
1000                         SAFE_FREE(k);
1001                 }
1002                 rec_ptr = r->next;
1003         }
1004         return TDB_ERRCODE(TDB_ERR_NOEXIST, 0);
1005 }
1006
1007 /* If they do lockkeys, check that this hash is one they locked */
1008 static int tdb_keylocked(TDB_CONTEXT *tdb, u32 hash)
1009 {
1010         u32 i;
1011         if (!tdb->lockedkeys)
1012                 return 1;
1013         for (i = 0; i < tdb->lockedkeys[0]; i++)
1014                 if (tdb->lockedkeys[i+1] == hash)
1015                         return 1;
1016         return TDB_ERRCODE(TDB_ERR_NOLOCK, 0);
1017 }
1018
1019 /* As tdb_find, but if you succeed, keep the lock */
1020 static tdb_off tdb_find_lock(TDB_CONTEXT *tdb, TDB_DATA key, int locktype,
1021                              struct list_struct *rec)
1022 {
1023         u32 hash, rec_ptr;
1024
1025         hash = tdb_hash(&key);
1026         if (!tdb_keylocked(tdb, hash))
1027                 return 0;
1028         if (tdb_lock(tdb, BUCKET(hash), locktype) == -1)
1029                 return 0;
1030         if (!(rec_ptr = tdb_find(tdb, key, hash, rec)))
1031                 tdb_unlock(tdb, BUCKET(hash), locktype);
1032         return rec_ptr;
1033 }
1034
1035 enum TDB_ERROR tdb_error(TDB_CONTEXT *tdb)
1036 {
1037         return tdb->ecode;
1038 }
1039
1040 static struct tdb_errname {
1041         enum TDB_ERROR ecode; const char *estring;
1042 } emap[] = { {TDB_SUCCESS, "Success"},
1043              {TDB_ERR_CORRUPT, "Corrupt database"},
1044              {TDB_ERR_IO, "IO Error"},
1045              {TDB_ERR_LOCK, "Locking error"},
1046              {TDB_ERR_OOM, "Out of memory"},
1047              {TDB_ERR_EXISTS, "Record exists"},
1048              {TDB_ERR_NOLOCK, "Lock exists on other keys"},
1049              {TDB_ERR_NOEXIST, "Record does not exist"} };
1050
1051 /* Error string for the last tdb error */
1052 const char *tdb_errorstr(TDB_CONTEXT *tdb)
1053 {
1054         u32 i;
1055         for (i = 0; i < sizeof(emap) / sizeof(struct tdb_errname); i++)
1056                 if (tdb->ecode == emap[i].ecode)
1057                         return emap[i].estring;
1058         return "Invalid error code";
1059 }
1060
1061 /* update an entry in place - this only works if the new data size
1062    is <= the old data size and the key exists.
1063    on failure return -1.
1064 */
1065
1066 static int tdb_update(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf)
1067 {
1068         struct list_struct rec;
1069         tdb_off rec_ptr;
1070
1071         /* find entry */
1072         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1073                 return -1;
1074
1075         /* must be long enough key, data and tailer */
1076         if (rec.rec_len < key.dsize + dbuf.dsize + sizeof(tdb_off)) {
1077                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1078                 return -1;
1079         }
1080
1081         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1082                       dbuf.dptr, dbuf.dsize) == -1)
1083                 return -1;
1084
1085         if (dbuf.dsize != rec.data_len) {
1086                 /* update size */
1087                 rec.data_len = dbuf.dsize;
1088                 return rec_write(tdb, rec_ptr, &rec);
1089         }
1090  
1091         return 0;
1092 }
1093
1094 /* find an entry in the database given a key */
1095 /* If an entry doesn't exist tdb_err will be set to
1096  * TDB_ERR_NOEXIST. If a key has no data attached
1097  * tdb_err will not be set. Both will return a
1098  * zero pptr and zero dsize.
1099  */
1100
1101 TDB_DATA tdb_fetch(TDB_CONTEXT *tdb, TDB_DATA key)
1102 {
1103         tdb_off rec_ptr;
1104         struct list_struct rec;
1105         TDB_DATA ret;
1106
1107         /* find which hash bucket it is in */
1108         if (!(rec_ptr = tdb_find_lock(tdb,key,F_RDLCK,&rec)))
1109                 return tdb_null;
1110
1111         if (rec.data_len)
1112                 ret.dptr = tdb_alloc_read(tdb, rec_ptr + sizeof(rec) + rec.key_len,
1113                                           rec.data_len);
1114         else
1115                 ret.dptr = NULL;
1116         ret.dsize = rec.data_len;
1117         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1118         return ret;
1119 }
1120
1121 /* check if an entry in the database exists 
1122
1123    note that 1 is returned if the key is found and 0 is returned if not found
1124    this doesn't match the conventions in the rest of this module, but is
1125    compatible with gdbm
1126 */
1127 int tdb_exists(TDB_CONTEXT *tdb, TDB_DATA key)
1128 {
1129         struct list_struct rec;
1130         
1131         if (tdb_find_lock(tdb, key, F_RDLCK, &rec) == 0)
1132                 return 0;
1133         tdb_unlock(tdb, BUCKET(rec.full_hash), F_RDLCK);
1134         return 1;
1135 }
1136
1137 /* record lock stops delete underneath */
1138 static int lock_record(TDB_CONTEXT *tdb, tdb_off off)
1139 {
1140         return off ? tdb_brlock(tdb, off, F_RDLCK, F_SETLKW, 0) : 0;
1141 }
1142 /*
1143   Write locks override our own fcntl readlocks, so check it here.
1144   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1145   an error to fail to get the lock here.
1146 */
1147  
1148 static int write_lock_record(TDB_CONTEXT *tdb, tdb_off off)
1149 {
1150         struct tdb_traverse_lock *i;
1151         for (i = &tdb->travlocks; i; i = i->next)
1152                 if (i->off == off)
1153                         return -1;
1154         return tdb_brlock(tdb, off, F_WRLCK, F_SETLK, 1);
1155 }
1156
1157 /*
1158   Note this is meant to be F_SETLK, *not* F_SETLKW, as it's not
1159   an error to fail to get the lock here.
1160 */
1161
1162 static int write_unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1163 {
1164         return tdb_brlock(tdb, off, F_UNLCK, F_SETLK, 0);
1165 }
1166 /* fcntl locks don't stack: avoid unlocking someone else's */
1167 static int unlock_record(TDB_CONTEXT *tdb, tdb_off off)
1168 {
1169         struct tdb_traverse_lock *i;
1170         u32 count = 0;
1171
1172         if (off == 0)
1173                 return 0;
1174         for (i = &tdb->travlocks; i; i = i->next)
1175                 if (i->off == off)
1176                         count++;
1177         return (count == 1 ? tdb_brlock(tdb, off, F_UNLCK, F_SETLKW, 0) : 0);
1178 }
1179
1180 /* actually delete an entry in the database given the offset */
1181 static int do_delete(TDB_CONTEXT *tdb, tdb_off rec_ptr, struct list_struct*rec)
1182 {
1183         tdb_off last_ptr, i;
1184         struct list_struct lastrec;
1185
1186         if (tdb->read_only) return -1;
1187
1188         if (write_lock_record(tdb, rec_ptr) == -1) {
1189                 /* Someone traversing here: mark it as dead */
1190                 rec->magic = TDB_DEAD_MAGIC;
1191                 return rec_write(tdb, rec_ptr, rec);
1192         }
1193         if (write_unlock_record(tdb, rec_ptr) != 0)
1194                 return -1;
1195
1196         /* find previous record in hash chain */
1197         if (ofs_read(tdb, TDB_HASH_TOP(rec->full_hash), &i) == -1)
1198                 return -1;
1199         for (last_ptr = 0; i != rec_ptr; last_ptr = i, i = lastrec.next)
1200                 if (rec_read(tdb, i, &lastrec) == -1)
1201                         return -1;
1202
1203         /* unlink it: next ptr is at start of record. */
1204         if (last_ptr == 0)
1205                 last_ptr = TDB_HASH_TOP(rec->full_hash);
1206         if (ofs_write(tdb, last_ptr, &rec->next) == -1)
1207                 return -1;
1208
1209         /* recover the space */
1210         if (tdb_free(tdb, rec_ptr, rec) == -1)
1211                 return -1;
1212         return 0;
1213 }
1214
1215 /* Uses traverse lock: 0 = finish, -1 = error, other = record offset */
1216 static int tdb_next_lock(TDB_CONTEXT *tdb, struct tdb_traverse_lock *tlock,
1217                          struct list_struct *rec)
1218 {
1219         int want_next = (tlock->off != 0);
1220
1221         /* No traversal allows if you've called tdb_lockkeys() */
1222         if (tdb->lockedkeys)
1223                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1224
1225         /* Lock each chain from the start one. */
1226         for (; tlock->hash < tdb->header.hash_size; tlock->hash++) {
1227                 if (tdb_lock(tdb, tlock->hash, F_WRLCK) == -1)
1228                         return -1;
1229
1230                 /* No previous record?  Start at top of chain. */
1231                 if (!tlock->off) {
1232                         if (ofs_read(tdb, TDB_HASH_TOP(tlock->hash),
1233                                      &tlock->off) == -1)
1234                                 goto fail;
1235                 } else {
1236                         /* Otherwise unlock the previous record. */
1237                         if (unlock_record(tdb, tlock->off) != 0)
1238                                 goto fail;
1239                 }
1240
1241                 if (want_next) {
1242                         /* We have offset of old record: grab next */
1243                         if (rec_read(tdb, tlock->off, rec) == -1)
1244                                 goto fail;
1245                         tlock->off = rec->next;
1246                 }
1247
1248                 /* Iterate through chain */
1249                 while( tlock->off) {
1250                         tdb_off current;
1251                         if (rec_read(tdb, tlock->off, rec) == -1)
1252                                 goto fail;
1253                         if (!TDB_DEAD(rec)) {
1254                                 /* Woohoo: we found one! */
1255                                 if (lock_record(tdb, tlock->off) != 0)
1256                                         goto fail;
1257                                 return tlock->off;
1258                         }
1259                         /* Try to clean dead ones from old traverses */
1260                         current = tlock->off;
1261                         tlock->off = rec->next;
1262                         if (do_delete(tdb, current, rec) != 0)
1263                                 goto fail;
1264                 }
1265                 tdb_unlock(tdb, tlock->hash, F_WRLCK);
1266                 want_next = 0;
1267         }
1268         /* We finished iteration without finding anything */
1269         return TDB_ERRCODE(TDB_SUCCESS, 0);
1270
1271  fail:
1272         tlock->off = 0;
1273         if (tdb_unlock(tdb, tlock->hash, F_WRLCK) != 0)
1274                 TDB_LOG((tdb, 0, "tdb_next_lock: On error unlock failed!\n"));
1275         return -1;
1276 }
1277
1278 /* traverse the entire database - calling fn(tdb, key, data) on each element.
1279    return -1 on error or the record count traversed
1280    if fn is NULL then it is not called
1281    a non-zero return value from fn() indicates that the traversal should stop
1282   */
1283 int tdb_traverse(TDB_CONTEXT *tdb, tdb_traverse_func fn, void *state)
1284 {
1285         TDB_DATA key, dbuf;
1286         struct list_struct rec;
1287         struct tdb_traverse_lock tl = { NULL, 0, 0 };
1288         int ret, count = 0;
1289
1290         /* This was in the initializaton, above, but the IRIX compiler
1291          * did not like it.  crh
1292          */
1293         tl.next = tdb->travlocks.next;
1294
1295         /* fcntl locks don't stack: beware traverse inside traverse */
1296         tdb->travlocks.next = &tl;
1297
1298         /* tdb_next_lock places locks on the record returned, and its chain */
1299         while ((ret = tdb_next_lock(tdb, &tl, &rec)) > 0) {
1300                 count++;
1301                 /* now read the full record */
1302                 key.dptr = tdb_alloc_read(tdb, tl.off + sizeof(rec), 
1303                                           rec.key_len + rec.data_len);
1304                 if (!key.dptr) {
1305                         ret = -1;
1306                         if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0)
1307                                 goto out;
1308                         if (unlock_record(tdb, tl.off) != 0)
1309                                 TDB_LOG((tdb, 0, "tdb_traverse: key.dptr == NULL and unlock_record failed!\n"));
1310                         goto out;
1311                 }
1312                 key.dsize = rec.key_len;
1313                 dbuf.dptr = key.dptr + rec.key_len;
1314                 dbuf.dsize = rec.data_len;
1315
1316                 /* Drop chain lock, call out */
1317                 if (tdb_unlock(tdb, tl.hash, F_WRLCK) != 0) {
1318                         ret = -1;
1319                         goto out;
1320                 }
1321                 if (fn && fn(tdb, key, dbuf, state)) {
1322                         /* They want us to terminate traversal */
1323                         ret = count;
1324                         if (unlock_record(tdb, tl.off) != 0) {
1325                                 TDB_LOG((tdb, 0, "tdb_traverse: unlock_record failed!\n"));;
1326                                 ret = -1;
1327                         }
1328                         tdb->travlocks.next = tl.next;
1329                         SAFE_FREE(key.dptr);
1330                         return count;
1331                 }
1332                 SAFE_FREE(key.dptr);
1333         }
1334 out:
1335         tdb->travlocks.next = tl.next;
1336         if (ret < 0)
1337                 return -1;
1338         else
1339                 return count;
1340 }
1341
1342 /* find the first entry in the database and return its key */
1343 TDB_DATA tdb_firstkey(TDB_CONTEXT *tdb)
1344 {
1345         TDB_DATA key;
1346         struct list_struct rec;
1347
1348         /* release any old lock */
1349         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1350                 return tdb_null;
1351         tdb->travlocks.off = tdb->travlocks.hash = 0;
1352
1353         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) <= 0)
1354                 return tdb_null;
1355         /* now read the key */
1356         key.dsize = rec.key_len;
1357         key.dptr =tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),key.dsize);
1358         if (tdb_unlock(tdb, BUCKET(tdb->travlocks.hash), F_WRLCK) != 0)
1359                 TDB_LOG((tdb, 0, "tdb_firstkey: error occurred while tdb_unlocking!\n"));
1360         return key;
1361 }
1362
1363 /* find the next entry in the database, returning its key */
1364 TDB_DATA tdb_nextkey(TDB_CONTEXT *tdb, TDB_DATA oldkey)
1365 {
1366         u32 oldhash;
1367         TDB_DATA key = tdb_null;
1368         struct list_struct rec;
1369         char *k = NULL;
1370
1371         /* Is locked key the old key?  If so, traverse will be reliable. */
1372         if (tdb->travlocks.off) {
1373                 if (tdb_lock(tdb,tdb->travlocks.hash,F_WRLCK))
1374                         return tdb_null;
1375                 if (rec_read(tdb, tdb->travlocks.off, &rec) == -1
1376                     || !(k = tdb_alloc_read(tdb,tdb->travlocks.off+sizeof(rec),
1377                                             rec.key_len))
1378                     || memcmp(k, oldkey.dptr, oldkey.dsize) != 0) {
1379                         /* No, it wasn't: unlock it and start from scratch */
1380                         if (unlock_record(tdb, tdb->travlocks.off) != 0)
1381                                 return tdb_null;
1382                         if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1383                                 return tdb_null;
1384                         tdb->travlocks.off = 0;
1385                 }
1386
1387                 SAFE_FREE(k);
1388         }
1389
1390         if (!tdb->travlocks.off) {
1391                 /* No previous element: do normal find, and lock record */
1392                 tdb->travlocks.off = tdb_find_lock(tdb, oldkey, F_WRLCK, &rec);
1393                 if (!tdb->travlocks.off)
1394                         return tdb_null;
1395                 tdb->travlocks.hash = BUCKET(rec.full_hash);
1396                 if (lock_record(tdb, tdb->travlocks.off) != 0) {
1397                         TDB_LOG((tdb, 0, "tdb_nextkey: lock_record failed (%s)!\n", strerror(errno)));
1398                         return tdb_null;
1399                 }
1400         }
1401         oldhash = tdb->travlocks.hash;
1402
1403         /* Grab next record: locks chain and returned record,
1404            unlocks old record */
1405         if (tdb_next_lock(tdb, &tdb->travlocks, &rec) > 0) {
1406                 key.dsize = rec.key_len;
1407                 key.dptr = tdb_alloc_read(tdb, tdb->travlocks.off+sizeof(rec),
1408                                           key.dsize);
1409                 /* Unlock the chain of this new record */
1410                 if (tdb_unlock(tdb, tdb->travlocks.hash, F_WRLCK) != 0)
1411                         TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1412         }
1413         /* Unlock the chain of old record */
1414         if (tdb_unlock(tdb, BUCKET(oldhash), F_WRLCK) != 0)
1415                 TDB_LOG((tdb, 0, "tdb_nextkey: WARNING tdb_unlock failed!\n"));
1416         return key;
1417 }
1418
1419 /* delete an entry in the database given a key */
1420 int tdb_delete(TDB_CONTEXT *tdb, TDB_DATA key)
1421 {
1422         tdb_off rec_ptr;
1423         struct list_struct rec;
1424         int ret;
1425
1426         if (!(rec_ptr = tdb_find_lock(tdb, key, F_WRLCK, &rec)))
1427                 return -1;
1428         ret = do_delete(tdb, rec_ptr, &rec);
1429         if (tdb_unlock(tdb, BUCKET(rec.full_hash), F_WRLCK) != 0)
1430                 TDB_LOG((tdb, 0, "tdb_delete: WARNING tdb_unlock failed!\n"));
1431         return ret;
1432 }
1433
1434 /* store an element in the database, replacing any existing element
1435    with the same key 
1436
1437    return 0 on success, -1 on failure
1438 */
1439 int tdb_store(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA dbuf, int flag)
1440 {
1441         struct list_struct rec;
1442         u32 hash;
1443         tdb_off rec_ptr;
1444         char *p = NULL;
1445         int ret = 0;
1446
1447         /* find which hash bucket it is in */
1448         hash = tdb_hash(&key);
1449         if (!tdb_keylocked(tdb, hash))
1450                 return -1;
1451         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1452                 return -1;
1453
1454         /* check for it existing, on insert. */
1455         if (flag == TDB_INSERT) {
1456                 if (tdb_exists(tdb, key)) {
1457                         tdb->ecode = TDB_ERR_EXISTS;
1458                         goto fail;
1459                 }
1460         } else {
1461                 /* first try in-place update, on modify or replace. */
1462                 if (tdb_update(tdb, key, dbuf) == 0)
1463                         goto out;
1464                 if (flag == TDB_MODIFY && tdb->ecode == TDB_ERR_NOEXIST)
1465                         goto fail;
1466         }
1467         /* reset the error code potentially set by the tdb_update() */
1468         tdb->ecode = TDB_SUCCESS;
1469
1470         /* delete any existing record - if it doesn't exist we don't
1471            care.  Doing this first reduces fragmentation, and avoids
1472            coalescing with `allocated' block before it's updated. */
1473         if (flag != TDB_INSERT)
1474                 tdb_delete(tdb, key);
1475
1476         /* Copy key+value *before* allocating free space in case malloc
1477            fails and we are left with a dead spot in the tdb. */
1478
1479         if (!(p = (char *)malloc(key.dsize + dbuf.dsize))) {
1480                 tdb->ecode = TDB_ERR_OOM;
1481                 goto fail;
1482         }
1483
1484         memcpy(p, key.dptr, key.dsize);
1485         if (dbuf.dsize)
1486                 memcpy(p+key.dsize, dbuf.dptr, dbuf.dsize);
1487
1488         /* now we're into insert / modify / replace of a record which
1489          * we know could not be optimised by an in-place store (for
1490          * various reasons).  */
1491         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + dbuf.dsize, &rec)))
1492                 goto fail;
1493
1494         /* Read hash top into next ptr */
1495         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1496                 goto fail;
1497
1498         rec.key_len = key.dsize;
1499         rec.data_len = dbuf.dsize;
1500         rec.full_hash = hash;
1501         rec.magic = TDB_MAGIC;
1502
1503         /* write out and point the top of the hash chain at it */
1504         if (rec_write(tdb, rec_ptr, &rec) == -1
1505             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+dbuf.dsize)==-1
1506             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1507                 /* Need to tdb_unallocate() here */
1508                 goto fail;
1509         }
1510  out:
1511         SAFE_FREE(p); 
1512         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1513         return ret;
1514 fail:
1515         ret = -1;
1516         goto out;
1517 }
1518
1519 /* Attempt to append data to an entry in place - this only works if the new data size
1520    is <= the old data size and the key exists.
1521    on failure return -1. Record must be locked before calling.
1522 */
1523 static int tdb_append_inplace(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1524 {
1525         struct list_struct rec;
1526         tdb_off rec_ptr;
1527
1528         /* find entry */
1529         if (!(rec_ptr = tdb_find(tdb, key, tdb_hash(&key), &rec)))
1530                 return -1;
1531
1532         /* Append of 0 is always ok. */
1533         if (new_dbuf.dsize == 0)
1534                 return 0;
1535
1536         /* must be long enough for key, old data + new data and tailer */
1537         if (rec.rec_len < key.dsize + rec.data_len + new_dbuf.dsize + sizeof(tdb_off)) {
1538                 /* No room. */
1539                 tdb->ecode = TDB_SUCCESS; /* Not really an error */
1540                 return -1;
1541         }
1542
1543         if (tdb_write(tdb, rec_ptr + sizeof(rec) + rec.key_len + rec.data_len,
1544                       new_dbuf.dptr, new_dbuf.dsize) == -1)
1545                 return -1;
1546
1547         /* update size */
1548         rec.data_len += new_dbuf.dsize;
1549         return rec_write(tdb, rec_ptr, &rec);
1550 }
1551
1552 /* Append to an entry. Create if not exist. */
1553
1554 int tdb_append(TDB_CONTEXT *tdb, TDB_DATA key, TDB_DATA new_dbuf)
1555 {
1556         struct list_struct rec;
1557         u32 hash;
1558         tdb_off rec_ptr;
1559         char *p = NULL;
1560         int ret = 0;
1561         size_t new_data_size = 0;
1562
1563         /* find which hash bucket it is in */
1564         hash = tdb_hash(&key);
1565         if (!tdb_keylocked(tdb, hash))
1566                 return -1;
1567         if (tdb_lock(tdb, BUCKET(hash), F_WRLCK) == -1)
1568                 return -1;
1569
1570         /* first try in-place. */
1571         if (tdb_append_inplace(tdb, key, new_dbuf) == 0)
1572                 goto out;
1573
1574         /* reset the error code potentially set by the tdb_append_inplace() */
1575         tdb->ecode = TDB_SUCCESS;
1576
1577         /* find entry */
1578         if (!(rec_ptr = tdb_find(tdb, key, hash, &rec))) {
1579                 if (tdb->ecode != TDB_ERR_NOEXIST)
1580                         goto fail;
1581
1582                 /* Not found - create. */
1583
1584                 ret = tdb_store(tdb, key, new_dbuf, TDB_INSERT);
1585                 goto out;
1586         }
1587
1588         new_data_size = rec.data_len + new_dbuf.dsize;
1589
1590         /* Copy key+old_value+value *before* allocating free space in case malloc
1591            fails and we are left with a dead spot in the tdb. */
1592
1593         if (!(p = (char *)malloc(key.dsize + new_data_size))) {
1594                 tdb->ecode = TDB_ERR_OOM;
1595                 goto fail;
1596         }
1597
1598         /* Copy the key in place. */
1599         memcpy(p, key.dptr, key.dsize);
1600
1601         /* Now read the old data into place. */
1602         if (rec.data_len &&
1603                 tdb_read(tdb, rec_ptr + sizeof(rec) + rec.key_len, p + key.dsize, rec.data_len, 0) == -1)
1604                         goto fail;
1605
1606         /* Finally append the new data. */
1607         if (new_dbuf.dsize)
1608                 memcpy(p+key.dsize+rec.data_len, new_dbuf.dptr, new_dbuf.dsize);
1609
1610         /* delete any existing record - if it doesn't exist we don't
1611            care.  Doing this first reduces fragmentation, and avoids
1612            coalescing with `allocated' block before it's updated. */
1613
1614         tdb_delete(tdb, key);
1615
1616         if (!(rec_ptr = tdb_allocate(tdb, key.dsize + new_data_size, &rec)))
1617                 goto fail;
1618
1619         /* Read hash top into next ptr */
1620         if (ofs_read(tdb, TDB_HASH_TOP(hash), &rec.next) == -1)
1621                 goto fail;
1622
1623         rec.key_len = key.dsize;
1624         rec.data_len = new_data_size;
1625         rec.full_hash = hash;
1626         rec.magic = TDB_MAGIC;
1627
1628         /* write out and point the top of the hash chain at it */
1629         if (rec_write(tdb, rec_ptr, &rec) == -1
1630             || tdb_write(tdb, rec_ptr+sizeof(rec), p, key.dsize+new_data_size)==-1
1631             || ofs_write(tdb, TDB_HASH_TOP(hash), &rec_ptr) == -1) {
1632                 /* Need to tdb_unallocate() here */
1633                 goto fail;
1634         }
1635
1636  out:
1637         SAFE_FREE(p); 
1638         tdb_unlock(tdb, BUCKET(hash), F_WRLCK);
1639         return ret;
1640
1641 fail:
1642         ret = -1;
1643         goto out;
1644 }
1645
1646 static int tdb_already_open(dev_t device,
1647                             ino_t ino)
1648 {
1649         TDB_CONTEXT *i;
1650         
1651         for (i = tdbs; i; i = i->next) {
1652                 if (i->device == device && i->inode == ino) {
1653                         return 1;
1654                 }
1655         }
1656
1657         return 0;
1658 }
1659
1660 /* open the database, creating it if necessary 
1661
1662    The open_flags and mode are passed straight to the open call on the
1663    database file. A flags value of O_WRONLY is invalid. The hash size
1664    is advisory, use zero for a default value.
1665
1666    Return is NULL on error, in which case errno is also set.  Don't 
1667    try to call tdb_error or tdb_errname, just do strerror(errno).
1668
1669    @param name may be NULL for internal databases. */
1670 TDB_CONTEXT *tdb_open(const char *name, int hash_size, int tdb_flags,
1671                       int open_flags, mode_t mode)
1672 {
1673         return tdb_open_ex(name, hash_size, tdb_flags, open_flags, mode, NULL);
1674 }
1675
1676
1677 TDB_CONTEXT *tdb_open_ex(const char *name, int hash_size, int tdb_flags,
1678                          int open_flags, mode_t mode,
1679                          tdb_log_func log_fn)
1680 {
1681         TDB_CONTEXT *tdb;
1682         struct stat st;
1683         int rev = 0, locked;
1684         unsigned char *vp;
1685         u32 vertest;
1686
1687         if (!(tdb = calloc(1, sizeof *tdb))) {
1688                 /* Can't log this */
1689                 errno = ENOMEM;
1690                 goto fail;
1691         }
1692         tdb->fd = -1;
1693         tdb->name = NULL;
1694         tdb->map_ptr = NULL;
1695         tdb->lockedkeys = NULL;
1696         tdb->flags = tdb_flags;
1697         tdb->open_flags = open_flags;
1698         tdb->log_fn = log_fn;
1699         
1700         if ((open_flags & O_ACCMODE) == O_WRONLY) {
1701                 TDB_LOG((tdb, 0, "tdb_open_ex: can't open tdb %s write-only\n",
1702                          name));
1703                 errno = EINVAL;
1704                 goto fail;
1705         }
1706         
1707         if (hash_size == 0)
1708                 hash_size = DEFAULT_HASH_SIZE;
1709         if ((open_flags & O_ACCMODE) == O_RDONLY) {
1710                 tdb->read_only = 1;
1711                 /* read only databases don't do locking or clear if first */
1712                 tdb->flags |= TDB_NOLOCK;
1713                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1714         }
1715
1716         /* internal databases don't mmap or lock, and start off cleared */
1717         if (tdb->flags & TDB_INTERNAL) {
1718                 tdb->flags |= (TDB_NOLOCK | TDB_NOMMAP);
1719                 tdb->flags &= ~TDB_CLEAR_IF_FIRST;
1720                 if (tdb_new_database(tdb, hash_size) != 0) {
1721                         TDB_LOG((tdb, 0, "tdb_open_ex: tdb_new_database failed!"));
1722                         goto fail;
1723                 }
1724                 goto internal;
1725         }
1726
1727         if ((tdb->fd = open(name, open_flags, mode)) == -1) {
1728                 TDB_LOG((tdb, 5, "tdb_open_ex: could not open file %s: %s\n",
1729                          name, strerror(errno)));
1730                 goto fail;      /* errno set by open(2) */
1731         }
1732
1733         /* ensure there is only one process initialising at once */
1734         if (tdb_brlock(tdb, GLOBAL_LOCK, F_WRLCK, F_SETLKW, 0) == -1) {
1735                 TDB_LOG((tdb, 0, "tdb_open_ex: failed to get global lock on %s: %s\n",
1736                          name, strerror(errno)));
1737                 goto fail;      /* errno set by tdb_brlock */
1738         }
1739
1740         /* we need to zero database if we are the only one with it open */
1741         if ((locked = (tdb_brlock(tdb, ACTIVE_LOCK, F_WRLCK, F_SETLK, 0) == 0))
1742             && (tdb_flags & TDB_CLEAR_IF_FIRST)) {
1743                 open_flags |= O_CREAT;
1744                 if (ftruncate(tdb->fd, 0) == -1) {
1745                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1746                                  "failed to truncate %s: %s\n",
1747                                  name, strerror(errno)));
1748                         goto fail; /* errno set by ftruncate */
1749                 }
1750         }
1751
1752         if (read(tdb->fd, &tdb->header, sizeof(tdb->header)) != sizeof(tdb->header)
1753             || strcmp(tdb->header.magic_food, TDB_MAGIC_FOOD) != 0
1754             || (tdb->header.version != TDB_VERSION
1755                 && !(rev = (tdb->header.version==TDB_BYTEREV(TDB_VERSION))))) {
1756                 /* its not a valid database - possibly initialise it */
1757                 if (!(open_flags & O_CREAT) || tdb_new_database(tdb, hash_size) == -1) {
1758                         errno = EIO; /* ie bad format or something */
1759                         goto fail;
1760                 }
1761                 rev = (tdb->flags & TDB_CONVERT);
1762         }
1763         vp = (unsigned char *)&tdb->header.version;
1764         vertest = (((u32)vp[0]) << 24) | (((u32)vp[1]) << 16) |
1765                   (((u32)vp[2]) << 8) | (u32)vp[3];
1766         tdb->flags |= (vertest==TDB_VERSION) ? TDB_BIGENDIAN : 0;
1767         if (!rev)
1768                 tdb->flags &= ~TDB_CONVERT;
1769         else {
1770                 tdb->flags |= TDB_CONVERT;
1771                 convert(&tdb->header, sizeof(tdb->header));
1772         }
1773         if (fstat(tdb->fd, &st) == -1)
1774                 goto fail;
1775
1776         /* Is it already in the open list?  If so, fail. */
1777         if (tdb_already_open(st.st_dev, st.st_ino)) {
1778                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1779                          "%s (%d,%d) is already open in this process\n",
1780                          name, st.st_dev, st.st_ino));
1781                 errno = EBUSY;
1782                 goto fail;
1783         }
1784
1785         if (!(tdb->name = (char *)strdup(name))) {
1786                 errno = ENOMEM;
1787                 goto fail;
1788         }
1789
1790         tdb->map_size = st.st_size;
1791         tdb->device = st.st_dev;
1792         tdb->inode = st.st_ino;
1793         tdb->locked = calloc(tdb->header.hash_size+1, sizeof(tdb->locked[0]));
1794         if (!tdb->locked) {
1795                 TDB_LOG((tdb, 2, "tdb_open_ex: "
1796                          "failed to allocate lock structure for %s\n",
1797                          name));
1798                 errno = ENOMEM;
1799                 goto fail;
1800         }
1801         tdb_mmap(tdb);
1802         if (locked) {
1803                 if (!tdb->read_only)
1804                         if (tdb_clear_spinlocks(tdb) != 0) {
1805                                 TDB_LOG((tdb, 0, "tdb_open_ex: "
1806                                 "failed to clear spinlock\n"));
1807                                 goto fail;
1808                         }
1809                 if (tdb_brlock(tdb, ACTIVE_LOCK, F_UNLCK, F_SETLK, 0) == -1) {
1810                         TDB_LOG((tdb, 0, "tdb_open_ex: "
1811                                  "failed to take ACTIVE_LOCK on %s: %s\n",
1812                                  name, strerror(errno)));
1813                         goto fail;
1814                 }
1815         }
1816         /* leave this lock in place to indicate it's in use */
1817         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1)
1818                 goto fail;
1819
1820  internal:
1821         /* Internal (memory-only) databases skip all the code above to
1822          * do with disk files, and resume here by releasing their
1823          * global lock and hooking into the active list. */
1824         if (tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0) == -1)
1825                 goto fail;
1826         tdb->next = tdbs;
1827         tdbs = tdb;
1828         return tdb;
1829
1830  fail:
1831         { int save_errno = errno;
1832
1833         if (!tdb)
1834                 return NULL;
1835         
1836         if (tdb->map_ptr) {
1837                 if (tdb->flags & TDB_INTERNAL)
1838                         SAFE_FREE(tdb->map_ptr);
1839                 else
1840                         tdb_munmap(tdb);
1841         }
1842         SAFE_FREE(tdb->name);
1843         if (tdb->fd != -1)
1844                 if (close(tdb->fd) != 0)
1845                         TDB_LOG((tdb, 5, "tdb_open_ex: failed to close tdb->fd on error!\n"));
1846         SAFE_FREE(tdb->locked);
1847         SAFE_FREE(tdb);
1848         errno = save_errno;
1849         return NULL;
1850         }
1851 }
1852
1853 /**
1854  * Close a database.
1855  *
1856  * @returns -1 for error; 0 for success.
1857  **/
1858 int tdb_close(TDB_CONTEXT *tdb)
1859 {
1860         TDB_CONTEXT **i;
1861         int ret = 0;
1862
1863         if (tdb->map_ptr) {
1864                 if (tdb->flags & TDB_INTERNAL)
1865                         SAFE_FREE(tdb->map_ptr);
1866                 else
1867                         tdb_munmap(tdb);
1868         }
1869         SAFE_FREE(tdb->name);
1870         if (tdb->fd != -1)
1871                 ret = close(tdb->fd);
1872         SAFE_FREE(tdb->locked);
1873         SAFE_FREE(tdb->lockedkeys);
1874
1875         /* Remove from contexts list */
1876         for (i = &tdbs; *i; i = &(*i)->next) {
1877                 if (*i == tdb) {
1878                         *i = tdb->next;
1879                         break;
1880                 }
1881         }
1882
1883         memset(tdb, 0, sizeof(*tdb));
1884         SAFE_FREE(tdb);
1885
1886         return ret;
1887 }
1888
1889 /* lock/unlock entire database */
1890 int tdb_lockall(TDB_CONTEXT *tdb)
1891 {
1892         u32 i;
1893
1894         /* There are no locks on read-only dbs */
1895         if (tdb->read_only)
1896                 return TDB_ERRCODE(TDB_ERR_LOCK, -1);
1897         if (tdb->lockedkeys)
1898                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1899         for (i = 0; i < tdb->header.hash_size; i++) 
1900                 if (tdb_lock(tdb, i, F_WRLCK))
1901                         break;
1902
1903         /* If error, release locks we have... */
1904         if (i < tdb->header.hash_size) {
1905                 u32 j;
1906
1907                 for ( j = 0; j < i; j++)
1908                         tdb_unlock(tdb, j, F_WRLCK);
1909                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1910         }
1911
1912         return 0;
1913 }
1914 void tdb_unlockall(TDB_CONTEXT *tdb)
1915 {
1916         u32 i;
1917         for (i=0; i < tdb->header.hash_size; i++)
1918                 tdb_unlock(tdb, i, F_WRLCK);
1919 }
1920
1921 int tdb_lockkeys(TDB_CONTEXT *tdb, u32 number, TDB_DATA keys[])
1922 {
1923         u32 i, j, hash;
1924
1925         /* Can't lock more keys if already locked */
1926         if (tdb->lockedkeys)
1927                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1928         if (!(tdb->lockedkeys = malloc(sizeof(u32) * (number+1))))
1929                 return TDB_ERRCODE(TDB_ERR_OOM, -1);
1930         /* First number in array is # keys */
1931         tdb->lockedkeys[0] = number;
1932
1933         /* Insertion sort by bucket */
1934         for (i = 0; i < number; i++) {
1935                 hash = tdb_hash(&keys[i]);
1936                 for (j = 0; j < i && BUCKET(tdb->lockedkeys[j+1]) < BUCKET(hash); j++);
1937                         memmove(&tdb->lockedkeys[j+2], &tdb->lockedkeys[j+1], sizeof(u32) * (i-j));
1938                 tdb->lockedkeys[j+1] = hash;
1939         }
1940         /* Finally, lock in order */
1941         for (i = 0; i < number; i++)
1942                 if (tdb_lock(tdb, i, F_WRLCK))
1943                         break;
1944
1945         /* If error, release locks we have... */
1946         if (i < number) {
1947                 for ( j = 0; j < i; j++)
1948                         tdb_unlock(tdb, j, F_WRLCK);
1949                 SAFE_FREE(tdb->lockedkeys);
1950                 return TDB_ERRCODE(TDB_ERR_NOLOCK, -1);
1951         }
1952         return 0;
1953 }
1954
1955 /* Unlock the keys previously locked by tdb_lockkeys() */
1956 void tdb_unlockkeys(TDB_CONTEXT *tdb)
1957 {
1958         u32 i;
1959         if (!tdb->lockedkeys)
1960                 return;
1961         for (i = 0; i < tdb->lockedkeys[0]; i++)
1962                 tdb_unlock(tdb, tdb->lockedkeys[i+1], F_WRLCK);
1963         SAFE_FREE(tdb->lockedkeys);
1964 }
1965
1966 /* lock/unlock one hash chain. This is meant to be used to reduce
1967    contention - it cannot guarantee how many records will be locked */
1968 int tdb_chainlock(TDB_CONTEXT *tdb, TDB_DATA key)
1969 {
1970         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1971 }
1972
1973 int tdb_chainunlock(TDB_CONTEXT *tdb, TDB_DATA key)
1974 {
1975         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_WRLCK);
1976 }
1977
1978 static int tdb_chainlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1979 {
1980         return tdb_lock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1981 }
1982
1983 static int tdb_chainunlock_read(TDB_CONTEXT *tdb, TDB_DATA key)
1984 {
1985         return tdb_unlock(tdb, BUCKET(tdb_hash(&key)), F_RDLCK);
1986 }
1987
1988
1989 /* register a loging function */
1990 void tdb_logging_function(TDB_CONTEXT *tdb, void (*fn)(TDB_CONTEXT *, int , const char *, ...))
1991 {
1992         tdb->log_fn = fn;
1993 }
1994
1995
1996 /* reopen a tdb - this is used after a fork to ensure that we have an independent
1997    seek pointer from our parent and to re-establish locks */
1998 int tdb_reopen(TDB_CONTEXT *tdb)
1999 {
2000         struct stat st;
2001
2002         if (tdb_munmap(tdb) != 0) {
2003                 TDB_LOG((tdb, 0, "tdb_reopen: munmap failed (%s)\n", strerror(errno)));
2004                 goto fail;
2005         }
2006         if (close(tdb->fd) != 0)
2007                 TDB_LOG((tdb, 0, "tdb_reopen: WARNING closing tdb->fd failed!\n"));
2008         tdb->fd = open(tdb->name, tdb->open_flags & ~(O_CREAT|O_TRUNC), 0);
2009         if (tdb->fd == -1) {
2010                 TDB_LOG((tdb, 0, "tdb_reopen: open failed (%s)\n", strerror(errno)));
2011                 goto fail;
2012         }
2013         if (fstat(tdb->fd, &st) != 0) {
2014                 TDB_LOG((tdb, 0, "tdb_reopen: fstat failed (%s)\n", strerror(errno)));
2015                 goto fail;
2016         }
2017         if (st.st_ino != tdb->inode || st.st_dev != tdb->device) {
2018                 TDB_LOG((tdb, 0, "tdb_reopen: file dev/inode has changed!\n"));
2019                 goto fail;
2020         }
2021         tdb_mmap(tdb);
2022         if (tdb_brlock(tdb, ACTIVE_LOCK, F_RDLCK, F_SETLKW, 0) == -1) {
2023                 TDB_LOG((tdb, 0, "tdb_reopen: failed to obtain active lock\n"));
2024                 goto fail;
2025         }
2026
2027         return 0;
2028
2029 fail:
2030         tdb_close(tdb);
2031         return -1;
2032 }
2033
2034 /* reopen all tdb's */
2035 int tdb_reopen_all(void)
2036 {
2037         TDB_CONTEXT *tdb;
2038
2039         for (tdb=tdbs; tdb; tdb = tdb->next) {
2040                 if (tdb_reopen(tdb) != 0) return -1;
2041         }
2042
2043         return 0;
2044 }