From 4fff95a38668b58a3b388764c556b0c56b16ee28 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 10 Jul 2024 10:32:51 +0300 Subject: [PATCH 1/6] [#1237] getSvc: Process EC container concurrently Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 1 + pkg/services/object/get/container.go | 162 +++++++++++++++++++++++++++ pkg/services/object/get/get_test.go | 16 ++- 3 files changed, 178 insertions(+), 1 deletion(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 5a195f688..65e19ed58 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -123,6 +123,7 @@ const ( GetRequestedObjectIsVirtual = "requested object is virtual" GetRequestedObjectIsEC = "requested object is erasure-coded" GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds" + GetCouldNotGetContainer = "could not get container" PutAdditionalContainerBroadcastFailure = "additional container broadcast failure" SearchReturnResultDirectly = "return result directly" SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client" diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index d22b14192..0ddfb5de0 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -2,10 +2,19 @@ package getsvc import ( "context" + "encoding/hex" + "errors" + "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) func (r *request) executeOnContainer(ctx context.Context) { @@ -53,11 +62,28 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool { return true } + // In most cases, the value from the cache will be returned. + // Container appears in the cache when traverser is generated. + cnr, err := r.containerSource.Get(r.address().Container()) + if err != nil { + r.status = statusUndefined + r.err = err + r.log.Debug(logs.GetCouldNotGetContainer, zap.Error(err)) + return true + } + ctx, cancel := context.WithCancel(ctx) defer cancel() r.status = statusUndefined + if policy.IsECPlacement(cnr.Value.PlacementPolicy()) { + return r.processECNodes(ctx, traverser, policy.ECDataCount(cnr.Value.PlacementPolicy())) + } + return r.processRepNodes(ctx, traverser) +} + +func (r *request) processRepNodes(ctx context.Context, traverser *placement.Traverser) bool { for { addrs := traverser.Next() if len(addrs) == 0 { @@ -91,3 +117,139 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool { } } } + +func (r *request) processECNodes(ctx context.Context, traverser *placement.Traverser, dataCount int) bool { + err := r.traverseECNodes(ctx, traverser, dataCount) + + var errSplitInfo *objectSDK.SplitInfoError + var errECInfo *objectSDK.ECInfoError + var errRemoved *apistatus.ObjectAlreadyRemoved + var errOutOfRange *apistatus.ObjectOutOfRange + var errSuccess *ecGetSuccessErr + + switch { + case err == nil: // nil is returned if all nodes failed or incomplete EC info received + if len(r.infoEC.Chunks) > 0 { + r.status = statusEC + r.err = objectSDK.NewECInfoError(r.infoEC) + } else { + r.status = statusUndefined + r.err = new(apistatus.ObjectNotFound) + } + case errors.As(err, &errRemoved): + r.status = statusINHUMED + r.err = errRemoved + case errors.As(err, &errOutOfRange): + r.status = statusOutOfRange + r.err = errOutOfRange + case errors.As(err, &errSplitInfo): + r.status = statusVIRTUAL + mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) + r.err = objectSDK.NewSplitInfoError(r.infoSplit) + case errors.As(err, &errECInfo): + r.status = statusEC + r.err = err + case errors.As(err, &errSuccess): + r.status = statusOK + r.err = nil + if errSuccess.Object != nil { + r.collectedObject = errSuccess.Object + r.writeCollectedObject(ctx) + } + } + return r.status != statusUndefined +} + +func (r *request) traverseECNodes(ctx context.Context, traverser *placement.Traverser, dataCount int) error { + nodes := make(chan placement.Node, dataCount) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + for { + batch := traverser.Next() + if len(batch) == 0 { + r.log.Debug(logs.NoMoreNodesAbortPlacementIteration) + close(nodes) + return + } + for _, node := range batch { + select { + case <-ctx.Done(): + return + case nodes <- node: + } + } + } + }() + + err := r.processECNodesRequests(ctx, nodes, dataCount) + cancel() + wg.Wait() + return err +} + +func (r *request) processECNodesRequests(ctx context.Context, nodes <-chan placement.Node, dataCount int) error { + var ecInfoGuard sync.Mutex + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(dataCount) + for node := range nodes { + var info client.NodeInfo + client.NodeInfoFromNetmapElement(&info, node) + eg.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + rs, err := r.remoteStorageConstructor.Get(info) + if err != nil { + r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient, zap.String("node_key", hex.EncodeToString(info.PublicKey()))) + return err + } + obj, err := r.getRemote(ctx, rs, info) + var errSplitInfo *objectSDK.SplitInfoError + var errECInfo *objectSDK.ECInfoError + var errRemoved *apistatus.ObjectAlreadyRemoved + var errOutOfRange *apistatus.ObjectOutOfRange + + switch { + default: + // something failed, continue + r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err)) + return nil + case err == nil: + // non EC object found (tombstone, linking, lock), stop + return &ecGetSuccessErr{Object: obj} + case errors.As(err, &errRemoved) || errors.As(err, &errOutOfRange) || errors.As(err, &errSplitInfo): + // non EC error found, stop + return err + case errors.As(err, &errECInfo): + ecInfoGuard.Lock() + defer ecInfoGuard.Unlock() + r.infoEC = util.MergeECInfo(errECInfo.ECInfo(), r.infoEC) + if r.isRaw() { + if len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) { + return objectSDK.NewECInfoError(r.infoEC) + } + return nil + } + if len(r.infoEC.Chunks) >= dataCount { + return objectSDK.NewECInfoError(r.infoEC) + } + return nil + } + }) + } + + return eg.Wait() +} + +type ecGetSuccessErr struct { + Object *objectSDK.Object +} + +func (s *ecGetSuccessErr) Error() string { return "" } diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 988cd6982..8de206ec0 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -11,6 +11,7 @@ import ( "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" @@ -273,6 +274,16 @@ func (ks *testKeyStorage) GetKey(_ *util.SessionInfo) (*ecdsa.PrivateKey, error) return &ecdsa.PrivateKey{}, nil } +type testContainerSource struct{} + +func (s *testContainerSource) Get(idCnr cid.ID) (*containerCore.Container, error) { + return &containerCore.Container{ + Value: container.Container{}, + }, nil +} + +func (s *testContainerSource) DeletionInfo(cid.ID) (*containerCore.DelInfo, error) { return nil, nil } + func TestGetLocalOnly(t *testing.T) { ctx := context.Background() @@ -551,6 +562,7 @@ func TestGetRemoteSmall(t *testing.T) { epochSource: testEpochReceiver(curEpoch), remoteStorageConstructor: c, keyStore: &testKeyStorage{}, + containerSource: &testContainerSource{}, } } @@ -1722,6 +1734,7 @@ func TestGetRange(t *testing.T) { epochSource: testEpochReceiver(curEpoch), remoteStorageConstructor: c, keyStore: &testKeyStorage{}, + containerSource: &testContainerSource{}, } } @@ -1879,7 +1892,8 @@ func TestGetFromPastEpoch(t *testing.T) { as[1][1]: c22, }, }, - keyStore: &testKeyStorage{}, + keyStore: &testKeyStorage{}, + containerSource: &testContainerSource{}, } w := NewSimpleObjectWriter() -- 2.45.2 From 1a289745a400ebccf8c26e451bf9d2ac51aa3e12 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 10 Jul 2024 12:14:44 +0300 Subject: [PATCH 2/6] [#1237] getSvc: Return error if EC object found in REP container Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 2 +- pkg/services/object/get/remote.go | 26 ++++++-------------------- 2 files changed, 7 insertions(+), 21 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 65e19ed58..06d8668b2 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -107,7 +107,7 @@ const ( GetUnableToGetAllPartsECObject = "unable to get all parts, continue to reconstruct with existed" GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object" GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object" - GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object" + GetECObjectInRepContainer = "found erasure-coded object in REP container" GetAssemblingSplittedObjectCompleted = "assembling splitted object completed" GetAssemblingECObjectCompleted = "assembling erasure-coded object completed" GetFailedToAssembleSplittedObject = "failed to assemble splitted object" diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index 302a4a4bc..9efd083c4 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -7,14 +7,14 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.uber.org/zap" ) +var errECObjectInRepContainer = errors.New("found erasure-coded object in REP container") + func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode") defer span.End() @@ -36,11 +36,6 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { switch { default: r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err)) - if r.status == statusEC { - // we need to continue getting another chunks from another nodes - // in case of network issue - return false - } r.status = statusUndefined r.err = new(apistatus.ObjectNotFound) case err == nil: @@ -65,19 +60,10 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) r.err = objectSDK.NewSplitInfoError(r.infoSplit) case errors.As(err, &errECInfo): - r.status = statusEC - util.MergeECInfo(r.infoEC, errECInfo.ECInfo()) - r.infoEC = errECInfo.ECInfo() - r.err = objectSDK.NewECInfoError(r.infoEC) - if r.isRaw() { - return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) - } - cnt, err := r.containerSource.Get(r.address().Container()) - if err == nil { - return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy()) - } - r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err)) - return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) + r.log.Error(logs.GetECObjectInRepContainer, zap.Stringer("address", r.address())) + r.status = statusUndefined + r.err = errECObjectInRepContainer + return true } return r.status != statusUndefined -- 2.45.2 From fa642d95c7e274fe3466b911d71d93ef28d3b467 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 10 Jul 2024 12:15:38 +0300 Subject: [PATCH 3/6] [#1237] getSvc: Rename method Signed-off-by: Dmitrii Stepanov --- pkg/services/object/get/container.go | 2 +- pkg/services/object/get/remote.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index 0ddfb5de0..b61780ad4 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -110,7 +110,7 @@ func (r *request) processRepNodes(ctx context.Context, traverser *placement.Trav client.NodeInfoFromNetmapElement(&info, addrs[i]) - if r.processNode(ctx, info) { + if r.processRepNode(ctx, info) { r.log.Debug(logs.GetCompletingTheOperation) return true } diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index 9efd083c4..b7700143f 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -15,8 +15,8 @@ import ( var errECObjectInRepContainer = errors.New("found erasure-coded object in REP container") -func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { - ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode") +func (r *request) processRepNode(ctx context.Context, info client.NodeInfo) bool { + ctx, span := tracing.StartSpanFromContext(ctx, "getService.processRepNode") defer span.End() r.log.Debug(logs.ProcessingNode, zap.String("node_key", hex.EncodeToString(info.PublicKey()))) -- 2.45.2 From 0a2f60880ecd0c4e5ed4ef60324d02e81cb2e75f Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 10 Jul 2024 12:17:11 +0300 Subject: [PATCH 4/6] [#1237] getSvc: Add tracing span for EC processing Signed-off-by: Dmitrii Stepanov --- pkg/services/object/get/container.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index b61780ad4..203fa3c78 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.uber.org/zap" @@ -119,6 +120,9 @@ func (r *request) processRepNodes(ctx context.Context, traverser *placement.Trav } func (r *request) processECNodes(ctx context.Context, traverser *placement.Traverser, dataCount int) bool { + ctx, span := tracing.StartSpanFromContext(ctx, "getService.processECNodes") + defer span.End() + err := r.traverseECNodes(ctx, traverser, dataCount) var errSplitInfo *objectSDK.SplitInfoError -- 2.45.2 From bdd56cf8d51d2893a244745136397e5f6fedab4b Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 10 Jul 2024 15:08:39 +0300 Subject: [PATCH 5/6] [#1237] objectSvc: Return container from traverser generator Signed-off-by: Dmitrii Stepanov --- pkg/services/object/get/container.go | 12 +----------- pkg/services/object/get/get_test.go | 7 +++++-- pkg/services/object/get/request.go | 13 +++++-------- pkg/services/object/get/types.go | 3 ++- pkg/services/object/search/container.go | 2 +- pkg/services/object/search/search_test.go | 6 ++++-- pkg/services/object/search/service.go | 3 ++- pkg/services/object/util/placement.go | 12 ++++++++---- 8 files changed, 28 insertions(+), 30 deletions(-) diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index 203fa3c78..6ea3c7806 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -58,21 +58,11 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool { zap.Uint64("number", r.curProcEpoch), ) - traverser, ok := r.generateTraverser(r.address()) + traverser, cnr, ok := r.generateTraverser(r.address()) if !ok { return true } - // In most cases, the value from the cache will be returned. - // Container appears in the cache when traverser is generated. - cnr, err := r.containerSource.Get(r.address().Container()) - if err != nil { - r.status = statusUndefined - r.err = err - r.log.Debug(logs.GetCouldNotGetContainer, zap.Error(err)) - return true - } - ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 8de206ec0..ca30204f7 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -79,7 +79,7 @@ func newTestStorage() *testStorage { } } -func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) { +func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, *containerCore.Container, error) { opts := make([]placement.Option, 0, 4) opts = append(opts, placement.ForContainer(g.c), @@ -91,7 +91,10 @@ func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e ui opts = append(opts, placement.ForObject(*obj)) } - return placement.NewTraverser(opts...) + t, err := placement.NewTraverser(opts...) + return t, &containerCore.Container{ + Value: g.c, + }, err } func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { diff --git a/pkg/services/object/get/request.go b/pkg/services/object/get/request.go index d0b79e30c..1424c8b67 100644 --- a/pkg/services/object/get/request.go +++ b/pkg/services/object/get/request.go @@ -138,22 +138,19 @@ func (r *request) initEpoch() bool { } } -func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) { +func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, *container.Container, bool) { obj := addr.Object() - t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch) - - switch { - default: + t, cnr, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch) + if err != nil { r.status = statusUndefined r.err = err r.log.Debug(logs.GetCouldNotGenerateContainerTraverser, zap.Error(err)) - return nil, false - case err == nil: - return t, true + return nil, nil, false } + return t, cnr, true } func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) { diff --git a/pkg/services/object/get/types.go b/pkg/services/object/get/types.go index a866132cc..839e180a1 100644 --- a/pkg/services/object/get/types.go +++ b/pkg/services/object/get/types.go @@ -6,6 +6,7 @@ import ( "errors" coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -23,7 +24,7 @@ type epochSource interface { } type traverserGenerator interface { - GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) + GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error) } type keyStorage interface { diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go index a8865f5f0..d70574156 100644 --- a/pkg/services/object/search/container.go +++ b/pkg/services/object/search/container.go @@ -47,7 +47,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) error { zap.Uint64("number", exec.curProcEpoch), ) - traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch) + traverser, _, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch) if err != nil { return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err) } diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go index 802fa33ef..679380402 100644 --- a/pkg/services/object/search/search_test.go +++ b/pkg/services/object/search/search_test.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -80,12 +81,13 @@ func newTestStorage() *testStorage { } } -func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) { - return placement.NewTraverser( +func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, *containerCore.Container, error) { + t, err := placement.NewTraverser( placement.ForContainer(g.c), placement.UseBuilder(g.b[epoch]), placement.WithoutSuccessTracking(), ) + return t, &containerCore.Container{Value: g.c}, err } func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index 0a54f54c5..cc388c1b2 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -4,6 +4,7 @@ import ( "context" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -45,7 +46,7 @@ type cfg struct { } traverserGenerator interface { - GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) + GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error) } currentEpochReceiver interface { diff --git a/pkg/services/object/util/placement.go b/pkg/services/object/util/placement.go index 6cd3856f4..1bd39f9ea 100644 --- a/pkg/services/object/util/placement.go +++ b/pkg/services/object/util/placement.go @@ -122,17 +122,17 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav // GenerateTraverser generates placement Traverser for provided object address // using epoch-th network map. -func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, error) { +func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, *container.Container, error) { // get network map by epoch nm, err := g.netMapSrc.GetNetMapByEpoch(epoch) if err != nil { - return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err) + return nil, nil, fmt.Errorf("could not get network map #%d: %w", epoch, err) } // get container related container cnr, err := g.cnrSrc.Get(idCnr) if err != nil { - return nil, fmt.Errorf("could not get container: %w", err) + return nil, nil, fmt.Errorf("could not get container: %w", err) } // allocate placement traverser options @@ -160,5 +160,9 @@ func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoc ) } - return placement.NewTraverser(traverseOpts...) + t, err := placement.NewTraverser(traverseOpts...) + if err != nil { + return nil, nil, err + } + return t, cnr, nil } -- 2.45.2 From 50b9dea0fbced5aa9dc63b301be971862a7cc529 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Fri, 12 Jul 2024 09:20:45 +0300 Subject: [PATCH 6/6] [#1237] objectSvc: Collect EC parts right after HEAD Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 5 +- pkg/services/object/get/assemble.go | 3 +- pkg/services/object/get/assembleec.go | 4 +- pkg/services/object/get/assemblerec.go | 91 ++++++++--------------- pkg/services/object/get/container.go | 99 +++++++++++++++++++++----- pkg/services/object/get/get.go | 3 +- pkg/services/object/get/local.go | 63 +++++++++++++++- pkg/services/object/get/remote.go | 47 ++++++++++++ pkg/services/object/get/request.go | 7 +- 9 files changed, 232 insertions(+), 90 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 06d8668b2..8ab2a9ed6 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -104,9 +104,8 @@ const ( GetTryingToAssembleTheECObject = "trying to assemble the ec object..." GetAssemblingSplittedObject = "assembling splitted object..." GetAssemblingECObject = "assembling erasure-coded object..." - GetUnableToGetAllPartsECObject = "unable to get all parts, continue to reconstruct with existed" - GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object" - GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object" + GetUnableToGetECPartLocal = "failed to get EC part from local storage" + GetUnableToGetECPartRemote = "failed to get EC part from remote node" GetECObjectInRepContainer = "found erasure-coded object in REP container" GetAssemblingSplittedObjectCompleted = "assembling splitted object completed" GetAssemblingECObjectCompleted = "assembling erasure-coded object completed" diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index 66c4580b0..1bb60470e 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -140,7 +140,8 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque prm: prm, infoSplit: objectSDK.NewSplitInfo(), - infoEC: objectSDK.NewECInfo(), + infoEC: make(map[uint32]objectSDK.ECChunk), + partsEC: make(map[uint32]*objectSDK.Object), log: r.log, } diff --git a/pkg/services/object/get/assembleec.go b/pkg/services/object/get/assembleec.go index 5c999929a..d096fba3b 100644 --- a/pkg/services/object/get/assembleec.go +++ b/pkg/services/object/get/assembleec.go @@ -36,7 +36,7 @@ func (r *request) assembleEC(ctx context.Context) { r.log.Debug(logs.GetTryingToAssembleTheECObject) r.prm.common = r.prm.common.WithLocalOnly(false) - assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log) + assembler := newAssemblerEC(r.address(), r.partsEC, r.infoEC, r.ctxRange(), r.headOnly(), r.containerSource) r.log.Debug(logs.GetAssemblingECObject, zap.Uint64("range_offset", r.ctxRange().GetOffset()), @@ -47,7 +47,7 @@ func (r *request) assembleEC(ctx context.Context) { zap.Uint64("range_length", r.ctxRange().GetLength()), ) - obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly()) + obj, err := assembler.Assemble(ctx, r.prm.objWriter) if err != nil { r.log.Warn(logs.GetFailedToAssembleECObject, zap.Error(err), diff --git a/pkg/services/object/get/assemblerec.go b/pkg/services/object/get/assemblerec.go index 4a624e467..3266529af 100644 --- a/pkg/services/object/get/assemblerec.go +++ b/pkg/services/object/get/assemblerec.go @@ -2,51 +2,46 @@ package getsvc import ( "context" - "fmt" - "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.uber.org/zap" - "golang.org/x/sync/errgroup" ) type assemblerec struct { - addr oid.Address - ecInfo *objectSDK.ECInfo - rng *objectSDK.Range - objGetter objectGetter - cs container.Source - log *logger.Logger + addr oid.Address + ecParts map[uint32]*objectSDK.Object + ecChunks map[uint32]objectSDK.ECChunk + rng *objectSDK.Range + head bool + cs container.Source } func newAssemblerEC( addr oid.Address, - ecInfo *objectSDK.ECInfo, + ecParts map[uint32]*objectSDK.Object, + ecChunks map[uint32]objectSDK.ECChunk, rng *objectSDK.Range, - objGetter objectGetter, + head bool, cs container.Source, - log *logger.Logger, ) *assemblerec { return &assemblerec{ - addr: addr, - rng: rng, - ecInfo: ecInfo, - objGetter: objGetter, - cs: cs, - log: log, + addr: addr, + rng: rng, + head: head, + ecParts: ecParts, + ecChunks: ecChunks, + cs: cs, } } // Assemble assembles erasure-coded object and writes it's content to ObjectWriter. // It returns parent object. -func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) { - if headOnly { +func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { + if a.head { return a.reconstructHeader(ctx, writer) } else if a.rng != nil { return a.reconstructRange(ctx, writer) @@ -65,7 +60,7 @@ func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) { } func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { - parts := a.retrieveParts(ctx, true) + parts := a.retrieveParts() c, err := a.getConstructor() if err != nil { return nil, err @@ -78,7 +73,7 @@ func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter } func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { - parts := a.retrieveParts(ctx, false) + parts := a.retrieveParts() c, err := a.getConstructor() if err != nil { return nil, err @@ -101,7 +96,7 @@ func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) } func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { - parts := a.retrieveParts(ctx, false) + parts := a.retrieveParts() c, err := a.getConstructor() if err != nil { return nil, err @@ -119,41 +114,17 @@ func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter return obj, err } -func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object { - parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total)) - errGroup, ctx := errgroup.WithContext(mainCtx) - - for i := range a.ecInfo.Chunks { - chunk := a.ecInfo.Chunks[i] - errGroup.Go(func() error { - objID := new(oid.ID) - err := objID.ReadFromV2(chunk.ID) - if err != nil { - return fmt.Errorf("invalid object ID: %w", err) - } - var obj *objectSDK.Object - if headOnly { - obj, err = a.objGetter.HeadObject(ctx, *objID) - if err != nil { - a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err)) - return nil - } - } else { - sow := NewSimpleObjectWriter() - obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow) - if err != nil { - a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err)) - return nil - } - obj.SetPayload(sow.pld) - } - parts[chunk.Index] = obj - return nil - }) - } - - if err := errGroup.Wait(); err != nil { - a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err)) +func (a *assemblerec) retrieveParts() []*objectSDK.Object { + parts := make([]*objectSDK.Object, a.getTotalCount()) + for idx := range parts { + parts[idx] = a.ecParts[uint32(idx)] } return parts } + +func (a *assemblerec) getTotalCount() uint32 { + for _, ch := range a.ecChunks { + return ch.Total + } + return 0 +} diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index 6ea3c7806..830b4fb4c 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -4,16 +4,17 @@ import ( "context" "encoding/hex" "errors" + "fmt" "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) @@ -69,7 +70,7 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool { r.status = statusUndefined if policy.IsECPlacement(cnr.Value.PlacementPolicy()) { - return r.processECNodes(ctx, traverser, policy.ECDataCount(cnr.Value.PlacementPolicy())) + return r.processECNodes(ctx, traverser, policy.ECDataCount(cnr.Value.PlacementPolicy()), policy.ECParityCount(cnr.Value.PlacementPolicy())) } return r.processRepNodes(ctx, traverser) } @@ -109,10 +110,17 @@ func (r *request) processRepNodes(ctx context.Context, traverser *placement.Trav } } -func (r *request) processECNodes(ctx context.Context, traverser *placement.Traverser, dataCount int) bool { +func (r *request) processECNodes(ctx context.Context, traverser *placement.Traverser, dataCount, parityCount int) bool { ctx, span := tracing.StartSpanFromContext(ctx, "getService.processECNodes") defer span.End() + if !r.isRaw() && len(r.partsEC) >= dataCount { + return true + } + if r.isRaw() && len(r.infoEC) == dataCount+parityCount { + return true + } + err := r.traverseECNodes(ctx, traverser, dataCount) var errSplitInfo *objectSDK.SplitInfoError @@ -123,9 +131,9 @@ func (r *request) processECNodes(ctx context.Context, traverser *placement.Trave switch { case err == nil: // nil is returned if all nodes failed or incomplete EC info received - if len(r.infoEC.Chunks) > 0 { + if len(r.infoEC) > 0 { r.status = statusEC - r.err = objectSDK.NewECInfoError(r.infoEC) + r.err = r.createECInfoError() } else { r.status = statusUndefined r.err = new(apistatus.ObjectNotFound) @@ -187,7 +195,6 @@ func (r *request) traverseECNodes(ctx context.Context, traverser *placement.Trav } func (r *request) processECNodesRequests(ctx context.Context, nodes <-chan placement.Node, dataCount int) error { - var ecInfoGuard sync.Mutex eg, ctx := errgroup.WithContext(ctx) eg.SetLimit(dataCount) for node := range nodes { @@ -222,26 +229,82 @@ func (r *request) processECNodesRequests(ctx context.Context, nodes <-chan place // non EC error found, stop return err case errors.As(err, &errECInfo): - ecInfoGuard.Lock() - defer ecInfoGuard.Unlock() - r.infoEC = util.MergeECInfo(errECInfo.ECInfo(), r.infoEC) if r.isRaw() { - if len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) { - return objectSDK.NewECInfoError(r.infoEC) - } - return nil + return r.appendECChunksWithCheck(errECInfo) } - if len(r.infoEC.Chunks) >= dataCount { - return objectSDK.NewECInfoError(r.infoEC) - } - return nil + return r.getECChunksRemote(ctx, errECInfo, info, dataCount) } }) } - return eg.Wait() } +func (r *request) appendECChunksWithCheck(errECInfo *objectSDK.ECInfoError) error { + r.ecGuard.Lock() + defer r.ecGuard.Unlock() + for _, ch := range errECInfo.ECInfo().Chunks { + r.infoEC[ch.Index] = objectSDK.ECChunk(ch) + } + if len(r.infoEC) == int(errECInfo.ECInfo().Chunks[0].Total) { + return r.createECInfoError() + } + return nil +} + +func (r *request) getECChunksRemote(ctx context.Context, errECInfo *objectSDK.ECInfoError, info client.NodeInfo, dataCount int) error { + for _, ch := range errECInfo.ECInfo().Chunks { + var objID oid.ID + err := objID.ReadFromV2(ch.ID) + if err != nil { + return fmt.Errorf("invalid object ID: %w", err) + } + + var address oid.Address + address.SetContainer(r.containerID()) + address.SetObject(objID) + var obj *objectSDK.Object + if r.headOnly() { + obj, err = r.headObjectFromNode(ctx, address, info) + } else { + obj, err = r.getObjectFromNode(ctx, address, info) + } + if err != nil { + r.log.Warn(logs.GetUnableToGetECPartRemote, zap.Error(err), zap.Stringer("part_address", address), + zap.String("node", hex.EncodeToString(info.PublicKey()))) + continue + } + + if err := r.appendECChunkAndObjectWithCheck(objectSDK.ECChunk(ch), obj, dataCount); err != nil { + return err + } + } + return nil +} + +func (r *request) appendECChunkAndObjectWithCheck(chunk objectSDK.ECChunk, object *objectSDK.Object, dataCount int) error { + if object == nil { + return nil + } + r.ecGuard.Lock() + defer r.ecGuard.Unlock() + + r.infoEC[chunk.Index] = chunk + r.partsEC[chunk.Index] = object + + if len(r.infoEC) >= dataCount && len(r.partsEC) >= dataCount { + return r.createECInfoError() + } + return nil +} + +func (r *request) createECInfoError() error { + ecInfo := objectSDK.NewECInfo() + for _, chunk := range r.infoEC { + ecInfo.AddChunk(objectSDK.ECChunk(chunk)) + } + return objectSDK.NewECInfoError(ecInfo) +} + type ecGetSuccessErr struct { Object *objectSDK.Object } diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 9738935d2..b7f58e2b5 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -77,7 +77,8 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error { prm: prm, infoSplit: objectSDK.NewSplitInfo(), - infoEC: objectSDK.NewECInfo(), + infoEC: make(map[uint32]objectSDK.ECChunk), + partsEC: make(map[uint32]*objectSDK.Object), log: s.log, } diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index fcfc9befc..4acce0b37 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -3,12 +3,13 @@ package getsvc import ( "context" "errors" + "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) @@ -45,9 +46,19 @@ func (r *request) executeLocal(ctx context.Context) { mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) r.err = objectSDK.NewSplitInfoError(r.infoSplit) case errors.As(err, &errECInfo): + if r.isRaw() { + r.appendECChunks(errECInfo) + r.status = statusEC + r.err = r.createECInfoError() + break + } + if err := r.getECChunksLocal(ctx, errECInfo); err != nil { + r.status = statusUndefined + r.err = err + break + } r.status = statusEC - util.MergeECInfo(errECInfo.ECInfo(), r.infoEC) - r.err = objectSDK.NewECInfoError(r.infoEC) + r.err = r.createECInfoError() case errors.As(err, &errOutOfRange): r.status = statusOutOfRange r.err = errOutOfRange @@ -63,3 +74,49 @@ func (r *request) get(ctx context.Context) (*objectSDK.Object, error) { } return r.localStorage.Get(ctx, r.address()) } + +func (r *request) appendECChunks(errECInfo *objectSDK.ECInfoError) { + r.ecGuard.Lock() + defer r.ecGuard.Unlock() + for _, ch := range errECInfo.ECInfo().Chunks { + r.infoEC[ch.Index] = objectSDK.ECChunk(ch) + } +} + +func (r *request) appendECChunkAndObject(chunk objectSDK.ECChunk, object *objectSDK.Object) { + if object == nil { + return + } + r.ecGuard.Lock() + defer r.ecGuard.Unlock() + + r.infoEC[chunk.Index] = chunk + r.partsEC[chunk.Index] = object +} + +func (r *request) getECChunksLocal(ctx context.Context, errECInfo *objectSDK.ECInfoError) error { + for _, ch := range errECInfo.ECInfo().Chunks { + var objID oid.ID + err := objID.ReadFromV2(ch.ID) + if err != nil { + return fmt.Errorf("invalid object ID: %w", err) + } + + var address oid.Address + address.SetContainer(r.containerID()) + address.SetObject(objID) + var obj *objectSDK.Object + if r.headOnly() { + obj, err = r.localStorage.Head(ctx, address, false) + } else { + obj, err = r.localStorage.Get(ctx, address) + } + if err != nil { + r.log.Warn(logs.GetUnableToGetECPartLocal, zap.Error(err), zap.Stringer("part_address", address)) + continue + } + + r.appendECChunkAndObject(objectSDK.ECChunk(ch), obj) + } + return nil +} diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index b7700143f..4c549457e 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -10,6 +10,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) @@ -102,3 +103,49 @@ func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.N return rs.Get(ctx, r.address(), prm) } + +func (r *request) getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) { + rs, err := r.remoteStorageConstructor.Get(info) + if err != nil { + return nil, err + } + + key, err := r.key() + if err != nil { + return nil, err + } + + prm := RemoteRequestParams{ + Epoch: r.curProcEpoch, + TTL: 1, + PrivateKey: key, + SessionToken: r.prm.common.SessionToken(), + BearerToken: r.prm.common.BearerToken(), + XHeaders: r.prm.common.XHeaders(), + } + + return rs.Get(ctx, addr, prm) +} + +func (r *request) headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) { + rs, err := r.remoteStorageConstructor.Get(info) + if err != nil { + return nil, err + } + + key, err := r.key() + if err != nil { + return nil, err + } + + prm := RemoteRequestParams{ + Epoch: r.curProcEpoch, + TTL: 1, + PrivateKey: key, + SessionToken: r.prm.common.SessionToken(), + BearerToken: r.prm.common.BearerToken(), + XHeaders: r.prm.common.XHeaders(), + } + + return rs.Head(ctx, addr, prm) +} diff --git a/pkg/services/object/get/request.go b/pkg/services/object/get/request.go index 1424c8b67..4536252f6 100644 --- a/pkg/services/object/get/request.go +++ b/pkg/services/object/get/request.go @@ -3,6 +3,7 @@ package getsvc import ( "context" "crypto/ecdsa" + "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" @@ -23,7 +24,9 @@ type request struct { infoSplit *objectSDK.SplitInfo - infoEC *objectSDK.ECInfo + ecGuard sync.Mutex + infoEC map[uint32]objectSDK.ECChunk + partsEC map[uint32]*objectSDK.Object log *logger.Logger @@ -220,7 +223,7 @@ func (r *request) writeCollectedObject(ctx context.Context) { // isForwardingEnabled returns true if common execution // parameters has request forwarding closure set. -func (r request) isForwardingEnabled() bool { +func (r *request) isForwardingEnabled() bool { return r.prm.forwarder != nil } -- 2.45.2