WIP: Process EC container for Get/GetRange/Head concurrently #1237

Closed
dstepanov-yadro wants to merge 6 commits from dstepanov-yadro/frostfs-node:fix/ec_get_failover into master
15 changed files with 424 additions and 117 deletions

View file

@ -104,10 +104,9 @@ const (
GetTryingToAssembleTheECObject = "trying to assemble the ec object..." GetTryingToAssembleTheECObject = "trying to assemble the ec object..."
GetAssemblingSplittedObject = "assembling splitted object..." GetAssemblingSplittedObject = "assembling splitted object..."
GetAssemblingECObject = "assembling erasure-coded object..." GetAssemblingECObject = "assembling erasure-coded object..."
GetUnableToGetAllPartsECObject = "unable to get all parts, continue to reconstruct with existed" GetUnableToGetECPartLocal = "failed to get EC part from local storage"
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object" GetUnableToGetECPartRemote = "failed to get EC part from remote node"
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object" GetECObjectInRepContainer = "found erasure-coded object in REP container"
GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed" GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed" GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"
GetFailedToAssembleSplittedObject = "failed to assemble splitted object" GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
@ -123,6 +122,7 @@ const (
GetRequestedObjectIsVirtual = "requested object is virtual" GetRequestedObjectIsVirtual = "requested object is virtual"
GetRequestedObjectIsEC = "requested object is erasure-coded" GetRequestedObjectIsEC = "requested object is erasure-coded"
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds" GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
GetCouldNotGetContainer = "could not get container"
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure" PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
SearchReturnResultDirectly = "return result directly" SearchReturnResultDirectly = "return result directly"
SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client" SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client"

View file

@ -140,7 +140,8 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
prm: prm, prm: prm,
infoSplit: objectSDK.NewSplitInfo(), infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(), infoEC: make(map[uint32]objectSDK.ECChunk),
partsEC: make(map[uint32]*objectSDK.Object),
log: r.log, log: r.log,
} }

View file

