Compare commits

...

9 commits

Author SHA1 Message Date
4c37f95656
[#9999] quota: Add limiter
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-11-15 16:28:21 +03:00
d77a218f7c [#1493] metabase: Merge Inhume() and DropGraves() for tombstones
DropGraves() is only used to drop gravemarks after a tombstone
removal. Thus, it makes sense to do Inhume() and DropGraves() in one
transaction. It has less overhead and no unexpected problems in case
of sudden power failure.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-14 06:47:04 +00:00
44df67492f [#1493] metabase: Split inhumeTx() into 2 functions
No functional changes.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-14 06:47:04 +00:00
1e6f132b4e [#1493] metabase: Pass InhumePrm by value
Unify with the other code, no functional changes.

Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-14 06:47:04 +00:00
6dc0dc6691 [#1493] shard: Take mode mutex in HandleExpiredTombstones()
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
2024-11-14 06:47:04 +00:00
f7cb6b4d87 [#1482] Makefile: Update golangci-lint
Signed-off-by: Alexander Chuprov <a.chuprov@yadro.com>
2024-11-13 15:01:41 +00:00
7fc6101bec
[#1491] engine/test: Rework engine test utils
- Remove `testNewShard` and `setInitializedShards` because they
violated the default engine workflow. The correct workflow is:
first use `New()`, followed by `Open()`, and then `Init()`. As a
result, adding new logic to `(*StorageEngine).Init` caused several
tests to fail with a panic when attempting to access uninitialized
resources. Now, all engines created with the test utils must be
initialized manually. The new helper method `prepare` can be used
for that purpose.
- Additionally, `setInitializedShards` hardcoded the shard worker
pool size, which prevented it from being configured in tests and
benchmarks. This has been fixed as well.
- Ensure engine initialization is done wherever it was missing.
- Refactor `setShardsNumOpts`, `setShardsNumAdditionalOpts`, and
`setShardsNum`. Make them all depend on `setShardsNumOpts`.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-11-13 14:42:53 +03:00
7ef36749d0
[#1491] engine/test: Move BenchmarkExists to exists_test.go
Move `BenchmarkExists` from `engine_test.go` to `exists_test.go`
for better organization and clarity.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-11-13 14:09:29 +03:00
c6066d6ee4
[#1491] engine/test: Use more suitable testing utils here and there
Use `setShardsNum` instead of `setInitializedShards` wherever possible.

Signed-off-by: Aleksey Savchuk <a.savchuk@yadro.com>
2024-11-13 14:09:29 +03:00
19 changed files with 662 additions and 301 deletions

View file

@ -8,8 +8,8 @@ HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"
GO_VERSION ?= 1.22
LINT_VERSION ?= 1.61.0
TRUECLOUDLAB_LINT_VERSION ?= 0.0.7
LINT_VERSION ?= 1.62.0
TRUECLOUDLAB_LINT_VERSION ?= 0.0.8
PROTOC_VERSION ?= 25.0
PROTOGEN_FROSTFS_VERSION ?= $(shell go list -f '{{.Version}}' -m git.frostfs.info/TrueCloudLab/frostfs-sdk-go)
PROTOC_OS_VERSION=osx-x86_64

291
pkg/core/quota/limiter.go Normal file
View file

@ -0,0 +1,291 @@
package quota
import (
"container/heap"
"context"
"fmt"
"math/rand/v2"
"sync"
"time"
)
type Quota struct {
Read int64
Write int64
}
type Priority struct {
Class string
Value byte
}
type Release func()
type limit struct {
read, write int64
maxRead, maxWrite int64
}
type queueItem struct {
priority byte
ts time.Time
index int
}
type queue struct {
l *limit
w int32
items []*queueItem
}
type QuotaLimiter struct {
l *limit
classes map[string]*queue
queues []*queue
nextQueue *queue
cond *sync.Cond
}
func (ql *QuotaLimiter) Acquire(ctx context.Context, p Priority, q Quota) (Release, error) {
queue, ok := ql.classes[p.Class]
if !ok {
return nil, fmt.Errorf("unknown class '%s'", p.Class)
}
if ql.l.maxRead > 0 && q.Read > ql.l.maxRead {
return nil, fmt.Errorf("read quota %d exceeds total limit %d", q.Read, ql.l.maxRead)
}
if ql.l.maxWrite > 0 && q.Write > ql.l.maxWrite {
return nil, fmt.Errorf("read quota %d exceeds total limit %d", q.Write, ql.l.maxWrite)
}
if queue.l.maxRead > 0 && q.Read > queue.l.maxRead {
return nil, fmt.Errorf("read quota %d exceeds queue limit %d", q.Read, queue.l.maxRead)
}
if queue.l.maxWrite > 0 && q.Write > queue.l.maxWrite {
return nil, fmt.Errorf("read quota %d exceeds queue limit %d", q.Write, queue.l.maxWrite)
}
ts := time.Now()
ql.cond.L.Lock()
defer ql.cond.L.Unlock()
stop := context.AfterFunc(ctx, func() {
ql.cond.Broadcast()
})
defer stop()
allow := ql.nextQueue == nil && // no scheduled queue
hasQuota(q, queue.l) && // queue limit
hasQuota(q, ql.l) // global lomit
if allow {
applyQuota(q, queue.l)
applyQuota(q, ql.l)
return func() { ql.release(p, q) }, nil
}
qi := &queueItem{
priority: p.Value,
ts: ts,
}
queue.push(qi)
if queue.count() == 1 {
ql.resetNextQueue()
}
var hasGlobalQuota, hasQueueQuota, isNextItem bool
for !allow {
ql.cond.Wait()
if err := ctx.Err(); err != nil {
queue.drop(qi)
if queue.count() == 0 {
ql.resetNextQueue()
}
return nil, ctx.Err()
}
hasGlobalQuota = hasQuota(q, ql.l)
hasQueueQuota = hasQuota(q, queue.l)
isNextItem = ql.nextQueue == queue && queue.top() == qi
if hasGlobalQuota && !hasQueueQuota && isNextItem {
ql.changeNextQueue()
}
allow = hasGlobalQuota && hasQueueQuota && isNextItem
}
applyQuota(q, queue.l)
applyQuota(q, ql.l)
queue.pop()
ql.resetNextQueue()
return func() { ql.release(p, q) }, nil
}
func (ql *QuotaLimiter) release(p Priority, q Quota) {
queue, ok := ql.classes[p.Class]
if !ok {
panic("unknown class " + p.Class)
}
ql.cond.L.Lock()
defer ql.cond.L.Unlock()
releaseQuota(q, queue.l)
releaseQuota(q, ql.l)
ql.cond.Broadcast()
}
func (ql *QuotaLimiter) resetNextQueue() {
var nonEmptyQueues []*queue
var totalWeight int64
for _, q := range ql.queues {
if q.count() > 0 {
nonEmptyQueues = append(nonEmptyQueues, q)
totalWeight += int64(q.weight())
}
}
if len(nonEmptyQueues) == 0 {
ql.nextQueue = nil
return
}
ql.selectNextQueue(nonEmptyQueues, totalWeight)
}
func (ql *QuotaLimiter) changeNextQueue() {
var nonEmptyQueues []*queue
var totalWeight int64
for _, q := range ql.queues {
if q.count() > 0 && q != ql.nextQueue {
nonEmptyQueues = append(nonEmptyQueues, q)
totalWeight += int64(q.weight())
}
}
if len(nonEmptyQueues) == 0 {
return
}
ql.selectNextQueue(nonEmptyQueues, totalWeight)
}
func (ql *QuotaLimiter) selectNextQueue(nonEmptyQueues []*queue, totalWeight int64) {
if totalWeight == 0 {
ql.nextQueue = nonEmptyQueues[rand.IntN(len(nonEmptyQueues))]
return
}
weight := rand.Int64N(totalWeight)
var low, up int64
for _, q := range nonEmptyQueues {
low = up
up += int64(q.weight())
if weight >= low && weight < up {
ql.nextQueue = q
return
}
}
panic("undefined next queue")
}
func hasQuota(q Quota, l *limit) bool {
if q.Read > 0 && l.maxRead > 0 && q.Read+l.read > l.maxRead {
return false
}
if q.Write > 0 && l.maxWrite > 0 && q.Write+l.write > l.write {
return false
}
return true
}
func applyQuota(q Quota, l *limit) {
if q.Read > 0 && l.maxRead > 0 {
l.read += q.Read
}
if q.Write > 0 && l.maxWrite > 0 {
l.write += q.Write
}
}
func releaseQuota(q Quota, l *limit) {
if q.Read > 0 && l.maxRead > 0 {
l.read -= q.Read
if l.read < 0 {
panic("invalid read limit after release")
}
}
if q.Write > 0 && l.maxWrite > 0 {
l.write -= q.Write
if l.write < 0 {
panic("invalid write limit after release")
}
}
}
func (q *queue) push(qi *queueItem) {
heap.Push(q, qi)
}
func (q *queue) pop() {
heap.Pop(q)
}
func (q *queue) drop(qi *queueItem) {
heap.Remove(q, qi.index)
}
func (q *queue) top() *queueItem {
if len(q.items) > 0 {
return q.items[0]
}
return nil
}
func (q *queue) count() int {
return len(q.items)
}
func (q *queue) weight() int32 {
return q.w
}
// Len implements heap.Interface.
func (q *queue) Len() int {
return q.count()
}
// Less implements heap.Interface.
func (q *queue) Less(i int, j int) bool {
if q.items[i].priority == q.items[j].priority {
return q.items[i].ts.Before(q.items[j].ts)
}
return q.items[i].priority > q.items[j].priority
}
// Pop implements heap.Interface.
func (q *queue) Pop() any {
n := len(q.items)
item := q.items[n-1]
q.items[n-1] = nil
q.items = q.items[0 : n-1]
item.index = -1
return item
}
// Push implements heap.Interface.
func (q *queue) Push(x any) {
it := x.(*queueItem)
it.index = q.Len()
q.items = append(q.items, it)
}
// Swap implements heap.Interface.
func (q *queue) Swap(i int, j int) {
q.items[i], q.items[j] = q.items[j], q.items[i]
q.items[i].index = i
q.items[j].index = j
}

View file

@ -0,0 +1,102 @@
package quota
import (
"math/rand/v2"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestQueue(t *testing.T) {
t.Run("different priority", func(t *testing.T) {
q := &queue{}
ts := time.Now()
const count = 12345
for i := range count {
priority := i % 256
q.push(&queueItem{
priority: byte(priority),
ts: ts,
})
}
testQueueInvariant(t, q)
})
t.Run("same priority, different ts, inc", func(t *testing.T) {
q := &queue{}
ts := time.Now()
const count = 10000
for range count {
ts = ts.Add(time.Second)
q.push(&queueItem{
priority: 100,
ts: ts,
})
}
testQueueInvariant(t, q)
})
t.Run("same priority, different ts, dec", func(t *testing.T) {
q := &queue{}
ts := time.Now()
const count = 10000
for range count {
ts = ts.Add(-1 * time.Second)
q.push(&queueItem{
priority: 100,
ts: ts,
})
}
testQueueInvariant(t, q)
})
t.Run("drop, inc", func(t *testing.T) {
q := &queue{}
ts := time.Now()
const count = 12345
for i := range count {
ts = ts.Add(time.Second)
q.push(&queueItem{
priority: byte(i % 256),
ts: ts,
})
}
for q.Len() > 0 {
idx := rand.IntN(q.count())
it := q.items[idx]
q.drop(it)
testQueueInvariant(t, q)
}
})
t.Run("drop, dec", func(t *testing.T) {
q := &queue{}
ts := time.Now()
const count = 12345
for i := range count {
ts = ts.Add(-1 * time.Second)
q.push(&queueItem{
priority: byte(i % 256),
ts: ts,
})
}
for q.Len() > 0 {
idx := rand.IntN(q.count())
it := q.items[idx]
q.drop(it)
testQueueInvariant(t, q)
}
})
}
func testQueueInvariant(t *testing.T, q *queue) {
var previous *queueItem
for q.count() > 0 {
current := q.top()
if previous != nil {
require.True(t, previous.priority > current.priority ||
(previous.priority == current.priority &&
(previous.ts == current.ts || previous.ts.Before(current.ts))))
}
previous = current
q.pop()
}
require.Equal(t, 0, q.count())
}

View file

@ -164,7 +164,7 @@ func testEngineFailInitAndReload(t *testing.T, degradedMode bool, opts []shard.O
}
func TestExecBlocks(t *testing.T) {
e := testNewEngine(t).setShardsNum(t, 2).engine // number doesn't matter in this test, 2 is several but not many
e := testNewEngine(t).setShardsNum(t, 2).prepare(t).engine // number doesn't matter in this test, 2 is several but not many
// put some object
obj := testutil.GenerateObjectWithCID(cidtest.ID())
@ -302,7 +302,8 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
meta.WithEpochState(epochState{}),
),
}
})
}).
prepare(t)
e, ids := te.engine, te.shardIDs
for _, id := range ids {
@ -312,8 +313,5 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str
require.Equal(t, num, len(e.shards))
require.Equal(t, num, len(e.shardPools))
require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background()))
return e, currShards
}

View file

@ -7,7 +7,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -49,13 +48,8 @@ func TestDeleteBigObject(t *testing.T) {
link.SetSplitID(splitID)
link.SetChildren(childIDs...)
s1 := testNewShard(t)
s2 := testNewShard(t)
s3 := testNewShard(t)
e := testNewEngine(t).setInitializedShards(t, s1, s2, s3).engine
e.log = test.NewLogger(t)
defer e.Close(context.Background())
e := testNewEngine(t).setShardsNum(t, 3).prepare(t).engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
for i := range children {
require.NoError(t, Put(context.Background(), e, children[i], false))
@ -119,11 +113,13 @@ func TestDeleteBigObjectWithoutGC(t *testing.T) {
link.SetSplitID(splitID)
link.SetChildren(childIDs...)
s1 := testNewShard(t, shard.WithDisabledGC())
te := testNewEngine(t).setShardsNumAdditionalOpts(t, 1, func(_ int) []shard.Option {
return []shard.Option{shard.WithDisabledGC()}
}).prepare(t)
e := te.engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
e := testNewEngine(t).setInitializedShards(t, s1).engine
e.log = test.NewLogger(t)
defer e.Close(context.Background())
s1 := te.shards[0]
for i := range children {
require.NoError(t, Put(context.Background(), e, children[i], false))

View file

@ -3,24 +3,17 @@ package engine
import (
"context"
"path/filepath"
"sync/atomic"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/teststore"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/hrw"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
@ -30,113 +23,79 @@ func (s epochState) CurrentEpoch() uint64 {
return 0
}
func BenchmarkExists(b *testing.B) {
b.Run("2 shards", func(b *testing.B) {
benchmarkExists(b, 2)
})
b.Run("4 shards", func(b *testing.B) {
benchmarkExists(b, 4)
})
b.Run("8 shards", func(b *testing.B) {
benchmarkExists(b, 8)
})
}
func benchmarkExists(b *testing.B, shardNum int) {
shards := make([]*shard.Shard, shardNum)
for i := range shardNum {
shards[i] = testNewShard(b)
}
e := testNewEngine(b).setInitializedShards(b, shards...).engine
defer func() { require.NoError(b, e.Close(context.Background())) }()
addr := oidtest.Address()
for range 100 {
obj := testutil.GenerateObjectWithCID(cidtest.ID())
err := Put(context.Background(), e, obj, false)
if err != nil {
b.Fatal(err)
}
}
b.ReportAllocs()
b.ResetTimer()
for range b.N {
var shPrm shard.ExistsPrm
shPrm.Address = addr
shPrm.ParentAddress = oid.Address{}
ok, _, err := e.exists(context.Background(), shPrm)
if err != nil || ok {
b.Fatalf("%t %v", ok, err)
}
}
}
type testEngineWrapper struct {
engine *StorageEngine
shards []*shard.Shard
shardIDs []*shard.ID
}
func testNewEngine(t testing.TB, opts ...Option) *testEngineWrapper {
engine := New(WithLogger(test.NewLogger(t)))
for _, opt := range opts {
opt(engine.cfg)
}
return &testEngineWrapper{
engine: engine,
}
}
func (te *testEngineWrapper) setInitializedShards(t testing.TB, shards ...*shard.Shard) *testEngineWrapper {
for _, s := range shards {
pool, err := ants.NewPool(10, ants.WithNonblocking(true))
require.NoError(t, err)
te.engine.shards[s.ID().String()] = hashedShard{
shardWrapper: shardWrapper{
errorCount: new(atomic.Uint32),
Shard: s,
},
hash: hrw.StringHash(s.ID().String()),
}
te.engine.shardPools[s.ID().String()] = pool
te.shardIDs = append(te.shardIDs, s.ID())
}
return te
opts = append(testGetDefaultEngineOptions(t), opts...)
return &testEngineWrapper{engine: New(opts...)}
}
func (te *testEngineWrapper) setShardsNum(t testing.TB, num int) *testEngineWrapper {
shards := make([]*shard.Shard, 0, num)
for range num {
shards = append(shards, testNewShard(t))
}
return te.setInitializedShards(t, shards...)
return te.setShardsNumOpts(t, num, func(_ int) []shard.Option {
return testGetDefaultShardOptions(t)
})
}
func (te *testEngineWrapper) setShardsNumOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
func (te *testEngineWrapper) setShardsNumOpts(
t testing.TB, num int, shardOpts func(id int) []shard.Option,
) *testEngineWrapper {
te.shards = make([]*shard.Shard, num)
te.shardIDs = make([]*shard.ID, num)
for i := range num {
opts := shardOpts(i)
id, err := te.engine.AddShard(context.Background(), opts...)
shard, err := te.engine.createShard(context.Background(), shardOpts(i))
require.NoError(t, err)
te.shardIDs = append(te.shardIDs, id)
require.NoError(t, te.engine.addShard(shard))
te.shards[i] = shard
te.shardIDs[i] = shard.ID()
}
require.Len(t, te.engine.shards, num)
require.Len(t, te.engine.shardPools, num)
return te
}
func (te *testEngineWrapper) setShardsNumAdditionalOpts(t testing.TB, num int, shardOpts func(id int) []shard.Option) *testEngineWrapper {
for i := range num {
defaultOpts := testDefaultShardOptions(t)
opts := append(defaultOpts, shardOpts(i)...)
id, err := te.engine.AddShard(context.Background(), opts...)
require.NoError(t, err)
te.shardIDs = append(te.shardIDs, id)
}
func (te *testEngineWrapper) setShardsNumAdditionalOpts(
t testing.TB, num int, shardOpts func(id int) []shard.Option,
) *testEngineWrapper {
return te.setShardsNumOpts(t, num, func(id int) []shard.Option {
return append(testGetDefaultShardOptions(t), shardOpts(id)...)
})
}
// prepare calls Open and Init on the created engine.
func (te *testEngineWrapper) prepare(t testing.TB) *testEngineWrapper {
require.NoError(t, te.engine.Open(context.Background()))
require.NoError(t, te.engine.Init(context.Background()))
return te
}
func testGetDefaultEngineOptions(t testing.TB) []Option {
return []Option{
WithLogger(test.NewLogger(t)),
}
}
func testGetDefaultShardOptions(t testing.TB) []shard.Option {
return []shard.Option{
shard.WithLogger(test.NewLogger(t)),
shard.WithBlobStorOptions(
blobstor.WithStorages(
newStorages(t, t.TempDir(), 1<<20)),
blobstor.WithLogger(test.NewLogger(t)),
),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(t.TempDir(), "pilorama"))),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(t.TempDir(), "metabase")),
meta.WithPermissions(0o700),
meta.WithEpochState(epochState{}),
meta.WithLogger(test.NewLogger(t)),
),
}
}
func newStorages(t testing.TB, root string, smallSize uint64) []blobstor.SubStorage {
return []blobstor.SubStorage{
{
@ -186,34 +145,3 @@ func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *tes
},
}, smallFileStorage, largeFileStorage
}
func testNewShard(t testing.TB, opts ...shard.Option) *shard.Shard {
sid, err := generateShardID()
require.NoError(t, err)
shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t)...)
s := shard.New(append(shardOpts, opts...)...)
require.NoError(t, s.Open(context.Background()))
require.NoError(t, s.Init(context.Background()))
return s
}
func testDefaultShardOptions(t testing.TB) []shard.Option {
return []shard.Option{
shard.WithLogger(test.NewLogger(t)),
shard.WithBlobStorOptions(
blobstor.WithStorages(
newStorages(t, t.TempDir(), 1<<20)),
blobstor.WithLogger(test.NewLogger(t)),
),
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(t.TempDir(), "pilorama"))),
shard.WithMetaBaseOptions(
meta.WithPath(filepath.Join(t.TempDir(), "metabase")),
meta.WithPermissions(0o700),
meta.WithEpochState(epochState{}),
meta.WithLogger(test.NewLogger(t)),
),
}
}

View file

@ -67,10 +67,8 @@ func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32)
pilorama.WithPath(filepath.Join(dir, fmt.Sprintf("%d.pilorama", id))),
pilorama.WithPerm(0o700)),
}
})
}).prepare(t)
e := te.engine
require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background()))
for i, id := range te.shardIDs {
testShards[i].id = id

View file

@ -75,10 +75,9 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng
pilorama.WithPerm(0o700),
),
}
})
}).
prepare(t)
e, ids := te.engine, te.shardIDs
require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background()))
objects := make([]*objectSDK.Object, 0, objPerShard*len(ids))
treeID := "version"

