From 4fff95a38668b58a3b388764c556b0c56b16ee28 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Wed, 10 Jul 2024 10:32:51 +0300 Subject: [PATCH] [#1237] getSvc: Process EC container concurrently Signed-off-by: Dmitrii Stepanov --- internal/logs/logs.go | 1 + pkg/services/object/get/container.go | 162 +++++++++++++++++++++++++++ pkg/services/object/get/get_test.go | 16 ++- 3 files changed, 178 insertions(+), 1 deletion(-) diff --git a/internal/logs/logs.go b/internal/logs/logs.go index 5a195f688..65e19ed58 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -123,6 +123,7 @@ const ( GetRequestedObjectIsVirtual = "requested object is virtual" GetRequestedObjectIsEC = "requested object is erasure-coded" GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds" + GetCouldNotGetContainer = "could not get container" PutAdditionalContainerBroadcastFailure = "additional container broadcast failure" SearchReturnResultDirectly = "return result directly" SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client" diff --git a/pkg/services/object/get/container.go b/pkg/services/object/get/container.go index d22b14192..0ddfb5de0 100644 --- a/pkg/services/object/get/container.go +++ b/pkg/services/object/get/container.go @@ -2,10 +2,19 @@ package getsvc import ( "context" + "encoding/hex" + "errors" + "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) func (r *request) executeOnContainer(ctx context.Context) { @@ -53,11 +62,28 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool { return true } + // In most cases, the value from the cache will be returned. + // Container appears in the cache when traverser is generated. + cnr, err := r.containerSource.Get(r.address().Container()) + if err != nil { + r.status = statusUndefined + r.err = err + r.log.Debug(logs.GetCouldNotGetContainer, zap.Error(err)) + return true + } + ctx, cancel := context.WithCancel(ctx) defer cancel() r.status = statusUndefined + if policy.IsECPlacement(cnr.Value.PlacementPolicy()) { + return r.processECNodes(ctx, traverser, policy.ECDataCount(cnr.Value.PlacementPolicy())) + } + return r.processRepNodes(ctx, traverser) +} + +func (r *request) processRepNodes(ctx context.Context, traverser *placement.Traverser) bool { for { addrs := traverser.Next() if len(addrs) == 0 { @@ -91,3 +117,139 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool { } } } + +func (r *request) processECNodes(ctx context.Context, traverser *placement.Traverser, dataCount int) bool { + err := r.traverseECNodes(ctx, traverser, dataCount) + + var errSplitInfo *objectSDK.SplitInfoError + var errECInfo *objectSDK.ECInfoError + var errRemoved *apistatus.ObjectAlreadyRemoved + var errOutOfRange *apistatus.ObjectOutOfRange + var errSuccess *ecGetSuccessErr + + switch { + case err == nil: // nil is returned if all nodes failed or incomplete EC info received + if len(r.infoEC.Chunks) > 0 { + r.status = statusEC + r.err = objectSDK.NewECInfoError(r.infoEC) + } else { + r.status = statusUndefined + r.err = new(apistatus.ObjectNotFound) + } + case errors.As(err, &errRemoved): + r.status = statusINHUMED + r.err = errRemoved + case errors.As(err, &errOutOfRange): + r.status = statusOutOfRange + r.err = errOutOfRange + case errors.As(err, &errSplitInfo): + r.status = statusVIRTUAL + mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) + r.err = objectSDK.NewSplitInfoError(r.infoSplit) + case errors.As(err, &errECInfo): + r.status = statusEC + r.err = err + case errors.As(err, &errSuccess): + r.status = statusOK + r.err = nil + if errSuccess.Object != nil { + r.collectedObject = errSuccess.Object + r.writeCollectedObject(ctx) + } + } + return r.status != statusUndefined +} + +func (r *request) traverseECNodes(ctx context.Context, traverser *placement.Traverser, dataCount int) error { + nodes := make(chan placement.Node, dataCount) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + for { + batch := traverser.Next() + if len(batch) == 0 { + r.log.Debug(logs.NoMoreNodesAbortPlacementIteration) + close(nodes) + return + } + for _, node := range batch { + select { + case <-ctx.Done(): + return + case nodes <- node: + } + } + } + }() + + err := r.processECNodesRequests(ctx, nodes, dataCount) + cancel() + wg.Wait() + return err +} + +func (r *request) processECNodesRequests(ctx context.Context, nodes <-chan placement.Node, dataCount int) error { + var ecInfoGuard sync.Mutex + eg, ctx := errgroup.WithContext(ctx) + eg.SetLimit(dataCount) + for node := range nodes { + var info client.NodeInfo + client.NodeInfoFromNetmapElement(&info, node) + eg.Go(func() error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + rs, err := r.remoteStorageConstructor.Get(info) + if err != nil { + r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient, zap.String("node_key", hex.EncodeToString(info.PublicKey()))) + return err + } + obj, err := r.getRemote(ctx, rs, info) + var errSplitInfo *objectSDK.SplitInfoError + var errECInfo *objectSDK.ECInfoError + var errRemoved *apistatus.ObjectAlreadyRemoved + var errOutOfRange *apistatus.ObjectOutOfRange + + switch { + default: + // something failed, continue + r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err)) + return nil + case err == nil: + // non EC object found (tombstone, linking, lock), stop + return &ecGetSuccessErr{Object: obj} + case errors.As(err, &errRemoved) || errors.As(err, &errOutOfRange) || errors.As(err, &errSplitInfo): + // non EC error found, stop + return err + case errors.As(err, &errECInfo): + ecInfoGuard.Lock() + defer ecInfoGuard.Unlock() + r.infoEC = util.MergeECInfo(errECInfo.ECInfo(), r.infoEC) + if r.isRaw() { + if len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) { + return objectSDK.NewECInfoError(r.infoEC) + } + return nil + } + if len(r.infoEC.Chunks) >= dataCount { + return objectSDK.NewECInfoError(r.infoEC) + } + return nil + } + }) + } + + return eg.Wait() +} + +type ecGetSuccessErr struct { + Object *objectSDK.Object +} + +func (s *ecGetSuccessErr) Error() string { return "" } diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index 988cd6982..8de206ec0 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -11,6 +11,7 @@ import ( "testing" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" + containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" @@ -273,6 +274,16 @@ func (ks *testKeyStorage) GetKey(_ *util.SessionInfo) (*ecdsa.PrivateKey, error) return &ecdsa.PrivateKey{}, nil } +type testContainerSource struct{} + +func (s *testContainerSource) Get(idCnr cid.ID) (*containerCore.Container, error) { + return &containerCore.Container{ + Value: container.Container{}, + }, nil +} + +func (s *testContainerSource) DeletionInfo(cid.ID) (*containerCore.DelInfo, error) { return nil, nil } + func TestGetLocalOnly(t *testing.T) { ctx := context.Background() @@ -551,6 +562,7 @@ func TestGetRemoteSmall(t *testing.T) { epochSource: testEpochReceiver(curEpoch), remoteStorageConstructor: c, keyStore: &testKeyStorage{}, + containerSource: &testContainerSource{}, } } @@ -1722,6 +1734,7 @@ func TestGetRange(t *testing.T) { epochSource: testEpochReceiver(curEpoch), remoteStorageConstructor: c, keyStore: &testKeyStorage{}, + containerSource: &testContainerSource{}, } } @@ -1879,7 +1892,8 @@ func TestGetFromPastEpoch(t *testing.T) { as[1][1]: c22, }, }, - keyStore: &testKeyStorage{}, + keyStore: &testKeyStorage{}, + containerSource: &testContainerSource{}, } w := NewSimpleObjectWriter()