WIP: Process EC container for Get/GetRange/Head concurrently #1237

Closed
dstepanov-yadro wants to merge 6 commits from dstepanov-yadro/frostfs-node:fix/ec_get_failover into master
3 changed files with 178 additions and 1 deletions
Showing only changes of commit 4fff95a386 - Show all commits

View file

@ -123,6 +123,7 @@ const (
GetRequestedObjectIsVirtual = "requested object is virtual"
GetRequestedObjectIsEC = "requested object is erasure-coded"
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
GetCouldNotGetContainer = "could not get container"
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
SearchReturnResultDirectly = "return result directly"
SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client"

View file

@ -2,10 +2,19 @@ package getsvc
import (
"context"
"encoding/hex"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
func (r *request) executeOnContainer(ctx context.Context) {
@ -53,11 +62,28 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
return true
}
// In most cases, the value from the cache will be returned.
// Container appears in the cache when traverser is generated.
cnr, err := r.containerSource.Get(r.address().Container())
if err != nil {
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetCouldNotGetContainer, zap.Error(err))
return true
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
r.status = statusUndefined
if policy.IsECPlacement(cnr.Value.PlacementPolicy()) {
return r.processECNodes(ctx, traverser, policy.ECDataCount(cnr.Value.PlacementPolicy()))
}
return r.processRepNodes(ctx, traverser)
}
func (r *request) processRepNodes(ctx context.Context, traverser *placement.Traverser) bool {
for {
addrs := traverser.Next()
if len(addrs) == 0 {
@ -91,3 +117,139 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
}
}
}
func (r *request) processECNodes(ctx context.Context, traverser *placement.Traverser, dataCount int) bool {
err := r.traverseECNodes(ctx, traverser, dataCount)
var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
var errSuccess *ecGetSuccessErr
switch {
case err == nil: // nil is returned if all nodes failed or incomplete EC info received
if len(r.infoEC.Chunks) > 0 {
r.status = statusEC
r.err = objectSDK.NewECInfoError(r.infoEC)
} else {
r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound)
}
case errors.As(err, &errRemoved):
r.status = statusINHUMED
r.err = errRemoved
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange
case errors.As(err, &errSplitInfo):
r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC
r.err = err
case errors.As(err, &errSuccess):
r.status = statusOK
r.err = nil
if errSuccess.Object != nil {
r.collectedObject = errSuccess.Object
r.writeCollectedObject(ctx)
}
}
return r.status != statusUndefined
}
func (r *request) traverseECNodes(ctx context.Context, traverser *placement.Traverser, dataCount int) error {
nodes := make(chan placement.Node, dataCount)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
batch := traverser.Next()
if len(batch) == 0 {
r.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
close(nodes)
return
}
for _, node := range batch {
select {
case <-ctx.Done():
return
case nodes <- node:
}
}
}
}()
err := r.processECNodesRequests(ctx, nodes, dataCount)
cancel()
wg.Wait()
return err
}
func (r *request) processECNodesRequests(ctx context.Context, nodes <-chan placement.Node, dataCount int) error {
var ecInfoGuard sync.Mutex
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(dataCount)
for node := range nodes {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient, zap.String("node_key", hex.EncodeToString(info.PublicKey())))
return err
}
obj, err := r.getRemote(ctx, rs, info)
var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
switch {
default:
// something failed, continue
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
return nil
case err == nil:
// non EC object found (tombstone, linking, lock), stop
return &ecGetSuccessErr{Object: obj}
case errors.As(err, &errRemoved) || errors.As(err, &errOutOfRange) || errors.As(err, &errSplitInfo):
// non EC error found, stop
return err
case errors.As(err, &errECInfo):
ecInfoGuard.Lock()
defer ecInfoGuard.Unlock()
r.infoEC = util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
if r.isRaw() {
if len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) {
return objectSDK.NewECInfoError(r.infoEC)
}
return nil
}
if len(r.infoEC.Chunks) >= dataCount {
return objectSDK.NewECInfoError(r.infoEC)
}
return nil
}
})
}
return eg.Wait()
}
type ecGetSuccessErr struct {
Object *objectSDK.Object
}
func (s *ecGetSuccessErr) Error() string { return "" }

View file

@ -11,6 +11,7 @@ import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@ -273,6 +274,16 @@ func (ks *testKeyStorage) GetKey(_ *util.SessionInfo) (*ecdsa.PrivateKey, error)
return &ecdsa.PrivateKey{}, nil
}
type testContainerSource struct{}
func (s *testContainerSource) Get(idCnr cid.ID) (*containerCore.Container, error) {
return &containerCore.Container{
Value: container.Container{},
}, nil
}
func (s *testContainerSource) DeletionInfo(cid.ID) (*containerCore.DelInfo, error) { return nil, nil }
func TestGetLocalOnly(t *testing.T) {
ctx := context.Background()
@ -551,6 +562,7 @@ func TestGetRemoteSmall(t *testing.T) {
epochSource: testEpochReceiver(curEpoch),
remoteStorageConstructor: c,
keyStore: &testKeyStorage{},
containerSource: &testContainerSource{},
}
}
@ -1722,6 +1734,7 @@ func TestGetRange(t *testing.T) {
epochSource: testEpochReceiver(curEpoch),
remoteStorageConstructor: c,
keyStore: &testKeyStorage{},
containerSource: &testContainerSource{},
}
}
@ -1879,7 +1892,8 @@ func TestGetFromPastEpoch(t *testing.T) {
as[1][1]: c22,
},
},
keyStore: &testKeyStorage{},
keyStore: &testKeyStorage{},
containerSource: &testContainerSource{},
}
w := NewSimpleObjectWriter()