Fix Get EC object from non container node #1253

Merged
dstepanov-yadro merged 2 commits from dstepanov-yadro/frostfs-node:fix/ec_get_non_container_node into master 2024-09-04 19:51:10 +00:00
15 changed files with 435 additions and 125 deletions

View file

@ -108,6 +108,7 @@ const (
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object" GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object"
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object" GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object"
GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object" GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
GetUnableToHeadPartsECObject = "unable to head parts of the erasure-encoded object"
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed" GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed" GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"
GetFailedToAssembleSplittedObject = "failed to assemble splitted object" GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
@ -123,6 +124,7 @@ const (
GetRequestedObjectIsVirtual = "requested object is virtual" GetRequestedObjectIsVirtual = "requested object is virtual"
GetRequestedObjectIsEC = "requested object is erasure-coded" GetRequestedObjectIsEC = "requested object is erasure-coded"
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds" GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
GetUnexpectedECObject = "failed to get EC object from node: expected EC info, but got full object"
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure" PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
SearchReturnResultDirectly = "return result directly" SearchReturnResultDirectly = "return result directly"
SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client" SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client"

View file

@ -30,7 +30,12 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi
wr := getsvc.NewSimpleObjectWriter() wr := getsvc.NewSimpleObjectWriter()
p := getsvc.HeadPrm{} p := getsvc.HeadPrm{}
p.SetCommonParameters(exec.commonParameters())
if cp := exec.commonParameters(); cp != nil {
commonParameters := *cp
p.SetCommonParameters(&commonParameters)
}

commonParameters may be changed during a Head request (ForgetTokens, WithLocalOnly), so a copy is needed. Found with autotests.

`commonParameters` may be changed during a Head request (`ForgetTokens`, `WithLocalOnly`), so a copy is needed. Found with autotests.

Previously it was possible to pass nil, now we pass a pointer to the empty struct.
This is a change in behaviour, can anything bad happen?

Previously it was possible to pass `nil`, now we pass a pointer to the empty struct. This is a change in behaviour, can anything bad happen?

Previously it was possible to pass nil, now we pass a pointer to the empty struct.
This is a change in behaviour, can anything bad happen?

Previously it was possible to pass `nil`, now we pass a pointer to the empty struct. This is a change in behaviour, can anything bad happen?

Fixed

Fixed
p.SetHeaderWriter(wr) p.SetHeaderWriter(wr)
p.WithRawFlag(true) p.WithRawFlag(true)
p.WithAddress(addr) p.WithAddress(addr)

View file

@ -140,7 +140,7 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
prm: prm, prm: prm,
infoSplit: objectSDK.NewSplitInfo(), infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(), infoEC: newECInfo(),
log: r.log, log: r.log,
} }

View file

