705 lines
14 KiB
Go
705 lines
14 KiB
Go
// Copyright (c) 2012, Suryandaru Triandana <syndtr@gmail.com>
|
|
// All rights reserved.
|
|
//
|
|
// Use of this source code is governed by a BSD-style license that can be
|
|
// found in the LICENSE file.
|
|
|
|
// Package cache provides interface and implementation of a cache algorithms.
|
|
package cache
|
|
|
|
import (
|
|
"sync"
|
|
"sync/atomic"
|
|
"unsafe"
|
|
|
|
"github.com/syndtr/goleveldb/leveldb/util"
|
|
)
|
|
|
|
// Cacher provides interface to implements a caching functionality.
|
|
// An implementation must be safe for concurrent use.
|
|
type Cacher interface {
|
|
// Capacity returns cache capacity.
|
|
Capacity() int
|
|
|
|
// SetCapacity sets cache capacity.
|
|
SetCapacity(capacity int)
|
|
|
|
// Promote promotes the 'cache node'.
|
|
Promote(n *Node)
|
|
|
|
// Ban evicts the 'cache node' and prevent subsequent 'promote'.
|
|
Ban(n *Node)
|
|
|
|
// Evict evicts the 'cache node'.
|
|
Evict(n *Node)
|
|
|
|
// EvictNS evicts 'cache node' with the given namespace.
|
|
EvictNS(ns uint64)
|
|
|
|
// EvictAll evicts all 'cache node'.
|
|
EvictAll()
|
|
|
|
// Close closes the 'cache tree'
|
|
Close() error
|
|
}
|
|
|
|
// Value is a 'cacheable object'. It may implements util.Releaser, if
|
|
// so the the Release method will be called once object is released.
|
|
type Value interface{}
|
|
|
|
// NamespaceGetter provides convenient wrapper for namespace.
|
|
type NamespaceGetter struct {
|
|
Cache *Cache
|
|
NS uint64
|
|
}
|
|
|
|
// Get simply calls Cache.Get() method.
|
|
func (g *NamespaceGetter) Get(key uint64, setFunc func() (size int, value Value)) *Handle {
|
|
return g.Cache.Get(g.NS, key, setFunc)
|
|
}
|
|
|
|
// The hash tables implementation is based on:
|
|
// "Dynamic-Sized Nonblocking Hash Tables", by Yujie Liu,
|
|
// Kunlong Zhang, and Michael Spear.
|
|
// ACM Symposium on Principles of Distributed Computing, Jul 2014.
|
|
|
|
const (
|
|
mInitialSize = 1 << 4
|
|
mOverflowThreshold = 1 << 5
|
|
mOverflowGrowThreshold = 1 << 7
|
|
)
|
|
|
|
type mBucket struct {
|
|
mu sync.Mutex
|
|
node []*Node
|
|
frozen bool
|
|
}
|
|
|
|
func (b *mBucket) freeze() []*Node {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
if !b.frozen {
|
|
b.frozen = true
|
|
}
|
|
return b.node
|
|
}
|
|
|
|
func (b *mBucket) get(r *Cache, h *mNode, hash uint32, ns, key uint64, noset bool) (done, added bool, n *Node) {
|
|
b.mu.Lock()
|
|
|
|
if b.frozen {
|
|
b.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
// Scan the node.
|
|
for _, n := range b.node {
|
|
if n.hash == hash && n.ns == ns && n.key == key {
|
|
atomic.AddInt32(&n.ref, 1)
|
|
b.mu.Unlock()
|
|
return true, false, n
|
|
}
|
|
}
|
|
|
|
// Get only.
|
|
if noset {
|
|
b.mu.Unlock()
|
|
return true, false, nil
|
|
}
|
|
|
|
// Create node.
|
|
n = &Node{
|
|
r: r,
|
|
hash: hash,
|
|
ns: ns,
|
|
key: key,
|
|
ref: 1,
|
|
}
|
|
// Add node to bucket.
|
|
b.node = append(b.node, n)
|
|
bLen := len(b.node)
|
|
b.mu.Unlock()
|
|
|
|
// Update counter.
|
|
grow := atomic.AddInt32(&r.nodes, 1) >= h.growThreshold
|
|
if bLen > mOverflowThreshold {
|
|
grow = grow || atomic.AddInt32(&h.overflow, 1) >= mOverflowGrowThreshold
|
|
}
|
|
|
|
// Grow.
|
|
if grow && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
|
|
nhLen := len(h.buckets) << 1
|
|
nh := &mNode{
|
|
buckets: make([]unsafe.Pointer, nhLen),
|
|
mask: uint32(nhLen) - 1,
|
|
pred: unsafe.Pointer(h),
|
|
growThreshold: int32(nhLen * mOverflowThreshold),
|
|
shrinkThreshold: int32(nhLen >> 1),
|
|
}
|
|
ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
|
|
if !ok {
|
|
panic("BUG: failed swapping head")
|
|
}
|
|
go nh.initBuckets()
|
|
}
|
|
|
|
return true, true, n
|
|
}
|
|
|
|
func (b *mBucket) delete(r *Cache, h *mNode, hash uint32, ns, key uint64) (done, deleted bool) {
|
|
b.mu.Lock()
|
|
|
|
if b.frozen {
|
|
b.mu.Unlock()
|
|
return
|
|
}
|
|
|
|
// Scan the node.
|
|
var (
|
|
n *Node
|
|
bLen int
|
|
)
|
|
for i := range b.node {
|
|
n = b.node[i]
|
|
if n.ns == ns && n.key == key {
|
|
if atomic.LoadInt32(&n.ref) == 0 {
|
|
deleted = true
|
|
|
|
// Call releaser.
|
|
if n.value != nil {
|
|
if r, ok := n.value.(util.Releaser); ok {
|
|
r.Release()
|
|
}
|
|
n.value = nil
|
|
}
|
|
|
|
// Remove node from bucket.
|
|
b.node = append(b.node[:i], b.node[i+1:]...)
|
|
bLen = len(b.node)
|
|
}
|
|
break
|
|
}
|
|
}
|
|
b.mu.Unlock()
|
|
|
|
if deleted {
|
|
// Call OnDel.
|
|
for _, f := range n.onDel {
|
|
f()
|
|
}
|
|
|
|
// Update counter.
|
|
atomic.AddInt32(&r.size, int32(n.size)*-1)
|
|
shrink := atomic.AddInt32(&r.nodes, -1) < h.shrinkThreshold
|
|
if bLen >= mOverflowThreshold {
|
|
atomic.AddInt32(&h.overflow, -1)
|
|
}
|
|
|
|
// Shrink.
|
|
if shrink && len(h.buckets) > mInitialSize && atomic.CompareAndSwapInt32(&h.resizeInProgess, 0, 1) {
|
|
nhLen := len(h.buckets) >> 1
|
|
nh := &mNode{
|
|
buckets: make([]unsafe.Pointer, nhLen),
|
|
mask: uint32(nhLen) - 1,
|
|
pred: unsafe.Pointer(h),
|
|
growThreshold: int32(nhLen * mOverflowThreshold),
|
|
shrinkThreshold: int32(nhLen >> 1),
|
|
}
|
|
ok := atomic.CompareAndSwapPointer(&r.mHead, unsafe.Pointer(h), unsafe.Pointer(nh))
|
|
if !ok {
|
|
panic("BUG: failed swapping head")
|
|
}
|
|
go nh.initBuckets()
|
|
}
|
|
}
|
|
|
|
return true, deleted
|
|
}
|
|
|
|
type mNode struct {
|
|
buckets []unsafe.Pointer // []*mBucket
|
|
mask uint32
|
|
pred unsafe.Pointer // *mNode
|
|
resizeInProgess int32
|
|
|
|
overflow int32
|
|
growThreshold int32
|
|
shrinkThreshold int32
|
|
}
|
|
|
|
func (n *mNode) initBucket(i uint32) *mBucket {
|
|
if b := (*mBucket)(atomic.LoadPointer(&n.buckets[i])); b != nil {
|
|
return b
|
|
}
|
|
|
|
p := (*mNode)(atomic.LoadPointer(&n.pred))
|
|
if p != nil {
|
|
var node []*Node
|
|
if n.mask > p.mask {
|
|
// Grow.
|
|
pb := (*mBucket)(atomic.LoadPointer(&p.buckets[i&p.mask]))
|
|
if pb == nil {
|
|
pb = p.initBucket(i & p.mask)
|
|
}
|
|
m := pb.freeze()
|
|
// Split nodes.
|
|
for _, x := range m {
|
|
if x.hash&n.mask == i {
|
|
node = append(node, x)
|
|
}
|
|
}
|
|
} else {
|
|
// Shrink.
|
|
pb0 := (*mBucket)(atomic.LoadPointer(&p.buckets[i]))
|
|
if pb0 == nil {
|
|
pb0 = p.initBucket(i)
|
|
}
|
|
pb1 := (*mBucket)(atomic.LoadPointer(&p.buckets[i+uint32(len(n.buckets))]))
|
|
if pb1 == nil {
|
|
pb1 = p.initBucket(i + uint32(len(n.buckets)))
|
|
}
|
|
m0 := pb0.freeze()
|
|
m1 := pb1.freeze()
|
|
// Merge nodes.
|
|
node = make([]*Node, 0, len(m0)+len(m1))
|
|
node = append(node, m0...)
|
|
node = append(node, m1...)
|
|
}
|
|
b := &mBucket{node: node}
|
|
if atomic.CompareAndSwapPointer(&n.buckets[i], nil, unsafe.Pointer(b)) {
|
|
if len(node) > mOverflowThreshold {
|
|
atomic.AddInt32(&n.overflow, int32(len(node)-mOverflowThreshold))
|
|
}
|
|
return b
|
|
}
|
|
}
|
|
|
|
return (*mBucket)(atomic.LoadPointer(&n.buckets[i]))
|
|
}
|
|
|
|
func (n *mNode) initBuckets() {
|
|
for i := range n.buckets {
|
|
n.initBucket(uint32(i))
|
|
}
|
|
atomic.StorePointer(&n.pred, nil)
|
|
}
|
|
|
|
// Cache is a 'cache map'.
|
|
type Cache struct {
|
|
mu sync.RWMutex
|
|
mHead unsafe.Pointer // *mNode
|
|
nodes int32
|
|
size int32
|
|
cacher Cacher
|
|
closed bool
|
|
}
|
|
|
|
// NewCache creates a new 'cache map'. The cacher is optional and
|
|
// may be nil.
|
|
func NewCache(cacher Cacher) *Cache {
|
|
h := &mNode{
|
|
buckets: make([]unsafe.Pointer, mInitialSize),
|
|
mask: mInitialSize - 1,
|
|
growThreshold: int32(mInitialSize * mOverflowThreshold),
|
|
shrinkThreshold: 0,
|
|
}
|
|
for i := range h.buckets {
|
|
h.buckets[i] = unsafe.Pointer(&mBucket{})
|
|
}
|
|
r := &Cache{
|
|
mHead: unsafe.Pointer(h),
|
|
cacher: cacher,
|
|
}
|
|
return r
|
|
}
|
|
|
|
func (r *Cache) getBucket(hash uint32) (*mNode, *mBucket) {
|
|
h := (*mNode)(atomic.LoadPointer(&r.mHead))
|
|
i := hash & h.mask
|
|
b := (*mBucket)(atomic.LoadPointer(&h.buckets[i]))
|
|
if b == nil {
|
|
b = h.initBucket(i)
|
|
}
|
|
return h, b
|
|
}
|
|
|
|
func (r *Cache) delete(n *Node) bool {
|
|
for {
|
|
h, b := r.getBucket(n.hash)
|
|
done, deleted := b.delete(r, h, n.hash, n.ns, n.key)
|
|
if done {
|
|
return deleted
|
|
}
|
|
}
|
|
}
|
|
|
|
// Nodes returns number of 'cache node' in the map.
|
|
func (r *Cache) Nodes() int {
|
|
return int(atomic.LoadInt32(&r.nodes))
|
|
}
|
|
|
|
// Size returns sums of 'cache node' size in the map.
|
|
func (r *Cache) Size() int {
|
|
return int(atomic.LoadInt32(&r.size))
|
|
}
|
|
|
|
// Capacity returns cache capacity.
|
|
func (r *Cache) Capacity() int {
|
|
if r.cacher == nil {
|
|
return 0
|
|
}
|
|
return r.cacher.Capacity()
|
|
}
|
|
|
|
// SetCapacity sets cache capacity.
|
|
func (r *Cache) SetCapacity(capacity int) {
|
|
if r.cacher != nil {
|
|
r.cacher.SetCapacity(capacity)
|
|
}
|
|
}
|
|
|
|
// Get gets 'cache node' with the given namespace and key.
|
|
// If cache node is not found and setFunc is not nil, Get will atomically creates
|
|
// the 'cache node' by calling setFunc. Otherwise Get will returns nil.
|
|
//
|
|
// The returned 'cache handle' should be released after use by calling Release
|
|
// method.
|
|
func (r *Cache) Get(ns, key uint64, setFunc func() (size int, value Value)) *Handle {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
if r.closed {
|
|
return nil
|
|
}
|
|
|
|
hash := murmur32(ns, key, 0xf00)
|
|
for {
|
|
h, b := r.getBucket(hash)
|
|
done, _, n := b.get(r, h, hash, ns, key, setFunc == nil)
|
|
if done {
|
|
if n != nil {
|
|
n.mu.Lock()
|
|
if n.value == nil {
|
|
if setFunc == nil {
|
|
n.mu.Unlock()
|
|
n.unref()
|
|
return nil
|
|
}
|
|
|
|
n.size, n.value = setFunc()
|
|
if n.value == nil {
|
|
n.size = 0
|
|
n.mu.Unlock()
|
|
n.unref()
|
|
return nil
|
|
}
|
|
atomic.AddInt32(&r.size, int32(n.size))
|
|
}
|
|
n.mu.Unlock()
|
|
if r.cacher != nil {
|
|
r.cacher.Promote(n)
|
|
}
|
|
return &Handle{unsafe.Pointer(n)}
|
|
}
|
|
|
|
break
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Delete removes and ban 'cache node' with the given namespace and key.
|
|
// A banned 'cache node' will never inserted into the 'cache tree'. Ban
|
|
// only attributed to the particular 'cache node', so when a 'cache node'
|
|
// is recreated it will not be banned.
|
|
//
|
|
// If onDel is not nil, then it will be executed if such 'cache node'
|
|
// doesn't exist or once the 'cache node' is released.
|
|
//
|
|
// Delete return true is such 'cache node' exist.
|
|
func (r *Cache) Delete(ns, key uint64, onDel func()) bool {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
if r.closed {
|
|
return false
|
|
}
|
|
|
|
hash := murmur32(ns, key, 0xf00)
|
|
for {
|
|
h, b := r.getBucket(hash)
|
|
done, _, n := b.get(r, h, hash, ns, key, true)
|
|
if done {
|
|
if n != nil {
|
|
if onDel != nil {
|
|
n.mu.Lock()
|
|
n.onDel = append(n.onDel, onDel)
|
|
n.mu.Unlock()
|
|
}
|
|
if r.cacher != nil {
|
|
r.cacher.Ban(n)
|
|
}
|
|
n.unref()
|
|
return true
|
|
}
|
|
|
|
break
|
|
}
|
|
}
|
|
|
|
if onDel != nil {
|
|
onDel()
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// Evict evicts 'cache node' with the given namespace and key. This will
|
|
// simply call Cacher.Evict.
|
|
//
|
|
// Evict return true is such 'cache node' exist.
|
|
func (r *Cache) Evict(ns, key uint64) bool {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
if r.closed {
|
|
return false
|
|
}
|
|
|
|
hash := murmur32(ns, key, 0xf00)
|
|
for {
|
|
h, b := r.getBucket(hash)
|
|
done, _, n := b.get(r, h, hash, ns, key, true)
|
|
if done {
|
|
if n != nil {
|
|
if r.cacher != nil {
|
|
r.cacher.Evict(n)
|
|
}
|
|
n.unref()
|
|
return true
|
|
}
|
|
|
|
break
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// EvictNS evicts 'cache node' with the given namespace. This will
|
|
// simply call Cacher.EvictNS.
|
|
func (r *Cache) EvictNS(ns uint64) {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
if r.closed {
|
|
return
|
|
}
|
|
|
|
if r.cacher != nil {
|
|
r.cacher.EvictNS(ns)
|
|
}
|
|
}
|
|
|
|
// EvictAll evicts all 'cache node'. This will simply call Cacher.EvictAll.
|
|
func (r *Cache) EvictAll() {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
if r.closed {
|
|
return
|
|
}
|
|
|
|
if r.cacher != nil {
|
|
r.cacher.EvictAll()
|
|
}
|
|
}
|
|
|
|
// Close closes the 'cache map' and forcefully releases all 'cache node'.
|
|
func (r *Cache) Close() error {
|
|
r.mu.Lock()
|
|
if !r.closed {
|
|
r.closed = true
|
|
|
|
h := (*mNode)(r.mHead)
|
|
h.initBuckets()
|
|
|
|
for i := range h.buckets {
|
|
b := (*mBucket)(h.buckets[i])
|
|
for _, n := range b.node {
|
|
// Call releaser.
|
|
if n.value != nil {
|
|
if r, ok := n.value.(util.Releaser); ok {
|
|
r.Release()
|
|
}
|
|
n.value = nil
|
|
}
|
|
|
|
// Call OnDel.
|
|
for _, f := range n.onDel {
|
|
f()
|
|
}
|
|
n.onDel = nil
|
|
}
|
|
}
|
|
}
|
|
r.mu.Unlock()
|
|
|
|
// Avoid deadlock.
|
|
if r.cacher != nil {
|
|
if err := r.cacher.Close(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// CloseWeak closes the 'cache map' and evict all 'cache node' from cacher, but
|
|
// unlike Close it doesn't forcefully releases 'cache node'.
|
|
func (r *Cache) CloseWeak() error {
|
|
r.mu.Lock()
|
|
if !r.closed {
|
|
r.closed = true
|
|
}
|
|
r.mu.Unlock()
|
|
|
|
// Avoid deadlock.
|
|
if r.cacher != nil {
|
|
r.cacher.EvictAll()
|
|
if err := r.cacher.Close(); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Node is a 'cache node'.
|
|
type Node struct {
|
|
r *Cache
|
|
|
|
hash uint32
|
|
ns, key uint64
|
|
|
|
mu sync.Mutex
|
|
size int
|
|
value Value
|
|
|
|
ref int32
|
|
onDel []func()
|
|
|
|
CacheData unsafe.Pointer
|
|
}
|
|
|
|
// NS returns this 'cache node' namespace.
|
|
func (n *Node) NS() uint64 {
|
|
return n.ns
|
|
}
|
|
|
|
// Key returns this 'cache node' key.
|
|
func (n *Node) Key() uint64 {
|
|
return n.key
|
|
}
|
|
|
|
// Size returns this 'cache node' size.
|
|
func (n *Node) Size() int {
|
|
return n.size
|
|
}
|
|
|
|
// Value returns this 'cache node' value.
|
|
func (n *Node) Value() Value {
|
|
return n.value
|
|
}
|
|
|
|
// Ref returns this 'cache node' ref counter.
|
|
func (n *Node) Ref() int32 {
|
|
return atomic.LoadInt32(&n.ref)
|
|
}
|
|
|
|
// GetHandle returns an handle for this 'cache node'.
|
|
func (n *Node) GetHandle() *Handle {
|
|
if atomic.AddInt32(&n.ref, 1) <= 1 {
|
|
panic("BUG: Node.GetHandle on zero ref")
|
|
}
|
|
return &Handle{unsafe.Pointer(n)}
|
|
}
|
|
|
|
func (n *Node) unref() {
|
|
if atomic.AddInt32(&n.ref, -1) == 0 {
|
|
n.r.delete(n)
|
|
}
|
|
}
|
|
|
|
func (n *Node) unrefLocked() {
|
|
if atomic.AddInt32(&n.ref, -1) == 0 {
|
|
n.r.mu.RLock()
|
|
if !n.r.closed {
|
|
n.r.delete(n)
|
|
}
|
|
n.r.mu.RUnlock()
|
|
}
|
|
}
|
|
|
|
// Handle is a 'cache handle' of a 'cache node'.
|
|
type Handle struct {
|
|
n unsafe.Pointer // *Node
|
|
}
|
|
|
|
// Value returns the value of the 'cache node'.
|
|
func (h *Handle) Value() Value {
|
|
n := (*Node)(atomic.LoadPointer(&h.n))
|
|
if n != nil {
|
|
return n.value
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Release releases this 'cache handle'.
|
|
// It is safe to call release multiple times.
|
|
func (h *Handle) Release() {
|
|
nPtr := atomic.LoadPointer(&h.n)
|
|
if nPtr != nil && atomic.CompareAndSwapPointer(&h.n, nPtr, nil) {
|
|
n := (*Node)(nPtr)
|
|
n.unrefLocked()
|
|
}
|
|
}
|
|
|
|
func murmur32(ns, key uint64, seed uint32) uint32 {
|
|
const (
|
|
m = uint32(0x5bd1e995)
|
|
r = 24
|
|
)
|
|
|
|
k1 := uint32(ns >> 32)
|
|
k2 := uint32(ns)
|
|
k3 := uint32(key >> 32)
|
|
k4 := uint32(key)
|
|
|
|
k1 *= m
|
|
k1 ^= k1 >> r
|
|
k1 *= m
|
|
|
|
k2 *= m
|
|
k2 ^= k2 >> r
|
|
k2 *= m
|
|
|
|
k3 *= m
|
|
k3 ^= k3 >> r
|
|
k3 *= m
|
|
|
|
k4 *= m
|
|
k4 ^= k4 >> r
|
|
k4 *= m
|
|
|
|
h := seed
|
|
|
|
h *= m
|
|
h ^= k1
|
|
h *= m
|
|
h ^= k2
|
|
h *= m
|
|
h ^= k3
|
|
h *= m
|
|
h ^= k4
|
|
|
|
h ^= h >> 13
|
|
h *= m
|
|
h ^= h >> 15
|
|
|
|
return h
|
|
}
|