diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go
index 2bbadcad8..73c8b13a7 100644
--- a/pkg/local_object_storage/engine/evacuate.go
+++ b/pkg/local_object_storage/engine/evacuate.go
@@ -379,8 +379,17 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
 	listPrm.Handler = func(ctx context.Context, addrList []object.Info) error {
 		return e.evacuateObjects(ctx, sh, addrList, prm, res, shards, shardsToEvacuate)
 	}
-	listPrm.ObjectsWorkers = e.cfg.evacuationObjectWorkerCount
-	listPrm.ContainersWorkers = e.cfg.evacuationContainerWorkerCount
+	listPrm.ObjectsWorker = e.cfg.evacuationObjectWorkerCount
+	listPrm.ContainersWorker = e.cfg.evacuationContainerWorkerCount
+
+	if prm.RepOneOnly {
+		listPrm.ExcludeContainer = func(cid cid.ID) (bool, error) {
+			return e.isNotRepOne(cid)
+		}
+		listPrm.CalcExcluded = func(count uint64) {
+			res.objSkipped.Add(count)
+		}
+	}
 
 	err := sh.ListConcurrently(ctx, listPrm)
 	if err != nil {
@@ -650,16 +659,6 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
 		default:
 		}
 		addr := toEvacuate[i].Address
-		if prm.RepOneOnly {
-			repOne, err := e.isRepOne(addr.Container())
-			if err != nil {
-				return err
-			}
-			if !repOne {
-				res.objSkipped.Add(1)
-				continue
-			}
-		}
 		var getPrm shard.GetPrm
 		getPrm.SetAddress(addr)
 		getPrm.SkipEvacCheck(true)
@@ -709,7 +708,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
 	return nil
 }
 
