diff options
author | Ben Johnson <benbjohnson@yahoo.com> | 2014-04-11 15:11:55 -0600 |
---|---|---|
committer | Ben Johnson <benbjohnson@yahoo.com> | 2014-04-11 15:11:55 -0600 |
commit | 2c8020ec8e98e7b6c6c0fd3bd6e91d41caf7f25a (patch) | |
tree | 125c24e03c653417ce8bf5965b7fbcbeb2dedb04 /db.go | |
parent | Merge pull request #128 from benbjohnson/import-export (diff) | |
parent | Upgrade import/export to use nested buckets. (diff) | |
download | dedo-2c8020ec8e98e7b6c6c0fd3bd6e91d41caf7f25a.tar.gz dedo-2c8020ec8e98e7b6c6c0fd3bd6e91d41caf7f25a.tar.xz |
Merge pull request #127 from benbjohnson/nested-keys
Add nested buckets.
Diffstat (limited to 'db.go')
-rw-r--r-- | db.go | 107 |
1 file changed, 61 insertions, 46 deletions
@@ -45,6 +45,7 @@ type DB struct { rwlock sync.Mutex // Allows only one writer at a time. metalock sync.Mutex // Protects meta page access. mmaplock sync.RWMutex // Protects mmap access during remapping. + statlock sync.RWMutex // Protects stats access. ops struct { writeAt func(b []byte, off int64) (n int, err error) @@ -133,7 +134,7 @@ func (db *DB) mmap(minsz int) error { // Dereference all mmap references before unmapping. if db.rwtx != nil { - db.rwtx.dereference() + db.rwtx.root.dereference() } // Unmap existing data before continuing. @@ -224,7 +225,7 @@ func (db *DB) init() error { m.pageSize = uint32(db.pageSize) m.version = version m.freelist = 2 - m.buckets = 3 + m.root = bucket{root: 3} m.pgid = 4 m.txid = txid(i) } @@ -238,7 +239,7 @@ func (db *DB) init() error { // Write an empty leaf page at page 4. p = db.pageInBuffer(buf[:], pgid(3)) p.id = pgid(3) - p.flags = bucketsPageFlag + p.flags = leafPageFlag p.count = 0 // Write the buffer to our data file. @@ -305,16 +306,18 @@ func (db *DB) Begin(writable bool) (*Tx, error) { } func (db *DB) beginTx() (*Tx, error) { - db.metalock.Lock() - defer db.metalock.Unlock() - // Obtain a read-only lock on the mmap. When the mmap is remapped it will // obtain a write lock so all transactions must finish before it can be // remapped. db.mmaplock.RLock() + // Lock the meta pages while we initialize the transaction. + db.metalock.Lock() + defer db.metalock.Unlock() + // Exit if the database is not open yet. if !db.opened { + db.mmaplock.RUnlock() return nil, ErrDatabaseNotOpen } @@ -329,12 +332,15 @@ func (db *DB) beginTx() (*Tx, error) { } func (db *DB) beginRWTx() (*Tx, error) { - db.metalock.Lock() - defer db.metalock.Unlock() - // Obtain writer lock. This is released by the transaction when it closes. + // This enforces only one writer transaction at a time. db.rwlock.Lock() + // Once we have the writer lock then we can lock the meta pages so that + // we can set up the transaction. 
+ db.metalock.Lock() + defer db.metalock.Unlock() + // Exit if the database is not open yet. if !db.opened { db.rwlock.Unlock() @@ -363,7 +369,6 @@ func (db *DB) beginRWTx() (*Tx, error) { // removeTx removes a transaction from the database. func (db *DB) removeTx(tx *Tx) { db.metalock.Lock() - defer db.metalock.Unlock() // Release the read lock on the mmap. db.mmaplock.RUnlock() @@ -376,8 +381,13 @@ func (db *DB) removeTx(tx *Tx) { } } + // Unlock the meta pages. + db.metalock.Unlock() + // Merge statistics. + db.statlock.Lock() db.stats.TxStats.add(&tx.stats) + db.statlock.Unlock() } // Update executes a function within the context of a read-write managed transaction. @@ -497,8 +507,8 @@ func (db *DB) CopyFile(path string, mode os.FileMode) error { // Stats retrieves ongoing performance stats for the database. // This is only updated when a transaction closes. func (db *DB) Stats() Stats { - db.metalock.Lock() - defer db.metalock.Unlock() + db.statlock.RLock() + defer db.statlock.RUnlock() return db.stats } @@ -510,40 +520,14 @@ func (db *DB) Check() error { // Track every reachable page. reachable := make(map[pgid]*page) - reachable[0] = tx.page(0) // meta0 - reachable[1] = tx.page(1) // meta1 - for i := uint32(0); i <= tx.page(tx.meta.buckets).overflow; i++ { - reachable[tx.meta.buckets+pgid(i)] = tx.page(tx.meta.buckets) - } - for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ { - reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist) + reachable[0] = db.page(0) // meta0 + reachable[1] = db.page(1) // meta1 + for i := uint32(0); i <= db.page(tx.meta.freelist).overflow; i++ { + reachable[tx.meta.freelist+pgid(i)] = db.page(tx.meta.freelist) } - // Check each reachable page within each bucket. - for _, bucket := range tx.Buckets() { - // warnf("[bucket] %s", bucket.name) - tx.forEachPage(bucket.root, 0, func(p *page, _ int) { - // Ensure each page is only referenced once. 
- for i := pgid(0); i <= pgid(p.overflow); i++ { - var id = p.id + i - if _, ok := reachable[id]; ok { - errors = append(errors, fmt.Errorf("page %d: multiple references", int(id))) - } - reachable[id] = p - } - - // Retrieve page info. - info, err := tx.Page(int(p.id)) - // warnf("[page] %d + %d (%s)", p.id, p.overflow, info.Type) - if err != nil { - errors = append(errors, err) - } else if info == nil { - errors = append(errors, fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(tx.meta.pgid))) - } else if info.Type != "branch" && info.Type != "leaf" { - errors = append(errors, fmt.Errorf("page %d: invalid type: %s", int(p.id), info.Type)) - } - }) - } + // Recursively check buckets. + db.checkBucket(&tx.root, reachable, &errors) // Ensure all pages below high water mark are either reachable or freed. for i := pgid(0); i < tx.meta.pgid; i++ { @@ -553,8 +537,6 @@ func (db *DB) Check() error { } } - // TODO(benbjohnson): Ensure that only one buckets page exists. - if len(errors) > 0 { return errors } @@ -563,6 +545,39 @@ func (db *DB) Check() error { }) } +func (db *DB) checkBucket(b *Bucket, reachable map[pgid]*page, errors *ErrorList) { + // Check every page used by this bucket. + b.tx.forEachPage(b.root, 0, func(p *page, _ int) { + // Ensure each page is only referenced once. + for i := pgid(0); i <= pgid(p.overflow); i++ { + var id = p.id + i + if _, ok := reachable[id]; ok { + *errors = append(*errors, fmt.Errorf("page %d: multiple references", int(id))) + } + reachable[id] = p + } + + // Retrieve page info. 
+ info, err := b.tx.Page(int(p.id)) + // warnf("[page] %d + %d (%s)", p.id, p.overflow, info.Type) + if err != nil { + *errors = append(*errors, err) + } else if info == nil { + *errors = append(*errors, fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid))) + } else if info.Type != "branch" && info.Type != "leaf" { + *errors = append(*errors, fmt.Errorf("page %d: invalid type: %s", int(p.id), info.Type)) + } + }) + + // Check each bucket within this bucket. + _ = b.ForEach(func(k, v []byte) error { + if child := b.Bucket(k); child != nil { + db.checkBucket(child, reachable, errors) + } + return nil + }) +} + // page retrieves a page reference from the mmap based on the current page size. func (db *DB) page(id pgid) *page { pos := id * pgid(db.pageSize) |