Split head/get/range into separate functions #1616
3 changed files with 158 additions and 115 deletions
|
@ -2,6 +2,7 @@ package getsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
|
@ -59,53 +60,24 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS
|
|||
if previousID == nil && len(childrenIDs) == 0 {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
|
||||
if len(childrenIDs) > 0 {
|
||||
a-savchuk marked this conversation as resolved
|
||||
if err := a.assembleObjectByChildrenList(ctx, childrenIDs, writer); err != nil {
|
||||
return nil, err
|
||||
if a.rng != nil {
|
||||
err = a.assembleObjectByChildrenListRange(ctx, childrenIDs, writer)
|
||||
} else {
|
||||
err = a.assembleObjectByChildrenList(ctx, childrenIDs, writer)
|
||||
}
|
||||
} else {
|
||||
if err := a.assemleObjectByPreviousIDInReverse(ctx, *previousID, writer); err != nil {
|
||||
return nil, err
|
||||
if a.rng != nil {
|
||||
err = a.assemleObjectByPreviousIDInReverseRange(ctx, *previousID, writer)
|
||||
} else {
|
||||
err = a.assemleObjectByPreviousIDInReverse(ctx, *previousID, writer)
|
||||
}
|
||||
}
|
||||
return a.parentObject, nil
|
||||
}
|
||||
|
||||
func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
var sourceObjectIDs []oid.ID
|
||||
sourceObjectID, ok := a.splitInfo.Link()
|
||||
if ok {
|
||||
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
|
||||
}
|
||||
sourceObjectID, ok = a.splitInfo.LastPart()
|
||||
if ok {
|
||||
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
|
||||
}
|
||||
if len(sourceObjectIDs) == 0 {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
for _, sourceObjectID = range sourceObjectIDs {
|
||||
obj, err := a.getParent(ctx, sourceObjectID, writer)
|
||||
if err == nil {
|
||||
return obj, nil
|
||||
}
|
||||
}
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
|
||||
func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
parent := obj.Parent()
|
||||
if parent == nil {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
if err := writer.WriteHeader(ctx, parent); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return obj, nil
|
||||
return a.parentObject, nil
|
||||
}
|
||||
|
||||
func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
|
||||
|
@ -190,26 +162,16 @@ func (a *assembler) getChildObject(ctx context.Context, id oid.ID, rng *objectSD
|
|||
}
|
||||
|
||||
func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer ObjectWriter) error {
|
||||
if a.rng == nil {
|
||||
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
|
||||
return err
|
||||
}
|
||||
return a.assemblePayloadByObjectIDs(ctx, writer, childrenIDs, nil, true)
|
||||
}
|
||||
|
||||
if err := a.assemblePayloadInReverse(ctx, writer, childrenIDs[len(childrenIDs)-1]); err != nil {
|
||||
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
|
||||
return err
|
||||
}
|
||||
return writer.WriteChunk(ctx, a.parentObject.Payload())
|
||||
return a.assemblePayloadByObjectIDs(ctx, writer, childrenIDs, true)
|
||||
}
|
||||
|
||||
func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer ObjectWriter) error {
|
||||
if a.rng == nil {
|
||||
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := a.assemblePayloadInReverse(ctx, writer, prevID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -219,16 +181,9 @@ func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prev
|
|||
return nil
|
||||
}
|
||||
|
||||
func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, partRanges []objectSDK.Range, verifyIsChild bool) error {
|
||||
withRng := len(partRanges) > 0 && a.rng != nil
|
||||
|
||||
func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, verifyIsChild bool) error {
|
||||
for i := range partIDs {
|
||||
var r *objectSDK.Range
|
||||
if withRng {
|
||||
r = &partRanges[i]
|
||||
}
|
||||
|
||||
_, err := a.getChildObject(ctx, partIDs[i], r, verifyIsChild, writer)
|
||||
_, err := a.getChildObject(ctx, partIDs[i], nil, verifyIsChild, writer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -237,22 +192,13 @@ func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer Objec
|
|||
}
|
||||
|
||||
func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer ObjectWriter, prevID oid.ID) error {
|
||||
chain, rngs, err := a.buildChain(ctx, prevID)
|
||||
chain, err := a.buildChain(ctx, prevID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reverseRngs := len(rngs) > 0
|
||||
|
||||
for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 {
|
||||
chain[left], chain[right] = chain[right], chain[left]
|
||||
|
||||
if reverseRngs {
|
||||
rngs[left], rngs[right] = rngs[right], rngs[left]
|
||||
}
|
||||
}
|
||||
|
||||
return a.assemblePayloadByObjectIDs(ctx, writer, chain, rngs, false)
|
||||
slices.Reverse(chain)
|
||||
return a.assemblePayloadByObjectIDs(ctx, writer, chain, false)
|
||||
}
|
||||
|
||||
func (a *assembler) isChild(obj *objectSDK.Object) bool {
|
||||
|
@ -260,63 +206,28 @@ func (a *assembler) isChild(obj *objectSDK.Object) bool {
|
|||
return parent == nil || equalAddresses(a.addr, object.AddressOf(parent))
|
||||
}
|
||||
|
||||
func (a *assembler) buildChain(ctx context.Context, prevID oid.ID) ([]oid.ID, []objectSDK.Range, error) {
|
||||
func (a *assembler) buildChain(ctx context.Context, prevID oid.ID) ([]oid.ID, error) {
|
||||
var (
|
||||
chain []oid.ID
|
||||
rngs []objectSDK.Range
|
||||
from = a.rng.GetOffset()
|
||||
to = from + a.rng.GetLength()
|
||||
|
||||
hasPrev = true
|
||||
)
|
||||
|
||||
// fill the chain end-to-start
|
||||
for hasPrev {
|
||||
// check that only for "range" requests,
|
||||
// for `GET` it stops via the false `withPrev`
|
||||
if a.rng != nil && a.currentOffset <= from {
|
||||
break
|
||||
}
|
||||
|
||||
head, err := a.objGetter.HeadObject(ctx, prevID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
if !a.isChild(head) {
|
||||
return nil, nil, errParentAddressDiffers
|
||||
return nil, errParentAddressDiffers
|
||||
}
|
||||
|
||||
if a.rng != nil {
|
||||
sz := head.PayloadSize()
|
||||
|
||||
a.currentOffset -= sz
|
||||
|
||||
if a.currentOffset < to {
|
||||
off := uint64(0)
|
||||
if from > a.currentOffset {
|
||||
off = from - a.currentOffset
|
||||
sz -= from - a.currentOffset
|
||||
}
|
||||
|
||||
if to < a.currentOffset+off+sz {
|
||||
sz = to - off - a.currentOffset
|
||||
}
|
||||
|
||||
index := len(rngs)
|
||||
rngs = append(rngs, objectSDK.Range{})
|
||||
rngs[index].SetOffset(off)
|
||||
rngs[index].SetLength(sz)
|
||||
|
||||
id, _ := head.ID()
|
||||
chain = append(chain, id)
|
||||
}
|
||||
} else {
|
||||
id, _ := head.ID()
|
||||
chain = append(chain, id)
|
||||
}
|
||||
id, _ := head.ID()
|
||||
chain = append(chain, id)
|
||||
|
||||
prevID, hasPrev = head.PreviousID()
|
||||
}
|
||||
|
||||
return chain, rngs, nil
|
||||
return chain, nil
|
||||
}
|
||||
|
|
45
pkg/services/object/get/assembler_head.go
Normal file
45
pkg/services/object/get/assembler_head.go
Normal file
|
@ -0,0 +1,45 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
var sourceObjectIDs []oid.ID
|
||||
sourceObjectID, ok := a.splitInfo.Link()
|
||||
if ok {
|
||||
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
|
||||
}
|
||||
sourceObjectID, ok = a.splitInfo.LastPart()
|
||||
if ok {
|
||||
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
|
||||
}
|
||||
if len(sourceObjectIDs) == 0 {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
for _, sourceObjectID = range sourceObjectIDs {
|
||||
obj, err := a.getParent(ctx, sourceObjectID, writer)
|
||||
if err == nil {
|
||||
return obj, nil
|
||||
}
|
||||
}
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
|
||||
func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
|
||||
obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
parent := obj.Parent()
|
||||
if parent == nil {
|
||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||
}
|
||||
if err := writer.WriteHeader(ctx, parent); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return obj, nil
|
||||
}
|
87
pkg/services/object/get/assembler_range.go
Normal file
87
pkg/services/object/get/assembler_range.go
Normal file
|
@ -0,0 +1,87 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
func (a *assembler) assembleObjectByChildrenListRange(ctx context.Context, childrenIDs []oid.ID, writer ObjectWriter) error {
|
||||
if err := a.assemblePayloadInReverseRange(ctx, writer, childrenIDs[len(childrenIDs)-1]); err != nil {
|
||||
return err
|
||||
}
|
||||
return writer.WriteChunk(ctx, a.parentObject.Payload())
|
||||
}
|
||||
|
||||
func (a *assembler) assemleObjectByPreviousIDInReverseRange(ctx context.Context, prevID oid.ID, writer ObjectWriter) error {
|
||||
if err := a.assemblePayloadInReverseRange(ctx, writer, prevID); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := writer.WriteChunk(ctx, a.parentObject.Payload()); err != nil { // last part
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *assembler) assemblePayloadByObjectIDsRange(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, partRanges []objectSDK.Range) error {
|
||||
for i := range partIDs {
|
||||
_, err := a.getChildObject(ctx, partIDs[i], &partRanges[i], false, writer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a *assembler) assemblePayloadInReverseRange(ctx context.Context, writer ObjectWriter, prevID oid.ID) error {
|
||||
chain, rngs, err := a.buildChainRange(ctx, prevID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
slices.Reverse(chain)
|
||||
slices.Reverse(rngs)
|
||||
return a.assemblePayloadByObjectIDsRange(ctx, writer, chain, rngs)
|
||||
}
|
||||
|
||||
func (a *assembler) buildChainRange(ctx context.Context, prevID oid.ID) ([]oid.ID, []objectSDK.Range, error) {
|
||||
var (
|
||||
chain []oid.ID
|
||||
rngs []objectSDK.Range
|
||||
from = a.rng.GetOffset()
|
||||
to = from + a.rng.GetLength()
|
||||
|
||||
hasPrev = true
|
||||
)
|
||||
|
||||
// fill the chain end-to-start
|
||||
for hasPrev && from < a.currentOffset {
|
||||
head, err := a.objGetter.HeadObject(ctx, prevID)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if !a.isChild(head) {
|
||||
return nil, nil, errParentAddressDiffers
|
||||
}
|
||||
|
||||
nextOffset := a.currentOffset - head.PayloadSize()
|
||||
clampedFrom := max(from, nextOffset)
|
||||
clampedTo := min(to, a.currentOffset)
|
||||
if clampedFrom < clampedTo {
|
||||
index := len(rngs)
|
||||
rngs = append(rngs, objectSDK.Range{})
|
||||
rngs[index].SetOffset(clampedFrom - nextOffset)
|
||||
rngs[index].SetLength(clampedTo - clampedFrom)
|
||||
|
||||
id, _ := head.ID()
|
||||
chain = append(chain, id)
|
||||
}
|
||||
|
||||
a.currentOffset = nextOffset
|
||||
prevID, hasPrev = head.PreviousID()
|
||||
}
|
||||
|
||||
return chain, rngs, nil
|
||||
}
|
Loading…
Add table
Reference in a new issue
How about something like this?
It definitely looks better, to my taste, but I will remove this in another PR anyway.
In this one, I tried to minimize diff (do not touch outer
if
).