532 lines
12 KiB
Go
532 lines
12 KiB
Go
|
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
|
||
|
// All rights reserved.
|
||
|
//
|
||
|
// Use of this source code is governed by a BSD-style license that can be
|
||
|
// found in the LICENSE file.
|
||
|
|
||
|
package leveldb
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"sort"
|
||
|
"sync/atomic"
|
||
|
|
||
|
"github.com/syndtr/goleveldb/leveldb/cache"
|
||
|
"github.com/syndtr/goleveldb/leveldb/iterator"
|
||
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
||
|
"github.com/syndtr/goleveldb/leveldb/storage"
|
||
|
"github.com/syndtr/goleveldb/leveldb/table"
|
||
|
"github.com/syndtr/goleveldb/leveldb/util"
|
||
|
)
|
||
|
|
||
|
// tFile holds basic information about a table.
|
||
|
type tFile struct {
|
||
|
fd storage.FileDesc
|
||
|
seekLeft int32
|
||
|
size int64
|
||
|
imin, imax internalKey
|
||
|
}
|
||
|
|
||
|
// Returns true if given key is after largest key of this table.
|
||
|
func (t *tFile) after(icmp *iComparer, ukey []byte) bool {
|
||
|
return ukey != nil && icmp.uCompare(ukey, t.imax.ukey()) > 0
|
||
|
}
|
||
|
|
||
|
// Returns true if given key is before smallest key of this table.
|
||
|
func (t *tFile) before(icmp *iComparer, ukey []byte) bool {
|
||
|
return ukey != nil && icmp.uCompare(ukey, t.imin.ukey()) < 0
|
||
|
}
|
||
|
|
||
|
// Returns true if given key range overlaps with this table key range.
|
||
|
func (t *tFile) overlaps(icmp *iComparer, umin, umax []byte) bool {
|
||
|
return !t.after(icmp, umin) && !t.before(icmp, umax)
|
||
|
}
|
||
|
|
||
|
// Cosumes one seek and return current seeks left.
|
||
|
func (t *tFile) consumeSeek() int32 {
|
||
|
return atomic.AddInt32(&t.seekLeft, -1)
|
||
|
}
|
||
|
|
||
|
// Creates new tFile.
|
||
|
func newTableFile(fd storage.FileDesc, size int64, imin, imax internalKey) *tFile {
|
||
|
f := &tFile{
|
||
|
fd: fd,
|
||
|
size: size,
|
||
|
imin: imin,
|
||
|
imax: imax,
|
||
|
}
|
||
|
|
||
|
// We arrange to automatically compact this file after
|
||
|
// a certain number of seeks. Let's assume:
|
||
|
// (1) One seek costs 10ms
|
||
|
// (2) Writing or reading 1MB costs 10ms (100MB/s)
|
||
|
// (3) A compaction of 1MB does 25MB of IO:
|
||
|
// 1MB read from this level
|
||
|
// 10-12MB read from next level (boundaries may be misaligned)
|
||
|
// 10-12MB written to next level
|
||
|
// This implies that 25 seeks cost the same as the compaction
|
||
|
// of 1MB of data. I.e., one seek costs approximately the
|
||
|
// same as the compaction of 40KB of data. We are a little
|
||
|
// conservative and allow approximately one seek for every 16KB
|
||
|
// of data before triggering a compaction.
|
||
|
f.seekLeft = int32(size / 16384)
|
||
|
if f.seekLeft < 100 {
|
||
|
f.seekLeft = 100
|
||
|
}
|
||
|
|
||
|
return f
|
||
|
}
|
||
|
|
||
|
func tableFileFromRecord(r atRecord) *tFile {
|
||
|
return newTableFile(storage.FileDesc{Type: storage.TypeTable, Num: r.num}, r.size, r.imin, r.imax)
|
||
|
}
|
||
|
|
||
|
// tFiles hold multiple tFile.
|
||
|
type tFiles []*tFile
|
||
|
|
||
|
func (tf tFiles) Len() int { return len(tf) }
|
||
|
func (tf tFiles) Swap(i, j int) { tf[i], tf[j] = tf[j], tf[i] }
|
||
|
|
||
|
func (tf tFiles) nums() string {
|
||
|
x := "[ "
|
||
|
for i, f := range tf {
|
||
|
if i != 0 {
|
||
|
x += ", "
|
||
|
}
|
||
|
x += fmt.Sprint(f.fd.Num)
|
||
|
}
|
||
|
x += " ]"
|
||
|
return x
|
||
|
}
|
||
|
|
||
|
// Returns true if i smallest key is less than j.
|
||
|
// This used for sort by key in ascending order.
|
||
|
func (tf tFiles) lessByKey(icmp *iComparer, i, j int) bool {
|
||
|
a, b := tf[i], tf[j]
|
||
|
n := icmp.Compare(a.imin, b.imin)
|
||
|
if n == 0 {
|
||
|
return a.fd.Num < b.fd.Num
|
||
|
}
|
||
|
return n < 0
|
||
|
}
|
||
|
|
||
|
// Returns true if i file number is greater than j.
|
||
|
// This used for sort by file number in descending order.
|
||
|
func (tf tFiles) lessByNum(i, j int) bool {
|
||
|
return tf[i].fd.Num > tf[j].fd.Num
|
||
|
}
|
||
|
|
||
|
// Sorts tables by key in ascending order.
|
||
|
func (tf tFiles) sortByKey(icmp *iComparer) {
|
||
|
sort.Sort(&tFilesSortByKey{tFiles: tf, icmp: icmp})
|
||
|
}
|
||
|
|
||
|
// Sorts tables by file number in descending order.
|
||
|
func (tf tFiles) sortByNum() {
|
||
|
sort.Sort(&tFilesSortByNum{tFiles: tf})
|
||
|
}
|
||
|
|
||
|
// Returns sum of all tables size.
|
||
|
func (tf tFiles) size() (sum int64) {
|
||
|
for _, t := range tf {
|
||
|
sum += t.size
|
||
|
}
|
||
|
return sum
|
||
|
}
|
||
|
|
||
|
// Searches smallest index of tables whose its smallest
|
||
|
// key is after or equal with given key.
|
||
|
func (tf tFiles) searchMin(icmp *iComparer, ikey internalKey) int {
|
||
|
return sort.Search(len(tf), func(i int) bool {
|
||
|
return icmp.Compare(tf[i].imin, ikey) >= 0
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// Searches smallest index of tables whose its largest
|
||
|
// key is after or equal with given key.
|
||
|
func (tf tFiles) searchMax(icmp *iComparer, ikey internalKey) int {
|
||
|
return sort.Search(len(tf), func(i int) bool {
|
||
|
return icmp.Compare(tf[i].imax, ikey) >= 0
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// Returns true if given key range overlaps with one or more
|
||
|
// tables key range. If unsorted is true then binary search will not be used.
|
||
|
func (tf tFiles) overlaps(icmp *iComparer, umin, umax []byte, unsorted bool) bool {
|
||
|
if unsorted {
|
||
|
// Check against all files.
|
||
|
for _, t := range tf {
|
||
|
if t.overlaps(icmp, umin, umax) {
|
||
|
return true
|
||
|
}
|
||
|
}
|
||
|
return false
|
||
|
}
|
||
|
|
||
|
i := 0
|
||
|
if len(umin) > 0 {
|
||
|
// Find the earliest possible internal key for min.
|
||
|
i = tf.searchMax(icmp, makeInternalKey(nil, umin, keyMaxSeq, keyTypeSeek))
|
||
|
}
|
||
|
if i >= len(tf) {
|
||
|
// Beginning of range is after all files, so no overlap.
|
||
|
return false
|
||
|
}
|
||
|
return !tf[i].before(icmp, umax)
|
||
|
}
|
||
|
|
||
|
// Returns tables whose its key range overlaps with given key range.
|
||
|
// Range will be expanded if ukey found hop across tables.
|
||
|
// If overlapped is true then the search will be restarted if umax
|
||
|
// expanded.
|
||
|
// The dst content will be overwritten.
|
||
|
func (tf tFiles) getOverlaps(dst tFiles, icmp *iComparer, umin, umax []byte, overlapped bool) tFiles {
|
||
|
dst = dst[:0]
|
||
|
for i := 0; i < len(tf); {
|
||
|
t := tf[i]
|
||
|
if t.overlaps(icmp, umin, umax) {
|
||
|
if umin != nil && icmp.uCompare(t.imin.ukey(), umin) < 0 {
|
||
|
umin = t.imin.ukey()
|
||
|
dst = dst[:0]
|
||
|
i = 0
|
||
|
continue
|
||
|
} else if umax != nil && icmp.uCompare(t.imax.ukey(), umax) > 0 {
|
||
|
umax = t.imax.ukey()
|
||
|
// Restart search if it is overlapped.
|
||
|
if overlapped {
|
||
|
dst = dst[:0]
|
||
|
i = 0
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
|
||
|
dst = append(dst, t)
|
||
|
}
|
||
|
i++
|
||
|
}
|
||
|
|
||
|
return dst
|
||
|
}
|
||
|
|
||
|
// Returns tables key range.
|
||
|
func (tf tFiles) getRange(icmp *iComparer) (imin, imax internalKey) {
|
||
|
for i, t := range tf {
|
||
|
if i == 0 {
|
||
|
imin, imax = t.imin, t.imax
|
||
|
continue
|
||
|
}
|
||
|
if icmp.Compare(t.imin, imin) < 0 {
|
||
|
imin = t.imin
|
||
|
}
|
||
|
if icmp.Compare(t.imax, imax) > 0 {
|
||
|
imax = t.imax
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Creates iterator index from tables.
|
||
|
func (tf tFiles) newIndexIterator(tops *tOps, icmp *iComparer, slice *util.Range, ro *opt.ReadOptions) iterator.IteratorIndexer {
|
||
|
if slice != nil {
|
||
|
var start, limit int
|
||
|
if slice.Start != nil {
|
||
|
start = tf.searchMax(icmp, internalKey(slice.Start))
|
||
|
}
|
||
|
if slice.Limit != nil {
|
||
|
limit = tf.searchMin(icmp, internalKey(slice.Limit))
|
||
|
} else {
|
||
|
limit = tf.Len()
|
||
|
}
|
||
|
tf = tf[start:limit]
|
||
|
}
|
||
|
return iterator.NewArrayIndexer(&tFilesArrayIndexer{
|
||
|
tFiles: tf,
|
||
|
tops: tops,
|
||
|
icmp: icmp,
|
||
|
slice: slice,
|
||
|
ro: ro,
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// Tables iterator index.
|
||
|
type tFilesArrayIndexer struct {
|
||
|
tFiles
|
||
|
tops *tOps
|
||
|
icmp *iComparer
|
||
|
slice *util.Range
|
||
|
ro *opt.ReadOptions
|
||
|
}
|
||
|
|
||
|
func (a *tFilesArrayIndexer) Search(key []byte) int {
|
||
|
return a.searchMax(a.icmp, internalKey(key))
|
||
|
}
|
||
|
|
||
|
func (a *tFilesArrayIndexer) Get(i int) iterator.Iterator {
|
||
|
if i == 0 || i == a.Len()-1 {
|
||
|
return a.tops.newIterator(a.tFiles[i], a.slice, a.ro)
|
||
|
}
|
||
|
return a.tops.newIterator(a.tFiles[i], nil, a.ro)
|
||
|
}
|
||
|
|
||
|
// Helper type for sortByKey.
|
||
|
type tFilesSortByKey struct {
|
||
|
tFiles
|
||
|
icmp *iComparer
|
||
|
}
|
||
|
|
||
|
func (x *tFilesSortByKey) Less(i, j int) bool {
|
||
|
return x.lessByKey(x.icmp, i, j)
|
||
|
}
|
||
|
|
||
|
// Helper type for sortByNum.
|
||
|
type tFilesSortByNum struct {
|
||
|
tFiles
|
||
|
}
|
||
|
|
||
|
func (x *tFilesSortByNum) Less(i, j int) bool {
|
||
|
return x.lessByNum(i, j)
|
||
|
}
|
||
|
|
||
|
// Table operations.
|
||
|
type tOps struct {
|
||
|
s *session
|
||
|
noSync bool
|
||
|
evictRemoved bool
|
||
|
cache *cache.Cache
|
||
|
bcache *cache.Cache
|
||
|
bpool *util.BufferPool
|
||
|
}
|
||
|
|
||
|
// Creates an empty table and returns table writer.
|
||
|
func (t *tOps) create() (*tWriter, error) {
|
||
|
fd := storage.FileDesc{Type: storage.TypeTable, Num: t.s.allocFileNum()}
|
||
|
fw, err := t.s.stor.Create(fd)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return &tWriter{
|
||
|
t: t,
|
||
|
fd: fd,
|
||
|
w: fw,
|
||
|
tw: table.NewWriter(fw, t.s.o.Options),
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// Builds table from src iterator.
|
||
|
func (t *tOps) createFrom(src iterator.Iterator) (f *tFile, n int, err error) {
|
||
|
w, err := t.create()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
defer func() {
|
||
|
if err != nil {
|
||
|
w.drop()
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
for src.Next() {
|
||
|
err = w.append(src.Key(), src.Value())
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
err = src.Error()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
n = w.tw.EntriesLen()
|
||
|
f, err = w.finish()
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Opens table. It returns a cache handle, which should
|
||
|
// be released after use.
|
||
|
func (t *tOps) open(f *tFile) (ch *cache.Handle, err error) {
|
||
|
ch = t.cache.Get(0, uint64(f.fd.Num), func() (size int, value cache.Value) {
|
||
|
var r storage.Reader
|
||
|
r, err = t.s.stor.Open(f.fd)
|
||
|
if err != nil {
|
||
|
return 0, nil
|
||
|
}
|
||
|
|
||
|
var bcache *cache.NamespaceGetter
|
||
|
if t.bcache != nil {
|
||
|
bcache = &cache.NamespaceGetter{Cache: t.bcache, NS: uint64(f.fd.Num)}
|
||
|
}
|
||
|
|
||
|
var tr *table.Reader
|
||
|
tr, err = table.NewReader(r, f.size, f.fd, bcache, t.bpool, t.s.o.Options)
|
||
|
if err != nil {
|
||
|
r.Close()
|
||
|
return 0, nil
|
||
|
}
|
||
|
return 1, tr
|
||
|
|
||
|
})
|
||
|
if ch == nil && err == nil {
|
||
|
err = ErrClosed
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Finds key/value pair whose key is greater than or equal to the
|
||
|
// given key.
|
||
|
func (t *tOps) find(f *tFile, key []byte, ro *opt.ReadOptions) (rkey, rvalue []byte, err error) {
|
||
|
ch, err := t.open(f)
|
||
|
if err != nil {
|
||
|
return nil, nil, err
|
||
|
}
|
||
|
defer ch.Release()
|
||
|
return ch.Value().(*table.Reader).Find(key, true, ro)
|
||
|
}
|
||
|
|
||
|
// Finds key that is greater than or equal to the given key.
|
||
|
func (t *tOps) findKey(f *tFile, key []byte, ro *opt.ReadOptions) (rkey []byte, err error) {
|
||
|
ch, err := t.open(f)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
defer ch.Release()
|
||
|
return ch.Value().(*table.Reader).FindKey(key, true, ro)
|
||
|
}
|
||
|
|
||
|
// Returns approximate offset of the given key.
|
||
|
func (t *tOps) offsetOf(f *tFile, key []byte) (offset int64, err error) {
|
||
|
ch, err := t.open(f)
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
defer ch.Release()
|
||
|
return ch.Value().(*table.Reader).OffsetOf(key)
|
||
|
}
|
||
|
|
||
|
// Creates an iterator from the given table.
|
||
|
func (t *tOps) newIterator(f *tFile, slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
|
||
|
ch, err := t.open(f)
|
||
|
if err != nil {
|
||
|
return iterator.NewEmptyIterator(err)
|
||
|
}
|
||
|
iter := ch.Value().(*table.Reader).NewIterator(slice, ro)
|
||
|
iter.SetReleaser(ch)
|
||
|
return iter
|
||
|
}
|
||
|
|
||
|
// Removes table from persistent storage. It waits until
|
||
|
// no one use the the table.
|
||
|
func (t *tOps) remove(f *tFile) {
|
||
|
t.cache.Delete(0, uint64(f.fd.Num), func() {
|
||
|
if err := t.s.stor.Remove(f.fd); err != nil {
|
||
|
t.s.logf("table@remove removing @%d %q", f.fd.Num, err)
|
||
|
} else {
|
||
|
t.s.logf("table@remove removed @%d", f.fd.Num)
|
||
|
}
|
||
|
if t.evictRemoved && t.bcache != nil {
|
||
|
t.bcache.EvictNS(uint64(f.fd.Num))
|
||
|
}
|
||
|
})
|
||
|
}
|
||
|
|
||
|
// Closes the table ops instance. It will close all tables,
|
||
|
// regadless still used or not.
|
||
|
func (t *tOps) close() {
|
||
|
t.bpool.Close()
|
||
|
t.cache.Close()
|
||
|
if t.bcache != nil {
|
||
|
t.bcache.CloseWeak()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Creates new initialized table ops instance.
|
||
|
func newTableOps(s *session) *tOps {
|
||
|
var (
|
||
|
cacher cache.Cacher
|
||
|
bcache *cache.Cache
|
||
|
bpool *util.BufferPool
|
||
|
)
|
||
|
if s.o.GetOpenFilesCacheCapacity() > 0 {
|
||
|
cacher = cache.NewLRU(s.o.GetOpenFilesCacheCapacity())
|
||
|
}
|
||
|
if !s.o.GetDisableBlockCache() {
|
||
|
var bcacher cache.Cacher
|
||
|
if s.o.GetBlockCacheCapacity() > 0 {
|
||
|
bcacher = s.o.GetBlockCacher().New(s.o.GetBlockCacheCapacity())
|
||
|
}
|
||
|
bcache = cache.NewCache(bcacher)
|
||
|
}
|
||
|
if !s.o.GetDisableBufferPool() {
|
||
|
bpool = util.NewBufferPool(s.o.GetBlockSize() + 5)
|
||
|
}
|
||
|
return &tOps{
|
||
|
s: s,
|
||
|
noSync: s.o.GetNoSync(),
|
||
|
evictRemoved: s.o.GetBlockCacheEvictRemoved(),
|
||
|
cache: cache.NewCache(cacher),
|
||
|
bcache: bcache,
|
||
|
bpool: bpool,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// tWriter wraps the table writer. It keep track of file descriptor
|
||
|
// and added key range.
|
||
|
type tWriter struct {
|
||
|
t *tOps
|
||
|
|
||
|
fd storage.FileDesc
|
||
|
w storage.Writer
|
||
|
tw *table.Writer
|
||
|
|
||
|
first, last []byte
|
||
|
}
|
||
|
|
||
|
// Append key/value pair to the table.
|
||
|
func (w *tWriter) append(key, value []byte) error {
|
||
|
if w.first == nil {
|
||
|
w.first = append([]byte{}, key...)
|
||
|
}
|
||
|
w.last = append(w.last[:0], key...)
|
||
|
return w.tw.Append(key, value)
|
||
|
}
|
||
|
|
||
|
// Returns true if the table is empty.
|
||
|
func (w *tWriter) empty() bool {
|
||
|
return w.first == nil
|
||
|
}
|
||
|
|
||
|
// Closes the storage.Writer.
|
||
|
func (w *tWriter) close() {
|
||
|
if w.w != nil {
|
||
|
w.w.Close()
|
||
|
w.w = nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Finalizes the table and returns table file.
|
||
|
func (w *tWriter) finish() (f *tFile, err error) {
|
||
|
defer w.close()
|
||
|
err = w.tw.Close()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
if !w.t.noSync {
|
||
|
err = w.w.Sync()
|
||
|
if err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
f = newTableFile(w.fd, int64(w.tw.BytesLen()), internalKey(w.first), internalKey(w.last))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Drops the table.
|
||
|
func (w *tWriter) drop() {
|
||
|
w.close()
|
||
|
w.t.s.stor.Remove(w.fd)
|
||
|
w.t.s.reuseFileNum(w.fd.Num)
|
||
|
w.tw = nil
|
||
|
w.first = nil
|
||
|
w.last = nil
|
||
|
}
|