@ -6,11 +6,12 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap" "go.uber.org/zap"
) )
func (r *request) assembleEC(ctx context.Context) { func (r *request) assembleEC(ctx context.Context) {
if r.isRaw() { if r.isRaw() && r.isLocal() {
r.log.Debug(logs.GetCanNotAssembleTheObject) r.log.Debug(logs.GetCanNotAssembleTheObject)
return return
} }
@ -35,8 +36,14 @@ func (r *request) assembleEC(ctx context.Context) {
r.log.Debug(logs.GetTryingToAssembleTheECObject) r.log.Debug(logs.GetTryingToAssembleTheECObject)
// initialize epoch number
ok := r.initEpoch()

Logging and status change - inside r.initEpoch() method.

Logging and status change - inside `r.initEpoch()` method.
if !ok {
return
}
r.prm.common = r.prm.common.WithLocalOnly(false) r.prm.common = r.prm.common.WithLocalOnly(false)
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log) assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch)
r.log.Debug(logs.GetAssemblingECObject, r.log.Debug(logs.GetAssemblingECObject,
zap.Uint64("range_offset", r.ctxRange().GetOffset()), zap.Uint64("range_offset", r.ctxRange().GetOffset()),
@ -47,8 +54,8 @@ func (r *request) assembleEC(ctx context.Context) {
zap.Uint64("range_length", r.ctxRange().GetLength()), zap.Uint64("range_length", r.ctxRange().GetLength()),
) )
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly()) obj, err := assembler.Assemble(ctx, r.prm.objWriter)
if err != nil { if err != nil && !errors.As(err, new(*objectSDK.ECInfoError)) {
r.log.Warn(logs.GetFailedToAssembleECObject, r.log.Warn(logs.GetFailedToAssembleECObject,
zap.Error(err), zap.Error(err),
zap.Uint64("range_offset", r.ctxRange().GetOffset()), zap.Uint64("range_offset", r.ctxRange().GetOffset()),
@ -58,6 +65,7 @@ func (r *request) assembleEC(ctx context.Context) {
var errRemoved *apistatus.ObjectAlreadyRemoved var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange var errOutOfRange *apistatus.ObjectOutOfRange
var errECInfo *objectSDK.ECInfoError
switch { switch {
default: default:
@ -73,5 +81,8 @@ func (r *request) assembleEC(ctx context.Context) {
case errors.As(err, &errOutOfRange): case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange r.status = statusOutOfRange
r.err = errOutOfRange r.err = errOutOfRange
case errors.As(err, &errECInfo):
r.status = statusEC
r.err = err
} }
} }

View file

@ -2,11 +2,16 @@ package getsvc
import ( import (
"context" "context"
"encoding/hex"
"errors"
"fmt" "fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -16,61 +21,79 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
) )
var errECPartsRetrieveCompleted = errors.New("EC parts receive completed")
// ecRemoteStorage is the remote-access dependency of the EC assembler: both
// methods address one specific node (info) instead of traversing the container.
type ecRemoteStorage interface {
// getObjectFromNode requests the object stored at addr from the node
// described by info and returns it, or the error of the remote call.
getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error)
// headObjectFromNode requests the header of the object at addr from the node
// described by info. The raw argument is passed through to the remote request;
// the assembler calls it with raw=true when it expects the node to answer with
// an ECInfoError listing its chunks, and with raw=false to head a single chunk.
headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo, raw bool) (*objectSDK.Object, error)
}
type assemblerec struct { type assemblerec struct {
addr oid.Address addr oid.Address
ecInfo *objectSDK.ECInfo ecInfo *ecInfo
rng *objectSDK.Range rng *objectSDK.Range
objGetter objectGetter remoteStorage ecRemoteStorage
localStorage localStorage
cs container.Source cs container.Source
log *logger.Logger log *logger.Logger
head bool
raw bool
traverserGenerator traverserGenerator
epoch uint64
} }
func newAssemblerEC( func newAssemblerEC(
addr oid.Address, addr oid.Address,
ecInfo *objectSDK.ECInfo, ecInfo *ecInfo,
rng *objectSDK.Range, rng *objectSDK.Range,
objGetter objectGetter, remoteStorage ecRemoteStorage,
localStorage localStorage,
cs container.Source, cs container.Source,
log *logger.Logger, log *logger.Logger,
head bool,
raw bool,
tg traverserGenerator,
epoch uint64,
) *assemblerec { ) *assemblerec {
return &assemblerec{ return &assemblerec{
addr: addr, addr: addr,
rng: rng, rng: rng,
ecInfo: ecInfo, ecInfo: ecInfo,
objGetter: objGetter, remoteStorage: remoteStorage,
localStorage: localStorage,
cs: cs, cs: cs,
log: log, log: log,
head: head,
raw: raw,
traverserGenerator: tg,
epoch: epoch,
} }
} }
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter. // Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
// It returns parent object. // It returns parent object.
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) { func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
if headOnly { switch {
case a.raw:
err := a.reconstructRawError(ctx)
return nil, err
case a.head:
return a.reconstructHeader(ctx, writer) return a.reconstructHeader(ctx, writer)
} else if a.rng != nil { case a.rng != nil:
return a.reconstructRange(ctx, writer) return a.reconstructRange(ctx, writer)
} default:
return a.reconstructObject(ctx, writer) return a.reconstructObject(ctx, writer)
} }
func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) {
cnt, err := a.cs.Get(a.addr.Container())
if err != nil {
return nil, err
} }
dataCount := policy.ECDataCount(cnt.Value.PlacementPolicy())
parityCount := policy.ECParityCount(cnt.Value.PlacementPolicy()) func (a *assemblerec) getConstructor(cnr *container.Container) (*erasurecode.Constructor, error) {
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
return erasurecode.NewConstructor(dataCount, parityCount) return erasurecode.NewConstructor(dataCount, parityCount)
} }
func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, true) obj, err := a.reconstructObjectFromParts(ctx, true)
c, err := a.getConstructor()
if err != nil {
return nil, err
}
obj, err := c.ReconstructHeader(parts)
if err == nil { if err == nil {
return obj, writer.WriteHeader(ctx, obj) return obj, writer.WriteHeader(ctx, obj)
} }
@ -78,12 +101,7 @@ func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter
} }
func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, false) obj, err := a.reconstructObjectFromParts(ctx, false)
c, err := a.getConstructor()
if err != nil {
return nil, err
}
obj, err := c.Reconstruct(parts)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -101,12 +119,7 @@ func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter)
} }
func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) { func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, false) obj, err := a.reconstructObjectFromParts(ctx, false)
c, err := a.getConstructor()
if err != nil {
return nil, err
}
obj, err := c.Reconstruct(parts)
if err == nil { if err == nil {
err = writer.WriteHeader(ctx, obj.CutPayload()) err = writer.WriteHeader(ctx, obj.CutPayload())
if err == nil { if err == nil {
@ -119,41 +132,238 @@ func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter
return obj, err return obj, err
} }
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object { func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bool) (*objectSDK.Object, error) {
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total)) objID := a.addr.Object()
errGroup, ctx := errgroup.WithContext(mainCtx) trav, cnr, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
if err != nil {
return nil, err
}
c, err := a.getConstructor(cnr)
if err != nil {
return nil, err

This part seems to be common for all reconstruct*() helpers. If we move it one level above, will it make the code cleaner?

trav, cnr, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
if err != nil {
	return nil, err
}
c, err := a.getConstructor(cnr)
if err != nil {
	return nil, err
}
This part seems to be common for all `reconstruct*()` helpers. If we move it one level above, will it make the code cleaner? ``` trav, cnr, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch) if err != nil { return nil, err } c, err := a.getConstructor(cnr) if err != nil { return nil, err } ```

Done

Done
}
parts := a.retrieveParts(ctx, trav, cnr)
if headers {
return c.ReconstructHeader(parts)
}
return c.Reconstruct(parts)
}
for i := range a.ecInfo.Chunks { func (a *assemblerec) reconstructRawError(ctx context.Context) error {
chunk := a.ecInfo.Chunks[i] chunks := make(map[string]objectSDK.ECChunk)
errGroup.Go(func() error { var chunksGuard sync.Mutex
objID := new(oid.ID) for _, ch := range a.ecInfo.localChunks {
err := objID.ReadFromV2(chunk.ID) chunks[string(ch.ID.GetValue())] = ch
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
} }
var obj *objectSDK.Object
if headOnly { objID := a.addr.Object()
obj, err = a.objGetter.HeadObject(ctx, *objID) trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
if err != nil { if err != nil {
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err)) return err
}
eg, ctx := errgroup.WithContext(ctx)
for {
batch := trav.Next()
if len(batch) == 0 {
break
}
for _, node := range batch {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found {
return nil return nil
} }
} else {
sow := NewSimpleObjectWriter() nodeChunks := a.tryGetChunkListFromNode(ctx, info)
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
if err != nil { chunksGuard.Lock()
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err)) defer chunksGuard.Unlock()
return nil for _, ch := range nodeChunks {
chunks[string(ch.ID.GetValue())] = ch
} }
obj.SetPayload(sow.pld)
}
parts[chunk.Index] = obj
return nil return nil
}) })
} }
}
if err = eg.Wait(); err != nil {
return err
}
return createECInfoErr(chunks)
}
if err := errGroup.Wait(); err != nil { func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object {
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

We can override the old value here, does it matter?

ch.ID is objectID. In certain situations, you can get the same chunk from different nodes.

`ch.ID` is objectID. In certain situations, you can get the same chunk from different nodes.
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
remoteNodes := make([]placement.Node, 0)
Review

var remoteNodes []placement.Node?

`var remoteNodes []placement.Node`?
for {
batch := trav.Next()
if len(batch) == 0 {
break
}
remoteNodes = append(remoteNodes, batch...)
}
parts, err := a.processECNodesRequests(ctx, remoteNodes, dataCount, parityCount)
if err != nil {
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err)) a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
} }
return parts return parts
} }
func (a *assemblerec) processECNodesRequests(ctx context.Context, nodes []placement.Node, dataCount, parityCount int) ([]*objectSDK.Object, error) {
foundChunks := make(map[uint32]*objectSDK.Object)
var foundChunksGuard sync.Mutex
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(dataCount)
for _, ch := range a.ecInfo.localChunks {
ch := ch
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
object := a.tryGetChunkFromLocalStorage(ctx, ch)

s/founded/found, founded is what happened with Saint-Petersburg in 1703 :)

`s/founded/found`, founded is what happened with Saint-Petersburg in 1703 :)

