metabase: Drop toMoveIt bucket #997

Merged
dstepanov-yadro merged 1 commit from dstepanov-yadro/frostfs-node:feat/drop_move_it into master 2024-02-21 07:22:24 +00:00
10 changed files with 16 additions and 347 deletions

View file

@ -220,7 +220,6 @@ const (
EngineCouldNotCloseShard = "could not close shard"
EngineCouldNotReloadAShard = "could not reload a shard"
EngineAddedNewShard = "added new shard"
EngineCouldNotMarkObjectForShardRelocation = "could not mark object for shard relocation"
EngineCouldNotPutObjectToShard = "could not put object to shard"
EngineErrorDuringSearchingForObjectChildren = "error during searching for object children"
EngineCouldNotInhumeObjectInShard = "could not inhume object in shard"

View file

@ -700,7 +700,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
continue
}
switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status {
switch e.putToShard(ctx, shards[j].hashedShard, shards[j].pool, addr, object).status {
case putToShardSuccess:
res.objEvacuated.Add(1)
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,

View file

@ -84,7 +84,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
}
var shRes putToShardRes
e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
e.mtx.RLock()
pool, ok := e.shardPools[sh.ID().String()]
e.mtx.RUnlock()
@ -92,7 +92,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
// Shard was concurrently removed, skip.
return false
}
shRes = e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
shRes = e.putToShard(ctx, sh, pool, addr, prm.obj)
return shRes.status != putToShardUnknown
})
switch shRes.status {
@ -109,7 +109,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
// putToShard puts object to sh.
// Return putToShardStatus and error if it is necessary to propagate an error upper.
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool,
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool util.WorkerPool,
addr oid.Address, obj *objectSDK.Object,
) (res putToShardRes) {
exitCh := make(chan struct{})
@ -132,20 +132,6 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
}
if exists.Exists() {
if ind != 0 {
var toMoveItPrm shard.ToMoveItPrm
toMoveItPrm.SetAddress(addr)
_, err = sh.ToMoveIt(ctx, toMoveItPrm)
if err != nil {
e.log.Warn(logs.EngineCouldNotMarkObjectForShardRelocation,
zap.Stringer("shard", sh.ID()),
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
}
}
res.status = putToShardExists
return
}

View file

@ -74,23 +74,6 @@ func TestDB_Containers(t *testing.T) {
require.NoError(t, err)
assertContains(cnrs, cnr)
})
t.Run("ToMoveIt", func(t *testing.T) {
obj := testutil.GenerateObject()
require.NoError(t, putBig(db, obj))
cnrs, err := db.Containers(context.Background())
require.NoError(t, err)
cnr, _ := obj.ContainerID()
assertContains(cnrs, cnr)
require.NoError(t, metaToMoveIt(db, object.AddressOf(obj)))
cnrs, err = db.Containers(context.Background())
require.NoError(t, err)
assertContains(cnrs, cnr)
})
}
func TestDB_ContainersCount(t *testing.T) {

View file

@ -117,12 +117,16 @@ func (db *DB) init(reset bool) error {
string(containerVolumeBucketName): {},
string(containerCounterBucketName): {},
string(graveyardBucketName): {},
string(toMoveItBucketName): {},
string(garbageBucketName): {},
string(shardInfoBucket): {},
string(bucketNameLocked): {},
}
// buckets that are not used anymore
deprecatedBuckets := [][]byte{
toMoveItBucketName,
}
return db.boltDB.Update(func(tx *bbolt.Tx) error {
var err error
if !reset {
@ -147,6 +151,13 @@ func (db *DB) init(reset bool) error {
}
}
for _, b := range deprecatedBuckets {
err := tx.DeleteBucket(b)
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
return fmt.Errorf("could not delete deprecated bucket %s: %w", string(b), err)
}
}
if !reset { // counters will be recalculated by refill metabase
err = syncCounter(tx, false)
if err != nil {

View file

@ -434,7 +434,6 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
addr := object.AddressOf(obj)
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
addrKey := addressKey(addr, make([]byte, addressKeySize))
cnr := addr.Container()
bucketName := make([]byte, bucketKeySize)
@ -470,10 +469,6 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
name: rootBucketName(cnr, bucketName),
key: objKey,
})
delUniqueIndexItem(tx, namedBucketItem{ // remove from ToMoveIt index
name: toMoveItBucketName,
key: addrKey,
})
return nil
}

View file

@ -33,15 +33,6 @@ func TestDB_Delete(t *testing.T) {
err := putBig(db, child)
require.NoError(t, err)
// fill ToMoveIt index
err = metaToMoveIt(db, object.AddressOf(child))
require.NoError(t, err)
// check if Movable list is not empty
l, err := metaMovable(db)
require.NoError(t, err)
require.Len(t, l, 1)
// try to remove parent, should be no-op, error-free
err = metaDelete(db, object.AddressOf(parent))
require.NoError(t, err)
@ -61,11 +52,6 @@ func TestDB_Delete(t *testing.T) {
err = metaDelete(db, object.AddressOf(child))
require.NoError(t, err)
// check if there is no data in Movable index
l, err = metaMovable(db)
require.NoError(t, err)
require.Len(t, l, 0)
// check if they marked as already removed
ok, err := metaExists(db, object.AddressOf(child))

View file

@ -1,144 +0,0 @@
package meta
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.etcd.io/bbolt"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// ToMoveItPrm groups the parameters of ToMoveIt operation.
type ToMoveItPrm struct {
addr oid.Address
}
// ToMoveItRes groups the resulting values of ToMoveIt operation.
type ToMoveItRes struct{}
// SetAddress sets address of the object to move into another shard.
func (p *ToMoveItPrm) SetAddress(addr oid.Address) {
p.addr = addr
}
// DoNotMovePrm groups the parameters of DoNotMove operation.
type DoNotMovePrm struct {
addr oid.Address
}
// DoNotMoveRes groups the resulting values of DoNotMove operation.
type DoNotMoveRes struct{}
// SetAddress sets address of the object to prevent moving into another shard.
func (p *DoNotMovePrm) SetAddress(addr oid.Address) {
p.addr = addr
}
// MovablePrm groups the parameters of Movable operation.
type MovablePrm struct{}
// MovableRes groups the resulting values of Movable operation.
type MovableRes struct {
addrList []oid.Address
}
// AddressList returns resulting addresses of Movable operation.
func (p MovableRes) AddressList() []oid.Address {
return p.addrList
}
// ToMoveIt marks objects to move it into another shard. This useful for
// faster HRW fetching.
func (db *DB) ToMoveIt(ctx context.Context, prm ToMoveItPrm) (res ToMoveItRes, err error) {
_, span := tracing.StartSpanFromContext(ctx, "metabase.ToMoveIt",
trace.WithAttributes(
attribute.String("address", prm.addr.EncodeToString()),
))
defer span.End()
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return res, ErrDegradedMode
} else if db.mode.ReadOnly() {
return res, ErrReadOnlyMode
}
key := make([]byte, addressKeySize)
key = addressKey(prm.addr, key)
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
toMoveIt := tx.Bucket(toMoveItBucketName)
return toMoveIt.Put(key, zeroValue)
})
return res, metaerr.Wrap(err)
}
// DoNotMove removes `MoveIt` mark from the object.
func (db *DB) DoNotMove(prm DoNotMovePrm) (res DoNotMoveRes, err error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return res, ErrDegradedMode
} else if db.mode.ReadOnly() {
return res, ErrReadOnlyMode
}
key := make([]byte, addressKeySize)
key = addressKey(prm.addr, key)
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
toMoveIt := tx.Bucket(toMoveItBucketName)
return toMoveIt.Delete(key)
})
return res, metaerr.Wrap(err)
}
// Movable returns list of marked objects to move into other shard.
func (db *DB) Movable(_ MovablePrm) (MovableRes, error) {
db.modeMtx.RLock()
defer db.modeMtx.RUnlock()
if db.mode.NoMetabase() {
return MovableRes{}, ErrDegradedMode
}
var strAddrs []string
err := db.boltDB.View(func(tx *bbolt.Tx) error {
toMoveIt := tx.Bucket(toMoveItBucketName)
return toMoveIt.ForEach(func(k, v []byte) error {
strAddrs = append(strAddrs, string(k))
return nil
})
})
if err != nil {
return MovableRes{}, metaerr.Wrap(err)
}
// we can parse strings to structures in-place, but probably it seems
// more efficient to keep bolt db TX code smaller because it might be
// bottleneck.
addrs := make([]oid.Address, len(strAddrs))
for i := range strAddrs {
err = decodeAddressFromKey(&addrs[i], []byte(strAddrs[i]))
if err != nil {
return MovableRes{}, metaerr.Wrap(fmt.Errorf("can't parse object address %v: %w",
strAddrs[i], err))
}
}
return MovableRes{
addrList: addrs,
}, nil
}

