From 58c8722c815320206cc14bfa878772f1b8c6efc8 Mon Sep 17 00:00:00 2001
From: Dmitrii Stepanov
Date: Fri, 11 Aug 2023 11:32:43 +0300
Subject: [PATCH] [#585] fstree: Add optional file counter

Signed-off-by: Dmitrii Stepanov
---
 cmd/frostfs-node/cache.go                    |  70 ++---------
 .../blobstor/fstree/control.go               |   5 +-
 .../blobstor/fstree/counter.go               |  32 +++++
 .../blobstor/fstree/fstree.go                | 114 +++++++++++-------
 .../blobstor/fstree/fstree_test.go           |  71 +++++++++++
 .../blobstor/fstree/option.go                |  10 ++
 .../blobstor/perf_test.go                    |  13 +-
 .../writecache/writecachebbolt/cachebbolt.go |   3 +-
 .../writecache/writecachebbolt/state.go      |  95 ++++++---------
 .../writecache/writecachebbolt/storage.go    |   7 +-
 pkg/util/sync/key_locker.go                  |  56 +++++++++
 .../util/sync/key_locker_test.go             |   8 +-
 12 files changed, 312 insertions(+), 172 deletions(-)
 create mode 100644 pkg/local_object_storage/blobstor/fstree/counter.go
 create mode 100644 pkg/util/sync/key_locker.go
 rename cmd/frostfs-node/cache_test.go => pkg/util/sync/key_locker_test.go (82%)

diff --git a/cmd/frostfs-node/cache.go b/cmd/frostfs-node/cache.go
index fa5513640..6d138d894 100644
--- a/cmd/frostfs-node/cache.go
+++ b/cmd/frostfs-node/cache.go
@@ -8,6 +8,7 @@ import (
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
 	cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
 	putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
+	utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
@@ -24,59 +25,6 @@ type valueWithTime[V any] struct {
 	e error
 }
 
-type locker struct {
-	mtx     sync.Mutex
-	waiters int // not protected by mtx, must used outer mutex to update concurrently
-}
-
-type keyLocker[K comparable] struct {
-	lockers    map[K]*locker
-	lockersMtx sync.Mutex
-}
-
-func newKeyLocker[K comparable]() *keyLocker[K] {
-	return &keyLocker[K]{
-		lockers: make(map[K]*locker),
-	}
-}
-
-func (l *keyLocker[K]) LockKey(key K) {
-	l.lockersMtx.Lock()
-
-	if locker, found := l.lockers[key]; found {
-		locker.waiters++
-		l.lockersMtx.Unlock()
-
-		locker.mtx.Lock()
-		return
-	}
-
-	locker := &locker{
-		waiters: 1,
-	}
-	locker.mtx.Lock()
-
-	l.lockers[key] = locker
-	l.lockersMtx.Unlock()
-}
-
-func (l *keyLocker[K]) UnlockKey(key K) {
-	l.lockersMtx.Lock()
-	defer l.lockersMtx.Unlock()
-
-	locker, found := l.lockers[key]
-	if !found {
-		return
-	}
-
-	if locker.waiters == 1 {
-		delete(l.lockers, key)
-	}
-	locker.waiters--
-
-	locker.mtx.Unlock()
-}
-
 // entity that provides TTL cache interface.
 type ttlNetCache[K comparable, V any] struct {
 	ttl time.Duration
@@ -87,7 +35,7 @@ type ttlNetCache[K comparable, V any] struct {
 
 	netRdr netValueReader[K, V]
 
-	keyLocker *keyLocker[K]
+	keyLocker *utilSync.KeyLocker[K]
 }
 
 // complicates netValueReader with TTL caching mechanism.
@@ -100,7 +48,7 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
 		sz:        sz,
 		cache:     cache,
 		netRdr:    netRdr,
-		keyLocker: newKeyLocker[K](),
+		keyLocker: utilSync.NewKeyLocker[K](),
 	}
 }
 
@@ -115,8 +63,8 @@ func (c *ttlNetCache[K, V]) get(key K) (V, error) {
 		return val.v, val.e
 	}
 
-	c.keyLocker.LockKey(key)
-	defer c.keyLocker.UnlockKey(key)
+	c.keyLocker.Lock(key)
+	defer c.keyLocker.Unlock(key)
 
 	val, ok = c.cache.Peek(key)
 	if ok && time.Since(val.t) < c.ttl {
@@ -135,8 +83,8 @@ func (c *ttlNetCache[K, V]) get(key K) (V, error) {
 }
 
 func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
-	c.keyLocker.LockKey(k)
-	defer c.keyLocker.UnlockKey(k)
+	c.keyLocker.Lock(k)
+	defer c.keyLocker.Unlock(k)
 
 	c.cache.Add(k, &valueWithTime[V]{
 		v: v,
@@ -146,8 +94,8 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
 }
 
 func (c *ttlNetCache[K, V]) remove(key K) {
-	c.keyLocker.LockKey(key)
-	defer c.keyLocker.UnlockKey(key)
+	c.keyLocker.Lock(key)
+	defer c.keyLocker.Unlock(key)
 
 	c.cache.Remove(key)
 }
diff --git a/pkg/local_object_storage/blobstor/fstree/control.go b/pkg/local_object_storage/blobstor/fstree/control.go
index f41b7aacd..c56312d38 100644
--- a/pkg/local_object_storage/blobstor/fstree/control.go
+++ b/pkg/local_object_storage/blobstor/fstree/control.go
@@ -13,7 +13,10 @@ func (t *FSTree) Open(ro bool) error {
 
 // Init implements common.Storage.
 func (t *FSTree) Init() error {
-	return util.MkdirAllX(t.RootPath, t.Permissions)
+	if err := util.MkdirAllX(t.RootPath, t.Permissions); err != nil {
+		return err
+	}
+	return t.initFileCounter()
 }
 
 // Close implements common.Storage.
diff --git a/pkg/local_object_storage/blobstor/fstree/counter.go b/pkg/local_object_storage/blobstor/fstree/counter.go
new file mode 100644
index 000000000..70b346093
--- /dev/null
+++ b/pkg/local_object_storage/blobstor/fstree/counter.go
@@ -0,0 +1,32 @@
+package fstree
+
+import (
+	"math"
+	"sync/atomic"
+)
+
+// FileCounter is used to count files in FSTree. The implementation must be thread-safe.
+type FileCounter interface {
+	Set(v uint64)
+	Inc()
+	Dec()
+}
+
+type noopCounter struct{}
+
+func (c *noopCounter) Set(uint64) {}
+func (c *noopCounter) Inc()       {}
+func (c *noopCounter) Dec()       {}
+
+type SimpleCounter struct {
+	v atomic.Uint64
+}
+
+func NewSimpleCounter() *SimpleCounter {
+	return &SimpleCounter{}
+}
+
+func (c *SimpleCounter) Set(v uint64)  { c.v.Store(v) }
+func (c *SimpleCounter) Inc()          { c.v.Add(1) }
+func (c *SimpleCounter) Dec()          { c.v.Add(math.MaxUint64) }
+func (c *SimpleCounter) Value() uint64 { return c.v.Load() }
diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go
index 5a9adde0f..243a7239e 100644
--- a/pkg/local_object_storage/blobstor/fstree/fstree.go
+++ b/pkg/local_object_storage/blobstor/fstree/fstree.go
@@ -10,6 +10,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"strings"
+	"sync/atomic"
 	"syscall"
 	"time"
 
@@ -26,6 +27,16 @@ import (
 	"go.opentelemetry.io/otel/trace"
 )
 
+type keyLock interface {
+	Lock(string)
+	Unlock(string)
+}
+
+type noopKeyLock struct{}
+
+func (l *noopKeyLock) Lock(string)   {}
+func (l *noopKeyLock) Unlock(string) {}
+
 // FSTree represents an object storage as a filesystem tree.
 type FSTree struct {
 	Info
@@ -37,6 +48,12 @@ type FSTree struct {
 	noSync   bool
 	readOnly bool
 	metrics  Metrics
+
+	fileGuard          keyLock
+	fileCounter        FileCounter
+	fileCounterEnabled bool
+
+	suffix atomic.Uint64
 }
 
 // Info groups the information about file storage.
@@ -63,10 +80,12 @@ func New(opts ...Option) *FSTree {
 			Permissions: 0700,
 			RootPath:    "./",
 		},
-		Config:     nil,
-		Depth:      4,
-		DirNameLen: DirNameLen,
-		metrics:    &noopMetrics{},
+		Config:      nil,
+		Depth:       4,
+		DirNameLen:  DirNameLen,
+		metrics:     &noopMetrics{},
+		fileGuard:   &noopKeyLock{},
+		fileCounter: &noopCounter{},
 	}
 	for i := range opts {
 		opts[i](f)
@@ -244,7 +263,17 @@ func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.Delet
 
 	p := t.treePath(prm.Address)
 
-	err = os.Remove(p)
+	if t.fileCounterEnabled {
+		t.fileGuard.Lock(p)
+		err = os.Remove(p)
+		t.fileGuard.Unlock(p)
+		if err == nil {
+			t.fileCounter.Dec()
+		}
+	} else {
+		err = os.Remove(p)
+	}
+
 	if err != nil && os.IsNotExist(err) {
 		err = logicerr.Wrap(new(apistatus.ObjectNotFound))
 	}
@@ -317,45 +346,19 @@ func (t *FSTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, err
 		prm.RawData = t.Compress(prm.RawData)
 	}
 
-	// Here is a situation:
-	// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.161Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "PUT", "type": "fstree", "storage_id": ""}
-	// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.183Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "metabase PUT"}
-	// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug policer/check.go:231 shortage of object copies detected {"component": "Object Policer", "object": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "shortage": 1}
-	// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug shard/get.go:124 object is missing in write-cache {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "addr": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "skip_meta": false}
-	//
-	// 1. We put an object on node 1.
-	// 2. Relentless policer sees that it has only 1 copy and tries to PUT it to node 2.
-	// 3. PUT operation started by client at (1) also puts an object here.
-	// 4. Now we have concurrent writes and one of `Rename` calls will return `no such file` error.
-	// Even more than that, concurrent writes can corrupt data.
-	//
-	// So here is a solution:
-	// 1. Write a file to 'name + 1'.
-	// 2. If it exists, retry with temporary name being 'name + 2'.
-	// 3. Set some reasonable number of attempts.
-	//
-	// It is a bit kludgey, but I am unusually proud about having found this out after
-	// hours of research on linux kernel, dirsync mount option and ext4 FS, turned out
-	// to be so hecking simple.
-	// In a very rare situation we can have multiple partially written copies on disk,
-	// this will be fixed in another issue (we should remove garbage on start).
 	size = len(prm.RawData)
-	const retryCount = 5
-	for i := 0; i < retryCount; i++ {
-		tmpPath := p + "#" + strconv.FormatUint(uint64(i), 10)
-		err = t.writeAndRename(tmpPath, p, prm.RawData)
-		if err != syscall.EEXIST || i == retryCount-1 {
-			return common.PutRes{StorageID: []byte{}}, err
-		}
-	}
-
-	err = fmt.Errorf("couldn't read file after %d retries", retryCount)
-	// unreachable, but precaution never hurts, especially 1 day before release.
+	tmpPath := p + "#" + strconv.FormatUint(t.suffix.Add(1), 10)
+	err = t.writeAndRename(tmpPath, p, prm.RawData)
 	return common.PutRes{StorageID: []byte{}}, err
 }
 
 // writeAndRename opens tmpPath exclusively, writes data to it and renames it to p.
 func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error {
+	if t.fileCounterEnabled {
+		t.fileGuard.Lock(p)
+		defer t.fileGuard.Unlock(p)
+	}
+
 	err := t.writeFile(tmpPath, data)
 	if err != nil {
 		var pe *fs.PathError
@@ -364,10 +367,21 @@ func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error {
 			case syscall.ENOSPC:
 				err = common.ErrNoSpace
 				_ = os.RemoveAll(tmpPath)
-			case syscall.EEXIST:
-				return syscall.EEXIST
 			}
 		}
+		return err
+	}
+
+	if t.fileCounterEnabled {
+		t.fileCounter.Inc()
+		var targetFileExists bool
+		if _, e := os.Stat(p); e == nil {
+			targetFileExists = true
+		}
+		err = os.Rename(tmpPath, p)
+		if err == nil && targetFileExists {
+			t.fileCounter.Dec()
+		}
 	} else {
 		err = os.Rename(tmpPath, p)
 	}
@@ -491,11 +505,23 @@ func (t *FSTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.G
 	}, nil
 }
 
-// NumberOfObjects walks the file tree rooted at FSTree's root
-// and returns number of stored objects.
-func (t *FSTree) NumberOfObjects() (uint64, error) {
-	var counter uint64
+// initFileCounter walks the file tree rooted at FSTree's root,
+// counts the stored files and initializes the file counter.
+func (t *FSTree) initFileCounter() error {
+	if !t.fileCounterEnabled {
+		return nil
+	}
+	counter, err := t.countFiles()
+	if err != nil {
+		return err
+	}
+	t.fileCounter.Set(counter)
+	return nil
+}
+
+func (t *FSTree) countFiles() (uint64, error) {
+	var counter uint64
 
 	// it is simpler to just consider every file
 	// that is not directory as an object
 	err := filepath.WalkDir(t.RootPath,
diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go
index 0e5525e77..b81ce43f1 100644
--- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go
+++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go
@@ -1,10 +1,16 @@
 package fstree
 
 import (
+	"context"
+	"errors"
 	"testing"
 
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
+	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
 )
 
 func TestAddressToString(t *testing.T) {
@@ -28,3 +34,68 @@ func Benchmark_addressFromString(b *testing.B) {
 		}
 	}
 }
+
+func TestObjectCounter(t *testing.T) {
+	t.Parallel()
+	counter := NewSimpleCounter()
+	fst := New(
+		WithPath(t.TempDir()),
+		WithDepth(2),
+		WithDirNameLen(2),
+		WithFileCounter(counter))
+	require.NoError(t, fst.Open(false))
+	require.NoError(t, fst.Init())
+
+	counterValue := counter.Value()
+	require.Equal(t, uint64(0), counterValue)
+
+	defer func() {
+		require.NoError(t, fst.Close())
+	}()
+
+	addr := oidtest.Address()
+	obj := objectSDK.New()
+	obj.SetID(addr.Object())
+	obj.SetContainerID(addr.Container())
+	obj.SetPayload([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0})
+
+	var putPrm common.PutPrm
+	putPrm.Address = addr
+	putPrm.RawData, _ = obj.Marshal()
+
+	var getPrm common.GetPrm
+	getPrm.Address = putPrm.Address
+
+	var delPrm common.DeletePrm
+	delPrm.Address = addr
+
+	eg, egCtx := errgroup.WithContext(context.Background())
+
+	eg.Go(func() error {
+		for j := 0; j < 1_000; j++ {
+			_, err := fst.Put(egCtx, putPrm)
+			if err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+
+	eg.Go(func() error {
+		var le logicerr.Logical
+		for j := 0; j < 1_000; j++ {
+			_, err := fst.Delete(egCtx, delPrm)
+			if err != nil && !errors.As(err, &le) {
+				return err
+			}
+		}
+		return nil
+	})
+
+	require.NoError(t, eg.Wait())
+
+	counterValue = counter.Value()
+	realCount, err := fst.countFiles()
+	require.NoError(t, err)
+	require.Equal(t, realCount, counterValue)
+}
diff --git a/pkg/local_object_storage/blobstor/fstree/option.go b/pkg/local_object_storage/blobstor/fstree/option.go
index 52c8718c2..21d46ac4d 100644
--- a/pkg/local_object_storage/blobstor/fstree/option.go
+++ b/pkg/local_object_storage/blobstor/fstree/option.go
@@ -2,6 +2,8 @@ package fstree
 
 import (
 	"io/fs"
+
+	utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
 )
 
 type Option func(*FSTree)
@@ -41,3 +43,11 @@ func WithMetrics(m Metrics) Option {
 		f.metrics = m
 	}
 }
+
+func WithFileCounter(c FileCounter) Option {
+	return func(f *FSTree) {
+		f.fileCounterEnabled = true
+		f.fileCounter = c
+		f.fileGuard = utilSync.NewKeyLocker[string]()
+	}
+}
diff --git a/pkg/local_object_storage/blobstor/perf_test.go b/pkg/local_object_storage/blobstor/perf_test.go
index 5245146cb..c773ea0ee 100644
--- a/pkg/local_object_storage/blobstor/perf_test.go
+++ b/pkg/local_object_storage/blobstor/perf_test.go
@@ -53,7 +53,7 @@ var storages = []storage{
 		},
 	},
 	{
-		desc: "fstree",
+		desc: "fstree_without_object_counter",
 		create: func(dir string) common.Storage {
 			return fstree.New(
 				fstree.WithPath(dir),
@@ -62,6 +62,17 @@ var storages = []storage{
 			)
 		},
 	},
+	{
+		desc: "fstree_with_object_counter",
+		create: func(dir string) common.Storage {
+			return fstree.New(
+				fstree.WithPath(dir),
+				fstree.WithDepth(2),
+				fstree.WithDirNameLen(2),
+				fstree.WithFileCounter(fstree.NewSimpleCounter()),
+			)
+		},
+	},
 	{
 		desc: "blobovniczatree",
 		create: func(dir string) common.Storage {
diff --git a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go b/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go
index 69e418bb3..407d1a9ce 100644
--- a/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go
+++ b/pkg/local_object_storage/writecache/writecachebbolt/cachebbolt.go
@@ -107,14 +107,13 @@ func (c *cache) Open(readOnly bool) error {
 	// thus we need to create a channel here.
 	c.closeCh = make(chan struct{})
 
-	return metaerr.Wrap(c.setCounters())
+	return metaerr.Wrap(c.initCounters())
 }
 
 // Init runs necessary services.
 func (c *cache) Init() error {
 	c.metrics.SetMode(c.mode)
 	c.runFlushLoop()
-	c.runDBCounterLoop()
 	return nil
 }
 
diff --git a/pkg/local_object_storage/writecache/writecachebbolt/state.go b/pkg/local_object_storage/writecache/writecachebbolt/state.go
index 49b802aef..91d54b0ea 100644
--- a/pkg/local_object_storage/writecache/writecachebbolt/state.go
+++ b/pkg/local_object_storage/writecache/writecachebbolt/state.go
@@ -2,20 +2,24 @@ package writecachebbolt
 
 import (
 	"fmt"
+	"math"
 	"sync/atomic"
-	"time"
 
-	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
 	"go.etcd.io/bbolt"
-	"go.uber.org/zap"
-	"golang.org/x/sync/errgroup"
 )
 
 func (c *cache) estimateCacheSize() uint64 {
-	db := c.objCounters.DB() * c.smallObjectSize
-	fstree := c.objCounters.FS() * c.maxObjectSize
-	c.metrics.SetEstimateSize(db, fstree)
-	return db + fstree
+	dbCount := c.objCounters.DB()
+	fsCount := c.objCounters.FS()
+	if fsCount > 0 {
+		fsCount-- //db file
+	}
+	dbSize := dbCount * c.smallObjectSize
+	fsSize := fsCount * c.maxObjectSize
+	c.metrics.SetEstimateSize(dbSize, fsSize)
+	c.metrics.SetActualCounters(dbCount, fsCount)
+	return dbSize + fsSize
 }
 
 func (c *cache) incSizeDB(sz uint64) uint64 {
@@ -26,6 +30,8 @@ func (c *cache) incSizeFS(sz uint64) uint64 {
 	return sz + c.maxObjectSize
 }
 
+var _ fstree.FileCounter = &counters{}
+
 type counters struct {
 	cDB, cFS atomic.Uint64
 }
@@ -38,60 +44,33 @@ func (x *counters) FS() uint64 {
 	return x.cFS.Load()
 }
 
-func (c *cache) setCounters() error {
+// Set implements fstree.FileCounter.
+func (x *counters) Set(v uint64) {
+	x.cFS.Store(v)
+}
+
+// Inc implements fstree.FileCounter.
+func (x *counters) Inc() {
+	x.cFS.Add(1)
+}
+
+// Dec implements fstree.FileCounter.
+func (x *counters) Dec() {
+	x.cFS.Add(math.MaxUint64)
+}
+
+func (c *cache) initCounters() error {
 	var inDB uint64
-	var inFS uint64
-
-	var eg errgroup.Group
-
-	eg.Go(func() error {
-		err := c.db.View(func(tx *bbolt.Tx) error {
-			b := tx.Bucket(defaultBucket)
-			if b != nil {
-				inDB = uint64(b.Stats().KeyN)
-			}
-			return nil
-		})
-		if err != nil {
-			return fmt.Errorf("could not read write-cache DB counter: %w", err)
+	err := c.db.View(func(tx *bbolt.Tx) error {
+		b := tx.Bucket(defaultBucket)
+		if b != nil {
+			inDB = uint64(b.Stats().KeyN)
 		}
-		c.objCounters.cDB.Store(inDB)
 		return nil
 	})
-
-	eg.Go(func() error {
-		var err error
-		inFS, err = c.fsTree.NumberOfObjects()
-		if err != nil {
-			return fmt.Errorf("could not read write-cache FS counter: %w", err)
-		}
-		if inFS > 0 {
-			inFS-- //small.bolt DB file
-		}
-		c.objCounters.cFS.Store(inFS)
-		return nil
-	})
-	if err := eg.Wait(); err != nil {
-		return err
+	if err != nil {
+		return fmt.Errorf("could not read write-cache DB counter: %w", err)
 	}
-	c.metrics.SetActualCounters(inDB, inFS)
+	c.objCounters.cDB.Store(inDB)
 	return nil
 }
-
-func (c *cache) runDBCounterLoop() {
-	go func() {
-		t := time.NewTicker(time.Second * 30)
-		defer t.Stop()
-		for {
-			select {
-			case <-t.C:
-				err := c.setCounters()
-				if err != nil {
-					c.log.Warn(logs.FailedToCountWritecacheItems, zap.Error(err))
-				}
-			case <-c.closeCh:
-				return
-			}
-		}
-	}()
-}
diff --git a/pkg/local_object_storage/writecache/writecachebbolt/storage.go b/pkg/local_object_storage/writecache/writecachebbolt/storage.go
index c651b7445..d79eb2963 100644
--- a/pkg/local_object_storage/writecache/writecachebbolt/storage.go
+++ b/pkg/local_object_storage/writecache/writecachebbolt/storage.go
@@ -54,10 +54,15 @@ func (c *cache) openStore(readOnly bool) error {
 		fstree.WithPerm(os.ModePerm),
 		fstree.WithDepth(1),
 		fstree.WithDirNameLen(1),
-		fstree.WithNoSync(c.noSync))
+		fstree.WithNoSync(c.noSync),
+		fstree.WithFileCounter(&c.objCounters),
+	)
 	if err := c.fsTree.Open(readOnly); err != nil {
 		return fmt.Errorf("could not open FSTree: %w", err)
 	}
+	if err := c.fsTree.Init(); err != nil {
+		return fmt.Errorf("could not init FSTree: %w", err)
+	}
 
 	return nil
 }
diff --git a/pkg/util/sync/key_locker.go b/pkg/util/sync/key_locker.go
new file mode 100644
index 000000000..97de0386d
--- /dev/null
+++ b/pkg/util/sync/key_locker.go
@@ -0,0 +1,56 @@
+package sync
+
+import "sync"
+
+type locker struct {
+	mtx     sync.Mutex
+	waiters int // not protected by mtx, must use the outer mutex to update it concurrently
+}
+
+type KeyLocker[K comparable] struct {
+	lockers    map[K]*locker
+	lockersMtx sync.Mutex
+}
+
+func NewKeyLocker[K comparable]() *KeyLocker[K] {
+	return &KeyLocker[K]{
+		lockers: make(map[K]*locker),
+	}
+}
+
+func (l *KeyLocker[K]) Lock(key K) {
+	l.lockersMtx.Lock()
+
+	if locker, found := l.lockers[key]; found {
+		locker.waiters++
+		l.lockersMtx.Unlock()
+
+		locker.mtx.Lock()
+		return
+	}
+
+	locker := &locker{
+		waiters: 1,
+	}
+	locker.mtx.Lock()
+
+	l.lockers[key] = locker
+	l.lockersMtx.Unlock()
+}
+
+func (l *KeyLocker[K]) Unlock(key K) {
+	l.lockersMtx.Lock()
+	defer l.lockersMtx.Unlock()
+
+	locker, found := l.lockers[key]
+	if !found {
+		return
+	}
+
+	if locker.waiters == 1 {
+		delete(l.lockers, key)
+	}
+	locker.waiters--
+
+	locker.mtx.Unlock()
+}
diff --git a/cmd/frostfs-node/cache_test.go b/pkg/util/sync/key_locker_test.go
similarity index 82%
rename from cmd/frostfs-node/cache_test.go
rename to pkg/util/sync/key_locker_test.go
index a3e1c4ea6..3b3e6a694 100644
--- a/cmd/frostfs-node/cache_test.go
+++ b/pkg/util/sync/key_locker_test.go
@@ -1,4 +1,4 @@
-package main
+package sync
 
 import (
 	"context"
@@ -12,11 +12,11 @@ import (
 func TestKeyLocker(t *testing.T) {
 	taken := false
 	eg, _ := errgroup.WithContext(context.Background())
-	keyLocker := newKeyLocker[int]()
+	keyLocker := NewKeyLocker[int]()
 	for i := 0; i < 100; i++ {
 		eg.Go(func() error {
-			keyLocker.LockKey(0)
-			defer keyLocker.UnlockKey(0)
+			keyLocker.Lock(0)
+			defer keyLocker.Unlock(0)
 
 			require.False(t, taken)
 			taken = true