@ -36,7 +36,7 @@ func (r *request) assembleEC(ctx context.Context) {
r.log.Debug(logs.GetTryingToAssembleTheECObject) r.log.Debug(logs.GetTryingToAssembleTheECObject)
r.prm.common = r.prm.common.WithLocalOnly(false) r.prm.common = r.prm.common.WithLocalOnly(false)
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log) assembler := newAssemblerEC(r.address(), r.partsEC, r.infoEC, r.ctxRange(), r.headOnly(), r.containerSource)
r.log.Debug(logs.GetAssemblingECObject, r.log.Debug(logs.GetAssemblingECObject,
zap.Uint64("range_offset", r.ctxRange().GetOffset()), zap.Uint64("range_offset", r.ctxRange().GetOffset()),
@ -47,7 +47,7 @@ func (r *request) assembleEC(ctx context.Context) {
zap.Uint64("range_length", r.ctxRange().GetLength()), zap.Uint64("range_length", r.ctxRange().GetLength()),
) )
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly()) obj, err := assembler.Assemble(ctx, r.prm.objWriter)
if err != nil { if err != nil {
r.log.Warn(logs.GetFailedToAssembleECObject, r.log.Warn(logs.GetFailedToAssembleECObject,
zap.Error(err), zap.Error(err),

View file

@ -2,51 +2,46 @@ package getsvc
import ( import (
"context" "context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
) )
type assemblerec struct { type assemblerec struct {
addr oid.Address addr oid.Address
ecInfo *objectSDK.ECInfo ecParts map[uint32]*objectSDK.Object
rng *objectSDK.Range ecChunks map[uint32]objectSDK.ECChunk
objGetter objectGetter rng *objectSDK.Range
cs container.Source head bool
log *logger.Logger cs container.Source
} }
func newAssemblerEC( func newAssemblerEC(
addr oid.Address, addr oid.Address,
ecInfo *objectSDK.ECInfo, ecParts map[uint32]*objectSDK.Object,
ecChunks map[uint32]objectSDK.ECChunk,
rng *objectSDK.Range, rng *objectSDK.Range,
objGetter objectGetter, head bool,
cs container.Source, cs container.Source,
log *logger.Logger,
) *assemblerec { ) *assemblerec {
return &assemblerec{ return &assemblerec{
addr: addr, addr: addr,
rng: rng, rng: rng,
ecInfo: ecInfo, head: head,
objGetter: objGetter, ecParts: ecParts,
cs: cs, ecChunks: ecChunks,
log: log, cs: cs,
} }
} }
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter. // Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
// It returns parent object. // It returns parent object.
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) { func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
if headOnly { if a.head {
return a.reconstructHeader(ctx, writer) return a.reconstructHeader(ctx, writer)
} else if a.rng != nil { } else if a.rng != nil {
return a.reconstructRange(ctx, writer) return a.reconstructRange(ctx, writer)
@ -65,7 +60,7 @@ func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) {
} }
func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, true) parts := a.retrieveParts()
c, err := a.getConstructor() c, err := a.getConstructor()
if err != nil { if err != nil {
return nil, err return nil, err
@ -78,7 +73,7 @@ func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter
} }
func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, false) parts := a.retrieveParts()
c, err := a.getConstructor() c, err := a.getConstructor()
if err != nil { if err != nil {
return nil, err return nil, err
@ -101,7 +96,7 @@ func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter)
} }
func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, false) parts := a.retrieveParts()
c, err := a.getConstructor() c, err := a.getConstructor()
if err != nil { if err != nil {
return nil, err return nil, err
@ -119,41 +114,17 @@ func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter
return obj, err return obj, err
} }
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object { func (a *assemblerec) retrieveParts() []*objectSDK.Object {
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total)) parts := make([]*objectSDK.Object, a.getTotalCount())
errGroup, ctx := errgroup.WithContext(mainCtx) for idx := range parts {
parts[idx] = a.ecParts[uint32(idx)]
for i := range a.ecInfo.Chunks {
chunk := a.ecInfo.Chunks[i]
errGroup.Go(func() error {
objID := new(oid.ID)
err := objID.ReadFromV2(chunk.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}
var obj *objectSDK.Object
if headOnly {
obj, err = a.objGetter.HeadObject(ctx, *objID)
if err != nil {
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
} else {
sow := NewSimpleObjectWriter()
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
if err != nil {
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
obj.SetPayload(sow.pld)
}
parts[chunk.Index] = obj
return nil
})
}
if err := errGroup.Wait(); err != nil {
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
} }
return parts return parts
} }
func (a *assemblerec) getTotalCount() uint32 {
for _, ch := range a.ecChunks {
return ch.Total
}
return 0
}

View file

