aboutsummaryrefslogtreecommitdiff
path: root/transaction.go
diff options
context:
space:
mode:
Diffstat (limited to 'transaction.go')
-rw-r--r--transaction.go349
1 files changed, 300 insertions, 49 deletions
diff --git a/transaction.go b/transaction.go
index 6e9ca8f..1bb990b 100644
--- a/transaction.go
+++ b/transaction.go
@@ -1,27 +1,28 @@
package bolt
import (
- "bytes"
+ "sort"
+ "unsafe"
)
-// Transaction represents a read-only transaction on the database.
-// It can be used for retrieving values for keys as well as creating cursors for
-// iterating over the data.
-//
-// IMPORTANT: You must close transactions when you are done with them. Pages
-// can not be reclaimed by the writer until no more transactions are using them.
-// A long running read transaction can cause the database to quickly grow.
-type Transaction struct {
- db *DB
- meta *meta
- buckets *buckets
- pages map[pgid]*page
-}
-
// txnid represents the internal transaction identifier.
type txnid uint64
-// init initializes the transaction and associates it with a database.
+// Transaction represents a consistent view into the database.
+// Read-only transactions can be created by calling DB.Transaction().
+// Read-write transactions can be created by calling DB.RWTransaction().
+// Only one read-write transaction is allowed at a time.
+type Transaction struct {
+ db *DB
+ meta *meta
+ buckets *buckets
+ writable bool
+ pages map[pgid]*page
+ nodes map[pgid]*node
+ pending []*node
+}
+
+// init initializes the transaction.
func (t *Transaction) init(db *DB) {
t.db = db
t.pages = nil
@@ -33,6 +34,11 @@ func (t *Transaction) init(db *DB) {
// Read in the buckets page.
t.buckets = &buckets{}
t.buckets.read(t.page(t.meta.buckets))
+
+ t.pages = make(map[pgid]*page)
+
+ // Increment the transaction id.
+ t.meta.txnid += txnid(1)
}
// id returns the transaction id.
@@ -40,19 +46,20 @@ func (t *Transaction) id() txnid {
return t.meta.txnid
}
-// Close closes the transaction and releases any pages it is using.
-func (t *Transaction) Close() {
- t.db.removeTransaction(t)
-}
-
// DB returns a reference to the database that created the transaction.
func (t *Transaction) DB() *DB {
return t.db
}
+// Writable returns whether the transaction can change data.
+func (t *Transaction) Writable() bool {
+ return t.writable
+}
+
// Bucket retrieves a bucket by name.
// Returns nil if the bucket does not exist.
func (t *Transaction) Bucket(name string) *Bucket {
+ _assert(t.isOpen(), "transaction not open")
b := t.buckets.get(name)
if b == nil {
return nil
@@ -67,6 +74,7 @@ func (t *Transaction) Bucket(name string) *Bucket {
// Buckets retrieves a list of all buckets.
func (t *Transaction) Buckets() []*Bucket {
+ _assert(t.isOpen(), "transaction not open")
buckets := make([]*Bucket, 0, len(t.buckets.items))
for name, b := range t.buckets.items {
bucket := &Bucket{bucket: b, transaction: t, name: name}
@@ -75,49 +83,263 @@ func (t *Transaction) Buckets() []*Bucket {
return buckets
}
-// Cursor creates a cursor associated with a given bucket.
-// The cursor is only valid as long as the Transaction is open.
-// Do not use a cursor after the transaction is closed.
-func (t *Transaction) Cursor(name string) (*Cursor, error) {
- b := t.Bucket(name)
- if b == nil {
- return nil, ErrBucketNotFound
+// CreateBucket creates a new bucket.
+// Returns an error if the transaction is read-only, if bucket already exists,
+// if the bucket name is blank, or if the bucket name is too long.
+func (t *Transaction) CreateBucket(name string) error {
+ _assert(t.isOpen(), "transaction not open")
+ if !t.writable {
+ return ErrTransactionNotWritable
+ } else if b := t.Bucket(name); b != nil {
+ return ErrBucketExists
+ } else if len(name) == 0 {
+ return ErrBucketNameRequired
+ } else if len(name) > MaxBucketNameSize {
+ return ErrBucketNameTooLarge
}
- return b.cursor(), nil
-}
-// Get retrieves the value for a key in a named bucket.
-// Returns a nil value if the key does not exist.
-// Returns an error if the bucket does not exist.
-func (t *Transaction) Get(name string, key []byte) (value []byte, err error) {
- c, err := t.Cursor(name)
+ // Create a blank root leaf page.
+ p, err := t.allocate(1)
if err != nil {
- return nil, err
+ return err
}
- k, v := c.Seek(key)
- // If our target node isn't the same key as what's passed in then return nil.
- if !bytes.Equal(key, k) {
- return nil, nil
+ p.flags = leafPageFlag
+
+ // Add bucket to buckets page.
+ t.buckets.put(name, &bucket{root: p.id})
+
+ return nil
+}
+
+// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
+// Returns an error if the transaction is read-only, if the bucket name is
+// blank, or if the bucket name is too long.
+func (t *Transaction) CreateBucketIfNotExists(name string) error {
+ _assert(t.isOpen(), "transaction not open")
+ err := t.CreateBucket(name)
+ if err != nil && err != ErrBucketExists {
+ return err
+ }
+ return nil
+}
+
+// DeleteBucket deletes a bucket.
+// Returns an error if the transaction is read-only or if the bucket cannot be found.
+func (t *Transaction) DeleteBucket(name string) error {
+ _assert(t.isOpen(), "transaction not open")
+ if !t.writable {
+ return ErrTransactionNotWritable
+ } else if b := t.Bucket(name); b == nil {
+ return ErrBucketNotFound
}
- return v, nil
+
+ // Remove from buckets page.
+ t.buckets.del(name)
+
+ // TODO(benbjohnson): Free all pages.
+
+ return nil
}
-// ForEach executes a function for each key/value pair in a bucket.
-// An error is returned if the bucket cannot be found.
-func (t *Transaction) ForEach(name string, fn func(k, v []byte) error) error {
- // Open a cursor on the bucket.
- c, err := t.Cursor(name)
+// Commit writes all changes to disk and updates the meta page.
+// Read-only transactions will simply be closed.
+// Returns an error if a disk write error occurs.
+func (t *Transaction) Commit() error {
+ defer t.close()
+
+ // Ignore commit for read-only transactions.
+ if !t.writable {
+ return nil
+ }
+
+ // TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
+
+ // Rebalance and spill data onto dirty pages.
+ t.rebalance()
+ t.spill()
+
+ // Spill buckets page.
+ p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1)
if err != nil {
return err
}
+ t.buckets.write(p)
+
+ // Write dirty pages to disk.
+ if err := t.write(); err != nil {
+ return err
+ }
+
+ // Update the meta.
+ t.meta.buckets = p.id
+
+ // Write meta to disk.
+ if err := t.writeMeta(); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+// Rollback closes the transaction and rolls back any pending changes.
+func (t *Transaction) Rollback() {
+ t.close()
+}
+
+func (t *Transaction) close() {
+ if t.writable {
+ t.db.rwlock.Unlock()
+ } else {
+ t.db.removeTransaction(t)
+ }
+
+ // Detach from the database.
+ t.db = nil
+}
+
+// isOpen returns whether the transaction is currently open.
+func (t *Transaction) isOpen() bool {
+ return t.db != nil
+}
+
+// allocate returns a contiguous block of memory starting at a given page.
+func (t *Transaction) allocate(count int) (*page, error) {
+ p, err := t.db.allocate(count)
+ if err != nil {
+ return nil, err
+ }
+
+ // Save to our page cache.
+ t.pages[p.id] = p
+
+ return p, nil
+}
+
+// rebalance attempts to balance all nodes.
+func (t *Transaction) rebalance() {
+ for _, n := range t.nodes {
+ n.rebalance()
+ }
+}
+
+// spill writes all the nodes to dirty pages.
+func (t *Transaction) spill() error {
+ // Keep track of the current root nodes.
+ // We will update this at the end once all nodes are created.
+ type root struct {
+ node *node
+ pgid pgid
+ }
+ var roots []root
+
+ // Sort nodes by highest depth first.
+ nodes := make(nodesByDepth, 0, len(t.nodes))
+ for _, n := range t.nodes {
+ nodes = append(nodes, n)
+ }
+ sort.Sort(nodes)
+
+ // Spill nodes by deepest first.
+ for i := 0; i < len(nodes); i++ {
+ n := nodes[i]
+
+ // Save existing root buckets for later.
+ if n.parent == nil && n.pgid != 0 {
+ roots = append(roots, root{n, n.pgid})
+ }
+
+ // Split nodes into appropriate sized nodes.
+ // The first node in this list will be a reference to n to preserve ancestry.
+ newNodes := n.split(t.db.pageSize)
+ t.pending = newNodes
+
+ // If this is a root node that split then create a parent node.
+ if n.parent == nil && len(newNodes) > 1 {
+ n.parent = &node{transaction: t, isLeaf: false}
+ nodes = append(nodes, n.parent)
+ }
+
+ // Add node's page to the freelist.
+ if n.pgid > 0 {
+ t.db.freelist.free(t.id(), t.page(n.pgid))
+ }
+
+ // Write nodes to dirty pages.
+ for i, newNode := range newNodes {
+ // Allocate contiguous space for the node.
+ p, err := t.allocate((newNode.size() / t.db.pageSize) + 1)
+ if err != nil {
+ return err
+ }
+
+ // Write the node to the page.
+ newNode.write(p)
+ newNode.pgid = p.id
+ newNode.parent = n.parent
+
+ // The first node should use the existing entry, other nodes are inserts.
+ var oldKey []byte
+ if i == 0 {
+ oldKey = n.key
+ } else {
+ oldKey = newNode.inodes[0].key
+ }
+
+ // Update the parent entry.
+ if newNode.parent != nil {
+ newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid)
+ }
+ }
+
+ t.pending = nil
+ }
+
+ // Update roots with new roots.
+ for _, root := range roots {
+ t.buckets.updateRoot(root.pgid, root.node.root().pgid)
+ }
+
+ // Clear out nodes now that they are all spilled.
+ t.nodes = make(map[pgid]*node)
+
+ return nil
+}
+
+// write writes any dirty pages to disk.
+func (t *Transaction) write() error {
+ // Sort pages by id.
+ pages := make(pages, 0, len(t.pages))
+ for _, p := range t.pages {
+ pages = append(pages, p)
+ }
+ sort.Sort(pages)
- // Iterate over each key/value pair in the bucket.
- for k, v := c.First(); k != nil; k, v = c.Next() {
- if err := fn(k, v); err != nil {
+ // Write pages to disk in order.
+ for _, p := range pages {
+ size := (int(p.overflow) + 1) * t.db.pageSize
+ buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]
+ offset := int64(p.id) * int64(t.db.pageSize)
+ if _, err := t.db.file.WriteAt(buf, offset); err != nil {
return err
}
}
+ // Clear out page cache.
+ t.pages = make(map[pgid]*page)
+
+ return nil
+}
+
+// writeMeta writes the meta to the disk.
+func (t *Transaction) writeMeta() error {
+ // Create a temporary buffer for the meta page.
+ buf := make([]byte, t.db.pageSize)
+ p := t.db.pageInBuffer(buf, 0)
+ t.meta.write(p)
+
+ // Write the meta page to file.
+ t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize))
+
return nil
}
@@ -135,6 +357,35 @@ func (t *Transaction) page(id pgid) *page {
return t.db.page(id)
}
+// node creates a node from a page and associates it with a given parent.
+func (t *Transaction) node(pgid pgid, parent *node) *node {
+ // Retrieve node if it has already been fetched.
+ if n := t.nodes[pgid]; n != nil {
+ return n
+ }
+
+ // Otherwise create a branch and cache it.
+ n := &node{transaction: t, parent: parent}
+ if n.parent != nil {
+ n.depth = n.parent.depth + 1
+ }
+ n.read(t.page(pgid))
+ t.nodes[pgid] = n
+
+ return n
+}
+
+// dereference removes all references to the old mmap.
+func (t *Transaction) dereference() {
+ for _, n := range t.nodes {
+ n.dereference()
+ }
+
+ for _, n := range t.pending {
+ n.dereference()
+ }
+}
+
// forEachPage iterates over every page within a given page and executes a function.
func (t *Transaction) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
p := t.page(pgid)