metabase: Drop toMoveIt bucket #997
10 changed files with 16 additions and 347 deletions
|
@ -220,7 +220,6 @@ const (
|
|||
EngineCouldNotCloseShard = "could not close shard"
|
||||
EngineCouldNotReloadAShard = "could not reload a shard"
|
||||
EngineAddedNewShard = "added new shard"
|
||||
EngineCouldNotMarkObjectForShardRelocation = "could not mark object for shard relocation"
|
||||
EngineCouldNotPutObjectToShard = "could not put object to shard"
|
||||
EngineErrorDuringSearchingForObjectChildren = "error during searching for object children"
|
||||
EngineCouldNotInhumeObjectInShard = "could not inhume object in shard"
|
||||
|
|
|
@ -700,7 +700,7 @@ func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Add
|
|||
if _, ok := shardsToEvacuate[shards[j].ID().String()]; ok {
|
||||
continue
|
||||
}
|
||||
switch e.putToShard(ctx, shards[j].hashedShard, j, shards[j].pool, addr, object).status {
|
||||
switch e.putToShard(ctx, shards[j].hashedShard, shards[j].pool, addr, object).status {
|
||||
case putToShardSuccess:
|
||||
res.objEvacuated.Add(1)
|
||||
e.log.Debug(logs.EngineObjectIsMovedToAnotherShard,
|
||||
|
|
|
@ -84,7 +84,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
|||
}
|
||||
|
||||
var shRes putToShardRes
|
||||
e.iterateOverSortedShards(addr, func(ind int, sh hashedShard) (stop bool) {
|
||||
e.iterateOverSortedShards(addr, func(_ int, sh hashedShard) (stop bool) {
|
||||
e.mtx.RLock()
|
||||
pool, ok := e.shardPools[sh.ID().String()]
|
||||
e.mtx.RUnlock()
|
||||
|
@ -92,7 +92,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
|||
// Shard was concurrently removed, skip.
|
||||
return false
|
||||
}
|
||||
shRes = e.putToShard(ctx, sh, ind, pool, addr, prm.obj)
|
||||
shRes = e.putToShard(ctx, sh, pool, addr, prm.obj)
|
||||
return shRes.status != putToShardUnknown
|
||||
})
|
||||
switch shRes.status {
|
||||
|
@ -109,7 +109,7 @@ func (e *StorageEngine) put(ctx context.Context, prm PutPrm) error {
|
|||
|
||||
// putToShard puts object to sh.
|
||||
// Return putToShardStatus and error if it is necessary to propagate an error upper.
|
||||
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int, pool util.WorkerPool,
|
||||
func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, pool util.WorkerPool,
|
||||
addr oid.Address, obj *objectSDK.Object,
|
||||
) (res putToShardRes) {
|
||||
exitCh := make(chan struct{})
|
||||
|
@ -132,20 +132,6 @@ func (e *StorageEngine) putToShard(ctx context.Context, sh hashedShard, ind int,
|
|||
}
|
||||
|
||||
if exists.Exists() {
|
||||
if ind != 0 {
|
||||
var toMoveItPrm shard.ToMoveItPrm
|
||||
toMoveItPrm.SetAddress(addr)
|
||||
|
||||
_, err = sh.ToMoveIt(ctx, toMoveItPrm)
|
||||
if err != nil {
|
||||
e.log.Warn(logs.EngineCouldNotMarkObjectForShardRelocation,
|
||||
zap.Stringer("shard", sh.ID()),
|
||||
zap.String("error", err.Error()),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
res.status = putToShardExists
|
||||
return
|
||||
}
|
||||
|
|
|
@ -74,23 +74,6 @@ func TestDB_Containers(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
assertContains(cnrs, cnr)
|
||||
})
|
||||
|
||||
t.Run("ToMoveIt", func(t *testing.T) {
|
||||
obj := testutil.GenerateObject()
|
||||
|
||||
require.NoError(t, putBig(db, obj))
|
||||
|
||||
cnrs, err := db.Containers(context.Background())
|
||||
require.NoError(t, err)
|
||||
cnr, _ := obj.ContainerID()
|
||||
assertContains(cnrs, cnr)
|
||||
|
||||
require.NoError(t, metaToMoveIt(db, object.AddressOf(obj)))
|
||||
|
||||
cnrs, err = db.Containers(context.Background())
|
||||
require.NoError(t, err)
|
||||
assertContains(cnrs, cnr)
|
||||
})
|
||||
}
|
||||
|
||||
func TestDB_ContainersCount(t *testing.T) {
|
||||
|
|
|
@ -117,12 +117,16 @@ func (db *DB) init(reset bool) error {
|
|||
string(containerVolumeBucketName): {},
|
||||
string(containerCounterBucketName): {},
|
||||
string(graveyardBucketName): {},
|
||||
string(toMoveItBucketName): {},
|
||||
string(garbageBucketName): {},
|
||||
string(shardInfoBucket): {},
|
||||
string(bucketNameLocked): {},
|
||||
}
|
||||
|
||||
// buckets that are not used anymore
|
||||
deprecatedBuckets := [][]byte{
|
||||
toMoveItBucketName,
|
||||
}
|
||||
|
||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
var err error
|
||||
if !reset {
|
||||
|
@ -147,6 +151,13 @@ func (db *DB) init(reset bool) error {
|
|||
}
|
||||
}
|
||||
|
||||
for _, b := range deprecatedBuckets {
|
||||
err := tx.DeleteBucket(b)
|
||||
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
|
||||
return fmt.Errorf("could not delete deprecated bucket %s: %w", string(b), err)
|
||||
}
|
||||
}
|
||||
|
||||
if !reset { // counters will be recalculated by refill metabase
|
||||
err = syncCounter(tx, false)
|
||||
if err != nil {
|
||||
|
|
|
@ -434,7 +434,6 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
|
|||
addr := object.AddressOf(obj)
|
||||
|
||||
objKey := objectKey(addr.Object(), make([]byte, objectKeySize))
|
||||
addrKey := addressKey(addr, make([]byte, addressKeySize))
|
||||
cnr := addr.Container()
|
||||
bucketName := make([]byte, bucketKeySize)
|
||||
|
||||
|
@ -470,10 +469,6 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error
|
|||
name: rootBucketName(cnr, bucketName),
|
||||
key: objKey,
|
||||
})
|
||||
delUniqueIndexItem(tx, namedBucketItem{ // remove from ToMoveIt index
|
||||
name: toMoveItBucketName,
|
||||
key: addrKey,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -33,15 +33,6 @@ func TestDB_Delete(t *testing.T) {
|
|||
err := putBig(db, child)
|
||||
require.NoError(t, err)
|
||||
|
||||
// fill ToMoveIt index
|
||||
err = metaToMoveIt(db, object.AddressOf(child))
|
||||
require.NoError(t, err)
|
||||
|
||||
// check if Movable list is not empty
|
||||
l, err := metaMovable(db)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, l, 1)
|
||||
|
||||
// try to remove parent, should be no-op, error-free
|
||||
err = metaDelete(db, object.AddressOf(parent))
|
||||
require.NoError(t, err)
|
||||
|
@ -61,11 +52,6 @@ func TestDB_Delete(t *testing.T) {
|
|||
err = metaDelete(db, object.AddressOf(child))
|
||||
require.NoError(t, err)
|
||||
|
||||
// check if there is no data in Movable index
|
||||
l, err = metaMovable(db)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, l, 0)
|
||||
|
||||
// check if they marked as already removed
|
||||
|
||||
ok, err := metaExists(db, object.AddressOf(child))
|
||||
|
|
|
@ -1,144 +0,0 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/metaerr"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// ToMoveItPrm groups the parameters of ToMoveIt operation.
|
||||
type ToMoveItPrm struct {
|
||||
addr oid.Address
|
||||
}
|
||||
|
||||
// ToMoveItRes groups the resulting values of ToMoveIt operation.
|
||||
type ToMoveItRes struct{}
|
||||
|
||||
// SetAddress sets address of the object to move into another shard.
|
||||
func (p *ToMoveItPrm) SetAddress(addr oid.Address) {
|
||||
p.addr = addr
|
||||
}
|
||||
|
||||
// DoNotMovePrm groups the parameters of DoNotMove operation.
|
||||
type DoNotMovePrm struct {
|
||||
addr oid.Address
|
||||
}
|
||||
|
||||
// DoNotMoveRes groups the resulting values of DoNotMove operation.
|
||||
type DoNotMoveRes struct{}
|
||||
|
||||
// SetAddress sets address of the object to prevent moving into another shard.
|
||||
func (p *DoNotMovePrm) SetAddress(addr oid.Address) {
|
||||
p.addr = addr
|
||||
}
|
||||
|
||||
// MovablePrm groups the parameters of Movable operation.
|
||||
type MovablePrm struct{}
|
||||
|
||||
// MovableRes groups the resulting values of Movable operation.
|
||||
type MovableRes struct {
|
||||
addrList []oid.Address
|
||||
}
|
||||
|
||||
// AddressList returns resulting addresses of Movable operation.
|
||||
func (p MovableRes) AddressList() []oid.Address {
|
||||
return p.addrList
|
||||
}
|
||||
|
||||
// ToMoveIt marks objects to move it into another shard. This useful for
|
||||
// faster HRW fetching.
|
||||
func (db *DB) ToMoveIt(ctx context.Context, prm ToMoveItPrm) (res ToMoveItRes, err error) {
|
||||
_, span := tracing.StartSpanFromContext(ctx, "metabase.ToMoveIt",
|
||||
trace.WithAttributes(
|
||||
attribute.String("address", prm.addr.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
} else if db.mode.ReadOnly() {
|
||||
return res, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
key := make([]byte, addressKeySize)
|
||||
key = addressKey(prm.addr, key)
|
||||
|
||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
toMoveIt := tx.Bucket(toMoveItBucketName)
|
||||
return toMoveIt.Put(key, zeroValue)
|
||||
})
|
||||
|
||||
return res, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
// DoNotMove removes `MoveIt` mark from the object.
|
||||
func (db *DB) DoNotMove(prm DoNotMovePrm) (res DoNotMoveRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return res, ErrDegradedMode
|
||||
} else if db.mode.ReadOnly() {
|
||||
return res, ErrReadOnlyMode
|
||||
}
|
||||
|
||||
key := make([]byte, addressKeySize)
|
||||
key = addressKey(prm.addr, key)
|
||||
|
||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
toMoveIt := tx.Bucket(toMoveItBucketName)
|
||||
return toMoveIt.Delete(key)
|
||||
})
|
||||
|
||||
return res, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
// Movable returns list of marked objects to move into other shard.
|
||||
func (db *DB) Movable(_ MovablePrm) (MovableRes, error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if db.mode.NoMetabase() {
|
||||
return MovableRes{}, ErrDegradedMode
|
||||
}
|
||||
|
||||
var strAddrs []string
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
toMoveIt := tx.Bucket(toMoveItBucketName)
|
||||
return toMoveIt.ForEach(func(k, v []byte) error {
|
||||
strAddrs = append(strAddrs, string(k))
|
||||
|
||||
return nil
|
||||
})
|
||||
})
|
||||
if err != nil {
|
||||
return MovableRes{}, metaerr.Wrap(err)
|
||||
}
|
||||
|
||||
// we can parse strings to structures in-place, but probably it seems
|
||||
// more efficient to keep bolt db TX code smaller because it might be
|
||||
// bottleneck.
|
||||
addrs := make([]oid.Address, len(strAddrs))
|
||||
|
||||
for i := range strAddrs {
|
||||
err = decodeAddressFromKey(&addrs[i], []byte(strAddrs[i]))
|
||||
if err != nil {
|
||||
return MovableRes{}, metaerr.Wrap(fmt.Errorf("can't parse object address %v: %w",
|
||||
strAddrs[i], err))
|
||||
}
|
||||
}
|
||||
|
||||
return MovableRes{
|
||||
addrList: addrs,
|
||||
}, nil
|
||||
}
|
|
@ -1,85 +0,0 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDB_Movable(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer func() { require.NoError(t, db.Close()) }()
|
||||
|
||||
raw1 := testutil.GenerateObject()
|
||||
raw2 := testutil.GenerateObject()
|
||||
|
||||
// put two objects in metabase
|
||||
err := putBig(db, raw1)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = putBig(db, raw2)
|
||||
require.NoError(t, err)
|
||||
|
||||
// check if toMoveIt index empty
|
||||
toMoveList, err := metaMovable(db)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, toMoveList, 0)
|
||||
|
||||
// mark to move object2
|
||||
err = metaToMoveIt(db, object.AddressOf(raw2))
|
||||
require.NoError(t, err)
|
||||
|
||||
// check if toMoveIt index contains address of object 2
|
||||
toMoveList, err = metaMovable(db)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, toMoveList, 1)
|
||||
require.Contains(t, toMoveList, object.AddressOf(raw2))
|
||||
|
||||
// remove from toMoveIt index non existing address
|
||||
err = metaDoNotMove(db, object.AddressOf(raw1))
|
||||
require.NoError(t, err)
|
||||
|
||||
// check if toMoveIt index hasn't changed
|
||||
toMoveList, err = metaMovable(db)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, toMoveList, 1)
|
||||
|
||||
// remove from toMoveIt index existing address
|
||||
err = metaDoNotMove(db, object.AddressOf(raw2))
|
||||
require.NoError(t, err)
|
||||
|
||||
// check if toMoveIt index is empty now
|
||||
toMoveList, err = metaMovable(db)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, toMoveList, 0)
|
||||
}
|
||||
|
||||
func metaToMoveIt(db *meta.DB, addr oid.Address) error {
|
||||
var toMovePrm meta.ToMoveItPrm
|
||||
toMovePrm.SetAddress(addr)
|
||||
|
||||
_, err := db.ToMoveIt(context.Background(), toMovePrm)
|
||||
return err
|
||||
}
|
||||
|
||||
func metaMovable(db *meta.DB) ([]oid.Address, error) {
|
||||
r, err := db.Movable(meta.MovablePrm{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.AddressList(), nil
|
||||
}
|
||||
|
||||
func metaDoNotMove(db *meta.DB, addr oid.Address) error {
|
||||
var doNotMovePrm meta.DoNotMovePrm
|
||||
doNotMovePrm.SetAddress(addr)
|
||||
|
||||
_, err := db.DoNotMove(doNotMovePrm)
|
||||
return err
|
||||
}
|
|
@ -1,62 +0,0 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||
tracingPkg "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/tracing"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ToMoveItPrm encapsulates parameters for ToMoveIt operation.
|
||||
type ToMoveItPrm struct {
|
||||
addr oid.Address
|
||||
}
|
||||
|
||||
// ToMoveItRes encapsulates results of ToMoveIt operation.
|
||||
type ToMoveItRes struct{}
|
||||
|
||||
// SetAddress sets object address that should be marked to move into another
|
||||
// shard.
|
||||
func (p *ToMoveItPrm) SetAddress(addr oid.Address) {
|
||||
p.addr = addr
|
||||
}
|
||||
|
||||
// ToMoveIt calls metabase.ToMoveIt method to mark object as relocatable to
|
||||
// another shard.
|
||||
func (s *Shard) ToMoveIt(ctx context.Context, prm ToMoveItPrm) (ToMoveItRes, error) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "Shard.ToMoveIt",
|
||||
trace.WithAttributes(
|
||||
attribute.String("shard_id", s.ID().String()),
|
||||
attribute.String("address", prm.addr.EncodeToString()),
|
||||
))
|
||||
defer span.End()
|
||||
|
||||
s.m.RLock()
|
||||
defer s.m.RUnlock()
|
||||
|
||||
m := s.info.Mode
|
||||
if m.ReadOnly() {
|
||||
return ToMoveItRes{}, ErrReadOnlyMode
|
||||
} else if m.NoMetabase() {
|
||||
return ToMoveItRes{}, ErrDegradedMode
|
||||
}
|
||||
|
||||
var toMovePrm meta.ToMoveItPrm
|
||||
toMovePrm.SetAddress(prm.addr)
|
||||
|
||||
_, err := s.metaBase.ToMoveIt(ctx, toMovePrm)
|
||||
if err != nil {
|
||||
s.log.Debug(logs.ShardCouldNotMarkObjectForShardRelocationInMetabase,
|
||||
zap.String("error", err.Error()),
|
||||
zap.String("trace_id", tracingPkg.GetTraceID(ctx)),
|
||||
)
|
||||
}
|
||||
|
||||
return ToMoveItRes{}, nil
|
||||
}
|
Loading…
Reference in a new issue