WIP: Process EC container for Get/GetRange/Head concurrently #1237

Closed
dstepanov-yadro wants to merge 6 commits from dstepanov-yadro/frostfs-node:fix/ec_get_failover into master
9 changed files with 232 additions and 90 deletions
Showing only changes of commit 50b9dea0fb

View file

@@ -104,9 +104,8 @@ const (
GetTryingToAssembleTheECObject = "trying to assemble the ec object..."
GetAssemblingSplittedObject = "assembling splitted object..."
GetAssemblingECObject = "assembling erasure-coded object..."
GetUnableToGetAllPartsECObject = "unable to get all parts, continue to reconstruct with existed"
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object"
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object"
GetUnableToGetECPartLocal = "failed to get EC part from local storage"
GetUnableToGetECPartRemote = "failed to get EC part from remote node"
GetECObjectInRepContainer = "found erasure-coded object in REP container"
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"

View file

@@ -140,7 +140,8 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
prm: prm,
infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(),
infoEC: make(map[uint32]objectSDK.ECChunk),
partsEC: make(map[uint32]*objectSDK.Object),
log: r.log,
}

View file

@@ -36,7 +36,7 @@ func (r *request) assembleEC(ctx context.Context) {
r.log.Debug(logs.GetTryingToAssembleTheECObject)
r.prm.common = r.prm.common.WithLocalOnly(false)
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
assembler := newAssemblerEC(r.address(), r.partsEC, r.infoEC, r.ctxRange(), r.headOnly(), r.containerSource)
r.log.Debug(logs.GetAssemblingECObject,
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
@@ -47,7 +47,7 @@ func (r *request) assembleEC(ctx context.Context) {
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
obj, err := assembler.Assemble(ctx, r.prm.objWriter)
if err != nil {
r.log.Warn(logs.GetFailedToAssembleECObject,
zap.Error(err),

View file

@@ -2,51 +2,46 @@ package getsvc
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
type assemblerec struct {
addr oid.Address
ecInfo *objectSDK.ECInfo
ecParts map[uint32]*objectSDK.Object
ecChunks map[uint32]objectSDK.ECChunk
rng *objectSDK.Range
objGetter objectGetter
head bool
cs container.Source
log *logger.Logger
}
func newAssemblerEC(
addr oid.Address,
ecInfo *objectSDK.ECInfo,
ecParts map[uint32]*objectSDK.Object,
ecChunks map[uint32]objectSDK.ECChunk,
rng *objectSDK.Range,
objGetter objectGetter,
head bool,
cs container.Source,
log *logger.Logger,
) *assemblerec {
return &assemblerec{
addr: addr,
rng: rng,
ecInfo: ecInfo,
objGetter: objGetter,
head: head,
ecParts: ecParts,
ecChunks: ecChunks,
cs: cs,
log: log,
}
}
// Assemble assembles the erasure-coded object and writes its content to ObjectWriter.
// It returns parent object.
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
if headOnly {
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
if a.head {
return a.reconstructHeader(ctx, writer)
} else if a.rng != nil {
return a.reconstructRange(ctx, writer)
@@ -65,7 +60,7 @@ func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) {
}
func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, true)
parts := a.retrieveParts()
c, err := a.getConstructor()
if err != nil {
return nil, err
@@ -78,7 +73,7 @@ func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter
}
func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, false)
parts := a.retrieveParts()
c, err := a.getConstructor()
if err != nil {
return nil, err
@@ -101,7 +96,7 @@ func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter)
}
func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, false)
parts := a.retrieveParts()
c, err := a.getConstructor()
if err != nil {
return nil, err
@@ -119,41 +114,17 @@ func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter
return obj, err
}
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
errGroup, ctx := errgroup.WithContext(mainCtx)
for i := range a.ecInfo.Chunks {
chunk := a.ecInfo.Chunks[i]
errGroup.Go(func() error {
objID := new(oid.ID)
err := objID.ReadFromV2(chunk.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}
var obj *objectSDK.Object
if headOnly {
obj, err = a.objGetter.HeadObject(ctx, *objID)
if err != nil {
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
} else {
sow := NewSimpleObjectWriter()
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
if err != nil {
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
obj.SetPayload(sow.pld)
}
parts[chunk.Index] = obj
return nil
})
}
if err := errGroup.Wait(); err != nil {
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
func (a *assemblerec) retrieveParts() []*objectSDK.Object {
parts := make([]*objectSDK.Object, a.getTotalCount())
for idx := range parts {
parts[idx] = a.ecParts[uint32(idx)]
}
return parts
}
func (a *assemblerec) getTotalCount() uint32 {
for _, ch := range a.ecChunks {
return ch.Total
}
return 0
}

View file

@@ -4,16 +4,17 @@ import (
"context"
"encoding/hex"
"errors"
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@@ -69,7 +70,7 @@ func (r *request) processCurrentEpoch(ctx context.Context) bool {
r.status = statusUndefined
if policy.IsECPlacement(cnr.Value.PlacementPolicy()) {
return r.processECNodes(ctx, traverser, policy.ECDataCount(cnr.Value.PlacementPolicy()))
return r.processECNodes(ctx, traverser, policy.ECDataCount(cnr.Value.PlacementPolicy()), policy.ECParityCount(cnr.Value.PlacementPolicy()))
}
return r.processRepNodes(ctx, traverser)
}
@@ -109,10 +110,17 @@ func (r *request) processRepNodes(ctx context.Context, traverser *placement.Trav
}
}
func (r *request) processECNodes(ctx context.Context, traverser *placement.Traverser, dataCount int) bool {
func (r *request) processECNodes(ctx context.Context, traverser *placement.Traverser, dataCount, parityCount int) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processECNodes")
defer span.End()
if !r.isRaw() && len(r.partsEC) >= dataCount {
return true
}
if r.isRaw() && len(r.infoEC) == dataCount+parityCount {
return true
}
err := r.traverseECNodes(ctx, traverser, dataCount)
var errSplitInfo *objectSDK.SplitInfoError
@@ -123,9 +131,9 @@ func (r *request) processECNodes(ctx context.Context, traverser *placement.Trave
switch {

What is the benefit of using a separate error type for successfully performed requests? Go has multiple value returns; can we use them?

To cancel the `errgroup`'s context and not run the remaining requests: if the function passed to `eg.Go` returns an error, the errgroup cancels its context. Of course, the same can be done without `errgroup`, but it would not be as simple. Also added a `ctx.Done` check.

It can be done with a special error, e.g. `errStopIteration`, which is what we use elsewhere.

But `ecGetSuccessErr` is a special error used only to stop EC handling and pass the result object along.
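
For illustration, here is a minimal self-contained sketch of the pattern described in this thread (not the PR code; `fetchPart`, the node loop, and the string payload are hypothetical stand-ins): a dedicated error type returned from `eg.Go` makes the errgroup cancel its context, which stops the remaining requests, while `errors.As` on the result of `eg.Wait()` extracts the successfully retrieved object.

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"golang.org/x/sync/errgroup"
)

// ecGetSuccessErr mimics the PR's type: an "error" that signals success,
// used only to stop the errgroup early and hand the result to the caller.
type ecGetSuccessErr struct {
	Object string // stands in for *objectSDK.Object
}

func (e *ecGetSuccessErr) Error() string { return "ec get success" }

// fetchPart is a hypothetical request to a single node.
func fetchPart(ctx context.Context, node int) (string, error) {
	select {
	case <-ctx.Done(): // canceled because another goroutine returned an error
		return "", ctx.Err()
	default:
	}
	if node == 2 { // pretend this node had enough chunks to assemble the object
		return fmt.Sprintf("object assembled at node %d", node), nil
	}
	return "", errors.New("chunk unavailable")
}

func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	eg.SetLimit(3) // cap concurrency, as the PR does with dataCount

	for node := 0; node < 10; node++ {
		node := node
		eg.Go(func() error {
			obj, err := fetchPart(ctx, node)
			if err != nil {
				return nil // one failed node is not fatal: keep trying others
			}
			// A non-nil return cancels ctx, so pending requests stop early.
			return &ecGetSuccessErr{Object: obj}
		})
	}

	err := eg.Wait()
	var success *ecGetSuccessErr
	if errors.As(err, &success) {
		fmt.Println("done:", success.Object) // success delivered as an "error"
	} else if err != nil {
		fmt.Println("failed:", err)
	}
}
```

The design constraint is that `eg.Wait()` has a single error return, so a "success" sentinel is the one way to both short-circuit the group and carry a payload out without an extra guarded shared variable.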
case err == nil: // nil is returned if all nodes failed or incomplete EC info received
if len(r.infoEC.Chunks) > 0 {
if len(r.infoEC) > 0 {
r.status = statusEC
fyrchik marked this conversation as resolved

`s/returns/is returned/`

done
r.err = objectSDK.NewECInfoError(r.infoEC)
r.err = r.createECInfoError()
} else {
r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound)
@@ -187,7 +195,6 @@ func (r *request) traverseECNodes(ctx context.Context, traverser *placement.Trav
}
func (r *request) processECNodesRequests(ctx context.Context, nodes <-chan placement.Node, dataCount int) error {
var ecInfoGuard sync.Mutex
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(dataCount)
for node := range nodes {
@@ -222,26 +229,82 @@ func (r *request) processECNodesRequests(ctx context.Context, nodes <-chan place
// non EC error found, stop
return err
case errors.As(err, &errECInfo):
ecInfoGuard.Lock()
defer ecInfoGuard.Unlock()
r.infoEC = util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
if r.isRaw() {
if len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total) {
return objectSDK.NewECInfoError(r.infoEC)
return r.appendECChunksWithCheck(errECInfo)

Could you explain this line a bit? Why do we NOT return an error if the number of chunks is not equal to the expected one?

In case of a `HEAD` request with the `raw` flag, the caller expects to get a raw EC info error listing all chunks.
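
A hedged sketch of the accumulation behind this answer (simplified stand-in types, not the SDK ones): chunks reported by different nodes are merged into a map keyed by chunk index, so re-reported chunks collapse, and the raw `HEAD` flow signals completion only once every chunk (data plus parity) has been seen.

```go
package main

import "fmt"

// ECChunk is a simplified stand-in for objectSDK.ECChunk.
type ECChunk struct {
	Index uint32
	Total uint32
}

// ecState mimics the request's infoEC map: chunk index -> chunk.
type ecState struct {
	infoEC map[uint32]ECChunk
}

// appendWithCheck merges the chunks one node reported. Duplicates from
// different nodes land on the same index; completion is reached only when
// all Total chunks (data + parity) are present, as in appendECChunksWithCheck.
func (s *ecState) appendWithCheck(chunks []ECChunk) bool {
	for _, ch := range chunks {
		s.infoEC[ch.Index] = ch
	}
	return len(chunks) > 0 && len(s.infoEC) == int(chunks[0].Total)
}

func main() {
	s := &ecState{infoEC: make(map[uint32]ECChunk)}
	// Node A knows chunks 0 and 1 of 3: incomplete, so no error is returned yet.
	fmt.Println(s.appendWithCheck([]ECChunk{{0, 3}, {1, 3}})) // false
	// Node B repeats chunk 1 and adds chunk 2: now all three chunks are known.
	fmt.Println(s.appendWithCheck([]ECChunk{{1, 3}, {2, 3}})) // true
}
```

Returning the EC info error any earlier would hand the raw `HEAD` caller an incomplete chunk list.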
}
return nil
}
if len(r.infoEC.Chunks) >= dataCount {
return objectSDK.NewECInfoError(r.infoEC)
}
return nil
return r.getECChunksRemote(ctx, errECInfo, info, dataCount)
}
})
}
return eg.Wait()
}
func (r *request) appendECChunksWithCheck(errECInfo *objectSDK.ECInfoError) error {
r.ecGuard.Lock()
defer r.ecGuard.Unlock()
for _, ch := range errECInfo.ECInfo().Chunks {
r.infoEC[ch.Index] = objectSDK.ECChunk(ch)
}
if len(r.infoEC) == int(errECInfo.ECInfo().Chunks[0].Total) {
return r.createECInfoError()
}
return nil
}
func (r *request) getECChunksRemote(ctx context.Context, errECInfo *objectSDK.ECInfoError, info client.NodeInfo, dataCount int) error {
for _, ch := range errECInfo.ECInfo().Chunks {
var objID oid.ID
err := objID.ReadFromV2(ch.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}
var address oid.Address
address.SetContainer(r.containerID())
address.SetObject(objID)
var obj *objectSDK.Object
if r.headOnly() {
obj, err = r.headObjectFromNode(ctx, address, info)
} else {
obj, err = r.getObjectFromNode(ctx, address, info)
}
if err != nil {
r.log.Warn(logs.GetUnableToGetECPartRemote, zap.Error(err), zap.Stringer("part_address", address),
zap.String("node", hex.EncodeToString(info.PublicKey())))
continue
}
if err := r.appendECChunkAndObjectWithCheck(objectSDK.ECChunk(ch), obj, dataCount); err != nil {
return err
}
}
return nil
}
func (r *request) appendECChunkAndObjectWithCheck(chunk objectSDK.ECChunk, object *objectSDK.Object, dataCount int) error {
if object == nil {
return nil
}
r.ecGuard.Lock()
defer r.ecGuard.Unlock()
r.infoEC[chunk.Index] = chunk
r.partsEC[chunk.Index] = object
if len(r.infoEC) >= dataCount && len(r.partsEC) >= dataCount {
return r.createECInfoError()
}
return nil
}
func (r *request) createECInfoError() error {
ecInfo := objectSDK.NewECInfo()
for _, chunk := range r.infoEC {
ecInfo.AddChunk(objectSDK.ECChunk(chunk))
}
return objectSDK.NewECInfoError(ecInfo)
}
type ecGetSuccessErr struct {
Object *objectSDK.Object
}

View file

@@ -77,7 +77,8 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
prm: prm,
infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(),
infoEC: make(map[uint32]objectSDK.ECChunk),
partsEC: make(map[uint32]*objectSDK.Object),
log: s.log,
}

View file

@@ -3,12 +3,13 @@ package getsvc
import (
"context"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
@@ -45,9 +46,19 @@ func (r *request) executeLocal(ctx context.Context) {
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
if r.isRaw() {
r.appendECChunks(errECInfo)
r.status = statusEC
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
r.err = objectSDK.NewECInfoError(r.infoEC)
r.err = r.createECInfoError()
break
}
if err := r.getECChunksLocal(ctx, errECInfo); err != nil {
r.status = statusUndefined
r.err = err
break
}
r.status = statusEC
r.err = r.createECInfoError()
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange
@@ -63,3 +74,49 @@ func (r *request) get(ctx context.Context) (*objectSDK.Object, error) {
}
return r.localStorage.Get(ctx, r.address())
}
func (r *request) appendECChunks(errECInfo *objectSDK.ECInfoError) {
r.ecGuard.Lock()
defer r.ecGuard.Unlock()
for _, ch := range errECInfo.ECInfo().Chunks {
r.infoEC[ch.Index] = objectSDK.ECChunk(ch)
}
}
func (r *request) appendECChunkAndObject(chunk objectSDK.ECChunk, object *objectSDK.Object) {
if object == nil {
return
}
r.ecGuard.Lock()
defer r.ecGuard.Unlock()
r.infoEC[chunk.Index] = chunk
r.partsEC[chunk.Index] = object
}
func (r *request) getECChunksLocal(ctx context.Context, errECInfo *objectSDK.ECInfoError) error {
for _, ch := range errECInfo.ECInfo().Chunks {
var objID oid.ID
err := objID.ReadFromV2(ch.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}
var address oid.Address
address.SetContainer(r.containerID())
address.SetObject(objID)
var obj *objectSDK.Object
if r.headOnly() {
obj, err = r.localStorage.Head(ctx, address, false)
} else {
obj, err = r.localStorage.Get(ctx, address)
}
if err != nil {
r.log.Warn(logs.GetUnableToGetECPartLocal, zap.Error(err), zap.Stringer("part_address", address))
continue
}
r.appendECChunkAndObject(objectSDK.ECChunk(ch), obj)
}
return nil
}

View file

@@ -10,6 +10,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
@@ -102,3 +103,49 @@ func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.N
return rs.Get(ctx, r.address(), prm)
}
func (r *request) getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) {
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
return nil, err
}
key, err := r.key()
if err != nil {
return nil, err
}
prm := RemoteRequestParams{
Epoch: r.curProcEpoch,
TTL: 1,
PrivateKey: key,
SessionToken: r.prm.common.SessionToken(),
BearerToken: r.prm.common.BearerToken(),
XHeaders: r.prm.common.XHeaders(),
}
return rs.Get(ctx, addr, prm)
}
func (r *request) headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) {
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
return nil, err
}
key, err := r.key()
if err != nil {
return nil, err
}
prm := RemoteRequestParams{
Epoch: r.curProcEpoch,
TTL: 1,
PrivateKey: key,
SessionToken: r.prm.common.SessionToken(),
BearerToken: r.prm.common.BearerToken(),
XHeaders: r.prm.common.XHeaders(),
}
return rs.Head(ctx, addr, prm)
}

View file

@@ -3,6 +3,7 @@ package getsvc
import (
"context"
"crypto/ecdsa"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
@@ -23,7 +24,9 @@ type request struct {
infoSplit *objectSDK.SplitInfo
infoEC *objectSDK.ECInfo
ecGuard sync.Mutex
infoEC map[uint32]objectSDK.ECChunk
partsEC map[uint32]*objectSDK.Object
log *logger.Logger
@@ -220,7 +223,7 @@ func (r *request) writeCollectedObject(ctx context.Context) {
// isForwardingEnabled returns true if common execution
// parameters has request forwarding closure set.
func (r request) isForwardingEnabled() bool {
func (r *request) isForwardingEnabled() bool {
return r.prm.forwarder != nil
}