[#85] get-service: Use assembler to assemble LOB

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-03-09 11:02:27 +03:00
parent 07de839f18
commit b8e93d4c08
9 changed files with 126 additions and 335 deletions

View file

@ -1,6 +1,7 @@
package getsvc
import (
"context"
"errors"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -35,223 +36,102 @@ func (exec *execCtx) assemble() {
exec.log.Debug("trying to assemble the object...")
splitInfo := exec.splitInfo()
assembler := newAssembler(exec.address(), exec.splitInfo(), exec.ctxRange(), exec)
childID, ok := splitInfo.Link()
if !ok {
childID, ok = splitInfo.LastPart()
if !ok {
exec.log.Debug("neither linking nor last part of split-chain is presented in split info")
return
}
}
prev, children := exec.initFromChild(childID)
if len(children) > 0 {
if exec.ctxRange() == nil {
if ok := exec.writeCollectedHeader(); ok {
exec.overtakePayloadDirectly(children, nil, true)
}
} else {
// TODO: #1155 choose one-by-one restoring algorithm according to size
// * if size > MAX => go right-to-left with HEAD and back with GET
// * else go right-to-left with GET and compose in single object before writing
if ok := exec.overtakePayloadInReverse(children[len(children)-1]); ok {
// payload of all children except the last are written, write last payload
exec.writeObjectPayload(exec.collectedObject)
}
}
} else if prev != nil {
if ok := exec.writeCollectedHeader(); ok {
// TODO: #1155 choose one-by-one restoring algorithm according to size
// * if size > MAX => go right-to-left with HEAD and back with GET
// * else go right-to-left with GET and compose in single object before writing
if ok := exec.overtakePayloadInReverse(*prev); ok {
// payload of all children except the last are written, write last payloa
exec.writeObjectPayload(exec.collectedObject)
}
}
} else {
exec.log.Debug("could not init parent from child")
}
}
func (exec *execCtx) initFromChild(obj oid.ID) (prev *oid.ID, children []oid.ID) {
log := exec.log.With(zap.Stringer("child ID", obj))
log.Debug("starting assembling from child")
child, ok := exec.getChild(obj, nil, true)
if !ok {
return
}
par := child.Parent()
if par == nil {
exec.status = statusUndefined
exec.err = errors.New("received child with empty parent")
log.Debug("received child with empty parent")
return
}
exec.collectedObject = par
var payload []byte
if rng := exec.ctxRange(); rng != nil {
seekOff := rng.GetOffset()
seekLen := rng.GetLength()
seekTo := seekOff + seekLen
parSize := par.PayloadSize()
if seekTo < seekOff || parSize < seekOff || parSize < seekTo {
var errOutOfRange apistatus.ObjectOutOfRange
exec.err = &errOutOfRange
exec.status = statusOutOfRange
return
}
childSize := child.PayloadSize()
exec.curOff = parSize - childSize
from := uint64(0)
if exec.curOff < seekOff {
from = seekOff - exec.curOff
}
to := uint64(0)
if seekOff+seekLen > exec.curOff+from {
to = seekOff + seekLen - exec.curOff
}
payload = child.Payload()[from:to]
rng.SetLength(rng.GetLength() - to + from)
} else {
payload = child.Payload()
}
exec.collectedObject.SetPayload(payload)
idPrev, ok := child.PreviousID()
if ok {
return &idPrev, child.Children()
}
return nil, child.Children()
}
func (exec *execCtx) overtakePayloadDirectly(children []oid.ID, rngs []objectSDK.Range, checkRight bool) {
withRng := len(rngs) > 0 && exec.ctxRange() != nil
for i := range children {
var r *objectSDK.Range
if withRng {
r = &rngs[i]
}
child, ok := exec.getChild(children[i], r, !withRng && checkRight)
if !ok {
return
}
if ok := exec.writeObjectPayload(child); !ok {
return
}
}
exec.status = statusOK
exec.err = nil
}
func (exec *execCtx) overtakePayloadInReverse(prev oid.ID) bool {
chain, rngs, ok := exec.buildChainInReverse(prev)
if !ok {
return false
}
reverseRngs := len(rngs) > 0
// reverse chain
for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 {
chain[left], chain[right] = chain[right], chain[left]
if reverseRngs {
rngs[left], rngs[right] = rngs[right], rngs[left]
}
}
exec.overtakePayloadDirectly(chain, rngs, false)
return exec.status == statusOK
}
func (exec *execCtx) buildChainInReverse(prev oid.ID) ([]oid.ID, []objectSDK.Range, bool) {
var (
chain = make([]oid.ID, 0)
rngs = make([]objectSDK.Range, 0)
seekRng = exec.ctxRange()
from = seekRng.GetOffset()
to = from + seekRng.GetLength()
withPrev = true
exec.log.Debug("assembling splitted object...",
zap.Stringer("address", exec.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()),
)
defer exec.log.Debug("assembling splitted object completed",
zap.Stringer("address", exec.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()),
)
// fill the chain end-to-start
for withPrev {
// check that only for "range" requests,
// for `GET` it stops via the false `withPrev`
if seekRng != nil && exec.curOff <= from {
break
}
head, ok := exec.headChild(prev)
if !ok {
return nil, nil, false
}
if seekRng != nil {
sz := head.PayloadSize()
exec.curOff -= sz
if exec.curOff < to {
off := uint64(0)
if from > exec.curOff {
off = from - exec.curOff
sz -= from - exec.curOff
}
if to < exec.curOff+off+sz {
sz = to - off - exec.curOff
}
index := len(rngs)
rngs = append(rngs, objectSDK.Range{})
rngs[index].SetOffset(off)
rngs[index].SetLength(sz)
id, _ := head.ID()
chain = append(chain, id)
}
} else {
id, _ := head.ID()
chain = append(chain, id)
}
prev, withPrev = head.PreviousID()
obj, err := assembler.Assemble(exec.context(), exec.prm.objWriter)
if err != nil {
exec.log.Warn("failed to assemble splitted object",
zap.Error(err),
zap.Stringer("address", exec.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()),
)
}
return chain, rngs, true
var errSplitInfo *objectSDK.SplitInfoError
var errRemovedRemote *apistatus.ObjectAlreadyRemoved
var errOutOfRangeRemote *apistatus.ObjectOutOfRange
var errRemovedLocal apistatus.ObjectAlreadyRemoved
var errOutOfRangeLocal apistatus.ObjectOutOfRange
switch {
default:
exec.status = statusUndefined
exec.err = err
case err == nil:
exec.status = statusOK
exec.err = nil
exec.collectedObject = obj
case errors.As(err, &errRemovedRemote):
exec.status = statusINHUMED
exec.err = errRemovedRemote
case errors.As(err, &errRemovedLocal):
exec.status = statusINHUMED
exec.err = errRemovedLocal
case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL
exec.err = errSplitInfo
case errors.As(err, &errOutOfRangeRemote):
exec.status = statusOutOfRange
exec.err = errOutOfRangeRemote
case errors.As(err, &errOutOfRangeLocal):
exec.status = statusOutOfRange
exec.err = errOutOfRangeLocal
}
}
func equalAddresses(a, b oid.Address) bool {
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
}
func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id)
prm := HeadPrm{
commonPrm: p.commonPrm,
}
w := NewSimpleObjectWriter()
prm.SetHeaderWriter(w)
err := exec.svc.Head(exec.context(), prm)
if err != nil {
return nil, err
}
return w.Object(), nil
}
func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) {
w := NewSimpleObjectWriter()
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.objWriter = w
p.SetRange(rng)
p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id)
statusError := exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng))
if statusError.err != nil {
return nil, statusError.err
}
return w.Object(), nil
}

