Fix GC race conditions in tests #668
14 changed files with 206 additions and 215 deletions
|
@ -32,12 +32,6 @@ import (
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
type epochState struct{}
|
|
||||||
|
|
||||||
func (s epochState) CurrentEpoch() uint64 {
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
type objAddr struct {
|
type objAddr struct {
|
||||||
obj *objectSDK.Object
|
obj *objectSDK.Object
|
||||||
addr oid.Address
|
addr oid.Address
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package shard_test
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -7,7 +7,6 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -29,15 +28,14 @@ func TestShard_Delete(t *testing.T) {
|
||||||
|
|
||||||
func testShardDelete(t *testing.T, hasWriteCache bool) {
|
func testShardDelete(t *testing.T, hasWriteCache bool) {
|
||||||
sh := newShard(t, hasWriteCache)
|
sh := newShard(t, hasWriteCache)
|
||||||
defer releaseShard(sh, t)
|
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
|
|
||||||
obj := testutil.GenerateObjectWithCID(cnr)
|
obj := testutil.GenerateObjectWithCID(cnr)
|
||||||
testutil.AddAttribute(obj, "foo", "bar")
|
testutil.AddAttribute(obj, "foo", "bar")
|
||||||
|
|
||||||
var putPrm shard.PutPrm
|
var putPrm PutPrm
|
||||||
var getPrm shard.GetPrm
|
var getPrm GetPrm
|
||||||
|
|
||||||
t.Run("big object", func(t *testing.T) {
|
t.Run("big object", func(t *testing.T) {
|
||||||
testutil.AddPayload(obj, 1<<20)
|
testutil.AddPayload(obj, 1<<20)
|
||||||
|
@ -45,7 +43,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
|
||||||
putPrm.SetObject(obj)
|
putPrm.SetObject(obj)
|
||||||
getPrm.SetAddress(object.AddressOf(obj))
|
getPrm.SetAddress(object.AddressOf(obj))
|
||||||
|
|
||||||
var delPrm shard.DeletePrm
|
var delPrm DeletePrm
|
||||||
delPrm.SetAddresses(object.AddressOf(obj))
|
delPrm.SetAddresses(object.AddressOf(obj))
|
||||||
|
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
|
@ -71,7 +69,7 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
|
||||||
putPrm.SetObject(obj)
|
putPrm.SetObject(obj)
|
||||||
getPrm.SetAddress(object.AddressOf(obj))
|
getPrm.SetAddress(object.AddressOf(obj))
|
||||||
|
|
||||||
var delPrm shard.DeletePrm
|
var delPrm DeletePrm
|
||||||
delPrm.SetAddresses(object.AddressOf(obj))
|
delPrm.SetAddresses(object.AddressOf(obj))
|
||||||
|
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
|
|
|
@ -119,6 +119,8 @@ type gcCfg struct {
|
||||||
expiredCollectorBatchSize int
|
expiredCollectorBatchSize int
|
||||||
|
|
||||||
metrics GCMectrics
|
metrics GCMectrics
|
||||||
|
|
||||||
|
testHookRemover func(ctx context.Context) gcRunResult
|
||||||
}
|
}
|
||||||
|
|
||||||
func defaultGCCfg() gcCfg {
|
func defaultGCCfg() gcCfg {
|
||||||
|
@ -158,9 +160,14 @@ func (gc *gc) listenEvents(ctx context.Context) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
gc.handleEvent(ctx, event)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (gc *gc) handleEvent(ctx context.Context, event Event) {
|
||||||
v, ok := gc.mEventHandler[event.typ()]
|
v, ok := gc.mEventHandler[event.typ()]
|
||||||
if !ok {
|
if !ok {
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
v.cancelFunc()
|
v.cancelFunc()
|
||||||
|
@ -187,7 +194,6 @@ func (gc *gc) listenEvents(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
func (gc *gc) tickRemover(ctx context.Context) {
|
func (gc *gc) tickRemover(ctx context.Context) {
|
||||||
defer gc.wg.Done()
|
defer gc.wg.Done()
|
||||||
|
@ -209,7 +215,12 @@ func (gc *gc) tickRemover(ctx context.Context) {
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
startedAt := time.Now()
|
startedAt := time.Now()
|
||||||
|
|
||||||
result := gc.remover(ctx)
|
var result gcRunResult
|
||||||
|
if gc.testHookRemover != nil {
|
||||||
|
result = gc.testHookRemover(ctx)
|
||||||
|
} else {
|
||||||
|
result = gc.remover(ctx)
|
||||||
|
}
|
||||||
timer.Reset(gc.removerInterval)
|
timer.Reset(gc.removerInterval)
|
||||||
|
|
||||||
gc.metrics.AddRunDuration(time.Since(startedAt), result.success)
|
gc.metrics.AddRunDuration(time.Since(startedAt), result.success)
|
||||||
|
@ -220,7 +231,7 @@ func (gc *gc) tickRemover(ctx context.Context) {
|
||||||
|
|
||||||
func (gc *gc) stop() {
|
func (gc *gc) stop() {
|
||||||
gc.onceStop.Do(func() {
|
gc.onceStop.Do(func() {
|
||||||
gc.stopChannel <- struct{}{}
|
close(gc.stopChannel)
|
||||||
})
|
})
|
||||||
|
|
||||||
gc.log.Info(logs.ShardWaitingForGCWorkersToStop)
|
gc.log.Info(logs.ShardWaitingForGCWorkersToStop)
|
||||||
|
|
|
@ -75,7 +75,7 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
sh = New(opts...)
|
sh = New(opts...)
|
||||||
|
sh.gcCfg.testHookRemover = func(context.Context) gcRunResult { return gcRunResult{} }
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open())
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
|
@ -116,13 +116,13 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
||||||
storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm)
|
storageID, err := sh.metaBase.StorageID(context.Background(), metaStIDPrm)
|
||||||
require.NoError(t, err, "failed to get storage ID")
|
require.NoError(t, err, "failed to get storage ID")
|
||||||
|
|
||||||
//check existance in blobstore
|
//check existence in blobstore
|
||||||
var bsExisted common.ExistsPrm
|
var bsExisted common.ExistsPrm
|
||||||
bsExisted.Address = addr
|
bsExisted.Address = addr
|
||||||
bsExisted.StorageID = storageID.StorageID()
|
bsExisted.StorageID = storageID.StorageID()
|
||||||
exRes, err := sh.blobStor.Exists(context.Background(), bsExisted)
|
exRes, err := sh.blobStor.Exists(context.Background(), bsExisted)
|
||||||
require.NoError(t, err, "failed to check blobstore existance")
|
require.NoError(t, err, "failed to check blobstore existence")
|
||||||
require.True(t, exRes.Exists, "invalid blobstore existance result")
|
require.True(t, exRes.Exists, "invalid blobstore existence result")
|
||||||
|
|
||||||
//drop from blobstor
|
//drop from blobstor
|
||||||
var bsDeletePrm common.DeletePrm
|
var bsDeletePrm common.DeletePrm
|
||||||
|
@ -131,10 +131,10 @@ func Test_ObjectNotFoundIfNotDeletedFromMetabase(t *testing.T) {
|
||||||
_, err = sh.blobStor.Delete(context.Background(), bsDeletePrm)
|
_, err = sh.blobStor.Delete(context.Background(), bsDeletePrm)
|
||||||
require.NoError(t, err, "failed to delete from blobstore")
|
require.NoError(t, err, "failed to delete from blobstore")
|
||||||
|
|
||||||
//check existance in blobstore
|
//check existence in blobstore
|
||||||
exRes, err = sh.blobStor.Exists(context.Background(), bsExisted)
|
exRes, err = sh.blobStor.Exists(context.Background(), bsExisted)
|
||||||
require.NoError(t, err, "failed to check blobstore existance")
|
require.NoError(t, err, "failed to check blobstore existence")
|
||||||
require.False(t, exRes.Exists, "invalid blobstore existance result")
|
require.False(t, exRes.Exists, "invalid blobstore existence result")
|
||||||
|
|
||||||
//get should return object not found
|
//get should return object not found
|
||||||
_, err = sh.Get(context.Background(), getPrm)
|
_, err = sh.Get(context.Background(), getPrm)
|
||||||
|
|
|
@ -1,17 +1,15 @@
|
||||||
package shard_test
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
|
||||||
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -27,13 +25,11 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
||||||
Value: 100,
|
Value: 100,
|
||||||
}
|
}
|
||||||
|
|
||||||
wcOpts := writecacheconfig.Options{
|
sh := newCustomShard(t, false, shardOptions{
|
||||||
Type: writecacheconfig.TypeBBolt,
|
metaOptions: []meta.Option{meta.WithEpochState(epoch)},
|
||||||
}
|
additionalShardOptions: []Option{WithGCWorkerPoolInitializer(func(int) util.WorkerPool {
|
||||||
sh := newCustomShard(t, t.TempDir(), false, wcOpts, nil, []meta.Option{meta.WithEpochState(epoch)})
|
return util.NewPseudoWorkerPool() // synchronous event processing
|
||||||
|
})},
|
||||||
t.Cleanup(func() {
|
|
||||||
releaseShard(sh, t)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
|
@ -55,7 +51,7 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
||||||
lock.SetAttributes(lockExpirationAttr)
|
lock.SetAttributes(lockExpirationAttr)
|
||||||
lockID, _ := lock.ID()
|
lockID, _ := lock.ID()
|
||||||
|
|
||||||
var putPrm shard.PutPrm
|
var putPrm PutPrm
|
||||||
putPrm.SetObject(obj)
|
putPrm.SetObject(obj)
|
||||||
|
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
|
@ -69,14 +65,12 @@ func Test_GCDropsLockedExpiredSimpleObject(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
epoch.Value = 105
|
epoch.Value = 105
|
||||||
sh.NotificationChannel() <- shard.EventNewEpoch(epoch.Value)
|
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
|
||||||
|
|
||||||
var getPrm shard.GetPrm
|
var getPrm GetPrm
|
||||||
getPrm.SetAddress(objectCore.AddressOf(obj))
|
getPrm.SetAddress(objectCore.AddressOf(obj))
|
||||||
require.Eventually(t, func() bool {
|
|
||||||
_, err = sh.Get(context.Background(), getPrm)
|
_, err = sh.Get(context.Background(), getPrm)
|
||||||
return client.IsErrObjectNotFound(err)
|
require.True(t, client.IsErrObjectNotFound(err), "expired object must be deleted")
|
||||||
}, 3*time.Second, 1*time.Second, "expired object must be deleted")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
|
@ -127,13 +121,11 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
|
|
||||||
linkID, _ := link.ID()
|
linkID, _ := link.ID()
|
||||||
|
|
||||||
wcOpts := writecacheconfig.Options{
|
sh := newCustomShard(t, false, shardOptions{
|
||||||
Type: writecacheconfig.TypeBBolt,
|
metaOptions: []meta.Option{meta.WithEpochState(epoch)},
|
||||||
}
|
additionalShardOptions: []Option{WithGCWorkerPoolInitializer(func(int) util.WorkerPool {
|
||||||
sh := newCustomShard(t, t.TempDir(), false, wcOpts, nil, []meta.Option{meta.WithEpochState(epoch)})
|
return util.NewPseudoWorkerPool() // synchronous event processing
|
||||||
|
})},
|
||||||
t.Cleanup(func() {
|
|
||||||
releaseShard(sh, t)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
lock := testutil.GenerateObjectWithCID(cnr)
|
lock := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
@ -141,7 +133,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
lock.SetAttributes(lockExpirationAttr)
|
lock.SetAttributes(lockExpirationAttr)
|
||||||
lockID, _ := lock.ID()
|
lockID, _ := lock.ID()
|
||||||
|
|
||||||
var putPrm shard.PutPrm
|
var putPrm PutPrm
|
||||||
|
|
||||||
for _, child := range children {
|
for _, child := range children {
|
||||||
putPrm.SetObject(child)
|
putPrm.SetObject(child)
|
||||||
|
@ -160,7 +152,7 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
_, err = sh.Put(context.Background(), putPrm)
|
_, err = sh.Put(context.Background(), putPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var getPrm shard.GetPrm
|
var getPrm GetPrm
|
||||||
getPrm.SetAddress(objectCore.AddressOf(parent))
|
getPrm.SetAddress(objectCore.AddressOf(parent))
|
||||||
|
|
||||||
_, err = sh.Get(context.Background(), getPrm)
|
_, err = sh.Get(context.Background(), getPrm)
|
||||||
|
@ -168,10 +160,8 @@ func Test_GCDropsLockedExpiredComplexObject(t *testing.T) {
|
||||||
require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
|
require.True(t, errors.As(err, &splitInfoError), "split info must be provided")
|
||||||
|
|
||||||
epoch.Value = 105
|
epoch.Value = 105
|
||||||
sh.NotificationChannel() <- shard.EventNewEpoch(epoch.Value)
|
sh.gc.handleEvent(context.Background(), EventNewEpoch(epoch.Value))
|
||||||
|
|
||||||
require.Eventually(t, func() bool {
|
|
||||||
_, err = sh.Get(context.Background(), getPrm)
|
_, err = sh.Get(context.Background(), getPrm)
|
||||||
return client.IsErrObjectNotFound(err)
|
require.True(t, client.IsErrObjectNotFound(err), "expired complex object must be deleted on epoch after lock expires")
|
||||||
}, 3*time.Second, 1*time.Second, "expired complex object must be deleted on epoch after lock expires")
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package shard_test
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
@ -9,7 +9,6 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -33,10 +32,9 @@ func TestShard_Get(t *testing.T) {
|
||||||
|
|
||||||
func testShardGet(t *testing.T, hasWriteCache bool) {
|
func testShardGet(t *testing.T, hasWriteCache bool) {
|
||||||
sh := newShard(t, hasWriteCache)
|
sh := newShard(t, hasWriteCache)
|
||||||
defer releaseShard(sh, t)
|
|
||||||
|
|
||||||
var putPrm shard.PutPrm
|
var putPrm PutPrm
|
||||||
var getPrm shard.GetPrm
|
var getPrm GetPrm
|
||||||
|
|
||||||
t.Run("small object", func(t *testing.T) {
|
t.Run("small object", func(t *testing.T) {
|
||||||
obj := testutil.GenerateObject()
|
obj := testutil.GenerateObject()
|
||||||
|
@ -116,7 +114,7 @@ func testShardGet(t *testing.T, hasWriteCache bool) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func testGet(t *testing.T, sh *shard.Shard, getPrm shard.GetPrm, hasWriteCache bool) (shard.GetRes, error) {
|
func testGet(t *testing.T, sh *Shard, getPrm GetPrm, hasWriteCache bool) (GetRes, error) {
|
||||||
res, err := sh.Get(context.Background(), getPrm)
|
res, err := sh.Get(context.Background(), getPrm)
|
||||||
if hasWriteCache {
|
if hasWriteCache {
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package shard_test
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -8,7 +8,6 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -31,10 +30,9 @@ func TestShard_Head(t *testing.T) {
|
||||||
|
|
||||||
func testShardHead(t *testing.T, hasWriteCache bool) {
|
func testShardHead(t *testing.T, hasWriteCache bool) {
|
||||||
sh := newShard(t, hasWriteCache)
|
sh := newShard(t, hasWriteCache)
|
||||||
defer releaseShard(sh, t)
|
|
||||||
|
|
||||||
var putPrm shard.PutPrm
|
var putPrm PutPrm
|
||||||
var headPrm shard.HeadPrm
|
var headPrm HeadPrm
|
||||||
|
|
||||||
t.Run("regular object", func(t *testing.T) {
|
t.Run("regular object", func(t *testing.T) {
|
||||||
obj := testutil.GenerateObject()
|
obj := testutil.GenerateObject()
|
||||||
|
@ -87,7 +85,7 @@ func testShardHead(t *testing.T, hasWriteCache bool) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func testHead(t *testing.T, sh *shard.Shard, headPrm shard.HeadPrm, hasWriteCache bool) (shard.HeadRes, error) {
|
func testHead(t *testing.T, sh *Shard, headPrm HeadPrm, hasWriteCache bool) (HeadRes, error) {
|
||||||
res, err := sh.Head(context.Background(), headPrm)
|
res, err := sh.Head(context.Background(), headPrm)
|
||||||
if hasWriteCache {
|
if hasWriteCache {
|
||||||
require.Eventually(t, func() bool {
|
require.Eventually(t, func() bool {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package shard_test
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -6,7 +6,6 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -28,7 +27,6 @@ func TestShard_Inhume(t *testing.T) {
|
||||||
|
|
||||||
func testShardInhume(t *testing.T, hasWriteCache bool) {
|
func testShardInhume(t *testing.T, hasWriteCache bool) {
|
||||||
sh := newShard(t, hasWriteCache)
|
sh := newShard(t, hasWriteCache)
|
||||||
defer releaseShard(sh, t)
|
|
||||||
|
|
||||||
cnr := cidtest.ID()
|
cnr := cidtest.ID()
|
||||||
|
|
||||||
|
@ -37,13 +35,13 @@ func testShardInhume(t *testing.T, hasWriteCache bool) {
|
||||||
|
|
||||||
ts := testutil.GenerateObjectWithCID(cnr)
|
ts := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
|
||||||
var putPrm shard.PutPrm
|
var putPrm PutPrm
|
||||||
putPrm.SetObject(obj)
|
putPrm.SetObject(obj)
|
||||||
|
|
||||||
var inhPrm shard.InhumePrm
|
var inhPrm InhumePrm
|
||||||
inhPrm.SetTarget(object.AddressOf(ts), object.AddressOf(obj))
|
inhPrm.SetTarget(object.AddressOf(ts), object.AddressOf(obj))
|
||||||
|
|
||||||
var getPrm shard.GetPrm
|
var getPrm GetPrm
|
||||||
getPrm.SetAddress(object.AddressOf(obj))
|
getPrm.SetAddress(object.AddressOf(obj))
|
||||||
|
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package shard_test
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -7,7 +7,6 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
@ -19,19 +18,17 @@ func TestShard_List(t *testing.T) {
|
||||||
t.Run("without write cache", func(t *testing.T) {
|
t.Run("without write cache", func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
sh := newShard(t, false)
|
sh := newShard(t, false)
|
||||||
defer releaseShard(sh, t)
|
|
||||||
testShardList(t, sh)
|
testShardList(t, sh)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("with write cache", func(t *testing.T) {
|
t.Run("with write cache", func(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
shWC := newShard(t, true)
|
shWC := newShard(t, true)
|
||||||
defer releaseShard(shWC, t)
|
|
||||||
testShardList(t, shWC)
|
testShardList(t, shWC)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func testShardList(t *testing.T, sh *shard.Shard) {
|
func testShardList(t *testing.T, sh *Shard) {
|
||||||
const C = 10
|
const C = 10
|
||||||
const N = 5
|
const N = 5
|
||||||
|
|
||||||
|
@ -59,7 +56,7 @@ func testShardList(t *testing.T, sh *shard.Shard) {
|
||||||
objs[object.AddressOf(obj).EncodeToString()] = 0
|
objs[object.AddressOf(obj).EncodeToString()] = 0
|
||||||
mtx.Unlock()
|
mtx.Unlock()
|
||||||
|
|
||||||
var putPrm shard.PutPrm
|
var putPrm PutPrm
|
||||||
putPrm.SetObject(obj)
|
putPrm.SetObject(obj)
|
||||||
|
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package shard_test
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -11,7 +11,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
@ -26,13 +25,13 @@ import (
|
||||||
func TestShard_Lock(t *testing.T) {
|
func TestShard_Lock(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
var sh *shard.Shard
|
var sh *Shard
|
||||||
|
|
||||||
rootPath := t.TempDir()
|
rootPath := t.TempDir()
|
||||||
opts := []shard.Option{
|
opts := []Option{
|
||||||
shard.WithID(shard.NewIDFromBytes([]byte{})),
|
WithID(NewIDFromBytes([]byte{})),
|
||||||
shard.WithLogger(&logger.Logger{Logger: zap.NewNop()}),
|
WithLogger(&logger.Logger{Logger: zap.NewNop()}),
|
||||||
shard.WithBlobStorOptions(
|
WithBlobStorOptions(
|
||||||
blobstor.WithStorages([]blobstor.SubStorage{
|
blobstor.WithStorages([]blobstor.SubStorage{
|
||||||
{
|
{
|
||||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||||
|
@ -49,16 +48,16 @@ func TestShard_Lock(t *testing.T) {
|
||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
),
|
),
|
||||||
shard.WithMetaBaseOptions(
|
WithMetaBaseOptions(
|
||||||
meta.WithPath(filepath.Join(rootPath, "meta")),
|
meta.WithPath(filepath.Join(rootPath, "meta")),
|
||||||
meta.WithEpochState(epochState{}),
|
meta.WithEpochState(epochState{}),
|
||||||
),
|
),
|
||||||
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
|
WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
|
||||||
sh.HandleDeletedLocks(addresses)
|
sh.HandleDeletedLocks(addresses)
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
|
|
||||||
sh = shard.New(opts...)
|
sh = New(opts...)
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open())
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
|
@ -76,7 +75,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
|
|
||||||
// put the object
|
// put the object
|
||||||
|
|
||||||
var putPrm shard.PutPrm
|
var putPrm PutPrm
|
||||||
putPrm.SetObject(obj)
|
putPrm.SetObject(obj)
|
||||||
|
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
|
@ -94,7 +93,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
t.Run("inhuming locked objects", func(t *testing.T) {
|
t.Run("inhuming locked objects", func(t *testing.T) {
|
||||||
ts := testutil.GenerateObjectWithCID(cnr)
|
ts := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
|
||||||
var inhumePrm shard.InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(obj))
|
inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(obj))
|
||||||
|
|
||||||
var objLockedErr *apistatus.ObjectLocked
|
var objLockedErr *apistatus.ObjectLocked
|
||||||
|
@ -110,7 +109,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
t.Run("inhuming lock objects", func(t *testing.T) {
|
t.Run("inhuming lock objects", func(t *testing.T) {
|
||||||
ts := testutil.GenerateObjectWithCID(cnr)
|
ts := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
|
||||||
var inhumePrm shard.InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(lock))
|
inhumePrm.SetTarget(objectcore.AddressOf(ts), objectcore.AddressOf(lock))
|
||||||
|
|
||||||
_, err = sh.Inhume(context.Background(), inhumePrm)
|
_, err = sh.Inhume(context.Background(), inhumePrm)
|
||||||
|
@ -122,7 +121,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("force objects inhuming", func(t *testing.T) {
|
t.Run("force objects inhuming", func(t *testing.T) {
|
||||||
var inhumePrm shard.InhumePrm
|
var inhumePrm InhumePrm
|
||||||
inhumePrm.MarkAsGarbage(objectcore.AddressOf(lock))
|
inhumePrm.MarkAsGarbage(objectcore.AddressOf(lock))
|
||||||
inhumePrm.ForceRemoval()
|
inhumePrm.ForceRemoval()
|
||||||
|
|
||||||
|
@ -132,7 +131,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
// it should be possible to remove
|
// it should be possible to remove
|
||||||
// lock object now
|
// lock object now
|
||||||
|
|
||||||
inhumePrm = shard.InhumePrm{}
|
inhumePrm = InhumePrm{}
|
||||||
inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))
|
inhumePrm.MarkAsGarbage(objectcore.AddressOf(obj))
|
||||||
|
|
||||||
_, err = sh.Inhume(context.Background(), inhumePrm)
|
_, err = sh.Inhume(context.Background(), inhumePrm)
|
||||||
|
@ -140,7 +139,7 @@ func TestShard_Lock(t *testing.T) {
|
||||||
|
|
||||||
// check that object has been removed
|
// check that object has been removed
|
||||||
|
|
||||||
var getPrm shard.GetPrm
|
var getPrm GetPrm
|
||||||
getPrm.SetAddress(objectcore.AddressOf(obj))
|
getPrm.SetAddress(objectcore.AddressOf(obj))
|
||||||
|
|
||||||
_, err = sh.Get(context.Background(), getPrm)
|
_, err = sh.Get(context.Background(), getPrm)
|
||||||
|
@ -160,7 +159,7 @@ func TestShard_IsLocked(t *testing.T) {
|
||||||
|
|
||||||
// put the object
|
// put the object
|
||||||
|
|
||||||
var putPrm shard.PutPrm
|
var putPrm PutPrm
|
||||||
putPrm.SetObject(obj)
|
putPrm.SetObject(obj)
|
||||||
|
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package shard_test
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -12,7 +12,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard/mode"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -126,9 +125,6 @@ func (m *metricsStore) DeleteShardMetrics() {
|
||||||
m.errCounter = 0
|
m.errCounter = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
const physical = "phy"
|
|
||||||
const logical = "logic"
|
|
||||||
|
|
||||||
func TestCounters(t *testing.T) {
|
func TestCounters(t *testing.T) {
|
||||||
t.Parallel()
|
t.Parallel()
|
||||||
|
|
||||||
|
@ -163,8 +159,7 @@ func TestCounters(t *testing.T) {
|
||||||
totalPayload += oSize
|
totalPayload += oSize
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Run("put", func(t *testing.T) {
|
var prm PutPrm
|
||||||
var prm shard.PutPrm
|
|
||||||
|
|
||||||
for i := 0; i < objNumber; i++ {
|
for i := 0; i < objNumber; i++ {
|
||||||
prm.SetObject(oo[i])
|
prm.SetObject(oo[i])
|
||||||
|
@ -177,10 +172,9 @@ func TestCounters(t *testing.T) {
|
||||||
require.Equal(t, uint64(objNumber), mm.getObjectCounter(logical))
|
require.Equal(t, uint64(objNumber), mm.getObjectCounter(logical))
|
||||||
require.Equal(t, expectedSizes, mm.containerSizes())
|
require.Equal(t, expectedSizes, mm.containerSizes())
|
||||||
require.Equal(t, totalPayload, mm.payloadSize())
|
require.Equal(t, totalPayload, mm.payloadSize())
|
||||||
})
|
|
||||||
|
|
||||||
t.Run("inhume_GC", func(t *testing.T) {
|
t.Run("inhume_GC", func(t *testing.T) {
|
||||||
var prm shard.InhumePrm
|
var prm InhumePrm
|
||||||
inhumedNumber := objNumber / 4
|
inhumedNumber := objNumber / 4
|
||||||
|
|
||||||
for i := 0; i < inhumedNumber; i++ {
|
for i := 0; i < inhumedNumber; i++ {
|
||||||
|
@ -199,7 +193,7 @@ func TestCounters(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("inhume_TS", func(t *testing.T) {
|
t.Run("inhume_TS", func(t *testing.T) {
|
||||||
var prm shard.InhumePrm
|
var prm InhumePrm
|
||||||
ts := objectcore.AddressOf(testutil.GenerateObject())
|
ts := objectcore.AddressOf(testutil.GenerateObject())
|
||||||
|
|
||||||
phy := mm.getObjectCounter(physical)
|
phy := mm.getObjectCounter(physical)
|
||||||
|
@ -220,7 +214,7 @@ func TestCounters(t *testing.T) {
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("Delete", func(t *testing.T) {
|
t.Run("Delete", func(t *testing.T) {
|
||||||
var prm shard.DeletePrm
|
var prm DeletePrm
|
||||||
|
|
||||||
phy := mm.getObjectCounter(physical)
|
phy := mm.getObjectCounter(physical)
|
||||||
logic := mm.getObjectCounter(logical)
|
logic := mm.getObjectCounter(logical)
|
||||||
|
@ -246,7 +240,7 @@ func TestCounters(t *testing.T) {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) {
|
func shardWithMetrics(t *testing.T, path string) (*Shard, *metricsStore) {
|
||||||
blobOpts := []blobstor.Option{
|
blobOpts := []blobstor.Option{
|
||||||
blobstor.WithStorages([]blobstor.SubStorage{
|
blobstor.WithStorages([]blobstor.SubStorage{
|
||||||
{
|
{
|
||||||
|
@ -266,14 +260,14 @@ func shardWithMetrics(t *testing.T, path string) (*shard.Shard, *metricsStore) {
|
||||||
cnrSize: make(map[string]int64),
|
cnrSize: make(map[string]int64),
|
||||||
}
|
}
|
||||||
|
|
||||||
sh := shard.New(
|
sh := New(
|
||||||
shard.WithID(shard.NewIDFromBytes([]byte{})),
|
WithID(NewIDFromBytes([]byte{})),
|
||||||
shard.WithBlobStorOptions(blobOpts...),
|
WithBlobStorOptions(blobOpts...),
|
||||||
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(path, "pilorama"))),
|
WithPiloramaOptions(pilorama.WithPath(filepath.Join(path, "pilorama"))),
|
||||||
shard.WithMetaBaseOptions(
|
WithMetaBaseOptions(
|
||||||
meta.WithPath(filepath.Join(path, "meta")),
|
meta.WithPath(filepath.Join(path, "meta")),
|
||||||
meta.WithEpochState(epochState{})),
|
meta.WithEpochState(epochState{})),
|
||||||
shard.WithMetricsWriter(mm),
|
WithMetricsWriter(mm),
|
||||||
)
|
)
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open())
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package shard_test
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -11,7 +11,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
|
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||||
|
@ -77,8 +76,10 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
sh := newCustomShard(t, t.TempDir(), hasWriteCache, wcOpts,
|
sh := newCustomShard(t, hasWriteCache, shardOptions{
|
||||||
[]blobstor.Option{blobstor.WithStorages([]blobstor.SubStorage{
|
wcOpts: wcOpts,
|
||||||
|
bsOpts: []blobstor.Option{
|
||||||
|
blobstor.WithStorages([]blobstor.SubStorage{
|
||||||
{
|
{
|
||||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||||
blobovniczatree.WithLogger(test.NewLogger(t, true)),
|
blobovniczatree.WithLogger(test.NewLogger(t, true)),
|
||||||
|
@ -93,9 +94,9 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
|
||||||
Storage: fstree.New(
|
Storage: fstree.New(
|
||||||
fstree.WithPath(filepath.Join(t.TempDir(), "blob"))),
|
fstree.WithPath(filepath.Join(t.TempDir(), "blob"))),
|
||||||
},
|
},
|
||||||
})},
|
}),
|
||||||
nil)
|
},
|
||||||
defer releaseShard(sh, t)
|
})
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
@ -106,13 +107,13 @@ func testShardGetRange(t *testing.T, hasWriteCache bool) {
|
||||||
addr := object.AddressOf(obj)
|
addr := object.AddressOf(obj)
|
||||||
payload := slice.Copy(obj.Payload())
|
payload := slice.Copy(obj.Payload())
|
||||||
|
|
||||||
var putPrm shard.PutPrm
|
var putPrm PutPrm
|
||||||
putPrm.SetObject(obj)
|
putPrm.SetObject(obj)
|
||||||
|
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
var rngPrm shard.RngPrm
|
var rngPrm RngPrm
|
||||||
rngPrm.SetAddress(addr)
|
rngPrm.SetAddress(addr)
|
||||||
rngPrm.SetRange(tc.rng.GetOffset(), tc.rng.GetLength())
|
rngPrm.SetRange(tc.rng.GetOffset(), tc.rng.GetLength())
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package shard_test
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -11,7 +11,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/fstree"
|
||||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/pilorama"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
|
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebadger"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
||||||
|
@ -31,39 +30,50 @@ func (s epochState) CurrentEpoch() uint64 {
|
||||||
return s.Value
|
return s.Value
|
||||||
}
|
}
|
||||||
|
|
||||||
func newShard(t testing.TB, enableWriteCache bool) *shard.Shard {
|
type shardOptions struct {
|
||||||
return newCustomShard(t, t.TempDir(), enableWriteCache,
|
rootPath string
|
||||||
writecacheconfig.Options{Type: writecacheconfig.TypeBBolt},
|
dontRelease bool
|
||||||
nil,
|
wcOpts writecacheconfig.Options
|
||||||
nil)
|
bsOpts []blobstor.Option
|
||||||
|
metaOptions []meta.Option
|
||||||
|
|
||||||
|
additionalShardOptions []Option
|
||||||
}
|
}
|
||||||
|
|
||||||
func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts writecacheconfig.Options, bsOpts []blobstor.Option, metaOptions []meta.Option) *shard.Shard {
|
func newShard(t testing.TB, enableWriteCache bool) *Shard {
|
||||||
var sh *shard.Shard
|
return newCustomShard(t, enableWriteCache, shardOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
func newCustomShard(t testing.TB, enableWriteCache bool, o shardOptions) *Shard {
|
||||||
|
if o.rootPath == "" {
|
||||||
|
o.rootPath = t.TempDir()
|
||||||
|
}
|
||||||
|
if enableWriteCache && o.wcOpts.Type == 0 {
|
||||||
|
o.wcOpts.Type = writecacheconfig.TypeBBolt
|
||||||
|
}
|
||||||
|
|
||||||
|
var sh *Shard
|
||||||
if enableWriteCache {
|
if enableWriteCache {
|
||||||
rootPath = filepath.Join(rootPath, "wc")
|
switch o.wcOpts.Type {
|
||||||
switch wcOpts.Type {
|
|
||||||
case writecacheconfig.TypeBBolt:
|
case writecacheconfig.TypeBBolt:
|
||||||
wcOpts.BBoltOptions = append(
|
o.wcOpts.BBoltOptions = append(
|
||||||
[]writecachebbolt.Option{writecachebbolt.WithPath(filepath.Join(rootPath, "wcache"))},
|
[]writecachebbolt.Option{writecachebbolt.WithPath(filepath.Join(o.rootPath, "wcache"))},
|
||||||
wcOpts.BBoltOptions...)
|
o.wcOpts.BBoltOptions...)
|
||||||
case writecacheconfig.TypeBadger:
|
case writecacheconfig.TypeBadger:
|
||||||
wcOpts.BadgerOptions = append(
|
o.wcOpts.BadgerOptions = append(
|
||||||
[]writecachebadger.Option{writecachebadger.WithPath(filepath.Join(rootPath, "wcache"))},
|
[]writecachebadger.Option{writecachebadger.WithPath(filepath.Join(o.rootPath, "wcache"))},
|
||||||
wcOpts.BadgerOptions...)
|
o.wcOpts.BadgerOptions...)
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
rootPath = filepath.Join(rootPath, "nowc")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if bsOpts == nil {
|
if o.bsOpts == nil {
|
||||||
bsOpts = []blobstor.Option{
|
o.bsOpts = []blobstor.Option{
|
||||||
blobstor.WithLogger(test.NewLogger(t, true)),
|
blobstor.WithLogger(test.NewLogger(t, true)),
|
||||||
blobstor.WithStorages([]blobstor.SubStorage{
|
blobstor.WithStorages([]blobstor.SubStorage{
|
||||||
{
|
{
|
||||||
Storage: blobovniczatree.NewBlobovniczaTree(
|
Storage: blobovniczatree.NewBlobovniczaTree(
|
||||||
blobovniczatree.WithLogger(test.NewLogger(t, true)),
|
blobovniczatree.WithLogger(test.NewLogger(t, true)),
|
||||||
blobovniczatree.WithRootPath(filepath.Join(rootPath, "blob", "blobovnicza")),
|
blobovniczatree.WithRootPath(filepath.Join(o.rootPath, "blob", "blobovnicza")),
|
||||||
blobovniczatree.WithBlobovniczaShallowDepth(1),
|
blobovniczatree.WithBlobovniczaShallowDepth(1),
|
||||||
blobovniczatree.WithBlobovniczaShallowWidth(1)),
|
blobovniczatree.WithBlobovniczaShallowWidth(1)),
|
||||||
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
Policy: func(_ *objectSDK.Object, data []byte) bool {
|
||||||
|
@ -72,46 +82,51 @@ func newCustomShard(t testing.TB, rootPath string, enableWriteCache bool, wcOpts
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
Storage: fstree.New(
|
Storage: fstree.New(
|
||||||
fstree.WithPath(filepath.Join(rootPath, "blob"))),
|
fstree.WithPath(filepath.Join(o.rootPath, "blob"))),
|
||||||
},
|
},
|
||||||
}),
|
}),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
opts := []shard.Option{
|
opts := []Option{
|
||||||
shard.WithID(shard.NewIDFromBytes([]byte{})),
|
WithID(NewIDFromBytes([]byte{})),
|
||||||
shard.WithLogger(test.NewLogger(t, true)),
|
WithLogger(test.NewLogger(t, true)),
|
||||||
shard.WithBlobStorOptions(bsOpts...),
|
WithBlobStorOptions(o.bsOpts...),
|
||||||
shard.WithMetaBaseOptions(
|
WithMetaBaseOptions(
|
||||||
append([]meta.Option{
|
append([]meta.Option{
|
||||||
meta.WithPath(filepath.Join(rootPath, "meta")), meta.WithEpochState(epochState{})},
|
meta.WithPath(filepath.Join(o.rootPath, "meta")), meta.WithEpochState(epochState{})},
|
||||||
metaOptions...)...,
|
o.metaOptions...)...,
|
||||||
),
|
),
|
||||||
shard.WithPiloramaOptions(pilorama.WithPath(filepath.Join(rootPath, "pilorama"))),
|
WithPiloramaOptions(pilorama.WithPath(filepath.Join(o.rootPath, "pilorama"))),
|
||||||
shard.WithWriteCache(enableWriteCache),
|
WithWriteCache(enableWriteCache),
|
||||||
shard.WithWriteCacheOptions(wcOpts),
|
WithWriteCacheOptions(o.wcOpts),
|
||||||
shard.WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
|
WithDeletedLockCallback(func(_ context.Context, addresses []oid.Address) {
|
||||||
sh.HandleDeletedLocks(addresses)
|
sh.HandleDeletedLocks(addresses)
|
||||||
}),
|
}),
|
||||||
shard.WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
|
WithExpiredLocksCallback(func(ctx context.Context, epoch uint64, a []oid.Address) {
|
||||||
sh.HandleExpiredLocks(ctx, epoch, a)
|
sh.HandleExpiredLocks(ctx, epoch, a)
|
||||||
}),
|
}),
|
||||||
shard.WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
WithGCWorkerPoolInitializer(func(sz int) util.WorkerPool {
|
||||||
pool, err := ants.NewPool(sz)
|
pool, err := ants.NewPool(sz)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
return pool
|
return pool
|
||||||
}),
|
}),
|
||||||
shard.WithGCRemoverSleepInterval(100 * time.Millisecond),
|
WithGCRemoverSleepInterval(100 * time.Millisecond),
|
||||||
}
|
}
|
||||||
|
opts = append(opts, o.additionalShardOptions...)
|
||||||
|
|
||||||
sh = shard.New(opts...)
|
sh = New(opts...)
|
||||||
|
|
||||||
require.NoError(t, sh.Open())
|
require.NoError(t, sh.Open())
|
||||||
require.NoError(t, sh.Init(context.Background()))
|
require.NoError(t, sh.Init(context.Background()))
|
||||||
|
|
||||||
|
if !o.dontRelease {
|
||||||
|
t.Cleanup(func() { releaseShard(sh, t) })
|
||||||
|
}
|
||||||
|
|
||||||
return sh
|
return sh
|
||||||
}
|
}
|
||||||
|
|
||||||
func releaseShard(s *shard.Shard, t testing.TB) {
|
func releaseShard(s *Shard, t testing.TB) {
|
||||||
require.NoError(t, s.Close())
|
require.NoError(t, s.Close())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package shard_test
|
package shard
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -7,7 +7,6 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
|
||||||
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
|
writecacheconfig "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/config"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/writecache/writecachebbolt"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
@ -44,13 +43,13 @@ func TestWriteCacheObjectLoss(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
sh := newCustomShard(t, dir, true, wcOpts, nil, nil)
|
sh := newCustomShard(t, true, shardOptions{dontRelease: true, rootPath: dir, wcOpts: wcOpts})
|
||||||
|
|
||||||
var errG errgroup.Group
|
var errG errgroup.Group
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
obj := objects[i]
|
obj := objects[i]
|
||||||
errG.Go(func() error {
|
errG.Go(func() error {
|
||||||
var putPrm shard.PutPrm
|
var putPrm PutPrm
|
||||||
putPrm.SetObject(obj)
|
putPrm.SetObject(obj)
|
||||||
_, err := sh.Put(context.Background(), putPrm)
|
_, err := sh.Put(context.Background(), putPrm)
|
||||||
return err
|
return err
|
||||||
|
@ -59,10 +58,9 @@ func TestWriteCacheObjectLoss(t *testing.T) {
|
||||||
require.NoError(t, errG.Wait())
|
require.NoError(t, errG.Wait())
|
||||||
require.NoError(t, sh.Close())
|
require.NoError(t, sh.Close())
|
||||||
|
|
||||||
sh = newCustomShard(t, dir, true, wcOpts, nil, nil)
|
sh = newCustomShard(t, true, shardOptions{rootPath: dir, wcOpts: wcOpts})
|
||||||
defer releaseShard(sh, t)
|
|
||||||
|
|
||||||
var getPrm shard.GetPrm
|
var getPrm GetPrm
|
||||||
|
|
||||||
for i := range objects {
|
for i := range objects {
|
||||||
getPrm.SetAddress(object.AddressOf(objects[i]))
|
getPrm.SetAddress(object.AddressOf(objects[i]))
|
||||||
|
|
Loading…
Add table
Reference in a new issue