diff options
Diffstat (limited to 'rwcursor.go')
-rw-r--r-- | rwcursor.go | 601 |
1 files changed, 601 insertions, 0 deletions
diff --git a/rwcursor.go b/rwcursor.go new file mode 100644 index 0000000..553989f --- /dev/null +++ b/rwcursor.go @@ -0,0 +1,601 @@ +package bolt + +// RWCursor represents a cursor that can read and write data for a bucket. +type RWCursor struct { + Cursor + transaction *RWTransaction + reclaimed []pgno /**< Reclaimed freeDB pages, or NULL before use (was me_pghead) */ + last txnid /**< ID of last used record, or 0 if len(reclaimed) == 0 */ +} + +func (c *RWCursor) Put(key []byte, value []byte) error { + // Make sure this cursor was created by a transaction. + if c.transaction == nil { + return &Error{"invalid cursor", nil} + } + db := c.transaction.db + + // Validate the key we're using. + if key == nil { + return &Error{"key required", nil} + } else if len(key) > db.maxKeySize { + return &Error{"key too large", nil} + } + + // TODO: Validate data size based on MaxKeySize if DUPSORT. + + // Validate the size of our data. + if len(data) > MaxDataSize { + return &Error{"data too large", nil} + } + + // If we don't have a root page then add one. + if c.bucket.root == p_invalid { + p, err := c.newLeafPage() + if err != nil { + return err + } + c.push(p) + c.bucket.root = p.id + c.bucket.root++ + // TODO: *mc->mc_dbflag |= DB_DIRTY; + // TODO? mc->mc_flags |= C_INITIALIZED; + } + + // TODO: Move to key. + exists, err := c.moveTo(key) + if err != nil { + return err + } + + // TODO: spill? + if err := c.spill(key, data); err != nil { + return err + } + + // Make sure all cursor pages are writable + if err := c.touch(); err != nil { + return err + } + + // If key does not exist the + if exists { + node := c.currentNode() + + } + + /* + + insert = rc; + if (insert) { + // The key does not exist + DPRINTF(("inserting key at index %i", mc->mc_ki[mc->mc_top])); + if ((mc->mc_db->md_flags & MDB_DUPSORT) && + LEAFSIZE(key, data) > env->me_nodemax) + { + // Too big for a node, insert in sub-DB + fp_flags = P_LEAF|P_DIRTY; + fp = env->me_pbuf; + fp->mp_pad = data->mv_size; // used if MDB_DUPFIXED + fp->mp_lower = fp->mp_upper = olddata.mv_size = PAGEHDRSZ; + goto prep_subDB; + } + } else { + +more: + leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + olddata.mv_size = NODEDSZ(leaf); + olddata.mv_data = NODEDATA(leaf); + + // DB has dups? + if (F_ISSET(mc->mc_db->md_flags, MDB_DUPSORT)) { + // Prepare (sub-)page/sub-DB to accept the new item, + // if needed. fp: old sub-page or a header faking + // it. mp: new (sub-)page. offset: growth in page + // size. xdata: node data with new page or DB. + ssize_t i, offset = 0; + mp = fp = xdata.mv_data = env->me_pbuf; + mp->mp_pgno = mc->mc_pg[mc->mc_top]->mp_pgno; + + // Was a single item before, must convert now + if (!F_ISSET(leaf->mn_flags, F_DUPDATA)) { + // Just overwrite the current item + if (flags == MDB_CURRENT) + goto current; + +#if UINT_MAX < SIZE_MAX + if (mc->mc_dbx->md_dcmp == mdb_cmp_int && olddata.mv_size == sizeof(size_t)) +#ifdef MISALIGNED_OK + mc->mc_dbx->md_dcmp = mdb_cmp_long; +#else + mc->mc_dbx->md_dcmp = mdb_cmp_cint; +#endif +#endif + // if data matches, skip it + if (!mc->mc_dbx->md_dcmp(data, &olddata)) { + if (flags & MDB_NODUPDATA) + rc = MDB_KEYEXIST; + else if (flags & MDB_MULTIPLE) + goto next_mult; + else + rc = MDB_SUCCESS; + return rc; + } + + // Back up original data item + dkey.mv_size = olddata.mv_size; + dkey.mv_data = memcpy(fp+1, olddata.mv_data, olddata.mv_size); + + // Make sub-page header for the dup items, with dummy body + fp->mp_flags = P_LEAF|P_DIRTY|P_SUBP; + fp->mp_lower = PAGEHDRSZ; + xdata.mv_size = PAGEHDRSZ + dkey.mv_size + data->mv_size; + if (mc->mc_db->md_flags & MDB_DUPFIXED) { + fp->mp_flags |= P_LEAF2; + fp->mp_pad = data->mv_size; + xdata.mv_size += 2 * data->mv_size; // leave space for 2 more + } else { + xdata.mv_size += 2 * (sizeof(indx_t) + NODESIZE) + + (dkey.mv_size & 1) + (data->mv_size & 1); + } + fp->mp_upper = xdata.mv_size; + olddata.mv_size = fp->mp_upper; // pretend olddata is fp + } else if (leaf->mn_flags & F_SUBDATA) { + // Data is on sub-DB, just store it + flags |= F_DUPDATA|F_SUBDATA; + goto put_sub; + } else { + // Data is on sub-page + fp = olddata.mv_data; + switch (flags) { + default: + i = -(ssize_t)SIZELEFT(fp); + if (!(mc->mc_db->md_flags & MDB_DUPFIXED)) { + offset = i += (ssize_t) EVEN( + sizeof(indx_t) + NODESIZE + data->mv_size); + } else { + i += offset = fp->mp_pad; + offset *= 4; // space for 4 more + } + if (i > 0) + break; + // FALLTHRU: Sub-page is big enough + case MDB_CURRENT: + fp->mp_flags |= P_DIRTY; + COPY_PGNO(fp->mp_pgno, mp->mp_pgno); + mc->mc_xcursor->mx_cursor.mc_pg[0] = fp; + flags |= F_DUPDATA; + goto put_sub; + } + xdata.mv_size = olddata.mv_size + offset; + } + + fp_flags = fp->mp_flags; + if (NODESIZE + NODEKSZ(leaf) + xdata.mv_size > env->me_nodemax) { + // Too big for a sub-page, convert to sub-DB + fp_flags &= ~P_SUBP; +prep_subDB: + dummy.md_pad = 0; + dummy.md_flags = 0; + dummy.md_depth = 1; + dummy.md_branch_pages = 0; + dummy.md_leaf_pages = 1; + dummy.md_overflow_pages = 0; + dummy.md_entries = NUMKEYS(fp); + xdata.mv_size = sizeof(MDB_db); + xdata.mv_data = &dummy; + if ((rc = mdb_page_alloc(mc, 1, &mp))) + return rc; + offset = env->me_psize - olddata.mv_size; + flags |= F_DUPDATA|F_SUBDATA; + dummy.md_root = mp->mp_pgno; + } + if (mp != fp) { + mp->mp_flags = fp_flags | P_DIRTY; + mp->mp_pad = fp->mp_pad; + mp->mp_lower = fp->mp_lower; + mp->mp_upper = fp->mp_upper + offset; + if (fp_flags & P_LEAF2) { + memcpy(METADATA(mp), METADATA(fp), NUMKEYS(fp) * fp->mp_pad); + } else { + memcpy((char *)mp + mp->mp_upper, (char *)fp + fp->mp_upper, + olddata.mv_size - fp->mp_upper); + for (i = NUMKEYS(fp); --i >= 0; ) + mp->mp_ptrs[i] = fp->mp_ptrs[i] + offset; + } + } + + rdata = &xdata; + flags |= F_DUPDATA; + do_sub = 1; + if (!insert) + mdb_node_del(mc, 0); + goto new_sub; + } +current: + // overflow page overwrites need special handling + if (F_ISSET(leaf->mn_flags, F_BIGDATA)) { + MDB_page *omp; + pgno_t pg; + int level, ovpages, dpages = OVPAGES(data->mv_size, env->me_psize); + + memcpy(&pg, olddata.mv_data, sizeof(pg)); + if ((rc2 = mdb_page_get(mc->mc_txn, pg, &omp, &level)) != 0) + return rc2; + ovpages = omp->mp_pages; + + // Is the ov page large enough? + if (ovpages >= dpages) { + if (!(omp->mp_flags & P_DIRTY) && + (level || (env->me_flags & MDB_WRITEMAP))) + { + rc = mdb_page_unspill(mc->mc_txn, omp, &omp); + if (rc) + return rc; + level = 0; // dirty in this txn or clean + } + // Is it dirty? + if (omp->mp_flags & P_DIRTY) { + // yes, overwrite it. Note in this case we don't + // bother to try shrinking the page if the new data + // is smaller than the overflow threshold. + if (level > 1) { + // It is writable only in a parent txn + size_t sz = (size_t) env->me_psize * ovpages, off; + MDB_page *np = mdb_page_malloc(mc->mc_txn, ovpages); + MDB_ID2 id2; + if (!np) + return ENOMEM; + id2.mid = pg; + id2.mptr = np; + rc = mdb_mid2l_insert(mc->mc_txn->mt_u.dirty_list, &id2); + mdb_cassert(mc, rc == 0); + if (!(flags & MDB_RESERVE)) { + // Copy end of page, adjusting alignment so + // compiler may copy words instead of bytes. + off = (PAGEHDRSZ + data->mv_size) & -sizeof(size_t); + memcpy((size_t *)((char *)np + off), + (size_t *)((char *)omp + off), sz - off); + sz = PAGEHDRSZ; + } + memcpy(np, omp, sz); // Copy beginning of page + omp = np; + } + SETDSZ(leaf, data->mv_size); + if (F_ISSET(flags, MDB_RESERVE)) + data->mv_data = METADATA(omp); + else + memcpy(METADATA(omp), data->mv_data, data->mv_size); + goto done; + } + } + if ((rc2 = mdb_ovpage_free(mc, omp)) != MDB_SUCCESS) + return rc2; + } else if (data->mv_size == olddata.mv_size) { + // same size, just replace it. Note that we could + // also reuse this node if the new data is smaller, + // but instead we opt to shrink the node in that case. + if (F_ISSET(flags, MDB_RESERVE)) + data->mv_data = olddata.mv_data; + else if (data->mv_size) + memcpy(olddata.mv_data, data->mv_data, data->mv_size); + else + memcpy(NODEKEY(leaf), key->mv_data, key->mv_size); + goto done; + } + mdb_node_del(mc, 0); + mc->mc_db->md_entries--; + } + + rdata = data; + +new_sub: + nflags = flags & NODE_ADD_FLAGS; + nsize = IS_LEAF2(mc->mc_pg[mc->mc_top]) ? key->mv_size : mdb_leaf_size(env, key, rdata); + if (SIZELEFT(mc->mc_pg[mc->mc_top]) < nsize) { + if (( flags & (F_DUPDATA|F_SUBDATA)) == F_DUPDATA ) + nflags &= ~MDB_APPEND; + if (!insert) + nflags |= MDB_SPLIT_REPLACE; + rc = mdb_page_split(mc, key, rdata, P_INVALID, nflags); + } else { + // There is room already in this leaf page. + rc = mdb_node_add(mc, mc->mc_ki[mc->mc_top], key, rdata, 0, nflags); + if (rc == 0 && !do_sub && insert) { + // Adjust other cursors pointing to mp + MDB_cursor *m2, *m3; + MDB_dbi dbi = mc->mc_dbi; + unsigned i = mc->mc_top; + MDB_page *mp = mc->mc_pg[i]; + + for (m2 = mc->mc_txn->mt_cursors[dbi]; m2; m2=m2->mc_next) { + if (mc->mc_flags & C_SUB) + m3 = &m2->mc_xcursor->mx_cursor; + else + m3 = m2; + if (m3 == mc || m3->mc_snum < mc->mc_snum) continue; + if (m3->mc_pg[i] == mp && m3->mc_ki[i] >= mc->mc_ki[i]) { + m3->mc_ki[i]++; + } + } + } + } + + if (rc != MDB_SUCCESS) + mc->mc_txn->mt_flags |= MDB_TXN_ERROR; + else { + // Now store the actual data in the child DB. Note that we're + // storing the user data in the keys field, so there are strict + // size limits on dupdata. The actual data fields of the child + // DB are all zero size. + if (do_sub) { + int xflags; +put_sub: + xdata.mv_size = 0; + xdata.mv_data = ""; + leaf = NODEPTR(mc->mc_pg[mc->mc_top], mc->mc_ki[mc->mc_top]); + if (flags & MDB_CURRENT) { + xflags = MDB_CURRENT|MDB_NOSPILL; + } else { + mdb_xcursor_init1(mc, leaf); + xflags = (flags & MDB_NODUPDATA) ? + MDB_NOOVERWRITE|MDB_NOSPILL : MDB_NOSPILL; + } + // converted, write the original data first + if (dkey.mv_size) { + rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, &dkey, &xdata, xflags); + if (rc) + return rc; + { + // Adjust other cursors pointing to mp + MDB_cursor *m2; + unsigned i = mc->mc_top; + MDB_page *mp = mc->mc_pg[i]; + + for (m2 = mc->mc_txn->mt_cursors[mc->mc_dbi]; m2; m2=m2->mc_next) { + if (m2 == mc || m2->mc_snum < mc->mc_snum) continue; + if (!(m2->mc_flags & C_INITIALIZED)) continue; + if (m2->mc_pg[i] == mp && m2->mc_ki[i] == mc->mc_ki[i]) { + mdb_xcursor_init1(m2, leaf); + } + } + } + // we've done our job + dkey.mv_size = 0; + } + if (flags & MDB_APPENDDUP) + xflags |= MDB_APPEND; + rc = mdb_cursor_put(&mc->mc_xcursor->mx_cursor, data, &xdata, xflags); + if (flags & F_SUBDATA) { + void *db = NODEDATA(leaf); + memcpy(db, &mc->mc_xcursor->mx_db, sizeof(MDB_db)); + } + } + // sub-writes might have failed so check rc again. + // Don't increment count if we just replaced an existing item. + if (!rc && !(flags & MDB_CURRENT)) + mc->mc_db->md_entries++; + if (flags & MDB_MULTIPLE) { + if (!rc) { +next_mult: + mcount++; + // let caller know how many succeeded, if any + data[1].mv_size = mcount; + if (mcount < dcount) { + data[0].mv_data = (char *)data[0].mv_data + data[0].mv_size; + goto more; + } + } + } + } +done: + // If we succeeded and the key didn't exist before, make sure + // the cursor is marked valid. + if (!rc && insert) + mc->mc_flags |= C_INITIALIZED; + return rc; + */ + return nil +} + +// newLeafPage allocates and initialize new a new leaf page. +func (c *RWCursor) newLeafPage() (*page, error) { + // Allocate page. + p, err := c.allocatePage(1) + if err != nil { + return nil, err + } + + // Set flags and bounds. + p.flags = p_leaf | p_dirty + p.lower = pageHeaderSize + p.upper = c.transaction.db.pageSize + c.leafs += 1 + + return p, nil +} + +// newBranchPage allocates and initialize new a new branch page. +func (b *RWCursor) newBranchPage() (*page, error) { + // Allocate page. + p, err := c.allocatePage(1) + if err != nil { + return nil, err + } + + // Set flags and bounds. + p.flags = p_branch | p_dirty + p.lower = pageHeaderSize + p.upper = c.transaction.db.pageSize + c.bucket.branches += 1 + + return p, nil +} + +// newOverflowPage allocates and initialize new overflow pages. +func (b *RWCursor) newOverflowPage(count int) (*page, error) { + // Allocate page. + p, err := c.allocatePage(count) + if err != nil { + return nil, err + } + + // Set flags and bounds. + p.flags = p_overflow | p_dirty + p.lower = pageHeaderSize + p.upper = c.transaction.db.pageSize + c.bucket.overflows += count + + return p, nil +} + +// Allocate page numbers and memory for writing. Maintain me_pglast, +// me_pghead and mt_next_pgno. +// +// If there are free pages available from older transactions, they +// are re-used first. Otherwise allocate a new page at mt_next_pgno. +// Do not modify the freedB, just merge freeDB records into me_pghead[] +// and move me_pglast to say which records were consumed. Only this +// function can create me_pghead and move me_pglast/mt_next_pgno. +// @param[in] mc cursor A cursor handle identifying the transaction and +// database for which we are allocating. +// @param[in] num the number of pages to allocate. +// @param[out] mp Address of the allocated page(s). Requests for multiple pages +// will always be satisfied by a single contiguous chunk of memory. +// @return 0 on success, non-zero on failure. + +// allocatePage allocates a new page. +func (c *RWCursor) allocatePage(count int) (*page, error) { + head := env.pagestate.head + + // TODO? + // If our dirty list is already full, we can't do anything + // if (txn->mt_dirty_room == 0) { + // rc = MDB_TXN_FULL; + // goto fail; + // } + + /* + int rc, retry = INT_MAX; + MDB_txn *txn = mc->mc_txn; + MDB_env *env = txn->mt_env; + pgno_t pgno, *mop = env->me_pghead; + unsigned i, j, k, mop_len = mop ? mop[0] : 0, n2 = num-1; + MDB_page *np; + txnid_t oldest = 0, last; + MDB_cursor_op op; + MDB_cursor m2; + + *mp = NULL; + + + for (op = MDB_FIRST;; op = MDB_NEXT) { + MDB_val key, data; + MDB_node *leaf; + pgno_t *idl, old_id, new_id; + + // Seek a big enough contiguous page range. Prefer + // pages at the tail, just truncating the list. + if (mop_len > n2) { + i = mop_len; + do { + pgno = mop[i]; + if (mop[i-n2] == pgno+n2) + goto search_done; + } while (--i > n2); + if (Max_retries < INT_MAX && --retry < 0) + break; + } + + if (op == MDB_FIRST) { // 1st iteration + // Prepare to fetch more and coalesce + oldest = mdb_find_oldest(txn); + last = env->me_pglast; + mdb_cursor_init(&m2, txn, FREE_DBI, NULL); + if (last) { + op = MDB_SET_RANGE; + key.mv_data = &last; // will look up last+1 + key.mv_size = sizeof(last); + } + } + + last++; + // Do not fetch more if the record will be too recent + if (oldest <= last) + break; + rc = mdb_cursor_get(&m2, &key, NULL, op); + if (rc) { + if (rc == MDB_NOTFOUND) + break; + goto fail; + } + last = *(txnid_t*)key.mv_data; + if (oldest <= last) + break; + np = m2.mc_pg[m2.mc_top]; + leaf = NODEPTR(np, m2.mc_ki[m2.mc_top]); + if ((rc = mdb_node_read(txn, leaf, &data)) != MDB_SUCCESS) + return rc; + + idl = (MDB_ID *) data.mv_data; + i = idl[0]; + if (!mop) { + if (!(env->me_pghead = mop = mdb_midl_alloc(i))) { + rc = ENOMEM; + goto fail; + } + } else { + if ((rc = mdb_midl_need(&env->me_pghead, i)) != 0) + goto fail; + mop = env->me_pghead; + } + env->me_pglast = last; + + // Merge in descending sorted order + j = mop_len; + k = mop_len += i; + mop[0] = (pgno_t)-1; + old_id = mop[j]; + while (i) { + new_id = idl[i--]; + for (; old_id < new_id; old_id = mop[--j]) + mop[k--] = old_id; + mop[k--] = new_id; + } + mop[0] = mop_len; + } + + // Use new pages from the map when nothing suitable in the freeDB + i = 0; + pgno = txn->mt_next_pgno; + if (pgno + num >= env->me_maxpg) { + DPUTS("DB size maxed out"); + rc = MDB_MAP_FULL; + goto fail; + } + + search_done: + if (!(np = mdb_page_malloc(txn, num))) { + rc = ENOMEM; + goto fail; + } + if (i) { + mop[0] = mop_len -= num; + // Move any stragglers down + for (j = i-num; j < mop_len; ) + mop[++j] = mop[++i]; + } else { + txn->mt_next_pgno = pgno + num; + } + np->mp_pgno = pgno; + mdb_page_dirty(txn, np); + *mp = np; + + return MDB_SUCCESS; + + fail: + txn->mt_flags |= MDB_TXN_ERROR; + return rc; + */ + return nil +} |