Evgenii Stratonikov a49137349b [] engine: Allow to use user handler for evacuated objects
Signed-off-by: Evgenii Stratonikov <>
2022-09-24 13:47:48 +03:00

165 lines
4.1 KiB

package engine
import (
meta ""
objectSDK ""
oid ""
// EvacuateShardPrm represents parameters for the EvacuateShard operation.
// Its fields are filled in through the With* setters and read by
// StorageEngine.Evacuate.
type EvacuateShardPrm struct {
// shardID identifies the shard to evacuate. Evacuate looks the shard up in
// the engine's shard map by the ID's string form.
shardID *shard.ID
// handler, when non-nil, is called with the address and the object for
// every object that was not stored on any other shard.
handler func(oid.Address, *objectSDK.Object) error
// ignoreErrors is consulted by Evacuate when reading an object from the
// evacuated shard fails.
ignoreErrors bool
// EvacuateShardRes represents result of the EvacuateShard operation.
// It is returned by StorageEngine.Evacuate, including alongside an error.
type EvacuateShardRes struct {
// count is the value reported by Count.
count int
// WithShardID sets shard ID.
//
// The ID selects the shard that Evacuate moves data from. The pointer is
// stored as is, and Evacuate calls String on it unconditionally.
func (p *EvacuateShardPrm) WithShardID(id *shard.ID) {
p.shardID = id
// WithIgnoreErrors sets flag to ignore errors.
//
// Evacuate checks the flag when reading an object from the evacuated shard
// fails. Failures to store an object on another shard are deliberately not
// covered by it.
func (p *EvacuateShardPrm) WithIgnoreErrors(ignore bool) {
p.ignoreErrors = ignore
// WithFaultHandler sets handler to call for objects which cannot be saved on other shards.
//
// The handler receives the object's address and the object itself. A non-nil
// error returned by the handler is returned from Evacuate as is. When no
// handler is set, Evacuate returns an error for such an object instead, and
// it also refuses to run on an engine with fewer than two shards.
func (p *EvacuateShardPrm) WithFaultHandler(f func(oid.Address, *objectSDK.Object) error) {
p.handler = f
// Count returns amount of evacuated objects.
// Objects for which handler returned no error are also assumed evacuated.
//
// The receiver is taken by value, so calling Count never modifies the result.
func (p EvacuateShardRes) Count() int {
return p.count
const (
	// defaultEvacuateBatchSize is the default batch size for shard evacuation.
	// NOTE(review): the use site is not visible here; presumably it bounds how
	// many addresses are requested per listing step — confirm.
	defaultEvacuateBatchSize = 100
)
// pooledShard couples a shard with the worker pool that Evacuate hands to
// putToShard when storing an object on that shard.
// NOTE(review): Evacuate also sets a hashedShard field when building these
// values, but no such field is declared in the lines visible here — confirm.
type pooledShard struct {
// pool is taken from the engine's shardPools map under the same key as the
// shard itself.
pool util.WorkerPool
// errMustHaveTwoShards is returned by Evacuate when the engine has fewer than
// two shards and no fault handler is set, i.e. there is no destination for
// the evacuated objects. Two shards are sufficient, so the message states
// ">= 2" to match the check.
var errMustHaveTwoShards = errors.New("amount of shards must be >= 2")
// Evacuate moves data from one shard to the others.
// The shard being moved must be in read-only mode.
//
// It returns errShardNotFound when the shard ID from prm is unknown to the
// engine, errMustHaveTwoShards when the engine has fewer than two shards and
// no fault handler is set, and shard.ErrMustBeReadOnly when the source shard
// is not in read-only mode.
//
// Objects are listed from the source shard with ListWithCursor in a loop;
// meta.ErrEndOfListing ends the operation successfully, any other listing
// error is returned. An object that was not stored on another shard is passed
// to the fault handler if one is set; otherwise an error wrapping errPutShard
// is returned. The result accumulated so far is returned together with any
// error raised after the initial checks.
func (e *StorageEngine) Evacuate(prm EvacuateShardPrm) (EvacuateShardRes, error) {
// Shards are keyed by the string form of their ID.
sid := prm.shardID.String()
sh, ok := e.shards[sid]
if !ok {
return EvacuateShardRes{}, errShardNotFound
// With fewer than two shards there is no other shard to move objects to;
// this is accepted only when the caller supplied a handler to take them.
if len(e.shards) < 2 && prm.handler == nil {
return EvacuateShardRes{}, errMustHaveTwoShards
if !sh.GetMode().ReadOnly() {
return EvacuateShardRes{}, shard.ErrMustBeReadOnly
// We must have all shards, to have correct information about their
// indexes in a sorted slice and set appropriate marks in the metabase.
// Evacuated shard is skipped during put.
shards := make([]pooledShard, 0, len(e.shards))
for id := range e.shards {
shards = append(shards, pooledShard{
hashedShard: hashedShard(e.shards[id]),
pool: e.shardPools[id],
// weights[i] is computed for shards[i], in the order the slice was built.
weights := make([]float64, 0, len(shards))
for i := range shards {
weights = append(weights, e.shardWeight(shards[i].Shard))
var listPrm shard.ListWithCursorPrm
var c *meta.Cursor
var res EvacuateShardRes
for {
// TODO (@fyrchik): #1731 this approach doesn't work in degraded modes
// because ListWithCursor works only with the metabase.
// NOTE(review): in the lines visible here the cursor c is never handed to
// listPrm — confirm how the listing advances between iterations.
listRes, err := sh.Shard.ListWithCursor(listPrm)
if err != nil {
// The end of the listing is the normal way out of the loop.
if errors.Is(err, meta.ErrEndOfListing) {
return res, nil
return res, err
// TODO (@fyrchik): #1731 parallelize the loop
lst := listRes.AddressList()
for i := range lst {
// NOTE(review): no address is assigned to getPrm in the lines visible
// here — confirm that lst[i] is set on it before the Get call.
var getPrm shard.GetPrm
getRes, err := sh.Get(getPrm)
if err != nil {
if prm.ignoreErrors {
return res, err
// Order the candidate shards for this object by HRW, hashing the encoded
// object address.
hrw.SortSliceByWeightValue(shards, weights, hrw.Hash([]byte(lst[i].EncodeToString())))
for j := range shards {
// The shard being evacuated is not a valid destination.
if shards[j].ID().String() == sid {
putDone, exists := e.putToShard(shards[j].Shard, j, shards[j].pool, lst[i], getRes.Object())
// The object is done with once it was put or reported as existing;
// only an actual put is logged.
if putDone || exists {
if putDone {
e.log.Debug("object is moved to another shard",
zap.String("from", sid),
zap.Stringer("to", shards[j].ID()),
zap.Stringer("addr", lst[i]))
continue loop
if prm.handler == nil {
// Do not check ignoreErrors flag here because
// ignoring errors on put make this command kinda useless.
return res, fmt.Errorf("%w: %s", errPutShard, lst[i])
// No shard took the object: hand it over to the user-supplied handler.
err = prm.handler(lst[i], getRes.Object())
if err != nil {
return res, err
// Keep the cursor returned with this listing result.
c = listRes.Cursor()