WIP: Change metabase engine to pebble #1221
3 changed files with 7 additions and 13 deletions
|
@ -44,9 +44,9 @@ func (r ListContainersRes) Containers() []cid.ID {
|
||||||
// ContainerSize returns the sum of estimation container sizes among all shards.
|
// ContainerSize returns the sum of estimation container sizes among all shards.
|
||||||
//
|
//
|
||||||
// Returns an error if executions are blocked (see BlockExecution).
|
// Returns an error if executions are blocked (see BlockExecution).
|
||||||
func (e *StorageEngine) ContainerSize(prm ContainerSizePrm) (res ContainerSizeRes, err error) {
|
func (e *StorageEngine) ContainerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes, err error) {
|
||||||
err = e.execIfNotBlocked(func() error {
|
err = e.execIfNotBlocked(func() error {
|
||||||
res, err = e.containerSize(prm)
|
res, err = e.containerSize(ctx, prm)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -54,12 +54,12 @@ func (e *StorageEngine) ContainerSize(prm ContainerSizePrm) (res ContainerSizeRe
|
||||||
}
|
}
|
||||||
|
|
||||||
// ContainerSize calls ContainerSize method on engine to calculate sum of estimation container sizes among all shards.
|
// ContainerSize calls ContainerSize method on engine to calculate sum of estimation container sizes among all shards.
|
||||||
func ContainerSize(e *StorageEngine, id cid.ID) (uint64, error) {
|
func ContainerSize(ctx context.Context, e *StorageEngine, id cid.ID) (uint64, error) {
|
||||||
var prm ContainerSizePrm
|
var prm ContainerSizePrm
|
||||||
|
|
||||||
prm.SetContainerID(id)
|
prm.SetContainerID(id)
|
||||||
|
|
||||||
res, err := e.ContainerSize(prm)
|
res, err := e.ContainerSize(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
@ -67,7 +67,7 @@ func ContainerSize(e *StorageEngine, id cid.ID) (uint64, error) {
|
||||||
return res.Size(), nil
|
return res.Size(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *StorageEngine) containerSize(prm ContainerSizePrm) (res ContainerSizeRes, err error) {
|
func (e *StorageEngine) containerSize(ctx context.Context, prm ContainerSizePrm) (res ContainerSizeRes, err error) {
|
||||||
if e.metrics != nil {
|
if e.metrics != nil {
|
||||||
defer elapsed("EstimateContainerSize", e.metrics.AddMethodDuration)()
|
defer elapsed("EstimateContainerSize", e.metrics.AddMethodDuration)()
|
||||||
}
|
}
|
||||||
|
@ -76,7 +76,7 @@ func (e *StorageEngine) containerSize(prm ContainerSizePrm) (res ContainerSizeRe
|
||||||
var csPrm shard.ContainerSizePrm
|
var csPrm shard.ContainerSizePrm
|
||||||
csPrm.SetContainerID(prm.cnr)
|
csPrm.SetContainerID(prm.cnr)
|
||||||
|
|
||||||
csRes, err := sh.Shard.ContainerSize(csPrm)
|
csRes, err := sh.Shard.ContainerSize(ctx, csPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.reportShardError(sh, "can't get container size", err,
|
e.reportShardError(sh, "can't get container size", err,
|
||||||
zap.Stringer("container_id", prm.cnr))
|
zap.Stringer("container_id", prm.cnr))
|
||||||
|
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
|
@ -24,7 +23,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.etcd.io/bbolt"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// TestInitializationFailure checks that shard is initialized and closed even if media
|
// TestInitializationFailure checks that shard is initialized and closed even if media
|
||||||
|
@ -53,10 +51,6 @@ func TestInitializationFailure(t *testing.T) {
|
||||||
shard.WithBlobStorOptions(
|
shard.WithBlobStorOptions(
|
||||||
blobstor.WithStorages(storages)),
|
blobstor.WithStorages(storages)),
|
||||||
shard.WithMetaBaseOptions(
|
shard.WithMetaBaseOptions(
|
||||||
meta.WithBoltDBOptions(&bbolt.Options{
|
|
||||||
Timeout: 100 * time.Millisecond,
|
|
||||||
OpenFile: opts.openFileMetabase,
|
|
||||||
}),
|
|
||||||
meta.WithPath(filepath.Join(t.TempDir(), "metabase")),
|
meta.WithPath(filepath.Join(t.TempDir(), "metabase")),
|
||||||
meta.WithPermissions(0o700),
|
meta.WithPermissions(0o700),
|
||||||
meta.WithEpochState(epochState{})),
|
meta.WithEpochState(epochState{})),
|
||||||
|
|
|
@ -317,7 +317,7 @@ func (e *StorageEngine) processZeroSizeContainers(ctx context.Context, ids []cid
|
||||||
var drop []cid.ID
|
var drop []cid.ID
|
||||||
for id := range idMap {
|
for id := range idMap {
|
||||||
prm.SetContainerID(id)
|
prm.SetContainerID(id)
|
||||||
s, err := sh.ContainerSize(prm)
|
s, err := sh.ContainerSize(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.log.Warn(logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err))
|
e.log.Warn(logs.EngineFailedToGetContainerSize, zap.Stringer("container_id", id), zap.Error(err))
|
||||||
failed = true
|
failed = true
|
||||||
|
|
Loading…
Reference in a new issue