diff options
Diffstat (limited to 'db.go')
-rw-r--r-- | db.go | 91 |
1 files changed, 77 insertions, 14 deletions
@@ -25,10 +25,14 @@ type DB struct { meta0 *meta meta1 *meta pageSize int - opened bool - mutex sync.Mutex + opened bool rwtransaction *RWTransaction transactions []*Transaction + freelist *freelist + + rwlock sync.Mutex // Allows only one writer at a time. + metalock sync.Mutex // Protects meta page access. + mmaplock sync.RWMutex // Protects mmap access during remapping. } // NewDB creates a new DB instance. @@ -45,8 +49,8 @@ func (db *DB) Path() string { // If the file does not exist then it will be created automatically. func (db *DB) Open(path string, mode os.FileMode) error { var err error - db.mutex.Lock() - defer db.mutex.Unlock() + db.metalock.Lock() + defer db.metalock.Unlock() // Initialize OS/Syscall references. // These are overridden by mocks during some tests. @@ -99,6 +103,10 @@ func (db *DB) Open(path string, mode os.FileMode) error { return err } + // Read in the freelist. + db.freelist = &freelist{pending: make(map[txnid][]pgid)} + db.freelist.read(db.page(db.meta().freelist)) + // Mark the database as opened and return. db.opened = true return nil @@ -113,7 +121,7 @@ func (db *DB) mmap() error { return &Error{"file size too small", err} } - // TEMP(benbjohnson): Set max size to 1MB. + // TODO(benbjohnson): Determine appropriate mmap size by db size. size := 2 << 30 // Memory-map the data file as a byte slice. @@ -154,7 +162,7 @@ func (db *DB) init() error { m.version = version m.pageSize = uint32(db.pageSize) m.version = version - m.free = 2 + m.freelist = 2 m.buckets = 3 m.pgid = 4 m.txnid = txnid(i) @@ -182,20 +190,21 @@ func (db *DB) init() error { // Close releases all resources related to the database. func (db *DB) Close() { - db.mutex.Lock() - defer db.mutex.Unlock() + db.metalock.Lock() + defer db.metalock.Unlock() db.close() } func (db *DB) close() { // TODO: Undo everything in Open(). + db.freelist = nil } // Transaction creates a read-only transaction. // Multiple read-only transactions can be used concurrently. func (db *DB) Transaction() (*Transaction, error) { - db.mutex.Lock() - defer db.mutex.Unlock() + db.metalock.Lock() + defer db.metalock.Unlock() // Exit if the database is not open yet. if !db.opened { @@ -206,30 +215,60 @@ func (db *DB) Transaction() (*Transaction, error) { t := &Transaction{} t.init(db) + // Keep track of transaction until it closes. + db.transactions = append(db.transactions, t) + return t, nil } // RWTransaction creates a read/write transaction. // Only one read/write transaction is allowed at a time. func (db *DB) RWTransaction() (*RWTransaction, error) { - db.mutex.Lock() - defer db.mutex.Unlock() + db.metalock.Lock() + defer db.metalock.Unlock() - // TODO: db.writerMutex.Lock() - // TODO: Add unlock to RWTransaction.Commit() / Abort() + // Obtain writer lock. This is released by the RWTransaction when it closes. + db.rwlock.Lock() // Exit if the database is not open yet. if !db.opened { + db.rwlock.Unlock() return nil, DatabaseNotOpenError } // Create a transaction associated with the database. t := &RWTransaction{nodes: make(map[pgid]*node)} t.init(db) + db.rwtransaction = t + + // Free any pages associated with closed read-only transactions. + var minid txnid = 0xFFFFFFFFFFFFFFFF + for _, t := range db.transactions { + if t.id() < minid { + minid = t.id() + } + } + if minid > 0 { + db.freelist.release(minid - 1) + } return t, nil } +// removeTransaction removes a transaction from the database. +func (db *DB) removeTransaction(t *Transaction) { + db.metalock.Lock() + defer db.metalock.Unlock() + + // Remove the transaction. + for i, txn := range db.transactions { + if txn == t { + db.transactions = append(db.transactions[:i], db.transactions[i+1:]...) + break + } + } +} + // Bucket retrieves a reference to a bucket. func (db *DB) Bucket(name string) (*Bucket, error) { t, err := db.Transaction() @@ -372,8 +411,32 @@ func (db *DB) meta() *meta { return db.meta1 } +// allocate returns a contiguous block of memory starting at a given page. +func (db *DB) allocate(count int) *page { + // Allocate a temporary buffer for the page. + buf := make([]byte, count*db.pageSize) + p := (*page)(unsafe.Pointer(&buf[0])) + p.overflow = uint32(count - 1) + + // Use pages from the freelist if they are available. + if p.id = db.freelist.allocate(count); p.id != 0 { + return p + } + + // TODO(benbjohnson): Resize mmap(). + + // If there are no free pages then allocate from the end of the file. + p.id = db.rwtransaction.meta.pgid + db.rwtransaction.meta.pgid += pgid(count) + + return p +} + // sync flushes the file descriptor to disk. func (db *DB) sync(force bool) error { + if db.opened { + return DatabaseAlreadyOpenedError + } if err := syscall.Fsync(int(db.file.Fd())); err != nil { return err } |