Fix Get EC object from non container node #1253

Merged
dstepanov-yadro merged 2 commits from dstepanov-yadro/frostfs-node:fix/ec_get_non_container_node into master 2024-09-04 19:51:10 +00:00
15 changed files with 435 additions and 125 deletions

View file

@ -108,6 +108,7 @@ const (
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object"
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object"
GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
GetUnableToHeadPartsECObject = "unable to head parts of the erasure-encoded object"
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"
GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
@ -123,6 +124,7 @@ const (
GetRequestedObjectIsVirtual = "requested object is virtual"
GetRequestedObjectIsEC = "requested object is erasure-coded"
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
GetUnexpectedECObject = "failed to get EC object from node: expected EC info, but got full object"
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
SearchReturnResultDirectly = "return result directly"
SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client"

View file

@ -30,7 +30,12 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi
wr := getsvc.NewSimpleObjectWriter()
p := getsvc.HeadPrm{}
p.SetCommonParameters(exec.commonParameters())
if cp := exec.commonParameters(); cp != nil {
commonParameters := *cp
p.SetCommonParameters(&commonParameters)
}

`commonParameters` may be changed during the Head request (`ForgetTokens`, `WithLocalOnly`), so we need a copy. Found with autotests.
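
A minimal sketch of the hazard, with hypothetical names (`CommonPrm` and `SetLocalOnly` stand in for the real parameter type and its setters): sharing the pointer lets a flag set during the nested Head request leak back into the caller.

```
package main

import "fmt"

// CommonPrm is a stand-in for the shared request parameters type.
type CommonPrm struct {
	localOnly bool
}

func (p *CommonPrm) SetLocalOnly(v bool) { p.localOnly = v }

func main() {
	outer := &CommonPrm{localOnly: true}

	// Buggy: the nested Head request shares the caller's pointer,
	// so its mutation leaks back to the outer request.
	nested := outer
	nested.SetLocalOnly(false)
	fmt.Println(outer.localOnly) // false - the outer flag was lost

	// Fixed: copy the struct; mutations stay local to the nested request.
	outer = &CommonPrm{localOnly: true}
	cp := *outer
	nested = &cp
	nested.SetLocalOnly(false)
	fmt.Println(outer.localOnly) // true - outer parameters intact
}
```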

Previously it was possible to pass `nil`; now we pass a pointer to an empty struct. This is a change in behaviour: can anything bad happen?

Fixed
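
Whether it matters depends on how downstream code treats a nil pointer. A sketch under the assumption that getters guard the nil receiver, as SDK-style parameter types commonly do (`CommonPrm` and `TTL` here are illustrative, not the real API):

```
package main

import "fmt"

type CommonPrm struct{ ttl uint32 }

// TTL guards the nil receiver, so nil behaves like the zero value.
func (p *CommonPrm) TTL() uint32 {
	if p == nil {
		return 0
	}
	return p.ttl
}

func main() {
	var nilPrm *CommonPrm
	emptyPrm := &CommonPrm{}

	// Through nil-guarded getters the two are indistinguishable...
	fmt.Println(nilPrm.TTL() == emptyPrm.TTL()) // true

	// ...but code that branches on the pointer itself still sees a
	// difference, which is where behaviour could change.
	fmt.Println(nilPrm == nil, emptyPrm == nil) // true false
}
```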
p.SetHeaderWriter(wr)
p.WithRawFlag(true)
p.WithAddress(addr)

View file

@ -140,7 +140,7 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
prm: prm,
infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(),
infoEC: newECInfo(),
log: r.log,
}

View file

@ -6,11 +6,12 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
)
func (r *request) assembleEC(ctx context.Context) {
if r.isRaw() {
if r.isRaw() && r.isLocal() {
r.log.Debug(logs.GetCanNotAssembleTheObject)
return
}
@ -35,8 +36,14 @@ func (r *request) assembleEC(ctx context.Context) {
r.log.Debug(logs.GetTryingToAssembleTheECObject)
// initialize epoch number
ok := r.initEpoch()

Logging and the status change happen inside the `r.initEpoch()` method.
if !ok {
return
}
r.prm.common = r.prm.common.WithLocalOnly(false)
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch)
r.log.Debug(logs.GetAssemblingECObject,
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
@ -47,8 +54,8 @@ func (r *request) assembleEC(ctx context.Context) {
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
if err != nil {
obj, err := assembler.Assemble(ctx, r.prm.objWriter)
if err != nil && !errors.As(err, new(*objectSDK.ECInfoError)) {
r.log.Warn(logs.GetFailedToAssembleECObject,
zap.Error(err),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
@ -58,6 +65,7 @@ func (r *request) assembleEC(ctx context.Context) {
var errRemoved *apistatus.ObjectAlreadyRemoved
var errOutOfRange *apistatus.ObjectOutOfRange
var errECInfo *objectSDK.ECInfoError
switch {
default:
@ -73,5 +81,8 @@ func (r *request) assembleEC(ctx context.Context) {
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange
case errors.As(err, &errECInfo):
r.status = statusEC
r.err = err
}
}

View file

@ -2,11 +2,16 @@ package getsvc
import (
"context"
"encoding/hex"
"errors"
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -16,61 +21,79 @@ import (
"golang.org/x/sync/errgroup"
)
var errECPartsRetrieveCompleted = errors.New("EC parts receive completed")
type ecRemoteStorage interface {
getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error)
headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo, raw bool) (*objectSDK.Object, error)
}
type assemblerec struct {
addr oid.Address
ecInfo *objectSDK.ECInfo
rng *objectSDK.Range
objGetter objectGetter
cs container.Source
log *logger.Logger
addr oid.Address
ecInfo *ecInfo
rng *objectSDK.Range
remoteStorage ecRemoteStorage
localStorage localStorage
cs container.Source
log *logger.Logger
head bool
raw bool
traverserGenerator traverserGenerator
epoch uint64
}
func newAssemblerEC(
addr oid.Address,
ecInfo *objectSDK.ECInfo,
ecInfo *ecInfo,
rng *objectSDK.Range,
objGetter objectGetter,
remoteStorage ecRemoteStorage,
localStorage localStorage,
cs container.Source,
log *logger.Logger,
head bool,
raw bool,
tg traverserGenerator,
epoch uint64,
) *assemblerec {
return &assemblerec{
addr: addr,
rng: rng,
ecInfo: ecInfo,
objGetter: objGetter,
cs: cs,
log: log,
addr: addr,
rng: rng,
ecInfo: ecInfo,
remoteStorage: remoteStorage,
localStorage: localStorage,
cs: cs,
log: log,
head: head,
raw: raw,
traverserGenerator: tg,
epoch: epoch,
}
}
// Assemble assembles an erasure-coded object and writes its content to the ObjectWriter.
// It returns the parent object.
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
if headOnly {
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
switch {
case a.raw:
err := a.reconstructRawError(ctx)
return nil, err
case a.head:
return a.reconstructHeader(ctx, writer)
} else if a.rng != nil {
case a.rng != nil:
return a.reconstructRange(ctx, writer)
default:
return a.reconstructObject(ctx, writer)
}
return a.reconstructObject(ctx, writer)
}
func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) {
cnt, err := a.cs.Get(a.addr.Container())
if err != nil {
return nil, err
}
dataCount := policy.ECDataCount(cnt.Value.PlacementPolicy())
parityCount := policy.ECParityCount(cnt.Value.PlacementPolicy())
func (a *assemblerec) getConstructor(cnr *container.Container) (*erasurecode.Constructor, error) {
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
return erasurecode.NewConstructor(dataCount, parityCount)
}
func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, true)
c, err := a.getConstructor()
if err != nil {
return nil, err
}
obj, err := c.ReconstructHeader(parts)
obj, err := a.reconstructObjectFromParts(ctx, true)
if err == nil {
return obj, writer.WriteHeader(ctx, obj)
}
@ -78,12 +101,7 @@ func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter
}
func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, false)
c, err := a.getConstructor()
if err != nil {
return nil, err
}
obj, err := c.Reconstruct(parts)
obj, err := a.reconstructObjectFromParts(ctx, false)
if err != nil {
return nil, err
}
@ -101,12 +119,7 @@ func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter)
}
func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
parts := a.retrieveParts(ctx, false)
c, err := a.getConstructor()
if err != nil {
return nil, err
}
obj, err := c.Reconstruct(parts)
obj, err := a.reconstructObjectFromParts(ctx, false)
if err == nil {
err = writer.WriteHeader(ctx, obj.CutPayload())
if err == nil {
@ -119,41 +132,238 @@ func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter
return obj, err
}
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
errGroup, ctx := errgroup.WithContext(mainCtx)
func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bool) (*objectSDK.Object, error) {
objID := a.addr.Object()
trav, cnr, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
if err != nil {
return nil, err
}
c, err := a.getConstructor(cnr)
if err != nil {
return nil, err

This part seems to be common for all `reconstruct*()` helpers. If we move it one level above, will it make the code cleaner?

```
trav, cnr, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
if err != nil {
	return nil, err
}
c, err := a.getConstructor(cnr)
if err != nil {
	return nil, err
}
```

Done
}
parts := a.retrieveParts(ctx, trav, cnr)
if headers {
return c.ReconstructHeader(parts)
}
return c.Reconstruct(parts)
}
for i := range a.ecInfo.Chunks {
chunk := a.ecInfo.Chunks[i]
errGroup.Go(func() error {
objID := new(oid.ID)
err := objID.ReadFromV2(chunk.ID)
if err != nil {
return fmt.Errorf("invalid object ID: %w", err)
}
var obj *objectSDK.Object
if headOnly {
obj, err = a.objGetter.HeadObject(ctx, *objID)
if err != nil {
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
} else {
sow := NewSimpleObjectWriter()
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
if err != nil {
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
obj.SetPayload(sow.pld)
}
parts[chunk.Index] = obj
return nil
})
func (a *assemblerec) reconstructRawError(ctx context.Context) error {
chunks := make(map[string]objectSDK.ECChunk)
var chunksGuard sync.Mutex
for _, ch := range a.ecInfo.localChunks {
chunks[string(ch.ID.GetValue())] = ch
}
if err := errGroup.Wait(); err != nil {
objID := a.addr.Object()
trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(ctx)
for {
batch := trav.Next()
if len(batch) == 0 {
break
}
for _, node := range batch {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found {
return nil
}
nodeChunks := a.tryGetChunkListFromNode(ctx, info)
chunksGuard.Lock()
defer chunksGuard.Unlock()
for _, ch := range nodeChunks {
chunks[string(ch.ID.GetValue())] = ch
}
return nil
})
}
}
if err = eg.Wait(); err != nil {
return err
}
return createECInfoErr(chunks)
}
func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object {
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())

We can override the old value here, does it matter?

`ch.ID` is an objectID. In certain situations, you can get the same chunk from different nodes.
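
A reduced sketch of why the overwrite is harmless (`ecChunk` is a stand-in for `objectSDK.ECChunk`): keying the map on the serialized chunk ID means an identical chunk reported by a second node simply replaces the same entry.

```
package main

import "fmt"

// ecChunk is a reduced stand-in for objectSDK.ECChunk.
type ecChunk struct {
	ID    []byte
	Index uint32
}

func main() {
	reports := []ecChunk{
		{ID: []byte{0x01}, Index: 0}, // seen on node A
		{ID: []byte{0x02}, Index: 1}, // seen on node A
		{ID: []byte{0x01}, Index: 0}, // the same chunk, seen on node B
	}
	chunks := make(map[string]ecChunk)
	for _, ch := range reports {
		chunks[string(ch.ID)] = ch // later duplicates overwrite in place
	}
	fmt.Println(len(chunks)) // 2 - duplicates collapse
}
```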
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
remoteNodes := make([]placement.Node, 0)

`var remoteNodes []placement.Node`?

for {
batch := trav.Next()
if len(batch) == 0 {
break
}
remoteNodes = append(remoteNodes, batch...)
}
parts, err := a.processECNodesRequests(ctx, remoteNodes, dataCount, parityCount)
if err != nil {
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
}
return parts
}
func (a *assemblerec) processECNodesRequests(ctx context.Context, nodes []placement.Node, dataCount, parityCount int) ([]*objectSDK.Object, error) {
foundChunks := make(map[uint32]*objectSDK.Object)
var foundChunksGuard sync.Mutex
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(dataCount)
for _, ch := range a.ecInfo.localChunks {
ch := ch
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
object := a.tryGetChunkFromLocalStorage(ctx, ch)

`s/founded/found`; founded is what happened with Saint-Petersburg in 1703 :)

Fixed
if object == nil {
return nil
}
foundChunksGuard.Lock()
foundChunks[ch.Index] = object
count := len(foundChunks)
foundChunksGuard.Unlock()
if count >= dataCount {
return errECPartsRetrieveCompleted
}
return nil
})
}
for _, node := range nodes {
var info client.NodeInfo
client.NodeInfoFromNetmapElement(&info, node)
eg.Go(func() error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
chunks := a.tryGetChunkListFromNode(ctx, info)

Just for my curiosity: we get the list of chunks available on the queried `node`. Could these sets of `chunks` intersect for some reason? It still works since you're using the `foundChunks` map, but could some chunks be fetched by `tryGetChunkFromRemoteStorage` more than once?

Yes, in the remove shard/return shard scenario. But it looks like a rare case.
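
The duplicate fetches stay harmless because `foundChunks` is keyed on the chunk index, and the errgroup is stopped through a sentinel error once `dataCount` distinct parts are collected. A minimal sketch of that stop-early pattern, detached from the real types:

```
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"

	"golang.org/x/sync/errgroup"
)

// Sentinel that signals "enough parts collected" - success, not failure.
var errEnough = errors.New("EC parts receive completed")

func main() {
	eg, ctx := errgroup.WithContext(context.Background())
	var (
		mu    sync.Mutex
		found = make(map[int]struct{})
	)
	const dataCount = 3
	for i := 0; i < 10; i++ {
		i := i
		eg.Go(func() error {
			select {
			case <-ctx.Done(): // someone already collected enough
				return ctx.Err()
			default:
			}
			mu.Lock()
			found[i%5] = struct{}{} // duplicate fetches collapse here
			n := len(found)
			mu.Unlock()
			if n >= dataCount {
				return errEnough // cancels ctx, stopping the rest
			}
			return nil
		})
	}
	err := eg.Wait()
	fmt.Println(err == nil || errors.Is(err, errEnough)) // true
}
```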
for _, ch := range chunks {
object := a.tryGetChunkFromRemoteStorage(ctx, info, ch)
if object == nil {
continue
}
foundChunksGuard.Lock()
foundChunks[ch.Index] = object
count := len(foundChunks)
foundChunksGuard.Unlock()
if count >= dataCount {
return errECPartsRetrieveCompleted
}
}
return nil
})
}
err := eg.Wait()
if err == nil || errors.Is(err, errECPartsRetrieveCompleted) {
parts := make([]*objectSDK.Object, dataCount+parityCount)
for idx, chunk := range foundChunks {
parts[idx] = chunk
}
return parts, nil
}
return nil, err
}
func (a *assemblerec) tryGetChunkFromLocalStorage(ctx context.Context, ch objectSDK.ECChunk) *objectSDK.Object {
var objID oid.ID
err := objID.ReadFromV2(ch.ID)
if err != nil {
a.log.Error(logs.GetUnableToHeadPartECObject, zap.String("node", "local"), zap.Uint32("part_index", ch.Index), zap.Error(fmt.Errorf("invalid object ID: %w", err)))
return nil
}
var addr oid.Address
addr.SetContainer(a.addr.Container())
addr.SetObject(objID)
var object *objectSDK.Object
if a.head {
object, err = a.localStorage.Head(ctx, addr, false)
if err != nil {
a.log.Warn(logs.GetUnableToHeadPartECObject, zap.String("node", "local"), zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
} else {
object, err = a.localStorage.Get(ctx, addr)
if err != nil {
a.log.Warn(logs.GetUnableToGetPartECObject, zap.String("node", "local"), zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
}
return object
}
func (a *assemblerec) tryGetChunkListFromNode(ctx context.Context, node client.NodeInfo) []objectSDK.ECChunk {
if chunks, found := a.ecInfo.remoteChunks[string(node.PublicKey())]; found {
return chunks
}
var errECInfo *objectSDK.ECInfoError
_, err := a.remoteStorage.headObjectFromNode(ctx, a.addr, node, true)
if err == nil {
a.log.Error(logs.GetUnexpectedECObject, zap.String("node", hex.EncodeToString(node.PublicKey())))
return nil
}
if !errors.As(err, &errECInfo) {
a.log.Warn(logs.GetUnableToHeadPartsECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
return nil
}
result := make([]objectSDK.ECChunk, 0, len(errECInfo.ECInfo().Chunks))
for _, ch := range errECInfo.ECInfo().Chunks {
result = append(result, objectSDK.ECChunk(ch))
}
return result
}
func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node client.NodeInfo, ch objectSDK.ECChunk) *objectSDK.Object {
var objID oid.ID
err := objID.ReadFromV2(ch.ID)
if err != nil {
a.log.Error(logs.GetUnableToHeadPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Uint32("part_index", ch.Index), zap.Error(fmt.Errorf("invalid object ID: %w", err)))
return nil
}
var addr oid.Address
addr.SetContainer(a.addr.Container())
addr.SetObject(objID)
var object *objectSDK.Object
if a.head {
object, err = a.remoteStorage.headObjectFromNode(ctx, addr, node, false)
if err != nil {
a.log.Warn(logs.GetUnableToHeadPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
} else {
object, err = a.remoteStorage.getObjectFromNode(ctx, addr, node)
if err != nil {
a.log.Warn(logs.GetUnableToGetPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Stringer("part_id", objID), zap.Error(err))
return nil
}
}
return object
}
func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError {
info := objectSDK.NewECInfo()
for _, ch := range chunks {
info.AddChunk(ch)
}
return objectSDK.NewECInfoError(info)
}

View file

@ -77,7 +77,7 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
prm: prm,
infoSplit: objectSDK.NewSplitInfo(),
infoEC: objectSDK.NewECInfo(),
infoEC: newECInfo(),
log: s.log,
}
@ -110,15 +110,8 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
case statusOutOfRange:
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
case statusEC:
if !exec.isLocal() {
if execCnr {
exec.executeOnContainer(ctx)
exec.analyzeStatus(ctx, false)
} else {
exec.log.Debug(logs.GetRequestedObjectIsEC)
exec.assembleEC(ctx)
}
}
exec.log.Debug(logs.GetRequestedObjectIsEC)
exec.assembleEC(ctx)
default:
exec.log.Debug(logs.OperationFinishedWithError,
zap.Error(exec.err),

View file

@ -11,6 +11,7 @@ import (
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
@ -78,7 +79,7 @@ func newTestStorage() *testStorage {
}
}
func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) {
func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, *containerCore.Container, error) {
opts := make([]placement.Option, 0, 4)
opts = append(opts,
placement.ForContainer(g.c),
@ -90,7 +91,10 @@ func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e ui
opts = append(opts, placement.ForObject(*obj))
}
return placement.NewTraverser(opts...)
t, err := placement.NewTraverser(opts...)
return t, &containerCore.Container{
Value: g.c,
}, err
}
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
@ -474,6 +478,7 @@ func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
var ni netmap.NodeInfo
ni.SetNetworkEndpoints(a)
ni.SetPublicKey([]byte(a))

Why is it important?

The `Get` service uses the node's public key as the map key for EC chunks. Without this change, all chunks in unit tests would belong to the same node with an empty public key, so the test fails.
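
A reduced illustration of the failure mode: with empty public keys every test node maps to the same key, so the per-node chunk lists collapse into a single entry (plain ints stand in for chunks).

```
package main

import "fmt"

func main() {
	// remoteChunks is keyed on the node's public key, as in ecInfo.
	remoteChunks := make(map[string][]int)

	// Nodes without public keys: every node maps to the key "".
	for i, pk := range [][]byte{{}, {}, {}} {
		remoteChunks[string(pk)] = append(remoteChunks[string(pk)], i)
	}
	fmt.Println(len(remoteChunks)) // 1 - all nodes collapsed together

	// Distinct keys, as the test now sets, keep the nodes apart.
	remoteChunks = make(map[string][]int)
	for i, pk := range [][]byte{{0x01}, {0x02}, {0x03}} {
		remoteChunks[string(pk)] = append(remoteChunks[string(pk)], i)
	}
	fmt.Println(len(remoteChunks)) // 3
}
```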
var na network.AddressGroup

View file

@ -5,7 +5,6 @@ import (
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -46,8 +45,7 @@ func (r *request) executeLocal(ctx context.Context) {
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
r.err = objectSDK.NewECInfoError(r.infoEC)
r.err = r.infoEC.addLocal(errECInfo.ECInfo())
case errors.As(err, &errOutOfRange):
r.status = statusOutOfRange
r.err = errOutOfRange

View file

@ -7,11 +7,10 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
@ -36,11 +35,6 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
switch {
default:
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
if r.status == statusEC {
// we need to continue getting another chunks from another nodes
// in case of network issue
return false
}
r.status = statusUndefined
r.err = new(apistatus.ObjectNotFound)
case err == nil:
@ -66,18 +60,7 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errECInfo):
r.status = statusEC
util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
r.infoEC = errECInfo.ECInfo()
r.err = objectSDK.NewECInfoError(r.infoEC)
if r.isRaw() {
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
}
cnt, err := r.containerSource.Get(r.address().Container())
if err == nil {
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
}
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo())
}
return r.status != statusUndefined
@ -116,3 +99,50 @@ func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.N
return rs.Get(ctx, r.address(), prm)
}
func (r *request) getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) {
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
return nil, err
}
key, err := r.key()
if err != nil {
return nil, err
}
prm := RemoteRequestParams{
Epoch: r.curProcEpoch,
TTL: 1,
PrivateKey: key,
SessionToken: r.prm.common.SessionToken(),
BearerToken: r.prm.common.BearerToken(),
XHeaders: r.prm.common.XHeaders(),
}
return rs.Get(ctx, addr, prm)
}
func (r *request) headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo, raw bool) (*objectSDK.Object, error) {
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
return nil, err
}
key, err := r.key()
if err != nil {
return nil, err
}
prm := RemoteRequestParams{
Epoch: r.curProcEpoch,
TTL: 1,
PrivateKey: key,
SessionToken: r.prm.common.SessionToken(),
BearerToken: r.prm.common.BearerToken(),
XHeaders: r.prm.common.XHeaders(),
IsRaw: raw,
}
return rs.Head(ctx, addr, prm)
}

View file

@ -23,7 +23,7 @@ type request struct {
infoSplit *objectSDK.SplitInfo
infoEC *objectSDK.ECInfo
infoEC *ecInfo
log *logger.Logger
@ -141,7 +141,7 @@ func (r *request) initEpoch() bool {
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
obj := addr.Object()
t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
t, _, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
switch {
default:

View file

@ -6,6 +6,7 @@ import (
"errors"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -23,7 +24,7 @@ type epochSource interface {
}
type traverserGenerator interface {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
}
type keyStorage interface {
@ -236,3 +237,51 @@ type RangeHashRes struct {
func (r *RangeHashRes) Hashes() [][]byte {
return r.hashes
}
type ecInfo struct {
localChunks []objectSDK.ECChunk
remoteChunks map[string][]objectSDK.ECChunk // node pk -> chunk slice
}
func newECInfo() *ecInfo {
return &ecInfo{
localChunks: make([]objectSDK.ECChunk, 0),
remoteChunks: make(map[string][]objectSDK.ECChunk),
}
}
func (e *ecInfo) addLocal(ecInfo *objectSDK.ECInfo) *objectSDK.ECInfoError {
for _, ch := range ecInfo.Chunks {
e.localChunks = append(e.localChunks, objectSDK.ECChunk(ch))
}
return e.createECInfoErr()
}
func (e *ecInfo) addRemote(nodePK string, ecInfo *objectSDK.ECInfo) *objectSDK.ECInfoError {
for _, ch := range ecInfo.Chunks {
e.remoteChunks[nodePK] = append(e.remoteChunks[nodePK], objectSDK.ECChunk(ch))
}
return e.createECInfoErr()
}
func (e *ecInfo) createECInfoErr() *objectSDK.ECInfoError {
unique := make(map[string]struct{})
result := objectSDK.NewECInfo()
for _, ch := range e.localChunks {
if _, found := unique[string(ch.ID.GetValue())]; found {
continue
}
result.AddChunk(ch)
unique[string(ch.ID.GetValue())] = struct{}{}
}
for _, chunks := range e.remoteChunks {
for _, ch := range chunks {
if _, found := unique[string(ch.ID.GetValue())]; found {
continue
}
result.AddChunk(ch)
unique[string(ch.ID.GetValue())] = struct{}{}
}
}
return objectSDK.NewECInfoError(result)
}

View file

@ -47,7 +47,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
zap.Uint64("number", exec.curProcEpoch),
)
traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
traverser, _, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
if err != nil {
return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
}

View file

@ -11,6 +11,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -80,12 +81,13 @@ func newTestStorage() *testStorage {
}
}
func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) {
return placement.NewTraverser(
func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, *containerCore.Container, error) {
t, err := placement.NewTraverser(
placement.ForContainer(g.c),
placement.UseBuilder(g.b[epoch]),
placement.WithoutSuccessTracking(),
)
return t, &containerCore.Container{Value: g.c}, err
}
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {

View file

@ -4,6 +4,7 @@ import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
@ -45,7 +46,7 @@ type cfg struct {
}
traverserGenerator interface {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
}
currentEpochReceiver interface {

View file

@ -122,17 +122,17 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav
// GenerateTraverser generates a placement Traverser for the provided object address
// using the epoch-th network map.
func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, error) {
func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, *container.Container, error) {
// get network map by epoch
nm, err := g.netMapSrc.GetNetMapByEpoch(epoch)
if err != nil {
return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
return nil, nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
}
// get related container
cnr, err := g.cnrSrc.Get(idCnr)
if err != nil {
return nil, fmt.Errorf("could not get container: %w", err)
return nil, nil, fmt.Errorf("could not get container: %w", err)
}
// allocate placement traverser options
@ -160,5 +160,9 @@ func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoc
)
}
return placement.NewTraverser(traverseOpts...)
t, err := placement.NewTraverser(traverseOpts...)
if err != nil {
return nil, nil, err
}
return t, cnr, nil
}