blog/archive/go/cache/hashtable.go
2024-03-19 02:45:09 +08:00

284 lines
5.5 KiB
Go

package main
import (
"errors"
"hash/fnv"
"math/rand"
"sync"
"sync/atomic"
"time"
)
type entry struct {
key string
value interface{}
expireTime time.Time
operateTime time.Time
next *entry
}
var ExpiredErr = errors.New("key过期")
func (v *entry) expired() bool {
return time.Now().After(v.expireTime)
}
// 过期也会返回value,用于计算内存占用大小
func (v *entry) val() (interface{}, error) {
if !v.expired() {
return v.value, nil
}
return v.value, ExpiredErr
}
// 拉链法实现hashtable
type hashtable struct {
//maxMemory int64
//memory int64
//
//cap int64
//num int64
// 分段锁提高性能
bucket []*bucket
}
func newHashtable(cap int64) *hashtable {
ht := &hashtable{
//maxMemory: maxMemory, memory: memory, cap: cap, num: num,
bucket: make([]*bucket, 0, cap/1024),
}
for i := int64(0); i < (cap / 1024); i++ {
ht.bucket = append(ht.bucket, &bucket{})
}
return ht
}
// 计算key 返回桶位置和数组位置
func (h *hashtable) hash(key string) (int, int) {
hash := fnv.New64()
hash.Write([]byte(key))
nk := int(hash.Sum64())
if nk < 0 {
nk = -nk
}
return nk % len(h.bucket), nk % 1024
}
func (h *hashtable) Get(key string) (interface{}, error) {
b, n := h.hash(key)
return h.bucket[b].get(key, n)
}
func (h *hashtable) Set(key string, val interface{}, expire time.Duration) {
b, n := h.hash(key)
h.bucket[b].set(key, n, val, expire)
}
func (h *hashtable) SetExpireTime(key string, val interface{}, expireTime time.Time) {
b, n := h.hash(key)
h.bucket[b].setExpireTime(key, n, val, expireTime)
}
func (h *hashtable) Del(key string) (interface{}, bool) {
b, n := h.hash(key)
val, ok := h.bucket[b].del(key, n)
return val, ok
}
// flush 不在hashtab层加锁,循环桶清空
func (h *hashtable) flush(mem, num *int64) {
for _, v := range h.bucket {
v.flush(mem, num)
}
}
func (h *hashtable) index(index int) (*entry, bool) {
b, n := index/1024, index%1024
if index >= len(h.bucket)*1024 {
return nil, true
}
return h.bucket[b].getAndDel(n), false
}
//从某个位置开始随机取值,-1为随机所有key
func (h *hashtable) random(count, start int) []*entry {
if start == -1 {
start = rand.Intn(len(h.bucket) * 1024)
}
startBucket := start / 1024
randomBucket := startBucket + rand.Intn(len(h.bucket)-startBucket)
startIndex := 0
if randomBucket == startBucket {
startIndex = start % 1024
startIndex = startIndex + rand.Intn(1024-startIndex)
} else {
startBucket = randomBucket
startIndex = rand.Intn(1024)
}
ret := make([]*entry, 0, count)
for i := 0; i < 1024; i++ {
var e *entry
if startIndex+i >= 1024 {
// 跨过桶了
b := h.bucket[(startBucket+1)%len(h.bucket)]
e = b.index((startIndex + i) % 1024)
} else {
e = h.bucket[startBucket].index(startIndex + i)
}
if e == nil {
continue
}
for {
ret = append(ret, e)
if e.next == nil || len(ret) >= count {
break
}
e = e.next
}
if len(ret) >= count {
break
}
}
return ret
}
type bucket struct {
sync.RWMutex
kv [1024]*entry
// 桶内也对内存 num进行统计,另外一个优化思路,定时对桶内的内存和数量进行统计 然后进行内存释放和扩容
mem int64
num int64
}
// 清理,对总的内存和数量进行计算
func (b *bucket) flush(mem, num *int64) {
b.Lock()
defer b.Unlock()
b.kv = [1024]*entry{}
atomic.AddInt64(mem, -b.mem)
atomic.AddInt64(num, -b.num)
b.mem = 0
b.num = 0
}
func (b *bucket) getAndDel(n int) (v *entry) {
b.Lock()
defer b.Unlock()
v, b.kv[n] = b.kv[n], nil
return
}
// 获取指针
func (b *bucket) getP(key string, n int) (*entry, *entry) {
v := b.kv[n]
prev := v
for v != nil {
if v.key == key {
return v, prev
}
prev = v
v = v.next
}
return nil, nil
}
func (b *bucket) index(n int) *entry {
b.RLock()
defer b.RUnlock()
return b.kv[n]
}
func (b *bucket) get(key string, n int) (interface{}, error) {
b.RLock()
v, prev := b.getP(key, n)
if v == nil {
b.RUnlock()
return nil, errors.New("key不存在")
}
val, err := v.val()
if err != nil {
b.RUnlock()
b.Lock()
defer b.Unlock()
b.mem -= int64(sizeof(val))
b.num--
// 过期删除
if prev == v {
b.kv[n] = nil
} else {
prev.next = v.next
}
return val, err
}
v.operateTime = time.Now()
b.RUnlock()
return val, nil
}
func (b *bucket) setExpireTime(key string, n int, val interface{}, expireTime time.Time) {
b.Lock()
defer b.Unlock()
v := b.kv[n]
b.mem += int64(sizeof(val))
b.num++
if v == nil {
b.kv[n] = &entry{
key: key,
value: val,
expireTime: expireTime,
operateTime: time.Now(),
next: nil,
}
return
}
for v != nil {
if v.key == key {
v.value = val
v.expireTime = expireTime
v.operateTime = time.Now()
return
}
if v.next == nil {
break
}
v = v.next
}
v.next = &entry{
key: key,
value: val,
expireTime: expireTime,
operateTime: time.Now(),
next: nil,
}
}
func (b *bucket) set(key string, n int, val interface{}, expire time.Duration) {
expireTime := time.Now()
if expire == 0 {
// 直接设置一个超大的时间
expireTime = expireTime.Add(time.Hour * 24 * 365 * 10)
} else {
expireTime = expireTime.Add(expire)
}
b.setExpireTime(key, n, val, expireTime)
}
// 删除并返回删除的值
func (b *bucket) del(key string, n int) (interface{}, bool) {
b.Lock()
defer b.Unlock()
v, prev := b.getP(key, n)
if v == nil {
return nil, false
}
b.mem -= int64(sizeof(v.value))
b.num--
if prev == v {
b.kv[n] = nil
} else {
prev.next = v.next
}
return v.value, true
}