Fixed

Fixed
if object == nil {
return nil
}
foundChunksGuard.Lock()
foundChunks[ch.Index] = object
count := len(foundChunks)
foundChunksGuard.Unlock()
if count >= dataCount {
return errECPartsRetrieveCompleted
}
return nil
})
}
for _, node := range nodes {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
chunks := a.tryGetChunkListFromNode(ctx, info)

Just for my curiosity:

We get the list of chunks available on the node being queried - could these sets of chunks overlap for some reason?
It still works because you're using the foundChunks map, but can some chunks be fetched by tryGetChunkFromRemoteStorage more than once?

Just for my curiosity: We get the list of chunks available on the `node` being queried - could these sets of `chunks` overlap for some reason? It still works because you're using the `foundChunks` map, but can some chunks be fetched by `tryGetChunkFromRemoteStorage` more than once?

Yes, in the case of a remove shard/return shard scenario. But it looks like a rare case.

Yes, in the case of a remove shard/return shard scenario. But it looks like a rare case.
for _, ch := range chunks {
object := a.tryGetChunkFromRemoteStorage(ctx, info, ch)
if object == nil {
continue
}
foundChunksGuard.Lock()
foundChunks[ch.Index] = object
count := len(foundChunks)
foundChunksGuard.Unlock()
if count >= dataCount {
return errECPartsRetrieveCompleted
}
}
return nil
})
}
err := eg.Wait()
if err == nil || errors.Is(err, errECPartsRetrieveCompleted) {
parts := make([]*objectSDK.Object, dataCount+parityCount)
for idx, chunk := range foundChunks {
parts[idx] = chunk
}
return parts, nil
}
return nil, err
}
// tryGetChunkFromLocalStorage reads a single EC chunk from the local storage.
//
// The chunk's object ID is decoded from ch.ID and combined with the container
// of the requested address (a.addr). In head mode only the header is
// requested; otherwise the whole chunk object is read. Every failure is logged
// and reported as a nil result, so the caller can continue with other chunks.
func (a *assemblerec) tryGetChunkFromLocalStorage(ctx context.Context, ch objectSDK.ECChunk) *objectSDK.Object {
	var objID oid.ID
	err := objID.ReadFromV2(ch.ID)
	if err != nil {
		a.log.Error(logs.GetUnableToHeadPartECObject, zap.String("node", "local"), zap.Uint32("part_index", ch.Index), zap.Error(fmt.Errorf("invalid object ID: %w", err)))
		return nil
	}

	var addr oid.Address
	// The container has to be taken from the requested address (a.addr): the
	// local addr declared above is still zero-valued, so reading the container
	// from it would leave the chunk address without a container.
	addr.SetContainer(a.addr.Container())
	addr.SetObject(objID)

	var object *objectSDK.Object
	if a.head {
		object, err = a.localStorage.Head(ctx, addr, false)
		if err != nil {
			a.log.Warn(logs.GetUnableToHeadPartECObject, zap.String("node", "local"), zap.Stringer("part_id", objID), zap.Error(err))
			return nil
		}
	} else {
		object, err = a.localStorage.Get(ctx, addr)
		if err != nil {
			a.log.Warn(logs.GetUnableToGetPartECObject, zap.String("node", "local"), zap.Stringer("part_id", objID), zap.Error(err))
			return nil
		}
	}
	return object
}
// tryGetChunkListFromNode returns the EC chunks known to be stored on node.
//
// Chunks already recorded for the node's public key in a.ecInfo.remoteChunks
// are returned as is. Otherwise a raw HEAD for the requested address is sent to
// the node: the expected answer is an ECInfoError, whose chunk list is
// converted and returned. A successful response or any other error is logged
// and yields nil.
func (a *assemblerec) tryGetChunkListFromNode(ctx context.Context, node client.NodeInfo) []objectSDK.ECChunk {
	if known, ok := a.ecInfo.remoteChunks[string(node.PublicKey())]; ok {
		return known
	}

	_, err := a.remoteStorage.headObjectFromNode(ctx, a.addr, node, true)

	var ecErr *objectSDK.ECInfoError
	switch {
	case err == nil:
		// A raw HEAD of an EC object must fail with EC info, not succeed.
		a.log.Error(logs.GetUnexpectedECObject, zap.String("node", hex.EncodeToString(node.PublicKey())))
		return nil
	case !errors.As(err, &ecErr):
		a.log.Warn(logs.GetUnableToHeadPartsECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
		return nil
	}

	reported := ecErr.ECInfo().Chunks
	list := make([]objectSDK.ECChunk, len(reported))
	for i := range reported {
		list[i] = objectSDK.ECChunk(reported[i])
	}
	return list
}
// tryGetChunkFromRemoteStorage fetches a single EC chunk from the given node.
//
// The chunk address is built from the container of the requested address and
// the object ID decoded from ch.ID. In head mode only the chunk header is
// requested (non-raw); otherwise the whole chunk object is fetched. Failures
// are logged together with the hex-encoded node public key and yield nil.
func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node client.NodeInfo, ch objectSDK.ECChunk) *objectSDK.Object {
	var partID oid.ID
	if err := partID.ReadFromV2(ch.ID); err != nil {
		a.log.Error(logs.GetUnableToHeadPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Uint32("part_index", ch.Index), zap.Error(fmt.Errorf("invalid object ID: %w", err)))
		return nil
	}

	var partAddr oid.Address
	partAddr.SetContainer(a.addr.Container())
	partAddr.SetObject(partID)

	if a.head {
		header, err := a.remoteStorage.headObjectFromNode(ctx, partAddr, node, false)
		if err != nil {
			a.log.Warn(logs.GetUnableToHeadPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Stringer("part_id", partID), zap.Error(err))
			return nil
		}
		return header
	}

	part, err := a.remoteStorage.getObjectFromNode(ctx, partAddr, node)
	if err != nil {
		a.log.Warn(logs.GetUnableToGetPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Stringer("part_id", partID), zap.Error(err))
		return nil
	}
	return part
}
// createECInfoErr wraps every chunk of the given map into a fresh ECInfo and
// returns it as an ECInfoError. The map keys are not used; chunks are added in
// map iteration order.
func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError {
	collected := objectSDK.NewECInfo()
	for key := range chunks {
		collected.AddChunk(chunks[key])
	}
	return objectSDK.NewECInfoError(collected)
}

