Fix writecache counters #595
|
@ -8,6 +8,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
cntClient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client/container"
|
||||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
|
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
@ -24,59 +25,6 @@ type valueWithTime[V any] struct {
|
||||||
e error
|
e error
|
||||||
}
|
}
|
||||||
|
|
||||||
type locker struct {
|
|
||||||
mtx sync.Mutex
|
|
||||||
waiters int // not protected by mtx, must used outer mutex to update concurrently
|
|
||||||
}
|
|
||||||
|
|
||||||
type keyLocker[K comparable] struct {
|
|
||||||
lockers map[K]*locker
|
|
||||||
lockersMtx sync.Mutex
|
|
||||||
}
|
|
||||||
|
|
||||||
func newKeyLocker[K comparable]() *keyLocker[K] {
|
|
||||||
return &keyLocker[K]{
|
|
||||||
lockers: make(map[K]*locker),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *keyLocker[K]) LockKey(key K) {
|
|
||||||
l.lockersMtx.Lock()
|
|
||||||
|
|
||||||
if locker, found := l.lockers[key]; found {
|
|
||||||
locker.waiters++
|
|
||||||
l.lockersMtx.Unlock()
|
|
||||||
|
|
||||||
locker.mtx.Lock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
locker := &locker{
|
|
||||||
waiters: 1,
|
|
||||||
}
|
|
||||||
locker.mtx.Lock()
|
|
||||||
|
|
||||||
l.lockers[key] = locker
|
|
||||||
l.lockersMtx.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (l *keyLocker[K]) UnlockKey(key K) {
|
|
||||||
l.lockersMtx.Lock()
|
|
||||||
defer l.lockersMtx.Unlock()
|
|
||||||
|
|
||||||
locker, found := l.lockers[key]
|
|
||||||
if !found {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if locker.waiters == 1 {
|
|
||||||
delete(l.lockers, key)
|
|
||||||
}
|
|
||||||
locker.waiters--
|
|
||||||
|
|
||||||
locker.mtx.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// entity that provides TTL cache interface.
|
// entity that provides TTL cache interface.
|
||||||
type ttlNetCache[K comparable, V any] struct {
|
type ttlNetCache[K comparable, V any] struct {
|
||||||
ttl time.Duration
|
ttl time.Duration
|
||||||
|
@ -87,7 +35,7 @@ type ttlNetCache[K comparable, V any] struct {
|
||||||
|
|
||||||
netRdr netValueReader[K, V]
|
netRdr netValueReader[K, V]
|
||||||
|
|
||||||
keyLocker *keyLocker[K]
|
keyLocker *utilSync.KeyLocker[K]
|
||||||
}
|
}
|
||||||
|
|
||||||
// complicates netValueReader with TTL caching mechanism.
|
// complicates netValueReader with TTL caching mechanism.
|
||||||
|
@ -100,7 +48,7 @@ func newNetworkTTLCache[K comparable, V any](sz int, ttl time.Duration, netRdr n
|
||||||
sz: sz,
|
sz: sz,
|
||||||
cache: cache,
|
cache: cache,
|
||||||
netRdr: netRdr,
|
netRdr: netRdr,
|
||||||
keyLocker: newKeyLocker[K](),
|
keyLocker: utilSync.NewKeyLocker[K](),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -115,8 +63,8 @@ func (c *ttlNetCache[K, V]) get(key K) (V, error) {
|
||||||
return val.v, val.e
|
return val.v, val.e
|
||||||
}
|
}
|
||||||
|
|
||||||
c.keyLocker.LockKey(key)
|
c.keyLocker.Lock(key)
|
||||||
defer c.keyLocker.UnlockKey(key)
|
defer c.keyLocker.Unlock(key)
|
||||||
|
|
||||||
val, ok = c.cache.Peek(key)
|
val, ok = c.cache.Peek(key)
|
||||||
if ok && time.Since(val.t) < c.ttl {
|
if ok && time.Since(val.t) < c.ttl {
|
||||||
|
@ -135,8 +83,8 @@ func (c *ttlNetCache[K, V]) get(key K) (V, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
|
func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
|
||||||
c.keyLocker.LockKey(k)
|
c.keyLocker.Lock(k)
|
||||||
defer c.keyLocker.UnlockKey(k)
|
defer c.keyLocker.Unlock(k)
|
||||||
|
|
||||||
c.cache.Add(k, &valueWithTime[V]{
|
c.cache.Add(k, &valueWithTime[V]{
|
||||||
v: v,
|
v: v,
|
||||||
|
@ -146,8 +94,8 @@ func (c *ttlNetCache[K, V]) set(k K, v V, e error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ttlNetCache[K, V]) remove(key K) {
|
func (c *ttlNetCache[K, V]) remove(key K) {
|
||||||
c.keyLocker.LockKey(key)
|
c.keyLocker.Lock(key)
|
||||||
defer c.keyLocker.UnlockKey(key)
|
defer c.keyLocker.Unlock(key)
|
||||||
|
|
||||||
c.cache.Remove(key)
|
c.cache.Remove(key)
|
||||||
}
|
}
|
||||||
|
|
|
@ -513,4 +513,5 @@ const (
|
||||||
FrostFSNodeCantUnmarshalObjectFromDB = "can't unmarshal an object from the DB" // Error in ../node/cmd/frostfs-node/morph.go
|
FrostFSNodeCantUnmarshalObjectFromDB = "can't unmarshal an object from the DB" // Error in ../node/cmd/frostfs-node/morph.go
|
||||||
RuntimeSoftMemoryLimitUpdated = "soft runtime memory limit value updated"
|
RuntimeSoftMemoryLimitUpdated = "soft runtime memory limit value updated"
|
||||||
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
||||||
|
FailedToCountWritecacheItems = "failed to count writecache items"
|
||||||
)
|
)
|
||||||
|
|
|
@ -13,7 +13,10 @@ func (t *FSTree) Open(ro bool) error {
|
||||||
|
|
||||||
// Init implements common.Storage.
|
// Init implements common.Storage.
|
||||||
func (t *FSTree) Init() error {
|
func (t *FSTree) Init() error {
|
||||||
return util.MkdirAllX(t.RootPath, t.Permissions)
|
if err := util.MkdirAllX(t.RootPath, t.Permissions); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return t.initFileCounter()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close implements common.Storage.
|
// Close implements common.Storage.
|
||||||
|
|
32
pkg/local_object_storage/blobstor/fstree/counter.go
Normal file
|
@ -0,0 +1,32 @@
|
||||||
|
package fstree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"math"
|
||||||
|
"sync/atomic"
|
||||||
|
)
|
||||||
|
|
||||||
|
// FileCounter used to count files in FSTree. The implementation must be thread-safe.
|
||||||
|
type FileCounter interface {
|
||||||
|
Set(v uint64)
|
||||||
|
Inc()
|
||||||
|
Dec()
|
||||||
|
}
|
||||||
|
|
||||||
|
type noopCounter struct{}
|
||||||
|
|
||||||
|
func (c *noopCounter) Set(uint64) {}
|
||||||
|
func (c *noopCounter) Inc() {}
|
||||||
|
func (c *noopCounter) Dec() {}
|
||||||
|
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
|
type SimpleCounter struct {
|
||||||
|
v atomic.Uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSimpleCounter() *SimpleCounter {
|
||||||
|
return &SimpleCounter{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *SimpleCounter) Set(v uint64) { c.v.Store(v) }
|
||||||
|
func (c *SimpleCounter) Inc() { c.v.Add(1) }
|
||||||
|
func (c *SimpleCounter) Dec() { c.v.Add(math.MaxUint64) }
|
||||||
|
func (c *SimpleCounter) Value() uint64 { return c.v.Load() }
|
|
@ -10,6 +10,7 @@ import (
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -26,6 +27,16 @@ import (
|
||||||
"go.opentelemetry.io/otel/trace"
|
"go.opentelemetry.io/otel/trace"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type keyLock interface {
|
||||||
|
Lock(string)
|
||||||
|
Unlock(string)
|
||||||
|
}
|
||||||
|
|
||||||
|
type noopKeyLock struct{}
|
||||||
|
|
||||||
|
func (l *noopKeyLock) Lock(string) {}
|
||||||
|
func (l *noopKeyLock) Unlock(string) {}
|
||||||
|
|
||||||
// FSTree represents an object storage as a filesystem tree.
|
// FSTree represents an object storage as a filesystem tree.
|
||||||
type FSTree struct {
|
type FSTree struct {
|
||||||
Info
|
Info
|
||||||
|
@ -37,6 +48,12 @@ type FSTree struct {
|
||||||
noSync bool
|
noSync bool
|
||||||
readOnly bool
|
readOnly bool
|
||||||
metrics Metrics
|
metrics Metrics
|
||||||
|
|
||||||
|
fileGuard keyLock
|
||||||
|
fileCounter FileCounter
|
||||||
|
fileCounterEnabled bool
|
||||||
|
|
||||||
|
suffix atomic.Uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
// Info groups the information about file storage.
|
// Info groups the information about file storage.
|
||||||
|
@ -63,10 +80,12 @@ func New(opts ...Option) *FSTree {
|
||||||
Permissions: 0700,
|
Permissions: 0700,
|
||||||
RootPath: "./",
|
RootPath: "./",
|
||||||
},
|
},
|
||||||
Config: nil,
|
Config: nil,
|
||||||
Depth: 4,
|
Depth: 4,
|
||||||
DirNameLen: DirNameLen,
|
DirNameLen: DirNameLen,
|
||||||
metrics: &noopMetrics{},
|
metrics: &noopMetrics{},
|
||||||
|
fileGuard: &noopKeyLock{},
|
||||||
|
fileCounter: &noopCounter{},
|
||||||
}
|
}
|
||||||
for i := range opts {
|
for i := range opts {
|
||||||
opts[i](f)
|
opts[i](f)
|
||||||
|
@ -244,7 +263,17 @@ func (t *FSTree) Delete(ctx context.Context, prm common.DeletePrm) (common.Delet
|
||||||
|
|
||||||
p := t.treePath(prm.Address)
|
p := t.treePath(prm.Address)
|
||||||
|
|
||||||
err = os.Remove(p)
|
if t.fileCounterEnabled {
|
||||||
|
t.fileGuard.Lock(p)
|
||||||
|
err = os.Remove(p)
|
||||||
|
t.fileGuard.Unlock(p)
|
||||||
|
if err == nil {
|
||||||
|
t.fileCounter.Dec()
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err = os.Remove(p)
|
||||||
|
}
|
||||||
|
|
||||||
if err != nil && os.IsNotExist(err) {
|
if err != nil && os.IsNotExist(err) {
|
||||||
err = logicerr.Wrap(new(apistatus.ObjectNotFound))
|
err = logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
}
|
}
|
||||||
|
@ -317,45 +346,19 @@ func (t *FSTree) Put(ctx context.Context, prm common.PutPrm) (common.PutRes, err
|
||||||
prm.RawData = t.Compress(prm.RawData)
|
prm.RawData = t.Compress(prm.RawData)
|
||||||
}
|
}
|
||||||
|
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
We should not panic in fstree. >// NewString creates a new random UUID and returns it as a string or panics.
We should not panic in fstree.
dstepanov-yadro
commented
uuid replaced with atomic counter. uuid replaced with atomic counter.
|
|||||||
// Here is a situation:
|
|
||||||
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.161Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "PUT", "type": "fstree", "storage_id": ""}
|
|
||||||
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.183Z info log/log.go:13 local object storage operation {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "address": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "op": "metabase PUT"}
|
|
||||||
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug policer/check.go:231 shortage of object copies detected {"component": "Object Policer", "object": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "shortage": 1}
|
|
||||||
// Feb 09 13:10:37 buky neofs-node[32445]: 2023-02-09T13:10:37.862Z debug shard/get.go:124 object is missing in write-cache {"shard_id": "SkT8BfjouW6t93oLuzQ79s", "addr": "7NxFz4SruSi8TqXacr2Ae22nekMhgYk1sfkddJo9PpWk/5enyUJGCyU1sfrURDnHEjZFdbGqANVhayYGfdSqtA6wA", "skip_meta": false}
|
|
||||||
//
|
|
||||||
// 1. We put an object on node 1.
|
|
||||||
// 2. Relentless policer sees that it has only 1 copy and tries to PUT it to node 2.
|
|
||||||
// 3. PUT operation started by client at (1) also puts an object here.
|
|
||||||
// 4. Now we have concurrent writes and one of `Rename` calls will return `no such file` error.
|
|
||||||
// Even more than that, concurrent writes can corrupt data.
|
|
||||||
//
|
|
||||||
// So here is a solution:
|
|
||||||
// 1. Write a file to 'name + 1'.
|
|
||||||
// 2. If it exists, retry with temporary name being 'name + 2'.
|
|
||||||
// 3. Set some reasonable number of attempts.
|
|
||||||
//
|
|
||||||
// It is a bit kludgey, but I am unusually proud about having found this out after
|
|
||||||
// hours of research on linux kernel, dirsync mount option and ext4 FS, turned out
|
|
||||||
// to be so hecking simple.
|
|
||||||
// In a very rare situation we can have multiple partially written copies on disk,
|
|
||||||
// this will be fixed in another issue (we should remove garbage on start).
|
|
||||||
size = len(prm.RawData)
|
size = len(prm.RawData)
|
||||||
const retryCount = 5
|
tmpPath := p + "#" + strconv.FormatUint(t.suffix.Add(1), 10)
|
||||||
for i := 0; i < retryCount; i++ {
|
err = t.writeAndRename(tmpPath, p, prm.RawData)
|
||||||
tmpPath := p + "#" + strconv.FormatUint(uint64(i), 10)
|
|
||||||
err = t.writeAndRename(tmpPath, p, prm.RawData)
|
|
||||||
if err != syscall.EEXIST || i == retryCount-1 {
|
|
||||||
return common.PutRes{StorageID: []byte{}}, err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
err = fmt.Errorf("couldn't read file after %d retries", retryCount)
|
|
||||||
// unreachable, but precaution never hurts, especially 1 day before release.
|
|
||||||
return common.PutRes{StorageID: []byte{}}, err
|
return common.PutRes{StorageID: []byte{}}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p.
|
// writeAndRename opens tmpPath exclusively, writes data to it and renames it to p.
|
||||||
func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error {
|
func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error {
|
||||||
|
if t.fileCounterEnabled {
|
||||||
|
t.fileGuard.Lock(p)
|
||||||
|
defer t.fileGuard.Unlock(p)
|
||||||
|
}
|
||||||
|
|
||||||
err := t.writeFile(tmpPath, data)
|
err := t.writeFile(tmpPath, data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var pe *fs.PathError
|
var pe *fs.PathError
|
||||||
|
@ -364,10 +367,21 @@ func (t *FSTree) writeAndRename(tmpPath, p string, data []byte) error {
|
||||||
case syscall.ENOSPC:
|
case syscall.ENOSPC:
|
||||||
err = common.ErrNoSpace
|
err = common.ErrNoSpace
|
||||||
_ = os.RemoveAll(tmpPath)
|
_ = os.RemoveAll(tmpPath)
|
||||||
case syscall.EEXIST:
|
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why is it removed? Why is it removed?
dstepanov-yadro
commented
It looks like this It looks like this `case` statement was used for retries. Retries were deleted, so this `case` is redundant.
|
|||||||
return syscall.EEXIST
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
fyrchik marked this conversation as resolved
Outdated
acid-ant
commented
Looks like comment above no more relevant. Looks like comment above no more relevant.
dstepanov-yadro
commented
Hm, which one? Comment was deleted. Hm, which one? Comment was deleted.
acid-ant
commented
```
...
// So here is a solution:
// 1. Write a file to 'name + 1'.
// 2. If it exists, retry with temporary name being 'name + 2'.
// 3. Set some reasonable number of attempts.
...
```
dstepanov-yadro
commented
Ah. Removed this cool story completely. Thx. Ah. Removed this cool story completely. Thx.
|
|||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if t.fileCounterEnabled {
|
||||||
|
t.fileCounter.Inc()
|
||||||
|
var targetFileExists bool
|
||||||
|
if _, e := os.Stat(p); e == nil {
|
||||||
|
targetFileExists = true
|
||||||
|
}
|
||||||
|
err = os.Rename(tmpPath, p)
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why do we Why do we `Inc` then `Dec` instead of just not doing `Inc` in the first place?
It is also nice to increase the counter _after_ the file has been written.
dstepanov-yadro
commented
You've answered your question:
> Why do we Inc then Dec instead of just not doing Inc in the first place?
You've answered your question: `It is also nice to increase the counter after the file has been written.`
```
if t.objCounterEnabled {
t.counter.Inc() // here we have temp file written, so we increase counter
...
if err == nil && targetFileExists {
t.counter.Dec() //here we have temp file deleted, so we decrease counter
}
```
fyrchik
commented
I mean when the temp file is written the object does not yet exists, it start to exist after rename, right? I mean when the temp file is written the object does not yet exists, it start to exist after rename, right?
dstepanov-yadro
commented
But temp file exists, right? But temp file exists, right?
fyrchik
commented
Will Will `Iterate` see it?
dstepanov-yadro
commented
Yes Yes
fyrchik
commented
As I understand, for As I understand, for `Delete` it is ok, because the lock is already taken (not obvious, but works).
`Iterate` does not use any locks currently, and I don't see anything changed in this PR.
Am I missing something?
dstepanov-yadro
commented
renamed to file counter. renamed to file counter.
|
|||||||
|
if err == nil && targetFileExists {
|
||||||
|
t.fileCounter.Dec()
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
err = os.Rename(tmpPath, p)
|
err = os.Rename(tmpPath, p)
|
||||||
}
|
}
|
||||||
|
@ -396,27 +410,6 @@ func (t *FSTree) writeFile(p string, data []byte) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// PutStream puts executes handler on a file opened for write.
|
|
||||||
func (t *FSTree) PutStream(addr oid.Address, handler func(*os.File) error) error {
|
|
||||||
dstepanov-yadro
commented
Unused Unused
|
|||||||
if t.readOnly {
|
|
||||||
return common.ErrReadOnly
|
|
||||||
}
|
|
||||||
|
|
||||||
p := t.treePath(addr)
|
|
||||||
|
|
||||||
if err := util.MkdirAllX(filepath.Dir(p), t.Permissions); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
f, err := os.OpenFile(p, t.writeFlags(), t.Permissions)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer f.Close()
|
|
||||||
|
|
||||||
return handler(f)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get returns an object from the storage by address.
|
// Get returns an object from the storage by address.
|
||||||
func (t *FSTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
|
func (t *FSTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, error) {
|
||||||
var (
|
var (
|
||||||
|
@ -450,6 +443,9 @@ func (t *FSTree) Get(ctx context.Context, prm common.GetPrm) (common.GetRes, err
|
||||||
|
|
||||||
data, err = os.ReadFile(p)
|
data, err = os.ReadFile(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if os.IsNotExist(err) {
|
||||||
|
return common.GetRes{}, logicerr.Wrap(new(apistatus.ObjectNotFound))
|
||||||
|
}
|
||||||
return common.GetRes{}, err
|
return common.GetRes{}, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -509,11 +505,23 @@ func (t *FSTree) GetRange(ctx context.Context, prm common.GetRangePrm) (common.G
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// NumberOfObjects walks the file tree rooted at FSTree's root
|
// initFileCounter walks the file tree rooted at FSTree's root,
|
||||||
// and returns number of stored objects.
|
// counts total items count, inits counter and returns number of stored objects.
|
||||||
func (t *FSTree) NumberOfObjects() (uint64, error) {
|
func (t *FSTree) initFileCounter() error {
|
||||||
var counter uint64
|
if !t.fileCounterEnabled {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
counter, err := t.countFiles()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
t.fileCounter.Set(counter)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *FSTree) countFiles() (uint64, error) {
|
||||||
|
var counter uint64
|
||||||
// it is simpler to just consider every file
|
// it is simpler to just consider every file
|
||||||
// that is not directory as an object
|
// that is not directory as an object
|
||||||
err := filepath.WalkDir(t.RootPath,
|
err := filepath.WalkDir(t.RootPath,
|
||||||
|
|
|
@ -1,10 +1,16 @@
|
||||||
package fstree
|
package fstree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util/logicerr"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestAddressToString(t *testing.T) {
|
func TestAddressToString(t *testing.T) {
|
||||||
|
@ -28,3 +34,68 @@ func Benchmark_addressFromString(b *testing.B) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestObjectCounter(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
counter := NewSimpleCounter()
|
||||||
|
fst := New(
|
||||||
|
WithPath(t.TempDir()),
|
||||||
|
WithDepth(2),
|
||||||
|
WithDirNameLen(2),
|
||||||
|
WithFileCounter(counter))
|
||||||
|
require.NoError(t, fst.Open(false))
|
||||||
|
require.NoError(t, fst.Init())
|
||||||
|
|
||||||
|
counterValue := counter.Value()
|
||||||
|
require.Equal(t, uint64(0), counterValue)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
require.NoError(t, fst.Close())
|
||||||
|
}()
|
||||||
|
|
||||||
|
addr := oidtest.Address()
|
||||||
|
obj := objectSDK.New()
|
||||||
|
obj.SetID(addr.Object())
|
||||||
|
obj.SetContainerID(addr.Container())
|
||||||
|
obj.SetPayload([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 0})
|
||||||
|
|
||||||
|
var putPrm common.PutPrm
|
||||||
|
putPrm.Address = addr
|
||||||
|
putPrm.RawData, _ = obj.Marshal()
|
||||||
|
|
||||||
|
var getPrm common.GetPrm
|
||||||
|
getPrm.Address = putPrm.Address
|
||||||
|
|
||||||
|
var delPrm common.DeletePrm
|
||||||
|
delPrm.Address = addr
|
||||||
|
|
||||||
|
eg, egCtx := errgroup.WithContext(context.Background())
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
for j := 0; j < 1_000; j++ {
|
||||||
|
_, err := fst.Put(egCtx, putPrm)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
eg.Go(func() error {
|
||||||
|
var le logicerr.Logical
|
||||||
|
for j := 0; j < 1_000; j++ {
|
||||||
|
_, err := fst.Delete(egCtx, delPrm)
|
||||||
|
if err != nil && !errors.As(err, &le) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
require.NoError(t, eg.Wait())
|
||||||
|
|
||||||
|
counterValue = counter.Value()
|
||||||
|
realCount, err := fst.countFiles()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, realCount, counterValue)
|
||||||
|
}
|
||||||
|
|
|
@ -2,6 +2,8 @@ package fstree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io/fs"
|
"io/fs"
|
||||||
|
|
||||||
|
utilSync "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Option func(*FSTree)
|
type Option func(*FSTree)
|
||||||
|
@ -41,3 +43,11 @@ func WithMetrics(m Metrics) Option {
|
||||||
f.metrics = m
|
f.metrics = m
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithFileCounter(c FileCounter) Option {
|
||||||
|
return func(f *FSTree) {
|
||||||
|
f.fileCounterEnabled = true
|
||||||
|
f.fileCounter = c
|
||||||
|
f.fileGuard = utilSync.NewKeyLocker[string]()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ var storages = []storage{
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
desc: "fstree",
|
desc: "fstree_without_object_counter",
|
||||||
create: func(dir string) common.Storage {
|
create: func(dir string) common.Storage {
|
||||||
return fstree.New(
|
return fstree.New(
|
||||||
fstree.WithPath(dir),
|
fstree.WithPath(dir),
|
||||||
|
@ -62,6 +62,17 @@ var storages = []storage{
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
desc: "fstree_with_object_counter",
|
||||||
|
create: func(dir string) common.Storage {
|
||||||
|
return fstree.New(
|
||||||
|
fstree.WithPath(dir),
|
||||||
|
fstree.WithDepth(2),
|
||||||
|
fstree.WithDirNameLen(2),
|
||||||
|
fstree.WithFileCounter(fstree.NewSimpleCounter()),
|
||||||
|
)
|
||||||
|
},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
desc: "blobovniczatree",
|
desc: "blobovniczatree",
|
||||||
create: func(dir string) common.Storage {
|
create: func(dir string) common.Storage {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package writecachebbolt
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||||||
|
@ -49,9 +50,12 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
|
|
||||||
if dataSize > 0 {
|
if dataSize > 0 {
|
||||||
storageType = writecache.StorageTypeDB
|
storageType = writecache.StorageTypeDB
|
||||||
|
var recordDeleted bool
|
||||||
err := c.db.Update(func(tx *bbolt.Tx) error {
|
err := c.db.Update(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
err := b.Delete([]byte(saddr))
|
key := []byte(saddr)
|
||||||
|
recordDeleted = b.Get(key) != nil
|
||||||
|
err := b.Delete(key)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -62,8 +66,11 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("db DELETE"),
|
storagelog.OpField("db DELETE"),
|
||||||
)
|
)
|
||||||
|
if recordDeleted {
|
||||||
|
c.objCounters.cDB.Add(math.MaxUint64)
|
||||||
|
c.estimateCacheSize()
|
||||||
|
}
|
||||||
deleted = true
|
deleted = true
|
||||||
c.objCounters.DecDB()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,9 +82,8 @@ func (c *cache) Delete(ctx context.Context, addr oid.Address) error {
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("fstree DELETE"),
|
storagelog.OpField("fstree DELETE"),
|
||||||
)
|
)
|
||||||
c.objCounters.DecFS()
|
|
||||||
deleted = true
|
deleted = true
|
||||||
|
c.estimateCacheSize()
|
||||||
}
|
}
|
||||||
|
|
||||||
return metaerr.Wrap(err)
|
return metaerr.Wrap(err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -85,9 +85,15 @@ func (c *cache) putSmall(obj objectInfo) error {
|
||||||
return ErrOutOfSpace
|
return ErrOutOfSpace
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var newRecord bool
|
||||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
return b.Put([]byte(obj.addr), obj.data)
|
key := []byte(obj.addr)
|
||||||
|
newRecord = b.Get(key) == nil
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why, though? We should always check the actual value, because we cannot assume anything else. Why, though? We should _always_ check the actual value, because we cannot assume anything else.
dstepanov-yadro
commented
See
I understand this as See `boltdb.Batch` comment:
```
// 2. the function passed to Batch may be called multiple times,
// regardless of whether it returns error or not.
```
I understand this as ` function will be called at least once`. If `key` was inserted by first call, then it will be already already in db on second call.
fyrchik
commented
No, this violates transaction isolation: transaction either applies (nil error) or not (non-nil error, and can be retried in Batch in this case). So even if we applied everything on our first execution, I doubt the changes are persisted. No, this violates transaction isolation: transaction either applies (nil error) or not (non-nil error, and can be retried in Batch in this case). So even if we applied everything on our first execution, I doubt the changes are persisted.
fyrchik
commented
If what I wrote is false, we have much bigger problems (consider metabase and all its checks, for example) If what I wrote is false, we have much bigger problems (consider metabase and all its checks, for example)
fyrchik
commented
Oh, I missed the Oh, I missed the `regardless` part.
fyrchik
commented
But no, still, we are talking about the data in the database, the function can be executed twice but under no circumstances should we apply a transaction twice. But no, still, we are talking about the data in the database, the function can be executed twice but under no circumstances should we apply a transaction twice.
Here is a call to batch trigger: https://github.com/etcd-io/bbolt/blob/master/db.go#L983
If any transaction in the batch returns false, all previous functions can be reexecuted with nil error, but not reapplied (we still return err from Update().
fyrchik
commented
Also, how about avoiding PUT in this case? Also, how about avoiding PUT in this case?
It should reduce the amount of pages we flush during commit (I am not sure whether its worth the effort, though, the situation is quite rare).
dstepanov-yadro
commented
Ok, you're right. Fixed. Ok, you're right. Fixed.
|
|||||||
|
if newRecord {
|
||||||
|
return b.Put(key, obj.data)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err == nil {
|
if err == nil {
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(c.log,
|
||||||
|
@ -95,7 +101,10 @@ func (c *cache) putSmall(obj objectInfo) error {
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("db PUT"),
|
storagelog.OpField("db PUT"),
|
||||||
aarifullin
commented
`c.objCounters.cDB.Add(1)` -> `c.objCounters.IncDB()` ?
dstepanov-yadro
commented
`c.objCounters.IncDB()` was removed, as there is no need to have this method, all fields are available.
|
|||||||
)
|
)
|
||||||
c.objCounters.IncDB()
|
if newRecord {
|
||||||
|
c.objCounters.cDB.Add(1)
|
||||||
|
c.estimateCacheSize()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -117,12 +126,12 @@ func (c *cache) putBig(ctx context.Context, addr string, prm common.PutPrm) erro
|
||||||
c.compressFlags[addr] = struct{}{}
|
c.compressFlags[addr] = struct{}{}
|
||||||
c.mtx.Unlock()
|
c.mtx.Unlock()
|
||||||
}
|
}
|
||||||
c.objCounters.IncFS()
|
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(c.log,
|
||||||
storagelog.AddressField(addr),
|
storagelog.AddressField(addr),
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("fstree PUT"),
|
storagelog.OpField("fstree PUT"),
|
||||||
)
|
)
|
||||||
|
c.estimateCacheSize()
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,14 +5,21 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (c *cache) estimateCacheSize() uint64 {
|
func (c *cache) estimateCacheSize() uint64 {
|
||||||
db := c.objCounters.DB() * c.smallObjectSize
|
dbCount := c.objCounters.DB()
|
||||||
fstree := c.objCounters.FS() * c.maxObjectSize
|
fsCount := c.objCounters.FS()
|
||||||
c.metrics.SetEstimateSize(db, fstree)
|
if fsCount > 0 {
|
||||||
aarifullin
commented
I suppose I mean I suppose `dbCount` and `fsCount` may be negative (despite they are `uint64`). Don't you need to consider the such case?
I mean `fsCount` can be `MAX_UINT64_SIZE - n` meaning it's negative but still the check works out and decremented
dstepanov-yadro
commented
Yes, they may be negative. But what should we do? Panic or error? Looks too much. Use zero instead of negative value? Then we won't know for sure that we have a bug. So negative values are good point to create a bug, but they shouldn't break the program flow. Yes, they may be negative. But what should we do?
Panic or error? Looks too much.
Use zero instead of negative value? Then we won't know for sure that we have a bug.
So negative values are good point to create a bug, but they shouldn't break the program flow.
|
|||||||
return db + fstree
|
fsCount-- //db file
|
||||||
|
}
|
||||||
|
dbSize := dbCount * c.smallObjectSize
|
||||||
|
fsSize := fsCount * c.maxObjectSize
|
||||||
|
c.metrics.SetEstimateSize(dbSize, fsSize)
|
||||||
|
c.metrics.SetActualCounters(dbCount, fsCount)
|
||||||
|
return dbSize + fsSize
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) incSizeDB(sz uint64) uint64 {
|
func (c *cache) incSizeDB(sz uint64) uint64 {
|
||||||
|
@ -23,34 +30,35 @@ func (c *cache) incSizeFS(sz uint64) uint64 {
|
||||||
return sz + c.maxObjectSize
|
return sz + c.maxObjectSize
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ fstree.FileCounter = &counters{}
|
||||||
|
|
||||||
type counters struct {
|
type counters struct {
|
||||||
cDB, cFS atomic.Uint64
|
cDB, cFS atomic.Uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *counters) IncDB() {
|
|
||||||
x.cDB.Add(1)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *counters) DecDB() {
|
|
||||||
x.cDB.Add(math.MaxUint64)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *counters) DB() uint64 {
|
func (x *counters) DB() uint64 {
|
||||||
return x.cDB.Load()
|
return x.cDB.Load()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *counters) IncFS() {
|
func (x *counters) FS() uint64 {
|
||||||
|
return x.cFS.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set implements fstree.ObjectCounter.
|
||||||
|
func (x *counters) Set(v uint64) {
|
||||||
|
x.cFS.Store(v)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inc implements fstree.ObjectCounter.
|
||||||
|
func (x *counters) Inc() {
|
||||||
x.cFS.Add(1)
|
x.cFS.Add(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *counters) DecFS() {
|
// Dec implements fstree.ObjectCounter.
|
||||||
|
func (x *counters) Dec() {
|
||||||
x.cFS.Add(math.MaxUint64)
|
x.cFS.Add(math.MaxUint64)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *counters) FS() uint64 {
|
|
||||||
return x.cFS.Load()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *cache) initCounters() error {
|
func (c *cache) initCounters() error {
|
||||||
var inDB uint64
|
var inDB uint64
|
||||||
err := c.db.View(func(tx *bbolt.Tx) error {
|
err := c.db.View(func(tx *bbolt.Tx) error {
|
||||||
|
@ -63,18 +71,6 @@ func (c *cache) initCounters() error {
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not read write-cache DB counter: %w", err)
|
return fmt.Errorf("could not read write-cache DB counter: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
inFS, err := c.fsTree.NumberOfObjects()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("could not read write-cache FS counter: %w", err)
|
|
||||||
}
|
|
||||||
if inFS > 0 {
|
|
||||||
inFS-- //small.bolt DB file
|
|
||||||
}
|
|
||||||
|
|
||||||
c.objCounters.cDB.Store(inDB)
|
c.objCounters.cDB.Store(inDB)
|
||||||
fyrchik marked this conversation as resolved
Outdated
fyrchik
commented
Why do we update counters for db and fstree in different places? Why do we update counters for db and fstree in different places?
dstepanov-yadro
commented
db counter relates to db. fstree counter relates to fstree. But both of counters we update in db counter relates to db. fstree counter relates to fstree. But both of counters _we_ update in `cache.Open` method.
fyrchik
commented
Ok, I don't mind, but to me it adds cognitive complexity, so at some point sth similar to #610 will happen. Ok, I don't mind, but to me it adds cognitive complexity, so at some point sth similar to https://git.frostfs.info/TrueCloudLab/frostfs-node/issues/610 will happen.
|
|||||||
c.objCounters.cFS.Store(inFS)
|
|
||||||
c.metrics.SetActualCounters(inDB, inFS)
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package writecachebbolt
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
@ -54,29 +55,39 @@ func (c *cache) openStore(readOnly bool) error {
|
||||||
fstree.WithPerm(os.ModePerm),
|
fstree.WithPerm(os.ModePerm),
|
||||||
fstree.WithDepth(1),
|
fstree.WithDepth(1),
|
||||||
fstree.WithDirNameLen(1),
|
fstree.WithDirNameLen(1),
|
||||||
fstree.WithNoSync(c.noSync))
|
fstree.WithNoSync(c.noSync),
|
||||||
|
fstree.WithFileCounter(&c.objCounters),
|
||||||
|
)
|
||||||
if err := c.fsTree.Open(readOnly); err != nil {
|
if err := c.fsTree.Open(readOnly); err != nil {
|
||||||
return fmt.Errorf("could not open FSTree: %w", err)
|
return fmt.Errorf("could not open FSTree: %w", err)
|
||||||
}
|
}
|
||||||
dstepanov-yadro
commented
To init object counter To init object counter
|
|||||||
|
if err := c.fsTree.Init(); err != nil {
|
||||||
|
return fmt.Errorf("could not init FSTree: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *cache) deleteFromDB(key string) {
|
func (c *cache) deleteFromDB(key string) {
|
||||||
|
var recordDeleted bool
|
||||||
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
err := c.db.Batch(func(tx *bbolt.Tx) error {
|
||||||
b := tx.Bucket(defaultBucket)
|
b := tx.Bucket(defaultBucket)
|
||||||
return b.Delete([]byte(key))
|
key := []byte(key)
|
||||||
|
recordDeleted = !recordDeleted && b.Get(key) != nil
|
||||||
dstepanov-yadro
commented
batch func can be called more than once, so check previous value is required batch func can be called more than once, so check previous value is required
|
|||||||
|
return b.Delete(key)
|
||||||
})
|
})
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
c.objCounters.DecDB()
|
|
||||||
c.metrics.Evict(writecache.StorageTypeDB)
|
c.metrics.Evict(writecache.StorageTypeDB)
|
||||||
storagelog.Write(c.log,
|
storagelog.Write(c.log,
|
||||||
storagelog.AddressField(key),
|
storagelog.AddressField(key),
|
||||||
storagelog.StorageTypeField(wcStorageType),
|
storagelog.StorageTypeField(wcStorageType),
|
||||||
storagelog.OpField("db DELETE"),
|
storagelog.OpField("db DELETE"),
|
||||||
)
|
)
|
||||||
c.estimateCacheSize()
|
if recordDeleted {
|
||||||
|
c.objCounters.cDB.Add(math.MaxUint64)
|
||||||
|
c.estimateCacheSize()
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
c.log.Error(logs.WritecacheCantRemoveObjectsFromTheDatabase, zap.Error(err))
|
||||||
}
|
}
|
||||||
|
@ -111,7 +122,6 @@ func (c *cache) deleteFromDisk(ctx context.Context, keys []string) []string {
|
||||||
storagelog.OpField("fstree DELETE"),
|
storagelog.OpField("fstree DELETE"),
|
||||||
)
|
)
|
||||||
c.metrics.Evict(writecache.StorageTypeFSTree)
|
c.metrics.Evict(writecache.StorageTypeFSTree)
|
||||||
c.objCounters.DecFS()
|
|
||||||
c.estimateCacheSize()
|
c.estimateCacheSize()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
56
pkg/util/sync/key_locker.go
Normal file
|
@ -0,0 +1,56 @@
|
||||||
|
package sync
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
type locker struct {
|
||||||
|
mtx sync.Mutex
|
||||||
|
waiters int // not protected by mtx, must used outer mutex to update concurrently
|
||||||
|
}
|
||||||
|
|
||||||
|
type KeyLocker[K comparable] struct {
|
||||||
|
lockers map[K]*locker
|
||||||
|
lockersMtx sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewKeyLocker[K comparable]() *KeyLocker[K] {
|
||||||
|
return &KeyLocker[K]{
|
||||||
|
lockers: make(map[K]*locker),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *KeyLocker[K]) Lock(key K) {
|
||||||
|
l.lockersMtx.Lock()
|
||||||
|
|
||||||
|
if locker, found := l.lockers[key]; found {
|
||||||
|
locker.waiters++
|
||||||
|
l.lockersMtx.Unlock()
|
||||||
|
|
||||||
|
locker.mtx.Lock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
locker := &locker{
|
||||||
|
waiters: 1,
|
||||||
|
}
|
||||||
|
locker.mtx.Lock()
|
||||||
|
|
||||||
|
l.lockers[key] = locker
|
||||||
|
l.lockersMtx.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *KeyLocker[K]) Unlock(key K) {
|
||||||
|
l.lockersMtx.Lock()
|
||||||
|
defer l.lockersMtx.Unlock()
|
||||||
|
|
||||||
|
locker, found := l.lockers[key]
|
||||||
|
if !found {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if locker.waiters == 1 {
|
||||||
|
delete(l.lockers, key)
|
||||||
|
}
|
||||||
|
locker.waiters--
|
||||||
|
|
||||||
|
locker.mtx.Unlock()
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package main
|
package sync
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -12,11 +12,11 @@ import (
|
||||||
func TestKeyLocker(t *testing.T) {
|
func TestKeyLocker(t *testing.T) {
|
||||||
taken := false
|
taken := false
|
||||||
eg, _ := errgroup.WithContext(context.Background())
|
eg, _ := errgroup.WithContext(context.Background())
|
||||||
keyLocker := newKeyLocker[int]()
|
keyLocker := NewKeyLocker[int]()
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
eg.Go(func() error {
|
eg.Go(func() error {
|
||||||
keyLocker.LockKey(0)
|
keyLocker.Lock(0)
|
||||||
defer keyLocker.UnlockKey(0)
|
defer keyLocker.Unlock(0)
|
||||||
|
|
||||||
require.False(t, taken)
|
require.False(t, taken)
|
||||||
taken = true
|
taken = true
|
s/SimpleCounter/AtomicCounter/
?No,
atomic
is implementation detailAgree, can we mention in the interface comment that the counter implementation must be thread-safe?