forked from TrueCloudLab/frostfs-node
[#1271] getSvc: Fix head --raw
assemble for EC
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
9c2c76ca32
commit
8398a8b4b3
4 changed files with 20 additions and 70 deletions
|
@ -11,7 +11,7 @@ import (
|
|||
)
|
||||
|
||||
func (r *request) assembleEC(ctx context.Context) {
|
||||
if r.isRaw() && r.isLocal() {
|
||||
if r.isRaw() {
|
||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||
return
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ func (r *request) assembleEC(ctx context.Context) {
|
|||
}
|
||||
|
||||
r.prm.common = r.prm.common.WithLocalOnly(false)
|
||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch)
|
||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.traverserGenerator, r.curProcEpoch)
|
||||
|
||||
r.log.Debug(logs.GetAssemblingECObject,
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
|
|
|
@ -37,7 +37,6 @@ type assemblerec struct {
|
|||
cs container.Source
|
||||
log *logger.Logger
|
||||
head bool
|
||||
raw bool
|
||||
traverserGenerator traverserGenerator
|
||||
epoch uint64
|
||||
}
|
||||
|
@ -51,7 +50,6 @@ func newAssemblerEC(
|
|||
cs container.Source,
|
||||
log *logger.Logger,
|
||||
head bool,
|
||||
raw bool,
|
||||
tg traverserGenerator,
|
||||
epoch uint64,
|
||||
) *assemblerec {
|
||||
|
@ -64,7 +62,6 @@ func newAssemblerEC(
|
|||
cs: cs,
|
||||
log: log,
|
||||
head: head,
|
||||
raw: raw,
|
||||
traverserGenerator: tg,
|
||||
epoch: epoch,
|
||||
}
|
||||
|
@ -74,9 +71,6 @@ func newAssemblerEC(
|
|||
// It returns parent object.
|
||||
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
switch {
|
||||
case a.raw:
|
||||
err := a.reconstructRawError(ctx)
|
||||
return nil, err
|
||||
case a.head:
|
||||
return a.reconstructHeader(ctx, writer)
|
||||
case a.rng != nil:
|
||||
|
@ -149,56 +143,6 @@ func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bo
|
|||
return c.Reconstruct(parts)
|
||||
}
|
||||
|
||||
func (a *assemblerec) reconstructRawError(ctx context.Context) error {
|
||||
chunks := make(map[string]objectSDK.ECChunk)
|
||||
var chunksGuard sync.Mutex
|
||||
for _, ch := range a.ecInfo.localChunks {
|
||||
chunks[string(ch.ID.GetValue())] = ch
|
||||
}
|
||||
|
||||
objID := a.addr.Object()
|
||||
trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
for {
|
||||
batch := trav.Next()
|
||||
if len(batch) == 0 {
|
||||
break
|
||||
}
|
||||
for _, node := range batch {
|
||||
var info client.NodeInfo
|
||||
client.NodeInfoFromNetmapElement(&info, node)
|
||||
eg.Go(func() error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found {
|
||||
return nil
|
||||
}
|
||||
|
||||
nodeChunks := a.tryGetChunkListFromNode(ctx, info)
|
||||
|
||||
chunksGuard.Lock()
|
||||
defer chunksGuard.Unlock()
|
||||
for _, ch := range nodeChunks {
|
||||
chunks[string(ch.ID.GetValue())] = ch
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
if err = eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
return createECInfoErr(chunks)
|
||||
}
|
||||
|
||||
func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object {
|
||||
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
|
||||
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
|
||||
|
@ -359,11 +303,3 @@ func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node cli
|
|||
}
|
||||
return object
|
||||
}
|
||||
|
||||
func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError {
|
||||
info := objectSDK.NewECInfo()
|
||||
for _, ch := range chunks {
|
||||
info.AddChunk(ch)
|
||||
}
|
||||
return objectSDK.NewECInfoError(info)
|
||||
}
|
||||
|
|
|
@ -111,6 +111,10 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
|||
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
||||
case statusEC:
|
||||
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
||||
if exec.isRaw() && execCnr {
|
||||
exec.executeOnContainer(ctx)
|
||||
exec.analyzeStatus(ctx, false)
|
||||
}
|
||||
exec.assembleEC(ctx)
|
||||
default:
|
||||
exec.log.Debug(logs.OperationFinishedWithError,
|
||||
|
|
|
@ -35,8 +35,12 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
switch {
|
||||
default:
|
||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||
r.status = statusUndefined
|
||||
r.err = new(apistatus.ObjectNotFound)
|
||||
if r.status != statusEC {
|
||||
// for raw requests, continue to collect other parts
|
||||
r.status = statusUndefined
|
||||
r.err = new(apistatus.ObjectNotFound)
|
||||
}
|
||||
return false
|
||||
case err == nil:
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
|
@ -48,22 +52,28 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
r.collectedObject = obj
|
||||
r.writeCollectedObject(ctx)
|
||||
}
|
||||
return true
|
||||
case errors.As(err, &errRemoved):
|
||||
r.status = statusINHUMED
|
||||
r.err = errRemoved
|
||||
return true
|
||||
case errors.As(err, &errOutOfRange):
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRange
|
||||
return true
|
||||
case errors.As(err, &errSplitInfo):
|
||||
r.status = statusVIRTUAL
|
||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
return true
|
||||
case errors.As(err, &errECInfo):
|
||||
r.status = statusEC
|
||||
r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo())
|
||||
if r.isRaw() {
|
||||
return false // continue to collect all parts
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
return r.status != statusUndefined
|
||||
}
|
||||
|
||||
func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {
|
||||
|
|
Loading…
Reference in a new issue