WIP: Process EC container for Get/GetRange/Head concurrently #1237
@@ -104,10 +104,9 @@ const (
GetTryingToAssembleTheECObject = "trying to assemble the ec object..."
GetAssemblingSplittedObject = "assembling splitted object..."
GetAssemblingECObject = "assembling erasure-coded object..."
GetUnableToGetAllPartsECObject = "unable to get all parts, continue to reconstruct with existed"
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object"
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object"
GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
GetUnableToGetECPartLocal = "failed to get EC part from local storage"
GetUnableToGetECPartRemote = "failed to get EC part from remote node"
GetECObjectInRepContainer = "found erasure-coded object in REP container"
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"
GetFailedToAssembleSplittedObject = "failed to assemble splitted object"

@@ -123,6 +122,7 @@ const (
GetRequestedObjectIsVirtual = "requested object is virtual"
GetRequestedObjectIsEC = "requested object is erasure-coded"
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
GetCouldNotGetContainer = "could not get container"
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
SearchReturnResultDirectly = "return result directly"
SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client"

@@ -140,7 +140,8 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
prm: prm,
infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(),
infoEC: make(map[uint32]objectSDK.ECChunk),
partsEC: make(map[uint32]*objectSDK.Object),
log: r.log,
}

@@ -36,7 +36,7 @@ func (r *request) assembleEC(ctx context.Context) {
r.log.Debug(logs.GetTryingToAssembleTheECObject)

r.prm.common = r.prm.common.WithLocalOnly(false)
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
assembler := newAssemblerEC(r.address(), r.partsEC, r.infoEC, r.ctxRange(), r.headOnly(), r.containerSource)

r.log.Debug(logs.GetAssemblingECObject,
zap.Uint64("range_offset", r.ctxRange().GetOffset()),

@@ -47,7 +47,7 @@ func (r *request) assembleEC(ctx context.Context) {
zap.Uint64("range_length", r.ctxRange().GetLength()),
)

obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
obj, err := assembler.Assemble(ctx, r.prm.objWriter)
if err != nil {
r.log.Warn(logs.GetFailedToAssembleECObject,
zap.Error(err),
@@ -2,51 +2,46 @@ package getsvc

import (
"context"
"fmt"

"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

type assemblerec struct {
addr oid.Address
ecInfo *objectSDK.ECInfo
rng *objectSDK.Range
objGetter objectGetter
cs container.Source
log *logger.Logger
addr oid.Address
ecParts map[uint32]*objectSDK.Object
ecChunks map[uint32]objectSDK.ECChunk
rng *objectSDK.Range
head bool
cs container.Source
}

func newAssemblerEC(
addr oid.Address,
ecInfo *objectSDK.ECInfo,
ecParts map[uint32]*objectSDK.Object,
ecChunks map[uint32]objectSDK.ECChunk,
rng *objectSDK.Range,
objGetter objectGetter,
head bool,
cs container.Source,
log *logger.Logger,
) *assemblerec {
return &assemblerec{
addr: addr,
rng: rng,
ecInfo: ecInfo,
objGetter: objGetter,
cs: cs,
log: log,
addr: addr,
rng: rng,
head: head,
ecParts: ecParts,
ecChunks: ecChunks,
cs: cs,
}
}

// Assemble assembles erasure-coded object and writes its content to ObjectWriter.
// It returns parent object.
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
if headOnly {
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
if a.head {
return a.reconstructHeader(ctx, writer)
} else if a.rng != nil {
return a.reconstructRange(ctx, writer)

@@ -65,7 +60,7 @@ func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) {
}

func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, true)
parts := a.retrieveParts()
c, err := a.getConstructor()
if err != nil {
return nil, err

@@ -78,7 +73,7 @@ func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter
}

func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, false)
parts := a.retrieveParts()
c, err := a.getConstructor()
if err != nil {
return nil, err

@@ -101,7 +96,7 @@ func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter)
}

func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, false)
parts := a.retrieveParts()
c, err := a.getConstructor()
if err != nil {
return nil, err

@@ -119,41 +114,17 @@ func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter
return obj, err
}

func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
errGroup, ctx := errgroup.WithContext(mainCtx)

for i := range a.ecInfo.Chunks {
chunk := a.ecInfo.Chunks[i]
errGroup.Go(func() error {
objID := new(oid.ID)
err := objID.ReadFromV2(chunk.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}
var obj *objectSDK.Object
if headOnly {
obj, err = a.objGetter.HeadObject(ctx, *objID)
if err != nil {
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
} else {
sow := NewSimpleObjectWriter()
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
if err != nil {
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
obj.SetPayload(sow.pld)
}
parts[chunk.Index] = obj
return nil
})
}

if err := errGroup.Wait(); err != nil {
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
func (a *assemblerec) retrieveParts() []*objectSDK.Object {
parts := make([]*objectSDK.Object, a.getTotalCount())
for idx := range parts {
parts[idx] = a.ecParts[uint32(idx)]
}
return parts
}

func (a *assemblerec) getTotalCount() uint32 {
for _, ch := range a.ecChunks {
return ch.Total
}
return 0
}
@@ -2,10 +2,21 @@ package getsvc

import (
"context"
"encoding/hex"
"errors"
"fmt"
"sync"

"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)

func (r *request) executeOnContainer(ctx context.Context) {

@@ -48,7 +59,7 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
zap.Uint64("number", r.curProcEpoch),
)

traverser, ok := r.generateTraverser(r.address())
traverser, cnr, ok := r.generateTraverser(r.address())
if !ok {
return true
}

@@ -58,6 +69,13 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
r.status = statusUndefined

if policy.IsECPlacement(cnr.Value.PlacementPolicy()) {
return r.processECNodes(ctx, traverser, policy.ECDataCount(cnr.Value.PlacementPolicy()), policy.ECParityCount(cnr.Value.PlacementPolicy()))
}
return r.processRepNodes(ctx, traverser)
}

func (r *request) processRepNodes(ctx context.Context, traverser *placement.Traverser) bool {
for {
addrs := traverser.Next()
if len(addrs) == 0 {

@@ -84,10 +102,211 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
client.NodeInfoFromNetmapElement(&info, addrs[i])

if r.processNode(ctx, info) {
if r.processRepNode(ctx, info) {
r.log.Debug(logs.GetCompletingTheOperation)
return true
}
}
}
}

func (r *request) processECNodes(ctx context.Context, traverser *placement.Traverser, dataCount, parityCount int) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processECNodes")
defer span.End()

if !r.isRaw() && len(r.partsEC) >= dataCount {
return true
}
if r.isRaw() && len(r.infoEC) == dataCount+parityCount {
return true
}

err := r.traverseECNodes(ctx, traverser, dataCount)

var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
var errSuccess *ecGetSuccessErr

switch {
case err == nil: // nil is returned if all nodes failed or incomplete EC info received
if len(r.infoEC) > 0 {
r.status = statusEC

fyrchik commented: `s/returns/is returned/`
dstepanov-yadro commented: done

r.err = r.createECInfoError()
} else {
r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound)
}
case errors.As(err, &errRemoved):
r.status = statusINHUMED
r.err = errRemoved
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange
case errors.As(err, &errSplitInfo):
r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC

acid-ant commented: You are canceling context here and using it below for `r.writeCollectedObject(ctx)`, looks wrong.
dstepanov-yadro commented: Right, fixed.

r.err = err
case errors.As(err, &errSuccess):
r.status = statusOK
r.err = nil
if errSuccess.Object != nil {
r.collectedObject = errSuccess.Object
r.writeCollectedObject(ctx)
}
}
return r.status != statusUndefined
}

func (r *request) traverseECNodes(ctx context.Context, traverser *placement.Traverser, dataCount int) error {
nodes := make(chan placement.Node, dataCount)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

aarifullin commented: [Optional] What do you think: won't this approach cause goroutine spam if too many get-like requests are processed at the same time? Using `errgroup` would be safe, I guess.
dstepanov-yadro commented: I didn't catch the thought. `errgroup` creates the same goroutines.
aarifullin commented: > errgroup creates the same goroutines.
Yes, it does, but with a fixed number of workers. Never mind, sorry. I incorrectly read this part:
```go
go func() {
	defer wg.Done()
	for {
		batch := traverser.Next()
		if len(batch) == 0 {
			r.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
			close(nodes)
			return
		}
		for _, node := range batch {
			select {
			case <-ctx.Done():
				return
			case nodes <- node:
			}
		}
	}
}()
```
I mistook it for goroutine generation within the for-loop.
for {
batch := traverser.Next()
if len(batch) == 0 {
r.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
close(nodes)
return
}
for _, node := range batch {
select {
case <-ctx.Done():
return
case nodes <- node:
}
}
}
}()

err := r.processECNodesRequests(ctx, nodes, dataCount)
cancel()
wg.Wait()
return err
}

func (r *request) processECNodesRequests(ctx context.Context, nodes <-chan placement.Node, dataCount int) error {
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(dataCount)
for node := range nodes {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient, zap.String("node_key", hex.EncodeToString(info.PublicKey())))
return err

fyrchik commented: So for each node we call `getRemote` with the same parameters. Where is the place in the code where we combine chunks into the final object?
dstepanov-yadro commented: Line 122 in 3bf6e6d: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/commit/3bf6e6dde60f15f4f90caf44eda429e8f6269cc1/pkg/services/object/get/assemblerec.go#L122
fyrchik commented: I have realised where my confusion comes from: we have 2 steps:
1. Get ECInfo from enough nodes
2. Pull EC chunks
The "enough nodes" is defined on step 1, even though nodes may go down at step 2. In this case we will fail to fetch the object? The situation _will_ happen during failover tests.
dstepanov-yadro commented: Right, if a node fails between step 1 and step 2, then such a request will fail.

}
obj, err := r.getRemote(ctx, rs, info)
var errSplitInfo *objectSDK.SplitInfoError
var errECInfo *objectSDK.ECInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange

switch {
default:
// something failed, continue
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
return nil
case err == nil:
// non EC object found (tombstone, linking, lock), stop
return &ecGetSuccessErr{Object: obj}
case errors.As(err, &errRemoved) || errors.As(err, &errOutOfRange) || errors.As(err, &errSplitInfo):
// non EC error found, stop
return err
case errors.As(err, &errECInfo):
if r.isRaw() {
return r.appendECChunksWithCheck(errECInfo)

fyrchik commented: Could you explain this line a bit? Why do we NOT return an error if the number of chunks is not equal to the expected one?
dstepanov-yadro commented: In case of a `HEAD` request with the `raw` flag, it is expected to get a raw EC info error with all chunks.

}
return r.getECChunksRemote(ctx, errECInfo, info, dataCount)
}
})
}
return eg.Wait()
}

func (r *request) appendECChunksWithCheck(errECInfo *objectSDK.ECInfoError) error {
r.ecGuard.Lock()
defer r.ecGuard.Unlock()
for _, ch := range errECInfo.ECInfo().Chunks {
r.infoEC[ch.Index] = objectSDK.ECChunk(ch)
}
if len(r.infoEC) == int(errECInfo.ECInfo().Chunks[0].Total) {
return r.createECInfoError()
}
return nil
}

func (r *request) getECChunksRemote(ctx context.Context, errECInfo *objectSDK.ECInfoError, info client.NodeInfo, dataCount int) error {
for _, ch := range errECInfo.ECInfo().Chunks {
var objID oid.ID
err := objID.ReadFromV2(ch.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}

var address oid.Address
address.SetContainer(r.containerID())
address.SetObject(objID)
var obj *objectSDK.Object
if r.headOnly() {
obj, err = r.headObjectFromNode(ctx, address, info)
} else {
obj, err = r.getObjectFromNode(ctx, address, info)
}
if err != nil {
r.log.Warn(logs.GetUnableToGetECPartRemote, zap.Error(err), zap.Stringer("part_address", address),
zap.String("node", hex.EncodeToString(info.PublicKey())))
continue
}

if err := r.appendECChunkAndObjectWithCheck(objectSDK.ECChunk(ch), obj, dataCount); err != nil {
return err
}
}
return nil
}

func (r *request) appendECChunkAndObjectWithCheck(chunk objectSDK.ECChunk, object *objectSDK.Object, dataCount int) error {
if object == nil {
return nil
}
r.ecGuard.Lock()
defer r.ecGuard.Unlock()

r.infoEC[chunk.Index] = chunk
r.partsEC[chunk.Index] = object

if len(r.infoEC) >= dataCount && len(r.partsEC) >= dataCount {
return r.createECInfoError()
}
return nil
}

func (r *request) createECInfoError() error {
ecInfo := objectSDK.NewECInfo()
for _, chunk := range r.infoEC {
ecInfo.AddChunk(objectSDK.ECChunk(chunk))
}
return objectSDK.NewECInfoError(ecInfo)
}

type ecGetSuccessErr struct {
Object *objectSDK.Object
}

func (s *ecGetSuccessErr) Error() string { return "" }
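The bounded-concurrency pattern discussed in the review thread above (a producer goroutine feeding a channel that an errgroup limited to dataCount workers consumes) can be shown with a small, self-contained sketch. The names below are illustrative, not the PR's code; the point is that eg.Go blocks once the limit is reached, so the number of active goroutines stays fixed.

```go
// Sketch (illustrative names): a producer fills a channel of work items while
// an errgroup limited to `limit` workers consumes it, so at most `limit`
// goroutines run at once regardless of how many items arrive.
package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	const limit = 3 // plays the role of dataCount in processECNodesRequests

	items := make(chan int, limit)
	go func() {
		defer close(items)
		for i := 0; i < 10; i++ {
			items <- i // producer, like the traverser goroutine above
		}
	}()

	eg, _ := errgroup.WithContext(context.Background())
	eg.SetLimit(limit)

	var active atomic.Int32
	for item := range items {
		item := item
		eg.Go(func() error { // blocks here while `limit` workers are already running
			fmt.Printf("item %d, active workers: %d\n", item, active.Add(1))
			defer active.Add(-1)
			time.Sleep(10 * time.Millisecond)
			return nil
		})
	}
	_ = eg.Wait()
}
```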
@@ -77,7 +77,8 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
prm: prm,
infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(),
infoEC: make(map[uint32]objectSDK.ECChunk),
partsEC: make(map[uint32]*objectSDK.Object),
log: s.log,
}

@@ -11,6 +11,7 @@ import (
"testing"

"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"

@@ -78,7 +79,7 @@ func newTestStorage() *testStorage {
}
}

func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) {
func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, *containerCore.Container, error) {
opts := make([]placement.Option, 0, 4)
opts = append(opts,
placement.ForContainer(g.c),

@@ -90,7 +91,10 @@ func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e ui
opts = append(opts, placement.ForObject(*obj))
}

return placement.NewTraverser(opts...)
t, err := placement.NewTraverser(opts...)
return t, &containerCore.Container{
Value: g.c,
}, err
}

func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {

@@ -273,6 +277,16 @@ func (ks *testKeyStorage) GetKey(_ *util.SessionInfo) (*ecdsa.PrivateKey, error)
return &ecdsa.PrivateKey{}, nil
}

type testContainerSource struct{}

func (s *testContainerSource) Get(idCnr cid.ID) (*containerCore.Container, error) {
return &containerCore.Container{
Value: container.Container{},
}, nil
}

func (s *testContainerSource) DeletionInfo(cid.ID) (*containerCore.DelInfo, error) { return nil, nil }

func TestGetLocalOnly(t *testing.T) {
ctx := context.Background()

@@ -551,6 +565,7 @@ func TestGetRemoteSmall(t *testing.T) {
epochSource: testEpochReceiver(curEpoch),
remoteStorageConstructor: c,
keyStore: &testKeyStorage{},
containerSource: &testContainerSource{},
}
}

@@ -1722,6 +1737,7 @@ func TestGetRange(t *testing.T) {
epochSource: testEpochReceiver(curEpoch),
remoteStorageConstructor: c,
keyStore: &testKeyStorage{},
containerSource: &testContainerSource{},
}
}

@@ -1879,7 +1895,8 @@ func TestGetFromPastEpoch(t *testing.T) {
as[1][1]: c22,
},
},
keyStore: &testKeyStorage{},
keyStore: &testKeyStorage{},
containerSource: &testContainerSource{},
}

w := NewSimpleObjectWriter()
@@ -3,12 +3,13 @@ package getsvc
import (
"context"
"errors"
"fmt"

"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)

@@ -45,9 +46,19 @@ func (r *request) executeLocal(ctx context.Context) {
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
if r.isRaw() {
r.appendECChunks(errECInfo)
r.status = statusEC
r.err = r.createECInfoError()
break
}
if err := r.getECChunksLocal(ctx, errECInfo); err != nil {
r.status = statusUndefined
r.err = err
break
}
r.status = statusEC
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
r.err = objectSDK.NewECInfoError(r.infoEC)
r.err = r.createECInfoError()
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange

@@ -63,3 +74,49 @@ func (r *request) get(ctx context.Context) (*objectSDK.Object, error) {
}
return r.localStorage.Get(ctx, r.address())
}

func (r *request) appendECChunks(errECInfo *objectSDK.ECInfoError) {
r.ecGuard.Lock()
defer r.ecGuard.Unlock()
for _, ch := range errECInfo.ECInfo().Chunks {
r.infoEC[ch.Index] = objectSDK.ECChunk(ch)
}
}

func (r *request) appendECChunkAndObject(chunk objectSDK.ECChunk, object *objectSDK.Object) {
if object == nil {
return
}
r.ecGuard.Lock()
defer r.ecGuard.Unlock()

r.infoEC[chunk.Index] = chunk
r.partsEC[chunk.Index] = object
}

func (r *request) getECChunksLocal(ctx context.Context, errECInfo *objectSDK.ECInfoError) error {
for _, ch := range errECInfo.ECInfo().Chunks {
var objID oid.ID
err := objID.ReadFromV2(ch.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}

var address oid.Address
address.SetContainer(r.containerID())
address.SetObject(objID)
var obj *objectSDK.Object
if r.headOnly() {
obj, err = r.localStorage.Head(ctx, address, false)
} else {
obj, err = r.localStorage.Get(ctx, address)
}
if err != nil {
r.log.Warn(logs.GetUnableToGetECPartLocal, zap.Error(err), zap.Stringer("part_address", address))
continue
}

r.appendECChunkAndObject(objectSDK.ECChunk(ch), obj)
}
return nil
}
@@ -7,16 +7,17 @@ import (

"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)

func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode")
var errECObjectInRepContainer = errors.New("found erasure-coded object in REP container")

func (r *request) processRepNode(ctx context.Context, info client.NodeInfo) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processRepNode")
defer span.End()

r.log.Debug(logs.ProcessingNode, zap.String("node_key", hex.EncodeToString(info.PublicKey())))

@@ -36,11 +37,6 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
switch {
default:
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
if r.status == statusEC {
// we need to continue getting another chunks from another nodes
// in case of network issue
return false
}
r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound)
case err == nil:

@@ -65,19 +61,10 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC
util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
r.infoEC = errECInfo.ECInfo()
r.err = objectSDK.NewECInfoError(r.infoEC)
if r.isRaw() {
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
}
cnt, err := r.containerSource.Get(r.address().Container())
if err == nil {
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
}
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
r.log.Error(logs.GetECObjectInRepContainer, zap.Stringer("address", r.address()))
r.status = statusUndefined
r.err = errECObjectInRepContainer
return true

acid-ant commented: Looks like part of the code in the `default` section is no longer needed:
```
if r.status == statusEC {
// we need to continue getting another chunks from another nodes
...
```
dstepanov-yadro commented: done

}

return r.status != statusUndefined

@@ -116,3 +103,49 @@ func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.N

return rs.Get(ctx, r.address(), prm)
}

func (r *request) getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) {
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
return nil, err
}

key, err := r.key()
if err != nil {
return nil, err
}

prm := RemoteRequestParams{
Epoch: r.curProcEpoch,
TTL: 1,
PrivateKey: key,
SessionToken: r.prm.common.SessionToken(),
BearerToken: r.prm.common.BearerToken(),
XHeaders: r.prm.common.XHeaders(),
}

return rs.Get(ctx, addr, prm)
}

func (r *request) headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) {
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
return nil, err
}

key, err := r.key()
if err != nil {
return nil, err
}

prm := RemoteRequestParams{
Epoch: r.curProcEpoch,
TTL: 1,
PrivateKey: key,
SessionToken: r.prm.common.SessionToken(),
BearerToken: r.prm.common.BearerToken(),
XHeaders: r.prm.common.XHeaders(),
}

return rs.Head(ctx, addr, prm)
}
@@ -3,6 +3,7 @@ package getsvc
import (
"context"
"crypto/ecdsa"
"sync"

"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"

@@ -23,7 +24,9 @@ type request struct {

infoSplit *objectSDK.SplitInfo

infoEC *objectSDK.ECInfo
ecGuard sync.Mutex
infoEC map[uint32]objectSDK.ECChunk
partsEC map[uint32]*objectSDK.Object

log *logger.Logger

@@ -138,22 +141,19 @@ func (r *request) initEpoch() bool {
}
}

func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, *container.Container, bool) {
obj := addr.Object()

t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)

switch {
default:
t, cnr, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
if err != nil {
r.status = statusUndefined
r.err = err

r.log.Debug(logs.GetCouldNotGenerateContainerTraverser, zap.Error(err))

return nil, false
case err == nil:
return t, true
return nil, nil, false
}
return t, cnr, true
}

func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) {

@@ -223,7 +223,7 @@ func (r *request) writeCollectedObject(ctx context.Context) {

// isForwardingEnabled returns true if common execution
// parameters has request forwarding closure set.
func (r request) isForwardingEnabled() bool {
func (r *request) isForwardingEnabled() bool {
return r.prm.forwarder != nil
}
@@ -6,6 +6,7 @@ import (
"errors"

coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"

@@ -23,7 +24,7 @@ type epochSource interface {
}

type traverserGenerator interface {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
}

type keyStorage interface {

@@ -47,7 +47,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
zap.Uint64("number", exec.curProcEpoch),
)

traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
traverser, _, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
if err != nil {
return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
}
@@ -11,6 +11,7 @@ import (

"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"

@@ -80,12 +81,13 @@ func newTestStorage() *testStorage {
}
}

func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) {
return placement.NewTraverser(
func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, *containerCore.Container, error) {
t, err := placement.NewTraverser(
placement.ForContainer(g.c),
placement.UseBuilder(g.b[epoch]),
placement.WithoutSuccessTracking(),
)
return t, &containerCore.Container{Value: g.c}, err
}

func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
@@ -4,6 +4,7 @@ import (
"context"

"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"

@@ -45,7 +46,7 @@ type cfg struct {
}

traverserGenerator interface {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
}

currentEpochReceiver interface {
@@ -122,17 +122,17 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav

// GenerateTraverser generates placement Traverser for provided object address
// using epoch-th network map.
func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, error) {
func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, *container.Container, error) {
// get network map by epoch
nm, err := g.netMapSrc.GetNetMapByEpoch(epoch)
if err != nil {
return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
return nil, nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
}

// get container related container
cnr, err := g.cnrSrc.Get(idCnr)
if err != nil {
return nil, fmt.Errorf("could not get container: %w", err)
return nil, nil, fmt.Errorf("could not get container: %w", err)
}

// allocate placement traverser options

@@ -160,5 +160,9 @@ func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoc
)
}

return placement.NewTraverser(traverseOpts...)
t, err := placement.NewTraverser(traverseOpts...)
if err != nil {
return nil, nil, err
}
return t, cnr, nil
}
What is the benefit of using a separate error type for successfully performed requests? Go has multiple value returns; can we use them?

To cancel errgroup's context and not run the remaining requests: if eg.Go's argument returns an error, then errgroup cancels the context. Of course it is possible to do the same thing without errgroup, but it would not be as simple. Also added a ctx.Done check.

It can be done with a special error, e.g. errStopIteration is what we use somewhere.

But ecGetSuccessErr is a special error used only to stop EC handling and pass the result object.
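A minimal, self-contained sketch of the mechanism described above (illustrative names, not the PR's types): returning a non-nil error from an eg.Go callback makes errgroup cancel the shared context, so the remaining workers stop early; a dedicated "success" error, analogous to ecGetSuccessErr, carries the found object out of the group, and errors.As at the call site distinguishes it from real failures.

```go
// Sketch of stopping an errgroup early via a dedicated "success" error
// (illustrative only; ecGetSuccessErr in the PR plays the same role).
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// successErr is not a failure: it stops the group and carries the result.
type successErr struct{ result string }

func (e *successErr) Error() string { return "" }

func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	for i := 0; i < 5; i++ {
		i := i
		eg.Go(func() error {
			select {
			case <-ctx.Done(): // canceled as soon as any worker returns an error
				return ctx.Err()
			default:
			}
			if i == 2 { // pretend this worker found the object
				return &successErr{result: fmt.Sprintf("object from worker %d", i)}
			}
			return nil
		})
	}

	err := eg.Wait() // returns the first non-nil error from the group
	var ok *successErr
	switch {
	case errors.As(err, &ok):
		fmt.Println("done:", ok.result) // treated as success, not as a failure
	case err != nil:
		fmt.Println("failed:", err)
	default:
		fmt.Println("not found")
	}
}
```

Because eg.Wait surfaces only the first non-nil error, a successful fetch comes back as that sentinel value instead of propagating as a failure, which is the trade-off the question above is about.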