aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Ben Johnson <benbjohnson@yahoo.com> 2014-02-11 12:16:12 -0700
committer Ben Johnson <benbjohnson@yahoo.com> 2014-02-12 11:49:57 -0700
commit 7bb878ff695b89b62483dd6155f5815ad0d06258 (patch)
tree 1ec591c5aca7bfc099a98433f6a7b01e28c1d9db
parent Merge branch 'master' of https://github.com/boltdb/bolt (diff)
download dedo-7bb878ff695b89b62483dd6155f5815ad0d06258.tar.gz
dedo-7bb878ff695b89b62483dd6155f5815ad0d06258.tar.xz
Mmap remap.
-rw-r--r-- db.go 90
-rw-r--r-- db_test.go 12
-rw-r--r-- node.go 24
-rw-r--r-- rwtransaction.go 52
-rw-r--r-- syscall_darwin.go 6
-rw-r--r-- syscall_darwin_test.go 5
-rw-r--r-- syscall_linux.go 5
-rw-r--r-- syscall_linux_test.go 5
8 files changed, 171 insertions, 28 deletions
diff --git a/db.go b/db.go
index 65879ec..8592d83 100644
--- a/db.go
+++ b/db.go
@@ -15,6 +15,9 @@ const (
const minPageSize = 0x1000
+const minMmapSize = 1 << 22 // 4MB
+const maxMmapStep = 1 << 30 // 1GB
+
type DB struct {
os _os
syscall _syscall
@@ -98,7 +101,7 @@ func (db *DB) Open(path string, mode os.FileMode) error {
}
// Memory map the data file.
- if err := db.mmap(); err != nil {
+ if err := db.mmap(0); err != nil {
db.close()
return err
}
@@ -113,7 +116,19 @@ func (db *DB) Open(path string, mode os.FileMode) error {
}
// mmap opens the underlying memory-mapped file and initializes the meta references.
-func (db *DB) mmap() error {
+// minsz is the minimum size that the new mmap can be.
+func (db *DB) mmap(minsz int) error {
+ db.mmaplock.Lock()
+ defer db.mmaplock.Unlock()
+
+ // Dereference all mmap references before unmapping.
+ if db.rwtransaction != nil {
+ db.rwtransaction.dereference()
+ }
+
+ // Unmap existing data before continuing.
+ db.munmap()
+
info, err := db.file.Stat()
if err != nil {
return &Error{"mmap stat error", err}
@@ -121,8 +136,12 @@ func (db *DB) mmap() error {
return &Error{"file size too small", err}
}
- // TODO(benbjohnson): Determine appropriate mmap size by db size.
- size := 2 << 30
+ // Grow from the larger of the file size and minsz so the whole file stays mapped.
+ var size = int(info.Size())
+ if size < minsz {
+ size = minsz
+ }
+ size = db.mmapSize(size)
// Memory-map the data file as a byte slice.
if db.data, err = db.syscall.Mmap(int(db.file.Fd()), 0, size, syscall.PROT_READ, syscall.MAP_SHARED); err != nil {
@@ -144,6 +163,35 @@ func (db *DB) mmap() error {
return nil
}
+// munmap unmaps the data file from memory.
+func (db *DB) munmap() {
+ if db.data != nil {
+ if err := db.syscall.Munmap(db.data); err != nil {
+ panic("unmap error: " + err.Error())
+ }
+ db.data = nil
+ }
+}
+
+// mmapSize determines the appropriate size for the mmap given the current size
+// of the database. The minimum is 4MB; it doubles below 1GB, then grows by 1GB.
+func (db *DB) mmapSize(size int) int {
+ if size < minMmapSize {
+ return minMmapSize
+ } else if size < maxMmapStep {
+ size *= 2
+ } else {
+ size += maxMmapStep
+ }
+
+ // Ensure that the mmap size is a multiple of the page size.
+ if (size % db.pageSize) != 0 {
+ size = ((size / db.pageSize) + 1) * db.pageSize
+ }
+
+ return size
+}
+
// init creates a new database file and initializes its meta pages.
func (db *DB) init() error {
// Set the page size to the OS page size.
@@ -196,8 +244,14 @@ func (db *DB) Close() {
}
func (db *DB) close() {
- // TODO: Undo everything in Open().
+ // Wait for pending transactions before closing and unmapping the data.
+ // db.mmaplock.Lock()
+ // defer db.mmaplock.Unlock()
+
+ // TODO(benbjohnson): Undo everything in Open().
db.freelist = nil
+
+ db.munmap()
}
// Transaction creates a read-only transaction.
@@ -206,6 +260,11 @@ func (db *DB) Transaction() (*Transaction, error) {
db.metalock.Lock()
defer db.metalock.Unlock()
+ // Obtain a read-only lock on the mmap. When the mmap is remapped it will
+ // obtain a write lock so all transactions must finish before it can be
+ // remapped.
+ db.mmaplock.RLock()
+
// Exit if the database is not open yet.
if !db.opened {
return nil, DatabaseNotOpenError
@@ -260,6 +319,9 @@ func (db *DB) removeTransaction(t *Transaction) {
db.metalock.Lock()
defer db.metalock.Unlock()
+ // Release the read lock on the mmap.
+ db.mmaplock.RUnlock()
+
// Remove the transaction.
for i, txn := range db.transactions {
if txn == t {
@@ -412,7 +474,7 @@ func (db *DB) meta() *meta {
}
// allocate returns a contiguous block of memory starting at a given page.
-func (db *DB) allocate(count int) *page {
+func (db *DB) allocate(count int) (*page, error) {
// Allocate a temporary buffer for the page.
buf := make([]byte, count*db.pageSize)
p := (*page)(unsafe.Pointer(&buf[0]))
@@ -420,16 +482,22 @@ func (db *DB) allocate(count int) *page {
// Use pages from the freelist if they are available.
if p.id = db.freelist.allocate(count); p.id != 0 {
- return p
+ return p, nil
}
- // TODO(benbjohnson): Resize mmap().
-
- // If there are no free pages then allocate from the end of the file.
+ // Resize mmap() if we're at the end.
p.id = db.rwtransaction.meta.pgid
+ var minsz = int((p.id+pgid(count))+1) * db.pageSize
+ if minsz >= len(db.data) {
+ if err := db.mmap(minsz); err != nil {
+ return nil, &Error{"mmap allocate error", err}
+ }
+ }
+
+ // Move the page id high water mark.
db.rwtransaction.meta.pgid += pgid(count)
- return p
+ return p, nil
}
// sync flushes the file descriptor to disk.
diff --git a/db_test.go b/db_test.go
index 8211eac..db57825 100644
--- a/db_test.go
+++ b/db_test.go
@@ -187,6 +187,18 @@ func TestDBWriteFail(t *testing.T) {
t.Skip("pending") // TODO(benbjohnson)
}
+// Ensure that the mmap grows appropriately.
+func TestDBMmapSize(t *testing.T) {
+ db := &DB{pageSize: 4096}
+ assert.Equal(t, db.mmapSize(0), minMmapSize)
+ assert.Equal(t, db.mmapSize(16384), minMmapSize)
+ assert.Equal(t, db.mmapSize(minMmapSize-1), minMmapSize)
+ assert.Equal(t, db.mmapSize(minMmapSize), minMmapSize*2)
+ assert.Equal(t, db.mmapSize(10000000), 20000768)
+ assert.Equal(t, db.mmapSize((1<<30)-1), 2147483648)
+ assert.Equal(t, db.mmapSize(1<<30), 1<<31)
+}
+
// withDB executes a function with a database reference.
func withDB(fn func(*DB, string)) {
f, _ := ioutil.TempFile("", "bolt-")
diff --git a/node.go b/node.go
index 11b5e2f..1460f52 100644
--- a/node.go
+++ b/node.go
@@ -161,10 +161,8 @@ func (n *node) write(p *page) {
// Initialize page.
if n.isLeaf {
p.flags |= p_leaf
- // warn("∑", p.id, "leaf")
} else {
p.flags |= p_branch
- // warn("∑", p.id, "branch")
}
p.count = uint16(len(n.inodes))
@@ -177,13 +175,11 @@ func (n *node) write(p *page) {
elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
elem.ksize = uint32(len(item.key))
elem.vsize = uint32(len(item.value))
- // warn(" »", string(item.key), "->", string(item.value))
} else {
elem := p.branchPageElement(uint16(i))
elem.pos = uint32(uintptr(unsafe.Pointer(&b[0])) - uintptr(unsafe.Pointer(elem)))
elem.ksize = uint32(len(item.key))
elem.pgid = item.pgid
- // warn(" »", string(item.key))
}
// Write data for the element to the end of the page.
@@ -341,6 +337,26 @@ func (n *node) rebalance() {
n.parent.rebalance()
}
+// dereference causes the node to copy all its inode key/value references to heap memory.
+// This is required when the mmap is reallocated so inodes are not pointing to stale data.
+func (n *node) dereference() {
+ key := make([]byte, len(n.key))
+ copy(key, n.key)
+ n.key = key
+
+ for i, _ := range n.inodes {
+ inode := &n.inodes[i]
+
+ key := make([]byte, len(inode.key))
+ copy(key, inode.key)
+ inode.key = key
+
+ value := make([]byte, len(inode.value))
+ copy(value, inode.value)
+ inode.value = value
+ }
+}
+
// nodesByDepth sorts a list of branches by deepest first.
type nodesByDepth []*node
diff --git a/rwtransaction.go b/rwtransaction.go
index 5058dd3..b96df51 100644
--- a/rwtransaction.go
+++ b/rwtransaction.go
@@ -9,7 +9,8 @@ import (
// Only one read/write transaction can be active for a DB at a time.
type RWTransaction struct {
Transaction
- nodes map[pgid]*node
+ nodes map[pgid]*node
+ pending []*node
}
// init initializes the transaction.
@@ -35,7 +36,10 @@ func (t *RWTransaction) CreateBucket(name string) error {
}
// Create a blank root leaf page.
- p := t.allocate(1)
+ p, err := t.allocate(1)
+ if err != nil {
+ return err
+ }
p.flags = p_leaf
// Add bucket to buckets page.
@@ -100,14 +104,15 @@ func (t *RWTransaction) Commit() error {
// TODO(benbjohnson): Use vectorized I/O to write out dirty pages.
- // TODO(benbjohnson): Move rebalancing to occur immediately after deletion (?).
-
// Rebalance and spill data onto dirty pages.
t.rebalance()
t.spill()
// Spill buckets page.
- p := t.allocate((t.buckets.size() / t.db.pageSize) + 1)
+ p, err := t.allocate((t.buckets.size() / t.db.pageSize) + 1)
+ if err != nil {
+ return err
+ }
t.buckets.write(p)
// Write dirty pages to disk.
@@ -135,13 +140,16 @@ func (t *RWTransaction) close() {
}
// allocate returns a contiguous block of memory starting at a given page.
-func (t *RWTransaction) allocate(count int) *page {
- p := t.db.allocate(count)
+func (t *RWTransaction) allocate(count int) (*page, error) {
+ p, err := t.db.allocate(count)
+ if err != nil {
+ return nil, err
+ }
// Save to our page cache.
t.pages[p.id] = p
- return p
+ return p, nil
}
// rebalance attempts to balance all nodes.
@@ -152,7 +160,7 @@ func (t *RWTransaction) rebalance() {
}
// spill writes all the nodes to dirty pages.
-func (t *RWTransaction) spill() {
+func (t *RWTransaction) spill() error {
// Keep track of the current root nodes.
// We will update this at the end once all nodes are created.
type root struct {
@@ -180,6 +188,7 @@ func (t *RWTransaction) spill() {
// Split nodes into appropriate sized nodes.
// The first node in this list will be a reference to n to preserve ancestry.
newNodes := n.split(t.db.pageSize)
+ t.pending = newNodes
// If this is a root node that split then create a parent node.
if n.parent == nil && len(newNodes) > 1 {
@@ -195,7 +204,10 @@ func (t *RWTransaction) spill() {
// Write nodes to dirty pages.
for i, newNode := range newNodes {
// Allocate contiguous space for the node.
- p := t.allocate((newNode.size() / t.db.pageSize) + 1)
+ p, err := t.allocate((newNode.size() / t.db.pageSize) + 1)
+ if err != nil {
+ return err
+ }
// Write the node to the page.
newNode.write(p)
@@ -215,6 +227,8 @@ func (t *RWTransaction) spill() {
newNode.parent.put(oldKey, newNode.inodes[0].key, nil, newNode.pgid)
}
}
+
+ t.pending = nil
}
// Update roots with new roots.
@@ -224,12 +238,12 @@ func (t *RWTransaction) spill() {
// Clear out nodes now that they are all spilled.
t.nodes = make(map[pgid]*node)
+
+ return nil
}
// write writes any dirty pages to disk.
func (t *RWTransaction) write() error {
- // TODO(benbjohnson): If our last page id is greater than the mmap size then lock the DB and resize.
-
// Sort pages by id.
pages := make(pages, 0, len(t.pages))
for _, p := range t.pages {
@@ -247,6 +261,9 @@ func (t *RWTransaction) write() error {
}
}
+ // Clear out page cache.
+ t.pages = make(map[pgid]*page)
+
return nil
}
@@ -280,3 +297,14 @@ func (t *RWTransaction) node(pgid pgid, parent *node) *node {
return n
}
+
+// dereference removes all references to the old mmap.
+func (t *RWTransaction) dereference() {
+ for _, n := range t.nodes {
+ n.dereference()
+ }
+
+ for _, n := range t.pending {
+ n.dereference()
+ }
+}
diff --git a/syscall_darwin.go b/syscall_darwin.go
index de34193..cb9a20c 100644
--- a/syscall_darwin.go
+++ b/syscall_darwin.go
@@ -6,11 +6,15 @@ import (
type _syscall interface {
Mmap(fd int, offset int64, length int, prot int, flags int) (data []byte, err error)
+ Munmap([]byte) error
}
type syssyscall struct{}
func (o *syssyscall) Mmap(fd int, offset int64, length int, prot int, flags int) (data []byte, err error) {
- // err = (EACCES, EBADF, EINVAL, ENODEV, ENOMEM, ENXIO, EOVERFLOW)
return syscall.Mmap(fd, offset, length, prot, flags)
}
+
+func (o *syssyscall) Munmap(b []byte) error {
+ return syscall.Munmap(b)
+}
diff --git a/syscall_darwin_test.go b/syscall_darwin_test.go
index 9e64cf7..6b468f6 100644
--- a/syscall_darwin_test.go
+++ b/syscall_darwin_test.go
@@ -12,3 +12,8 @@ func (m *mocksyscall) Mmap(fd int, offset int64, length int, prot int, flags int
args := m.Called(fd, offset, length, prot, flags)
return args.Get(0).([]byte), args.Error(1)
}
+
+func (m *mocksyscall) Munmap(b []byte) error {
+ args := m.Called(b)
+ return args.Error(0)
+}
diff --git a/syscall_linux.go b/syscall_linux.go
index de34193..65daad4 100644
--- a/syscall_linux.go
+++ b/syscall_linux.go
@@ -6,6 +6,7 @@ import (
type _syscall interface {
Mmap(fd int, offset int64, length int, prot int, flags int) (data []byte, err error)
+ Munmap([]byte) error
}
type syssyscall struct{}
@@ -14,3 +15,7 @@ func (o *syssyscall) Mmap(fd int, offset int64, length int, prot int, flags int)
// err = (EACCES, EBADF, EINVAL, ENODEV, ENOMEM, ENXIO, EOVERFLOW)
return syscall.Mmap(fd, offset, length, prot, flags)
}
+
+func (o *syssyscall) Munmap(b []byte) error {
+ return syscall.Munmap(b)
+}
diff --git a/syscall_linux_test.go b/syscall_linux_test.go
index 9e64cf7..6b468f6 100644
--- a/syscall_linux_test.go
+++ b/syscall_linux_test.go
@@ -12,3 +12,8 @@ func (m *mocksyscall) Mmap(fd int, offset int64, length int, prot int, flags int
args := m.Called(fd, offset, length, prot, flags)
return args.Get(0).([]byte), args.Error(1)
}
+
+func (m *mocksyscall) Munmap(b []byte) error {
+ args := m.Called(b)
+ return args.Error(0)
+}