@ -2,10 +2,21 @@ package getsvc
import ( import (
"context" "context"
"encoding/hex"
"errors"
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/sync/errgroup"
) )
func (r *request) executeOnContainer(ctx context.Context) { func (r *request) executeOnContainer(ctx context.Context) {
@ -48,7 +59,7 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
zap.Uint64("number", r.curProcEpoch), zap.Uint64("number", r.curProcEpoch),
) )
traverser, ok := r.generateTraverser(r.address()) traverser, cnr, ok := r.generateTraverser(r.address())
if !ok { if !ok {
return true return true
} }
@ -58,6 +69,13 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
r.status = statusUndefined r.status = statusUndefined
if policy.IsECPlacement(cnr.Value.PlacementPolicy()) {
return r.processECNodes(ctx, traverser, policy.ECDataCount(cnr.Value.PlacementPolicy()), policy.ECParityCount(cnr.Value.PlacementPolicy()))
}
return r.processRepNodes(ctx, traverser)
}
func (r *request) processRepNodes(ctx context.Context, traverser *placement.Traverser) bool {
for { for {
addrs := traverser.Next() addrs := traverser.Next()
if len(addrs) == 0 { if len(addrs) == 0 {
@ -84,10 +102,211 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
client.NodeInfoFromNetmapElement(&info, addrs[i]) client.NodeInfoFromNetmapElement(&info, addrs[i])
if r.processNode(ctx, info) { if r.processRepNode(ctx, info) {
r.log.Debug(logs.GetCompletingTheOperation) r.log.Debug(logs.GetCompletingTheOperation)
return true return true
} }
} }
} }
} }
func (r *request) processECNodes(ctx context.Context, traverser *placement.Traverser, dataCount, parityCount int) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processECNodes")
defer span.End()
if !r.isRaw() && len(r.partsEC) >= dataCount {
return true
}
if r.isRaw() && len(r.infoEC) == dataCount+parityCount {
return true
}
err := r.traverseECNodes(ctx, traverser, dataCount)
var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
var errSuccess *ecGetSuccessErr
switch {
case err == nil: // nil is returned if all nodes failed or incomplete EC info received
if len(r.infoEC) > 0 {
r.status = statusEC
r.err = r.createECInfoError()
} else {
r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound)
}
case errors.As(err, &errRemoved):
r.status = statusINHUMED
r.err = errRemoved
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange
case errors.As(err, &errSplitInfo):
r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC
r.err = err
case errors.As(err, &errSuccess):
r.status = statusOK
r.err = nil
if errSuccess.Object != nil {
r.collectedObject = errSuccess.Object
r.writeCollectedObject(ctx)
}
}
return r.status != statusUndefined
}
func (r *request) traverseECNodes(ctx context.Context, traverser *placement.Traverser, dataCount int) error {
nodes := make(chan placement.Node, dataCount)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for {
batch := traverser.Next()
if len(batch) == 0 {
r.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
close(nodes)
return
}
for _, node := range batch {
select {
case <-ctx.Done():
return
case nodes <- node:
}
}
}
}()
err := r.processECNodesRequests(ctx, nodes, dataCount)
cancel()
wg.Wait()
return err
}
func (r *request) processECNodesRequests(ctx context.Context, nodes <-chan placement.Node, dataCount int) error {
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(dataCount)
for node := range nodes {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient, zap.String("node_key", hex.EncodeToString(info.PublicKey())))
return err
}
obj, err := r.getRemote(ctx, rs, info)
var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
switch {
default:
// something failed, continue
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
return nil
case err == nil:
// non EC object found (tombstone, linking, lock), stop
return &ecGetSuccessErr{Object: obj}
case errors.As(err, &errRemoved) || errors.As(err, &errOutOfRange) || errors.As(err, &errSplitInfo):
// non EC error found, stop
return err
case errors.As(err, &errECInfo):
if r.isRaw() {
return r.appendECChunksWithCheck(errECInfo)
}
return r.getECChunksRemote(ctx, errECInfo, info, dataCount)
}
})
}
return eg.Wait()
}
func (r *request) appendECChunksWithCheck(errECInfo *objectSDK.ECInfoError) error {
r.ecGuard.Lock()
defer r.ecGuard.Unlock()
for _, ch := range errECInfo.ECInfo().Chunks {
r.infoEC[ch.Index] = objectSDK.ECChunk(ch)
}
if len(r.infoEC) == int(errECInfo.ECInfo().Chunks[0].Total) {
return r.createECInfoError()
}
return nil
}
func (r *request) getECChunksRemote(ctx context.Context, errECInfo *objectSDK.ECInfoError, info client.NodeInfo, dataCount int) error {
for _, ch := range errECInfo.ECInfo().Chunks {
var objID oid.ID
err := objID.ReadFromV2(ch.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}
var address oid.Address
address.SetContainer(r.containerID())
address.SetObject(objID)
var obj *objectSDK.Object
if r.headOnly() {
obj, err = r.headObjectFromNode(ctx, address, info)
} else {
obj, err = r.getObjectFromNode(ctx, address, info)
}
if err != nil {
r.log.Warn(logs.GetUnableToGetECPartRemote, zap.Error(err), zap.Stringer("part_address", address),
zap.String("node", hex.EncodeToString(info.PublicKey())))
continue
}
if err := r.appendECChunkAndObjectWithCheck(objectSDK.ECChunk(ch), obj, dataCount); err != nil {
return err
}
}
return nil
}
func (r *request) appendECChunkAndObjectWithCheck(chunk objectSDK.ECChunk, object *objectSDK.Object, dataCount int) error {
if object == nil {
return nil
}
r.ecGuard.Lock()
defer r.ecGuard.Unlock()
r.infoEC[chunk.Index] = chunk
r.partsEC[chunk.Index] = object
if len(r.infoEC) >= dataCount && len(r.partsEC) >= dataCount {
return r.createECInfoError()
}
return nil
}
func (r *request) createECInfoError() error {
ecInfo := objectSDK.NewECInfo()
for _, chunk := range r.infoEC {
ecInfo.AddChunk(objectSDK.ECChunk(chunk))
}
return objectSDK.NewECInfoError(ecInfo)
}
type ecGetSuccessErr struct {
Object *objectSDK.Object
}
func (s *ecGetSuccessErr) Error() string { return "" }

