diff options
Diffstat (limited to 'transaction.go')
-rw-r--r-- | transaction.go | 349 |
1 files changed, 300 insertions, 49 deletions
diff --git a/transaction.go b/transaction.go index 6e9ca8f..1bb990b 100644 --- a/transaction.go +++ b/transaction.go @@ -1,27 +1,28 @@ package bolt import ( - "bytes" + "sort" + "unsafe" ) -// Transaction represents a read-only transaction on the database. -// It can be used for retrieving values for keys as well as creating cursors for -// iterating over the data. -// -// IMPORTANT: You must close transactions when you are done with them. Pages -// can not be reclaimed by the writer until no more transactions are using them. -// A long running read transaction can cause the database to quickly grow. -type Transaction struct { - db *DB - meta *meta - buckets *buckets - pages map[pgid]*page -} - // txnid represents the internal transaction identifier. type txnid uint64 -// init initializes the transaction and associates it with a database. +// Transaction represents a consistent view into the database. +// Read-only transactions can be created by calling DB.Transaction(). +// Read-write transactions can be created by calling DB.RWTransaction(). +// Only one read-write transaction is allowed at a time. +type Transaction struct { + db *DB + meta *meta + buckets *buckets + writable bool + pages map[pgid]*page + nodes map[pgid]*node + pending []*node +} + +// init initializes the transaction. func (t *Transaction) init(db *DB) { t.db = db t.pages = nil @@ -33,6 +34,11 @@ func (t *Transaction) init(db *DB) { // Read in the buckets page. t.buckets = &buckets{} t.buckets.read(t.page(t.meta.buckets)) + + t.pages = make(map[pgid]*page) + + // Increment the transaction id. + t.meta.txnid += txnid(1) } // id returns the transaction id. @@ -40,19 +46,20 @@ func (t *Transaction) id() txnid { return t.meta.txnid } -// Close closes the transaction and releases any pages it is using. -func (t *Transaction) Close() { - t.db.removeTransaction(t) -} - // DB returns a reference to the database that created the transaction. func (t *Transaction) DB() *DB { return t.db } +// Writable returns whether the transaction can change data. +func (t *Transaction) Writable() bool { + return t.writable +} + // Bucket retrieves a bucket by name. // Returns nil if the bucket does not exist. func (t *Transaction) Bucket(name string) *Bucket { + _assert(t.isOpen(), "transaction not open") b := t.buckets.get(name) if b == nil { return nil @@ -67,6 +74,7 @@ func (t *Transaction) Bucket(name string) *Bucket { // Buckets retrieves a list of all buckets. func (t *Transaction) Buckets() []*Bucket { + _assert(t.isOpen(), "transaction not open") buckets := make([]*Bucket, 0, len(t.buckets.items)) for name, b := range t.buckets.items { bucket := &Bucket{bucket: b, transaction: t, name: name} @@ -75,49 +83,263 @@ func (t *Transaction) Buckets() []*Bucket { return buckets } -// Cursor creates a cursor associated with a given bucket. -// The cursor is only valid as long as the Transaction is open. -// Do not use a cursor after the transaction is closed. -func (t *Transaction) Cursor(name string) (*Cursor, error) { - b := t.Bucket(name) - if b == nil { - return nil, ErrBucketNotFound +// CreateBucket creates a new bucket. +// Returns an error if the transaction is read-only, if bucket already exists, +// if the bucket name is blank, or if the bucket name is too long. +func (t *Transaction) CreateBucket(name string) error { + _assert(t.isOpen(), "transaction not open") + if !t.writable { + return ErrTransactionNotWritable + } else if b := t.Bucket(name); b != nil { + return ErrBucketExists + } else if len(name) == 0 { + return ErrBucketNameRequired + } else if len(name) > MaxBucketNameSize { + return ErrBucketNameTooLarge } - return b.cursor(), nil -} -// Get retrieves the value for a key in a named bucket. -// Returns a nil value if the key does not exist. -// Returns an error if the bucket does not exist. -func (t *Transaction) Get(name string, key []byte) (value []byte, err error) { - c, err := t.Cursor(name) + // Create a blank root leaf page. + p, err := t.allocate(1) if err != nil { - return nil, err + return err } - k, v := c.Seek(key) - // If our target node isn't the same key as what's passed in then return nil. - if !bytes.Equal(key, k) { - return nil, nil + p.flags = leafPageFlag + + // Add bucket to buckets page. + t.buckets.put(name, &bucket{root: p.id}) + + return nil +} + +// CreateBucketIfNotExists creates a new bucket if it doesn't already exist. +// Returns an error if the transaction is read-only, if the bucket name is +// blank, or if the bucket name is too long. +func (t *Transaction) CreateBucketIfNotExists(name string) error { + _assert(t.isOpen(), "transaction not open") + err := t.CreateBucket(name) + if err != nil && err != ErrBucketExists { + return err + } + return nil +} + +// DeleteBucket deletes a bucket. +// Returns an error if the transaction is read-only or if the bucket cannot be found. +func (t *Transaction) DeleteBucket(name string) error { + _assert(t.isOpen(), "transaction not open") + if !t.writable { + return ErrTransactionNotWritable + } else if b := t.Bucket(name); b == nil { + return ErrBucketNotFound } - return v, nil + + // Remove from buckets page. + t.buckets.del(name) + + // TODO(benbjohnson): Free all pages. + + return nil } -// ForEach executes a function for each key/value pair in a bucket. -// An error is returned if the bucket cannot be found. -func (t *Transaction) ForEach(name string, fn func(k, v []byte) error) error { - // Open a cursor on the bucket. - c, err := t.Cursor(name) +// Commit writes all changes to disk and updates the meta page. +// Read-only transactions will simply be closed. +// Returns an error if a disk write error occurs. +func (t *Transaction) Commit() error { + defer t.close() + + // Ignore commit for read-only transactions. + if !t.writable { + return nil + } + + // TODO(benbjohnson): Use vectorized I/O to write out dirty pages. + + // Rebalance and spill data onto dirty pages. + t.rebalance() + t.spill() + + // Spill buckets page. + p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1) if err != nil { return err } + t.buckets.write(p) + + // Write dirty pages to disk. + if err := t.write(); err != nil { + return err + } + + // Update the meta. + t.meta.buckets = p.id + + // Write meta to disk. + if err := t.writeMeta(); err != nil { + return err + } + + return nil +} + +// Rollback closes the transaction and rolls back any pending changes. +func (t *Transaction) Rollback() { + t.close() +} + +func (t *Transaction) close() { + if t.writable { + t.db.rwlock.Unlock() + } else { + t.db.removeTransaction(t) + } + + // Detach from the database. + t.db = nil +} + +// isOpen returns whether the transaction is currently open. +func (t *Transaction) isOpen() bool { + return t.db != nil +} + +// allocate returns a contiguous block of memory starting at a given page. +func (t *Transaction) allocate(count int) (*page, error) { + p, err := t.db.allocate(count) + if err != nil { + return nil, err + } + + // Save to our page cache. + t.pages[p.id] = p + + return p, nil +} + +// rebalance attempts to balance all nodes. +func (t *Transaction) rebalance() { + for _, n := range t.nodes { + n.rebalance() + } +} + +// spill writes all the nodes to dirty pages. +func (t *Transaction) spill() error { + // Keep track of the current root nodes. + // We will update this at the end once all nodes are created. + type root struct { + node *node + pgid pgid + } + var roots []root + + // Sort nodes by highest depth first. + nodes := make(nodesByDepth, 0, len(t.nodes)) + for _, n := range t.nodes { + nodes = append(nodes, n) + } + sort.Sort(nodes) + + // Spill nodes by deepest first. + for i := 0; i < len(nodes); i++ { + n := nodes[i] + + // Save existing root buckets for later. + if n.parent == nil && n.pgid != 0 { + roots = append(roots, root{n, n.pgid}) + } + + // Split nodes into appropriate sized nodes. + // The first node in this list will be a reference to n to preserve ancestry. + newNodes := n.split(t.db.pageSize) + t.pending = newNodes + + // If this is a root node that split then create a parent node. + if n.parent == nil && len(newNodes) > 1 { + n.parent = &node{transaction: t, isLeaf: false} + nodes = append(nodes, n.parent) + } + + // Add node's page to the freelist. + if n.pgid > 0 { + t.db.freelist.free(t.id(), t.page(n.pgid)) + } + + // Write nodes to dirty pages. + for i, newNode := range newNodes { + // Allocate contiguous space for the node. + p, err := t.allocate((newNode.size() / t.db.pageSize) + 1) + if err != nil { + return err + } + + // Write the node to the page. + newNode.write(p) + newNode.pgid = p.id + newNode.parent = n.parent + + // The first node should use the existing entry, other nodes are inserts. + var oldKey []byte + if i == 0 { + oldKey = n.key + } else { + oldKey = newNode.inodes[0].key + } + + // Update the parent entry. + if newNode.parent != nil { + newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid) + } + } + + t.pending = nil + } + + // Update roots with new roots. + for _, root := range roots { + t.buckets.updateRoot(root.pgid, root.node.root().pgid) + } + + // Clear out nodes now that they are all spilled. + t.nodes = make(map[pgid]*node) + + return nil +} + +// write writes any dirty pages to disk. +func (t *Transaction) write() error { + // Sort pages by id. + pages := make(pages, 0, len(t.pages)) + for _, p := range t.pages { + pages = append(pages, p) + } + sort.Sort(pages) - // Iterate over each key/value pair in the bucket. - for k, v := c.First(); k != nil; k, v = c.Next() { - if err := fn(k, v); err != nil { + // Write pages to disk in order. + for _, p := range pages { + size := (int(p.overflow) + 1) * t.db.pageSize + buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size] + offset := int64(p.id) * int64(t.db.pageSize) + if _, err := t.db.file.WriteAt(buf, offset); err != nil { return err } } + // Clear out page cache. + t.pages = make(map[pgid]*page) + + return nil +} + +// writeMeta writes the meta to the disk. +func (t *Transaction) writeMeta() error { + // Create a temporary buffer for the meta page. + buf := make([]byte, t.db.pageSize) + p := t.db.pageInBuffer(buf, 0) + t.meta.write(p) + + // Write the meta page to file. + t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize)) + return nil } @@ -135,6 +357,35 @@ func (t *Transaction) page(id pgid) *page { return t.db.page(id) } +// node creates a node from a page and associates it with a given parent. +func (t *Transaction) node(pgid pgid, parent *node) *node { + // Retrieve node if it has already been fetched. + if n := t.nodes[pgid]; n != nil { + return n + } + + // Otherwise create a branch and cache it. + n := &node{transaction: t, parent: parent} + if n.parent != nil { + n.depth = n.parent.depth + 1 + } + n.read(t.page(pgid)) + t.nodes[pgid] = n + + return n +} + +// dereference removes all references to the old mmap. +func (t *Transaction) dereference() { + for _, n := range t.nodes { + n.dereference() + } + + for _, n := range t.pending { + n.dereference() + } +} + // forEachPage iterates over every page within a given page and executes a function. func (t *Transaction) forEachPage(pgid pgid, depth int, fn func(*page, int)) { p := t.page(pgid) |