View file

@ -77,7 +77,7 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
prm: prm, prm: prm,
infoSplit: objectSDK.NewSplitInfo(), infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(), infoEC: newECInfo(),
log: s.log, log: s.log,
} }
@ -110,15 +110,8 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
case statusOutOfRange: case statusOutOfRange:
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds) exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
case statusEC: case statusEC:
if !exec.isLocal() {
if execCnr {
exec.executeOnContainer(ctx)
exec.analyzeStatus(ctx, false)
} else {
exec.log.Debug(logs.GetRequestedObjectIsEC) exec.log.Debug(logs.GetRequestedObjectIsEC)
exec.assembleEC(ctx) exec.assembleEC(ctx)
}
}
default: default:
exec.log.Debug(logs.OperationFinishedWithError, exec.log.Debug(logs.OperationFinishedWithError,
zap.Error(exec.err), zap.Error(exec.err),

View file

@ -11,6 +11,7 @@ import (
"testing" "testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@ -78,7 +79,7 @@ func newTestStorage() *testStorage {
} }
} }
func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) { func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, *containerCore.Container, error) {
opts := make([]placement.Option, 0, 4) opts := make([]placement.Option, 0, 4)
opts = append(opts, opts = append(opts,
placement.ForContainer(g.c), placement.ForContainer(g.c),
@ -90,7 +91,10 @@ func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e ui
opts = append(opts, placement.ForObject(*obj)) opts = append(opts, placement.ForObject(*obj))
} }
return placement.NewTraverser(opts...) t, err := placement.NewTraverser(opts...)
return t, &containerCore.Container{
Value: g.c,
}, err
} }
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
@ -474,6 +478,7 @@ func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
var ni netmap.NodeInfo var ni netmap.NodeInfo
ni.SetNetworkEndpoints(a) ni.SetNetworkEndpoints(a)
ni.SetPublicKey([]byte(a))

