Fix head --raw request handling for EC #1271
4 changed files with 21 additions and 71 deletions
@@ -11,7 +11,7 @@ import (
 )
 
 func (r *request) assembleEC(ctx context.Context) {
-	if r.isRaw() && r.isLocal() {
+	if r.isRaw() {
 		r.log.Debug(logs.GetCanNotAssembleTheObject)
 		return
 	}
@@ -43,7 +43,7 @@ func (r *request) assembleEC(ctx context.Context) {
 	}
 
 	r.prm.common = r.prm.common.WithLocalOnly(false)
-	assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch)
+	assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.traverserGenerator, r.curProcEpoch)
 
 	r.log.Debug(logs.GetAssemblingECObject,
 		zap.Uint64("range_offset", r.ctxRange().GetOffset()),
@@ -37,7 +37,6 @@ type assemblerec struct {
 	cs                 container.Source
 	log                *logger.Logger
 	head               bool
-	raw                bool
 	traverserGenerator traverserGenerator
 	epoch              uint64
 }
@@ -51,7 +50,6 @@ func newAssemblerEC(
 	cs container.Source,
 	log *logger.Logger,
 	head bool,
-	raw bool,
 	tg traverserGenerator,
 	epoch uint64,
 ) *assemblerec {
@@ -64,7 +62,6 @@ func newAssemblerEC(
 		cs:                 cs,
 		log:                log,
 		head:               head,
-		raw:                raw,
 		traverserGenerator: tg,
 		epoch:              epoch,
 	}
@@ -74,9 +71,6 @@ func newAssemblerEC(
 // It returns parent object.
 func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
 	switch {
-	case a.raw:
-		err := a.reconstructRawError(ctx)
-		return nil, err
 	case a.head:
 		return a.reconstructHeader(ctx, writer)
 	case a.rng != nil:
@@ -149,56 +143,6 @@ func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bo
 	return c.Reconstruct(parts)
 }
 
-func (a *assemblerec) reconstructRawError(ctx context.Context) error {
-	chunks := make(map[string]objectSDK.ECChunk)
-	var chunksGuard sync.Mutex
-	for _, ch := range a.ecInfo.localChunks {
-		chunks[string(ch.ID.GetValue())] = ch
-	}
-
-	objID := a.addr.Object()
-	trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
-	if err != nil {
-		return err
-	}
-
-	eg, ctx := errgroup.WithContext(ctx)
-	for {
-		batch := trav.Next()
-		if len(batch) == 0 {
-			break
-		}
-		for _, node := range batch {
-			var info client.NodeInfo
-			client.NodeInfoFromNetmapElement(&info, node)
-			eg.Go(func() error {
-				select {
-				case <-ctx.Done():
-					return ctx.Err()
-				default:
-				}
-
-				if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found {
-					return nil
-				}
-
-				nodeChunks := a.tryGetChunkListFromNode(ctx, info)
-
-				chunksGuard.Lock()
-				defer chunksGuard.Unlock()
-				for _, ch := range nodeChunks {
-					chunks[string(ch.ID.GetValue())] = ch
-				}
-				return nil
-			})
-		}
-	}
-	if err = eg.Wait(); err != nil {
-		return err
-	}
-	return createECInfoErr(chunks)
-}
-
 func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object {
 	dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
 	parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
@@ -293,7 +237,7 @@ func (a *assemblerec) tryGetChunkFromLocalStorage(ctx context.Context, ch object
 		return nil
 	}
 	var addr oid.Address
-	addr.SetContainer(addr.Container())
+	addr.SetContainer(a.addr.Container())
 	addr.SetObject(objID)
 	var object *objectSDK.Object
 	if a.head {
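This one-line change is the bug the review thread at the bottom refers to: `addr` was just declared, so reading the container back from it returns the zero value, and local chunk lookups were addressed to a nonexistent container. A minimal sketch of the pitfall follows (the frostfs-sdk-go import path and the standalone main function are assumptions for illustration; `Container`/`SetContainer` are the accessors used in the hunk):

package main

import (
	"fmt"

	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

func main() {
	var addr oid.Address // freshly declared: container and object IDs are zero values

	// Old code: read the container back from the zero-valued address itself,
	// so the chunk lookup address kept the zero container ID.
	addr.SetContainer(addr.Container())
	fmt.Println(addr.Container()) // still the zero-value container ID

	// Fix: take the container from the request address instead,
	// as in the hunk above: addr.SetContainer(a.addr.Container())
}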
@@ -359,11 +303,3 @@ func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node cli
 	}
 	return object
 }
-
-func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError {
-	info := objectSDK.NewECInfo()
-	for _, ch := range chunks {
-		info.AddChunk(ch)
-	}
-	return objectSDK.NewECInfoError(info)
-}
@@ -111,6 +111,10 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
 		exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
 	case statusEC:
 		exec.log.Debug(logs.GetRequestedObjectIsEC)
+		if exec.isRaw() && execCnr {
+			exec.executeOnContainer(ctx)
+			exec.analyzeStatus(ctx, false)
+		}
 		exec.assembleEC(ctx)
 	default:
 		exec.log.Debug(logs.OperationFinishedWithError,
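The `execCnr` flag doubles as a recursion guard here: on the first pass a raw request that discovered an EC object fans out to the rest of the container nodes to gather their chunk lists, and the recursive call passes false so the fan-out cannot repeat. A self-contained toy model of that control flow (the type and method names only mimic the ones in the diff, with all I/O replaced by prints):

package main

import "fmt"

// Toy model of the recursion guard added in analyzeStatus: the fan-out
// to the container nodes runs at most once, because the recursive call
// passes execCnr=false.
type request struct{ raw bool }

func (r *request) executeOnContainer() {
	fmt.Println("fan out: collect EC chunk lists from the container nodes")
}

func (r *request) assembleEC() {
	if r.raw {
		fmt.Println("raw request: skip assembly, reply with the collected ECInfoError")
		return
	}
	fmt.Println("assemble the object from EC chunks")
}

func (r *request) analyzeStatus(execCnr bool) {
	if r.raw && execCnr {
		r.executeOnContainer()
		r.analyzeStatus(false) // guard: the second pass cannot fan out again
	}
	r.assembleEC()
}

func main() {
	(&request{raw: true}).analyzeStatus(true)
}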
@@ -35,8 +35,12 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
 	switch {
 	default:
 		r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
-		r.status = statusUndefined
-		r.err = new(apistatus.ObjectNotFound)
+		if r.status != statusEC {
+			// for raw requests, continue to collect other parts
+			r.status = statusUndefined
+			r.err = new(apistatus.ObjectNotFound)
+		}
+		return false
 	case err == nil:
 		r.status = statusOK
 		r.err = nil
@@ -48,22 +52,28 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
 			r.collectedObject = obj
 			r.writeCollectedObject(ctx)
 		}
+		return true
 	case errors.As(err, &errRemoved):
 		r.status = statusINHUMED
 		r.err = errRemoved
+		return true
 	case errors.As(err, &errOutOfRange):
 		r.status = statusOutOfRange
 		r.err = errOutOfRange
+		return true
 	case errors.As(err, &errSplitInfo):
 		r.status = statusVIRTUAL
 		mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
 		r.err = objectSDK.NewSplitInfoError(r.infoSplit)
+		return true
 	case errors.As(err, &errECInfo):
 		r.status = statusEC
 		r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo())
+		if r.isRaw() {
+			return false // continue to collect all parts
+		}
+		return true
 	}
-
-	return r.status != statusUndefined
 }
 
 func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {

Inline comments on `if r.isRaw() {`:

dstepanov-yadro commented:
It can be written more simply, but it's clearer this way.

fyrchik commented:
questionable, but ok
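Taken with the hunk above, the contract is: `processNode` returns true to stop the node traversal and false to keep it going, so a raw request that received an `ECInfoError` records the chunk list and deliberately keeps walking until every reachable node has contributed to `infoEC`. A toy model of that contract (the loop and names are simplified stand-ins, not the real traversal code):

package main

import "fmt"

// Toy model of the traversal contract behind processNode: the caller
// visits nodes until a handler returns true, so returning false for a
// raw EC response keeps collecting chunk lists from the remaining nodes.
func main() {
	nodes := []string{"node-a", "node-b", "node-c"}
	collected := map[string][]int{} // stand-in for infoEC's remote chunk lists

	for _, n := range nodes {
		if processNode(n, collected) {
			break // non-raw flow: the first definite answer stops the walk
		}
	}
	fmt.Println("chunk lists collected from", len(collected), "nodes")
}

func processNode(node string, collected map[string][]int) bool {
	collected[node] = []int{1, 2} // stand-in for r.infoEC.addRemote(...)
	return false                  // raw request: continue to collect all parts
}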
Review thread (on the addr.SetContainer fix above):

It seems like another bug, what could happen here?

The local chunk was never taken in the assembly of the object.

Shame on me