diff --git a/internal/logs/logs.go b/internal/logs/logs.go index ed88e615..1c5815c6 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -53,6 +53,7 @@ const ( PolicerRoutineStopped = "routine stopped" PolicerFailureAtObjectSelectForReplication = "failure at object select for replication" PolicerPoolSubmission = "pool submission" + PolicerUnableToProcessObj = "unable to process object" ReplicatorFinishWork = "finish work" ReplicatorCouldNotGetObjectFromLocalStorage = "could not get object from local storage" ReplicatorCouldNotReplicateObject = "could not replicate object" diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index bc82b144..c0427346 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -3,6 +3,7 @@ package policer import ( "context" "errors" + "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" @@ -16,48 +17,33 @@ import ( "go.uber.org/zap" ) -func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) { +func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.AddressWithType) error { addr := addrWithType.Address idCnr := addr.Container() idObj := addr.Object() cnr, err := p.cnrSrc.Get(idCnr) if err != nil { - p.log.Error(logs.PolicerCouldNotGetContainer, - zap.Stringer("cid", idCnr), - zap.String("error", err.Error()), - ) if client.IsErrContainerNotFound(err) { - existed, err := containercore.WasRemoved(p.cnrSrc, idCnr) - if err != nil { - p.log.Error(logs.PolicerCouldNotConfirmContainerRemoval, - zap.Stringer("cid", idCnr), - zap.Stringer("oid", idObj), - zap.String("error", err.Error())) + existed, errWasRemoved := containercore.WasRemoved(p.cnrSrc, idCnr) + if errWasRemoved != nil { + return fmt.Errorf("%s: %w", logs.PolicerCouldNotConfirmContainerRemoval, errWasRemoved) } else if existed { err := p.buryFn(ctx, addrWithType.Address) if err != nil { - p.log.Error(logs.PolicerCouldNotInhumeObjectWithMissingContainer, - zap.Stringer("cid", idCnr), - zap.Stringer("oid", idObj), - zap.String("error", err.Error())) + return fmt.Errorf("%s: %w", logs.PolicerCouldNotInhumeObjectWithMissingContainer, err) } } } - return + return fmt.Errorf("%s: %w", logs.PolicerCouldNotGetContainer, err) } policy := cnr.Value.PlacementPolicy() nn, err := p.placementBuilder.BuildPlacement(idCnr, &idObj, policy) if err != nil { - p.log.Error(logs.PolicerCouldNotBuildPlacementVectorForObject, - zap.Stringer("cid", idCnr), - zap.String("error", err.Error()), - ) - - return + return fmt.Errorf("%s: %w", logs.PolicerCouldNotBuildPlacementVectorForObject, err) } c := &placementRequirements{} @@ -73,7 +59,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add for i := range nn { select { case <-ctx.Done(): - return + return ctx.Err() default: } @@ -87,6 +73,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithType objectcore.Add p.cbRedundantCopy(ctx, addr) } + return nil } type placementRequirements struct { diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index 8b3eecd3..c73d3362 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -12,6 +12,7 @@ import ( objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" @@ -245,7 +246,8 @@ func TestProcessObject(t *testing.T) { Type: ti.objType, } - p.processObject(context.Background(), addrWithType) + err := p.processObject(context.Background(), addrWithType) + require.NoError(t, err) sort.Ints(gotReplicateTo) require.Equal(t, ti.wantRemoveRedundant, gotRemoveRedundant) @@ -254,6 +256,35 @@ func TestProcessObject(t *testing.T) { } } +func TestProcessObjectError(t *testing.T) { + addr := oidtest.Address() + // Container source + cnr := &container.Container{} + cnr.Value.Init() + source := containerSrc{ + get: func(id cid.ID) (*container.Container, error) { + return nil, new(apistatus.ContainerNotFound) + }, + deletionInfo: func(id cid.ID) (*container.DelInfo, error) { + return nil, new(apistatus.ContainerNotFound) + }, + } + buryFn := func(ctx context.Context, a oid.Address) error { + t.Errorf("unexpected object buried: %v", a) + return nil + } + p := New( + WithContainerSource(source), + WithBuryFunc(buryFn), + ) + + addrWithType := objectcore.AddressWithType{ + Address: addr, + } + + require.True(t, client.IsErrContainerNotFound(p.processObject(context.Background(), addrWithType))) +} + func TestIteratorContract(t *testing.T) { addr := oidtest.Address() objs := []objectcore.AddressWithType{{ diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go index 1f61c69f..2e8fe929 100644 --- a/pkg/services/policer/process.go +++ b/pkg/services/policer/process.go @@ -52,7 +52,12 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) { } if p.objsInWork.add(addr.Address) { - p.processObject(ctx, addr) + err := p.processObject(ctx, addr) + if err != nil { + p.log.Error(logs.PolicerUnableToProcessObj, + zap.Stringer("object", addr.Address), + zap.String("error", err.Error())) + } p.cache.Add(addr.Address, time.Now()) p.objsInWork.remove(addr.Address) }