View file

@ -1,85 +0,0 @@
package meta_test
import (
"context"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/stretchr/testify/require"
)
func TestDB_Movable(t *testing.T) {
db := newDB(t)
defer func() { require.NoError(t, db.Close()) }()
raw1 := testutil.GenerateObject()
raw2 := testutil.GenerateObject()
// put two objects in metabase
err := putBig(db, raw1)
require.NoError(t, err)
err = putBig(db, raw2)
require.NoError(t, err)
// check if toMoveIt index empty
toMoveList, err := metaMovable(db)
require.NoError(t, err)
require.Len(t, toMoveList, 0)
// mark to move object2
err = metaToMoveIt(db, object.AddressOf(raw2))
require.NoError(t, err)
// check if toMoveIt index contains address of object 2
toMoveList, err = metaMovable(db)
require.NoError(t, err)
require.Len(t, toMoveList, 1)
require.Contains(t, toMoveList, object.AddressOf(raw2))
// remove from toMoveIt index non existing address
err = metaDoNotMove(db, object.AddressOf(raw1))
require.NoError(t, err)
// check if toMoveIt index hasn't changed
toMoveList, err = metaMovable(db)
require.NoError(t, err)
require.Len(t, toMoveList, 1)
// remove from toMoveIt index existing address
err = metaDoNotMove(db, object.AddressOf(raw2))
require.NoError(t, err)
// check if toMoveIt index is empty now
toMoveList, err = metaMovable(db)
require.NoError(t, err)
require.Len(t, toMoveList, 0)
}
func metaToMoveIt(db *meta.DB, addr oid.Address) error {
var toMovePrm meta.ToMoveItPrm
toMovePrm.SetAddress(addr)
_, err := db.ToMoveIt(context.Background(), toMovePrm)
return err
}
func metaMovable(db *meta.DB) ([]oid.Address, error) {
r, err := db.Movable(meta.MovablePrm{})
if err != nil {
return nil, err
}
return r.AddressList(), nil
}
func metaDoNotMove(db *meta.DB, addr oid.Address) error {
var doNotMovePrm meta.DoNotMovePrm
doNotMovePrm.SetAddress(addr)
_, err := db.DoNotMove(doNotMovePrm)
return err
}