View file

@ -77,7 +77,8 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
prm: prm, prm: prm,
infoSplit: objectSDK.NewSplitInfo(), infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(), infoEC: make(map[uint32]objectSDK.ECChunk),
partsEC: make(map[uint32]*objectSDK.Object),
log: s.log, log: s.log,
} }

View file

@ -11,6 +11,7 @@ import (
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@ -78,7 +79,7 @@ func newTestStorage() *testStorage {
} }
} }
func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) { func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, *containerCore.Container, error) {
opts := make([]placement.Option, 0, 4) opts := make([]placement.Option, 0, 4)
opts = append(opts, opts = append(opts,
placement.ForContainer(g.c), placement.ForContainer(g.c),
@ -90,7 +91,10 @@ func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e ui
opts = append(opts, placement.ForObject(*obj)) opts = append(opts, placement.ForObject(*obj))
} }
return placement.NewTraverser(opts...) t, err := placement.NewTraverser(opts...)
return t, &containerCore.Container{
Value: g.c,
}, err
} }
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
@ -273,6 +277,16 @@ func (ks *testKeyStorage) GetKey(_ *util.SessionInfo) (*ecdsa.PrivateKey, error)
return &ecdsa.PrivateKey{}, nil return &ecdsa.PrivateKey{}, nil
} }
type testContainerSource struct{}
func (s *testContainerSource) Get(idCnr cid.ID) (*containerCore.Container, error) {
return &containerCore.Container{
Value: container.Container{},
}, nil
}
func (s *testContainerSource) DeletionInfo(cid.ID) (*containerCore.DelInfo, error) { return nil, nil }
func TestGetLocalOnly(t *testing.T) { func TestGetLocalOnly(t *testing.T) {
ctx := context.Background() ctx := context.Background()
@ -551,6 +565,7 @@ func TestGetRemoteSmall(t *testing.T) {
epochSource: testEpochReceiver(curEpoch), epochSource: testEpochReceiver(curEpoch),
remoteStorageConstructor: c, remoteStorageConstructor: c,
keyStore: &testKeyStorage{}, keyStore: &testKeyStorage{},
containerSource: &testContainerSource{},
} }
} }
@ -1722,6 +1737,7 @@ func TestGetRange(t *testing.T) {
epochSource: testEpochReceiver(curEpoch), epochSource: testEpochReceiver(curEpoch),
remoteStorageConstructor: c, remoteStorageConstructor: c,
keyStore: &testKeyStorage{}, keyStore: &testKeyStorage{},
containerSource: &testContainerSource{},
} }
} }
@ -1879,7 +1895,8 @@ func TestGetFromPastEpoch(t *testing.T) {
as[1][1]: c22, as[1][1]: c22,
}, },
}, },
keyStore: &testKeyStorage{}, keyStore: &testKeyStorage{},
containerSource: &testContainerSource{},
} }
w := NewSimpleObjectWriter() w := NewSimpleObjectWriter()

View file

