[#85] get-service: Add assembler
Extract assemble logic to assembler Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
b4582239bf
commit
8b9e40a848
1 changed files with 293 additions and 0 deletions
293
pkg/services/object/get/assembler.go
Normal file
293
pkg/services/object/get/assembler.go
Normal file
|
@ -0,0 +1,293 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type objectGetter interface {
|
||||
GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error)
|
||||
HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error)
|
||||
}
|
||||
|
||||
type objectWriter interface {
|
||||
WriteChunk(context.Context, []byte) error
|
||||
WriteHeader(context.Context, *objectSDK.Object) error
|
||||
}
|
||||
|
||||
var (
|
||||
errParentAddressDiffers = errors.New("parent address in child object differs")
|
||||
)
|
||||
|
||||
type assembler struct {
|
||||
addr oid.Address
|
||||
splitInfo *objectSDK.SplitInfo
|
||||
rng *objectSDK.Range
|
||||
objGetter objectGetter
|
||||
|
||||
currentOffset uint64
|
||||
|
||||
parentObject *objectSDK.Object
|
||||
}
|
||||
|
||||
func newAssembler(
|
||||
addr oid.Address,
|
||||
splitInfo *objectSDK.SplitInfo,
|
||||
rng *objectSDK.Range,
|
||||
objGetter objectGetter) *assembler {
|
||||
return &assembler{
|
||||
addr: addr,
|
||||
rng: rng,
|
||||
splitInfo: splitInfo,
|
||||
objGetter: objGetter,
|
||||
}
|
||||
}
|
||||
|
||||
// Assemble assembles splitted large object and writes it's content to ObjectWriter.
|
||||
// It returns parent object.
|
||||
func (a *assembler) Assemble(ctx context.Context, writer objectWriter) (*objectSDK.Object, error) {
|
||||
sourceObjectID, ok := a.getLastPartOrLinkObjectID()
|
||||
if !ok {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
previousID, childrenIDs, err := a.initializeFromSourceObjectID(ctx, sourceObjectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if previousID == nil && len(childrenIDs) == 0 {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
if len(childrenIDs) > 0 {
|
||||
if err := a.assembleObjectByChildrenList(ctx, childrenIDs, writer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
if err := a.assemleObjectByPreviousIDInReverse(ctx, *previousID, writer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
return a.parentObject, nil
|
||||
}
|
||||
|
||||
func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
|
||||
sourceObjectID, ok := a.splitInfo.Link()
|
||||
if ok {
|
||||
return sourceObjectID, true
|
||||
}
|
||||
sourceObjectID, ok = a.splitInfo.LastPart()
|
||||
if ok {
|
||||
return sourceObjectID, true
|
||||
}
|
||||
return oid.ID{}, false
|
||||
}
|
||||
|
||||
func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID) (*oid.ID, []oid.ID, error) {
|
||||
sourceObject, err := a.getChildObject(ctx, id, nil, true)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
parentObject := sourceObject.Parent()
|
||||
if parentObject == nil {
|
||||
return nil, nil, errors.New("received child with empty parent")
|
||||
}
|
||||
|
||||
a.parentObject = parentObject
|
||||
|
||||
var payload []byte
|
||||
|
||||
if a.rng != nil {
|
||||
seekOff := a.rng.GetOffset()
|
||||
seekLen := a.rng.GetLength()
|
||||
seekTo := seekOff + seekLen
|
||||
parentSize := parentObject.PayloadSize()
|
||||
|
||||
if seekTo < seekOff || parentSize < seekOff || parentSize < seekTo {
|
||||
return nil, nil, &apistatus.ObjectOutOfRange{}
|
||||
}
|
||||
|
||||
sourceSize := sourceObject.PayloadSize()
|
||||
|
||||
a.currentOffset = parentSize - sourceSize
|
||||
|
||||
from := uint64(0)
|
||||
if a.currentOffset < seekOff {
|
||||
from = seekOff - a.currentOffset
|
||||
}
|
||||
|
||||
to := uint64(0)
|
||||
if seekOff+seekLen > a.currentOffset+from {
|
||||
to = seekOff + seekLen - a.currentOffset
|
||||
}
|
||||
|
||||
payload = sourceObject.Payload()[from:to]
|
||||
a.rng.SetLength(a.rng.GetLength() - to + from)
|
||||
} else {
|
||||
payload = sourceObject.Payload()
|
||||
}
|
||||
|
||||
a.parentObject.SetPayload(payload)
|
||||
|
||||
idPrev, ok := sourceObject.PreviousID()
|
||||
if ok {
|
||||
return &idPrev, sourceObject.Children(), nil
|
||||
}
|
||||
|
||||
return nil, sourceObject.Children(), nil
|
||||
}
|
||||
|
||||
func (a *assembler) getChildObject(ctx context.Context, id oid.ID, rng *objectSDK.Range, verifyIsChild bool) (*objectSDK.Object, error) {
|
||||
obj, err := a.objGetter.GetObject(ctx, id, rng)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if verifyIsChild && !a.isChild(obj) {
|
||||
return nil, errParentAddressDiffers
|
||||
}
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer objectWriter) error {
|
||||
if a.rng == nil {
|
||||
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
|
||||
return err
|
||||
}
|
||||
return a.assemblePayloadByObjectIDs(ctx, writer, childrenIDs, nil, true)
|
||||
}
|
||||
|
||||
if err := a.assemblePayloadInReverse(ctx, writer, childrenIDs[len(childrenIDs)-1]); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := writer.WriteChunk(ctx, a.parentObject.Payload()); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer objectWriter) error {
|
||||
if a.rng == nil {
|
||||
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := a.assemblePayloadInReverse(ctx, writer, prevID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := writer.WriteChunk(ctx, a.parentObject.Payload()); err != nil { // last part
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer objectWriter, partIDs []oid.ID, partRanges []objectSDK.Range, verifyIsChild bool) error {
|
||||
withRng := len(partRanges) > 0 && a.rng != nil
|
||||
|
||||
for i := range partIDs {
|
||||
var r *objectSDK.Range
|
||||
if withRng {
|
||||
r = &partRanges[i]
|
||||
}
|
||||
|
||||
child, err := a.getChildObject(ctx, partIDs[i], r, verifyIsChild)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := writer.WriteChunk(ctx, child.Payload()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer objectWriter, prevID oid.ID) error {
|
||||
chain, rngs, err := a.buildChain(ctx, prevID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reverseRngs := len(rngs) > 0
|
||||
|
||||
for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 {
|
||||
chain[left], chain[right] = chain[right], chain[left]
|
||||
|
||||
if reverseRngs {
|
||||
rngs[left], rngs[right] = rngs[right], rngs[left]
|
||||
}
|
||||
}
|
||||
|
||||
return a.assemblePayloadByObjectIDs(ctx, writer, chain, rngs, false)
|
||||
}
|
||||
|
||||
func (a *assembler) isChild(obj *objectSDK.Object) bool {
|
||||
parent := obj.Parent()
|
||||
return parent == nil || equalAddresses(a.addr, object.AddressOf(parent))
|
||||
}
|
||||
|
||||
func (a *assembler) buildChain(ctx context.Context, prevID oid.ID) ([]oid.ID, []objectSDK.Range, error) {
|
||||
var (
|
||||
chain []oid.ID
|
||||
rngs []objectSDK.Range
|
||||
from = a.rng.GetOffset()
|
||||
to = from + a.rng.GetLength()
|
||||
|
||||
hasPrev = true
|
||||
)
|
||||
|
||||
// fill the chain end-to-start
|
||||
for hasPrev {
|
||||
// check that only for "range" requests,
|
||||
// for `GET` it stops via the false `withPrev`
|
||||
if a.rng != nil && a.currentOffset <= from {
|
||||
break
|
||||
}
|
||||
|
||||
head, err := a.objGetter.HeadObject(ctx, prevID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if !a.isChild(head) {
|
||||
return nil, nil, errParentAddressDiffers
|
||||
}
|
||||
|
||||
if a.rng != nil {
|
||||
sz := head.PayloadSize()
|
||||
|
||||
a.currentOffset -= sz
|
||||
|
||||
if a.currentOffset < to {
|
||||
off := uint64(0)
|
||||
if from > a.currentOffset {
|
||||
off = from - a.currentOffset
|
||||
sz -= from - a.currentOffset
|
||||
}
|
||||
|
||||
if to < a.currentOffset+off+sz {
|
||||
sz = to - off - a.currentOffset
|
||||
}
|
||||
|
||||
index := len(rngs)
|
||||
rngs = append(rngs, objectSDK.Range{})
|
||||
rngs[index].SetOffset(off)
|
||||
rngs[index].SetLength(sz)
|
||||
|
||||
id, _ := head.ID()
|
||||
chain = append(chain, id)
|
||||
}
|
||||
} else {
|
||||
id, _ := head.ID()
|
||||
chain = append(chain, id)
|
||||
}
|
||||
|
||||
prevID, hasPrev = head.PreviousID()
|
||||
}
|
||||
|
||||
return chain, rngs, nil
|
||||
}
|
Loading…
Reference in a new issue