View file

@ -0,0 +1,51 @@
package engine
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
)
func BenchmarkExists(b *testing.B) {
b.Run("2 shards", func(b *testing.B) {
benchmarkExists(b, 2)
})
b.Run("4 shards", func(b *testing.B) {
benchmarkExists(b, 4)
})
b.Run("8 shards", func(b *testing.B) {
benchmarkExists(b, 8)
})
}
func benchmarkExists(b *testing.B, shardNum int) {
e := testNewEngine(b).setShardsNum(b, shardNum).prepare(b).engine
defer func() { require.NoError(b, e.Close(context.Background())) }()
addr := oidtest.Address()
for range 100 {
obj := testutil.GenerateObjectWithCID(cidtest.ID())
err := Put(context.Background(), e, obj, false)
if err != nil {
b.Fatal(err)
}
}
b.ReportAllocs()
b.ResetTimer()
for range b.N {
var shPrm shard.ExistsPrm
shPrm.Address = addr
shPrm.ParentAddress = oid.Address{}
ok, _, err := e.exists(context.Background(), shPrm)
if err != nil || ok {
b.Fatalf("%t %v", ok, err)
}
}
}

View file

@ -39,11 +39,11 @@ func TestHeadRaw(t *testing.T) {
link.SetSplitID(splitID)
t.Run("virtual object split in different shards", func(t *testing.T) {
s1 := testNewShard(t)
s2 := testNewShard(t)
te := testNewEngine(t).setShardsNum(t, 2).prepare(t)
e := te.engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
defer e.Close(context.Background())
s1, s2 := te.shards[0], te.shards[1]
var putPrmLeft shard.PutPrm
putPrmLeft.SetObject(child)

View file

@ -37,8 +37,8 @@ func TestStorageEngine_Inhume(t *testing.T) {
t.Run("delete small object", func(t *testing.T) {
t.Parallel()
e := testNewEngine(t).setShardsNum(t, 1).engine
defer e.Close(context.Background())
e := testNewEngine(t).setShardsNum(t, 1).prepare(t).engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
err := Put(context.Background(), e, parent, false)
require.NoError(t, err)
@ -56,11 +56,12 @@ func TestStorageEngine_Inhume(t *testing.T) {
t.Run("delete big object", func(t *testing.T) {
t.Parallel()
s1 := testNewShard(t)
s2 := testNewShard(t)
e := testNewEngine(t).setInitializedShards(t, s1, s2).engine
defer e.Close(context.Background())
te := testNewEngine(t).setShardsNum(t, 2).prepare(t)
e := te.engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
s1, s2 := te.shards[0], te.shards[1]
var putChild shard.PutPrm
putChild.SetObject(child)

View file

@ -68,10 +68,7 @@ func TestListWithCursor(t *testing.T) {
meta.WithEpochState(epochState{}),
),
}
}).engine
require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background()))
}).prepare(t).engine
defer func() {
require.NoError(t, e.Close(context.Background()))
}()

View file

@ -57,11 +57,9 @@ func TestLockUserScenario(t *testing.T) {
}),
shard.WithTombstoneSource(tss{lockerExpiresAfter}),
}
})
}).
prepare(t)
e := testEngine.engine
require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background()))
defer func() { require.NoError(t, e.Close(context.Background())) }()
lockerID := oidtest.ID()
@ -162,11 +160,9 @@ func TestLockExpiration(t *testing.T) {
return pool
}),
}
})
}).
prepare(t)
e := testEngine.engine
require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background()))
defer func() { require.NoError(t, e.Close(context.Background())) }()
const lockerExpiresAfter = 13
@ -243,9 +239,8 @@ func TestLockForceRemoval(t *testing.T) {
}),
shard.WithDeletedLockCallback(e.processDeletedLocks),
}
}).engine
require.NoError(t, e.Open(context.Background()))
require.NoError(t, e.Init(context.Background()))
}).
prepare(t).engine
defer func() { require.NoError(t, e.Close(context.Background())) }()
cnr := cidtest.ID()

