aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Johnson <benbjohnson@yahoo.com>2014-05-28 10:28:15 -0600
committerBen Johnson <benbjohnson@yahoo.com>2014-05-28 10:31:22 -0600
commitb7896919761d1f942a042603510b30921a8c2009 (patch)
treead956b5e92c9e371df3f10277126b6c5821ecd2e
parentMerge pull request #175 from benbjohnson/check-loop (diff)
downloaddedo-b7896919761d1f942a042603510b30921a8c2009.tar.gz
dedo-b7896919761d1f942a042603510b30921a8c2009.tar.xz
Add streaming check.
This commit changes Tx.Check() to return a channel through which check errors are returned. This allows errors to be found before checking the entire data file.
-rw-r--r--cmd/bolt/check.go32
-rw-r--r--db.go18
-rw-r--r--db_test.go15
-rw-r--r--tx.go41
-rw-r--r--tx_test.go2
5 files changed, 46 insertions, 62 deletions
diff --git a/cmd/bolt/check.go b/cmd/bolt/check.go
index 1436aba..1466fd7 100644
--- a/cmd/bolt/check.go
+++ b/cmd/bolt/check.go
@@ -21,19 +21,27 @@ func Check(path string) {
defer db.Close()
// Perform consistency check.
- err = db.View(func(tx *bolt.Tx) error {
- return tx.Check()
- })
-
- // Print out any errors that occur.
- if err != nil {
- if errors, ok := err.(bolt.ErrorList); ok {
- for _, err := range errors {
+ _ = db.View(func(tx *bolt.Tx) error {
+ var count int
+ ch := tx.Check()
+ loop:
+ for {
+ select {
+ case err, ok := <-ch:
+ if !ok {
+ break loop
+ }
println(err)
+ count++
}
}
- fatalln(err)
- return
- }
- println("OK")
+
+ // Print summary of errors.
+ if count > 0 {
+ fatalf("%d errors found")
+ } else {
+ println("OK")
+ }
+ return nil
+ })
}
diff --git a/db.go b/db.go
index 7b17086..d759b1b 100644
--- a/db.go
+++ b/db.go
@@ -5,7 +5,6 @@ import (
"fmt"
"hash/fnv"
"os"
- "strings"
"sync"
"syscall"
"unsafe"
@@ -632,23 +631,6 @@ func (m *meta) sum64() uint64 {
return h.Sum64()
}
-// ErrorList represents a slice of errors.
-type ErrorList []error
-
-// Error returns a readable count of the errors in the list.
-func (l ErrorList) Error() string {
- return fmt.Sprintf("%d errors occurred", len(l))
-}
-
-// join returns a error messages joined by a string.
-func (l ErrorList) join(sep string) string {
- var a []string
- for _, e := range l {
- a = append(a, e.Error())
- }
- return strings.Join(a, sep)
-}
-
// _assert will panic with a given formatted message if the given condition is false.
func _assert(condition bool, msg string, v ...interface{}) {
if !condition {
diff --git a/db_test.go b/db_test.go
index 06accd3..e1c4aaa 100644
--- a/db_test.go
+++ b/db_test.go
@@ -53,12 +53,12 @@ func TestOpen_Check(t *testing.T) {
withTempPath(func(path string) {
db, err := Open(path, 0666)
assert.NoError(t, err)
- assert.NoError(t, db.View(func(tx *Tx) error { return tx.Check() }))
+ assert.NoError(t, db.View(func(tx *Tx) error { return <-tx.Check() }))
db.Close()
db, err = Open(path, 0666)
assert.NoError(t, err)
- assert.NoError(t, db.View(func(tx *Tx) error { return tx.Check() }))
+ assert.NoError(t, db.View(func(tx *Tx) error { return <-tx.Check() }))
db.Close()
})
}
@@ -464,20 +464,13 @@ func withOpenDB(fn func(*DB, string)) {
// mustCheck runs a consistency check on the database and panics if any errors are found.
func mustCheck(db *DB) {
err := db.Update(func(tx *Tx) error {
- return tx.Check()
+ return <-tx.Check()
})
if err != nil {
// Copy db off first.
var path = tempfile()
db.View(func(tx *Tx) error { return tx.CopyFile(path, 0600) })
-
- if errors, ok := err.(ErrorList); ok {
- for _, err := range errors {
- warn(err)
- }
- }
- warn(err)
- panic("check failure: " + path)
+ panic("check failure: " + err.Error() + ": " + path)
}
}
diff --git a/tx.go b/tx.go
index a0ae5bd..6ebe04e 100644
--- a/tx.go
+++ b/tx.go
@@ -184,10 +184,10 @@ func (tx *Tx) Commit() error {
}
// If strict mode is enabled then perform a consistency check.
+ // Only the first consistency error is reported in the panic.
if tx.db.StrictMode {
- if err := tx.Check(); err != nil {
- err := err.(ErrorList)
- panic("check fail: " + err.Error() + ": " + err.join("; "))
+ if err, ok := <-tx.Check(); ok {
+ panic("check fail: " + err.Error())
}
}
@@ -291,14 +291,18 @@ func (tx *Tx) CopyFile(path string, mode os.FileMode) error {
// because of caching. This overhead can be removed if running on a read-only
// transaction, however, it is not safe to execute other writer transactions at
// the same time.
-func (tx *Tx) Check() error {
- var errors ErrorList
+func (tx *Tx) Check() <-chan error {
+ ch := make(chan error)
+ go tx.check(ch)
+ return ch
+}
+func (tx *Tx) check(ch chan error) {
// Check if any pages are double freed.
freed := make(map[pgid]bool)
for _, id := range tx.db.freelist.all() {
if freed[id] {
- errors = append(errors, fmt.Errorf("page %d: already freed", id))
+ ch <- fmt.Errorf("page %d: already freed", id)
}
freed[id] = true
}
@@ -312,26 +316,23 @@ func (tx *Tx) Check() error {
}
// Recursively check buckets.
- tx.checkBucket(&tx.root, reachable, &errors)
+ tx.checkBucket(&tx.root, reachable, ch)
// Ensure all pages below high water mark are either reachable or freed.
for i := pgid(0); i < tx.meta.pgid; i++ {
_, isReachable := reachable[i]
if !isReachable && !freed[i] {
- errors = append(errors, fmt.Errorf("page %d: unreachable unfreed", int(i)))
+ ch <- fmt.Errorf("page %d: unreachable unfreed", int(i))
} else if isReachable && freed[i] {
- errors = append(errors, fmt.Errorf("page %d: reachable freed", int(i)))
+ ch <- fmt.Errorf("page %d: reachable freed", int(i))
}
}
- if len(errors) > 0 {
- return errors
- }
-
- return nil
+ // Close the channel to signal completion.
+ close(ch)
}
-func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, errors *ErrorList) {
+func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, ch chan error) {
// Ignore inline buckets.
if b.root == 0 {
return
@@ -343,7 +344,7 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, errors *ErrorList
for i := pgid(0); i <= pgid(p.overflow); i++ {
var id = p.id + i
if _, ok := reachable[id]; ok {
- *errors = append(*errors, fmt.Errorf("page %d: multiple references", int(id)))
+ ch <- fmt.Errorf("page %d: multiple references", int(id))
}
reachable[id] = p
}
@@ -351,18 +352,18 @@ func (tx *Tx) checkBucket(b *Bucket, reachable map[pgid]*page, errors *ErrorList
// Retrieve page info.
info, err := b.tx.Page(int(p.id))
if err != nil {
- *errors = append(*errors, err)
+ ch <- err
} else if info == nil {
- *errors = append(*errors, fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid)))
+ ch <- fmt.Errorf("page %d: out of bounds: %d", int(p.id), int(b.tx.meta.pgid))
} else if info.Type != "branch" && info.Type != "leaf" {
- *errors = append(*errors, fmt.Errorf("page %d: invalid type: %s", int(p.id), info.Type))
+ ch <- fmt.Errorf("page %d: invalid type: %s", int(p.id), info.Type)
}
})
// Check each bucket within this bucket.
_ = b.ForEach(func(k, v []byte) error {
if child := b.Bucket(k); child != nil {
- tx.checkBucket(child, reachable, errors)
+ tx.checkBucket(child, reachable, ch)
}
return nil
})
diff --git a/tx_test.go b/tx_test.go
index f6ee3d3..dd04ae6 100644
--- a/tx_test.go
+++ b/tx_test.go
@@ -310,7 +310,7 @@ func TestTx_Check_Corrupt(t *testing.T) {
})
}()
- assert.Equal(t, "check fail: 1 errors occurred: page 3: already freed", msg)
+ assert.Equal(t, "check fail: page 3: already freed", msg)
}
// Ensure that the database can be copied to a file path.