WIP: Process EC container for Get/GetRange/Head concurrently #1237

Closed
dstepanov-yadro wants to merge 6 commits from dstepanov-yadro/frostfs-node:fix/ec_get_failover into master
15 changed files with 424 additions and 117 deletions


```diff
@@ -104,10 +104,9 @@ const (
 	GetTryingToAssembleTheECObject  = "trying to assemble the ec object..."
 	GetAssemblingSplittedObject     = "assembling splitted object..."
 	GetAssemblingECObject           = "assembling erasure-coded object..."
-	GetUnableToGetAllPartsECObject  = "unable to get all parts, continue to reconstruct with existed"
-	GetUnableToGetPartECObject      = "unable to get part of the erasure-encoded object"
-	GetUnableToHeadPartECObject     = "unable to head part of the erasure-encoded object"
-	GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
+	GetUnableToGetECPartLocal       = "failed to get EC part from local storage"
+	GetUnableToGetECPartRemote      = "failed to get EC part from remote node"
+	GetECObjectInRepContainer       = "found erasure-coded object in REP container"
 	GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
 	GetAssemblingECObjectCompleted       = "assembling erasure-coded object completed"
 	GetFailedToAssembleSplittedObject    = "failed to assemble splitted object"
@@ -123,6 +122,7 @@ const (
 	GetRequestedObjectIsVirtual          = "requested object is virtual"
 	GetRequestedObjectIsEC               = "requested object is erasure-coded"
 	GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
+	GetCouldNotGetContainer              = "could not get container"
 	PutAdditionalContainerBroadcastFailure  = "additional container broadcast failure"
 	SearchReturnResultDirectly              = "return result directly"
 	SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client"
```


```diff
@@ -140,7 +140,8 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm RequestParameters) error {
 		prm:       prm,
 		infoSplit: objectSDK.NewSplitInfo(),
-		infoEC:    objectSDK.NewECInfo(),
+		infoEC:    make(map[uint32]objectSDK.ECChunk),
+		partsEC:   make(map[uint32]*objectSDK.Object),
 		log:       r.log,
 	}
```


```diff
@@ -36,7 +36,7 @@ func (r *request) assembleEC(ctx context.Context) {
 	r.log.Debug(logs.GetTryingToAssembleTheECObject)

 	r.prm.common = r.prm.common.WithLocalOnly(false)
-	assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
+	assembler := newAssemblerEC(r.address(), r.partsEC, r.infoEC, r.ctxRange(), r.headOnly(), r.containerSource)

 	r.log.Debug(logs.GetAssemblingECObject,
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
@@ -47,7 +47,7 @@ func (r *request) assembleEC(ctx context.Context) {
 		zap.Uint64("range_length", r.ctxRange().GetLength()),
 	)

-	obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
+	obj, err := assembler.Assemble(ctx, r.prm.objWriter)
 	if err != nil {
 		r.log.Warn(logs.GetFailedToAssembleECObject,
 			zap.Error(err),
```


```diff
@@ -2,51 +2,46 @@ package getsvc

 import (
 	"context"
-	"fmt"

-	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
-	"go.uber.org/zap"
-	"golang.org/x/sync/errgroup"
 )

 type assemblerec struct {
-	addr      oid.Address
-	ecInfo    *objectSDK.ECInfo
-	rng       *objectSDK.Range
-	objGetter objectGetter
-	cs        container.Source
-	log       *logger.Logger
+	addr     oid.Address
+	ecParts  map[uint32]*objectSDK.Object
+	ecChunks map[uint32]objectSDK.ECChunk
+	rng      *objectSDK.Range
+	head     bool
+	cs       container.Source
 }

 func newAssemblerEC(
 	addr oid.Address,
-	ecInfo *objectSDK.ECInfo,
+	ecParts map[uint32]*objectSDK.Object,
+	ecChunks map[uint32]objectSDK.ECChunk,
 	rng *objectSDK.Range,
-	objGetter objectGetter,
+	head bool,
 	cs container.Source,
-	log *logger.Logger,
 ) *assemblerec {
 	return &assemblerec{
-		addr:      addr,
-		rng:       rng,
-		ecInfo:    ecInfo,
-		objGetter: objGetter,
-		cs:        cs,
-		log:       log,
+		addr:     addr,
+		rng:      rng,
+		head:     head,
+		ecParts:  ecParts,
+		ecChunks: ecChunks,
+		cs:       cs,
 	}
 }

 // Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
 // It returns parent object.
-func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
-	if headOnly {
+func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
+	if a.head {
 		return a.reconstructHeader(ctx, writer)
 	} else if a.rng != nil {
 		return a.reconstructRange(ctx, writer)
@@ -65,7 +60,7 @@ func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) {
 }

 func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
-	parts := a.retrieveParts(ctx, true)
+	parts := a.retrieveParts()
 	c, err := a.getConstructor()
 	if err != nil {
 		return nil, err
@@ -78,7 +73,7 @@ func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
 }

 func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
-	parts := a.retrieveParts(ctx, false)
+	parts := a.retrieveParts()
 	c, err := a.getConstructor()
 	if err != nil {
 		return nil, err
@@ -101,7 +96,7 @@ func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
 }

 func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
-	parts := a.retrieveParts(ctx, false)
+	parts := a.retrieveParts()
 	c, err := a.getConstructor()
 	if err != nil {
 		return nil, err
@@ -119,41 +114,17 @@ func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
 	return obj, err
 }

-func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
-	parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
-	errGroup, ctx := errgroup.WithContext(mainCtx)
-
-	for i := range a.ecInfo.Chunks {
-		chunk := a.ecInfo.Chunks[i]
-		errGroup.Go(func() error {
-			objID := new(oid.ID)
-			err := objID.ReadFromV2(chunk.ID)
-			if err != nil {
-				return fmt.Errorf("invalid object ID: %w", err)
-			}
-			var obj *objectSDK.Object
-			if headOnly {
-				obj, err = a.objGetter.HeadObject(ctx, *objID)
-				if err != nil {
-					a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
-					return nil
-				}
-			} else {
-				sow := NewSimpleObjectWriter()
-				obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
-				if err != nil {
-					a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
-					return nil
-				}
-				obj.SetPayload(sow.pld)
-			}
-			parts[chunk.Index] = obj
-			return nil
-		})
-	}
-	if err := errGroup.Wait(); err != nil {
-		a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
+func (a *assemblerec) retrieveParts() []*objectSDK.Object {
+	parts := make([]*objectSDK.Object, a.getTotalCount())
+	for idx := range parts {
+		parts[idx] = a.ecParts[uint32(idx)]
 	}
 	return parts
 }
+
+func (a *assemblerec) getTotalCount() uint32 {
+	for _, ch := range a.ecChunks {
+		return ch.Total
+	}
+	return 0
+}
```


```diff
@@ -2,10 +2,21 @@ package getsvc

 import (
 	"context"
+	"encoding/hex"
+	"errors"
+	"fmt"
+	"sync"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
+	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
+	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )

 func (r *request) executeOnContainer(ctx context.Context) {
@@ -48,7 +59,7 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
 		zap.Uint64("number", r.curProcEpoch),
 	)

-	traverser, ok := r.generateTraverser(r.address())
+	traverser, cnr, ok := r.generateTraverser(r.address())
 	if !ok {
 		return true
 	}
@@ -58,6 +69,13 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {

 	r.status = statusUndefined

+	if policy.IsECPlacement(cnr.Value.PlacementPolicy()) {
+		return r.processECNodes(ctx, traverser, policy.ECDataCount(cnr.Value.PlacementPolicy()), policy.ECParityCount(cnr.Value.PlacementPolicy()))
+	}
+	return r.processRepNodes(ctx, traverser)
+}
+
+func (r *request) processRepNodes(ctx context.Context, traverser *placement.Traverser) bool {
 	for {
 		addrs := traverser.Next()
 		if len(addrs) == 0 {
@@ -84,10 +102,211 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
 			client.NodeInfoFromNetmapElement(&info, addrs[i])

-			if r.processNode(ctx, info) {
+			if r.processRepNode(ctx, info) {
 				r.log.Debug(logs.GetCompletingTheOperation)
 				return true
 			}
 		}
 	}
 }
+
+func (r *request) processECNodes(ctx context.Context, traverser *placement.Traverser, dataCount, parityCount int) bool {
+	ctx, span := tracing.StartSpanFromContext(ctx, "getService.processECNodes")
+	defer span.End()
+
+	if !r.isRaw() && len(r.partsEC) >= dataCount {
+		return true
+	}
+	if r.isRaw() && len(r.infoEC) == dataCount+parityCount {
+		return true
+	}
+
+	err := r.traverseECNodes(ctx, traverser, dataCount)
+
+	var errSplitInfo *objectSDK.SplitInfoError
+	var errECInfo *objectSDK.ECInfoError
+	var errRemoved *apistatus.ObjectAlreadyRemoved
+	var errOutOfRange *apistatus.ObjectOutOfRange
+	var errSuccess *ecGetSuccessErr
+
+	switch {
```

What is the benefit of using a separate error type for successfully performed requests? Go has multiple return values; can we use them?

To cancel the `errgroup`'s context and not run the remaining requests: if the function passed to `eg.Go` returns an error, the errgroup cancels its context. Of course, the same thing could be done without `errgroup`, but it would not be as simple. Also added a `ctx.Done` check.

It can be done with a special error, e.g. `errStopIteration` is what we use somewhere.

But `ecGetSuccessErr` is a special error used only to stop EC handling and pass the result object.
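A minimal self-contained sketch of the pattern under discussion, with hypothetical names (only the general mechanism matches the PR): a sentinel error type stops the `errgroup`, which cancels the shared context for the remaining workers, and `errors.As` recovers the result from `eg.Wait()`.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// successErr is a sentinel error: returning it from eg.Go makes the
// errgroup cancel the shared context (stopping the other workers)
// while still carrying the result out through eg.Wait().
type successErr struct{ obj string }

func (e *successErr) Error() string { return "" }

func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < 4; i++ {
		node := i // capture the loop variable (pre-Go 1.22)
		eg.Go(func() error {
			// Skip the request if a sibling goroutine already stopped the group.
			select {
			case <-ctx.Done():
				return ctx.Err()
			default:
			}
			if node != 2 {
				return nil // this node had nothing; let the others continue
			}
			// A non-nil error cancels ctx for the remaining workers.
			return &successErr{obj: fmt.Sprintf("object from node %d", node)}
		})
	}
	var success *successErr
	if err := eg.Wait(); errors.As(err, &success) {
		fmt.Println("got:", success.obj)
	}
}
```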
```diff
+	case err == nil: // nil is returned if all nodes failed or incomplete EC info received
+		if len(r.infoEC) > 0 {
+			r.status = statusEC
```
fyrchik marked this conversation as resolved.

`s/returns/is returned/`

done
```diff
+			r.err = r.createECInfoError()
+		} else {
+			r.status = statusUndefined
+			r.err = new(apistatus.ObjectNotFound)
+		}
+	case errors.As(err, &errRemoved):
+		r.status = statusINHUMED
+		r.err = errRemoved
+	case errors.As(err, &errOutOfRange):
+		r.status = statusOutOfRange
+		r.err = errOutOfRange
+	case errors.As(err, &errSplitInfo):
+		r.status = statusVIRTUAL
+		mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
+		r.err = objectSDK.NewSplitInfoError(r.infoSplit)
+	case errors.As(err, &errECInfo):
+		r.status = statusEC
```
acid-ant marked this conversation as resolved.

You are canceling the context here and using it below for `r.writeCollectedObject(ctx)`, looks wrong.

Right, fixed.
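The hazard being pointed out, as a tiny sketch (hypothetical names, not the PR's code): once `cancel()` has run, the same `ctx` is dead for any later call that honors it.

```go
package main

import (
	"context"
	"fmt"
)

// writeResult stands in for a call like r.writeCollectedObject(ctx):
// it checks the context and fails once the context has been canceled.
func writeResult(ctx context.Context) error {
	if err := ctx.Err(); err != nil {
		return err
	}
	return nil
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // canceling first...
	// ...makes any subsequent use of ctx fail immediately.
	fmt.Println(writeResult(ctx)) // prints "context canceled"
}
```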
```diff
+		r.err = err
+	case errors.As(err, &errSuccess):
+		r.status = statusOK
+		r.err = nil
+		if errSuccess.Object != nil {
+			r.collectedObject = errSuccess.Object
+			r.writeCollectedObject(ctx)
+		}
+	}
+	return r.status != statusUndefined
+}
+
+func (r *request) traverseECNodes(ctx context.Context, traverser *placement.Traverser, dataCount int) error {
+	nodes := make(chan placement.Node, dataCount)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
```
aarifullin marked this conversation as resolved.

[Optional] What do you think: won't this approach cause goroutine spam if too many get-like requests are processed at the same time? Using `errgroup` would be safe, I guess.

I didn't catch the thought. `errgroup` creates the same goroutines.

> errgroup creates the same goroutines.

Yes, it does, but with a fixed number of workers.

Nevermind, sorry. I incorrectly read this part:

```go
go func() {
	defer wg.Done()

	for {
		batch := traverser.Next()
		if len(batch) == 0 {
			r.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
			close(nodes)
			return
		}
		for _, node := range batch {
			select {
			case <-ctx.Done():
				return
			case nodes <- node:
			}
		}
	}
}()
```

I mistakenly read this as goroutine creation inside the for loop.
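For reference, a minimal sketch of the bounding behavior mentioned here (illustrative names only): with `errgroup.SetLimit`, `eg.Go` blocks once the limit is reached, so draining a channel of nodes never runs more than the configured number of goroutines at a time.

```go
package main

import (
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

func main() {
	nodes := make(chan int)
	go func() {
		defer close(nodes)
		for n := 0; n < 10; n++ {
			nodes <- n
		}
	}()

	var eg errgroup.Group
	eg.SetLimit(3) // at most 3 workers at once; eg.Go blocks when the limit is reached

	var active atomic.Int32
	for node := range nodes {
		node := node // capture the loop variable (pre-Go 1.22)
		eg.Go(func() error {
			cur := active.Add(1)
			defer active.Add(-1)
			fmt.Printf("node %d, active workers: %d\n", node, cur) // never exceeds 3
			return nil
		})
	}
	_ = eg.Wait()
}
```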
```diff
+		for {
+			batch := traverser.Next()
+			if len(batch) == 0 {
+				r.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
+				close(nodes)
+				return
+			}
+			for _, node := range batch {
+				select {
+				case <-ctx.Done():
+					return
+				case nodes <- node:
+				}
+			}
+		}
+	}()
+
+	err := r.processECNodesRequests(ctx, nodes, dataCount)
+	cancel()
+	wg.Wait()
+	return err
+}
+
+func (r *request) processECNodesRequests(ctx context.Context, nodes <-chan placement.Node, dataCount int) error {
+	eg, ctx := errgroup.WithContext(ctx)
+	eg.SetLimit(dataCount)
+
+	for node := range nodes {
+		var info client.NodeInfo
+		client.NodeInfoFromNetmapElement(&info, node)
+		eg.Go(func() error {
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+			rs, err := r.remoteStorageConstructor.Get(info)
+			if err != nil {
+				r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient, zap.String("node_key", hex.EncodeToString(info.PublicKey())))
+				return err
```

So for each node we call `getRemote` with the same parameters. Where is the place in the code where we combine the chunks into the final object?

```go
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
```

https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/3bf6e6dde60f15f4f90caf44eda429e8f6269cc1/pkg/services/object/get/assemblerec.go#L122

I have realised where my confusion comes from: we have two steps:

1. Get ECInfo from enough nodes.
2. Pull EC chunks.

The "enough nodes" set is defined at step 1, even though nodes may go down at step 2. In that case we will fail to fetch the object? The situation *will* happen during failover tests.

Right, if a node fails between step 1 and step 2, then such a request will fail.
```diff
+			}
+			obj, err := r.getRemote(ctx, rs, info)
+
+			var errSplitInfo *objectSDK.SplitInfoError
+			var errECInfo *objectSDK.ECInfoError
+			var errRemoved *apistatus.ObjectAlreadyRemoved
+			var errOutOfRange *apistatus.ObjectOutOfRange
+
+			switch {
+			default:
+				// something failed, continue
+				r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
+				return nil
+			case err == nil:
+				// non EC object found (tombstone, linking, lock), stop
+				return &ecGetSuccessErr{Object: obj}
+			case errors.As(err, &errRemoved) || errors.As(err, &errOutOfRange) || errors.As(err, &errSplitInfo):
+				// non EC error found, stop
+				return err
+			case errors.As(err, &errECInfo):
+				if r.isRaw() {
+					return r.appendECChunksWithCheck(errECInfo)
```

Could you explain this line a bit? Why do we NOT return error if the number of chunks is not equal to the expected?

Could you explain this line a bit? Why do we NOT return error if the number of chunks is not equal to the expected?

In case of HEAD request with raw flag it is expected to get raw EC info error with all chunks.

In case of `HEAD` request with `raw` flag it is expected to get raw EC info error with all chunks.
```diff
+				}
+				return r.getECChunksRemote(ctx, errECInfo, info, dataCount)
+			}
+		})
+	}
+	return eg.Wait()
+}
+
+func (r *request) appendECChunksWithCheck(errECInfo *objectSDK.ECInfoError) error {
+	r.ecGuard.Lock()
+	defer r.ecGuard.Unlock()
+	for _, ch := range errECInfo.ECInfo().Chunks {
+		r.infoEC[ch.Index] = objectSDK.ECChunk(ch)
+	}
+	if len(r.infoEC) == int(errECInfo.ECInfo().Chunks[0].Total) {
+		return r.createECInfoError()
+	}
+	return nil
+}
+
+func (r *request) getECChunksRemote(ctx context.Context, errECInfo *objectSDK.ECInfoError, info client.NodeInfo, dataCount int) error {
+	for _, ch := range errECInfo.ECInfo().Chunks {
+		var objID oid.ID
+		err := objID.ReadFromV2(ch.ID)
+		if err != nil {
+			return fmt.Errorf("invalid object ID: %w", err)
+		}
+		var address oid.Address
+		address.SetContainer(r.containerID())
+		address.SetObject(objID)
+
+		var obj *objectSDK.Object
+		if r.headOnly() {
+			obj, err = r.headObjectFromNode(ctx, address, info)
+		} else {
+			obj, err = r.getObjectFromNode(ctx, address, info)
+		}
+		if err != nil {
+			r.log.Warn(logs.GetUnableToGetECPartRemote, zap.Error(err), zap.Stringer("part_address", address),
+				zap.String("node", hex.EncodeToString(info.PublicKey())))
+			continue
+		}
+		if err := r.appendECChunkAndObjectWithCheck(objectSDK.ECChunk(ch), obj, dataCount); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (r *request) appendECChunkAndObjectWithCheck(chunk objectSDK.ECChunk, object *objectSDK.Object, dataCount int) error {
+	if object == nil {
+		return nil
+	}
+	r.ecGuard.Lock()
+	defer r.ecGuard.Unlock()
+
+	r.infoEC[chunk.Index] = chunk
+	r.partsEC[chunk.Index] = object
+
+	if len(r.infoEC) >= dataCount && len(r.partsEC) >= dataCount {
+		return r.createECInfoError()
+	}
+	return nil
+}
+
+func (r *request) createECInfoError() error {
+	ecInfo := objectSDK.NewECInfo()
+	for _, chunk := range r.infoEC {
+		ecInfo.AddChunk(objectSDK.ECChunk(chunk))
+	}
+	return objectSDK.NewECInfoError(ecInfo)
+}
+
+type ecGetSuccessErr struct {
+	Object *objectSDK.Object
+}
+
+func (s *ecGetSuccessErr) Error() string { return "" }
```


```diff
@@ -77,7 +77,8 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
 		prm:       prm,
 		infoSplit: objectSDK.NewSplitInfo(),
-		infoEC:    objectSDK.NewECInfo(),
+		infoEC:    make(map[uint32]objectSDK.ECChunk),
+		partsEC:   make(map[uint32]*objectSDK.Object),
 		log:       s.log,
 	}
```


```diff
@@ -11,6 +11,7 @@ import (
 	"testing"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
+	containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@@ -78,7 +79,7 @@ func newTestStorage() *testStorage {
 	}
 }

-func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) {
+func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, *containerCore.Container, error) {
 	opts := make([]placement.Option, 0, 4)
 	opts = append(opts,
 		placement.ForContainer(g.c),
@@ -90,7 +91,10 @@ func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) {
 		opts = append(opts, placement.ForObject(*obj))
 	}

-	return placement.NewTraverser(opts...)
+	t, err := placement.NewTraverser(opts...)
+	return t, &containerCore.Container{
+		Value: g.c,
+	}, err
 }

 func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
@@ -273,6 +277,16 @@ func (ks *testKeyStorage) GetKey(_ *util.SessionInfo) (*ecdsa.PrivateKey, error) {
 	return &ecdsa.PrivateKey{}, nil
 }

+type testContainerSource struct{}
+
+func (s *testContainerSource) Get(idCnr cid.ID) (*containerCore.Container, error) {
+	return &containerCore.Container{
+		Value: container.Container{},
+	}, nil
+}
+
+func (s *testContainerSource) DeletionInfo(cid.ID) (*containerCore.DelInfo, error) { return nil, nil }
+
 func TestGetLocalOnly(t *testing.T) {
 	ctx := context.Background()
@@ -551,6 +565,7 @@ func TestGetRemoteSmall(t *testing.T) {
 		epochSource:              testEpochReceiver(curEpoch),
 		remoteStorageConstructor: c,
 		keyStore:                 &testKeyStorage{},
+		containerSource:          &testContainerSource{},
 	}
 }
@@ -1722,6 +1737,7 @@ func TestGetRange(t *testing.T) {
 		epochSource:              testEpochReceiver(curEpoch),
 		remoteStorageConstructor: c,
 		keyStore:                 &testKeyStorage{},
+		containerSource:          &testContainerSource{},
 	}
 }
@@ -1879,7 +1895,8 @@ func TestGetFromPastEpoch(t *testing.T) {
 				as[1][1]: c22,
 			},
 		},
 		keyStore:        &testKeyStorage{},
+		containerSource: &testContainerSource{},
 	}

 	w := NewSimpleObjectWriter()
```


```diff
@@ -3,12 +3,13 @@ package getsvc

 import (
 	"context"
 	"errors"
+	"fmt"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.uber.org/zap"
 )
@@ -45,9 +46,19 @@ func (r *request) executeLocal(ctx context.Context) {
 		mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
 		r.err = objectSDK.NewSplitInfoError(r.infoSplit)
 	case errors.As(err, &errECInfo):
+		if r.isRaw() {
+			r.appendECChunks(errECInfo)
+			r.status = statusEC
+			r.err = r.createECInfoError()
+			break
+		}
+		if err := r.getECChunksLocal(ctx, errECInfo); err != nil {
+			r.status = statusUndefined
+			r.err = err
+			break
+		}
 		r.status = statusEC
-		util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
-		r.err = objectSDK.NewECInfoError(r.infoEC)
+		r.err = r.createECInfoError()
 	case errors.As(err, &errOutOfRange):
 		r.status = statusOutOfRange
 		r.err = errOutOfRange
@@ -63,3 +74,49 @@ func (r *request) get(ctx context.Context) (*objectSDK.Object, error) {
 	}
 	return r.localStorage.Get(ctx, r.address())
 }
+
+func (r *request) appendECChunks(errECInfo *objectSDK.ECInfoError) {
+	r.ecGuard.Lock()
+	defer r.ecGuard.Unlock()
+	for _, ch := range errECInfo.ECInfo().Chunks {
+		r.infoEC[ch.Index] = objectSDK.ECChunk(ch)
+	}
+}
+
+func (r *request) appendECChunkAndObject(chunk objectSDK.ECChunk, object *objectSDK.Object) {
+	if object == nil {
+		return
+	}
+	r.ecGuard.Lock()
+	defer r.ecGuard.Unlock()
+	r.infoEC[chunk.Index] = chunk
+	r.partsEC[chunk.Index] = object
+}
+
+func (r *request) getECChunksLocal(ctx context.Context, errECInfo *objectSDK.ECInfoError) error {
+	for _, ch := range errECInfo.ECInfo().Chunks {
+		var objID oid.ID
+		err := objID.ReadFromV2(ch.ID)
+		if err != nil {
+			return fmt.Errorf("invalid object ID: %w", err)
+		}
+		var address oid.Address
+		address.SetContainer(r.containerID())
+		address.SetObject(objID)
+
+		var obj *objectSDK.Object
+		if r.headOnly() {
+			obj, err = r.localStorage.Head(ctx, address, false)
+		} else {
+			obj, err = r.localStorage.Get(ctx, address)
+		}
+		if err != nil {
+			r.log.Warn(logs.GetUnableToGetECPartLocal, zap.Error(err), zap.Stringer("part_address", address))
+			continue
+		}
+		r.appendECChunkAndObject(objectSDK.ECChunk(ch), obj)
+	}
+	return nil
+}
```


```diff
@@ -7,16 +7,17 @@ import (

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
-	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
 	"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
 	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
 	"go.uber.org/zap"
 )

-func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
-	ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode")
+var errECObjectInRepContainer = errors.New("found erasure-coded object in REP container")
+
+func (r *request) processRepNode(ctx context.Context, info client.NodeInfo) bool {
+	ctx, span := tracing.StartSpanFromContext(ctx, "getService.processRepNode")
 	defer span.End()

 	r.log.Debug(logs.ProcessingNode, zap.String("node_key", hex.EncodeToString(info.PublicKey())))
@@ -36,11 +37,6 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
 	switch {
 	default:
 		r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
-		if r.status == statusEC {
-			// we need to continue getting another chunks from another nodes
-			// in case of network issue
-			return false
-		}
 		r.status = statusUndefined
 		r.err = new(apistatus.ObjectNotFound)
 	case err == nil:
@@ -65,19 +61,10 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
 		mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
 		r.err = objectSDK.NewSplitInfoError(r.infoSplit)
 	case errors.As(err, &errECInfo):
-		r.status = statusEC
-		util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
-		r.infoEC = errECInfo.ECInfo()
-		r.err = objectSDK.NewECInfoError(r.infoEC)
+		r.log.Error(logs.GetECObjectInRepContainer, zap.Stringer("address", r.address()))
+		r.status = statusUndefined
+		r.err = errECObjectInRepContainer
+		return true
```
acid-ant marked this conversation as resolved.

Looks like part of the code in the `default` section is no longer needed:

```go
		if r.status == statusEC {
			// we need to continue getting another chunks from another nodes
...
```

done
```diff
-		if r.isRaw() {
-			return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
-		}
-		cnt, err := r.containerSource.Get(r.address().Container())
-		if err == nil {
-			return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
-		}
-		r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
-		return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
 	}

 	return r.status != statusUndefined
@@ -116,3 +103,49 @@ func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {
 	return rs.Get(ctx, r.address(), prm)
 }
+
+func (r *request) getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) {
+	rs, err := r.remoteStorageConstructor.Get(info)
+	if err != nil {
+		return nil, err
+	}
+	key, err := r.key()
+	if err != nil {
+		return nil, err
+	}
+	prm := RemoteRequestParams{
+		Epoch:        r.curProcEpoch,
+		TTL:          1,
+		PrivateKey:   key,
+		SessionToken: r.prm.common.SessionToken(),
+		BearerToken:  r.prm.common.BearerToken(),
+		XHeaders:     r.prm.common.XHeaders(),
+	}
+	return rs.Get(ctx, addr, prm)
+}
+
+func (r *request) headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) {
+	rs, err := r.remoteStorageConstructor.Get(info)
+	if err != nil {
+		return nil, err
+	}
+	key, err := r.key()
+	if err != nil {
+		return nil, err
+	}
+	prm := RemoteRequestParams{
+		Epoch:        r.curProcEpoch,
+		TTL:          1,
+		PrivateKey:   key,
+		SessionToken: r.prm.common.SessionToken(),
+		BearerToken:  r.prm.common.BearerToken(),
+		XHeaders:     r.prm.common.XHeaders(),
+	}
+	return rs.Head(ctx, addr, prm)
+}
```


```diff
@@ -3,6 +3,7 @@ package getsvc

 import (
 	"context"
 	"crypto/ecdsa"
+	"sync"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
 	clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
@@ -23,7 +24,9 @@ type request struct {

 	infoSplit *objectSDK.SplitInfo

-	infoEC *objectSDK.ECInfo
+	ecGuard sync.Mutex
+	infoEC  map[uint32]objectSDK.ECChunk
+	partsEC map[uint32]*objectSDK.Object

 	log *logger.Logger
@@ -138,22 +141,19 @@ func (r *request) initEpoch() bool {
 	}
 }

-func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
+func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, *container.Container, bool) {
 	obj := addr.Object()

-	t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
-
-	switch {
-	default:
+	t, cnr, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
+	if err != nil {
 		r.status = statusUndefined
 		r.err = err
 		r.log.Debug(logs.GetCouldNotGenerateContainerTraverser, zap.Error(err))
-		return nil, false
-	case err == nil:
-		return t, true
+		return nil, nil, false
 	}
+	return t, cnr, true
 }

 func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) {
@@ -223,7 +223,7 @@ func (r *request) writeCollectedObject(ctx context.Context) {

 // isForwardingEnabled returns true if common execution
 // parameters has request forwarding closure set.
-func (r request) isForwardingEnabled() bool {
+func (r *request) isForwardingEnabled() bool {
 	return r.prm.forwarder != nil
 }
```


```diff
@@ -6,6 +6,7 @@ import (
 	"errors"

 	coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
 	internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@@ -23,7 +24,7 @@ type epochSource interface {
 }

 type traverserGenerator interface {
-	GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
+	GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
 }

 type keyStorage interface {
```


```diff
@@ -47,7 +47,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
 		zap.Uint64("number", exec.curProcEpoch),
 	)

-	traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
+	traverser, _, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
 	if err != nil {
 		return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
 	}
```


```diff
@@ -11,6 +11,7 @@ import (

 	"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
 	clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
+	containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@@ -80,12 +81,13 @@ func newTestStorage() *testStorage {
 	}
 }

-func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) {
-	return placement.NewTraverser(
+func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, *containerCore.Container, error) {
+	t, err := placement.NewTraverser(
 		placement.ForContainer(g.c),
 		placement.UseBuilder(g.b[epoch]),
 		placement.WithoutSuccessTracking(),
 	)
+	return t, &containerCore.Container{Value: g.c}, err
 }

 func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
```


```diff
@@ -4,6 +4,7 @@ import (
 	"context"

 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
+	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
 	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@@ -45,7 +46,7 @@ type cfg struct {
 	}

 	traverserGenerator interface {
-		GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
+		GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
 	}

 	currentEpochReceiver interface {
```


```diff
@@ -122,17 +122,17 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *TraverserGenerator {

 // GenerateTraverser generates placement Traverser for provided object address
 // using epoch-th network map.
-func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, error) {
+func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, *container.Container, error) {
 	// get network map by epoch
 	nm, err := g.netMapSrc.GetNetMapByEpoch(epoch)
 	if err != nil {
-		return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
+		return nil, nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
 	}

 	// get container related container
 	cnr, err := g.cnrSrc.Get(idCnr)
 	if err != nil {
-		return nil, fmt.Errorf("could not get container: %w", err)
+		return nil, nil, fmt.Errorf("could not get container: %w", err)
 	}

 	// allocate placement traverser options
@@ -160,5 +160,9 @@
 		)
 	}

-	return placement.NewTraverser(traverseOpts...)
+	t, err := placement.NewTraverser(traverseOpts...)
+	if err != nil {
+		return nil, nil, err
+	}
+	return t, cnr, nil
 }
```