Why is it important?

Why is it important?

Why is it important?

Why is it important?

Get service uses node's public key as map key for EC chunks. Without this change all chunks belong to the same node without public key in unit tests, so unit test fails.

`Get` service uses node's public key as map key for EC chunks. Without this change all chunks belong to the same node without public key in unit tests, so unit test fails.
var na network.AddressGroup var na network.AddressGroup

View file

@ -5,7 +5,6 @@ import (
"errors" "errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -46,8 +45,7 @@ func (r *request) executeLocal(ctx context.Context) {
r.err = objectSDK.NewSplitInfoError(r.infoSplit) r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo): case errors.As(err, &errECInfo):
r.status = statusEC r.status = statusEC
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC) r.err = r.infoEC.addLocal(errECInfo.ECInfo())
r.err = objectSDK.NewECInfoError(r.infoEC)
case errors.As(err, &errOutOfRange): case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange r.status = statusOutOfRange
r.err = errOutOfRange r.err = errOutOfRange

View file

@ -7,11 +7,10 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -36,11 +35,6 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
switch { switch {
default: default:
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err)) r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
if r.status == statusEC {
// we need to continue getting another chunks from another nodes
// in case of network issue
return false
}
r.status = statusUndefined r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound) r.err = new(apistatus.ObjectNotFound)
case err == nil: case err == nil:
@ -66,18 +60,7 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
r.err = objectSDK.NewSplitInfoError(r.infoSplit) r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo): case errors.As(err, &errECInfo):
r.status = statusEC r.status = statusEC
util.MergeECInfo(r.infoEC, errECInfo.ECInfo()) r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo())
r.infoEC = errECInfo.ECInfo()
r.err = objectSDK.NewECInfoError(r.infoEC)
if r.isRaw() {
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
}
cnt, err := r.containerSource.Get(r.address().Container())
if err == nil {
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
}
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
} }
return r.status != statusUndefined return r.status != statusUndefined
@ -116,3 +99,50 @@ func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.N
return rs.Get(ctx, r.address(), prm) return rs.Get(ctx, r.address(), prm)
} }
// getObjectFromNode performs a GET of the object at addr against the single
// node described by info.
//
// The remote storage for the node is obtained first, then the request key; an
// error from either step is returned unchanged. The request is sent with TTL 1
// for the current processing epoch and carries the session token, bearer token
// and X-headers of the original request.
func (r *request) getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) {
	storage, err := r.remoteStorageConstructor.Get(info)
	if err != nil {
		return nil, err
	}

	signKey, err := r.key()
	if err != nil {
		return nil, err
	}

	return storage.Get(ctx, addr, RemoteRequestParams{
		Epoch:        r.curProcEpoch,
		TTL:          1,
		PrivateKey:   signKey,
		SessionToken: r.prm.common.SessionToken(),
		BearerToken:  r.prm.common.BearerToken(),
		XHeaders:     r.prm.common.XHeaders(),
	})
}
// headObjectFromNode performs a HEAD of the object at addr against the single
// node described by info.
//
// The remote storage for the node is obtained first, then the request key; an
// error from either step is returned unchanged. The request is sent with TTL 1
// for the current processing epoch, carries the session token, bearer token and
// X-headers of the original request, and passes raw through as the IsRaw flag.
func (r *request) headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo, raw bool) (*objectSDK.Object, error) {
	storage, err := r.remoteStorageConstructor.Get(info)
	if err != nil {
		return nil, err
	}

	signKey, err := r.key()
	if err != nil {
		return nil, err
	}

	return storage.Head(ctx, addr, RemoteRequestParams{
		Epoch:        r.curProcEpoch,
		TTL:          1,
		PrivateKey:   signKey,
		SessionToken: r.prm.common.SessionToken(),
		BearerToken:  r.prm.common.BearerToken(),
		XHeaders:     r.prm.common.XHeaders(),
		IsRaw:        raw,
	})
}