@ -3,12 +3,13 @@ package getsvc
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -45,9 +46,19 @@ func (r *request) executeLocal(ctx context.Context) {
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit) r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo): case errors.As(err, &errECInfo):
if r.isRaw() {
r.appendECChunks(errECInfo)
r.status = statusEC
r.err = r.createECInfoError()
break
}
if err := r.getECChunksLocal(ctx, errECInfo); err != nil {
r.status = statusUndefined
r.err = err
break
}
r.status = statusEC r.status = statusEC
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC) r.err = r.createECInfoError()
r.err = objectSDK.NewECInfoError(r.infoEC)
case errors.As(err, &errOutOfRange): case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange r.status = statusOutOfRange
r.err = errOutOfRange r.err = errOutOfRange
@ -63,3 +74,49 @@ func (r *request) get(ctx context.Context) (*objectSDK.Object, error) {
} }
return r.localStorage.Get(ctx, r.address()) return r.localStorage.Get(ctx, r.address())
} }
func (r *request) appendECChunks(errECInfo *objectSDK.ECInfoError) {
r.ecGuard.Lock()
defer r.ecGuard.Unlock()
for _, ch := range errECInfo.ECInfo().Chunks {
r.infoEC[ch.Index] = objectSDK.ECChunk(ch)
}
}
func (r *request) appendECChunkAndObject(chunk objectSDK.ECChunk, object *objectSDK.Object) {
if object == nil {
return
}
r.ecGuard.Lock()
defer r.ecGuard.Unlock()
r.infoEC[chunk.Index] = chunk
r.partsEC[chunk.Index] = object
}
func (r *request) getECChunksLocal(ctx context.Context, errECInfo *objectSDK.ECInfoError) error {
for _, ch := range errECInfo.ECInfo().Chunks {
var objID oid.ID
err := objID.ReadFromV2(ch.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}
var address oid.Address
address.SetContainer(r.containerID())
address.SetObject(objID)
var obj *objectSDK.Object
if r.headOnly() {
obj, err = r.localStorage.Head(ctx, address, false)
} else {
obj, err = r.localStorage.Get(ctx, address)
}
if err != nil {
r.log.Warn(logs.GetUnableToGetECPartLocal, zap.Error(err), zap.Stringer("part_address", address))
continue
}
r.appendECChunkAndObject(objectSDK.ECChunk(ch), obj)
}
return nil
}

View file

@ -7,16 +7,17 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap" "go.uber.org/zap"
) )
func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool { var errECObjectInRepContainer = errors.New("found erasure-coded object in REP container")
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode")
func (r *request) processRepNode(ctx context.Context, info client.NodeInfo) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processRepNode")
defer span.End() defer span.End()
r.log.Debug(logs.ProcessingNode, zap.String("node_key", hex.EncodeToString(info.PublicKey()))) r.log.Debug(logs.ProcessingNode, zap.String("node_key", hex.EncodeToString(info.PublicKey())))
@ -36,11 +37,6 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
switch { switch {
default: default:
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err)) r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
if r.status == statusEC {
// we need to continue getting another chunks from another nodes
// in case of network issue
return false
}
r.status = statusUndefined r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound) r.err = new(apistatus.ObjectNotFound)
case err == nil: case err == nil:
@ -65,19 +61,10 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo()) mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit) r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo): case errors.As(err, &errECInfo):
r.status = statusEC r.log.Error(logs.GetECObjectInRepContainer, zap.Stringer("address", r.address()))
util.MergeECInfo(r.infoEC, errECInfo.ECInfo()) r.status = statusUndefined
r.infoEC = errECInfo.ECInfo() r.err = errECObjectInRepContainer
r.err = objectSDK.NewECInfoError(r.infoEC) return true
if r.isRaw() {
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
}
cnt, err := r.containerSource.Get(r.address().Container())
if err == nil {
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
}
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
} }
return r.status != statusUndefined return r.status != statusUndefined
@ -116,3 +103,49 @@ func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.N
return rs.Get(ctx, r.address(), prm) return rs.Get(ctx, r.address(), prm)
} }
func (r *request) getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) {
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
return nil, err
}
key, err := r.key()
if err != nil {
return nil, err
}
prm := RemoteRequestParams{
Epoch: r.curProcEpoch,
TTL: 1,
PrivateKey: key,
SessionToken: r.prm.common.SessionToken(),
BearerToken: r.prm.common.BearerToken(),
XHeaders: r.prm.common.XHeaders(),
}
return rs.Get(ctx, addr, prm)
}
func (r *request) headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) {
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
return nil, err
}
key, err := r.key()
if err != nil {
return nil, err
}
prm := RemoteRequestParams{
Epoch: r.curProcEpoch,
TTL: 1,
PrivateKey: key,
SessionToken: r.prm.common.SessionToken(),
BearerToken: r.prm.common.BearerToken(),
XHeaders: r.prm.common.XHeaders(),
}
return rs.Head(ctx, addr, prm)
}

