Fix head --raw
request handling for EC #1271
4 changed files with 21 additions and 71 deletions
|
@ -11,7 +11,7 @@ import (
|
|||
)
|
||||
|
||||
func (r *request) assembleEC(ctx context.Context) {
|
||||
if r.isRaw() && r.isLocal() {
|
||||
if r.isRaw() {
|
||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||
return
|
||||
}
|
||||
|
@ -43,7 +43,7 @@ func (r *request) assembleEC(ctx context.Context) {
|
|||
}
|
||||
|
||||
r.prm.common = r.prm.common.WithLocalOnly(false)
|
||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.isRaw(), r.traverserGenerator, r.curProcEpoch)
|
||||
assembler := newAssemblerEC(r.address(), r.infoEC, r.ctxRange(), r, r.localStorage, r.containerSource, r.log, r.headOnly(), r.traverserGenerator, r.curProcEpoch)
|
||||
|
||||
r.log.Debug(logs.GetAssemblingECObject,
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
|
|
|
@ -37,7 +37,6 @@ type assemblerec struct {
|
|||
cs container.Source
|
||||
log *logger.Logger
|
||||
head bool
|
||||
raw bool
|
||||
traverserGenerator traverserGenerator
|
||||
epoch uint64
|
||||
}
|
||||
|
@ -51,7 +50,6 @@ func newAssemblerEC(
|
|||
cs container.Source,
|
||||
log *logger.Logger,
|
||||
head bool,
|
||||
raw bool,
|
||||
tg traverserGenerator,
|
||||
epoch uint64,
|
||||
) *assemblerec {
|
||||
|
@ -64,7 +62,6 @@ func newAssemblerEC(
|
|||
cs: cs,
|
||||
log: log,
|
||||
head: head,
|
||||
raw: raw,
|
||||
traverserGenerator: tg,
|
||||
epoch: epoch,
|
||||
}
|
||||
|
@ -74,9 +71,6 @@ func newAssemblerEC(
|
|||
// It returns parent object.
|
||||
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
switch {
|
||||
case a.raw:
|
||||
err := a.reconstructRawError(ctx)
|
||||
return nil, err
|
||||
case a.head:
|
||||
return a.reconstructHeader(ctx, writer)
|
||||
case a.rng != nil:
|
||||
|
@ -149,56 +143,6 @@ func (a *assemblerec) reconstructObjectFromParts(ctx context.Context, headers bo
|
|||
return c.Reconstruct(parts)
|
||||
}
|
||||
|
||||
func (a *assemblerec) reconstructRawError(ctx context.Context) error {
|
||||
chunks := make(map[string]objectSDK.ECChunk)
|
||||
var chunksGuard sync.Mutex
|
||||
for _, ch := range a.ecInfo.localChunks {
|
||||
chunks[string(ch.ID.GetValue())] = ch
|
||||
}
|
||||
|
||||
objID := a.addr.Object()
|
||||
trav, _, err := a.traverserGenerator.GenerateTraverser(a.addr.Container(), &objID, a.epoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
eg, ctx := errgroup.WithContext(ctx)
|
||||
for {
|
||||
batch := trav.Next()
|
||||
if len(batch) == 0 {
|
||||
break
|
||||
}
|
||||
for _, node := range batch {
|
||||
var info client.NodeInfo
|
||||
client.NodeInfoFromNetmapElement(&info, node)
|
||||
eg.Go(func() error {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
}
|
||||
|
||||
if _, found := a.ecInfo.remoteChunks[string(info.PublicKey())]; found {
|
||||
return nil
|
||||
}
|
||||
|
||||
nodeChunks := a.tryGetChunkListFromNode(ctx, info)
|
||||
|
||||
chunksGuard.Lock()
|
||||
defer chunksGuard.Unlock()
|
||||
for _, ch := range nodeChunks {
|
||||
chunks[string(ch.ID.GetValue())] = ch
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
}
|
||||
if err = eg.Wait(); err != nil {
|
||||
return err
|
||||
}
|
||||
return createECInfoErr(chunks)
|
||||
}
|
||||
|
||||
func (a *assemblerec) retrieveParts(ctx context.Context, trav *placement.Traverser, cnr *container.Container) []*objectSDK.Object {
|
||||
dataCount := policy.ECDataCount(cnr.Value.PlacementPolicy())
|
||||
parityCount := policy.ECParityCount(cnr.Value.PlacementPolicy())
|
||||
|
@ -293,7 +237,7 @@ func (a *assemblerec) tryGetChunkFromLocalStorage(ctx context.Context, ch object
|
|||
return nil
|
||||
}
|
||||
var addr oid.Address
|
||||
addr.SetContainer(addr.Container())
|
||||
addr.SetContainer(a.addr.Container())
|
||||
|
||||
addr.SetObject(objID)
|
||||
var object *objectSDK.Object
|
||||
if a.head {
|
||||
|
@ -359,11 +303,3 @@ func (a *assemblerec) tryGetChunkFromRemoteStorage(ctx context.Context, node cli
|
|||
}
|
||||
return object
|
||||
}
|
||||
|
||||
func createECInfoErr(chunks map[string]objectSDK.ECChunk) *objectSDK.ECInfoError {
|
||||
info := objectSDK.NewECInfo()
|
||||
for _, ch := range chunks {
|
||||
info.AddChunk(ch)
|
||||
}
|
||||
return objectSDK.NewECInfoError(info)
|
||||
}
|
||||
|
|
|
@ -111,6 +111,10 @@ func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
|||
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
||||
case statusEC:
|
||||
exec.log.Debug(logs.GetRequestedObjectIsEC)
|
||||
if exec.isRaw() && execCnr {
|
||||
exec.executeOnContainer(ctx)
|
||||
exec.analyzeStatus(ctx, false)
|
||||
}
|
||||
exec.assembleEC(ctx)
|
||||
default:
|
||||
exec.log.Debug(logs.OperationFinishedWithError,
|
||||
|
|
|
@ -35,8 +35,12 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
switch {
|
||||
default:
|
||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||
r.status = statusUndefined
|
||||
r.err = new(apistatus.ObjectNotFound)
|
||||
if r.status != statusEC {
|
||||
// for raw requests, continue to collect other parts
|
||||
r.status = statusUndefined
|
||||
r.err = new(apistatus.ObjectNotFound)
|
||||
}
|
||||
return false
|
||||
case err == nil:
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
|
@ -48,22 +52,28 @@ func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
|||
r.collectedObject = obj
|
||||
r.writeCollectedObject(ctx)
|
||||
}
|
||||
return true
|
||||
case errors.As(err, &errRemoved):
|
||||
r.status = statusINHUMED
|
||||
r.err = errRemoved
|
||||
return true
|
||||
case errors.As(err, &errOutOfRange):
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRange
|
||||
return true
|
||||
case errors.As(err, &errSplitInfo):
|
||||
r.status = statusVIRTUAL
|
||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
return true
|
||||
case errors.As(err, &errECInfo):
|
||||
r.status = statusEC
|
||||
r.err = r.infoEC.addRemote(string(info.PublicKey()), errECInfo.ECInfo())
|
||||
if r.isRaw() {
|
||||
dstepanov-yadro
commented
It can be written more simply, but it's clearer this way. It can be written more simply, but it's clearer this way.
fyrchik
commented
questionable, but ok questionable, but ok
|
||||
return false // continue to collect all parts
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
return r.status != statusUndefined
|
||||
}
|
||||
|
||||
func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {
|
||||
|
|
Loading…
Reference in a new issue
Shame on me
It seems like another bug, what could happen here?
The local chunk was never taken in the assembly of the object.