From fc383ea6ae3bfa98ade02a53bb591f5ad0cb4780 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 15 Jul 2024 18:03:43 +0300 Subject: [PATCH] [#1253] getSvc: Fix EC objects get Now EC objects assembling is performed concurrently. Also fixed issue with an error in case of getting EC object via non-container node. Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 2 + pkg/services/object/get/assemble.go | 2 +- pkg/services/object/get/assembleec.go | 19 +- pkg/services/object/get/assemblerec.go | 358 +++++++++++++++++----- pkg/services/object/get/get.go | 13 +- pkg/services/object/get/get_test.go | 9 +- pkg/services/object/get/local.go | 4 +- pkg/services/object/get/remote.go | 68 ++-- pkg/services/object/get/request.go | 4 +- pkg/services/object/get/types.go | 51 ++- pkg/services/object/search/container.go | 2 +- pkg/services/object/search/search_test.go | 6 +- pkg/services/object/search/service.go | 3 +- pkg/services/object/util/placement.go | 12 +- 14 files changed, 429 insertions(+), 124 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 5a195f68..67f173f2 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -108,6 +108,7 @@ const ( GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object" GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object" GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object" + GetUnableToHeadPartsECObject = "unable to head parts of the erasure-encoded object" GetAssemblingSplittedObjectCompleted = "assembling splitted object completed" GetAssemblingECObjectCompleted = "assembling erasure-coded object completed" GetFailedToAssembleSplittedObject = "failed to assemble splitted object" @@ -123,6 +124,7 @@ const ( GetRequestedObjectIsVirtual = "requested object is virtual" GetRequestedObjectIsEC = "requested object is erasure-coded" GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds" + GetUnexpectedECObject = "failed to get EC object from node: expected EC info, but got full object" PutAdditionalContainerBroadcastFailure = "additional container broadcast failure" SearchReturnResultDirectly = "return result directly" SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client" diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index 66c4580b..ba6fddec 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -140,7 +140,7 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque prm: prm, infoSplit: objectSDK.NewSplitInfo(), - infoEC: objectSDK.NewECInfo(), + infoEC: newECInfo(), log: r.log, } diff --git a/pkg/services/object/get/assembleec.go b/pkg/services/object/get/assembleec.go index 5c999929..7bbd9ca1 100644 --- a/pkg/services/object/get/assembleec.go +++ b/pkg/services/object/get/assembleec.go @@ -6,11 +6,12 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.uber.org/zap" ) func (r *request) assembleEC(ctx context.Context) { - if r.isRaw() { + if r.isRaw() && r.isLocal() { r.log.Debug(logs.GetCanNotAssembleTheObject) return } @@ -35,8 +36,14 @@ func (r *request) assembleEC(ctx context.Context) { r.log.Debug(logs.GetTryingToAssembleTheECObject) + // initialize epoch number + ok := r.initEpoch() + if !ok { + return + } + r.prm.common = r.prm.common.WithLocalOnly(false) - assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log) + assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch) r.log.Debug(logs.GetAssemblingECObject, zap.Uint64("range_offset", r.ctxRange().GetOffset()), @@ -47,8 +54,8 @@ func (r *request) assembleEC(ctx context.Context) { zap.Uint64("range_length", r.ctxRange().GetLength()), ) - obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly()) - if err != nil { + obj, err := assembler.Assemble(ctx, r.prm.objWriter) + if err != nil && !errors.As(err, new(*objectSDK.ECInfoError)) { r.log.Warn(logs.GetFailedToAssembleECObject, zap.Error(err), zap.Uint64("range_offset", r.ctxRange().GetOffset()), @@ -58,6 +65,7 @@ func (r *request) assembleEC(ctx context.Context) { var errRemoved *apistatus.ObjectAlreadyRemoved var errOutOfRange *apistatus.ObjectOutOfRange + var errECInfo *objectSDK.ECInfoError switch { default: @@ -73,5 +81,8 @@ func (r *request) assembleEC(ctx context.Context) { case errors.As(err, &errOutOfRange): r.status = statusOutOfRange r.err = errOutOfRange + case errors.As(err, &errECInfo): + r.status = statusEC + r.err = err } } diff --git a/pkg/services/object/get/assemblerec.go b/pkg/services/object/get/assemblerec.go index 4a624e46..a4021ee5 100644 --- a/pkg/services/object/get/assemblerec.go +++ b/pkg/services/object/get/assemblerec.go @@ -2,11 +2,16 @@ package getsvc import ( "context" + "encoding/hex" + "errors" "fmt" + "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -16,61 +21,79 @@ import ( "golang.org/x/sync/errgroup" ) +var errECPartsRetrieveCompleted = errors.New("EC parts receive completed") + +type ecRemoteStorage interface { + getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) + headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo, raw bool) (*objectSDK.Object, error) +} + type assemblerec struct { - addr oid.Address - ecInfo *objectSDK.ECInfo - rng *objectSDK.Range - objGetter objectGetter - cs container.Source - log *logger.Logger + addr oid.Address + ecInfo *ecInfo + rng *objectSDK.Range + remoteStorage ecRemoteStorage + localStorage localStorage + cs container.Source + log *logger.Logger + head bool + raw bool + traverserGenerator traverserGenerator + epoch uint64 } func newAssemblerEC( addr oid.Address, - ecInfo *objectSDK.ECInfo, + ecInfo *ecInfo, rng *objectSDK.Range, - objGetter objectGetter, + remoteStorage ecRemoteStorage, + localStorage localStorage, cs container.Source, log *logger.Logger, + head bool, + raw bool, + tg traverserGenerator, + epoch uint64, ) *assemblerec { return &assemblerec{ - addr: addr, - rng: rng, - ecInfo: ecInfo, - objGetter: objGetter, - cs: cs, - log: log, + addr: addr, + rng: rng, + ecInfo: ecInfo, + remoteStorage: remoteStorage, + localStorage: localStorage, + cs: cs, + log: log, + head: head, + raw: raw, + traverserGenerator: tg, + epoch: epoch, } } // Assemble assembles erasure-coded object and writes it's content to ObjectWriter. // It returns parent object. -func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) { - if headOnly { +func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { + switch { + case a.raw: + err := a.reconstructRawError(ctx) + return nil, err + case a.head: return a.reconstructHeader(ctx, writer) - } else if a.rng != nil { + case a.rng != nil: return a.reconstructRange(ctx, writer) + default: + return a.reconstructObject(ctx, writer) } - return a.reconstructObject(ctx, writer) } -func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) { - cnt, err := a.cs.Get(a.addr.Container()) - if err != nil { - return nil, err - } - dataCount := policy.ECDataCount(cnt.Value.PlacementPolicy()) - parityCount := policy.ECParityCount(cnt.Value.PlacementPolicy()) +func (a *assemblerec) getConstructor(cnr *container.Container) (*erasurecode.Constructor, error) { + dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy()) + parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy()) return erasurecode.NewConstructor(dataCount, parityCount) } func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { - parts := a.retrieveParts(ctx, true) - c, err := a.getConstructor() - if err != nil { - return nil, err - } - obj, err := c.ReconstructHeader(parts) + obj, err := a.reconstructObjectFromParts(ctx, true) if err == nil { return obj, writer.WriteHeader(ctx, obj) } @@ -78,12 +101,7 @@ func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter } func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { - parts := a.retrieveParts(ctx, false) - c, err := a.getConstructor() - if err != nil { - return nil, err - } - obj, err := c.Reconstruct(parts) + obj, err := a.reconstructObjectFromParts(ctx, false) if err != nil { return nil, err } @@ -101,12 +119,7 @@ func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) } func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { - parts := a.retrieveParts(ctx, false) - c, err := a.getConstructor() - if err != nil { - return nil, err - } - obj, err := c.Reconstruct(parts) + obj, err := a.reconstructObjectFromParts(ctx, false) if err == nil { err = writer.WriteHeader(ctx, obj.CutPayload()) if err == nil { @@ -119,41 +132,238 @@ func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter return obj, err } -func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object { - parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total)) - errGroup, ctx := errgroup.WithContext(mainCtx) +func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bool) (*objectSDK.Object, error) { + objID := a.addr.Object() + trav, cnr, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch) + if err != nil { + return nil, err + } + c, err := a.getConstructor(cnr) + if err != nil { + return nil, err + } + parts := a.retrieveParts(ctx, trav, cnr) + if headers { + return c.ReconstructHeader(parts) + } + return c.Reconstruct(parts) +} - for i := range a.ecInfo.Chunks { - chunk := a.ecInfo.Chunks[i] - errGroup.Go(func() error { - objID := new(oid.ID) - err := objID.ReadFromV2(chunk.ID) - if err != nil { - return fmt.Errorf("invalid object ID: %w", err) - } - var obj *objectSDK.Object - if headOnly { - obj, err = a.objGetter.HeadObject(ctx, *objID) - if err != nil { - a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err)) - return nil - } - } else { - sow := NewSimpleObjectWriter() - obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow) - if err != nil { - a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err)) - return nil - } - obj.SetPayload(sow.pld) - } - parts[chunk.Index] = obj - return nil - }) +func (a *assemblerec) reconstructRawError(ctx context.Context) error { + chunks := make(map[string]objectSDK.ECChunk) + var chunksGuard sync.Mutex + for _, ch := range a.ecInfo.localChunks { + chunks[string(ch.ID.GetValue())] = ch } - if err := errGroup.Wait(); err != nil { + objID := a.addr.Object() + trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch) + if err != nil { + return err + } + + eg, ctx := errgroup.WithContext(ctx) + for { + batch := trav.Next() + if len(batch) == 0 { + break + } + for _, node := range batch { + var info client.NodeInfo + client.NodeInfoFromNetmapElement(&info, node) + eg.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found { + return nil + } + + nodeChunks := a.tryGetChunkListFromNode(ctx, info) + + chunksGuard.Lock() + defer chunksGuard.Unlock() + for _, ch := range nodeChunks { + chunks[string(ch.ID.GetValue())] = ch + } + return nil + }) + } + } + if err = eg.Wait(); err != nil { + return err + } + return createECInfoErr(chunks) +} + +func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object { + dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy()) + parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy()) + + remoteNodes := make([]placement.Node, 0) + for { + batch := trav.Next() + if len(batch) == 0 { + break + } + remoteNodes = append(remoteNodes, batch...) + } + + parts, err := a.processECNodesRequests(ctx, remoteNodes, dataCount, parityCount) + if err != nil { a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err)) } return parts } + +func (a *assemblerec) processECNodesRequests(ctx context.Context, nodes []placement.Node, dataCount, parityCount int) ([]*objectSDK.Object, error) { + foundChunks := make(map[uint32]*objectSDK.Object) + var foundChunksGuard sync.Mutex + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(dataCount) + + for _, ch := range a.ecInfo.localChunks { + ch := ch + eg.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + object := a.tryGetChunkFromLocalStorage(ctx, ch) + if object == nil { + return nil + } + foundChunksGuard.Lock() + foundChunks[ch.Index] = object + count := len(foundChunks) + foundChunksGuard.Unlock() + if count >= dataCount { + return errECPartsRetrieveCompleted + } + return nil + }) + } + + for _, node := range nodes { + var info client.NodeInfo + client.NodeInfoFromNetmapElement(&info, node) + eg.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + chunks := a.tryGetChunkListFromNode(ctx, info) + for _, ch := range chunks { + object := a.tryGetChunkFromRemoteStorage(ctx, info, ch) + if object == nil { + continue + } + foundChunksGuard.Lock() + foundChunks[ch.Index] = object + count := len(foundChunks) + foundChunksGuard.Unlock() + if count >= dataCount { + return errECPartsRetrieveCompleted + } + } + return nil + }) + } + err := eg.Wait() + if err == nil || errors.Is(err, errECPartsRetrieveCompleted) { + parts := make([]*objectSDK.Object, dataCount+parityCount) + for idx, chunk := range foundChunks { + parts[idx] = chunk + } + return parts, nil + } + return nil, err +} + +func (a *assemblerec) tryGetChunkFromLocalStorage(ctx context.Context, ch objectSDK.ECChunk) *objectSDK.Object { + var objID oid.ID + err := objID.ReadFromV2(ch.ID) + if err != nil { + a.log.Error(logs.GetUnableToHeadPartECObject, zap.String("node", "local"), zap.Uint32("part_index", ch.Index), zap.Error(fmt.Errorf("invalid object ID: %w", err))) + return nil + } + var addr oid.Address + addr.SetContainer(addr.Container()) + addr.SetObject(objID) + var object *objectSDK.Object + if a.head { + object, err = a.localStorage.Head(ctx, addr, false) + if err != nil { + a.log.Warn(logs.GetUnableToHeadPartECObject, zap.String("node", "local"), zap.Stringer("part_id", objID), zap.Error(err)) + return nil + } + } else { + object, err = a.localStorage.Get(ctx, addr) + if err != nil { + a.log.Warn(logs.GetUnableToGetPartECObject, zap.String("node", "local"), zap.Stringer("part_id", objID), zap.Error(err)) + return nil + } + } + return object +} + +func (a *assemblerec) tryGetChunkListFromNode(ctx context.Context, node client.NodeInfo) []objectSDK.ECChunk { + if chunks, found := a.ecInfo.remoteChunks[string(node.PublicKey())]; found { + return chunks + } + var errECInfo *objectSDK.ECInfoError + _, err := a.remoteStorage.headObjectFromNode(ctx, a.addr, node, true) + if err == nil { + a.log.Error(logs.GetUnexpectedECObject, zap.String("node", hex.EncodeToString(node.PublicKey()))) + return nil + } + if !errors.As(err, &errECInfo) { + a.log.Warn(logs.GetUnableToHeadPartsECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err)) + return nil + } + result := make([]objectSDK.ECChunk, 0, len(errECInfo.ECInfo().Chunks)) + for _, ch := range errECInfo.ECInfo().Chunks { + result = append(result, objectSDK.ECChunk(ch)) + } + return result +} + +func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node client.NodeInfo, ch objectSDK.ECChunk) *objectSDK.Object { + var objID oid.ID + err := objID.ReadFromV2(ch.ID) + if err != nil { + a.log.Error(logs.GetUnableToHeadPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Uint32("part_index", ch.Index), zap.Error(fmt.Errorf("invalid object ID: %w", err))) + return nil + } + var addr oid.Address + addr.SetContainer(a.addr.Container()) + addr.SetObject(objID) + var object *objectSDK.Object + if a.head { + object, err = a.remoteStorage.headObjectFromNode(ctx, addr, node, false) + if err != nil { + a.log.Warn(logs.GetUnableToHeadPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Stringer("part_id", objID), zap.Error(err)) + return nil + } + } else { + object, err = a.remoteStorage.getObjectFromNode(ctx, addr, node) + if err != nil { + a.log.Warn(logs.GetUnableToGetPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Stringer("part_id", objID), zap.Error(err)) + return nil + } + } + return object +} + +func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError { + info := objectSDK.NewECInfo() + for _, ch := range chunks { + info.AddChunk(ch) + } + return objectSDK.NewECInfoError(info) +} diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 9738935d..5a57bc56 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -77,7 +77,7 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error { prm: prm, infoSplit: objectSDK.NewSplitInfo(), - infoEC: objectSDK.NewECInfo(), + infoEC: newECInfo(), log: s.log, } @@ -110,15 +110,8 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) { case statusOutOfRange: exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds) case statusEC: - if !exec.isLocal() { - if execCnr { - exec.executeOnContainer(ctx) - exec.analyzeStatus(ctx, false) - } else { - exec.log.Debug(logs.GetRequestedObjectIsEC) - exec.assembleEC(ctx) - } - } + exec.log.Debug(logs.GetRequestedObjectIsEC) + exec.assembleEC(ctx) default: exec.log.Debug(logs.OperationFinishedWithError, zap.Error(exec.err), diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 988cd698..29a15ba7 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -11,6 +11,7 @@ import ( "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" @@ -78,7 +79,7 @@ func newTestStorage() *testStorage { } } -func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) { +func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, *containerCore.Container, error) { opts := make([]placement.Option, 0, 4) opts = append(opts, placement.ForContainer(g.c), @@ -90,7 +91,10 @@ func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e ui opts = append(opts, placement.ForObject(*obj)) } - return placement.NewTraverser(opts...) + t, err := placement.NewTraverser(opts...) + return t, &containerCore.Container{ + Value: g.c, + }, err } func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { @@ -474,6 +478,7 @@ func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) { var ni netmap.NodeInfo ni.SetNetworkEndpoints(a) + ni.SetPublicKey([]byte(a)) var na network.AddressGroup diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index fcfc9bef..1cd5e549 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -5,7 +5,6 @@ import ( "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -46,8 +45,7 @@ func (r *request) executeLocal(ctx context.Context) { r.err = objectSDK.NewSplitInfoError(r.infoSplit) case errors.As(err, &errECInfo): r.status = statusEC - util.MergeECInfo(errECInfo.ECInfo(), r.infoEC) - r.err = objectSDK.NewECInfoError(r.infoEC) + r.err = r.infoEC.addLocal(errECInfo.ECInfo()) case errors.As(err, &errOutOfRange): r.status = statusOutOfRange r.err = errOutOfRange diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index 302a4a4b..4dee1524 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -7,11 +7,10 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) @@ -36,11 +35,6 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { switch { default: r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err)) - if r.status == statusEC { - // we need to continue getting another chunks from another nodes - // in case of network issue - return false - } r.status = statusUndefined r.err = new(apistatus.ObjectNotFound) case err == nil: @@ -66,18 +60,7 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { r.err = objectSDK.NewSplitInfoError(r.infoSplit) case errors.As(err, &errECInfo): r.status = statusEC - util.MergeECInfo(r.infoEC, errECInfo.ECInfo()) - r.infoEC = errECInfo.ECInfo() - r.err = objectSDK.NewECInfoError(r.infoEC) - if r.isRaw() { - return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) - } - cnt, err := r.containerSource.Get(r.address().Container()) - if err == nil { - return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy()) - } - r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err)) - return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) + r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo()) } return r.status != statusUndefined @@ -116,3 +99,50 @@ func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.N return rs.Get(ctx, r.address(), prm) } + +func (r *request) getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) { + rs, err := r.remoteStorageConstructor.Get(info) + if err != nil { + return nil, err + } + + key, err := r.key() + if err != nil { + return nil, err + } + + prm := RemoteRequestParams{ + Epoch: r.curProcEpoch, + TTL: 1, + PrivateKey: key, + SessionToken: r.prm.common.SessionToken(), + BearerToken: r.prm.common.BearerToken(), + XHeaders: r.prm.common.XHeaders(), + } + + return rs.Get(ctx, addr, prm) +} + +func (r *request) headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo, raw bool) (*objectSDK.Object, error) { + rs, err := r.remoteStorageConstructor.Get(info) + if err != nil { + return nil, err + } + + key, err := r.key() + if err != nil { + return nil, err + } + + prm := RemoteRequestParams{ + Epoch: r.curProcEpoch, + TTL: 1, + PrivateKey: key, + SessionToken: r.prm.common.SessionToken(), + BearerToken: r.prm.common.BearerToken(), + XHeaders: r.prm.common.XHeaders(), + IsRaw: raw, + } + + return rs.Head(ctx, addr, prm) +} diff --git a/pkg/services/object/get/request.go b/pkg/services/object/get/request.go index d0b79e30..9ddfeddf 100644 --- a/pkg/services/object/get/request.go +++ b/pkg/services/object/get/request.go @@ -23,7 +23,7 @@ type request struct { infoSplit *objectSDK.SplitInfo - infoEC *objectSDK.ECInfo + infoEC *ecInfo log *logger.Logger @@ -141,7 +141,7 @@ func (r *request) initEpoch() bool { func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) { obj := addr.Object() - t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch) + t, _, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch) switch { default: diff --git a/pkg/services/object/get/types.go b/pkg/services/object/get/types.go index a866132c..9669afdb 100644 --- a/pkg/services/object/get/types.go +++ b/pkg/services/object/get/types.go @@ -6,6 +6,7 @@ import ( "errors" coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -23,7 +24,7 @@ type epochSource interface { } type traverserGenerator interface { - GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) + GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error) } type keyStorage interface { @@ -236,3 +237,51 @@ type RangeHashRes struct { func (r *RangeHashRes) Hashes() [][]byte { return r.hashes } + +type ecInfo struct { + localChunks []objectSDK.ECChunk + remoteChunks map[string][]objectSDK.ECChunk // node pk -> chunk slice +} + +func newECInfo() *ecInfo { + return &ecInfo{ + localChunks: make([]objectSDK.ECChunk, 0), + remoteChunks: make(map[string][]objectSDK.ECChunk), + } +} + +func (e *ecInfo) addLocal(ecInfo *objectSDK.ECInfo) *objectSDK.ECInfoError { + for _, ch := range ecInfo.Chunks { + e.localChunks = append(e.localChunks, objectSDK.ECChunk(ch)) + } + return e.createECInfoErr() +} + +func (e *ecInfo) addRemote(nodePK string, ecInfo *objectSDK.ECInfo) *objectSDK.ECInfoError { + for _, ch := range ecInfo.Chunks { + e.remoteChunks[nodePK] = append(e.remoteChunks[nodePK], objectSDK.ECChunk(ch)) + } + return e.createECInfoErr() +} + +func (e *ecInfo) createECInfoErr() *objectSDK.ECInfoError { + unique := make(map[string]struct{}) + result := objectSDK.NewECInfo() + for _, ch := range e.localChunks { + if _, found := unique[string(ch.ID.GetValue())]; found { + continue + } + result.AddChunk(ch) + unique[string(ch.ID.GetValue())] = struct{}{} + } + for _, chunks := range e.remoteChunks { + for _, ch := range chunks { + if _, found := unique[string(ch.ID.GetValue())]; found { + continue + } + result.AddChunk(ch) + unique[string(ch.ID.GetValue())] = struct{}{} + } + } + return objectSDK.NewECInfoError(result) +} diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go index a8865f5f..d7057415 100644 --- a/pkg/services/object/search/container.go +++ b/pkg/services/object/search/container.go @@ -47,7 +47,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) error { zap.Uint64("number", exec.curProcEpoch), ) - traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch) + traverser, _, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch) if err != nil { return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err) } diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go index 802fa33e..67938040 100644 --- a/pkg/services/object/search/search_test.go +++ b/pkg/services/object/search/search_test.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -80,12 +81,13 @@ func newTestStorage() *testStorage { } } -func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) { - return placement.NewTraverser( +func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, *containerCore.Container, error) { + t, err := placement.NewTraverser( placement.ForContainer(g.c), placement.UseBuilder(g.b[epoch]), placement.WithoutSuccessTracking(), ) + return t, &containerCore.Container{Value: g.c}, err } func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index 0a54f54c..cc388c1b 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -4,6 +4,7 @@ import ( "context" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -45,7 +46,7 @@ type cfg struct { } traverserGenerator interface { - GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) + GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error) } currentEpochReceiver interface { diff --git a/pkg/services/object/util/placement.go b/pkg/services/object/util/placement.go index 6cd3856f..1bd39f9e 100644 --- a/pkg/services/object/util/placement.go +++ b/pkg/services/object/util/placement.go @@ -122,17 +122,17 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav // GenerateTraverser generates placement Traverser for provided object address // using epoch-th network map. -func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, error) { +func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, *container.Container, error) { // get network map by epoch nm, err := g.netMapSrc.GetNetMapByEpoch(epoch) if err != nil { - return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err) + return nil, nil, fmt.Errorf("could not get network map #%d: %w", epoch, err) } // get container related container cnr, err := g.cnrSrc.Get(idCnr) if err != nil { - return nil, fmt.Errorf("could not get container: %w", err) + return nil, nil, fmt.Errorf("could not get container: %w", err) } // allocate placement traverser options @@ -160,5 +160,9 @@ func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoc ) } - return placement.NewTraverser(traverseOpts...) + t, err := placement.NewTraverser(traverseOpts...) + if err != nil { + return nil, nil, err + } + return t, cnr, nil }