-func (e *StorageEngine) isRepOne(cid cid.ID) (bool, error) {
+func (e *StorageEngine) isNotRepOne(cid cid.ID) (bool, error) {
 	c, err := e.containerSource.Load().cs.Get(cid)
 	if err != nil {
 		return false, err
@@ -717,10 +716,10 @@ func (e *StorageEngine) isRepOne(cid cid.ID) (bool, error) {
 	p := c.Value.PlacementPolicy()
 	for i := range p.NumberOfReplicas() {
 		if p.ReplicaDescriptor(i).NumberOfObjects() == 1 {
-			return true, nil
+			return false, nil
 		}
 	}
-	return false, nil
+	return true, nil
 }
 
 func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go
index aee159e24..d86a39532 100644
--- a/pkg/local_object_storage/engine/evacuate_test.go
+++ b/pkg/local_object_storage/engine/evacuate_test.go
@@ -667,13 +667,14 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
 	e.SetContainerSource(&containerStorage{cntmap: cnrmap})
 
 	for _, sh := range ids {
-		for i := 0; i < 4; i++ {
-			obj := testutil.GenerateObjectWithCID(cids[i%2])
-
-			var putPrm shard.PutPrm
-			putPrm.SetObject(obj)
-			_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
-			require.NoError(t, err)
+		for j := range 2 {
+			for range 4 {
+				obj := testutil.GenerateObjectWithCID(cids[j])
+				var putPrm shard.PutPrm
+				putPrm.SetObject(obj)
+				_, err := e.shards[sh.String()].Put(context.Background(), putPrm)
+				require.NoError(t, err)
+			}
 		}
 	}
 
@@ -686,7 +687,7 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
 
 	res, err := e.Evacuate(context.Background(), prm)
 	require.NoError(t, err)
-	require.Equal(t, uint64(2), res.ObjectsEvacuated())
-	require.Equal(t, uint64(2), res.ObjectsSkipped())
+	require.Equal(t, uint64(4), res.ObjectsEvacuated())
+	require.Equal(t, uint64(4), res.ObjectsSkipped())
 	require.Equal(t, uint64(0), res.ObjectsFailed())
 }
diff --git a/pkg/local_object_storage/metabase/list.go b/pkg/local_object_storage/metabase/list.go
index 9b63f8faa..4626d7b14 100644
--- a/pkg/local_object_storage/metabase/list.go
+++ b/pkg/local_object_storage/metabase/list.go
@@ -68,10 +68,14 @@ type ListConcurrentlyPrm struct {
 	Handler func(context.Context, []objectcore.Info) error
 	// BatchSize maximum amount of addresses that will be passed to Handler.
 	BatchSize uint32
-	// ContainersWorkers amount of containers computed concurrently.
+	// ContainersWorker amount of containers computed concurrently.
 	ContainersWorker uint32
-	// ObjectsWorkers amount of workers runs Handler concurrently for each container.
+	// ObjectsWorker amount of workers running Handler concurrently for each container.
 	ObjectsWorker uint32
+	// ExcludeContainer function to check whether a container should be excluded or not.
+	ExcludeContainer func(cid.ID) (bool, error)
+	// CalcExcluded function to count excluded objects.
+	CalcExcluded func(count uint64)
 }
 
 // ListWithCursor lists physical objects available in metabase starting from
@@ -259,6 +263,24 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
 	return to, offset, cursor, nil
 }
 
+// countAliveObjectsInBucket counts objects in the bucket which aren't in the graveyard or garbage.
+func countAliveObjectsInBucket(bkt *bbolt.Bucket, // main bucket
+	graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
+	cidRaw []byte, // container ID prefix, optimization
+) uint64 {
+	c := bkt.Cursor()
+	k, _ := c.First()
+	var count uint64
+	for ; k != nil; k, _ = c.Next() {
+		if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
+			continue
+		}
+		count++
+	}
+
+	return count
+}
+
 func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {
 	if len(name) < bucketKeySize {
 		return nil, 0
@@ -343,6 +365,21 @@ func (db *DB) listConcurrently(ctx context.Context, tx *bbolt.Tx, prm ListConcur
 		var cnt cid.ID
 		copy(cnt[:], containerID[:])
 		eg.Go(func() error {
+			if prm.ExcludeContainer != nil {
+				exclude, err := prm.ExcludeContainer(cnt)
+				if err != nil {
+					return err
+				}
+				if exclude {
+					if prm.CalcExcluded != nil {
+						buf := make([]byte, cidSize, addressKeySize)
+						copy(buf, rawAddr)
+						prm.CalcExcluded(
+							countAliveObjectsInBucket(bkt, graveyardBkt, garbageBkt, buf))
+					}
+					return nil
+				}
+			}
 			return selectConcurrentlyFromBucket(egCtx, bkt, objType, graveyardBkt, garbageBkt,
 				rawAddr, cnt, prm)
 		})
diff --git a/pkg/local_object_storage/shard/list.go b/pkg/local_object_storage/shard/list.go
index 4f6076d6e..be468699d 100644
--- a/pkg/local_object_storage/shard/list.go
+++ b/pkg/local_object_storage/shard/list.go
@@ -41,9 +41,13 @@ type ListConcurrentlyPrm struct {
 	// BatchSize maximum amount of addresses that will be passed to Handler.
 	BatchSize uint32
 	// ContainersWorkers amount of containers computed concurrently.
-	ContainersWorkers uint32
+	ContainersWorker uint32
 	// ObjectsWorkers amount of workers runs Handler concurrently
-	ObjectsWorkers uint32
+	ObjectsWorker uint32
+	// ExcludeContainer function to check whether a container should be excluded or not.
+	ExcludeContainer func(cid.ID) (bool, error)
+	// CalcExcluded function to count excluded objects.
+	CalcExcluded func(count uint64)
 }
 
 // ListWithCursorPrm contains parameters for ListWithCursor operation.
@@ -184,8 +188,8 @@ func (s *Shard) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) e
 		trace.WithAttributes(
 			attribute.Int64("batch_size", int64(prm.BatchSize)),
 			attribute.Bool("has_handler", prm.Handler != nil),
-			attribute.Int("objects_workers", int(prm.ObjectsWorkers)),
-			attribute.Int("containers_workers", int(prm.ContainersWorkers)),
+			attribute.Int("objects_worker", int(prm.ObjectsWorker)),
+			attribute.Int("containers_worker", int(prm.ContainersWorker)),
 		))
 	defer span.End()
 
@@ -196,8 +200,10 @@ func (s *Shard) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) e
 	var metaPrm meta.ListConcurrentlyPrm
 	metaPrm.BatchSize = prm.BatchSize
 	metaPrm.Handler = prm.Handler
-	metaPrm.ContainersWorker = prm.ContainersWorkers
-	metaPrm.ObjectsWorker = prm.ObjectsWorkers
+	metaPrm.ContainersWorker = prm.ContainersWorker
+	metaPrm.ObjectsWorker = prm.ObjectsWorker
+	metaPrm.ExcludeContainer = prm.ExcludeContainer
+	metaPrm.CalcExcluded = prm.CalcExcluded
 	err := s.metaBase.ListConcurrently(ctx, metaPrm)
 	if err != nil {
 		return fmt.Errorf("could not list objects concurrently: %w", err)