forked from TrueCloudLab/frostfs-node
[#585] fstree: Add optional file counter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
baad49990c
commit
58c8722c81
12 changed files with 312 additions and 172 deletions
|
@ -8,6 +8,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
|
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
@ -24,59 +25,6 @@ type valueWithTime[V any] struct {
|
||||||
e error
|
e error
|
||||||
}
|
}
|
||||||
|
|
||||||
type locker struct {
|
|
||||||
mtx sync.Mutex
|
|
||||||
waiters int // not protected by mtx, must used outer mutex to update concurrently
|
|
||||||
}
|
|
||||||
|
|
||||||
type keyLocker[K comparable] struct {
|
|
||||||
lockers map[K]*locker
|
|
||||||
lockersMtx sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func newKeyLocker[K comparable]() *keyLocker[K] {
|
|
||||||
return &keyLocker[K]{
|
|
||||||
lockers: make(map[K]*locker),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *keyLocker[K]) LockKey(key K) {
|
|
||||||
l.lockersMtx.Lock()
|
|
||||||
|
|
||||||
if locker, found := l.lockers[key]; found {
|
|
||||||
locker.waiters++
|
|
||||||
l.lockersMtx.Unlock()
|
|
||||||
|
|
||||||
locker.mtx.Lock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
locker := &locker{
|
|
||||||
waiters: 1,
|
|
||||||
}
|
|
||||||
locker.mtx.Lock()
|
|
||||||
|
|
||||||
l.lockers[key] = locker
|
|
||||||
l.lockersMtx.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *keyLocker[K]) UnlockKey(key K) {
|
|
||||||
l.lockersMtx.Lock()
|
|
||||||
defer l.lockersMtx.Unlock()
|
|
||||||
|
|
||||||
locker, found := l.lockers[key]
|
|
||||||
if !found {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if locker.waiters == 1 {
|
|
||||||
delete(l.lockers, key)
|
|
||||||
}
|
|
||||||
locker.waiters--
|
|
||||||
|
|
||||||
locker.mtx.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// entity that provides TTL cache interface.
|
// entity that provides TTL cache interface.
|
||||||
type ttlNetCache[K comparable, V any] struct {
|
type ttlNetCache[K comparable, V any] struct {
|
||||||
ttl time.Duration
|
ttl time.Duration
|
||||||
|
@ -87,7 +35,7 @@ type ttlNetCache[K comparable, V any] struct {
|
||||||
|
|
||||||
netRdr netValueReader[K, V]
|
netRdr netValueReader[K, V]
|
||||||
|
|
||||||
keyLocker *keyLocker[K]
|
keyLocker *utilSync.KeyLocker[K]
|
||||||
}
|
}
|
||||||
|
|
||||||
// complicates netValueReader with TTL caching mechanism.
|
// complicates netValueReader with TTL caching mechanism.
|
||||||
|
@ -100,7 +48,7 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
|
||||||
sz: sz,
|
sz: sz,
|
||||||
cache: cache,
|
cache: cache,
|
||||||
netRdr: netRdr,
|
netRdr: netRdr,
|
||||||
keyLocker: newKeyLocker[K](),
|
keyLocker: utilSync.NewKeyLocker[K](),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,8 +63,8 @@ func (c *ttlNetCache[K, V]) get(key K) (V, error) {
|
||||||
return val.v, val.e
|
return val.v, val.e
|
||||||
}
|
}
|
||||||
|
|
||||||
c.keyLocker.LockKey(key)
|
c.keyLocker.Lock(key)
|
||||||
defer c.keyLocker.UnlockKey(key)
|
defer c.keyLocker.Unlock(key)
|
||||||
|
|
||||||
val, ok = c.cache.Peek(key)
|
val, ok = c.cache.Peek(key)
|
||||||
if ok && time.Since(val.t) < c.ttl {
|
if ok && time.Since(val.t) < c.ttl {
|
||||||
|
@ -135,8 +83,8 @@ func (c *ttlNetCache[K, V]) get(key K) (V, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
|
func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
|
||||||
c.keyLocker.LockKey(k)
|
c.keyLocker.Lock(k)
|
||||||
defer c.keyLocker.UnlockKey(k)
|
defer c.keyLocker.Unlock(k)
|
||||||
|
|
||||||
c.cache.Add(k, &valueWithTime[V]{
|
c.cache.Add(k, &valueWithTime[V]{
|
||||||
v: v,
|
v: v,
|
||||||
|
@ -146,8 +94,8 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ttlNetCache[K, V]) remove(key K) {
|
func (c *ttlNetCache[K, V]) remove(key K) {
|
||||||
c.keyLocker.LockKey(key)
|
c.keyLocker.Lock(key)
|
||||||
defer c.keyLocker.UnlockKey(key)
|
defer c.keyLocker.Unlock(key)
|
||||||
|
|
||||||
c.cache.Remove(key)
|
c.cache.Remove(key)
|
||||||
}
|
}
|
||||||
|
|
|
@ -13,7 +13,10 @@ func (t *FSTree) Open(ro bool) error {
|
||||||
|
|
||||||
// Init implements common.Storage.
|
// Init implements common.Storage.
|
||||||
func (t *FSTree) Init() error {
|
func (t *FSTree) Init() error {
|
||||||
return util.MkdirAllX(t.RootPath, t.Permissions)
|
if err := util.MkdirAllX(t.RootPath, t.Permissions); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return t.initFileCounter()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close implements common.Storage.
|
// Close implements common.Storage.
|
||||||
|
|
32
pkg/local_object_storage/blobstor/fstree/counter.go
Normal file
32
pkg/local_object_storage/blobstor/fstree/counter.go
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
package fstree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FileCounter used to count files in FSTree. The implementation must be thread-safe.
|
||||||
|
type FileCounter interface {
|
||||||
|
Set(v uint64)
|
||||||
|
Inc()
|
||||||
|
Dec()
|
||||||
|
}
|
||||||
|
|
||||||
|
type noopCounter struct{}
|
||||||
|
|
||||||
|
func (c *noopCounter) Set(uint64) {}
|
||||||
|
func (c *noopCounter) Inc() {}
|
||||||
|
func (c *noopCounter) Dec() {}
|
||||||
|
|
||||||
|
type SimpleCounter struct {
|
||||||
|
v atomic.Uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSimpleCounter() *SimpleCounter {
|
||||||
|
return &SimpleCounter{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) }
|
||||||
|
func (c *SimpleCounter) Inc() { c.v.Add(1) }
|
||||||
|
func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) }
|
||||||
|
func (c *SimpleCounter) Value() uint64 { return c.v.Load() }
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -26,6 +27,16 @@ import (
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type keyLock interface {
|
||||||
|
Lock(string)
|
||||||
|
Unlock(string)
|
||||||
|
}
|
||||||
|
|
||||||
|
type noopKeyLock struct{}
|
||||||
|
|
||||||
|
func (l *noopKeyLock) Lock(string) {}
|
||||||
|
func (l *noopKeyLock) Unlock(string) {}
|
||||||
|
|
||||||
// FSTree represents an object storage as a filesystem tree.
|
// FSTree represents an object storage as a filesystem tree.
|
||||||
type FSTree struct {
|
type FSTree struct {
|
||||||
Info
|
Info
|
||||||
|
@ -37,6 +48,12 @@ type FSTree struct {
|
||||||
noSync bool
|
noSync bool
|
||||||
readOnly bool
|
readOnly bool
|
||||||
metrics Metrics
|
metrics Metrics
|
||||||
|
|
||||||
|
fileGuard keyLock
|
||||||
|
fileCounter FileCounter
|
||||||
|
fileCounterEnabled bool
|
||||||
|
|
||||||
|
suffix atomic.Uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Info groups the information about file storage.
|
// Info groups the information about file storage.
|
||||||
|
@ -67,6 +84,8 @@ func New(opts ...Option) *FSTree {
|
||||||
Depth: 4,
|
Depth: 4,
|
||||||
DirNameLen: DirNameLen,
|
DirNameLen: DirNameLen,
|
||||||
metrics: &noopMetrics{},
|
metrics: &noopMetrics{},
|
||||||
|
fileGuard: &noopKeyLock{},
|
||||||
|
fileCounter: &noopCounter{},
|
||||||
}
|
}
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](f)
|
opts[i](f)
|
||||||
|
@ -244,7 +263,17 @@ func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.Delet
|
||||||
|
|
||||||
p := t.treePath(prm.Address)
|
p := t.treePath(prm.Address)
|
||||||
|
|
||||||
|
if t.fileCounterEnabled {
|
||||||
|
t.fileGuard.Lock(p)
|
||||||
err = os.Remove(p)
|
err = os.Remove(p)
|
||||||
|
t.fileGuard.Unlock(p)
|
||||||
|
if err == nil {
|
||||||
|
t.fileCounter.Dec()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = os.Remove(p)
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil && os.IsNotExist(err) {
|
if err != nil && os.IsNotExist(err) {
|
||||||
err = logicerr.Wrap(new(apistatus.ObjectNotFound))
|
err = logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
|
@ -317,45 +346,19 @@ func (t *FSTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, err
|
||||||
prm.RawData = t.Compress(prm.RawData)
|
prm.RawData = t.Compress(prm.RawData)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Here is a situation:
|
|
||||||
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.161Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "PUT", "type": "fstree", "storage_id": ""}
|
|
||||||
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.183Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "metabase PUT"}
|
|
||||||
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug policer/check.go:231 shortage of object copies detected {"component": "Object Policer", "object": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "shortage": 1}
|
|
||||||
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug shard/get.go:124 object is missing in write-cache {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "addr": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "skip_meta": false}
|
|
||||||
//
|
|
||||||
// 1. We put an object on node 1.
|
|
||||||
// 2. Relentless policer sees that it has only 1 copy and tries to PUT it to node 2.
|
|
||||||
// 3. PUT operation started by client at (1) also puts an object here.
|
|
||||||
// 4. Now we have concurrent writes and one of `Rename` calls will return `no such file` error.
|
|
||||||
// Even more than that, concurrent writes can corrupt data.
|
|
||||||
//
|
|
||||||
// So here is a solution:
|
|
||||||
// 1. Write a file to 'name + 1'.
|
|
||||||
// 2. If it exists, retry with temporary name being 'name + 2'.
|
|
||||||
// 3. Set some reasonable number of attempts.
|
|
||||||
//
|
|
||||||
// It is a bit kludgey, but I am unusually proud about having found this out after
|
|
||||||
// hours of research on linux kernel, dirsync mount option and ext4 FS, turned out
|
|
||||||
// to be so hecking simple.
|
|
||||||
// In a very rare situation we can have multiple partially written copies on disk,
|
|
||||||
// this will be fixed in another issue (we should remove garbage on start).
|
|
||||||
size = len(prm.RawData)
|
size = len(prm.RawData)
|
||||||
const retryCount = 5
|
tmpPath := p + "#" + strconv.FormatUint(t.suffix.Add(1), 10)
|
||||||
for i := 0; i < retryCount; i++ {
|
|
||||||
tmpPath := p + "#" + strconv.FormatUint(uint64(i), 10)
|
|
||||||
err = t.writeAndRename(tmpPath, p, prm.RawData)
|
err = t.writeAndRename(tmpPath, p, prm.RawData)
|
||||||
if err != syscall.EEXIST || i == retryCount-1 {
|
|
||||||
return common.PutRes{StorageID: []byte{}}, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err = fmt.Errorf("couldn't read file after %d retries", retryCount)
|
|
||||||
// unreachable, but precaution never hurts, especially 1 day before release.
|
|
||||||
return common.PutRes{StorageID: []byte{}}, err
|
return common.PutRes{StorageID: []byte{}}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p.
|
// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p.
|
||||||
func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error {
|
func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error {
|
||||||
|
if t.fileCounterEnabled {
|
||||||
|
t.fileGuard.Lock(p)
|
||||||
|
defer t.fileGuard.Unlock(p)
|
||||||
|
}
|
||||||
|
|
||||||
err := t.writeFile(tmpPath, data)
|
err := t.writeFile(tmpPath, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var pe *fs.PathError
|
var pe *fs.PathError
|
||||||
|
@ -364,10 +367,21 @@ func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error {
|
||||||
case syscall.ENOSPC:
|
case syscall.ENOSPC:
|
||||||
err = common.ErrNoSpace
|
err = common.ErrNoSpace
|
||||||
_ = os.RemoveAll(tmpPath)
|
_ = os.RemoveAll(tmpPath)
|
||||||
case syscall.EEXIST:
|
|
||||||
return syscall.EEXIST
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if t.fileCounterEnabled {
|
||||||
|
t.fileCounter.Inc()
|
||||||
|
var targetFileExists bool
|
||||||
|
if _, e := os.Stat(p); e == nil {
|
||||||
|
targetFileExists = true
|
||||||
|
}
|
||||||
|
err = os.Rename(tmpPath, p)
|
||||||
|
if err == nil && targetFileExists {
|
||||||
|
t.fileCounter.Dec()
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
err = os.Rename(tmpPath, p)
|
err = os.Rename(tmpPath, p)
|
||||||
}
|
}
|
||||||
|
@ -491,11 +505,23 @@ func (t *FSTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.G
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NumberOfObjects walks the file tree rooted at FSTree's root
|
// initFileCounter walks the file tree rooted at FSTree's root,
|
||||||
// and returns number of stored objects.
|
// counts total items count, inits counter and returns number of stored objects.
|
||||||
func (t *FSTree) NumberOfObjects() (uint64, error) {
|
func (t *FSTree) initFileCounter() error {
|
||||||
var counter uint64
|
if !t.fileCounterEnabled {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
counter, err := t.countFiles()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.fileCounter.Set(counter)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *FSTree) countFiles() (uint64, error) {
|
||||||
|
var counter uint64
|
||||||
// it is simpler to just consider every file
|
// it is simpler to just consider every file
|
||||||
// that is not directory as an object
|
// that is not directory as an object
|
||||||
err := filepath.WalkDir(t.RootPath,
|
err := filepath.WalkDir(t.RootPath,
|
||||||
|
|
|
@ -1,10 +1,16 @@
|
||||||
package fstree
|
package fstree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestAddressToString(t *testing.T) {
|
func TestAddressToString(t *testing.T) {
|
||||||
|
@ -28,3 +34,68 @@ func Benchmark_addressFromString(b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestObjectCounter(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
counter := NewSimpleCounter()
|
||||||
|
fst := New(
|
||||||
|
WithPath(t.TempDir()),
|
||||||
|
WithDepth(2),
|
||||||
|
WithDirNameLen(2),
|
||||||
|
WithFileCounter(counter))
|
||||||
|
require.NoError(t, fst.Open(false))
|
||||||
|
require.NoError(t, fst.Init())
|
||||||
|
|
||||||
|
counterValue := counter.Value()
|
||||||
|
require.Equal(t, uint64(0), counterValue)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, fst.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
addr := oidtest.Address()
|
||||||
|
obj := objectSDK.New()
|
||||||
|
obj.SetID(addr.Object())
|
||||||
|
obj.SetContainerID(addr.Container())
|
||||||
|
obj.SetPayload([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0})
|
||||||
|
|
||||||
|
var putPrm common.PutPrm
|
||||||
|
putPrm.Address = addr
|
||||||
|
putPrm.RawData, _ = obj.Marshal()
|
||||||
|
|
||||||
|
var getPrm common.GetPrm
|
||||||
|
getPrm.Address = putPrm.Address
|
||||||
|
|
||||||
|
var delPrm common.DeletePrm
|
||||||
|
delPrm.Address = addr
|
||||||
|
|
||||||
|
eg, egCtx := errgroup.WithContext(context.Background())
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
for j := 0; j < 1_000; j++ {
|
||||||
|
_, err := fst.Put(egCtx, putPrm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
var le logicerr.Logical
|
||||||
|
for j := 0; j < 1_000; j++ {
|
||||||
|
_, err := fst.Delete(egCtx, delPrm)
|
||||||
|
if err != nil && !errors.As(err, &le) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
|
||||||
|
counterValue = counter.Value()
|
||||||
|
realCount, err := fst.countFiles()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, realCount, counterValue)
|
||||||
|
}
|
||||||
|
|
|
@ -2,6 +2,8 @@ package fstree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io/fs"
|
"io/fs"
|
||||||
|
|
||||||
|
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Option func(*FSTree)
|
type Option func(*FSTree)
|
||||||
|
@ -41,3 +43,11 @@ func WithMetrics(m Metrics) Option {
|
||||||
f.metrics = m
|
f.metrics = m
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithFileCounter(c FileCounter) Option {
|
||||||
|
return func(f *FSTree) {
|
||||||
|
f.fileCounterEnabled = true
|
||||||
|
f.fileCounter = c
|
||||||
|
f.fileGuard = utilSync.NewKeyLocker[string]()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ var storages = []storage{
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
desc: "fstree",
|
desc: "fstree_without_object_counter",
|
||||||
create: func(dir string) common.Storage {
|
create: func(dir string) common.Storage {
|
||||||
return fstree.New(
|
return fstree.New(
|
||||||
fstree.WithPath(dir),
|
fstree.WithPath(dir),
|
||||||
|
@ -62,6 +62,17 @@ var storages = []storage{
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
desc: "fstree_with_object_counter",
|
||||||
|
create: func(dir string) common.Storage {
|
||||||
|
return fstree.New(
|
||||||
|
fstree.WithPath(dir),
|
||||||
|
fstree.WithDepth(2),
|
||||||
|
fstree.WithDirNameLen(2),
|
||||||
|
fstree.WithFileCounter(fstree.NewSimpleCounter()),
|
||||||
|
)
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
desc: "blobovniczatree",
|
desc: "blobovniczatree",
|
||||||
create: func(dir string) common.Storage {
|
create: func(dir string) common.Storage {
|
||||||
|
|
|
@ -107,14 +107,13 @@ func (c *cache) Open(readOnly bool) error {
|
||||||
// thus we need to create a channel here.
|
// thus we need to create a channel here.
|
||||||
c.closeCh = make(chan struct{})
|
c.closeCh = make(chan struct{})
|
||||||
|
|
||||||
return metaerr.Wrap(c.setCounters())
|
return metaerr.Wrap(c.initCounters())
|
||||||
}
|
}
|
||||||
|
|
||||||
// Init runs necessary services.
|
// Init runs necessary services.
|
||||||
func (c *cache) Init() error {
|
func (c *cache) Init() error {
|
||||||
c.metrics.SetMode(c.mode)
|
c.metrics.SetMode(c.mode)
|
||||||
c.runFlushLoop()
|
c.runFlushLoop()
|
||||||
c.runDBCounterLoop()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,20 +2,24 @@ package writecachebbolt
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
"go.uber.org/zap"
|
|
||||||
"golang.org/x/sync/errgroup"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *cache) estimateCacheSize() uint64 {
|
func (c *cache) estimateCacheSize() uint64 {
|
||||||
db := c.objCounters.DB() * c.smallObjectSize
|
dbCount := c.objCounters.DB()
|
||||||
fstree := c.objCounters.FS() * c.maxObjectSize
|
fsCount := c.objCounters.FS()
|
||||||
c.metrics.SetEstimateSize(db, fstree)
|
if fsCount > 0 {
|
||||||
return db + fstree
|
fsCount-- //db file
|
||||||
|
}
|
||||||
|
dbSize := dbCount * c.smallObjectSize
|
||||||
|
fsSize := fsCount * c.maxObjectSize
|
||||||
|
c.metrics.SetEstimateSize(dbSize, fsSize)
|
||||||
|
c.metrics.SetActualCounters(dbCount, fsCount)
|
||||||
|
return dbSize + fsSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) incSizeDB(sz uint64) uint64 {
|
func (c *cache) incSizeDB(sz uint64) uint64 {
|
||||||
|
@ -26,6 +30,8 @@ func (c *cache) incSizeFS(sz uint64) uint64 {
|
||||||
return sz + c.maxObjectSize
|
return sz + c.maxObjectSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ fstree.FileCounter = &counters{}
|
||||||
|
|
||||||
type counters struct {
|
type counters struct {
|
||||||
cDB, cFS atomic.Uint64
|
cDB, cFS atomic.Uint64
|
||||||
}
|
}
|
||||||
|
@ -38,13 +44,23 @@ func (x *counters) FS() uint64 {
|
||||||
return x.cFS.Load()
|
return x.cFS.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) setCounters() error {
|
// Set implements fstree.ObjectCounter.
|
||||||
|
func (x *counters) Set(v uint64) {
|
||||||
|
x.cFS.Store(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inc implements fstree.ObjectCounter.
|
||||||
|
func (x *counters) Inc() {
|
||||||
|
x.cFS.Add(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Dec implements fstree.ObjectCounter.
|
||||||
|
func (x *counters) Dec() {
|
||||||
|
x.cFS.Add(math.MaxUint64)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *cache) initCounters() error {
|
||||||
var inDB uint64
|
var inDB uint64
|
||||||
var inFS uint64
|
|
||||||
|
|
||||||
var eg errgroup.Group
|
|
||||||
|
|
||||||
eg.Go(func() error {
|
|
||||||
err := c.db.View(func(tx *bbolt.Tx) error {
|
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
if b != nil {
|
if b != nil {
|
||||||
|
@ -57,41 +73,4 @@ func (c *cache) setCounters() error {
|
||||||
}
|
}
|
||||||
c.objCounters.cDB.Store(inDB)
|
c.objCounters.cDB.Store(inDB)
|
||||||
return nil
|
return nil
|
||||||
})
|
|
||||||
|
|
||||||
eg.Go(func() error {
|
|
||||||
var err error
|
|
||||||
inFS, err = c.fsTree.NumberOfObjects()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not read write-cache FS counter: %w", err)
|
|
||||||
}
|
|
||||||
if inFS > 0 {
|
|
||||||
inFS-- //small.bolt DB file
|
|
||||||
}
|
|
||||||
c.objCounters.cFS.Store(inFS)
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
if err := eg.Wait(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
c.metrics.SetActualCounters(inDB, inFS)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cache) runDBCounterLoop() {
|
|
||||||
go func() {
|
|
||||||
t := time.NewTicker(time.Second * 30)
|
|
||||||
defer t.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-t.C:
|
|
||||||
err := c.setCounters()
|
|
||||||
if err != nil {
|
|
||||||
c.log.Warn(logs.FailedToCountWritecacheItems, zap.Error(err))
|
|
||||||
}
|
|
||||||
case <-c.closeCh:
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,10 +54,15 @@ func (c *cache) openStore(readOnly bool) error {
|
||||||
fstree.WithPerm(os.ModePerm),
|
fstree.WithPerm(os.ModePerm),
|
||||||
fstree.WithDepth(1),
|
fstree.WithDepth(1),
|
||||||
fstree.WithDirNameLen(1),
|
fstree.WithDirNameLen(1),
|
||||||
fstree.WithNoSync(c.noSync))
|
fstree.WithNoSync(c.noSync),
|
||||||
|
fstree.WithFileCounter(&c.objCounters),
|
||||||
|
)
|
||||||
if err := c.fsTree.Open(readOnly); err != nil {
|
if err := c.fsTree.Open(readOnly); err != nil {
|
||||||
return fmt.Errorf("could not open FSTree: %w", err)
|
return fmt.Errorf("could not open FSTree: %w", err)
|
||||||
}
|
}
|
||||||
|
if err := c.fsTree.Init(); err != nil {
|
||||||
|
return fmt.Errorf("could not init FSTree: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
56
pkg/util/sync/key_locker.go
Normal file
56
pkg/util/sync/key_locker.go
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
package sync
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
type locker struct {
|
||||||
|
mtx sync.Mutex
|
||||||
|
waiters int // not protected by mtx, must used outer mutex to update concurrently
|
||||||
|
}
|
||||||
|
|
||||||
|
type KeyLocker[K comparable] struct {
|
||||||
|
lockers map[K]*locker
|
||||||
|
lockersMtx sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewKeyLocker[K comparable]() *KeyLocker[K] {
|
||||||
|
return &KeyLocker[K]{
|
||||||
|
lockers: make(map[K]*locker),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *KeyLocker[K]) Lock(key K) {
|
||||||
|
l.lockersMtx.Lock()
|
||||||
|
|
||||||
|
if locker, found := l.lockers[key]; found {
|
||||||
|
locker.waiters++
|
||||||
|
l.lockersMtx.Unlock()
|
||||||
|
|
||||||
|
locker.mtx.Lock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
locker := &locker{
|
||||||
|
waiters: 1,
|
||||||
|
}
|
||||||
|
locker.mtx.Lock()
|
||||||
|
|
||||||
|
l.lockers[key] = locker
|
||||||
|
l.lockersMtx.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *KeyLocker[K]) Unlock(key K) {
|
||||||
|
l.lockersMtx.Lock()
|
||||||
|
defer l.lockersMtx.Unlock()
|
||||||
|
|
||||||
|
locker, found := l.lockers[key]
|
||||||
|
if !found {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if locker.waiters == 1 {
|
||||||
|
delete(l.lockers, key)
|
||||||
|
}
|
||||||
|
locker.waiters--
|
||||||
|
|
||||||
|
locker.mtx.Unlock()
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package main
|
package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -12,11 +12,11 @@ import (
|
||||||
func TestKeyLocker(t *testing.T) {
|
func TestKeyLocker(t *testing.T) {
|
||||||
taken := false
|
taken := false
|
||||||
eg, _ := errgroup.WithContext(context.Background())
|
eg, _ := errgroup.WithContext(context.Background())
|
||||||
keyLocker := newKeyLocker[int]()
|
keyLocker := NewKeyLocker[int]()
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
keyLocker.LockKey(0)
|
keyLocker.Lock(0)
|
||||||
defer keyLocker.UnlockKey(0)
|
defer keyLocker.Unlock(0)
|
||||||
|
|
||||||
require.False(t, taken)
|
require.False(t, taken)
|
||||||
taken = true
|
taken = true
|
Loading…
Reference in a new issue