aboutsummaryrefslogtreecommitdiff
path: root/db.go
diff options
context:
space:
mode:
Diffstat (limited to 'db.go')
-rw-r--r--db.go107
1 files changed, 61 insertions, 46 deletions
diff --git a/db.go b/db.go
index a76639d..c9611f9 100644
--- a/db.go
+++ b/db.go
@@ -45,6 +45,7 @@ type DB struct {
rwlock sync.Mutex // Allows only one writer at a time.
metalock sync.Mutex // Protects meta page access.
mmaplock sync.RWMutex // Protects mmap access during remapping.
+ statlock sync.RWMutex // Protects stats access.
ops struct {
writeAt func(b []byte, off int64) (n int, err error)
@@ -133,7 +134,7 @@ func (db *DB) mmap(minsz int) error {
// Dereference all mmap references before unmapping.
if db.rwtx != nil {
- db.rwtx.dereference()
+ db.rwtx.root.dereference()
}
// Unmap existing data before continuing.
@@ -224,7 +225,7 @@ func (db *DB) init() error {
m.pageSize = uint32(db.pageSize)
m.version = version
m.freelist = 2
- m.buckets = 3
+ m.root = bucket{root: 3}
m.pgid = 4
m.txid = txid(i)
}
@@ -238,7 +239,7 @@ func (db *DB) init() error {
// Write an empty leaf page at page 4.
p = db.pageInBuffer(buf[:], pgid(3))
p.id = pgid(3)
- p.flags = bucketsPageFlag
+ p.flags = leafPageFlag
p.count = 0
// Write the buffer to our data file.
@@ -305,16 +306,18 @@ func (db *DB) Begin(writable bool) (*Tx, error) {
}
func (db *DB) beginTx() (*Tx, error) {
- db.metalock.Lock()
- defer db.metalock.Unlock()
-
// Obtain a read-only lock on the mmap. When the mmap is remapped it will
// obtain a write lock so all transactions must finish before it can be
// remapped.
db.mmaplock.RLock()
+ // Lock the meta pages while we initialize the transaction.
+ db.metalock.Lock()
+ defer db.metalock.Unlock()
+
// Exit if the database is not open yet.
if !db.opened {
+ db.mmaplock.RUnlock()
return nil, ErrDatabaseNotOpen
}
@@ -329,12 +332,15 @@ func (db *DB) beginTx() (*Tx, error) {
}
func (db *DB) beginRWTx() (*Tx, error) {
- db.metalock.Lock()
- defer db.metalock.Unlock()
-
// Obtain writer lock. This is released by the transaction when it closes.
+ // This enforces only one writer transaction at a time.
db.rwlock.Lock()
+ // Once we have the writer lock then we can lock the meta pages so that
+ // we can set up the transaction.
+ db.metalock.Lock()
+ defer db.metalock.Unlock()
+
// Exit if the database is not open yet.
if !db.opened {
db.rwlock.Unlock()
@@ -363,7 +369,6 @@ func (db *DB) beginRWTx() (*Tx, error) {
// removeTx removes a transaction from the database.
func (db *DB) removeTx(tx *Tx) {
db.metalock.Lock()
- defer db.metalock.Unlock()
// Release the read lock on the mmap.
db.mmaplock.RUnlock()
@@ -376,8 +381,13 @@ func (db *DB) removeTx(tx *Tx) {
}
}
+ // Unlock the meta pages.
+ db.metalock.Unlock()
+
// Merge statistics.
+ db.statlock.Lock()
db.stats.TxStats.add(&tx.stats)
+ db.statlock.Unlock()
}
// Update executes a function within the context of a read-write managed transaction.
@@ -497,8 +507,8 @@ func (db *DB) CopyFile(path string, mode os.FileMode) error {
// Stats retrieves ongoing performance stats for the database.
// This is only updated when a transaction closes.
func (db *DB) Stats() Stats {
- db.metalock.Lock()
- defer db.metalock.Unlock()
+ db.statlock.RLock()
+ defer db.statlock.RUnlock()
return db.stats
}
@@ -510,40 +520,14 @@ func (db *DB) Check() error {
// Track every reachable page.
reachable := make(map[pgid]*page)
- reachable[0] = tx.page(0) // meta0
- reachable[1] = tx.page(1) // meta1
- for i := uint32(0); i <= tx.page(tx.meta.buckets).overflow; i++ {
- reachable[tx.meta.buckets+pgid(i)] = tx.page(tx.meta.buckets)
- }
- for i := uint32(0); i <= tx.page(tx.meta.freelist).overflow; i++ {
- reachable[tx.meta.freelist+pgid(i)] = tx.page(tx.meta.freelist)
+ reachable[0] = db.page(0) // meta0
+ reachable[1] = db.page(1) // meta1
+ for i := uint32(0); i <= db.page(tx.meta.freelist).overflow; i++ {
+ reachable[tx.meta.freelist+pgid(i)] = db.page(tx.meta.freelist)
}
- // Check each reachable page within each bucket.
- for _, bucket := range tx.Buckets() {
- // warnf("[bucket] %s", bucket.name)
- tx.forEachPage(bucket.root, 0, func(p *page, _ int) {
- // Ensure each page is only referenced once.
- for i := pgid(0); i <= pgid(p.overflow); i++ {
- var id = p.id + i
- if _, ok := reachable[id]; ok {
- errors = append(errors, fmt.Errorf("page %d: multiple references", int(id)))
- }
- reachable[id] = p
- }
-
- // Retrieve page info.
- info, err := tx.Page(int(p.id))
- // warnf("[page] %d + %d (%s)", p.id, p.overflow, info.Type)
- if err != nil {
- errors = append(errors, err)
- } else if info == nil {
- errors = append(errors, fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(tx.meta.pgid)))
- } else if info.Type != "branch" && info.Type != "leaf" {
- errors = append(errors, fmt.Errorf("page %d: invalid type: %s", int(p.id), info.Type))
- }
- })
- }
+ // Recursively check buckets.
+ db.checkBucket(&tx.root, reachable, &errors)
// Ensure all pages below high water mark are either reachable or freed.
for i := pgid(0); i < tx.meta.pgid; i++ {
@@ -553,8 +537,6 @@ func (db *DB) Check() error {
}
}
- // TODO(benbjohnson): Ensure that only one buckets page exists.
-
if len(errors) > 0 {
return errors
}
@@ -563,6 +545,39 @@ func (db *DB) Check() error {
})
}
+func (db *DB) checkBucket(b *Bucket, reachable map[pgid]*page, errors *ErrorList) {
+ // Check every page used by this bucket.
+ b.tx.forEachPage(b.root, 0, func(p *page, _ int) {
+ // Ensure each page is only referenced once.
+ for i := pgid(0); i <= pgid(p.overflow); i++ {
+ var id = p.id + i
+ if _, ok := reachable[id]; ok {
+ *errors = append(*errors, fmt.Errorf("page %d: multiple references", int(id)))
+ }
+ reachable[id] = p
+ }
+
+ // Retrieve page info.
+ info, err := b.tx.Page(int(p.id))
+ // warnf("[page] %d + %d (%s)", p.id, p.overflow, info.Type)
+ if err != nil {
+ *errors = append(*errors, err)
+ } else if info == nil {
+ *errors = append(*errors, fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid)))
+ } else if info.Type != "branch" && info.Type != "leaf" {
+ *errors = append(*errors, fmt.Errorf("page %d: invalid type: %s", int(p.id), info.Type))
+ }
+ })
+
+ // Check each bucket within this bucket.
+ _ = b.ForEach(func(k, v []byte) error {
+ if child := b.Bucket(k); child != nil {
+ db.checkBucket(child, reachable, errors)
+ }
+ return nil
+ })
+}
+
// page retrieves a page reference from the mmap based on the current page size.
func (db *DB) page(id pgid) *page {
pos := id * pgid(db.pageSize)