forked from TrueCloudLab/frostfs-node
Compare commits
No commits in common. "4c37f95656b49b4ede338d8904c6c45b24a39f73" and "612b34d5708c23f888f75f6b6b1e5e87efdeede1" have entirely different histories.
4c37f95656...612b34d570
19 changed files with 301 additions and 662 deletions
Makefile (4 changed lines)

@@ -8,8 +8,8 @@ HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs
 HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
 
 GO_VERSION ?= 1.22
-LINT_VERSION ?= 1.62.0
-TRUECLOUDLAB_LINT_VERSION ?= 0.0.8
+LINT_VERSION ?= 1.61.0
+TRUECLOUDLAB_LINT_VERSION ?= 0.0.7
 PROTOC_VERSION ?= 25.0
 PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-sdk-go)
 PROTOC_OS_VERSION=osx-x86_64
@@ -1,291 +0,0 @@ (entire file deleted)
package quota

import (
    "container/heap"
    "context"
    "fmt"
    "math/rand/v2"
    "sync"
    "time"
)

type Quota struct {
    Read  int64
    Write int64
}

type Priority struct {
    Class string
    Value byte
}

type Release func()

type limit struct {
    read, write       int64
    maxRead, maxWrite int64
}

type queueItem struct {
    priority byte
    ts       time.Time
    index    int
}

type queue struct {
    l     *limit
    w     int32
    items []*queueItem
}

type QuotaLimiter struct {
    l         *limit
    classes   map[string]*queue
    queues    []*queue
    nextQueue *queue
    cond      *sync.Cond
}

func (ql *QuotaLimiter) Acquire(ctx context.Context, p Priority, q Quota) (Release, error) {
    queue, ok := ql.classes[p.Class]
    if !ok {
        return nil, fmt.Errorf("unknown class '%s'", p.Class)
    }

    if ql.l.maxRead > 0 && q.Read > ql.l.maxRead {
        return nil, fmt.Errorf("read quota %d exceeds total limit %d", q.Read, ql.l.maxRead)
    }

    if ql.l.maxWrite > 0 && q.Write > ql.l.maxWrite {
        return nil, fmt.Errorf("write quota %d exceeds total limit %d", q.Write, ql.l.maxWrite)
    }

    if queue.l.maxRead > 0 && q.Read > queue.l.maxRead {
        return nil, fmt.Errorf("read quota %d exceeds queue limit %d", q.Read, queue.l.maxRead)
    }

    if queue.l.maxWrite > 0 && q.Write > queue.l.maxWrite {
        return nil, fmt.Errorf("write quota %d exceeds queue limit %d", q.Write, queue.l.maxWrite)
    }

    ts := time.Now()

    ql.cond.L.Lock()
    defer ql.cond.L.Unlock()

    stop := context.AfterFunc(ctx, func() {
        ql.cond.Broadcast()
    })
    defer stop()

    allow := ql.nextQueue == nil && // no scheduled queue
        hasQuota(q, queue.l) && // queue limit
        hasQuota(q, ql.l) // global limit

    if allow {
        applyQuota(q, queue.l)
        applyQuota(q, ql.l)
        return func() { ql.release(p, q) }, nil
    }

    qi := &queueItem{
        priority: p.Value,
        ts:       ts,
    }

    queue.push(qi)
    if queue.count() == 1 {
        ql.resetNextQueue()
    }

    var hasGlobalQuota, hasQueueQuota, isNextItem bool
    for !allow {
        ql.cond.Wait()

        if err := ctx.Err(); err != nil {
            queue.drop(qi)
            if queue.count() == 0 {
                ql.resetNextQueue()
            }
            return nil, ctx.Err()
        }

        hasGlobalQuota = hasQuota(q, ql.l)
        hasQueueQuota = hasQuota(q, queue.l)
        isNextItem = ql.nextQueue == queue && queue.top() == qi

        if hasGlobalQuota && !hasQueueQuota && isNextItem {
            ql.changeNextQueue()
        }
        allow = hasGlobalQuota && hasQueueQuota && isNextItem
    }

    applyQuota(q, queue.l)
    applyQuota(q, ql.l)
    queue.pop()
    ql.resetNextQueue()
    return func() { ql.release(p, q) }, nil
}

func (ql *QuotaLimiter) release(p Priority, q Quota) {
    queue, ok := ql.classes[p.Class]
    if !ok {
        panic("unknown class " + p.Class)
    }

    ql.cond.L.Lock()
    defer ql.cond.L.Unlock()

    releaseQuota(q, queue.l)
    releaseQuota(q, ql.l)

    ql.cond.Broadcast()
}

func (ql *QuotaLimiter) resetNextQueue() {
    var nonEmptyQueues []*queue
    var totalWeight int64
    for _, q := range ql.queues {
        if q.count() > 0 {
            nonEmptyQueues = append(nonEmptyQueues, q)
            totalWeight += int64(q.weight())
        }
    }
    if len(nonEmptyQueues) == 0 {
        ql.nextQueue = nil
        return
    }
    ql.selectNextQueue(nonEmptyQueues, totalWeight)
}

func (ql *QuotaLimiter) changeNextQueue() {
    var nonEmptyQueues []*queue
    var totalWeight int64
    for _, q := range ql.queues {
        if q.count() > 0 && q != ql.nextQueue {
            nonEmptyQueues = append(nonEmptyQueues, q)
            totalWeight += int64(q.weight())
        }
    }
    if len(nonEmptyQueues) == 0 {
        return
    }
    ql.selectNextQueue(nonEmptyQueues, totalWeight)
}

func (ql *QuotaLimiter) selectNextQueue(nonEmptyQueues []*queue, totalWeight int64) {
    if totalWeight == 0 {
        ql.nextQueue = nonEmptyQueues[rand.IntN(len(nonEmptyQueues))]
        return
    }
    weight := rand.Int64N(totalWeight)
    var low, up int64
    for _, q := range nonEmptyQueues {
        low = up
        up += int64(q.weight())
        if weight >= low && weight < up {
            ql.nextQueue = q
            return
        }
    }
    panic("undefined next queue")
}

func hasQuota(q Quota, l *limit) bool {
    if q.Read > 0 && l.maxRead > 0 && q.Read+l.read > l.maxRead {
        return false
    }
    if q.Write > 0 && l.maxWrite > 0 && q.Write+l.write > l.maxWrite {
        return false
    }
    return true
}

func applyQuota(q Quota, l *limit) {
    if q.Read > 0 && l.maxRead > 0 {
        l.read += q.Read
    }
    if q.Write > 0 && l.maxWrite > 0 {
        l.write += q.Write
    }
}

func releaseQuota(q Quota, l *limit) {
    if q.Read > 0 && l.maxRead > 0 {
        l.read -= q.Read
        if l.read < 0 {
            panic("invalid read limit after release")
        }
    }
    if q.Write > 0 && l.maxWrite > 0 {
        l.write -= q.Write
        if l.write < 0 {
            panic("invalid write limit after release")
        }
    }
}

func (q *queue) push(qi *queueItem) {
    heap.Push(q, qi)
}

func (q *queue) pop() {
    heap.Pop(q)
}

func (q *queue) drop(qi *queueItem) {
    heap.Remove(q, qi.index)
}

func (q *queue) top() *queueItem {
    if len(q.items) > 0 {
        return q.items[0]
    }
    return nil
}

func (q *queue) count() int {
    return len(q.items)
}

func (q *queue) weight() int32 {
    return q.w
}

// Len implements heap.Interface.
func (q *queue) Len() int {
    return q.count()
}

// Less implements heap.Interface.
func (q *queue) Less(i int, j int) bool {
    if q.items[i].priority == q.items[j].priority {
        return q.items[i].ts.Before(q.items[j].ts)
    }
    return q.items[i].priority > q.items[j].priority
}

// Pop implements heap.Interface.
func (q *queue) Pop() any {
    n := len(q.items)
    item := q.items[n-1]
    q.items[n-1] = nil
    q.items = q.items[0 : n-1]
    item.index = -1
    return item
}

// Push implements heap.Interface.
func (q *queue) Push(x any) {
    it := x.(*queueItem)
    it.index = q.Len()
    q.items = append(q.items, it)
}

// Swap implements heap.Interface.
func (q *queue) Swap(i int, j int) {
    q.items[i], q.items[j] = q.items[j], q.items[i]
    q.items[i].index = i
    q.items[j].index = j
}
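Note: the removed limiter is driven through Acquire/Release pairs. The sketch below is an illustration only, not part of the diff; it assumes a *QuotaLimiter was already constructed elsewhere with a "client" class registered (no constructor is shown in the deleted file) and uses only the API visible above.

// Hypothetical caller of the removed QuotaLimiter (illustration, not repo code).
func doRead(ctx context.Context, limiter *QuotaLimiter, payloadSize int64) error {
    release, err := limiter.Acquire(ctx, Priority{Class: "client", Value: 10}, Quota{Read: payloadSize})
    if err != nil {
        // Unknown class, a quota larger than a configured limit, or a cancelled context.
        return err
    }
    // Give the reserved quota back and wake other waiters when done.
    defer release()

    // ... perform the read while the quota is held ...
    return nil
}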
@@ -1,102 +0,0 @@ (entire file deleted)
package quota

import (
    "math/rand/v2"
    "testing"
    "time"

    "github.com/stretchr/testify/require"
)

func TestQueue(t *testing.T) {
    t.Run("different priority", func(t *testing.T) {
        q := &queue{}
        ts := time.Now()
        const count = 12345
        for i := range count {
            priority := i % 256
            q.push(&queueItem{
                priority: byte(priority),
                ts:       ts,
            })
        }
        testQueueInvariant(t, q)
    })
    t.Run("same priority, different ts, inc", func(t *testing.T) {
        q := &queue{}
        ts := time.Now()
        const count = 10000
        for range count {
            ts = ts.Add(time.Second)
            q.push(&queueItem{
                priority: 100,
                ts:       ts,
            })
        }
        testQueueInvariant(t, q)
    })
    t.Run("same priority, different ts, dec", func(t *testing.T) {
        q := &queue{}
        ts := time.Now()
        const count = 10000
        for range count {
            ts = ts.Add(-1 * time.Second)
            q.push(&queueItem{
                priority: 100,
                ts:       ts,
            })
        }
        testQueueInvariant(t, q)
    })
    t.Run("drop, inc", func(t *testing.T) {
        q := &queue{}
        ts := time.Now()
        const count = 12345
        for i := range count {
            ts = ts.Add(time.Second)
            q.push(&queueItem{
                priority: byte(i % 256),
                ts:       ts,
            })
        }
        for q.Len() > 0 {
            idx := rand.IntN(q.count())
            it := q.items[idx]
            q.drop(it)
            testQueueInvariant(t, q)
        }
    })
    t.Run("drop, dec", func(t *testing.T) {
        q := &queue{}
        ts := time.Now()
        const count = 12345
        for i := range count {
            ts = ts.Add(-1 * time.Second)
            q.push(&queueItem{
                priority: byte(i % 256),
                ts:       ts,
            })
        }
        for q.Len() > 0 {
            idx := rand.IntN(q.count())
            it := q.items[idx]
            q.drop(it)
            testQueueInvariant(t, q)
        }
    })
}

func testQueueInvariant(t *testing.T, q *queue) {
    var previous *queueItem
    for q.count() > 0 {
        current := q.top()
        if previous != nil {
            require.True(t, previous.priority > current.priority ||
                (previous.priority == current.priority &&
                    (previous.ts == current.ts || previous.ts.Before(current.ts))))
        }
        previous = current
        q.pop()
    }
    require.Equal(t, 0, q.count())
}
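The deleted tests only assert the heap ordering: higher priority pops first, and equal priorities pop in timestamp order. A tiny sketch of that property, using the queue type from the deleted file (illustration only):

// Pop order here is: priority 200 with the earlier ts, then priority 200 with
// the later ts, then priority 1.
func exampleQueueOrder() {
    q := &queue{}
    now := time.Now()

    q.push(&queueItem{priority: 1, ts: now})
    q.push(&queueItem{priority: 200, ts: now.Add(time.Second)})
    q.push(&queueItem{priority: 200, ts: now})

    for q.count() > 0 {
        _ = q.top() // inspect the current front item
        q.pop()
    }
}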
@@ -164,7 +164,7 @@ func testEngineFailInitAndReload(t *testing.T, degradedMode bool, opts []shard.O
 }
 
 func TestExecBlocks(t *testing.T) {
-    e := testNewEngine(t).setShardsNum(t, 2).prepare(t).engine // number doesn't matter in this test, 2 is several but not many
+    e := testNewEngine(t).setShardsNum(t, 2).engine // number doesn't matter in this test, 2 is several but not many
 
     // put some object
     obj := testutil.GenerateObjectWithCID(cidtest.ID())

@@ -302,8 +302,7 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
                 meta.WithEpochState(epochState{}),
             ),
         }
-    }).
-        prepare(t)
+    })
     e, ids := te.engine, te.shardIDs
 
     for _, id := range ids {

@@ -313,5 +312,8 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
     require.Equal(t, num, len(e.shards))
     require.Equal(t, num, len(e.shardPools))
 
+    require.NoError(t, e.Open(context.Background()))
+    require.NoError(t, e.Init(context.Background()))
+
     return e, currShards
 }
@@ -7,6 +7,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
     apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
     cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"

@@ -48,8 +49,13 @@ func TestDeleteBigObject(t *testing.T) {
     link.SetSplitID(splitID)
     link.SetChildren(childIDs...)
 
-    e := testNewEngine(t).setShardsNum(t, 3).prepare(t).engine
-    defer func() { require.NoError(t, e.Close(context.Background())) }()
+    s1 := testNewShard(t)
+    s2 := testNewShard(t)
+    s3 := testNewShard(t)
+
+    e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine
+    e.log = test.NewLogger(t)
+    defer e.Close(context.Background())
 
     for i := range children {
         require.NoError(t, Put(context.Background(), e, children[i], false))

@@ -113,13 +119,11 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) {
     link.SetSplitID(splitID)
     link.SetChildren(childIDs...)
 
-    te := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
-        return []shard.Option{shard.WithDisabledGC()}
-    }).prepare(t)
-    e := te.engine
-    defer func() { require.NoError(t, e.Close(context.Background())) }()
+    s1 := testNewShard(t, shard.WithDisabledGC())
 
-    s1 := te.shards[0]
+    e := testNewEngine(t).setInitializedShards(t, s1).engine
+    e.log = test.NewLogger(t)
+    defer e.Close(context.Background())
 
     for i := range children {
         require.NoError(t, Put(context.Background(), e, children[i], false))
@@ -3,17 +3,24 @@ package engine
 import (
     "context"
     "path/filepath"
+    "sync/atomic"
     "testing"
 
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
+    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
     meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
+    cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
     objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+    oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
+    "git.frostfs.info/TrueCloudLab/hrw"
+    "github.com/panjf2000/ants/v2"
     "github.com/stretchr/testify/require"
 )

@@ -23,79 +30,113 @@ func (s epochState) CurrentEpoch() uint64 {
     return 0
 }
 
+func BenchmarkExists(b *testing.B) {
+    b.Run("2 shards", func(b *testing.B) {
+        benchmarkExists(b, 2)
+    })
+    b.Run("4 shards", func(b *testing.B) {
+        benchmarkExists(b, 4)
+    })
+    b.Run("8 shards", func(b *testing.B) {
+        benchmarkExists(b, 8)
+    })
+}
+
+func benchmarkExists(b *testing.B, shardNum int) {
+    shards := make([]*shard.Shard, shardNum)
+    for i := range shardNum {
+        shards[i] = testNewShard(b)
+    }
+
+    e := testNewEngine(b).setInitializedShards(b, shards...).engine
+    defer func() { require.NoError(b, e.Close(context.Background())) }()
+
+    addr := oidtest.Address()
+    for range 100 {
+        obj := testutil.GenerateObjectWithCID(cidtest.ID())
+        err := Put(context.Background(), e, obj, false)
+        if err != nil {
+            b.Fatal(err)
+        }
+    }
+
+    b.ReportAllocs()
+    b.ResetTimer()
+    for range b.N {
+        var shPrm shard.ExistsPrm
+        shPrm.Address = addr
+        shPrm.ParentAddress = oid.Address{}
+        ok, _, err := e.exists(context.Background(), shPrm)
+        if err != nil || ok {
+            b.Fatalf("%t %v", ok, err)
+        }
+    }
+}
+
 type testEngineWrapper struct {
     engine   *StorageEngine
-    shards   []*shard.Shard
     shardIDs []*shard.ID
 }
 
 func testNewEngine(t testing.TB, opts ...Option) *testEngineWrapper {
-    opts = append(testGetDefaultEngineOptions(t), opts...)
-    return &testEngineWrapper{engine: New(opts...)}
+    engine := New(WithLogger(test.NewLogger(t)))
+    for _, opt := range opts {
+        opt(engine.cfg)
+    }
+    return &testEngineWrapper{
+        engine: engine,
+    }
+}
+
+func (te *testEngineWrapper) setInitializedShards(t testing.TB, shards ...*shard.Shard) *testEngineWrapper {
+    for _, s := range shards {
+        pool, err := ants.NewPool(10, ants.WithNonblocking(true))
+        require.NoError(t, err)
+
+        te.engine.shards[s.ID().String()] = hashedShard{
+            shardWrapper: shardWrapper{
+                errorCount: new(atomic.Uint32),
+                Shard:      s,
+            },
+            hash: hrw.StringHash(s.ID().String()),
+        }
+        te.engine.shardPools[s.ID().String()] = pool
+        te.shardIDs = append(te.shardIDs, s.ID())
+    }
+    return te
 }
 
 func (te *testEngineWrapper) setShardsNum(t testing.TB, num int) *testEngineWrapper {
-    return te.setShardsNumOpts(t, num, func(_ int) []shard.Option {
-        return testGetDefaultShardOptions(t)
-    })
+    shards := make([]*shard.Shard, 0, num)
+
+    for range num {
+        shards = append(shards, testNewShard(t))
+    }
+
+    return te.setInitializedShards(t, shards...)
 }
 
-func (te *testEngineWrapper) setShardsNumOpts(
-    t testing.TB, num int, shardOpts func(id int) []shard.Option,
-) *testEngineWrapper {
-    te.shards = make([]*shard.Shard, num)
-    te.shardIDs = make([]*shard.ID, num)
+func (te *testEngineWrapper) setShardsNumOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
     for i := range num {
-        shard, err := te.engine.createShard(context.Background(), shardOpts(i))
+        opts := shardOpts(i)
+        id, err := te.engine.AddShard(context.Background(), opts...)
         require.NoError(t, err)
-        require.NoError(t, te.engine.addShard(shard))
-        te.shards[i] = shard
-        te.shardIDs[i] = shard.ID()
+        te.shardIDs = append(te.shardIDs, id)
     }
-    require.Len(t, te.engine.shards, num)
-    require.Len(t, te.engine.shardPools, num)
     return te
 }
 
-func (te *testEngineWrapper) setShardsNumAdditionalOpts(
-    t testing.TB, num int, shardOpts func(id int) []shard.Option,
-) *testEngineWrapper {
-    return te.setShardsNumOpts(t, num, func(id int) []shard.Option {
-        return append(testGetDefaultShardOptions(t), shardOpts(id)...)
-    })
-}
-
-// prepare calls Open and Init on the created engine.
-func (te *testEngineWrapper) prepare(t testing.TB) *testEngineWrapper {
-    require.NoError(t, te.engine.Open(context.Background()))
-    require.NoError(t, te.engine.Init(context.Background()))
+func (te *testEngineWrapper) setShardsNumAdditionalOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
+    for i := range num {
+        defaultOpts := testDefaultShardOptions(t)
+        opts := append(defaultOpts, shardOpts(i)...)
+        id, err := te.engine.AddShard(context.Background(), opts...)
+        require.NoError(t, err)
+        te.shardIDs = append(te.shardIDs, id)
+    }
     return te
 }
 
-func testGetDefaultEngineOptions(t testing.TB) []Option {
-    return []Option{
-        WithLogger(test.NewLogger(t)),
-    }
-}
-
-func testGetDefaultShardOptions(t testing.TB) []shard.Option {
-    return []shard.Option{
-        shard.WithLogger(test.NewLogger(t)),
-        shard.WithBlobStorOptions(
-            blobstor.WithStorages(
-                newStorages(t, t.TempDir(), 1<<20)),
-            blobstor.WithLogger(test.NewLogger(t)),
-        ),
-        shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(t.TempDir(), "pilorama"))),
-        shard.WithMetaBaseOptions(
-            meta.WithPath(filepath.Join(t.TempDir(), "metabase")),
-            meta.WithPermissions(0o700),
-            meta.WithEpochState(epochState{}),
-            meta.WithLogger(test.NewLogger(t)),
-        ),
-    }
-}
-
 func newStorages(t testing.TB, root string, smallSize uint64) []blobstor.SubStorage {
     return []blobstor.SubStorage{
         {

@@ -145,3 +186,34 @@ func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *tes
         },
     }, smallFileStorage, largeFileStorage
 }
+
+func testNewShard(t testing.TB, opts ...shard.Option) *shard.Shard {
+    sid, err := generateShardID()
+    require.NoError(t, err)
+
+    shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t)...)
+    s := shard.New(append(shardOpts, opts...)...)
+
+    require.NoError(t, s.Open(context.Background()))
+    require.NoError(t, s.Init(context.Background()))
+
+    return s
+}
+
+func testDefaultShardOptions(t testing.TB) []shard.Option {
+    return []shard.Option{
+        shard.WithLogger(test.NewLogger(t)),
+        shard.WithBlobStorOptions(
+            blobstor.WithStorages(
+                newStorages(t, t.TempDir(), 1<<20)),
+            blobstor.WithLogger(test.NewLogger(t)),
+        ),
+        shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(t.TempDir(), "pilorama"))),
+        shard.WithMetaBaseOptions(
+            meta.WithPath(filepath.Join(t.TempDir(), "metabase")),
+            meta.WithPermissions(0o700),
+            meta.WithEpochState(epochState{}),
+            meta.WithLogger(test.NewLogger(t)),
+        ),
+    }
+}
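The engine test hunks in this compare all follow the same pattern: one side builds the engine through the fluent helpers and prepare(), the other constructs shards explicitly and injects them via setInitializedShards. A condensed sketch of the two styles, using only helpers that appear in this diff (illustration, not a new test):

// Style removed in this compare: per-shard options plus prepare() for Open/Init.
func buildEngineWithPrepare(t *testing.T) *StorageEngine {
    return testNewEngine(t).setShardsNum(t, 2).prepare(t).engine
}

// Style added back: shards are created and initialized up front, then registered;
// the test calls Open/Init itself where needed.
func buildEngineFromShards(t *testing.T) *StorageEngine {
    s1 := testNewShard(t)
    s2 := testNewShard(t)
    return testNewEngine(t).setInitializedShards(t, s1, s2).engine
}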
@@ -67,8 +67,10 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
                 pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", id))),
                 pilorama.WithPerm(0o700)),
         }
-    }).prepare(t)
+    })
     e := te.engine
+    require.NoError(t, e.Open(context.Background()))
+    require.NoError(t, e.Init(context.Background()))
 
     for i, id := range te.shardIDs {
         testShards[i].id = id

@@ -75,9 +75,10 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
             pilorama.WithPerm(0o700),
         ),
     }
-    }).
-        prepare(t)
+    })
     e, ids := te.engine, te.shardIDs
+    require.NoError(t, e.Open(context.Background()))
+    require.NoError(t, e.Init(context.Background()))
 
     objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
     treeID := "version"
@@ -1,51 +0,0 @@ (entire file deleted)
package engine

import (
    "context"
    "testing"

    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
    "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
    cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
    oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
    "github.com/stretchr/testify/require"
)

func BenchmarkExists(b *testing.B) {
    b.Run("2 shards", func(b *testing.B) {
        benchmarkExists(b, 2)
    })
    b.Run("4 shards", func(b *testing.B) {
        benchmarkExists(b, 4)
    })
    b.Run("8 shards", func(b *testing.B) {
        benchmarkExists(b, 8)
    })
}

func benchmarkExists(b *testing.B, shardNum int) {
    e := testNewEngine(b).setShardsNum(b, shardNum).prepare(b).engine
    defer func() { require.NoError(b, e.Close(context.Background())) }()

    addr := oidtest.Address()
    for range 100 {
        obj := testutil.GenerateObjectWithCID(cidtest.ID())
        err := Put(context.Background(), e, obj, false)
        if err != nil {
            b.Fatal(err)
        }
    }

    b.ReportAllocs()
    b.ResetTimer()
    for range b.N {
        var shPrm shard.ExistsPrm
        shPrm.Address = addr
        shPrm.ParentAddress = oid.Address{}
        ok, _, err := e.exists(context.Background(), shPrm)
        if err != nil || ok {
            b.Fatalf("%t %v", ok, err)
        }
    }
}
@@ -39,11 +39,11 @@ func TestHeadRaw(t *testing.T) {
     link.SetSplitID(splitID)
 
     t.Run("virtual object split in different shards", func(t *testing.T) {
-        te := testNewEngine(t).setShardsNum(t, 2).prepare(t)
-        e := te.engine
-        defer func() { require.NoError(t, e.Close(context.Background())) }()
+        s1 := testNewShard(t)
+        s2 := testNewShard(t)
 
-        s1, s2 := te.shards[0], te.shards[1]
+        e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
+        defer e.Close(context.Background())
 
         var putPrmLeft shard.PutPrm
         putPrmLeft.SetObject(child)
@@ -37,8 +37,8 @@ func TestStorageEngine_Inhume(t *testing.T) {
 
     t.Run("delete small object", func(t *testing.T) {
         t.Parallel()
-        e := testNewEngine(t).setShardsNum(t, 1).prepare(t).engine
-        defer func() { require.NoError(t, e.Close(context.Background())) }()
+        e := testNewEngine(t).setShardsNum(t, 1).engine
+        defer e.Close(context.Background())
 
         err := Put(context.Background(), e, parent, false)
         require.NoError(t, err)

@@ -56,12 +56,11 @@ func TestStorageEngine_Inhume(t *testing.T) {
 
     t.Run("delete big object", func(t *testing.T) {
         t.Parallel()
+        s1 := testNewShard(t)
+        s2 := testNewShard(t)
 
-        te := testNewEngine(t).setShardsNum(t, 2).prepare(t)
-        e := te.engine
-        defer func() { require.NoError(t, e.Close(context.Background())) }()
-
-        s1, s2 := te.shards[0], te.shards[1]
+        e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
+        defer e.Close(context.Background())
 
         var putChild shard.PutPrm
         putChild.SetObject(child)
@@ -68,7 +68,10 @@ func TestListWithCursor(t *testing.T) {
                 meta.WithEpochState(epochState{}),
             ),
         }
-    }).prepare(t).engine
+    }).engine
+    require.NoError(t, e.Open(context.Background()))
+    require.NoError(t, e.Init(context.Background()))
+
     defer func() {
         require.NoError(t, e.Close(context.Background()))
     }()
@@ -57,9 +57,11 @@ func TestLockUserScenario(t *testing.T) {
             }),
             shard.WithTombstoneSource(tss{lockerExpiresAfter}),
         }
-    }).
-        prepare(t)
+    })
     e := testEngine.engine
+    require.NoError(t, e.Open(context.Background()))
+    require.NoError(t, e.Init(context.Background()))
+
     defer func() { require.NoError(t, e.Close(context.Background())) }()
 
     lockerID := oidtest.ID()

@@ -160,9 +162,11 @@ func TestLockExpiration(t *testing.T) {
             return pool
         }),
     }
-    }).
-        prepare(t)
+    })
     e := testEngine.engine
+    require.NoError(t, e.Open(context.Background()))
+    require.NoError(t, e.Init(context.Background()))
+
     defer func() { require.NoError(t, e.Close(context.Background())) }()
 
     const lockerExpiresAfter = 13

@@ -239,8 +243,9 @@ func TestLockForceRemoval(t *testing.T) {
             }),
             shard.WithDeletedLockCallback(e.processDeletedLocks),
         }
-    }).
-        prepare(t).engine
+    }).engine
+    require.NoError(t, e.Open(context.Background()))
+    require.NoError(t, e.Init(context.Background()))
     defer func() { require.NoError(t, e.Close(context.Background())) }()
 
     cnr := cidtest.ID()
@@ -13,7 +13,7 @@ import (
 func TestRemoveShard(t *testing.T) {
     const numOfShards = 6
 
-    te := testNewEngine(t).setShardsNum(t, numOfShards).prepare(t)
+    te := testNewEngine(t).setShardsNum(t, numOfShards)
     e, ids := te.engine, te.shardIDs
     defer func() { require.NoError(t, e.Close(context.Background())) }()
 

@@ -51,7 +51,7 @@ func TestDisableShards(t *testing.T) {
 
     const numOfShards = 2
 
-    te := testNewEngine(t).setShardsNum(t, numOfShards).prepare(t)
+    te := testNewEngine(t).setShardsNum(t, numOfShards)
     e, ids := te.engine, te.shardIDs
     defer func() { require.NoError(t, e.Close(context.Background())) }()
 
@@ -130,9 +130,17 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
     require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
     require.Equal(t, 2, len(tombstonedObjects))
 
-    _, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
+    var tombstones []oid.Address
+    for _, tss := range tombstonedObjects {
+        tombstones = append(tombstones, tss.tomb)
+    }
+    inhumePrm.SetAddresses(tombstones...)
+    inhumePrm.SetGCMark()
+    _, err = db.Inhume(context.Background(), inhumePrm)
     require.NoError(t, err)
 
+    require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
+
     // GC finds tombstone as garbage and deletes it
 
     garbageAddresses = nil

@@ -366,9 +374,17 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
     require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
     require.True(t, len(tombstonedObjects) == parentCount+chunksCount)
 
-    _, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
+    var tombstones []oid.Address
+    for _, tss := range tombstonedObjects {
+        tombstones = append(tombstones, tss.tomb)
+    }
+    inhumePrm.SetAddresses(tombstones...)
+    inhumePrm.SetGCMark()
+    _, err = db.Inhume(context.Background(), inhumePrm)
     require.NoError(t, err)
 
+    require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
+
     // GC finds tombstone as garbage and deletes it
 
     garbageAddresses = nil
@@ -9,7 +9,6 @@ import (
 
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
     "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
-    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     "go.etcd.io/bbolt"
 )

@@ -256,58 +255,46 @@ func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
     return
 }
 
-// InhumeTombstones deletes tombstoned objects from the
+// DropGraves deletes tombstoned objects from the
 // graveyard bucket.
 //
 // Returns any error appeared during deletion process.
-func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (InhumeRes, error) {
+func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error {
     var (
         startedAt = time.Now()
         success   = false
     )
     defer func() {
-        db.metrics.AddMethodDuration("InhumeTombstones", time.Since(startedAt), success)
+        db.metrics.AddMethodDuration("DropGraves", time.Since(startedAt), success)
     }()
 
-    _, span := tracing.StartSpanFromContext(ctx, "metabase.InhumeTombstones")
+    _, span := tracing.StartSpanFromContext(ctx, "metabase.DropGraves")
     defer span.End()
 
     db.modeMtx.RLock()
     defer db.modeMtx.RUnlock()
 
     if db.mode.NoMetabase() {
-        return InhumeRes{}, ErrDegradedMode
+        return ErrDegradedMode
     } else if db.mode.ReadOnly() {
-        return InhumeRes{}, ErrReadOnlyMode
+        return ErrReadOnlyMode
     }
 
     buf := make([]byte, addressKeySize)
-    prm := InhumePrm{forceRemoval: true}
-    currEpoch := db.epochState.CurrentEpoch()
 
-    var res InhumeRes
-    err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
-        res = InhumeRes{inhumedByCnrID: make(map[cid.ID]ObjectCounters)}
-        garbageBKT := tx.Bucket(garbageBucketName)
-        graveyardBKT := tx.Bucket(graveyardBucketName)
-
-        bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
-        if err != nil {
-            return err
-        }
-
-        for i := range tss {
-            if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
-                return err
-            }
-            if err := graveyardBKT.Delete(addressKey(tss[i].Address(), buf)); err != nil {
+    return db.boltDB.Batch(func(tx *bbolt.Tx) error {
+        bkt := tx.Bucket(graveyardBucketName)
+        if bkt == nil {
+            return nil
+        }
+
+        for _, ts := range tss {
+            err := bkt.Delete(addressKey(ts.Address(), buf))
+            if err != nil {
                 return err
             }
         }
 
         return nil
     })
-    return res, err
 }
@@ -7,9 +7,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
     "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
     meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
-    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
     cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
-    objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
     "github.com/stretchr/testify/require"

@@ -395,7 +393,7 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) {
     require.False(t, iWasCalled)
 }
 
-func TestDB_InhumeTombstones(t *testing.T) {
+func TestDB_DropGraves(t *testing.T) {
     db := newDB(t)
     defer func() { require.NoError(t, db.Close(context.Background())) }()
 

@@ -412,20 +410,9 @@ func TestDB_InhumeTombstones(t *testing.T) {
     err = putBig(db, obj2)
     require.NoError(t, err)
 
-    id1, _ := obj1.ID()
-    id2, _ := obj2.ID()
-    ts := objectSDK.NewTombstone()
-    ts.SetMembers([]oid.ID{id1, id2})
-    objTs := objectSDK.New()
-    objTs.SetContainerID(cnr)
-    objTs.SetType(objectSDK.TypeTombstone)
-
-    data, _ := ts.Marshal()
-    objTs.SetPayload(data)
-    require.NoError(t, objectSDK.CalculateAndSetID(objTs))
-    require.NoError(t, putBig(db, objTs))
-
-    addrTombstone := object.AddressOf(objTs)
+    // inhume with tombstone
+    addrTombstone := oidtest.Address()
+    addrTombstone.SetContainer(cnr)
 
     var inhumePrm meta.InhumePrm
     inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))

@@ -448,11 +435,8 @@ func TestDB_InhumeTombstones(t *testing.T) {
     require.NoError(t, err)
     require.Equal(t, 2, counter)
 
-    res, err := db.InhumeTombstones(context.Background(), buriedTS)
+    err = db.DropGraves(context.Background(), buriedTS)
     require.NoError(t, err)
-    require.EqualValues(t, 1, res.LogicInhumed())
-    require.EqualValues(t, 0, res.UserInhumed())
-    require.EqualValues(t, map[cid.ID]meta.ObjectCounters{cnr: {Logic: 1}}, res.InhumedByCnrID())
 
     counter = 0
     iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error {
@@ -217,93 +217,85 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
     garbageBKT := tx.Bucket(garbageBucketName)
     graveyardBKT := tx.Bucket(graveyardBucketName)
 
-    bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
+    bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm)
     if err != nil {
         return err
     }
 
     buf := make([]byte, addressKeySize)
     for i := range prm.target {
-        if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil {
-            return err
+        id := prm.target[i].Object()
+        cnr := prm.target[i].Container()
+
+        // prevent locked objects to be inhumed
+        if !prm.forceRemoval && objectLocked(tx, cnr, id) {
+            return new(apistatus.ObjectLocked)
+        }
+
+        var lockWasChecked bool
+
+        // prevent lock objects to be inhumed
+        // if `Inhume` was called not with the
+        // `WithForceGCMark` option
+        if !prm.forceRemoval {
+            if isLockObject(tx, cnr, id) {
+                return ErrLockObjectRemoval
+            }
+
+            lockWasChecked = true
+        }
+
+        obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
+        targetKey := addressKey(prm.target[i], buf)
+        var ecErr *objectSDK.ECInfoError
+        if err == nil {
+            err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
+            if err != nil {
+                return err
+            }
+        } else if errors.As(err, &ecErr) {
+            err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
+            if err != nil {
+                return err
+            }
+        }
+
+        if prm.tomb != nil {
+            var isTomb bool
+            isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
+            if err != nil {
+                return err
+            }
+
+            if isTomb {
+                continue
+            }
+        }
+
+        // consider checking if target is already in graveyard?
+        err = bkt.Put(targetKey, value)
+        if err != nil {
+            return err
+        }
+
+        if prm.lockObjectHandling {
+            // do not perform lock check if
+            // it was already called
+            if lockWasChecked {
+                // inhumed object is not of
+                // the LOCK type
+                continue
+            }
+
+            if isLockObject(tx, cnr, id) {
+                res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
+            }
         }
     }
 
     return db.applyInhumeResToCounters(tx, res)
 }
 
-func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
-    id := addr.Object()
-    cnr := addr.Container()
-    tx := bkt.Tx()
-
-    // prevent locked objects to be inhumed
-    if !prm.forceRemoval && objectLocked(tx, cnr, id) {
-        return new(apistatus.ObjectLocked)
-    }
-
-    var lockWasChecked bool
-
-    // prevent lock objects to be inhumed
-    // if `Inhume` was called not with the
-    // `WithForceGCMark` option
-    if !prm.forceRemoval {
-        if isLockObject(tx, cnr, id) {
-            return ErrLockObjectRemoval
-        }
-
-        lockWasChecked = true
-    }
-
-    obj, err := db.get(tx, addr, buf, false, true, epoch)
-    targetKey := addressKey(addr, buf)
-    var ecErr *objectSDK.ECInfoError
-    if err == nil {
-        err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
-        if err != nil {
-            return err
-        }
-    } else if errors.As(err, &ecErr) {
-        err = db.inhumeECInfo(tx, epoch, prm.tomb, res, garbageBKT, graveyardBKT, ecErr.ECInfo(), cnr, bkt, value)
-        if err != nil {
-            return err
-        }
-    }
-
-    if prm.tomb != nil {
-        var isTomb bool
-        isTomb, err = db.markAsGC(graveyardBKT, garbageBKT, targetKey)
-        if err != nil {
-            return err
-        }
-
-        if isTomb {
-            return nil
-        }
-    }
-
-    // consider checking if target is already in graveyard?
-    err = bkt.Put(targetKey, value)
-    if err != nil {
-        return err
-    }
-
-    if prm.lockObjectHandling {
-        // do not perform lock check if
-        // it was already called
-        if lockWasChecked {
-            // inhumed object is not of
-            // the LOCK type
-            return nil
-        }
-
-        if isLockObject(tx, cnr, id) {
-            res.deletedLockObj = append(res.deletedLockObj, addr)
-        }
-    }
-    return nil
-}
-
 func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
     garbageBKT *bbolt.Bucket, graveyardBKT *bbolt.Bucket,
     ecInfo *objectSDK.ECInfo, cnr cid.ID, targetBucket *bbolt.Bucket, value []byte,

@@ -362,7 +354,7 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
 // 1. tombstone address if Inhume was called with
 //    a Tombstone
 // 2. zeroValue if Inhume was called with a GC mark
-func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
+func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
     if prm.tomb != nil {
         targetBucket = graveyardBKT
         tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))
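The comment above getInhumeTargetBucketAndValue summarizes the selection rule. A simplified sketch of that rule (illustration only; error handling and the real key encoding are omitted, and zeroValue is the package-level marker mentioned in the comment, not shown in this diff):

// Tombstone case: the record goes to the graveyard bucket, the value is the tombstone address key.
// GC-mark case: the record goes to the garbage bucket with the zero value.
func chooseInhumeTarget(prm *InhumePrm, garbageBKT, graveyardBKT *bbolt.Bucket) (*bbolt.Bucket, []byte) {
    if prm.tomb != nil {
        return graveyardBKT, addressKey(*prm.tomb, make([]byte, addressKeySize))
    }
    return garbageBKT, zeroValue
}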
@@ -627,14 +627,23 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
 //
 // Does not modify tss.
 func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) {
-    s.m.RLock()
-    defer s.m.RUnlock()
-
-    if s.info.Mode.NoMetabase() {
+    if s.GetMode().NoMetabase() {
         return
     }
 
-    res, err := s.metaBase.InhumeTombstones(ctx, tss)
+    // Mark tombstones as garbage.
+    var pInhume meta.InhumePrm
+
+    tsAddrs := make([]oid.Address, 0, len(tss))
+    for _, ts := range tss {
+        tsAddrs = append(tsAddrs, ts.Tombstone())
+    }
+
+    pInhume.SetGCMark()
+    pInhume.SetAddresses(tsAddrs...)
+
+    // inhume tombstones
+    res, err := s.metaBase.Inhume(ctx, pInhume)
     if err != nil {
         s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
             zap.String("error", err.Error()),

@@ -654,6 +663,13 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
         s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size))
         i++
     }
+
+    // drop just processed expired tombstones
+    // from graveyard
+    err = s.metaBase.DropGraves(ctx, tss)
+    if err != nil {
+        s.log.Warn(ctx, logs.ShardCouldNotDropExpiredGraveRecords, zap.Error(err))
+    }
 }
 
 // HandleExpiredLocks unlocks all objects which were locked by lockers.
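Taken together, the two gc.go hunks replace the single InhumeTombstones call with a two-step flow. A condensed paraphrase of the resulting logic (logging elided; it touches unexported shard fields, so it is illustrative rather than standalone code):

func handleExpiredTombstonesSketch(ctx context.Context, s *Shard, tss []meta.TombstonedObject) {
    // Step 1: mark the tombstone objects themselves as garbage.
    tsAddrs := make([]oid.Address, 0, len(tss))
    for _, ts := range tss {
        tsAddrs = append(tsAddrs, ts.Tombstone())
    }

    var pInhume meta.InhumePrm
    pInhume.SetGCMark()
    pInhume.SetAddresses(tsAddrs...)
    if _, err := s.metaBase.Inhume(ctx, pInhume); err != nil {
        return
    }

    // ... per-container counters are updated here in the real code ...

    // Step 2: drop the processed graveyard records so they are not handled again.
    _ = s.metaBase.DropGraves(ctx, tss)
}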