From fc383ea6ae3bfa98ade02a53bb591f5ad0cb4780 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 15 Jul 2024 18:03:43 +0300 Subject: [PATCH 1/2] [#1253] getSvc: Fix EC objects get Now EC objects assembling is performed concurrently. Also fixed issue with an error in case of getting EC object via non-container node. Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 2 + pkg/services/object/get/assemble.go | 2 +- pkg/services/object/get/assembleec.go | 19 +- pkg/services/object/get/assemblerec.go | 358 +++++++++++++++++----- pkg/services/object/get/get.go | 13 +- pkg/services/object/get/get_test.go | 9 +- pkg/services/object/get/local.go | 4 +- pkg/services/object/get/remote.go | 68 ++-- pkg/services/object/get/request.go | 4 +- pkg/services/object/get/types.go | 51 ++- pkg/services/object/search/container.go | 2 +- pkg/services/object/search/search_test.go | 6 +- pkg/services/object/search/service.go | 3 +- pkg/services/object/util/placement.go | 12 +- 14 files changed, 429 insertions(+), 124 deletions(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 5a195f688..67f173f29 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -108,6 +108,7 @@ const ( GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object" GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object" GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object" + GetUnableToHeadPartsECObject = "unable to head parts of the erasure-encoded object" GetAssemblingSplittedObjectCompleted = "assembling splitted object completed" GetAssemblingECObjectCompleted = "assembling erasure-coded object completed" GetFailedToAssembleSplittedObject = "failed to assemble splitted object" @@ -123,6 +124,7 @@ const ( GetRequestedObjectIsVirtual = "requested object is virtual" GetRequestedObjectIsEC = "requested object is erasure-coded" GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds" + GetUnexpectedECObject = "failed to get EC object from node: expected EC info, but got full object" PutAdditionalContainerBroadcastFailure = "additional container broadcast failure" SearchReturnResultDirectly = "return result directly" SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client" diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index 66c4580b0..ba6fddec5 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -140,7 +140,7 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque prm: prm, infoSplit: objectSDK.NewSplitInfo(), - infoEC: objectSDK.NewECInfo(), + infoEC: newECInfo(), log: r.log, } diff --git a/pkg/services/object/get/assembleec.go b/pkg/services/object/get/assembleec.go index 5c999929a..7bbd9ca1e 100644 --- a/pkg/services/object/get/assembleec.go +++ b/pkg/services/object/get/assembleec.go @@ -6,11 +6,12 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.uber.org/zap" ) func (r *request) assembleEC(ctx context.Context) { - if r.isRaw() { + if r.isRaw() && r.isLocal() { r.log.Debug(logs.GetCanNotAssembleTheObject) return } @@ -35,8 +36,14 @@ func (r *request) assembleEC(ctx context.Context) { r.log.Debug(logs.GetTryingToAssembleTheECObject) + // initialize epoch number + ok := r.initEpoch() + if !ok { + return + } + r.prm.common = r.prm.common.WithLocalOnly(false) - assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log) + assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch) r.log.Debug(logs.GetAssemblingECObject, zap.Uint64("range_offset", r.ctxRange().GetOffset()), @@ -47,8 +54,8 @@ func (r *request) assembleEC(ctx context.Context) { zap.Uint64("range_length", r.ctxRange().GetLength()), ) - obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly()) - if err != nil { + obj, err := assembler.Assemble(ctx, r.prm.objWriter) + if err != nil && !errors.As(err, new(*objectSDK.ECInfoError)) { r.log.Warn(logs.GetFailedToAssembleECObject, zap.Error(err), zap.Uint64("range_offset", r.ctxRange().GetOffset()), @@ -58,6 +65,7 @@ func (r *request) assembleEC(ctx context.Context) { var errRemoved *apistatus.ObjectAlreadyRemoved var errOutOfRange *apistatus.ObjectOutOfRange + var errECInfo *objectSDK.ECInfoError switch { default: @@ -73,5 +81,8 @@ func (r *request) assembleEC(ctx context.Context) { case errors.As(err, &errOutOfRange): r.status = statusOutOfRange r.err = errOutOfRange + case errors.As(err, &errECInfo): + r.status = statusEC + r.err = err } } diff --git a/pkg/services/object/get/assemblerec.go b/pkg/services/object/get/assemblerec.go index 4a624e467..a4021ee5e 100644 --- a/pkg/services/object/get/assemblerec.go +++ b/pkg/services/object/get/assemblerec.go @@ -2,11 +2,16 @@ package getsvc import ( "context" + "encoding/hex" + "errors" "fmt" + "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -16,61 +21,79 @@ import ( "golang.org/x/sync/errgroup" ) +var errECPartsRetrieveCompleted = errors.New("EC parts receive completed") + +type ecRemoteStorage interface { + getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) + headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo, raw bool) (*objectSDK.Object, error) +} + type assemblerec struct { - addr oid.Address - ecInfo *objectSDK.ECInfo - rng *objectSDK.Range - objGetter objectGetter - cs container.Source - log *logger.Logger + addr oid.Address + ecInfo *ecInfo + rng *objectSDK.Range + remoteStorage ecRemoteStorage + localStorage localStorage + cs container.Source + log *logger.Logger + head bool + raw bool + traverserGenerator traverserGenerator + epoch uint64 } func newAssemblerEC( addr oid.Address, - ecInfo *objectSDK.ECInfo, + ecInfo *ecInfo, rng *objectSDK.Range, - objGetter objectGetter, + remoteStorage ecRemoteStorage, + localStorage localStorage, cs container.Source, log *logger.Logger, + head bool, + raw bool, + tg traverserGenerator, + epoch uint64, ) *assemblerec { return &assemblerec{ - addr: addr, - rng: rng, - ecInfo: ecInfo, - objGetter: objGetter, - cs: cs, - log: log, + addr: addr, + rng: rng, + ecInfo: ecInfo, + remoteStorage: remoteStorage, + localStorage: localStorage, + cs: cs, + log: log, + head: head, + raw: raw, + traverserGenerator: tg, + epoch: epoch, } } // Assemble assembles erasure-coded object and writes it's content to ObjectWriter. // It returns parent object. -func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) { - if headOnly { +func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { + switch { + case a.raw: + err := a.reconstructRawError(ctx) + return nil, err + case a.head: return a.reconstructHeader(ctx, writer) - } else if a.rng != nil { + case a.rng != nil: return a.reconstructRange(ctx, writer) + default: + return a.reconstructObject(ctx, writer) } - return a.reconstructObject(ctx, writer) } -func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) { - cnt, err := a.cs.Get(a.addr.Container()) - if err != nil { - return nil, err - } - dataCount := policy.ECDataCount(cnt.Value.PlacementPolicy()) - parityCount := policy.ECParityCount(cnt.Value.PlacementPolicy()) +func (a *assemblerec) getConstructor(cnr *container.Container) (*erasurecode.Constructor, error) { + dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy()) + parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy()) return erasurecode.NewConstructor(dataCount, parityCount) } func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { - parts := a.retrieveParts(ctx, true) - c, err := a.getConstructor() - if err != nil { - return nil, err - } - obj, err := c.ReconstructHeader(parts) + obj, err := a.reconstructObjectFromParts(ctx, true) if err == nil { return obj, writer.WriteHeader(ctx, obj) } @@ -78,12 +101,7 @@ func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter } func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { - parts := a.retrieveParts(ctx, false) - c, err := a.getConstructor() - if err != nil { - return nil, err - } - obj, err := c.Reconstruct(parts) + obj, err := a.reconstructObjectFromParts(ctx, false) if err != nil { return nil, err } @@ -101,12 +119,7 @@ func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) } func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { - parts := a.retrieveParts(ctx, false) - c, err := a.getConstructor() - if err != nil { - return nil, err - } - obj, err := c.Reconstruct(parts) + obj, err := a.reconstructObjectFromParts(ctx, false) if err == nil { err = writer.WriteHeader(ctx, obj.CutPayload()) if err == nil { @@ -119,41 +132,238 @@ func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter return obj, err } -func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object { - parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total)) - errGroup, ctx := errgroup.WithContext(mainCtx) +func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bool) (*objectSDK.Object, error) { + objID := a.addr.Object() + trav, cnr, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch) + if err != nil { + return nil, err + } + c, err := a.getConstructor(cnr) + if err != nil { + return nil, err + } + parts := a.retrieveParts(ctx, trav, cnr) + if headers { + return c.ReconstructHeader(parts) + } + return c.Reconstruct(parts) +} - for i := range a.ecInfo.Chunks { - chunk := a.ecInfo.Chunks[i] - errGroup.Go(func() error { - objID := new(oid.ID) - err := objID.ReadFromV2(chunk.ID) - if err != nil { - return fmt.Errorf("invalid object ID: %w", err) - } - var obj *objectSDK.Object - if headOnly { - obj, err = a.objGetter.HeadObject(ctx, *objID) - if err != nil { - a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err)) - return nil - } - } else { - sow := NewSimpleObjectWriter() - obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow) - if err != nil { - a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err)) - return nil - } - obj.SetPayload(sow.pld) - } - parts[chunk.Index] = obj - return nil - }) +func (a *assemblerec) reconstructRawError(ctx context.Context) error { + chunks := make(map[string]objectSDK.ECChunk) + var chunksGuard sync.Mutex + for _, ch := range a.ecInfo.localChunks { + chunks[string(ch.ID.GetValue())] = ch } - if err := errGroup.Wait(); err != nil { + objID := a.addr.Object() + trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch) + if err != nil { + return err + } + + eg, ctx := errgroup.WithContext(ctx) + for { + batch := trav.Next() + if len(batch) == 0 { + break + } + for _, node := range batch { + var info client.NodeInfo + client.NodeInfoFromNetmapElement(&info, node) + eg.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found { + return nil + } + + nodeChunks := a.tryGetChunkListFromNode(ctx, info) + + chunksGuard.Lock() + defer chunksGuard.Unlock() + for _, ch := range nodeChunks { + chunks[string(ch.ID.GetValue())] = ch + } + return nil + }) + } + } + if err = eg.Wait(); err != nil { + return err + } + return createECInfoErr(chunks) +} + +func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object { + dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy()) + parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy()) + + remoteNodes := make([]placement.Node, 0) + for { + batch := trav.Next() + if len(batch) == 0 { + break + } + remoteNodes = append(remoteNodes, batch...) + } + + parts, err := a.processECNodesRequests(ctx, remoteNodes, dataCount, parityCount) + if err != nil { a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err)) } return parts } + +func (a *assemblerec) processECNodesRequests(ctx context.Context, nodes []placement.Node, dataCount, parityCount int) ([]*objectSDK.Object, error) { + foundChunks := make(map[uint32]*objectSDK.Object) + var foundChunksGuard sync.Mutex + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(dataCount) + + for _, ch := range a.ecInfo.localChunks { + ch := ch + eg.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + object := a.tryGetChunkFromLocalStorage(ctx, ch) + if object == nil { + return nil + } + foundChunksGuard.Lock() + foundChunks[ch.Index] = object + count := len(foundChunks) + foundChunksGuard.Unlock() + if count >= dataCount { + return errECPartsRetrieveCompleted + } + return nil + }) + } + + for _, node := range nodes { + var info client.NodeInfo + client.NodeInfoFromNetmapElement(&info, node) + eg.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + chunks := a.tryGetChunkListFromNode(ctx, info) + for _, ch := range chunks { + object := a.tryGetChunkFromRemoteStorage(ctx, info, ch) + if object == nil { + continue + } + foundChunksGuard.Lock() + foundChunks[ch.Index] = object + count := len(foundChunks) + foundChunksGuard.Unlock() + if count >= dataCount { + return errECPartsRetrieveCompleted + } + } + return nil + }) + } + err := eg.Wait() + if err == nil || errors.Is(err, errECPartsRetrieveCompleted) { + parts := make([]*objectSDK.Object, dataCount+parityCount) + for idx, chunk := range foundChunks { + parts[idx] = chunk + } + return parts, nil + } + return nil, err +} + +func (a *assemblerec) tryGetChunkFromLocalStorage(ctx context.Context, ch objectSDK.ECChunk) *objectSDK.Object { + var objID oid.ID + err := objID.ReadFromV2(ch.ID) + if err != nil { + a.log.Error(logs.GetUnableToHeadPartECObject, zap.String("node", "local"), zap.Uint32("part_index", ch.Index), zap.Error(fmt.Errorf("invalid object ID: %w", err))) + return nil + } + var addr oid.Address + addr.SetContainer(addr.Container()) + addr.SetObject(objID) + var object *objectSDK.Object + if a.head { + object, err = a.localStorage.Head(ctx, addr, false) + if err != nil { + a.log.Warn(logs.GetUnableToHeadPartECObject, zap.String("node", "local"), zap.Stringer("part_id", objID), zap.Error(err)) + return nil + } + } else { + object, err = a.localStorage.Get(ctx, addr) + if err != nil { + a.log.Warn(logs.GetUnableToGetPartECObject, zap.String("node", "local"), zap.Stringer("part_id", objID), zap.Error(err)) + return nil + } + } + return object +} + +func (a *assemblerec) tryGetChunkListFromNode(ctx context.Context, node client.NodeInfo) []objectSDK.ECChunk { + if chunks, found := a.ecInfo.remoteChunks[string(node.PublicKey())]; found { + return chunks + } + var errECInfo *objectSDK.ECInfoError + _, err := a.remoteStorage.headObjectFromNode(ctx, a.addr, node, true) + if err == nil { + a.log.Error(logs.GetUnexpectedECObject, zap.String("node", hex.EncodeToString(node.PublicKey()))) + return nil + } + if !errors.As(err, &errECInfo) { + a.log.Warn(logs.GetUnableToHeadPartsECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err)) + return nil + } + result := make([]objectSDK.ECChunk, 0, len(errECInfo.ECInfo().Chunks)) + for _, ch := range errECInfo.ECInfo().Chunks { + result = append(result, objectSDK.ECChunk(ch)) + } + return result +} + +func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node client.NodeInfo, ch objectSDK.ECChunk) *objectSDK.Object { + var objID oid.ID + err := objID.ReadFromV2(ch.ID) + if err != nil { + a.log.Error(logs.GetUnableToHeadPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Uint32("part_index", ch.Index), zap.Error(fmt.Errorf("invalid object ID: %w", err))) + return nil + } + var addr oid.Address + addr.SetContainer(a.addr.Container()) + addr.SetObject(objID) + var object *objectSDK.Object + if a.head { + object, err = a.remoteStorage.headObjectFromNode(ctx, addr, node, false) + if err != nil { + a.log.Warn(logs.GetUnableToHeadPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Stringer("part_id", objID), zap.Error(err)) + return nil + } + } else { + object, err = a.remoteStorage.getObjectFromNode(ctx, addr, node) + if err != nil { + a.log.Warn(logs.GetUnableToGetPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Stringer("part_id", objID), zap.Error(err)) + return nil + } + } + return object +} + +func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError { + info := objectSDK.NewECInfo() + for _, ch := range chunks { + info.AddChunk(ch) + } + return objectSDK.NewECInfoError(info) +} diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 9738935d2..5a57bc56e 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -77,7 +77,7 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error { prm: prm, infoSplit: objectSDK.NewSplitInfo(), - infoEC: objectSDK.NewECInfo(), + infoEC: newECInfo(), log: s.log, } @@ -110,15 +110,8 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) { case statusOutOfRange: exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds) case statusEC: - if !exec.isLocal() { - if execCnr { - exec.executeOnContainer(ctx) - exec.analyzeStatus(ctx, false) - } else { - exec.log.Debug(logs.GetRequestedObjectIsEC) - exec.assembleEC(ctx) - } - } + exec.log.Debug(logs.GetRequestedObjectIsEC) + exec.assembleEC(ctx) default: exec.log.Debug(logs.OperationFinishedWithError, zap.Error(exec.err), diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 988cd6982..29a15ba78 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -11,6 +11,7 @@ import ( "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" @@ -78,7 +79,7 @@ func newTestStorage() *testStorage { } } -func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) { +func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, *containerCore.Container, error) { opts := make([]placement.Option, 0, 4) opts = append(opts, placement.ForContainer(g.c), @@ -90,7 +91,10 @@ func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e ui opts = append(opts, placement.ForObject(*obj)) } - return placement.NewTraverser(opts...) + t, err := placement.NewTraverser(opts...) + return t, &containerCore.Container{ + Value: g.c, + }, err } func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { @@ -474,6 +478,7 @@ func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) { var ni netmap.NodeInfo ni.SetNetworkEndpoints(a) + ni.SetPublicKey([]byte(a)) var na network.AddressGroup diff --git a/pkg/services/object/get/local.go b/pkg/services/object/get/local.go index fcfc9befc..1cd5e549c 100644 --- a/pkg/services/object/get/local.go +++ b/pkg/services/object/get/local.go @@ -5,7 +5,6 @@ import ( "errors" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -46,8 +45,7 @@ func (r *request) executeLocal(ctx context.Context) { r.err = objectSDK.NewSplitInfoError(r.infoSplit) case errors.As(err, &errECInfo): r.status = statusEC - util.MergeECInfo(errECInfo.ECInfo(), r.infoEC) - r.err = objectSDK.NewECInfoError(r.infoEC) + r.err = r.infoEC.addLocal(errECInfo.ECInfo()) case errors.As(err, &errOutOfRange): r.status = statusOutOfRange r.err = errOutOfRange diff --git a/pkg/services/object/get/remote.go b/pkg/services/object/get/remote.go index 302a4a4bc..4dee15242 100644 --- a/pkg/services/object/get/remote.go +++ b/pkg/services/object/get/remote.go @@ -7,11 +7,10 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) @@ -36,11 +35,6 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { switch { default: r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err)) - if r.status == statusEC { - // we need to continue getting another chunks from another nodes - // in case of network issue - return false - } r.status = statusUndefined r.err = new(apistatus.ObjectNotFound) case err == nil: @@ -66,18 +60,7 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { r.err = objectSDK.NewSplitInfoError(r.infoSplit) case errors.As(err, &errECInfo): r.status = statusEC - util.MergeECInfo(r.infoEC, errECInfo.ECInfo()) - r.infoEC = errECInfo.ECInfo() - r.err = objectSDK.NewECInfoError(r.infoEC) - if r.isRaw() { - return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) - } - cnt, err := r.containerSource.Get(r.address().Container()) - if err == nil { - return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy()) - } - r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err)) - return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) + r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo()) } return r.status != statusUndefined @@ -116,3 +99,50 @@ func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.N return rs.Get(ctx, r.address(), prm) } + +func (r *request) getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) { + rs, err := r.remoteStorageConstructor.Get(info) + if err != nil { + return nil, err + } + + key, err := r.key() + if err != nil { + return nil, err + } + + prm := RemoteRequestParams{ + Epoch: r.curProcEpoch, + TTL: 1, + PrivateKey: key, + SessionToken: r.prm.common.SessionToken(), + BearerToken: r.prm.common.BearerToken(), + XHeaders: r.prm.common.XHeaders(), + } + + return rs.Get(ctx, addr, prm) +} + +func (r *request) headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo, raw bool) (*objectSDK.Object, error) { + rs, err := r.remoteStorageConstructor.Get(info) + if err != nil { + return nil, err + } + + key, err := r.key() + if err != nil { + return nil, err + } + + prm := RemoteRequestParams{ + Epoch: r.curProcEpoch, + TTL: 1, + PrivateKey: key, + SessionToken: r.prm.common.SessionToken(), + BearerToken: r.prm.common.BearerToken(), + XHeaders: r.prm.common.XHeaders(), + IsRaw: raw, + } + + return rs.Head(ctx, addr, prm) +} diff --git a/pkg/services/object/get/request.go b/pkg/services/object/get/request.go index d0b79e30c..9ddfeddf2 100644 --- a/pkg/services/object/get/request.go +++ b/pkg/services/object/get/request.go @@ -23,7 +23,7 @@ type request struct { infoSplit *objectSDK.SplitInfo - infoEC *objectSDK.ECInfo + infoEC *ecInfo log *logger.Logger @@ -141,7 +141,7 @@ func (r *request) initEpoch() bool { func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) { obj := addr.Object() - t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch) + t, _, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch) switch { default: diff --git a/pkg/services/object/get/types.go b/pkg/services/object/get/types.go index a866132cc..9669afdba 100644 --- a/pkg/services/object/get/types.go +++ b/pkg/services/object/get/types.go @@ -6,6 +6,7 @@ import ( "errors" coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -23,7 +24,7 @@ type epochSource interface { } type traverserGenerator interface { - GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) + GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error) } type keyStorage interface { @@ -236,3 +237,51 @@ type RangeHashRes struct { func (r *RangeHashRes) Hashes() [][]byte { return r.hashes } + +type ecInfo struct { + localChunks []objectSDK.ECChunk + remoteChunks map[string][]objectSDK.ECChunk // node pk -> chunk slice +} + +func newECInfo() *ecInfo { + return &ecInfo{ + localChunks: make([]objectSDK.ECChunk, 0), + remoteChunks: make(map[string][]objectSDK.ECChunk), + } +} + +func (e *ecInfo) addLocal(ecInfo *objectSDK.ECInfo) *objectSDK.ECInfoError { + for _, ch := range ecInfo.Chunks { + e.localChunks = append(e.localChunks, objectSDK.ECChunk(ch)) + } + return e.createECInfoErr() +} + +func (e *ecInfo) addRemote(nodePK string, ecInfo *objectSDK.ECInfo) *objectSDK.ECInfoError { + for _, ch := range ecInfo.Chunks { + e.remoteChunks[nodePK] = append(e.remoteChunks[nodePK], objectSDK.ECChunk(ch)) + } + return e.createECInfoErr() +} + +func (e *ecInfo) createECInfoErr() *objectSDK.ECInfoError { + unique := make(map[string]struct{}) + result := objectSDK.NewECInfo() + for _, ch := range e.localChunks { + if _, found := unique[string(ch.ID.GetValue())]; found { + continue + } + result.AddChunk(ch) + unique[string(ch.ID.GetValue())] = struct{}{} + } + for _, chunks := range e.remoteChunks { + for _, ch := range chunks { + if _, found := unique[string(ch.ID.GetValue())]; found { + continue + } + result.AddChunk(ch) + unique[string(ch.ID.GetValue())] = struct{}{} + } + } + return objectSDK.NewECInfoError(result) +} diff --git a/pkg/services/object/search/container.go b/pkg/services/object/search/container.go index a8865f5f0..d70574156 100644 --- a/pkg/services/object/search/container.go +++ b/pkg/services/object/search/container.go @@ -47,7 +47,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) error { zap.Uint64("number", exec.curProcEpoch), ) - traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch) + traverser, _, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch) if err != nil { return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err) } diff --git a/pkg/services/object/search/search_test.go b/pkg/services/object/search/search_test.go index 802fa33ef..679380402 100644 --- a/pkg/services/object/search/search_test.go +++ b/pkg/services/object/search/search_test.go @@ -11,6 +11,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -80,12 +81,13 @@ func newTestStorage() *testStorage { } } -func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) { - return placement.NewTraverser( +func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, *containerCore.Container, error) { + t, err := placement.NewTraverser( placement.ForContainer(g.c), placement.UseBuilder(g.b[epoch]), placement.WithoutSuccessTracking(), ) + return t, &containerCore.Container{Value: g.c}, err } func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { diff --git a/pkg/services/object/search/service.go b/pkg/services/object/search/service.go index 0a54f54c5..cc388c1b2 100644 --- a/pkg/services/object/search/service.go +++ b/pkg/services/object/search/service.go @@ -4,6 +4,7 @@ import ( "context" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -45,7 +46,7 @@ type cfg struct { } traverserGenerator interface { - GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) + GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error) } currentEpochReceiver interface { diff --git a/pkg/services/object/util/placement.go b/pkg/services/object/util/placement.go index 6cd3856f4..1bd39f9ea 100644 --- a/pkg/services/object/util/placement.go +++ b/pkg/services/object/util/placement.go @@ -122,17 +122,17 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav // GenerateTraverser generates placement Traverser for provided object address // using epoch-th network map. -func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, error) { +func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, *container.Container, error) { // get network map by epoch nm, err := g.netMapSrc.GetNetMapByEpoch(epoch) if err != nil { - return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err) + return nil, nil, fmt.Errorf("could not get network map #%d: %w", epoch, err) } // get container related container cnr, err := g.cnrSrc.Get(idCnr) if err != nil { - return nil, fmt.Errorf("could not get container: %w", err) + return nil, nil, fmt.Errorf("could not get container: %w", err) } // allocate placement traverser options @@ -160,5 +160,9 @@ func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoc ) } - return placement.NewTraverser(traverseOpts...) + t, err := placement.NewTraverser(traverseOpts...) + if err != nil { + return nil, nil, err + } + return t, cnr, nil } -- 2.45.3 From e83d39e33f901a2158f82434fb8b98bd0b0d9b2e Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 15 Jul 2024 23:04:41 +0300 Subject: [PATCH 2/2] [#1253] deleteSvc: Use copy of common parameters getSvc may change the values of some fields, so Head will affect Delete or Put. In this case, the change is necessary so that the session token is stored in the tombstone object (EC assemble calls `ForgetTokens`). Signed-off-by: Dmitrii Stepanov --- pkg/services/object/delete/util.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/services/object/delete/util.go b/pkg/services/object/delete/util.go index bb2b5f00b..a78fd7747 100644 --- a/pkg/services/object/delete/util.go +++ b/pkg/services/object/delete/util.go @@ -30,7 +30,12 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi wr := getsvc.NewSimpleObjectWriter() p := getsvc.HeadPrm{} - p.SetCommonParameters(exec.commonParameters()) + + if cp := exec.commonParameters(); cp != nil { + commonParameters := *cp + p.SetCommonParameters(&commonParameters) + } + p.SetHeaderWriter(wr) p.WithRawFlag(true) p.WithAddress(addr) -- 2.45.3