Fix Get EC object from non container node #1253
|
@ -108,6 +108,7 @@ const (
|
||||||
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object"
|
GetUnableToGetPartECObject = "unable to get part of the erasure-encoded object"
|
||||||
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object"
|
GetUnableToHeadPartECObject = "unable to head part of the erasure-encoded object"
|
||||||
GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
|
GetUnableToGetECObjectContainer = "unable to get container for erasure-coded object"
|
||||||
|
GetUnableToHeadPartsECObject = "unable to head parts of the erasure-encoded object"
|
||||||
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
|
GetAssemblingSplittedObjectCompleted = "assembling splitted object completed"
|
||||||
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"
|
GetAssemblingECObjectCompleted = "assembling erasure-coded object completed"
|
||||||
GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
|
GetFailedToAssembleSplittedObject = "failed to assemble splitted object"
|
||||||
|
@ -123,6 +124,7 @@ const (
|
||||||
GetRequestedObjectIsVirtual = "requested object is virtual"
|
GetRequestedObjectIsVirtual = "requested object is virtual"
|
||||||
GetRequestedObjectIsEC = "requested object is erasure-coded"
|
GetRequestedObjectIsEC = "requested object is erasure-coded"
|
||||||
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
|
GetRequestedRangeIsOutOfObjectBounds = "requested range is out of object bounds"
|
||||||
|
GetUnexpectedECObject = "failed to get EC object from node: expected EC info, but got full object"
|
||||||
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
|
PutAdditionalContainerBroadcastFailure = "additional container broadcast failure"
|
||||||
SearchReturnResultDirectly = "return result directly"
|
SearchReturnResultDirectly = "return result directly"
|
||||||
SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client"
|
SearchCouldNotConstructRemoteNodeClient = "could not construct remote node client"
|
||||||
|
|
|
@ -30,7 +30,12 @@ func (w *headSvcWrapper) headAddress(ctx context.Context, exec *execCtx, addr oi
|
||||||
wr := getsvc.NewSimpleObjectWriter()
|
wr := getsvc.NewSimpleObjectWriter()
|
||||||
|
|
||||||
p := getsvc.HeadPrm{}
|
p := getsvc.HeadPrm{}
|
||||||
p.SetCommonParameters(exec.commonParameters())
|
|
||||||
|
if cp := exec.commonParameters(); cp != nil {
|
||||||
|
commonParameters := *cp
|
||||||
|
p.SetCommonParameters(&commonParameters)
|
||||||
|
}
|
||||||
|
|
||||||
|
|||||||
p.SetHeaderWriter(wr)
|
p.SetHeaderWriter(wr)
|
||||||
p.WithRawFlag(true)
|
p.WithRawFlag(true)
|
||||||
p.WithAddress(addr)
|
p.WithAddress(addr)
|
||||||
|
|
|
@ -140,7 +140,7 @@ func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm Reque
|
||||||
|
|
||||||
prm: prm,
|
prm: prm,
|
||||||
infoSplit: objectSDK.NewSplitInfo(),
|
infoSplit: objectSDK.NewSplitInfo(),
|
||||||
infoEC: objectSDK.NewECInfo(),
|
infoEC: newECInfo(),
|
||||||
log: r.log,
|
log: r.log,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,11 +6,12 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (r *request) assembleEC(ctx context.Context) {
|
func (r *request) assembleEC(ctx context.Context) {
|
||||||
if r.isRaw() {
|
if r.isRaw() && r.isLocal() {
|
||||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -35,8 +36,14 @@ func (r *request) assembleEC(ctx context.Context) {
|
||||||
|
|
||||||
r.log.Debug(logs.GetTryingToAssembleTheECObject)
|
r.log.Debug(logs.GetTryingToAssembleTheECObject)
|
||||||
|
|
||||||
|
// initialize epoch number
|
||||||
|
ok := r.initEpoch()
|
||||||
dstepanov-yadro
commented
Logging and status change - inside Logging and status change - inside `r.initEpoch()` method.
|
|||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
r.prm.common = r.prm.common.WithLocalOnly(false)
|
r.prm.common = r.prm.common.WithLocalOnly(false)
|
||||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.containerSource, r.log)
|
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch)
|
||||||
|
|
||||||
r.log.Debug(logs.GetAssemblingECObject,
|
r.log.Debug(logs.GetAssemblingECObject,
|
||||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
|
@ -47,8 +54,8 @@ func (r *request) assembleEC(ctx context.Context) {
|
||||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
|
|
||||||
obj, err := assembler.Assemble(ctx, r.prm.objWriter, r.headOnly())
|
obj, err := assembler.Assemble(ctx, r.prm.objWriter)
|
||||||
if err != nil {
|
if err != nil && !errors.As(err, new(*objectSDK.ECInfoError)) {
|
||||||
r.log.Warn(logs.GetFailedToAssembleECObject,
|
r.log.Warn(logs.GetFailedToAssembleECObject,
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
|
@ -58,6 +65,7 @@ func (r *request) assembleEC(ctx context.Context) {
|
||||||
|
|
||||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||||
var errOutOfRange *apistatus.ObjectOutOfRange
|
var errOutOfRange *apistatus.ObjectOutOfRange
|
||||||
|
var errECInfo *objectSDK.ECInfoError
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
|
@ -73,5 +81,8 @@ func (r *request) assembleEC(ctx context.Context) {
|
||||||
case errors.As(err, &errOutOfRange):
|
case errors.As(err, &errOutOfRange):
|
||||||
r.status = statusOutOfRange
|
r.status = statusOutOfRange
|
||||||
r.err = errOutOfRange
|
r.err = errOutOfRange
|
||||||
|
case errors.As(err, &errECInfo):
|
||||||
|
r.status = statusEC
|
||||||
|
r.err = err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,11 +2,16 @@ package getsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -16,61 +21,79 @@ import (
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var errECPartsRetrieveCompleted = errors.New("EC parts receive completed")
|
||||||
|
|
||||||
|
type ecRemoteStorage interface {
|
||||||
|
getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error)
|
||||||
|
headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo, raw bool) (*objectSDK.Object, error)
|
||||||
|
}
|
||||||
|
|
||||||
type assemblerec struct {
|
type assemblerec struct {
|
||||||
addr oid.Address
|
addr oid.Address
|
||||||
ecInfo *objectSDK.ECInfo
|
ecInfo *ecInfo
|
||||||
rng *objectSDK.Range
|
rng *objectSDK.Range
|
||||||
objGetter objectGetter
|
remoteStorage ecRemoteStorage
|
||||||
|
localStorage localStorage
|
||||||
cs container.Source
|
cs container.Source
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
head bool
|
||||||
|
raw bool
|
||||||
|
traverserGenerator traverserGenerator
|
||||||
|
epoch uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newAssemblerEC(
|
func newAssemblerEC(
|
||||||
addr oid.Address,
|
addr oid.Address,
|
||||||
ecInfo *objectSDK.ECInfo,
|
ecInfo *ecInfo,
|
||||||
rng *objectSDK.Range,
|
rng *objectSDK.Range,
|
||||||
objGetter objectGetter,
|
remoteStorage ecRemoteStorage,
|
||||||
|
localStorage localStorage,
|
||||||
cs container.Source,
|
cs container.Source,
|
||||||
log *logger.Logger,
|
log *logger.Logger,
|
||||||
|
head bool,
|
||||||
|
raw bool,
|
||||||
|
tg traverserGenerator,
|
||||||
|
epoch uint64,
|
||||||
) *assemblerec {
|
) *assemblerec {
|
||||||
return &assemblerec{
|
return &assemblerec{
|
||||||
addr: addr,
|
addr: addr,
|
||||||
rng: rng,
|
rng: rng,
|
||||||
ecInfo: ecInfo,
|
ecInfo: ecInfo,
|
||||||
objGetter: objGetter,
|
remoteStorage: remoteStorage,
|
||||||
|
localStorage: localStorage,
|
||||||
cs: cs,
|
cs: cs,
|
||||||
log: log,
|
log: log,
|
||||||
|
head: head,
|
||||||
|
raw: raw,
|
||||||
|
traverserGenerator: tg,
|
||||||
|
epoch: epoch,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
|
// Assemble assembles erasure-coded object and writes it's content to ObjectWriter.
|
||||||
// It returns parent object.
|
// It returns parent object.
|
||||||
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
|
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
if headOnly {
|
switch {
|
||||||
|
case a.raw:
|
||||||
|
err := a.reconstructRawError(ctx)
|
||||||
|
return nil, err
|
||||||
|
case a.head:
|
||||||
return a.reconstructHeader(ctx, writer)
|
return a.reconstructHeader(ctx, writer)
|
||||||
} else if a.rng != nil {
|
case a.rng != nil:
|
||||||
return a.reconstructRange(ctx, writer)
|
return a.reconstructRange(ctx, writer)
|
||||||
}
|
default:
|
||||||
return a.reconstructObject(ctx, writer)
|
return a.reconstructObject(ctx, writer)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) {
|
|
||||||
cnt, err := a.cs.Get(a.addr.Container())
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
}
|
||||||
dataCount := policy.ECDataCount(cnt.Value.PlacementPolicy())
|
|
||||||
parityCount := policy.ECParityCount(cnt.Value.PlacementPolicy())
|
func (a *assemblerec) getConstructor(cnr *container.Container) (*erasurecode.Constructor, error) {
|
||||||
|
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
|
||||||
|
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
|
||||||
return erasurecode.NewConstructor(dataCount, parityCount)
|
return erasurecode.NewConstructor(dataCount, parityCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
parts := a.retrieveParts(ctx, true)
|
obj, err := a.reconstructObjectFromParts(ctx, true)
|
||||||
c, err := a.getConstructor()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
obj, err := c.ReconstructHeader(parts)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return obj, writer.WriteHeader(ctx, obj)
|
return obj, writer.WriteHeader(ctx, obj)
|
||||||
}
|
}
|
||||||
|
@ -78,12 +101,7 @@ func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
parts := a.retrieveParts(ctx, false)
|
obj, err := a.reconstructObjectFromParts(ctx, false)
|
||||||
c, err := a.getConstructor()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
obj, err := c.Reconstruct(parts)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -101,12 +119,7 @@ func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
parts := a.retrieveParts(ctx, false)
|
obj, err := a.reconstructObjectFromParts(ctx, false)
|
||||||
c, err := a.getConstructor()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
obj, err := c.Reconstruct(parts)
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
err = writer.WriteHeader(ctx, obj.CutPayload())
|
err = writer.WriteHeader(ctx, obj.CutPayload())
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
@ -119,41 +132,238 @@ func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter
|
||||||
return obj, err
|
return obj, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
|
func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bool) (*objectSDK.Object, error) {
|
||||||
parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
|
objID := a.addr.Object()
|
||||||
errGroup, ctx := errgroup.WithContext(mainCtx)
|
trav, cnr, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
c, err := a.getConstructor(cnr)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
fyrchik
commented
This part seems to be common for all
This part seems to be common for all `reconstruct*()` helpers. If we move it one level above, will it make the code cleaner?
```
trav, cnr, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
if err != nil {
return nil, err
}
c, err := a.getConstructor(cnr)
if err != nil {
return nil, err
}
```
dstepanov-yadro
commented
Done Done
|
|||||||
|
}
|
||||||
|
parts := a.retrieveParts(ctx, trav, cnr)
|
||||||
|
if headers {
|
||||||
|
return c.ReconstructHeader(parts)
|
||||||
|
}
|
||||||
|
return c.Reconstruct(parts)
|
||||||
|
}
|
||||||
|
|
||||||
for i := range a.ecInfo.Chunks {
|
func (a *assemblerec) reconstructRawError(ctx context.Context) error {
|
||||||
chunk := a.ecInfo.Chunks[i]
|
chunks := make(map[string]objectSDK.ECChunk)
|
||||||
errGroup.Go(func() error {
|
var chunksGuard sync.Mutex
|
||||||
objID := new(oid.ID)
|
for _, ch := range a.ecInfo.localChunks {
|
||||||
err := objID.ReadFromV2(chunk.ID)
|
chunks[string(ch.ID.GetValue())] = ch
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("invalid object ID: %w", err)
|
|
||||||
}
|
}
|
||||||
var obj *objectSDK.Object
|
|
||||||
if headOnly {
|
objID := a.addr.Object()
|
||||||
obj, err = a.objGetter.HeadObject(ctx, *objID)
|
trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
|
for {
|
||||||
|
batch := trav.Next()
|
||||||
|
if len(batch) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
for _, node := range batch {
|
||||||
|
var info client.NodeInfo
|
||||||
|
client.NodeInfoFromNetmapElement(&info, node)
|
||||||
|
eg.Go(func() error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
sow := NewSimpleObjectWriter()
|
nodeChunks := a.tryGetChunkListFromNode(ctx, info)
|
||||||
obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
|
|
||||||
if err != nil {
|
chunksGuard.Lock()
|
||||||
a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
|
defer chunksGuard.Unlock()
|
||||||
return nil
|
for _, ch := range nodeChunks {
|
||||||
|
chunks[string(ch.ID.GetValue())] = ch
|
||||||
}
|
}
|
||||||
obj.SetPayload(sow.pld)
|
|
||||||
}
|
|
||||||
parts[chunk.Index] = obj
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
if err = eg.Wait(); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return createECInfoErr(chunks)
|
||||||
|
}
|
||||||
|
|
||||||
if err := errGroup.Wait(); err != nil {
|
func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object {
|
||||||
|
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
|
||||||
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
fyrchik
commented
We can override the old value here, does it matter? We can override the old value here, does it matter?
dstepanov-yadro
commented
`ch.ID` is objectID. In certain situations, you can get the same chunk from different nodes.
|
|||||||
|
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
|
||||||
|
|
||||||
|
remoteNodes := make([]placement.Node, 0)
|
||||||
fyrchik
commented
`var remoteNodes []placement.Node`?
|
|||||||
|
for {
|
||||||
|
batch := trav.Next()
|
||||||
|
if len(batch) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
remoteNodes = append(remoteNodes, batch...)
|
||||||
|
}
|
||||||
|
|
||||||
|
parts, err := a.processECNodesRequests(ctx, remoteNodes, dataCount, parityCount)
|
||||||
|
if err != nil {
|
||||||
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
|
a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
|
||||||
}
|
}
|
||||||
return parts
|
return parts
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) processECNodesRequests(ctx context.Context, nodes []placement.Node, dataCount, parityCount int) ([]*objectSDK.Object, error) {
|
||||||
|
foundChunks := make(map[uint32]*objectSDK.Object)
|
||||||
|
var foundChunksGuard sync.Mutex
|
||||||
|
eg, ctx := errgroup.WithContext(ctx)
|
||||||
|
eg.SetLimit(dataCount)
|
||||||
|
|
||||||
|
for _, ch := range a.ecInfo.localChunks {
|
||||||
|
ch := ch
|
||||||
|
eg.Go(func() error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
object := a.tryGetChunkFromLocalStorage(ctx, ch)
|
||||||
fyrchik
commented
`s/founded/found`, founded is what happened with Saint-Petersburg in 1703 :)
dstepanov-yadro
commented
Fixed Fixed
|
|||||||
|
if object == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
foundChunksGuard.Lock()
|
||||||
|
foundChunks[ch.Index] = object
|
||||||
|
count := len(foundChunks)
|
||||||
|
foundChunksGuard.Unlock()
|
||||||
|
if count >= dataCount {
|
||||||
|
return errECPartsRetrieveCompleted
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, node := range nodes {
|
||||||
|
var info client.NodeInfo
|
||||||
|
client.NodeInfoFromNetmapElement(&info, node)
|
||||||
|
eg.Go(func() error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
chunks := a.tryGetChunkListFromNode(ctx, info)
|
||||||
aarifullin
commented
Just for my curiosity: We get the list of chunks available on the observing Just for my curiosity:
We get the list of chunks available on the observing `node` - could these sets of `chunks` be intersected by some reason?
It still works as you're using `foundChunks` map but can be some chunks is gotten by `tryGetChunkFromRemoteStorage` a few times?
dstepanov-yadro
commented
Yes, in case of remove shard/return shard scenario. But it looks like rare case. Yes, in case of remove shard/return shard scenario. But it looks like rare case.
|
|||||||
|
for _, ch := range chunks {
|
||||||
|
object := a.tryGetChunkFromRemoteStorage(ctx, info, ch)
|
||||||
|
if object == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
foundChunksGuard.Lock()
|
||||||
|
foundChunks[ch.Index] = object
|
||||||
|
count := len(foundChunks)
|
||||||
|
foundChunksGuard.Unlock()
|
||||||
|
if count >= dataCount {
|
||||||
|
return errECPartsRetrieveCompleted
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
err := eg.Wait()
|
||||||
|
if err == nil || errors.Is(err, errECPartsRetrieveCompleted) {
|
||||||
|
parts := make([]*objectSDK.Object, dataCount+parityCount)
|
||||||
|
for idx, chunk := range foundChunks {
|
||||||
|
parts[idx] = chunk
|
||||||
|
}
|
||||||
|
return parts, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) tryGetChunkFromLocalStorage(ctx context.Context, ch objectSDK.ECChunk) *objectSDK.Object {
|
||||||
|
var objID oid.ID
|
||||||
|
err := objID.ReadFromV2(ch.ID)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Error(logs.GetUnableToHeadPartECObject, zap.String("node", "local"), zap.Uint32("part_index", ch.Index), zap.Error(fmt.Errorf("invalid object ID: %w", err)))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(addr.Container())
|
||||||
|
addr.SetObject(objID)
|
||||||
|
var object *objectSDK.Object
|
||||||
|
if a.head {
|
||||||
|
object, err = a.localStorage.Head(ctx, addr, false)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Warn(logs.GetUnableToHeadPartECObject, zap.String("node", "local"), zap.Stringer("part_id", objID), zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
object, err = a.localStorage.Get(ctx, addr)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Warn(logs.GetUnableToGetPartECObject, zap.String("node", "local"), zap.Stringer("part_id", objID), zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return object
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) tryGetChunkListFromNode(ctx context.Context, node client.NodeInfo) []objectSDK.ECChunk {
|
||||||
|
if chunks, found := a.ecInfo.remoteChunks[string(node.PublicKey())]; found {
|
||||||
|
return chunks
|
||||||
|
}
|
||||||
|
var errECInfo *objectSDK.ECInfoError
|
||||||
|
_, err := a.remoteStorage.headObjectFromNode(ctx, a.addr, node, true)
|
||||||
|
if err == nil {
|
||||||
|
a.log.Error(logs.GetUnexpectedECObject, zap.String("node", hex.EncodeToString(node.PublicKey())))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if !errors.As(err, &errECInfo) {
|
||||||
|
a.log.Warn(logs.GetUnableToHeadPartsECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
result := make([]objectSDK.ECChunk, 0, len(errECInfo.ECInfo().Chunks))
|
||||||
|
for _, ch := range errECInfo.ECInfo().Chunks {
|
||||||
|
result = append(result, objectSDK.ECChunk(ch))
|
||||||
|
}
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node client.NodeInfo, ch objectSDK.ECChunk) *objectSDK.Object {
|
||||||
|
var objID oid.ID
|
||||||
|
err := objID.ReadFromV2(ch.ID)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Error(logs.GetUnableToHeadPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Uint32("part_index", ch.Index), zap.Error(fmt.Errorf("invalid object ID: %w", err)))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(a.addr.Container())
|
||||||
|
addr.SetObject(objID)
|
||||||
|
var object *objectSDK.Object
|
||||||
|
if a.head {
|
||||||
|
object, err = a.remoteStorage.headObjectFromNode(ctx, addr, node, false)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Warn(logs.GetUnableToHeadPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Stringer("part_id", objID), zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
object, err = a.remoteStorage.getObjectFromNode(ctx, addr, node)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Warn(logs.GetUnableToGetPartECObject, zap.String("node", hex.EncodeToString(node.PublicKey())), zap.Stringer("part_id", objID), zap.Error(err))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return object
|
||||||
|
}
|
||||||
|
|
||||||
|
func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError {
|
||||||
|
info := objectSDK.NewECInfo()
|
||||||
|
for _, ch := range chunks {
|
||||||
|
info.AddChunk(ch)
|
||||||
|
}
|
||||||
|
return objectSDK.NewECInfoError(info)
|
||||||
|
}
|
||||||
|
|
|
@ -77,7 +77,7 @@ func (s *Service) get(ctx context.Context, prm RequestParameters) error {
|
||||||
|
|
||||||
prm: prm,
|
prm: prm,
|
||||||
infoSplit: objectSDK.NewSplitInfo(),
|
infoSplit: objectSDK.NewSplitInfo(),
|
||||||
infoEC: objectSDK.NewECInfo(),
|
infoEC: newECInfo(),
|
||||||
log: s.log,
|
log: s.log,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -110,15 +110,8 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
||||||
case statusOutOfRange:
|
case statusOutOfRange:
|
||||||
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
||||||
case statusEC:
|
case statusEC:
|
||||||
if !exec.isLocal() {
|
|
||||||
if execCnr {
|
|
||||||
exec.executeOnContainer(ctx)
|
|
||||||
exec.analyzeStatus(ctx, false)
|
|
||||||
} else {
|
|
||||||
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
||||||
exec.assembleEC(ctx)
|
exec.assembleEC(ctx)
|
||||||
}
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
exec.log.Debug(logs.OperationFinishedWithError,
|
exec.log.Debug(logs.OperationFinishedWithError,
|
||||||
zap.Error(exec.err),
|
zap.Error(exec.err),
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
|
@ -78,7 +79,7 @@ func newTestStorage() *testStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, error) {
|
func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e uint64) (*placement.Traverser, *containerCore.Container, error) {
|
||||||
opts := make([]placement.Option, 0, 4)
|
opts := make([]placement.Option, 0, 4)
|
||||||
opts = append(opts,
|
opts = append(opts,
|
||||||
placement.ForContainer(g.c),
|
placement.ForContainer(g.c),
|
||||||
|
@ -90,7 +91,10 @@ func (g *testTraverserGenerator) GenerateTraverser(cnr cid.ID, obj *oid.ID, e ui
|
||||||
opts = append(opts, placement.ForObject(*obj))
|
opts = append(opts, placement.ForObject(*obj))
|
||||||
}
|
}
|
||||||
|
|
||||||
return placement.NewTraverser(opts...)
|
t, err := placement.NewTraverser(opts...)
|
||||||
|
return t, &containerCore.Container{
|
||||||
|
Value: g.c,
|
||||||
|
}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
|
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
|
||||||
|
@ -474,6 +478,7 @@ func testNodeMatrix(t testing.TB, dim []int) ([][]netmap.NodeInfo, [][]string) {
|
||||||
|
|
||||||
var ni netmap.NodeInfo
|
var ni netmap.NodeInfo
|
||||||
ni.SetNetworkEndpoints(a)
|
ni.SetNetworkEndpoints(a)
|
||||||
|
ni.SetPublicKey([]byte(a))
|
||||||
fyrchik
commented
Why is it important? Why is it important?
fyrchik
commented
Why is it important? Why is it important?
dstepanov-yadro
commented
`Get` service uses node's public key as map key for EC chunks. Without this change all chunks belong to the same node without public key in unit tests, so unit test fails.
|
|||||||
|
|
||||||
var na network.AddressGroup
|
var na network.AddressGroup
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
@ -46,8 +45,7 @@ func (r *request) executeLocal(ctx context.Context) {
|
||||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||||
case errors.As(err, &errECInfo):
|
case errors.As(err, &errECInfo):
|
||||||
r.status = statusEC
|
r.status = statusEC
|
||||||
util.MergeECInfo(errECInfo.ECInfo(), r.infoEC)
|
r.err = r.infoEC.addLocal(errECInfo.ECInfo())
|
||||||
r.err = objectSDK.NewECInfoError(r.infoEC)
|
|
||||||
case errors.As(err, &errOutOfRange):
|
case errors.As(err, &errOutOfRange):
|
||||||
r.status = statusOutOfRange
|
r.status = statusOutOfRange
|
||||||
r.err = errOutOfRange
|
r.err = errOutOfRange
|
||||||
|
|
|
@ -7,11 +7,10 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/util"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -36,11 +35,6 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||||
if r.status == statusEC {
|
|
||||||
// we need to continue getting another chunks from another nodes
|
|
||||||
// in case of network issue
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
r.status = statusUndefined
|
r.status = statusUndefined
|
||||||
r.err = new(apistatus.ObjectNotFound)
|
r.err = new(apistatus.ObjectNotFound)
|
||||||
case err == nil:
|
case err == nil:
|
||||||
|
@ -66,18 +60,7 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
||||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||||
case errors.As(err, &errECInfo):
|
case errors.As(err, &errECInfo):
|
||||||
r.status = statusEC
|
r.status = statusEC
|
||||||
util.MergeECInfo(r.infoEC, errECInfo.ECInfo())
|
r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo())
|
||||||
r.infoEC = errECInfo.ECInfo()
|
|
||||||
r.err = objectSDK.NewECInfoError(r.infoEC)
|
|
||||||
if r.isRaw() {
|
|
||||||
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
|
|
||||||
}
|
|
||||||
cnt, err := r.containerSource.Get(r.address().Container())
|
|
||||||
if err == nil {
|
|
||||||
return len(r.infoEC.Chunks) == policy.ECDataCount(cnt.Value.PlacementPolicy())
|
|
||||||
}
|
|
||||||
r.log.Debug(logs.GetUnableToGetECObjectContainer, zap.Error(err))
|
|
||||||
return len(r.infoEC.Chunks) == int(r.infoEC.Chunks[0].Total)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return r.status != statusUndefined
|
return r.status != statusUndefined
|
||||||
|
@ -116,3 +99,50 @@ func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.N
|
||||||
|
|
||||||
return rs.Get(ctx, r.address(), prm)
|
return rs.Get(ctx, r.address(), prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *request) getObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo) (*objectSDK.Object, error) {
|
||||||
|
rs, err := r.remoteStorageConstructor.Get(info)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
key, err := r.key()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := RemoteRequestParams{
|
||||||
|
Epoch: r.curProcEpoch,
|
||||||
|
TTL: 1,
|
||||||
|
PrivateKey: key,
|
||||||
|
SessionToken: r.prm.common.SessionToken(),
|
||||||
|
BearerToken: r.prm.common.BearerToken(),
|
||||||
|
XHeaders: r.prm.common.XHeaders(),
|
||||||
|
}
|
||||||
|
|
||||||
|
return rs.Get(ctx, addr, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) headObjectFromNode(ctx context.Context, addr oid.Address, info client.NodeInfo, raw bool) (*objectSDK.Object, error) {
|
||||||
|
rs, err := r.remoteStorageConstructor.Get(info)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
key, err := r.key()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := RemoteRequestParams{
|
||||||
|
Epoch: r.curProcEpoch,
|
||||||
|
TTL: 1,
|
||||||
|
PrivateKey: key,
|
||||||
|
SessionToken: r.prm.common.SessionToken(),
|
||||||
|
BearerToken: r.prm.common.BearerToken(),
|
||||||
|
XHeaders: r.prm.common.XHeaders(),
|
||||||
|
IsRaw: raw,
|
||||||
|
}
|
||||||
|
|
||||||
|
return rs.Head(ctx, addr, prm)
|
||||||
|
}
|
||||||
|
|
|
@ -23,7 +23,7 @@ type request struct {
|
||||||
|
|
||||||
infoSplit *objectSDK.SplitInfo
|
infoSplit *objectSDK.SplitInfo
|
||||||
|
|
||||||
infoEC *objectSDK.ECInfo
|
infoEC *ecInfo
|
||||||
|
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ func (r *request) initEpoch() bool {
|
||||||
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
|
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
|
||||||
obj := addr.Object()
|
obj := addr.Object()
|
||||||
|
|
||||||
t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
|
t, _, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
@ -23,7 +24,7 @@ type epochSource interface {
|
||||||
}
|
}
|
||||||
|
|
||||||
type traverserGenerator interface {
|
type traverserGenerator interface {
|
||||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type keyStorage interface {
|
type keyStorage interface {
|
||||||
|
@ -236,3 +237,51 @@ type RangeHashRes struct {
|
||||||
func (r *RangeHashRes) Hashes() [][]byte {
|
func (r *RangeHashRes) Hashes() [][]byte {
|
||||||
return r.hashes
|
return r.hashes
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ecInfo struct {
|
||||||
|
localChunks []objectSDK.ECChunk
|
||||||
|
remoteChunks map[string][]objectSDK.ECChunk // node pk -> chunk slice
|
||||||
|
}
|
||||||
|
|
||||||
|
func newECInfo() *ecInfo {
|
||||||
|
return &ecInfo{
|
||||||
|
localChunks: make([]objectSDK.ECChunk, 0),
|
||||||
|
remoteChunks: make(map[string][]objectSDK.ECChunk),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecInfo) addLocal(ecInfo *objectSDK.ECInfo) *objectSDK.ECInfoError {
|
||||||
|
for _, ch := range ecInfo.Chunks {
|
||||||
|
e.localChunks = append(e.localChunks, objectSDK.ECChunk(ch))
|
||||||
|
}
|
||||||
|
return e.createECInfoErr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecInfo) addRemote(nodePK string, ecInfo *objectSDK.ECInfo) *objectSDK.ECInfoError {
|
||||||
|
for _, ch := range ecInfo.Chunks {
|
||||||
|
e.remoteChunks[nodePK] = append(e.remoteChunks[nodePK], objectSDK.ECChunk(ch))
|
||||||
|
}
|
||||||
|
return e.createECInfoErr()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ecInfo) createECInfoErr() *objectSDK.ECInfoError {
|
||||||
|
unique := make(map[string]struct{})
|
||||||
|
result := objectSDK.NewECInfo()
|
||||||
|
for _, ch := range e.localChunks {
|
||||||
|
if _, found := unique[string(ch.ID.GetValue())]; found {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
result.AddChunk(ch)
|
||||||
|
unique[string(ch.ID.GetValue())] = struct{}{}
|
||||||
|
}
|
||||||
|
for _, chunks := range e.remoteChunks {
|
||||||
|
for _, ch := range chunks {
|
||||||
|
if _, found := unique[string(ch.ID.GetValue())]; found {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
result.AddChunk(ch)
|
||||||
|
unique[string(ch.ID.GetValue())] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return objectSDK.NewECInfoError(result)
|
||||||
|
}
|
||||||
|
|
|
@ -47,7 +47,7 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) error {
|
||||||
zap.Uint64("number", exec.curProcEpoch),
|
zap.Uint64("number", exec.curProcEpoch),
|
||||||
)
|
)
|
||||||
|
|
||||||
traverser, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
|
traverser, _, err := exec.svc.traverserGenerator.GenerateTraverser(exec.containerID(), nil, exec.curProcEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
|
return fmt.Errorf("%s: %w", logs.SearchCouldNotGenerateContainerTraverser, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
containerCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
netmapcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
@ -80,12 +81,13 @@ func newTestStorage() *testStorage {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, error) {
|
func (g *testTraverserGenerator) GenerateTraverser(_ cid.ID, _ *oid.ID, epoch uint64) (*placement.Traverser, *containerCore.Container, error) {
|
||||||
return placement.NewTraverser(
|
t, err := placement.NewTraverser(
|
||||||
placement.ForContainer(g.c),
|
placement.ForContainer(g.c),
|
||||||
placement.UseBuilder(g.b[epoch]),
|
placement.UseBuilder(g.b[epoch]),
|
||||||
placement.WithoutSuccessTracking(),
|
placement.WithoutSuccessTracking(),
|
||||||
)
|
)
|
||||||
|
return t, &containerCore.Container{Value: g.c}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
|
func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
@ -45,7 +46,7 @@ type cfg struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
traverserGenerator interface {
|
traverserGenerator interface {
|
||||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, *container.Container, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
currentEpochReceiver interface {
|
currentEpochReceiver interface {
|
||||||
|
|
|
@ -122,17 +122,17 @@ func (g *TraverserGenerator) WithTraverseOptions(opts ...placement.Option) *Trav
|
||||||
|
|
||||||
// GenerateTraverser generates placement Traverser for provided object address
|
// GenerateTraverser generates placement Traverser for provided object address
|
||||||
// using epoch-th network map.
|
// using epoch-th network map.
|
||||||
func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, error) {
|
func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoch uint64) (*placement.Traverser, *container.Container, error) {
|
||||||
// get network map by epoch
|
// get network map by epoch
|
||||||
nm, err := g.netMapSrc.GetNetMapByEpoch(epoch)
|
nm, err := g.netMapSrc.GetNetMapByEpoch(epoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
|
return nil, nil, fmt.Errorf("could not get network map #%d: %w", epoch, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// get container related container
|
// get container related container
|
||||||
cnr, err := g.cnrSrc.Get(idCnr)
|
cnr, err := g.cnrSrc.Get(idCnr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not get container: %w", err)
|
return nil, nil, fmt.Errorf("could not get container: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// allocate placement traverser options
|
// allocate placement traverser options
|
||||||
|
@ -160,5 +160,9 @@ func (g *TraverserGenerator) GenerateTraverser(idCnr cid.ID, idObj *oid.ID, epoc
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
return placement.NewTraverser(traverseOpts...)
|
t, err := placement.NewTraverser(traverseOpts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
return t, cnr, nil
|
||||||
}
|
}
|
||||||
|
|
commonParameters
may be changed during Head request (ForgetTokens
,WithLocalOnly
), so need a copy. Founded with autotests.Previously it was possible to pass
nil
, now we pass a pointer to the empty struct.This is a change in behaviour, can anything bad happen?
Previously it was possible to pass
nil
, now we pass a pointer to the empty struct.This is a change in behaviour, can anything bad happen?
Fixed