diff options
Diffstat (limited to 'transaction.go')
-rw-r--r-- | transaction.go | 349 |
1 files changed, 49 insertions, 300 deletions
diff --git a/transaction.go b/transaction.go index 1bb990b..6e9ca8f 100644 --- a/transaction.go +++ b/transaction.go @@ -1,28 +1,27 @@ package bolt import ( - "sort" - "unsafe" + "bytes" ) -// txnid represents the internal transaction identifier. -type txnid uint64 - -// Transaction represents a consistent view into the database. -// Read-only transactions can be created by calling DB.Transaction(). -// Read-write transactions can be created by calling DB.RWTransaction(). -// Only one read-write transaction is allowed at a time. +// Transaction represents a read-only transaction on the database. +// It can be used for retrieving values for keys as well as creating cursors for +// iterating over the data. +// +// IMPORTANT: You must close transactions when you are done with them. Pages +// can not be reclaimed by the writer until no more transactions are using them. +// A long running read transaction can cause the database to quickly grow. type Transaction struct { - db *DB - meta *meta - buckets *buckets - writable bool - pages map[pgid]*page - nodes map[pgid]*node - pending []*node + db *DB + meta *meta + buckets *buckets + pages map[pgid]*page } -// init initializes the transaction. +// txnid represents the internal transaction identifier. +type txnid uint64 + +// init initializes the transaction and associates it with a database. func (t *Transaction) init(db *DB) { t.db = db t.pages = nil @@ -34,11 +33,6 @@ func (t *Transaction) init(db *DB) { // Read in the buckets page. t.buckets = &buckets{} t.buckets.read(t.page(t.meta.buckets)) - - t.pages = make(map[pgid]*page) - - // Increment the transaction id. - t.meta.txnid += txnid(1) } // id returns the transaction id. @@ -46,20 +40,19 @@ func (t *Transaction) id() txnid { return t.meta.txnid } +// Close closes the transaction and releases any pages it is using. +func (t *Transaction) Close() { + t.db.removeTransaction(t) +} + // DB returns a reference to the database that created the transaction. func (t *Transaction) DB() *DB { return t.db } -// Writable returns whether the transaction can change data. -func (t *Transaction) Writable() bool { - return t.writable -} - // Bucket retrieves a bucket by name. // Returns nil if the bucket does not exist. func (t *Transaction) Bucket(name string) *Bucket { - _assert(t.isOpen(), "transaction not open") b := t.buckets.get(name) if b == nil { return nil @@ -74,7 +67,6 @@ func (t *Transaction) Bucket(name string) *Bucket { // Buckets retrieves a list of all buckets. func (t *Transaction) Buckets() []*Bucket { - _assert(t.isOpen(), "transaction not open") buckets := make([]*Bucket, 0, len(t.buckets.items)) for name, b := range t.buckets.items { bucket := &Bucket{bucket: b, transaction: t, name: name} @@ -83,263 +75,49 @@ func (t *Transaction) Buckets() []*Bucket { return buckets } -// CreateBucket creates a new bucket. -// Returns an error if the transaction is read-only, if bucket already exists, -// if the bucket name is blank, or if the bucket name is too long. -func (t *Transaction) CreateBucket(name string) error { - _assert(t.isOpen(), "transaction not open") - if !t.writable { - return ErrTransactionNotWritable - } else if b := t.Bucket(name); b != nil { - return ErrBucketExists - } else if len(name) == 0 { - return ErrBucketNameRequired - } else if len(name) > MaxBucketNameSize { - return ErrBucketNameTooLarge - } - - // Create a blank root leaf page. - p, err := t.allocate(1) - if err != nil { - return err - } - p.flags = leafPageFlag - - // Add bucket to buckets page. - t.buckets.put(name, &bucket{root: p.id}) - - return nil -} - -// CreateBucketIfNotExists creates a new bucket if it doesn't already exist. -// Returns an error if the transaction is read-only, if the bucket name is -// blank, or if the bucket name is too long. -func (t *Transaction) CreateBucketIfNotExists(name string) error { - _assert(t.isOpen(), "transaction not open") - err := t.CreateBucket(name) - if err != nil && err != ErrBucketExists { - return err - } - return nil -} - -// DeleteBucket deletes a bucket. -// Returns an error if the transaction is read-only or if the bucket cannot be found. -func (t *Transaction) DeleteBucket(name string) error { - _assert(t.isOpen(), "transaction not open") - if !t.writable { - return ErrTransactionNotWritable - } else if b := t.Bucket(name); b == nil { - return ErrBucketNotFound - } - - // Remove from buckets page. - t.buckets.del(name) - - // TODO(benbjohnson): Free all pages. - - return nil -} - -// Commit writes all changes to disk and updates the meta page. -// Read-only transactions will simply be closed. -// Returns an error if a disk write error occurs. -func (t *Transaction) Commit() error { - defer t.close() - - // Ignore commit for read-only transactions. - if !t.writable { - return nil - } - - // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. - - // Rebalance and spill data onto dirty pages. - t.rebalance() - t.spill() - - // Spill buckets page. - p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1) - if err != nil { - return err - } - t.buckets.write(p) - - // Write dirty pages to disk. - if err := t.write(); err != nil { - return err - } - - // Update the meta. - t.meta.buckets = p.id - - // Write meta to disk. - if err := t.writeMeta(); err != nil { - return err - } - - return nil -} - -// Rollback closes the transaction and rolls back any pending changes. -func (t *Transaction) Rollback() { - t.close() -} - -func (t *Transaction) close() { - if t.writable { - t.db.rwlock.Unlock() - } else { - t.db.removeTransaction(t) +// Cursor creates a cursor associated with a given bucket. +// The cursor is only valid as long as the Transaction is open. +// Do not use a cursor after the transaction is closed. +func (t *Transaction) Cursor(name string) (*Cursor, error) { + b := t.Bucket(name) + if b == nil { + return nil, ErrBucketNotFound } - - // Detach from the database. - t.db = nil + return b.cursor(), nil } -// isOpen returns whether the transaction is currently open. -func (t *Transaction) isOpen() bool { - return t.db != nil -} - -// allocate returns a contiguous block of memory starting at a given page. -func (t *Transaction) allocate(count int) (*page, error) { - p, err := t.db.allocate(count) +// Get retrieves the value for a key in a named bucket. +// Returns a nil value if the key does not exist. +// Returns an error if the bucket does not exist. +func (t *Transaction) Get(name string, key []byte) (value []byte, err error) { + c, err := t.Cursor(name) if err != nil { return nil, err } - - // Save to our page cache. - t.pages[p.id] = p - - return p, nil -} - -// rebalance attempts to balance all nodes. -func (t *Transaction) rebalance() { - for _, n := range t.nodes { - n.rebalance() - } -} - -// spill writes all the nodes to dirty pages. -func (t *Transaction) spill() error { - // Keep track of the current root nodes. - // We will update this at the end once all nodes are created. - type root struct { - node *node - pgid pgid - } - var roots []root - - // Sort nodes by highest depth first. - nodes := make(nodesByDepth, 0, len(t.nodes)) - for _, n := range t.nodes { - nodes = append(nodes, n) - } - sort.Sort(nodes) - - // Spill nodes by deepest first. - for i := 0; i < len(nodes); i++ { - n := nodes[i] - - // Save existing root buckets for later. - if n.parent == nil && n.pgid != 0 { - roots = append(roots, root{n, n.pgid}) - } - - // Split nodes into appropriate sized nodes. - // The first node in this list will be a reference to n to preserve ancestry. - newNodes := n.split(t.db.pageSize) - t.pending = newNodes - - // If this is a root node that split then create a parent node. - if n.parent == nil && len(newNodes) > 1 { - n.parent = &node{transaction: t, isLeaf: false} - nodes = append(nodes, n.parent) - } - - // Add node's page to the freelist. - if n.pgid > 0 { - t.db.freelist.free(t.id(), t.page(n.pgid)) - } - - // Write nodes to dirty pages. - for i, newNode := range newNodes { - // Allocate contiguous space for the node. - p, err := t.allocate((newNode.size() / t.db.pageSize) + 1) - if err != nil { - return err - } - - // Write the node to the page. - newNode.write(p) - newNode.pgid = p.id - newNode.parent = n.parent - - // The first node should use the existing entry, other nodes are inserts. - var oldKey []byte - if i == 0 { - oldKey = n.key - } else { - oldKey = newNode.inodes[0].key - } - - // Update the parent entry. - if newNode.parent != nil { - newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid) - } - } - - t.pending = nil - } - - // Update roots with new roots. - for _, root := range roots { - t.buckets.updateRoot(root.pgid, root.node.root().pgid) + k, v := c.Seek(key) + // If our target node isn't the same key as what's passed in then return nil. + if !bytes.Equal(key, k) { + return nil, nil } - - // Clear out nodes now that they are all spilled. - t.nodes = make(map[pgid]*node) - - return nil + return v, nil } -// write writes any dirty pages to disk. -func (t *Transaction) write() error { - // Sort pages by id. - pages := make(pages, 0, len(t.pages)) - for _, p := range t.pages { - pages = append(pages, p) +// ForEach executes a function for each key/value pair in a bucket. +// An error is returned if the bucket cannot be found. +func (t *Transaction) ForEach(name string, fn func(k, v []byte) error) error { + // Open a cursor on the bucket. + c, err := t.Cursor(name) + if err != nil { + return err } - sort.Sort(pages) - // Write pages to disk in order. - for _, p := range pages { - size := (int(p.overflow) + 1) * t.db.pageSize - buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size] - offset := int64(p.id) * int64(t.db.pageSize) - if _, err := t.db.file.WriteAt(buf, offset); err != nil { + // Iterate over each key/value pair in the bucket. + for k, v := c.First(); k != nil; k, v = c.Next() { + if err := fn(k, v); err != nil { return err } } - // Clear out page cache. - t.pages = make(map[pgid]*page) - - return nil -} - -// writeMeta writes the meta to the disk. -func (t *Transaction) writeMeta() error { - // Create a temporary buffer for the meta page. - buf := make([]byte, t.db.pageSize) - p := t.db.pageInBuffer(buf, 0) - t.meta.write(p) - - // Write the meta page to file. - t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize)) - return nil } @@ -357,35 +135,6 @@ func (t *Transaction) page(id pgid) *page { return t.db.page(id) } -// node creates a node from a page and associates it with a given parent. -func (t *Transaction) node(pgid pgid, parent *node) *node { - // Retrieve node if it has already been fetched. - if n := t.nodes[pgid]; n != nil { - return n - } - - // Otherwise create a branch and cache it. - n := &node{transaction: t, parent: parent} - if n.parent != nil { - n.depth = n.parent.depth + 1 - } - n.read(t.page(pgid)) - t.nodes[pgid] = n - - return n -} - -// dereference removes all references to the old mmap. -func (t *Transaction) dereference() { - for _, n := range t.nodes { - n.dereference() - } - - for _, n := range t.pending { - n.dereference() - } -} - // forEachPage iterates over every page within a given page and executes a function. func (t *Transaction) forEachPage(pgid pgid, depth int, fn func(*page, int)) { p := t.page(pgid) |