View file

@ -13,7 +13,7 @@ import (
func TestRemoveShard(t *testing.T) {
const numOfShards = 6
te := testNewEngine(t).setShardsNum(t, numOfShards)
te := testNewEngine(t).setShardsNum(t, numOfShards).prepare(t)
e, ids := te.engine, te.shardIDs
defer func() { require.NoError(t, e.Close(context.Background())) }()
@ -51,7 +51,7 @@ func TestDisableShards(t *testing.T) {
const numOfShards = 2
te := testNewEngine(t).setShardsNum(t, numOfShards)
te := testNewEngine(t).setShardsNum(t, numOfShards).prepare(t)
e, ids := te.engine, te.shardIDs
defer func() { require.NoError(t, e.Close(context.Background())) }()

View file

@ -130,17 +130,9 @@ func TestDeleteECObject_WithoutSplit(t *testing.T) {
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
require.Equal(t, 2, len(tombstonedObjects))
var tombstones []oid.Address
for _, tss := range tombstonedObjects {
tombstones = append(tombstones, tss.tomb)
}
inhumePrm.SetAddresses(tombstones...)
inhumePrm.SetGCMark()
_, err = db.Inhume(context.Background(), inhumePrm)
_, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
require.NoError(t, err)
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
// GC finds tombstone as garbage and deletes it
garbageAddresses = nil
@ -374,17 +366,9 @@ func testDeleteECObjectWithSplit(t *testing.T, chunksCount int, withLinking bool
require.NoError(t, db.IterateOverGraveyard(context.Background(), graveyardIterationPrm))
require.True(t, len(tombstonedObjects) == parentCount+chunksCount)
var tombstones []oid.Address
for _, tss := range tombstonedObjects {
tombstones = append(tombstones, tss.tomb)
}
inhumePrm.SetAddresses(tombstones...)
inhumePrm.SetGCMark()
_, err = db.Inhume(context.Background(), inhumePrm)
_, err = db.InhumeTombstones(context.Background(), tombstonedObjects)
require.NoError(t, err)
require.NoError(t, db.DropGraves(context.Background(), tombstonedObjects))
// GC finds tombstone as garbage and deletes it
garbageAddresses = nil

View file

@ -9,6 +9,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
)
@ -255,46 +256,58 @@ func graveFromKV(k, v []byte) (res TombstonedObject, err error) {
return
}
// DropGraves deletes tombstoned objects from the
// InhumeTombstones deletes tombstoned objects from the
// graveyard bucket.
//
// Returns any error appeared during deletion process.
func (db *DB) DropGraves(ctx context.Context, tss []TombstonedObject) error {
func (db *DB) InhumeTombstones(ctx context.Context, tss []TombstonedObject) (InhumeRes, error) {
var (
startedAt = time.Now()
success = false
)
defer func() {
db.metrics.AddMethodDuration("DropGraves", time.Since(startedAt), success)
db.metrics.AddMethodDuration("InhumeTombstones", time.Since(startedAt), success)
}()
_, span := tracing.StartSpanFromContext(ctx, "metabase.DropGraves")
_, span := tracing.StartSpanFromContext(ctx, "metabase.InhumeTombstones")
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return ErrDegradedMode
return InhumeRes{}, ErrDegradedMode
} else if db.mode.ReadOnly() {
return ErrReadOnlyMode
return InhumeRes{}, ErrReadOnlyMode
}
buf := make([]byte, addressKeySize)
prm := InhumePrm{forceRemoval: true}
currEpoch := db.epochState.CurrentEpoch()
return db.boltDB.Batch(func(tx *bbolt.Tx) error {
bkt := tx.Bucket(graveyardBucketName)
if bkt == nil {
return nil
var res InhumeRes
err := db.boltDB.Batch(func(tx *bbolt.Tx) error {
res = InhumeRes{inhumedByCnrID: make(map[cid.ID]ObjectCounters)}
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
if err != nil {
return err
}
for _, ts := range tss {
err := bkt.Delete(addressKey(ts.Address(), buf))
if err != nil {
for i := range tss {
if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, tss[i].Tombstone(), buf, currEpoch, prm, &res); err != nil {
return err
}
if err := graveyardBKT.Delete(addressKey(tss[i].Address(), buf)); err != nil {
return err
}
}
return nil
})
return res, err
}

View file

@ -7,7 +7,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/stretchr/testify/require"
@ -393,7 +395,7 @@ func TestDB_IterateOverGarbage_Offset(t *testing.T) {
require.False(t, iWasCalled)
}
func TestDB_DropGraves(t *testing.T) {
func TestDB_InhumeTombstones(t *testing.T) {
db := newDB(t)
defer func() { require.NoError(t, db.Close(context.Background())) }()
@ -410,9 +412,20 @@ func TestDB_DropGraves(t *testing.T) {
err = putBig(db, obj2)
require.NoError(t, err)
// inhume with tombstone
addrTombstone := oidtest.Address()
addrTombstone.SetContainer(cnr)
id1, _ := obj1.ID()
id2, _ := obj2.ID()
ts := objectSDK.NewTombstone()
ts.SetMembers([]oid.ID{id1, id2})
objTs := objectSDK.New()
objTs.SetContainerID(cnr)
objTs.SetType(objectSDK.TypeTombstone)
data, _ := ts.Marshal()
objTs.SetPayload(data)
require.NoError(t, objectSDK.CalculateAndSetID(objTs))
require.NoError(t, putBig(db, objTs))
addrTombstone := object.AddressOf(objTs)
var inhumePrm meta.InhumePrm
inhumePrm.SetAddresses(object.AddressOf(obj1), object.AddressOf(obj2))
@ -435,8 +448,11 @@ func TestDB_DropGraves(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 2, counter)
err = db.DropGraves(context.Background(), buriedTS)
res, err := db.InhumeTombstones(context.Background(), buriedTS)
require.NoError(t, err)
require.EqualValues(t, 1, res.LogicInhumed())
require.EqualValues(t, 0, res.UserInhumed())
require.EqualValues(t, map[cid.ID]meta.ObjectCounters{cnr: {Logic: 1}}, res.InhumedByCnrID())
counter = 0
iterGravePRM.SetHandler(func(_ meta.TombstonedObject) error {

View file

@ -217,15 +217,25 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
garbageBKT := tx.Bucket(garbageBucketName)
graveyardBKT := tx.Bucket(graveyardBucketName)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, &prm)
bkt, value, err := db.getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT, prm)
if err != nil {
return err
}
buf := make([]byte, addressKeySize)
for i := range prm.target {
id := prm.target[i].Object()
cnr := prm.target[i].Container()
if err := db.inhumeTxSingle(bkt, value, graveyardBKT, garbageBKT, prm.target[i], buf, epoch, prm, res); err != nil {
return err
}
}
return db.applyInhumeResToCounters(tx, res)
}
func (db *DB) inhumeTxSingle(bkt *bbolt.Bucket, value []byte, graveyardBKT, garbageBKT *bbolt.Bucket, addr oid.Address, buf []byte, epoch uint64, prm InhumePrm, res *InhumeRes) error {
id := addr.Object()
cnr := addr.Container()
tx := bkt.Tx()
// prevent locked objects to be inhumed
if !prm.forceRemoval && objectLocked(tx, cnr, id) {
@ -245,8 +255,8 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
lockWasChecked = true
}
obj, err := db.get(tx, prm.target[i], buf, false, true, epoch)
targetKey := addressKey(prm.target[i], buf)
obj, err := db.get(tx, addr, buf, false, true, epoch)
targetKey := addressKey(addr, buf)
var ecErr *objectSDK.ECInfoError
if err == nil {
err = db.updateDeleteInfo(tx, garbageBKT, graveyardBKT, targetKey, cnr, obj, res)
@ -268,7 +278,7 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
}
if isTomb {
continue
return nil
}
}
@ -284,16 +294,14 @@ func (db *DB) inhumeTx(tx *bbolt.Tx, epoch uint64, prm InhumePrm, res *InhumeRes
if lockWasChecked {
// inhumed object is not of
// the LOCK type
continue
return nil
}
if isLockObject(tx, cnr, id) {
res.deletedLockObj = append(res.deletedLockObj, prm.target[i])
res.deletedLockObj = append(res.deletedLockObj, addr)
}
}
}
return db.applyInhumeResToCounters(tx, res)
return nil
}
func (db *DB) inhumeECInfo(tx *bbolt.Tx, epoch uint64, tomb *oid.Address, res *InhumeRes,
@ -354,7 +362,7 @@ func (db *DB) applyInhumeResToCounters(tx *bbolt.Tx, res *InhumeRes) error {
// 1. tombstone address if Inhume was called with
// a Tombstone
// 2. zeroValue if Inhume was called with a GC mark
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm *InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
func (db *DB) getInhumeTargetBucketAndValue(garbageBKT, graveyardBKT *bbolt.Bucket, prm InhumePrm) (targetBucket *bbolt.Bucket, value []byte, err error) {
if prm.tomb != nil {
targetBucket = graveyardBKT
tombKey := addressKey(*prm.tomb, make([]byte, addressKeySize))

View file

@ -627,23 +627,14 @@ func (s *Shard) selectExpired(ctx context.Context, epoch uint64, addresses []oid
//
// Does not modify tss.
func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.TombstonedObject) {
if s.GetMode().NoMetabase() {
s.m.RLock()
defer s.m.RUnlock()
if s.info.Mode.NoMetabase() {
return
}
// Mark tombstones as garbage.
var pInhume meta.InhumePrm
tsAddrs := make([]oid.Address, 0, len(tss))
for _, ts := range tss {
tsAddrs = append(tsAddrs, ts.Tombstone())
}
pInhume.SetGCMark()
pInhume.SetAddresses(tsAddrs...)
// inhume tombstones
res, err := s.metaBase.Inhume(ctx, pInhume)
res, err := s.metaBase.InhumeTombstones(ctx, tss)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotMarkTombstonesAsGarbage,
zap.String("error", err.Error()),
@ -663,13 +654,6 @@ func (s *Shard) HandleExpiredTombstones(ctx context.Context, tss []meta.Tombston
s.addToContainerSize(delInfo.CID.EncodeToString(), -int64(delInfo.Size))
i++
}
// drop just processed expired tombstones
// from graveyard
err = s.metaBase.DropGraves(ctx, tss)
if err != nil {
s.log.Warn(ctx, logs.ShardCouldNotDropExpiredGraveRecords, zap.Error(err))
}
}
// HandleExpiredLocks unlocks all objects which were locked by lockers.