Fix Get EC object from non container node #1253
|
@ -108,6 +108,7 @@ const (
|
|||
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object"
|
||||
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object"
|
||||
GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
|
||||
GetUnableToHeadPartsECObject = "unable to head parts of the erasure-encoded object"
|
||||
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
|
||||
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"
|
||||
GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
|
||||
|
@ -123,6 +124,7 @@ const (
|
|||
GetRequestedObjectIsVirtual = "requested object is virtual"
|
||||
GetRequestedObjectIsEC = "requested object is erasure-coded"
|
||||
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
|
||||
GetUnexpectedECObject = "failed to get EC object from node: expected EC info, but got full object"
|
||||
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
|
||||
SearchReturnResultDirectly = "return result directly"
|
||||
SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client"
|
||||
|
|
|
@ -30,7 +30,12 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi
|
|||
wr := getsvc.NewSimpleObjectWriter()
|
||||
|
||||
p := getsvc.HeadPrm{}
|
||||
p.SetCommonParameters(exec.commonParameters())
|
||||
|
||||
if cp := exec.commonParameters(); cp != nil {
|
||||
commonParameters := *cp
|
||||
p.SetCommonParameters(&commonParameters)
|
||||
}
|
||||
|
||||
|
||||
p.SetHeaderWriter(wr)
|
||||
p.WithRawFlag(true)
|
||||
p.WithAddress(addr)
|
||||
|
|
|
@ -140,7 +140,7 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
|
|||
|
||||
prm: prm,
|
||||
infoSplit: objectSDK.NewSplitInfo(),
|
||||
infoEC: objectSDK.NewECInfo(),
|
||||
infoEC: newECInfo(),
|
||||
log: r.log,
|
||||
}
|
||||
|
||||
|
|
|
@ -6,11 +6,12 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (r *request) assembleEC(ctx context.Context) {
|
||||
if r.isRaw() {
|
||||
if r.isRaw() && r.isLocal() {
|
||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||
return
|
||||
}
|
||||
|
@ -35,8 +36,14 @@ func (r *request) assembleEC(ctx context.Context) {
|
|||
|
||||
r.log.Debug(logs.GetTryingToAssembleTheECObject)
|
||||
|
||||
// initialize epoch number
|
||||
ok := r.initEpoch()
|
||||
dstepanov-yadro
commented
Logging and status change - inside Logging and status change - inside `r.initEpoch()` method.
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
r.prm.common = r.prm.common.WithLocalOnly(false)
|
||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
|
||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch)
|
||||
|
||||
r.log.Debug(logs.GetAssemblingECObject,
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
|
@ -47,8 +54,8 @@ func (r *request) assembleEC(ctx context.Context) {
|
|||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
|
||||
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
|
||||
if err != nil {
|
||||
obj, err := assembler.Assemble(ctx, r.prm.objWriter)
|
||||
if err != nil && !errors.As(err, new(*objectSDK.ECInfoError)) {
|
||||
r.log.Warn(logs.GetFailedToAssembleECObject,
|
||||
zap.Error(err),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
|
@ -58,6 +65,7 @@ func (r *request) assembleEC(ctx context.Context) {
|
|||
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||
var errECInfo *objectSDK.ECInfoError
|
||||
|
||||
switch {
|
||||
default:
|
||||
|
@ -73,5 +81,8 @@ func (r *request) assembleEC(ctx context.Context) {
|
|||
case errors.As(err, &errOutOfRange):
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRange
|
||||
case errors.As(err, &errECInfo):
|
||||
r.status = statusEC
|
||||
r.err = err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,11 +2,16 @@ package getsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -16,61 +21,79 @@ import (
|
|||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
var errECPartsRetrieveCompleted = errors.New("EC parts receive completed")
|
||||
|
||||
type ecRemoteStorage interface {
|
||||
getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error)
|
||||
headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo, raw bool) (*objectSDK.Object, error)
|
||||
}
|
||||
|
||||
type assemblerec struct {
|
||||
addr oid.Address
|
||||
ecInfo *objectSDK.ECInfo
|
||||
rng *objectSDK.Range
|
||||
objGetter objectGetter
|
||||
cs container.Source
|
||||
log *logger.Logger
|
||||
addr oid.Address
|
||||
ecInfo *ecInfo
|
||||
rng *objectSDK.Range
|
||||
remoteStorage ecRemoteStorage
|
||||
localStorage localStorage
|
||||
cs container.Source
|
||||
log *logger.Logger
|
||||
head bool
|
||||
raw bool
|
||||
traverserGenerator traverserGenerator
|
||||
epoch uint64
|
||||
}
|
||||
|
||||
func newAssemblerEC(
|
||||
addr oid.Address,
|
||||
ecInfo *objectSDK.ECInfo,
|
||||
ecInfo *ecInfo,
|
||||
rng *objectSDK.Range,
|
||||
objGetter objectGetter,
|
||||
remoteStorage ecRemoteStorage,
|
||||
localStorage localStorage,
|
||||
cs container.Source,
|
||||
log *logger.Logger,
|
||||
head bool,
|
||||
raw bool,
|
||||
tg traverserGenerator,
|
||||
epoch uint64,
|
||||
) *assemblerec {
|
||||
return &assemblerec{
|
||||
addr: addr,
|
||||
rng: rng,
|
||||
ecInfo: ecInfo,
|
||||
objGetter: objGetter,
|
||||
cs: cs,
|
||||
log: log,
|
||||
addr: addr,
|
||||
rng: rng,
|
||||
ecInfo: ecInfo,
|
||||
remoteStorage: remoteStorage,
|
||||
localStorage: localStorage,
|
||||
cs: cs,
|
||||
log: log,
|
||||
head: head,
|
||||
raw: raw,
|
||||
traverserGenerator: tg,
|
||||
epoch: epoch,
|
||||
}
|
||||
}
|
||||
|
||||
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
|
||||
// It returns parent object.
|
||||
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
|
||||
if headOnly {
|
||||
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
switch {
|
||||
case a.raw:
|
||||
err := a.reconstructRawError(ctx)
|
||||
return nil, err
|
||||
case a.head:
|
||||
return a.reconstructHeader(ctx, writer)
|
||||
} else if a.rng != nil {
|
||||
case a.rng != nil:
|
||||
return a.reconstructRange(ctx, writer)
|
||||
default:
|
||||
return a.reconstructObject(ctx, writer)
|
||||
}
|
||||
return a.reconstructObject(ctx, writer)
|
||||
}
|
||||
|
||||
func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) {
|
||||
cnt, err := a.cs.Get(a.addr.Container())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dataCount := policy.ECDataCount(cnt.Value.PlacementPolicy())
|
||||
parityCount := policy.ECParityCount(cnt.Value.PlacementPolicy())
|
||||
func (a *assemblerec) getConstructor(cnr *container.Container) (*erasurecode.Constructor, error) {
|
||||
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
|
||||
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
|
||||
return erasurecode.NewConstructor(dataCount, parityCount)
|
||||
}
|
||||
|
||||
func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
parts := a.retrieveParts(ctx, true)
|
||||
c, err := a.getConstructor()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
obj, err := c.ReconstructHeader(parts)
|
||||
obj, err := a.reconstructObjectFromParts(ctx, true)
|
||||
if err == nil {
|
||||
return obj, writer.WriteHeader(ctx, obj)
|
||||
}
|
||||
|
@ -78,12 +101,7 @@ func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter
|
|||
}
|
||||
|
||||
func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
parts := a.retrieveParts(ctx, false)
|
||||
c, err := a.getConstructor()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
obj, err := c.Reconstruct(parts)
|
||||
obj, err := a.reconstructObjectFromParts(ctx, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -101,12 +119,7 @@ func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter)
|
|||
}
|
||||
|
||||
func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
parts := a.retrieveParts(ctx, false)
|
||||
c, err := a.getConstructor()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
obj, err := c.Reconstruct(parts)
|
||||
obj, err := a.reconstructObjectFromParts(ctx, false)
|
||||
if err == nil {
|
||||
err = writer.WriteHeader(ctx, obj.CutPayload())
|
||||
if err == nil {
|
||||
|
@ -119,41 +132,238 @@ func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter
|
|||
return obj, err
|
||||
}
|
||||
|
||||
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
|
||||
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
|
||||
errGroup, ctx := errgroup.WithContext(mainCtx)
|
||||
func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bool) (*objectSDK.Object, error) {
|
||||
objID := a.addr.Object()
|
||||
trav, cnr, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c, err := a.getConstructor(cnr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
fyrchik
commented
This part seems to be common for all
This part seems to be common for all `reconstruct*()` helpers. If we move it one level above, will it make the code cleaner?
```
trav, cnr, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
if err != nil {
return nil, err
}
c, err := a.getConstructor(cnr)
if err != nil {
return nil, err
}
```
dstepanov-yadro
commented
Done Done
|
||||
}
|
||||
parts := a.retrieveParts(ctx, trav, cnr)
|
||||
if headers {
|
||||
return c.ReconstructHeader(parts)
|
||||
}
|
||||
return c.Reconstruct(parts)
|
||||
}
|
||||
|
||||
for i := range a.ecInfo.Chunks {
|
||||
chunk := a.ecInfo.Chunks[i]
|
||||
errGroup.Go(func() error {
|
||||
objID := new(oid.ID)
|
||||
err := objID.ReadFromV2(chunk.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid object ID: %w", err)
|
||||
}
|
||||
var obj *objectSDK.Object
|
||||
if headOnly {
|
||||
obj, err = a.objGetter.HeadObject(ctx, *objID)
|
||||
if err != nil {
|
||||
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
sow := NewSimpleObjectWriter()
|
||||
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
|
||||
if err != nil {
|
||||
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
obj.SetPayload(sow.pld)
|
||||
}
|
||||
parts[chunk.Index] = obj
|
||||
return nil
|
||||
})
|
||||
func (a *assemblerec) reconstructRawError(ctx context.Context) error {
|
||||
chunks := make(map[string]objectSDK.ECChunk)
|
||||
var chunksGuard sync.Mutex
|
||||
for _, ch := range a.ecInfo.localChunks {
|
||||
chunks[string(ch.ID.GetValue())] = ch
|
||||
}
|
||||
|
||||
if err := errGroup.Wait(); err != nil {
|
||||
objID := a.addr.Object()
|
||||
trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
for {
|
||||
batch := trav.Next()
|
||||
if len(batch) == 0 {
|
||||
break
|
||||
}
|
||||
for _, node := range batch {
|
||||
var info client.NodeInfo
|
||||
client.NodeInfoFromNetmapElement(&info, node)
|
||||
eg.Go(func() error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found {
|
||||
return nil
|
||||
}
|
||||
|
||||
nodeChunks := a.tryGetChunkListFromNode(ctx, info)
|
||||
|
||||
chunksGuard.Lock()
|
||||
defer chunksGuard.Unlock()
|
||||
for _, ch := range nodeChunks {
|
||||
chunks[string(ch.ID.GetValue())] = ch
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
if err = eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
return createECInfoErr(chunks)
|
||||
}
|
||||
|
||||
func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object {
|
||||
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
|
||||
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
dstepanov-yadro
commented
`ch.ID` is objectID. In certain situations, you can get the same chunk from different nodes.
|
||||
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
|
||||
|
||||
remoteNodes := make([]placement.Node, 0)
|
||||
fyrchik
commented
`var remoteNodes []placement.Node`?
|
||||
for {
|
||||
batch := trav.Next()
|
||||
if len(batch) == 0 {
|
||||
break
|
||||
}
|
||||
remoteNodes = append(remoteNodes, batch...)
|
||||
}
|
||||
|
||||
parts, err := a.processECNodesRequests(ctx, remoteNodes, dataCount, parityCount)
|
||||
if err != nil {
|
||||
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
|
||||
}
|
||||
return parts
|
||||
}
|
||||
|
||||
func (a *assemblerec) processECNodesRequests(ctx context.Context, nodes []placement.Node, dataCount, parityCount int) ([]*objectSDK.Object, error) {
|
||||
foundChunks := make(map[uint32]*objectSDK.Object)
|
||||
var foundChunksGuard sync.Mutex
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
eg.SetLimit(dataCount)
|
||||
|
||||
for _, ch := range a.ecInfo.localChunks {
|
||||
ch := ch
|
||||
eg.Go(func() error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
object := a.tryGetChunkFromLocalStorage(ctx, ch)
|
||||
fyrchik
commented
`s/founded/found`, founded is what happened with Saint-Petersburg in 1703 :)
dstepanov-yadro
commented
Fixed Fixed
|
||||
if object == nil {
|
||||
return nil
|
||||
}
|
||||
foundChunksGuard.Lock()
|
||||
foundChunks[ch.Index] = object
|
||||
count := len(foundChunks)
|
||||
foundChunksGuard.Unlock()
|
||||
if count >= dataCount {
|
||||
return errECPartsRetrieveCompleted
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
var info client.NodeInfo
|
||||
client.NodeInfoFromNetmapElement(&info, node)
|
||||
eg.Go(func() error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
chunks := a.tryGetChunkListFromNode(ctx, info)
|
||||
aarifullin
commented
Just for my curiosity: We get the list of chunks available on the observing Just for my curiosity:
We get the list of chunks available on the observing `node` - could these sets of `chunks` be intersected by some reason?
It still works as you're using `foundChunks` map but can be some chunks is gotten by `tryGetChunkFromRemoteStorage` a few times?
dstepanov-yadro
commented
Yes, in case of remove shard/return shard scenario. But it looks like rare case. Yes, in case of remove shard/return shard scenario. But it looks like rare case.
|
||||
for _, ch := range chunks {
|
||||
object := a.tryGetChunkFromRemoteStorage(ctx, info, ch)
|
||||
if object == nil {
|
||||
continue
|
||||
}
|
||||
foundChunksGuard.Lock()
|
||||
foundChunks[ch.Index] = object
|
||||
count := len(foundChunks)
|
||||
foundChunksGuard.Unlock()
|
||||
if count >= dataCount {
|
||||
return errECPartsRetrieveCompleted
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
err := eg.Wait()
|
||||
if err == nil || errors.Is(err, errECPartsRetrieveCompleted) {
|
||||
parts := make([]*objectSDK.Object, dataCount+parityCount)
|
||||
for idx, chunk := range foundChunks {
|
||||
parts[idx] = chunk
|
||||
}
|
||||
return parts, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (a *assemblerec) tryGetChunkFromLocalStorage(ctx context.Context, ch objectSDK.ECChunk) *objectSDK.Object {
|
||||
var objID oid.ID
|
||||
err := objID.ReadFromV2(ch.ID)
|
||||
if err != nil {
|
||||
a.log.Error(logs.GetUnableToHeadPartECObject, zap.String("node", "local"), zap.Uint32("part_index", ch.Index), zap.Error(fmt.Errorf("invalid object ID: %w", err)))
|
||||
return nil
|
||||
}
|
||||
var addr oid.Address
|
||||
addr.SetContainer(addr.Container())
|
||||
addr.SetObject(objID)
|
||||
var object *objectSDK.Object
|
||||
if a.head {
|
||||
object, err = a.localStorage.Head(ctx, addr, false)
|
||||
if err != nil {
|
||||
a.log.Warn(logs.GetUnableToHeadPartECObject, zap.String("node", "local"), zap.Stringer("part_id", objID), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
object, err = a.localStorage.Get(ctx, addr)
|
||||
if err != nil {
|
||||
a.log.Warn(logs.GetUnableToGetPartECObject, zap.String("node", "local"), zap.Stringer("part_id", objID), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return object
|
||||
}
|
||||
|
||||
func (a *assemblerec) tryGetChunkListFromNode(ctx context.Context, node client.NodeInfo) []objectSDK.ECChunk {
|
||||
if chunks, found := a.ecInfo.remoteChunks[string(node.PublicKey())]; found {
|
||||
return chunks
|
||||
}
|
||||
var errECInfo *objectSDK.ECInfoError
|
||||
_, err := a.remoteStorage.headObjectFromNode(ctx, a.addr, node, true)
|
||||
if err == nil {
|
||||
a.log.Error(logs.GetUnexpectedECObject, zap.String("node", hex.EncodeToString(node.PublicKey())))
|
||||
return nil
|
||||
}
|
||||
if !errors.As(err, &errECInfo) {
|
||||
a.log.Warn(logs.GetUnableToHeadPartsECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
result := make([]objectSDK.ECChunk, 0, len(errECInfo.ECInfo().Chunks))
|
||||
for _, ch := range errECInfo.ECInfo().Chunks {
|
||||
result = append(result, objectSDK.ECChunk(ch))
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node client.NodeInfo, ch objectSDK.ECChunk) *objectSDK.Object {
|
||||
var objID oid.ID
|
||||
err := objID.ReadFromV2(ch.ID)
|
||||
if err != nil {
|
||||
a.log.Error(logs.GetUnableToHeadPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Uint32("part_index", ch.Index), zap.Error(fmt.Errorf("invalid object ID: %w", err)))
|
||||
return nil
|
||||
}
|
||||
var addr oid.Address
|
||||
addr.SetContainer(a.addr.Container())
|
||||
addr.SetObject(objID)
|
||||
var object *objectSDK.Object
|
||||
if a.head {
|
||||
object, err = a.remoteStorage.headObjectFromNode(ctx, addr, node, false)
|
||||
if err != nil {
|
||||
a.log.Warn(logs.GetUnableToHeadPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Stringer("part_id", objID), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
object, err = a.remoteStorage.getObjectFromNode(ctx, addr, node)
|
||||
if err != nil {
|
||||
a.log.Warn(logs.GetUnableToGetPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Stringer("part_id", objID), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return object
|
||||
}
|
||||
|
||||
func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError {
|
||||
info := objectSDK.NewECInfo()
|
||||
for _, ch := range chunks {
|
||||
info.AddChunk(ch)
|
||||
}
|
||||
return objectSDK.NewECInfoError(info)
|
||||
}
|
||||
|
|
|
@ -77,7 +77,7 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
|
|||
|
||||
prm: prm,
|
||||
infoSplit: objectSDK.NewSplitInfo(),
|
||||
infoEC: objectSDK.NewECInfo(),
|
||||
infoEC: newECInfo(),
|
||||
log: s.log,
|
||||
}
|
||||
|
||||
|
@ -110,15 +110,8 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
|||
case statusOutOfRange:
|
||||
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
||||
case statusEC:
|
||||
if !exec.isLocal() {
|
||||
if execCnr {
|
||||
exec.executeOnContainer(ctx)
|
||||
exec.analyzeStatus(ctx, false)
|
||||
} else {
|
||||
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
||||
exec.assembleEC(ctx)
|
||||
}
|
||||
}
|
||||
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
||||
exec.assembleEC(ctx)
|
||||
default:
|
||||
exec.log.Debug(logs.OperationFinishedWithError,
|
||||
zap.Error(exec.err),
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
|
@ -78,7 +79,7 @@ func newTestStorage() *testStorage {
|
|||
}
|
||||
}
|
||||
|
||||
func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) {
|
||||
func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, *containerCore.Container, error) {
|
||||
opts := make([]placement.Option, 0, 4)
|
||||
opts = append(opts,
|
||||
placement.ForContainer(g.c),
|
||||
|
@ -90,7 +91,10 @@ func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e ui
|
|||
opts = append(opts, placement.ForObject(*obj))
|
||||
}
|
||||
|
||||
return placement.NewTraverser(opts...)
|
||||
t, err := placement.NewTraverser(opts...)
|
||||
return t, &containerCore.Container{
|
||||
Value: g.c,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
|
||||
|
@ -474,6 +478,7 @@ func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
|
|||
|
||||
var ni netmap.NodeInfo
|
||||
ni.SetNetworkEndpoints(a)
|
||||
ni.SetPublicKey([]byte(a))
|
||||
fyrchik
commented
Why is it important? Why is it important?
fyrchik
commented
Why is it important? Why is it important?
dstepanov-yadro
commented
`Get` service uses node's public key as map key for EC chunks. Without this change all chunks belong to the same node without public key in unit tests, so unit test fails.
|
||||
|
||||
var na network.AddressGroup
|
||||
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -46,8 +45,7 @@ func (r *request) executeLocal(ctx context.Context) {
|
|||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
case errors.As(err, &errECInfo):
|
||||
r.status = statusEC
|
||||
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
|
||||
r.err = objectSDK.NewECInfoError(r.infoEC)
|
||||
r.err = r.infoEC.addLocal(errECInfo.ECInfo())
|
||||
case errors.As(err, &errOutOfRange):
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRange
|
||||
|
|
|
@ -7,11 +7,10 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -36,11 +35,6 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
switch {
|
||||
default:
|
||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||
if r.status == statusEC {
|
||||
// we need to continue getting another chunks from another nodes
|
||||
// in case of network issue
|
||||
return false
|
||||
}
|
||||
r.status = statusUndefined
|
||||
r.err = new(apistatus.ObjectNotFound)
|
||||
case err == nil:
|
||||
|
@ -66,18 +60,7 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
case errors.As(err, &errECInfo):
|
||||
r.status = statusEC
|
||||
util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
|
||||
r.infoEC = errECInfo.ECInfo()
|
||||
r.err = objectSDK.NewECInfoError(r.infoEC)
|
||||
if r.isRaw() {
|
||||
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
|
||||
}
|
||||
cnt, err := r.containerSource.Get(r.address().Container())
|
||||
if err == nil {
|
||||
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
|
||||
}
|
||||
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
|
||||
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
|
||||
r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo())
|
||||
}
|
||||
|
||||
return r.status != statusUndefined
|
||||
|
@ -116,3 +99,50 @@ func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.N
|
|||
|
||||
return rs.Get(ctx, r.address(), prm)
|
||||
}
|
||||
|
||||
func (r *request) getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) {
|
||||
rs, err := r.remoteStorageConstructor.Get(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key, err := r.key()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
prm := RemoteRequestParams{
|
||||
Epoch: r.curProcEpoch,
|
||||
TTL: 1,
|
||||
PrivateKey: key,
|
||||
SessionToken: r.prm.common.SessionToken(),
|
||||
BearerToken: r.prm.common.BearerToken(),
|
||||
XHeaders: r.prm.common.XHeaders(),
|
||||
}
|
||||
|
||||
return rs.Get(ctx, addr, prm)
|
||||
}
|
||||
|
||||
func (r *request) headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo, raw bool) (*objectSDK.Object, error) {
|
||||
rs, err := r.remoteStorageConstructor.Get(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key, err := r.key()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
prm := RemoteRequestParams{
|
||||
Epoch: r.curProcEpoch,
|
||||
TTL: 1,
|
||||
PrivateKey: key,
|
||||
SessionToken: r.prm.common.SessionToken(),
|
||||
BearerToken: r.prm.common.BearerToken(),
|
||||
XHeaders: r.prm.common.XHeaders(),
|
||||
IsRaw: raw,
|
||||
}
|
||||
|
||||
return rs.Head(ctx, addr, prm)
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ type request struct {
|
|||
|
||||
infoSplit *objectSDK.SplitInfo
|
||||
|
||||
infoEC *objectSDK.ECInfo
|
||||
infoEC *ecInfo
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
|
@ -141,7 +141,7 @@ func (r *request) initEpoch() bool {
|
|||
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
|
||||
obj := addr.Object()
|
||||
|
||||
t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
|
||||
t, _, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
|
||||
|
||||
switch {
|
||||
default:
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"errors"
|
||||
|
||||
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
|
@ -23,7 +24,7 @@ type epochSource interface {
|
|||
}
|
||||
|
||||
type traverserGenerator interface {
|
||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
|
||||
}
|
||||
|
||||
type keyStorage interface {
|
||||
|
@ -236,3 +237,51 @@ type RangeHashRes struct {
|
|||
func (r *RangeHashRes) Hashes() [][]byte {
|
||||
return r.hashes
|
||||
}
|
||||
|
||||
type ecInfo struct {
|
||||
localChunks []objectSDK.ECChunk
|
||||
remoteChunks map[string][]objectSDK.ECChunk // node pk -> chunk slice
|
||||
}
|
||||
|
||||
func newECInfo() *ecInfo {
|
||||
return &ecInfo{
|
||||
localChunks: make([]objectSDK.ECChunk, 0),
|
||||
remoteChunks: make(map[string][]objectSDK.ECChunk),
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ecInfo) addLocal(ecInfo *objectSDK.ECInfo) *objectSDK.ECInfoError {
|
||||
for _, ch := range ecInfo.Chunks {
|
||||
e.localChunks = append(e.localChunks, objectSDK.ECChunk(ch))
|
||||
}
|
||||
return e.createECInfoErr()
|
||||
}
|
||||
|
||||
func (e *ecInfo) addRemote(nodePK string, ecInfo *objectSDK.ECInfo) *objectSDK.ECInfoError {
|
||||
for _, ch := range ecInfo.Chunks {
|
||||
e.remoteChunks[nodePK] = append(e.remoteChunks[nodePK], objectSDK.ECChunk(ch))
|
||||
}
|
||||
return e.createECInfoErr()
|
||||
}
|
||||
|
||||
func (e *ecInfo) createECInfoErr() *objectSDK.ECInfoError {
|
||||
unique := make(map[string]struct{})
|
||||
result := objectSDK.NewECInfo()
|
||||
for _, ch := range e.localChunks {
|
||||
if _, found := unique[string(ch.ID.GetValue())]; found {
|
||||
continue
|
||||
}
|
||||
result.AddChunk(ch)
|
||||
unique[string(ch.ID.GetValue())] = struct{}{}
|
||||
}
|
||||
for _, chunks := range e.remoteChunks {
|
||||
for _, ch := range chunks {
|
||||
if _, found := unique[string(ch.ID.GetValue())]; found {
|
||||
continue
|
||||
}
|
||||
result.AddChunk(ch)
|
||||
unique[string(ch.ID.GetValue())] = struct{}{}
|
||||
}
|
||||
}
|
||||
return objectSDK.NewECInfoError(result)
|
||||
}
|
||||
|
|
|
@ -47,7 +47,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
|
|||
zap.Uint64("number", exec.curProcEpoch),
|
||||
)
|
||||
|
||||
traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
|
||||
traverser, _, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
|
@ -80,12 +81,13 @@ func newTestStorage() *testStorage {
|
|||
}
|
||||
}
|
||||
|
||||
func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) {
|
||||
return placement.NewTraverser(
|
||||
func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, *containerCore.Container, error) {
|
||||
t, err := placement.NewTraverser(
|
||||
placement.ForContainer(g.c),
|
||||
placement.UseBuilder(g.b[epoch]),
|
||||
placement.WithoutSuccessTracking(),
|
||||
)
|
||||
return t, &containerCore.Container{Value: g.c}, err
|
||||
}
|
||||
|
||||
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
|
@ -45,7 +46,7 @@ type cfg struct {
|
|||
}
|
||||
|
||||
traverserGenerator interface {
|
||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
|
||||
}
|
||||
|
||||
currentEpochReceiver interface {
|
||||
|
|
|
@ -122,17 +122,17 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav
|
|||
|
||||
// GenerateTraverser generates placement Traverser for provided object address
|
||||
// using epoch-th network map.
|
||||
func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, error) {
|
||||
func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, *container.Container, error) {
|
||||
// get network map by epoch
|
||||
nm, err := g.netMapSrc.GetNetMapByEpoch(epoch)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
|
||||
return nil, nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
|
||||
}
|
||||
|
||||
// get container related container
|
||||
cnr, err := g.cnrSrc.Get(idCnr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not get container: %w", err)
|
||||
return nil, nil, fmt.Errorf("could not get container: %w", err)
|
||||
}
|
||||
|
||||
// allocate placement traverser options
|
||||
|
@ -160,5 +160,9 @@ func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoc
|
|||
)
|
||||
}
|
||||
|
||||
return placement.NewTraverser(traverseOpts...)
|
||||
t, err := placement.NewTraverser(traverseOpts...)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return t, cnr, nil
|
||||
}
|
||||
|
|
commonParameters
may be changed during Head request (ForgetTokens
,WithLocalOnly
), so need a copy. Founded with autotests.Previously it was possible to pass
nil
, now we pass a pointer to the empty struct.This is a change in behaviour, can anything bad happen?
Previously it was possible to pass
nil
, now we pass a pointer to the empty struct.This is a change in behaviour, can anything bad happen?
Fixed