View file

@ -3,6 +3,7 @@ package getsvc
import ( import (
"context" "context"
"crypto/ecdsa" "crypto/ecdsa"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
@ -23,7 +24,9 @@ type request struct {
infoSplit *objectSDK.SplitInfo infoSplit *objectSDK.SplitInfo
infoEC *objectSDK.ECInfo ecGuard sync.Mutex
infoEC map[uint32]objectSDK.ECChunk
partsEC map[uint32]*objectSDK.Object
log *logger.Logger log *logger.Logger
@ -138,22 +141,19 @@ func (r *request) initEpoch() bool {
} }
} }
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) { func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, *container.Container, bool) {
obj := addr.Object() obj := addr.Object()
t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch) t, cnr, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
if err != nil {
switch {
default:
r.status = statusUndefined r.status = statusUndefined
r.err = err r.err = err
r.log.Debug(logs.GetCouldNotGenerateContainerTraverser, zap.Error(err)) r.log.Debug(logs.GetCouldNotGenerateContainerTraverser, zap.Error(err))
return nil, false return nil, nil, false
case err == nil:
return t, true
} }
return t, cnr, true
} }
func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) { func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) {
@ -223,7 +223,7 @@ func (r *request) writeCollectedObject(ctx context.Context) {
// isForwardingEnabled returns true if common execution // isForwardingEnabled returns true if common execution
// parameters has request forwarding closure set. // parameters has request forwarding closure set.
func (r request) isForwardingEnabled() bool { func (r *request) isForwardingEnabled() bool {
return r.prm.forwarder != nil return r.prm.forwarder != nil
} }

View file

@ -6,6 +6,7 @@ import (
"errors" "errors"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -23,7 +24,7 @@ type epochSource interface {
} }
type traverserGenerator interface { type traverserGenerator interface {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
} }
type keyStorage interface { type keyStorage interface {

View file

@ -47,7 +47,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
zap.Uint64("number", exec.curProcEpoch), zap.Uint64("number", exec.curProcEpoch),
) )
traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch) traverser, _, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
if err != nil { if err != nil {
return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err) return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
} }

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -80,12 +81,13 @@ func newTestStorage() *testStorage {
} }
} }
func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) { func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, *containerCore.Container, error) {
return placement.NewTraverser( t, err := placement.NewTraverser(
placement.ForContainer(g.c), placement.ForContainer(g.c),
placement.UseBuilder(g.b[epoch]), placement.UseBuilder(g.b[epoch]),
placement.WithoutSuccessTracking(), placement.WithoutSuccessTracking(),
) )
return t, &containerCore.Container{Value: g.c}, err
} }
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -45,7 +46,7 @@ type cfg struct {
} }
traverserGenerator interface { traverserGenerator interface {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
} }
currentEpochReceiver interface { currentEpochReceiver interface {

View file

@ -122,17 +122,17 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav
// GenerateTraverser generates placement Traverser for provided object address // GenerateTraverser generates placement Traverser for provided object address
// using epoch-th network map. // using epoch-th network map.
func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, error) { func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, *container.Container, error) {
// get network map by epoch // get network map by epoch
nm, err := g.netMapSrc.GetNetMapByEpoch(epoch) nm, err := g.netMapSrc.GetNetMapByEpoch(epoch)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err) return nil, nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
} }
// get container related container // get container related container
cnr, err := g.cnrSrc.Get(idCnr) cnr, err := g.cnrSrc.Get(idCnr)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get container: %w", err) return nil, nil, fmt.Errorf("could not get container: %w", err)
} }
// allocate placement traverser options // allocate placement traverser options
@ -160,5 +160,9 @@ func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoc
) )
} }
return placement.NewTraverser(traverseOpts...) t, err := placement.NewTraverser(traverseOpts...)
if err != nil {
return nil, nil, err
}
return t, cnr, nil
} }