forked from TrueCloudLab/frostfs-node
[#1271] getSvc: Fix head --raw
assemble for EC
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
ca4d6df1cc
commit
37b83c0856
4 changed files with 20 additions and 70 deletions
|
@ -11,7 +11,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (r *request) assembleEC(ctx context.Context) {
|
func (r *request) assembleEC(ctx context.Context) {
|
||||||
if r.isRaw() && r.isLocal() {
|
if r.isRaw() {
|
||||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -43,7 +43,7 @@ func (r *request) assembleEC(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
r.prm.common = r.prm.common.WithLocalOnly(false)
|
r.prm.common = r.prm.common.WithLocalOnly(false)
|
||||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch)
|
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.traverserGenerator, r.curProcEpoch)
|
||||||
|
|
||||||
r.log.Debug(logs.GetAssemblingECObject,
|
r.log.Debug(logs.GetAssemblingECObject,
|
||||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
|
|
|
@ -37,7 +37,6 @@ type assemblerec struct {
|
||||||
cs container.Source
|
cs container.Source
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
head bool
|
head bool
|
||||||
raw bool
|
|
||||||
traverserGenerator traverserGenerator
|
traverserGenerator traverserGenerator
|
||||||
epoch uint64
|
epoch uint64
|
||||||
}
|
}
|
||||||
|
@ -51,7 +50,6 @@ func newAssemblerEC(
|
||||||
cs container.Source,
|
cs container.Source,
|
||||||
log *logger.Logger,
|
log *logger.Logger,
|
||||||
head bool,
|
head bool,
|
||||||
raw bool,
|
|
||||||
tg traverserGenerator,
|
tg traverserGenerator,
|
||||||
epoch uint64,
|
epoch uint64,
|
||||||
) *assemblerec {
|
) *assemblerec {
|
||||||
|
@ -64,7 +62,6 @@ func newAssemblerEC(
|
||||||
cs: cs,
|
cs: cs,
|
||||||
log: log,
|
log: log,
|
||||||
head: head,
|
head: head,
|
||||||
raw: raw,
|
|
||||||
traverserGenerator: tg,
|
traverserGenerator: tg,
|
||||||
epoch: epoch,
|
epoch: epoch,
|
||||||
}
|
}
|
||||||
|
@ -74,9 +71,6 @@ func newAssemblerEC(
|
||||||
// It returns parent object.
|
// It returns parent object.
|
||||||
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||||
switch {
|
switch {
|
||||||
case a.raw:
|
|
||||||
err := a.reconstructRawError(ctx)
|
|
||||||
return nil, err
|
|
||||||
case a.head:
|
case a.head:
|
||||||
return a.reconstructHeader(ctx, writer)
|
return a.reconstructHeader(ctx, writer)
|
||||||
case a.rng != nil:
|
case a.rng != nil:
|
||||||
|
@ -149,56 +143,6 @@ func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bo
|
||||||
return c.Reconstruct(parts)
|
return c.Reconstruct(parts)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *assemblerec) reconstructRawError(ctx context.Context) error {
|
|
||||||
chunks := make(map[string]objectSDK.ECChunk)
|
|
||||||
var chunksGuard sync.Mutex
|
|
||||||
for _, ch := range a.ecInfo.localChunks {
|
|
||||||
chunks[string(ch.ID.GetValue())] = ch
|
|
||||||
}
|
|
||||||
|
|
||||||
objID := a.addr.Object()
|
|
||||||
trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
eg, ctx := errgroup.WithContext(ctx)
|
|
||||||
for {
|
|
||||||
batch := trav.Next()
|
|
||||||
if len(batch) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
for _, node := range batch {
|
|
||||||
var info client.NodeInfo
|
|
||||||
client.NodeInfoFromNetmapElement(&info, node)
|
|
||||||
eg.Go(func() error {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return ctx.Err()
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
nodeChunks := a.tryGetChunkListFromNode(ctx, info)
|
|
||||||
|
|
||||||
chunksGuard.Lock()
|
|
||||||
defer chunksGuard.Unlock()
|
|
||||||
for _, ch := range nodeChunks {
|
|
||||||
chunks[string(ch.ID.GetValue())] = ch
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if err = eg.Wait(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return createECInfoErr(chunks)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object {
|
func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object {
|
||||||
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
|
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
|
||||||
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
|
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
|
||||||
|
@ -359,11 +303,3 @@ func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node cli
|
||||||
}
|
}
|
||||||
return object
|
return object
|
||||||
}
|
}
|
||||||
|
|
||||||
func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError {
|
|
||||||
info := objectSDK.NewECInfo()
|
|
||||||
for _, ch := range chunks {
|
|
||||||
info.AddChunk(ch)
|
|
||||||
}
|
|
||||||
return objectSDK.NewECInfoError(info)
|
|
||||||
}
|
|
||||||
|
|
|
@ -111,6 +111,10 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
||||||
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
||||||
case statusEC:
|
case statusEC:
|
||||||
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
||||||
|
if exec.isRaw() && execCnr {
|
||||||
|
exec.executeOnContainer(ctx)
|
||||||
|
exec.analyzeStatus(ctx, false)
|
||||||
|
}
|
||||||
exec.assembleEC(ctx)
|
exec.assembleEC(ctx)
|
||||||
default:
|
default:
|
||||||
exec.log.Debug(logs.OperationFinishedWithError,
|
exec.log.Debug(logs.OperationFinishedWithError,
|
||||||
|
|
|
@ -35,8 +35,12 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||||
|
if r.status != statusEC {
|
||||||
|
// for raw requests, continue to collect other parts
|
||||||
r.status = statusUndefined
|
r.status = statusUndefined
|
||||||
r.err = new(apistatus.ObjectNotFound)
|
r.err = new(apistatus.ObjectNotFound)
|
||||||
|
}
|
||||||
|
return false
|
||||||
case err == nil:
|
case err == nil:
|
||||||
r.status = statusOK
|
r.status = statusOK
|
||||||
r.err = nil
|
r.err = nil
|
||||||
|
@ -48,22 +52,28 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
||||||
r.collectedObject = obj
|
r.collectedObject = obj
|
||||||
r.writeCollectedObject(ctx)
|
r.writeCollectedObject(ctx)
|
||||||
}
|
}
|
||||||
|
return true
|
||||||
case errors.As(err, &errRemoved):
|
case errors.As(err, &errRemoved):
|
||||||
r.status = statusINHUMED
|
r.status = statusINHUMED
|
||||||
r.err = errRemoved
|
r.err = errRemoved
|
||||||
|
return true
|
||||||
case errors.As(err, &errOutOfRange):
|
case errors.As(err, &errOutOfRange):
|
||||||
r.status = statusOutOfRange
|
r.status = statusOutOfRange
|
||||||
r.err = errOutOfRange
|
r.err = errOutOfRange
|
||||||
|
return true
|
||||||
case errors.As(err, &errSplitInfo):
|
case errors.As(err, &errSplitInfo):
|
||||||
r.status = statusVIRTUAL
|
r.status = statusVIRTUAL
|
||||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||||
|
return true
|
||||||
case errors.As(err, &errECInfo):
|
case errors.As(err, &errECInfo):
|
||||||
r.status = statusEC
|
r.status = statusEC
|
||||||
r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo())
|
r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo())
|
||||||
|
if r.isRaw() {
|
||||||
|
return false // continue to collect all parts
|
||||||
|
}
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
return r.status != statusUndefined
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {
|
func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {
|
||||||
|
|
Loading…
Reference in a new issue