View file

@ -1,62 +0,0 @@
package shard
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
)
// ToMoveItPrm encapsulates parameters for ToMoveIt operation.
type ToMoveItPrm struct {
addr oid.Address
}
// ToMoveItRes encapsulates results of ToMoveIt operation.
type ToMoveItRes struct{}
// SetAddress sets object address that should be marked to move into another
// shard.
func (p *ToMoveItPrm) SetAddress(addr oid.Address) {
p.addr = addr
}
// ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to
// another shard.
func (s *Shard) ToMoveIt(ctx context.Context, prm ToMoveItPrm) (ToMoveItRes, error) {
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ToMoveIt",
trace.WithAttributes(
attribute.String("shard_id", s.ID().String()),
attribute.String("address", prm.addr.EncodeToString()),
))
defer span.End()
s.m.RLock()
defer s.m.RUnlock()
m := s.info.Mode
if m.ReadOnly() {
return ToMoveItRes{}, ErrReadOnlyMode
} else if m.NoMetabase() {
return ToMoveItRes{}, ErrDegradedMode
}
var toMovePrm meta.ToMoveItPrm
toMovePrm.SetAddress(prm.addr)
_, err := s.metaBase.ToMoveIt(ctx, toMovePrm)
if err != nil {
s.log.Debug(logs.ShardCouldNotMarkObjectForShardRelocationInMetabase,
zap.String("error", err.Error()),
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
)
}
return ToMoveItRes{}, nil
}