376 lines
8.8 KiB
Go
376 lines
8.8 KiB
Go
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
// All rights reserved.
//
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
|
|
|
|
package table
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
|
|
"github.com/golang/snappy"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/comparer"
|
|
"github.com/syndtr/goleveldb/leveldb/filter"
|
|
"github.com/syndtr/goleveldb/leveldb/opt"
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
)
|
|
|
|
// sharedPrefixLen returns the number of leading bytes that a and b have in
// common. The result is never larger than the shorter of the two slices.
func sharedPrefixLen(a, b []byte) int {
	// Only the overlapping part of the two slices can possibly match.
	limit := len(b)
	if len(a) < limit {
		limit = len(a)
	}
	for pos := 0; pos < limit; pos++ {
		if a[pos] != b[pos] {
			return pos
		}
	}
	return limit
}
|
|
|
|
type blockWriter struct {
|
|
restartInterval int
|
|
buf util.Buffer
|
|
nEntries int
|
|
prevKey []byte
|
|
restarts []uint32
|
|
scratch []byte
|
|
}
|
|
|
|
func (w *blockWriter) append(key, value []byte) {
|
|
nShared := 0
|
|
if w.nEntries%w.restartInterval == 0 {
|
|
w.restarts = append(w.restarts, uint32(w.buf.Len()))
|
|
} else {
|
|
nShared = sharedPrefixLen(w.prevKey, key)
|
|
}
|
|
n := binary.PutUvarint(w.scratch[0:], uint64(nShared))
|
|
n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared))
|
|
n += binary.PutUvarint(w.scratch[n:], uint64(len(value)))
|
|
w.buf.Write(w.scratch[:n])
|
|
w.buf.Write(key[nShared:])
|
|
w.buf.Write(value)
|
|
w.prevKey = append(w.prevKey[:0], key...)
|
|
w.nEntries++
|
|
}
|
|
|
|
func (w *blockWriter) finish() {
|
|
// Write restarts entry.
|
|
if w.nEntries == 0 {
|
|
// Must have at least one restart entry.
|
|
w.restarts = append(w.restarts, 0)
|
|
}
|
|
w.restarts = append(w.restarts, uint32(len(w.restarts)))
|
|
for _, x := range w.restarts {
|
|
buf4 := w.buf.Alloc(4)
|
|
binary.LittleEndian.PutUint32(buf4, x)
|
|
}
|
|
}
|
|
|
|
func (w *blockWriter) reset() {
|
|
w.buf.Reset()
|
|
w.nEntries = 0
|
|
w.restarts = w.restarts[:0]
|
|
}
|
|
|
|
func (w *blockWriter) bytesLen() int {
|
|
restartsLen := len(w.restarts)
|
|
if restartsLen == 0 {
|
|
restartsLen = 1
|
|
}
|
|
return w.buf.Len() + 4*restartsLen + 4
|
|
}
|
|
|
|
type filterWriter struct {
|
|
generator filter.FilterGenerator
|
|
buf util.Buffer
|
|
nKeys int
|
|
offsets []uint32
|
|
}
|
|
|
|
func (w *filterWriter) add(key []byte) {
|
|
if w.generator == nil {
|
|
return
|
|
}
|
|
w.generator.Add(key)
|
|
w.nKeys++
|
|
}
|
|
|
|
func (w *filterWriter) flush(offset uint64) {
|
|
if w.generator == nil {
|
|
return
|
|
}
|
|
for x := int(offset / filterBase); x > len(w.offsets); {
|
|
w.generate()
|
|
}
|
|
}
|
|
|
|
func (w *filterWriter) finish() {
|
|
if w.generator == nil {
|
|
return
|
|
}
|
|
// Generate last keys.
|
|
|
|
if w.nKeys > 0 {
|
|
w.generate()
|
|
}
|
|
w.offsets = append(w.offsets, uint32(w.buf.Len()))
|
|
for _, x := range w.offsets {
|
|
buf4 := w.buf.Alloc(4)
|
|
binary.LittleEndian.PutUint32(buf4, x)
|
|
}
|
|
w.buf.WriteByte(filterBaseLg)
|
|
}
|
|
|
|
func (w *filterWriter) generate() {
|
|
// Record offset.
|
|
w.offsets = append(w.offsets, uint32(w.buf.Len()))
|
|
// Generate filters.
|
|
if w.nKeys > 0 {
|
|
w.generator.Generate(&w.buf)
|
|
w.nKeys = 0
|
|
}
|
|
}
|
|
|
|
// Writer is a table writer.
|
|
type Writer struct {
|
|
writer io.Writer
|
|
err error
|
|
// Options
|
|
cmp comparer.Comparer
|
|
filter filter.Filter
|
|
compression opt.Compression
|
|
blockSize int
|
|
|
|
dataBlock blockWriter
|
|
indexBlock blockWriter
|
|
filterBlock filterWriter
|
|
pendingBH blockHandle
|
|
offset uint64
|
|
nEntries int
|
|
// Scratch allocated enough for 5 uvarint. Block writer should not use
|
|
// first 20-bytes since it will be used to encode block handle, which
|
|
// then passed to the block writer itself.
|
|
scratch [50]byte
|
|
comparerScratch []byte
|
|
compressionScratch []byte
|
|
}
|
|
|
|
func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh blockHandle, err error) {
|
|
// Compress the buffer if necessary.
|
|
var b []byte
|
|
if compression == opt.SnappyCompression {
|
|
// Allocate scratch enough for compression and block trailer.
|
|
if n := snappy.MaxEncodedLen(buf.Len()) + blockTrailerLen; len(w.compressionScratch) < n {
|
|
w.compressionScratch = make([]byte, n)
|
|
}
|
|
compressed := snappy.Encode(w.compressionScratch, buf.Bytes())
|
|
n := len(compressed)
|
|
b = compressed[:n+blockTrailerLen]
|
|
b[n] = blockTypeSnappyCompression
|
|
} else {
|
|
tmp := buf.Alloc(blockTrailerLen)
|
|
tmp[0] = blockTypeNoCompression
|
|
b = buf.Bytes()
|
|
}
|
|
|
|
// Calculate the checksum.
|
|
n := len(b) - 4
|
|
checksum := util.NewCRC(b[:n]).Value()
|
|
binary.LittleEndian.PutUint32(b[n:], checksum)
|
|
|
|
// Write the buffer to the file.
|
|
_, err = w.writer.Write(b)
|
|
if err != nil {
|
|
return
|
|
}
|
|
bh = blockHandle{w.offset, uint64(len(b) - blockTrailerLen)}
|
|
w.offset += uint64(len(b))
|
|
return
|
|
}
|
|
|
|
func (w *Writer) flushPendingBH(key []byte) {
|
|
if w.pendingBH.length == 0 {
|
|
return
|
|
}
|
|
var separator []byte
|
|
if len(key) == 0 {
|
|
separator = w.cmp.Successor(w.comparerScratch[:0], w.dataBlock.prevKey)
|
|
} else {
|
|
separator = w.cmp.Separator(w.comparerScratch[:0], w.dataBlock.prevKey, key)
|
|
}
|
|
if separator == nil {
|
|
separator = w.dataBlock.prevKey
|
|
} else {
|
|
w.comparerScratch = separator
|
|
}
|
|
n := encodeBlockHandle(w.scratch[:20], w.pendingBH)
|
|
// Append the block handle to the index block.
|
|
w.indexBlock.append(separator, w.scratch[:n])
|
|
// Reset prev key of the data block.
|
|
w.dataBlock.prevKey = w.dataBlock.prevKey[:0]
|
|
// Clear pending block handle.
|
|
w.pendingBH = blockHandle{}
|
|
}
|
|
|
|
func (w *Writer) finishBlock() error {
|
|
w.dataBlock.finish()
|
|
bh, err := w.writeBlock(&w.dataBlock.buf, w.compression)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
w.pendingBH = bh
|
|
// Reset the data block.
|
|
w.dataBlock.reset()
|
|
// Flush the filter block.
|
|
w.filterBlock.flush(w.offset)
|
|
return nil
|
|
}
|
|
|
|
// Append appends key/value pair to the table. The keys passed must
|
|
// be in increasing order.
|
|
//
|
|
// It is safe to modify the contents of the arguments after Append returns.
|
|
func (w *Writer) Append(key, value []byte) error {
|
|
if w.err != nil {
|
|
return w.err
|
|
}
|
|
if w.nEntries > 0 && w.cmp.Compare(w.dataBlock.prevKey, key) >= 0 {
|
|
w.err = fmt.Errorf("leveldb/table: Writer: keys are not in increasing order: %q, %q", w.dataBlock.prevKey, key)
|
|
return w.err
|
|
}
|
|
|
|
w.flushPendingBH(key)
|
|
// Append key/value pair to the data block.
|
|
w.dataBlock.append(key, value)
|
|
// Add key to the filter block.
|
|
w.filterBlock.add(key)
|
|
|
|
// Finish the data block if block size target reached.
|
|
if w.dataBlock.bytesLen() >= w.blockSize {
|
|
if err := w.finishBlock(); err != nil {
|
|
w.err = err
|
|
return w.err
|
|
}
|
|
}
|
|
w.nEntries++
|
|
return nil
|
|
}
|
|
|
|
// BlocksLen returns number of blocks written so far.
|
|
func (w *Writer) BlocksLen() int {
|
|
n := w.indexBlock.nEntries
|
|
if w.pendingBH.length > 0 {
|
|
// Includes the pending block.
|
|
n++
|
|
}
|
|
return n
|
|
}
|
|
|
|
// EntriesLen returns number of entries added so far.
|
|
func (w *Writer) EntriesLen() int {
|
|
return w.nEntries
|
|
}
|
|
|
|
// BytesLen returns number of bytes written so far.
|
|
func (w *Writer) BytesLen() int {
|
|
return int(w.offset)
|
|
}
|
|
|
|
// Close will finalize the table. Calling Append is not possible
|
|
// after Close, but calling BlocksLen, EntriesLen and BytesLen
|
|
// is still possible.
|
|
func (w *Writer) Close() error {
|
|
if w.err != nil {
|
|
return w.err
|
|
}
|
|
|
|
// Write the last data block. Or empty data block if there
|
|
// aren't any data blocks at all.
|
|
if w.dataBlock.nEntries > 0 || w.nEntries == 0 {
|
|
if err := w.finishBlock(); err != nil {
|
|
w.err = err
|
|
return w.err
|
|
}
|
|
}
|
|
w.flushPendingBH(nil)
|
|
|
|
// Write the filter block.
|
|
var filterBH blockHandle
|
|
w.filterBlock.finish()
|
|
if buf := &w.filterBlock.buf; buf.Len() > 0 {
|
|
filterBH, w.err = w.writeBlock(buf, opt.NoCompression)
|
|
if w.err != nil {
|
|
return w.err
|
|
}
|
|
}
|
|
|
|
// Write the metaindex block.
|
|
if filterBH.length > 0 {
|
|
key := []byte("filter." + w.filter.Name())
|
|
n := encodeBlockHandle(w.scratch[:20], filterBH)
|
|
w.dataBlock.append(key, w.scratch[:n])
|
|
}
|
|
w.dataBlock.finish()
|
|
metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression)
|
|
if err != nil {
|
|
w.err = err
|
|
return w.err
|
|
}
|
|
|
|
// Write the index block.
|
|
w.indexBlock.finish()
|
|
indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression)
|
|
if err != nil {
|
|
w.err = err
|
|
return w.err
|
|
}
|
|
|
|
// Write the table footer.
|
|
footer := w.scratch[:footerLen]
|
|
for i := range footer {
|
|
footer[i] = 0
|
|
}
|
|
n := encodeBlockHandle(footer, metaindexBH)
|
|
encodeBlockHandle(footer[n:], indexBH)
|
|
copy(footer[footerLen-len(magic):], magic)
|
|
if _, err := w.writer.Write(footer); err != nil {
|
|
w.err = err
|
|
return w.err
|
|
}
|
|
w.offset += footerLen
|
|
|
|
w.err = errors.New("leveldb/table: writer is closed")
|
|
return nil
|
|
}
|
|
|
|
// NewWriter creates a new initialized table writer for the file.
|
|
//
|
|
// Table writer is not safe for concurrent use.
|
|
func NewWriter(f io.Writer, o *opt.Options) *Writer {
|
|
w := &Writer{
|
|
writer: f,
|
|
cmp: o.GetComparer(),
|
|
filter: o.GetFilter(),
|
|
compression: o.GetCompression(),
|
|
blockSize: o.GetBlockSize(),
|
|
comparerScratch: make([]byte, 0),
|
|
}
|
|
// data block
|
|
w.dataBlock.restartInterval = o.GetBlockRestartInterval()
|
|
// The first 20-bytes are used for encoding block handle.
|
|
w.dataBlock.scratch = w.scratch[20:]
|
|
// index block
|
|
w.indexBlock.restartInterval = 1
|
|
w.indexBlock.scratch = w.scratch[20:]
|
|
// filter block
|
|
if w.filter != nil {
|
|
w.filterBlock.generator = w.filter.NewGenerator()
|
|
w.filterBlock.flush(0)
|
|
}
|
|
return w
|
|
}
|