about summary refs log tree commit diff
path: root/transaction.go
diff options
context:
space:
mode:
Diffstat (limited to 'transaction.go')
-rw-r--r-- transaction.go 349
1 files changed, 49 insertions, 300 deletions
diff --git a/transaction.go b/transaction.go
index 1bb990b..6e9ca8f 100644
--- a/transaction.go
+++ b/transaction.go
@@ -1,28 +1,27 @@
package bolt
import (
- "sort"
- "unsafe"
+ "bytes"
)
-// txnid represents the internal transaction identifier.
-type txnid uint64
-
-// Transaction represents a consistent view into the database.
-// Read-only transactions can be created by calling DB.Transaction().
-// Read-write transactions can be created by calling DB.RWTransaction().
-// Only one read-write transaction is allowed at a time.
+// Transaction represents a read-only transaction on the database.
+// It can be used for retrieving values for keys as well as creating cursors for
+// iterating over the data.
+//
+// IMPORTANT: You must close transactions when you are done with them. Pages
+// cannot be reclaimed by the writer until no more transactions are using them.
+// A long-running read transaction can cause the database to grow quickly.
type Transaction struct {
- db *DB
- meta *meta
- buckets *buckets
- writable bool
- pages map[pgid]*page
- nodes map[pgid]*node
- pending []*node
+ db *DB
+ meta *meta
+ buckets *buckets
+ pages map[pgid]*page
}
-// init initializes the transaction.
+// txnid represents the internal transaction identifier.
+type txnid uint64
+
+// init initializes the transaction and associates it with a database.
func (t *Transaction) init(db *DB) {
t.db = db
t.pages = nil
@@ -34,11 +33,6 @@ func (t *Transaction) init(db *DB) {
// Read in the buckets page.
t.buckets = &buckets{}
t.buckets.read(t.page(t.meta.buckets))
-
- t.pages = make(map[pgid]*page)
-
- // Increment the transaction id.
- t.meta.txnid += txnid(1)
}
// id returns the transaction id.
@@ -46,20 +40,19 @@ func (t *Transaction) id() txnid {
return t.meta.txnid
}
+// Close closes the transaction and releases any pages it is using.
+func (t *Transaction) Close() {
+ t.db.removeTransaction(t)
+}
+
// DB returns a reference to the database that created the transaction.
func (t *Transaction) DB() *DB {
return t.db
}
-// Writable returns whether the transaction can change data.
-func (t *Transaction) Writable() bool {
- return t.writable
-}
-
// Bucket retrieves a bucket by name.
// Returns nil if the bucket does not exist.
func (t *Transaction) Bucket(name string) *Bucket {
- _assert(t.isOpen(), "transaction not open")
b := t.buckets.get(name)
if b == nil {
return nil
@@ -74,7 +67,6 @@ func (t *Transaction) Bucket(name string) *Bucket {
// Buckets retrieves a list of all buckets.
func (t *Transaction) Buckets() []*Bucket {
- _assert(t.isOpen(), "transaction not open")
buckets := make([]*Bucket, 0, len(t.buckets.items))
for name, b := range t.buckets.items {
bucket := &Bucket{bucket: b, transaction: t, name: name}
@@ -83,263 +75,49 @@ func (t *Transaction) Buckets() []*Bucket {
return buckets
}
-// CreateBucket creates a new bucket.
-// Returns an error if the transaction is read-only, if bucket already exists,
-// if the bucket name is blank, or if the bucket name is too long.
-func (t *Transaction) CreateBucket(name string) error {
- _assert(t.isOpen(), "transaction not open")
- if !t.writable {
- return ErrTransactionNotWritable
- } else if b := t.Bucket(name); b != nil {
- return ErrBucketExists
- } else if len(name) == 0 {
- return ErrBucketNameRequired
- } else if len(name) > MaxBucketNameSize {
- return ErrBucketNameTooLarge
- }
-
- // Create a blank root leaf page.
- p, err := t.allocate(1)
- if err != nil {
- return err
- }
- p.flags = leafPageFlag
-
- // Add bucket to buckets page.
- t.buckets.put(name, &bucket{root: p.id})
-
- return nil
-}
-
-// CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
-// Returns an error if the transaction is read-only, if the bucket name is
-// blank, or if the bucket name is too long.
-func (t *Transaction) CreateBucketIfNotExists(name string) error {
- _assert(t.isOpen(), "transaction not open")
- err := t.CreateBucket(name)
- if err != nil && err != ErrBucketExists {
- return err
- }
- return nil
-}
-
-// DeleteBucket deletes a bucket.
-// Returns an error if the transaction is read-only or if the bucket cannot be found.
-func (t *Transaction) DeleteBucket(name string) error {
- _assert(t.isOpen(), "transaction not open")
- if !t.writable {
- return ErrTransactionNotWritable
- } else if b := t.Bucket(name); b == nil {
- return ErrBucketNotFound
- }
-
- // Remove from buckets page.
- t.buckets.del(name)
-
- // TODO(benbjohnson): Free all pages.
-
- return nil
-}
-
-// Commit writes all changes to disk and updates the meta page.
-// Read-only transactions will simply be closed.
-// Returns an error if a disk write error occurs.
-func (t *Transaction) Commit() error {
- defer t.close()
-
- // Ignore commit for read-only transactions.
- if !t.writable {
- return nil
- }
-
- // TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
-
- // Rebalance and spill data onto dirty pages.
- t.rebalance()
- t.spill()
-
- // Spill buckets page.
- p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1)
- if err != nil {
- return err
- }
- t.buckets.write(p)
-
- // Write dirty pages to disk.
- if err := t.write(); err != nil {
- return err
- }
-
- // Update the meta.
- t.meta.buckets = p.id
-
- // Write meta to disk.
- if err := t.writeMeta(); err != nil {
- return err
- }
-
- return nil
-}
-
-// Rollback closes the transaction and rolls back any pending changes.
-func (t *Transaction) Rollback() {
- t.close()
-}
-
-func (t *Transaction) close() {
- if t.writable {
- t.db.rwlock.Unlock()
- } else {
- t.db.removeTransaction(t)
+// Cursor creates a cursor associated with a given bucket.
+// The cursor is only valid as long as the Transaction is open.
+// Do not use a cursor after the transaction is closed.
+func (t *Transaction) Cursor(name string) (*Cursor, error) {
+ b := t.Bucket(name)
+ if b == nil {
+ return nil, ErrBucketNotFound
}
-
- // Detach from the database.
- t.db = nil
+ return b.cursor(), nil
}
-// isOpen returns whether the transaction is currently open.
-func (t *Transaction) isOpen() bool {
- return t.db != nil
-}
-
-// allocate returns a contiguous block of memory starting at a given page.
-func (t *Transaction) allocate(count int) (*page, error) {
- p, err := t.db.allocate(count)
+// Get retrieves the value for a key in a named bucket.
+// Returns a nil value if the key does not exist.
+// Returns an error if the bucket does not exist.
+func (t *Transaction) Get(name string, key []byte) (value []byte, err error) {
+ c, err := t.Cursor(name)
if err != nil {
return nil, err
}
-
- // Save to our page cache.
- t.pages[p.id] = p
-
- return p, nil
-}
-
-// rebalance attempts to balance all nodes.
-func (t *Transaction) rebalance() {
- for _, n := range t.nodes {
- n.rebalance()
- }
-}
-
-// spill writes all the nodes to dirty pages.
-func (t *Transaction) spill() error {
- // Keep track of the current root nodes.
- // We will update this at the end once all nodes are created.
- type root struct {
- node *node
- pgid pgid
- }
- var roots []root
-
- // Sort nodes by highest depth first.
- nodes := make(nodesByDepth, 0, len(t.nodes))
- for _, n := range t.nodes {
- nodes = append(nodes, n)
- }
- sort.Sort(nodes)
-
- // Spill nodes by deepest first.
- for i := 0; i < len(nodes); i++ {
- n := nodes[i]
-
- // Save existing root buckets for later.
- if n.parent == nil && n.pgid != 0 {
- roots = append(roots, root{n, n.pgid})
- }
-
- // Split nodes into appropriate sized nodes.
- // The first node in this list will be a reference to n to preserve ancestry.
- newNodes := n.split(t.db.pageSize)
- t.pending = newNodes
-
- // If this is a root node that split then create a parent node.
- if n.parent == nil && len(newNodes) > 1 {
- n.parent = &node{transaction: t, isLeaf: false}
- nodes = append(nodes, n.parent)
- }
-
- // Add node's page to the freelist.
- if n.pgid > 0 {
- t.db.freelist.free(t.id(), t.page(n.pgid))
- }
-
- // Write nodes to dirty pages.
- for i, newNode := range newNodes {
- // Allocate contiguous space for the node.
- p, err := t.allocate((newNode.size() / t.db.pageSize) + 1)
- if err != nil {
- return err
- }
-
- // Write the node to the page.
- newNode.write(p)
- newNode.pgid = p.id
- newNode.parent = n.parent
-
- // The first node should use the existing entry, other nodes are inserts.
- var oldKey []byte
- if i == 0 {
- oldKey = n.key
- } else {
- oldKey = newNode.inodes[0].key
- }
-
- // Update the parent entry.
- if newNode.parent != nil {
- newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid)
- }
- }
-
- t.pending = nil
- }
-
- // Update roots with new roots.
- for _, root := range roots {
- t.buckets.updateRoot(root.pgid, root.node.root().pgid)
+ k, v := c.Seek(key)
+ // If the key found by the seek is not the key that was passed in, return nil.
+ if !bytes.Equal(key, k) {
+ return nil, nil
}
-
- // Clear out nodes now that they are all spilled.
- t.nodes = make(map[pgid]*node)
-
- return nil
+ return v, nil
}
-// write writes any dirty pages to disk.
-func (t *Transaction) write() error {
- // Sort pages by id.
- pages := make(pages, 0, len(t.pages))
- for _, p := range t.pages {
- pages = append(pages, p)
+// ForEach executes a function for each key/value pair in a bucket.
+// An error is returned if the bucket cannot be found or if fn returns an error.
+func (t *Transaction) ForEach(name string, fn func(k, v []byte) error) error {
+ // Open a cursor on the bucket.
+ c, err := t.Cursor(name)
+ if err != nil {
+ return err
}
- sort.Sort(pages)
- // Write pages to disk in order.
- for _, p := range pages {
- size := (int(p.overflow) + 1) * t.db.pageSize
- buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]
- offset := int64(p.id) * int64(t.db.pageSize)
- if _, err := t.db.file.WriteAt(buf, offset); err != nil {
+ // Iterate over each key/value pair in the bucket.
+ for k, v := c.First(); k != nil; k, v = c.Next() {
+ if err := fn(k, v); err != nil {
return err
}
}
- // Clear out page cache.
- t.pages = make(map[pgid]*page)
-
- return nil
-}
-
-// writeMeta writes the meta to the disk.
-func (t *Transaction) writeMeta() error {
- // Create a temporary buffer for the meta page.
- buf := make([]byte, t.db.pageSize)
- p := t.db.pageInBuffer(buf, 0)
- t.meta.write(p)
-
- // Write the meta page to file.
- t.db.metafile.WriteAt(buf, int64(p.id)*int64(t.db.pageSize))
-
return nil
}
@@ -357,35 +135,6 @@ func (t *Transaction) page(id pgid) *page {
return t.db.page(id)
}
-// node creates a node from a page and associates it with a given parent.
-func (t *Transaction) node(pgid pgid, parent *node) *node {
- // Retrieve node if it has already been fetched.
- if n := t.nodes[pgid]; n != nil {
- return n
- }
-
- // Otherwise create a branch and cache it.
- n := &node{transaction: t, parent: parent}
- if n.parent != nil {
- n.depth = n.parent.depth + 1
- }
- n.read(t.page(pgid))
- t.nodes[pgid] = n
-
- return n
-}
-
-// dereference removes all references to the old mmap.
-func (t *Transaction) dereference() {
- for _, n := range t.nodes {
- n.dereference()
- }
-
- for _, n := range t.pending {
- n.dereference()
- }
-}
-
// forEachPage iterates over every page within a given page and executes a function.
func (t *Transaction) forEachPage(pgid pgid, depth int, fn func(*page, int)) {
p := t.page(pgid)