[#1356] review
Some checks failed
DCO action / DCO (pull_request) Successful in 1m13s
Tests and linters / Run gofumpt (pull_request) Successful in 1m30s
Vulncheck / Vulncheck (pull_request) Failing after 2m27s
Tests and linters / Tests with -race (pull_request) Failing after 3m24s
Tests and linters / gopls check (pull_request) Successful in 3m53s
Pre-commit hooks / Pre-commit (pull_request) Successful in 4m45s
Tests and linters / Staticcheck (pull_request) Successful in 5m12s
Build / Build Components (pull_request) Successful in 5m22s
Tests and linters / Lint (pull_request) Successful in 5m47s
Tests and linters / Tests (pull_request) Successful in 7m20s

Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
This commit is contained in:
Anton Nikiforov 2024-09-18 15:00:19 +03:00
parent 313563157e
commit 69c75a660b
4 changed files with 75 additions and 32 deletions

View file

@ -379,8 +379,17 @@ func (e *StorageEngine) evacuateShardObjects(ctx context.Context, shardID string
listPrm.Handler = func(ctx context.Context, addrList []object.Info) error { listPrm.Handler = func(ctx context.Context, addrList []object.Info) error {
return e.evacuateObjects(ctx, sh, addrList, prm, res, shards, shardsToEvacuate) return e.evacuateObjects(ctx, sh, addrList, prm, res, shards, shardsToEvacuate)
} }
listPrm.ObjectsWorkers = e.cfg.evacuationObjectWorkerCount listPrm.ObjectsWorker = e.cfg.evacuationObjectWorkerCount
listPrm.ContainersWorkers = e.cfg.evacuationContainerWorkerCount listPrm.ContainersWorker = e.cfg.evacuationContainerWorkerCount
if prm.RepOneOnly {
listPrm.ExcludeContainer = func(cid cid.ID) (bool, error) {
return e.isNotRepOne(cid)
}
listPrm.CalcExcluded = func(count uint64) {
res.objSkipped.Add(count)
}
}
err := sh.ListConcurrently(ctx, listPrm) err := sh.ListConcurrently(ctx, listPrm)
if err != nil { if err != nil {
@ -650,16 +659,6 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
default: default:
} }
addr := toEvacuate[i].Address addr := toEvacuate[i].Address
if prm.RepOneOnly {
repOne, err := e.isRepOne(addr.Container())
if err != nil {
return err
}
if !repOne {
res.objSkipped.Add(1)
continue
}
}
var getPrm shard.GetPrm var getPrm shard.GetPrm
getPrm.SetAddress(addr) getPrm.SetAddress(addr)
getPrm.SkipEvacCheck(true) getPrm.SkipEvacCheck(true)
@ -709,7 +708,7 @@ func (e *StorageEngine) evacuateObjects(ctx context.Context, sh *shard.Shard, to
return nil return nil
} }
func (e *StorageEngine) isRepOne(cid cid.ID) (bool, error) { func (e *StorageEngine) isNotRepOne(cid cid.ID) (bool, error) {
c, err := e.containerSource.Load().cs.Get(cid) c, err := e.containerSource.Load().cs.Get(cid)
if err != nil { if err != nil {
return false, err return false, err
@ -717,11 +716,11 @@ func (e *StorageEngine) isRepOne(cid cid.ID) (bool, error) {
p := c.Value.PlacementPolicy() p := c.Value.PlacementPolicy()
for i := range p.NumberOfReplicas() { for i := range p.NumberOfReplicas() {
if p.ReplicaDescriptor(i).NumberOfObjects() == 1 { if p.ReplicaDescriptor(i).NumberOfObjects() == 1 {
return true, nil
}
}
return false, nil return false, nil
} }
}
return true, nil
}
func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard, func (e *StorageEngine) tryEvacuateObjectLocal(ctx context.Context, addr oid.Address, object *objectSDK.Object, sh *shard.Shard,
shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes, shards []pooledShard, shardsToEvacuate map[string]*shard.Shard, res *EvacuateShardRes,

View file

@ -667,15 +667,16 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
e.SetContainerSource(&containerStorage{cntmap: cnrmap}) e.SetContainerSource(&containerStorage{cntmap: cnrmap})
for _, sh := range ids { for _, sh := range ids {
for i := 0; i < 4; i++ { for j := range 2 {
obj := testutil.GenerateObjectWithCID(cids[i%2]) for range 4 {
obj := testutil.GenerateObjectWithCID(cids[j])
var putPrm shard.PutPrm var putPrm shard.PutPrm
putPrm.SetObject(obj) putPrm.SetObject(obj)
_, err := e.shards[sh.String()].Put(context.Background(), putPrm) _, err := e.shards[sh.String()].Put(context.Background(), putPrm)
require.NoError(t, err) require.NoError(t, err)
} }
} }
}
var prm EvacuateShardPrm var prm EvacuateShardPrm
prm.ShardID = ids[0:1] prm.ShardID = ids[0:1]
@ -686,7 +687,7 @@ func TestEvacuateShardObjectsRepOneOnly(t *testing.T) {
res, err := e.Evacuate(context.Background(), prm) res, err := e.Evacuate(context.Background(), prm)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, uint64(2), res.ObjectsEvacuated()) require.Equal(t, uint64(4), res.ObjectsEvacuated())
require.Equal(t, uint64(2), res.ObjectsSkipped()) require.Equal(t, uint64(4), res.ObjectsSkipped())
require.Equal(t, uint64(0), res.ObjectsFailed()) require.Equal(t, uint64(0), res.ObjectsFailed())
} }

View file

@ -68,10 +68,14 @@ type ListConcurrentlyPrm struct {
Handler func(context.Context, []objectcore.Info) error Handler func(context.Context, []objectcore.Info) error
// BatchSize maximum amount of addresses that will be passed to Handler. // BatchSize maximum amount of addresses that will be passed to Handler.
BatchSize uint32 BatchSize uint32
// ContainersWorkers amount of containers computed concurrently. // ContainersWorker amount of containers computed concurrently.
ContainersWorker uint32 ContainersWorker uint32
// ObjectsWorkers amount of workers runs Handler concurrently for each container. // ObjectsWorker amount of workers runs Handler concurrently for each container.
ObjectsWorker uint32 ObjectsWorker uint32
// ExcludeContainer function to check whether a container should be excluded or not.
ExcludeContainer func(cid.ID) (bool, error)
// CalcExcluded counts excluded objects.
CalcExcluded func(count uint64)
} }
// ListWithCursor lists physical objects available in metabase starting from // ListWithCursor lists physical objects available in metabase starting from
@ -259,6 +263,24 @@ func selectNFromBucket(bkt *bbolt.Bucket, // main bucket
return to, offset, cursor, nil return to, offset, cursor, nil
} }
// countAliveObjectsInBucket counts objects in the bucket which aren't in the graveyard or garbage.
func countAliveObjectsInBucket(bkt *bbolt.Bucket, // main bucket
graveyardBkt, garbageBkt *bbolt.Bucket, // cached graveyard buckets
cidRaw []byte, // container ID prefix, optimization
) uint64 {
c := bkt.Cursor()
k, _ := c.First()
var count uint64
for ; k != nil; k, _ = c.Next() {
if inGraveyardWithKey(append(cidRaw, k...), graveyardBkt, garbageBkt) > 0 {
continue
}
count++
}
return count
}
func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) { func parseContainerIDWithPrefix(containerID *cid.ID, name []byte) ([]byte, byte) {
if len(name) < bucketKeySize { if len(name) < bucketKeySize {
return nil, 0 return nil, 0
@ -343,6 +365,21 @@ func (db *DB) listConcurrently(ctx context.Context, tx *bbolt.Tx, prm ListConcur
var cnt cid.ID var cnt cid.ID
copy(cnt[:], containerID[:]) copy(cnt[:], containerID[:])
eg.Go(func() error { eg.Go(func() error {
if prm.ExcludeContainer != nil {
exclude, err := prm.ExcludeContainer(cnt)
if err != nil {
return err
}
if exclude {
if prm.CalcExcluded != nil {
buf := make([]byte, cidSize, addressKeySize)
copy(buf, rawAddr)
prm.CalcExcluded(
countAliveObjectsInBucket(bkt, graveyardBkt, garbageBkt, buf))
}
return nil
}
}
return selectConcurrentlyFromBucket(egCtx, return selectConcurrentlyFromBucket(egCtx,
bkt, objType, graveyardBkt, garbageBkt, rawAddr, cnt, prm) bkt, objType, graveyardBkt, garbageBkt, rawAddr, cnt, prm)
}) })

View file

@ -41,9 +41,13 @@ type ListConcurrentlyPrm struct {
// BatchSize maximum amount of addresses that will be passed to Handler. // BatchSize maximum amount of addresses that will be passed to Handler.
BatchSize uint32 BatchSize uint32
// ContainersWorkers amount of containers computed concurrently. // ContainersWorkers amount of containers computed concurrently.
ContainersWorkers uint32 ContainersWorker uint32
// ObjectsWorkers amount of workers runs Handler concurrently // ObjectsWorkers amount of workers runs Handler concurrently
ObjectsWorkers uint32 ObjectsWorker uint32
// ExcludeContainer function to check whether a container should be excluded or not.
ExcludeContainer func(cid.ID) (bool, error)
// CalcExcluded counts excluded objects.
CalcExcluded func(count uint64)
} }
// ListWithCursorPrm contains parameters for ListWithCursor operation. // ListWithCursorPrm contains parameters for ListWithCursor operation.
@ -184,8 +188,8 @@ func (s *Shard) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) e
trace.WithAttributes( trace.WithAttributes(
attribute.Int64("batch_size", int64(prm.BatchSize)), attribute.Int64("batch_size", int64(prm.BatchSize)),
attribute.Bool("has_handler", prm.Handler != nil), attribute.Bool("has_handler", prm.Handler != nil),
attribute.Int("objects_workers", int(prm.ObjectsWorkers)), attribute.Int("objects_worker", int(prm.ObjectsWorker)),
attribute.Int("containers_workers", int(prm.ContainersWorkers)), attribute.Int("containers_worker", int(prm.ContainersWorker)),
)) ))
defer span.End() defer span.End()
@ -196,8 +200,10 @@ func (s *Shard) ListConcurrently(ctx context.Context, prm ListConcurrentlyPrm) e
var metaPrm meta.ListConcurrentlyPrm var metaPrm meta.ListConcurrentlyPrm
metaPrm.BatchSize = prm.BatchSize metaPrm.BatchSize = prm.BatchSize
metaPrm.Handler = prm.Handler metaPrm.Handler = prm.Handler
metaPrm.ContainersWorker = prm.ContainersWorkers metaPrm.ContainersWorker = prm.ContainersWorker
metaPrm.ObjectsWorker = prm.ObjectsWorkers metaPrm.ObjectsWorker = prm.ObjectsWorker
metaPrm.ExcludeContainer = prm.ExcludeContainer
metaPrm.CalcExcluded = prm.CalcExcluded
err := s.metaBase.ListConcurrently(ctx, metaPrm) err := s.metaBase.ListConcurrently(ctx, metaPrm)
if err != nil { if err != nil {
return fmt.Errorf("could not list objects concurrently: %w", err) return fmt.Errorf("could not list objects concurrently: %w", err)