View file

@ -15,11 +15,6 @@ type objectGetter interface {
HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error)
}
type objectWriter interface {
WriteChunk(context.Context, []byte) error
WriteHeader(context.Context, *objectSDK.Object) error
}
var (
errParentAddressDiffers = errors.New("parent address in child object differs")
)
@ -50,7 +45,7 @@ func newAssembler(
// Assemble assembles splitted large object and writes it's content to ObjectWriter.
// It returns parent object.
func (a *assembler) Assemble(ctx context.Context, writer objectWriter) (*objectSDK.Object, error) {
func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
sourceObjectID, ok := a.getLastPartOrLinkObjectID()
if !ok {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
@ -153,7 +148,7 @@ func (a *assembler) getChildObject(ctx context.Context, id oid.ID, rng *objectSD
return obj, nil
}
func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer objectWriter) error {
func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer ObjectWriter) error {
if a.rng == nil {
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
return err
@ -170,7 +165,7 @@ func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenID
return nil
}
func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer objectWriter) error {
func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer ObjectWriter) error {
if a.rng == nil {
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
return err
@ -186,7 +181,7 @@ func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prev
return nil
}
func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer objectWriter, partIDs []oid.ID, partRanges []objectSDK.Range, verifyIsChild bool) error {
func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, partRanges []objectSDK.Range, verifyIsChild bool) error {
withRng := len(partRanges) > 0 && a.rng != nil
for i := range partIDs {
@ -207,7 +202,7 @@ func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer objec
return nil
}
func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer objectWriter, prevID oid.ID) error {
func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer ObjectWriter, prevID oid.ID) error {
chain, rngs, err := a.buildChain(ctx, prevID)
if err != nil {
return err

View file

@ -3,10 +3,8 @@ package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
@ -36,8 +34,6 @@ type execCtx struct {
collectedObject *objectSDK.Object
curOff uint64
head bool
curProcEpoch uint64
@ -99,15 +95,6 @@ func (exec execCtx) address() oid.Address {
return exec.prm.addr
}
// isChild checks if reading object is a parent of the given object.
// Object without reference to the parent (only children with the parent header
// have it) is automatically considered as child: this should be guaranteed by
// upper level logic.
func (exec execCtx) isChild(obj *objectSDK.Object) bool {
par := obj.Parent()
return par == nil || equalAddresses(exec.address(), object.AddressOf(par))
}
func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
if exec.prm.signerKey != nil {
// the key has already been requested and
@ -199,78 +186,6 @@ func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser,
}
}
func (exec *execCtx) getChild(id oid.ID, rng *objectSDK.Range, withHdr bool) (*objectSDK.Object, bool) {
w := NewSimpleObjectWriter()
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.objWriter = w
p.SetRange(rng)
p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id)
exec.statusError = exec.svc.get(exec.context(), p.commonPrm, withPayloadRange(rng))
child := w.Object()
ok := exec.status == statusOK
if ok && withHdr && !exec.isChild(child) {
exec.status = statusUndefined
exec.err = errors.New("wrong child header")
exec.log.Debug("parent address in child object differs")
return nil, false
}
return child, ok
}
func (exec *execCtx) headChild(id oid.ID) (*objectSDK.Object, bool) {
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id)
prm := HeadPrm{
commonPrm: p.commonPrm,
}
w := NewSimpleObjectWriter()
prm.SetHeaderWriter(w)
err := exec.svc.Head(exec.context(), prm)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug("could not get child object header",
zap.Stringer("child ID", id),
zap.String("error", err.Error()),
)
return nil, false
case err == nil:
child := w.Object()
if !exec.isChild(child) {
exec.status = statusUndefined
exec.err = errors.New("parent address in child object differs")
exec.log.Debug("parent address in child object differs")
return nil, false
} else {
exec.status = statusOK
exec.err = nil
}
return child, true
}
}
func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
c, err := exec.svc.clientCache.get(info)
@ -307,6 +222,7 @@ func (exec *execCtx) writeCollectedHeader() bool {
}
err := exec.prm.objWriter.WriteHeader(
exec.context(),
exec.collectedObject.CutPayload(),
)
@ -331,7 +247,7 @@ func (exec *execCtx) writeObjectPayload(obj *objectSDK.Object) bool {
return true
}
err := exec.prm.objWriter.WriteChunk(obj.Payload())
err := exec.prm.objWriter.WriteChunk(exec.context(), obj.Payload())
switch {
default:

View file

@ -216,11 +216,11 @@ func (whe *writeHeaderError) Error() string {
type writeHeaderErrorObjectWriter struct {
}
func (w *writeHeaderErrorObjectWriter) WriteHeader(_ *objectSDK.Object) error {
func (w *writeHeaderErrorObjectWriter) WriteHeader(_ context.Context, _ *objectSDK.Object) error {
return &writeHeaderError{}
}
func (w *writeHeaderErrorObjectWriter) WriteChunk(p []byte) error {
func (w *writeHeaderErrorObjectWriter) WriteChunk(_ context.Context, _ []byte) error {
return nil
}
@ -233,11 +233,11 @@ func (whe *writePayloadError) Error() string {
type writePayloadErrorObjectWriter struct {
}
func (w *writePayloadErrorObjectWriter) WriteHeader(_ *objectSDK.Object) error {
func (w *writePayloadErrorObjectWriter) WriteHeader(_ context.Context, _ *objectSDK.Object) error {
return nil
}
func (w *writePayloadErrorObjectWriter) WriteChunk(p []byte) error {
func (w *writePayloadErrorObjectWriter) WriteChunk(_ context.Context, _ []byte) error {
return &writePayloadError{}
}
@ -1094,8 +1094,7 @@ func TestGetRemoteSmall(t *testing.T) {
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.Error(t, err)
require.Equal(t, err.Error(), "wrong child header")
require.ErrorIs(t, err, errParentAddressDiffers)
w = NewSimpleObjectWriter()
payloadSz := srcObj.PayloadSize()
@ -1107,8 +1106,7 @@ func TestGetRemoteSmall(t *testing.T) {
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.Error(t, err)
require.Equal(t, err.Error(), "wrong child header")
require.ErrorIs(t, err, errParentAddressDiffers)
})
t.Run("linked object with parent udefined", func(t *testing.T) {
@ -1464,8 +1462,7 @@ func TestGetRemoteSmall(t *testing.T) {
p.WithAddress(addr)
err := svc.Get(ctx, p)
require.Error(t, err)
require.Equal(t, err.Error(), "parent address in child object differs")
require.ErrorIs(t, err, errParentAddressDiffers)
w = NewSimpleObjectWriter()
payloadSz := srcObj.PayloadSize()
@ -1477,8 +1474,7 @@ func TestGetRemoteSmall(t *testing.T) {
rngPrm.WithAddress(addr)
err = svc.GetRange(ctx, rngPrm)
require.Error(t, err)
require.Equal(t, err.Error(), "parent address in child object differs")
require.ErrorIs(t, err, errParentAddressDiffers)
})
t.Run("OK", func(t *testing.T) {

View file

@ -1,6 +1,7 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"hash"
@ -85,13 +86,13 @@ type commonPrm struct {
// ChunkWriter is an interface of target component
// to write payload chunk.
type ChunkWriter interface {
WriteChunk([]byte) error
WriteChunk(context.Context, []byte) error
}
// HeaderWriter is an interface of target component
// to write object header.
type HeaderWriter interface {
WriteHeader(*object.Object) error
WriteHeader(context.Context, *object.Object) error
}
// ObjectWriter is an interface of target component to write object.

View file

@ -1,6 +1,7 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"io"
@ -54,7 +55,7 @@ func NewSimpleObjectWriter() *SimpleObjectWriter {
}
}
func (s *SimpleObjectWriter) WriteHeader(obj *object.Object) error {
func (s *SimpleObjectWriter) WriteHeader(_ context.Context, obj *object.Object) error {
s.obj = obj
s.pld = make([]byte, 0, obj.PayloadSize())
@ -62,7 +63,7 @@ func (s *SimpleObjectWriter) WriteHeader(obj *object.Object) error {
return nil
}
func (s *SimpleObjectWriter) WriteChunk(p []byte) error {
func (s *SimpleObjectWriter) WriteChunk(_ context.Context, p []byte) error {
s.pld = append(s.pld, p...)
return nil
}
@ -231,12 +232,12 @@ func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
}
}
func (w *partWriter) WriteChunk(p []byte) error {
return w.chunkWriter.WriteChunk(p)
func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error {
return w.chunkWriter.WriteChunk(ctx, p)
}
func (w *partWriter) WriteHeader(o *object.Object) error {
return w.headWriter.WriteHeader(o)
func (w *partWriter) WriteHeader(ctx context.Context, o *object.Object) error {
return w.headWriter.WriteHeader(ctx, o)
}
func payloadOnlyObject(payload []byte) *object.Object {
@ -246,7 +247,7 @@ func payloadOnlyObject(payload []byte) *object.Object {
return obj
}
func (h *hasherWrapper) WriteChunk(p []byte) error {
func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
_, err := h.hash.Write(p)
return err
}

View file

@ -1,6 +1,8 @@
package getsvc
import (
"context"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -14,7 +16,7 @@ type streamObjectRangeWriter struct {
objectSvc.GetObjectRangeStream
}
func (s *streamObjectWriter) WriteHeader(obj *object.Object) error {
func (s *streamObjectWriter) WriteHeader(_ context.Context, obj *object.Object) error {
p := new(objectV2.GetObjectPartInit)
objV2 := obj.ToV2()
@ -25,7 +27,7 @@ func (s *streamObjectWriter) WriteHeader(obj *object.Object) error {
return s.GetObjectStream.Send(newResponse(p))
}
func (s *streamObjectWriter) WriteChunk(chunk []byte) error {
func (s *streamObjectWriter) WriteChunk(_ context.Context, chunk []byte) error {
p := new(objectV2.GetObjectPartChunk)
p.SetChunk(chunk)
@ -43,7 +45,7 @@ func newResponse(p objectV2.GetObjectPart) *objectV2.GetResponse {
return r
}
func (s *streamObjectRangeWriter) WriteChunk(chunk []byte) error {
func (s *streamObjectRangeWriter) WriteChunk(_ context.Context, chunk []byte) error {
return s.GetObjectRangeStream.Send(newRangeResponse(chunk))
}

View file

@ -162,7 +162,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
obj.SetHeader(v.GetHeader())
onceHeaderSending.Do(func() {
err = streamWrapper.WriteHeader(object.NewFromV2(obj))
err = streamWrapper.WriteHeader(stream.Context(), object.NewFromV2(obj))
})
if err != nil {
return nil, fmt.Errorf("could not write object header in Get forwarder: %w", err)
@ -180,7 +180,7 @@ func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStre
continue
}
if err = streamWrapper.WriteChunk(chunk); err != nil {
if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil {
return nil, fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
}
@ -320,7 +320,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
continue
}
if err = streamWrapper.WriteChunk(chunk); err != nil {
if err = streamWrapper.WriteChunk(stream.Context(), chunk); err != nil {
return nil, fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
}
@ -414,7 +414,7 @@ type headResponseWriter struct {
body *objectV2.HeadResponseBody
}
func (w *headResponseWriter) WriteHeader(hdr *object.Object) error {
func (w *headResponseWriter) WriteHeader(_ context.Context, hdr *object.Object) error {
if w.mainOnly {
w.body.SetHeaderPart(toShortObjectHeader(hdr))
} else {

View file

@ -51,7 +51,7 @@ type headerWriter struct {
o *objectSDK.Object
}
func (h *headerWriter) WriteHeader(o *objectSDK.Object) error {
func (h *headerWriter) WriteHeader(_ context.Context, o *objectSDK.Object) error {
h.o = o
return nil
}