View file

@ -23,7 +23,7 @@ type request struct {
infoSplit *objectSDK.SplitInfo infoSplit *objectSDK.SplitInfo
infoEC *objectSDK.ECInfo infoEC *ecInfo
log *logger.Logger log *logger.Logger
@ -141,7 +141,7 @@ func (r *request) initEpoch() bool {
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) { func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
obj := addr.Object() obj := addr.Object()
t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch) t, _, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
switch { switch {
default: default:

View file

@ -6,6 +6,7 @@ import (
"errors" "errors"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -23,7 +24,7 @@ type epochSource interface {
} }
type traverserGenerator interface { type traverserGenerator interface {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
} }
type keyStorage interface { type keyStorage interface {
@ -236,3 +237,51 @@ type RangeHashRes struct {
func (r *RangeHashRes) Hashes() [][]byte { func (r *RangeHashRes) Hashes() [][]byte {
return r.hashes return r.hashes
} }
// ecInfo accumulates erasure-coding chunk descriptors collected while
// processing a request, keeping chunks added via addLocal separate from
// chunks added via addRemote.
type ecInfo struct {
// localChunks holds the chunks appended by addLocal.
localChunks []objectSDK.ECChunk
// remoteChunks holds the chunks appended by addRemote, grouped by the
// node public key string passed to it.
remoteChunks map[string][]objectSDK.ECChunk // node pk -> chunk slice
}
// newECInfo returns an ecInfo with an empty (non-nil) local chunk slice and an
// initialized remote chunk map, so chunks can be added right away.
func newECInfo() *ecInfo {
	info := new(ecInfo)
	info.localChunks = make([]objectSDK.ECChunk, 0)
	info.remoteChunks = make(map[string][]objectSDK.ECChunk)
	return info
}
// addLocal appends every chunk of ecInfo to the local chunk list and returns
// an ECInfoError built from all chunks accumulated so far (see
// createECInfoErr).
func (e *ecInfo) addLocal(ecInfo *objectSDK.ECInfo) *objectSDK.ECInfoError {
	for i := range ecInfo.Chunks {
		chunk := objectSDK.ECChunk(ecInfo.Chunks[i])
		e.localChunks = append(e.localChunks, chunk)
	}

	return e.createECInfoErr()
}
// addRemote appends every chunk of ecInfo to the chunk list kept for nodePK
// and returns an ECInfoError built from all chunks accumulated so far (see
// createECInfoErr). No entry is created for nodePK when ecInfo has no chunks.
func (e *ecInfo) addRemote(nodePK string, ecInfo *objectSDK.ECInfo) *objectSDK.ECInfoError {
	for i := range ecInfo.Chunks {
		chunk := objectSDK.ECChunk(ecInfo.Chunks[i])
		e.remoteChunks[nodePK] = append(e.remoteChunks[nodePK], chunk)
	}

	return e.createECInfoErr()
}
// createECInfoErr merges all accumulated chunks into a fresh ECInfo and wraps
// it in an ECInfoError.
//
// Chunks are deduplicated by the raw bytes of their ID: the first chunk seen
// with a given ID wins. Local chunks are visited first, then the remote ones.
// The remote chunks are stored in a map, so the order in which different
// nodes' chunks are visited is not fixed.
func (e *ecInfo) createECInfoErr() *objectSDK.ECInfoError {
	merged := objectSDK.NewECInfo()
	seen := make(map[string]struct{})

	// addOnce adds ch to the merged info unless a chunk with the same ID has
	// already been added.
	addOnce := func(ch objectSDK.ECChunk) {
		key := string(ch.ID.GetValue())
		if _, dup := seen[key]; dup {
			return
		}
		merged.AddChunk(ch)
		seen[key] = struct{}{}
	}

	for _, ch := range e.localChunks {
		addOnce(ch)
	}
	for _, perNode := range e.remoteChunks {
		for _, ch := range perNode {
			addOnce(ch)
		}
	}

	return objectSDK.NewECInfoError(merged)
}

View file

@ -47,7 +47,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
zap.Uint64("number", exec.curProcEpoch), zap.Uint64("number", exec.curProcEpoch),
) )
traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch) traverser, _, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
if err != nil { if err != nil {
return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err) return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
} }

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -80,12 +81,13 @@ func newTestStorage() *testStorage {
} }
} }
func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) { func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, *containerCore.Container, error) {
return placement.NewTraverser( t, err := placement.NewTraverser(
placement.ForContainer(g.c), placement.ForContainer(g.c),
placement.UseBuilder(g.b[epoch]), placement.UseBuilder(g.b[epoch]),
placement.WithoutSuccessTracking(), placement.WithoutSuccessTracking(),
) )
return t, &containerCore.Container{Value: g.c}, err
} }
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {

View file

@ -4,6 +4,7 @@ import (
"context" "context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -45,7 +46,7 @@ type cfg struct {
} }
traverserGenerator interface { traverserGenerator interface {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error) GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
} }
currentEpochReceiver interface { currentEpochReceiver interface {

View file

@ -122,17 +122,17 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav
// GenerateTraverser generates placement Traverser for provided object address // GenerateTraverser generates placement Traverser for provided object address
// using epoch-th network map. // using epoch-th network map.
func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, error) { func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, *container.Container, error) {
// get network map by epoch // get network map by epoch
nm, err := g.netMapSrc.GetNetMapByEpoch(epoch) nm, err := g.netMapSrc.GetNetMapByEpoch(epoch)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err) return nil, nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
} }
// get container related container // get container related container
cnr, err := g.cnrSrc.Get(idCnr) cnr, err := g.cnrSrc.Get(idCnr)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not get container: %w", err) return nil, nil, fmt.Errorf("could not get container: %w", err)
} }
// allocate placement traverser options // allocate placement traverser options
@ -160,5 +160,9 @@ func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoc
) )
} }
return placement.NewTraverser(traverseOpts...) t, err := placement.NewTraverser(traverseOpts...)
if err != nil {
return nil, nil, err
}
return t, cnr, nil
} }