Split head/get/range into separate functions #1616

Merged
fyrchik merged 5 commits from fyrchik/frostfs-node:refactor-get into master 2025-01-30 06:50:38 +00:00
3 changed files with 158 additions and 115 deletions

View file

@ -2,6 +2,7 @@ package getsvc
import (
"context"
"slices"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -59,53 +60,24 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS
if previousID == nil && len(childrenIDs) == 0 {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
if len(childrenIDs) > 0 {
a-savchuk marked this conversation as resolved
Review

How about something like this?

switch {
case len(childrenIDs) > 0 && rng != nil:
    err = a.assembleObjectByChildrenListRange(ctx, childrenIDs, writer)
case len(childrenIDs) > 0:
    err = a.assembleObjectByChildrenList(ctx, childrenIDs, writer)
case rng != nil:
    err = a.assemleObjectByPreviousIDInReverseRange(ctx, *previousID, writer)
default:
    err = a.assemleObjectByPreviousIDInReverse(ctx, *previousID, writer)
}
How about something like this? ```go switch { case len(childrenIDs) > 0 && rng != nil: err = a.assembleObjectByChildrenListRange(ctx, childrenIDs, writer) case len(childrenIDs) > 0: err = a.assembleObjectByChildrenList(ctx, childrenIDs, writer) case rng != nil: err = a.assemleObjectByPreviousIDInReverseRange(ctx, *previousID, writer) default: err = a.assemleObjectByPreviousIDInReverse(ctx, *previousID, writer) } ```
Review

It definitely looks better, to my taste, but I will remove this in another PR anyway.
In this one, I tried to minimize diff (do not touch outer if).

It definitely looks better, to my taste, but I will remove this in another PR anyway. In this one, I tried to minimize diff (do not touch outer `if`).
if err := a.assembleObjectByChildrenList(ctx, childrenIDs, writer); err != nil {
return nil, err
if a.rng != nil {
err = a.assembleObjectByChildrenListRange(ctx, childrenIDs, writer)
} else {
err = a.assembleObjectByChildrenList(ctx, childrenIDs, writer)
}
} else {
if err := a.assemleObjectByPreviousIDInReverse(ctx, *previousID, writer); err != nil {
return nil, err
if a.rng != nil {
err = a.assemleObjectByPreviousIDInReverseRange(ctx, *previousID, writer)
} else {
err = a.assemleObjectByPreviousIDInReverse(ctx, *previousID, writer)
}
}
return a.parentObject, nil
}
func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
var sourceObjectIDs []oid.ID
sourceObjectID, ok := a.splitInfo.Link()
if ok {
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
}
sourceObjectID, ok = a.splitInfo.LastPart()
if ok {
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
}
if len(sourceObjectIDs) == 0 {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
for _, sourceObjectID = range sourceObjectIDs {
obj, err := a.getParent(ctx, sourceObjectID, writer)
if err == nil {
return obj, nil
}
}
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
if err != nil {
return nil, err
}
parent := obj.Parent()
if parent == nil {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
if err := writer.WriteHeader(ctx, parent); err != nil {
return nil, err
}
return obj, nil
return a.parentObject, nil
}
func (a *assembler) getLastPartOrLinkObjectID() (oid.ID, bool) {
@ -190,26 +162,16 @@ func (a *assembler) getChildObject(ctx context.Context, id oid.ID, rng *objectSD
}
func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer ObjectWriter) error {
if a.rng == nil {
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
return err
}
return a.assemblePayloadByObjectIDs(ctx, writer, childrenIDs, nil, true)
}
if err := a.assemblePayloadInReverse(ctx, writer, childrenIDs[len(childrenIDs)-1]); err != nil {
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
return err
}
return writer.WriteChunk(ctx, a.parentObject.Payload())
return a.assemblePayloadByObjectIDs(ctx, writer, childrenIDs, true)
}
func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer ObjectWriter) error {
if a.rng == nil {
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
return err
}
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
return err
}
if err := a.assemblePayloadInReverse(ctx, writer, prevID); err != nil {
return err
}
@ -219,16 +181,9 @@ func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prev
return nil
}
func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, partRanges []objectSDK.Range, verifyIsChild bool) error {
withRng := len(partRanges) > 0 && a.rng != nil
func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, verifyIsChild bool) error {
for i := range partIDs {
var r *objectSDK.Range
if withRng {
r = &partRanges[i]
}
_, err := a.getChildObject(ctx, partIDs[i], r, verifyIsChild, writer)
_, err := a.getChildObject(ctx, partIDs[i], nil, verifyIsChild, writer)
if err != nil {
return err
}
@ -237,22 +192,13 @@ func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer Objec
}
func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer ObjectWriter, prevID oid.ID) error {
chain, rngs, err := a.buildChain(ctx, prevID)
chain, err := a.buildChain(ctx, prevID)
if err != nil {
return err
}
reverseRngs := len(rngs) > 0
for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 {
chain[left], chain[right] = chain[right], chain[left]
if reverseRngs {
rngs[left], rngs[right] = rngs[right], rngs[left]
}
}
return a.assemblePayloadByObjectIDs(ctx, writer, chain, rngs, false)
slices.Reverse(chain)
return a.assemblePayloadByObjectIDs(ctx, writer, chain, false)
}
func (a *assembler) isChild(obj *objectSDK.Object) bool {
@ -260,63 +206,28 @@ func (a *assembler) isChild(obj *objectSDK.Object) bool {
return parent == nil || equalAddresses(a.addr, object.AddressOf(parent))
}
func (a *assembler) buildChain(ctx context.Context, prevID oid.ID) ([]oid.ID, []objectSDK.Range, error) {
func (a *assembler) buildChain(ctx context.Context, prevID oid.ID) ([]oid.ID, error) {
var (
chain []oid.ID
rngs []objectSDK.Range
from = a.rng.GetOffset()
to = from + a.rng.GetLength()
hasPrev = true
)
// fill the chain end-to-start
for hasPrev {
// check that only for "range" requests,
// for `GET` it stops via the false `withPrev`
if a.rng != nil && a.currentOffset <= from {
break
}
head, err := a.objGetter.HeadObject(ctx, prevID)
if err != nil {
return nil, nil, err
return nil, err
}
if !a.isChild(head) {
return nil, nil, errParentAddressDiffers
return nil, errParentAddressDiffers
}
if a.rng != nil {
sz := head.PayloadSize()
a.currentOffset -= sz
if a.currentOffset < to {
off := uint64(0)
if from > a.currentOffset {
off = from - a.currentOffset
sz -= from - a.currentOffset
}
if to < a.currentOffset+off+sz {
sz = to - off - a.currentOffset
}
index := len(rngs)
rngs = append(rngs, objectSDK.Range{})
rngs[index].SetOffset(off)
rngs[index].SetLength(sz)
id, _ := head.ID()
chain = append(chain, id)
}
} else {
id, _ := head.ID()
chain = append(chain, id)
}
id, _ := head.ID()
chain = append(chain, id)
prevID, hasPrev = head.PreviousID()
}
return chain, rngs, nil
return chain, nil
}

View file

@ -0,0 +1,45 @@
package getsvc
import (
"context"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
func (a *assembler) assembleHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
var sourceObjectIDs []oid.ID
sourceObjectID, ok := a.splitInfo.Link()
if ok {
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
}
sourceObjectID, ok = a.splitInfo.LastPart()
if ok {
sourceObjectIDs = append(sourceObjectIDs, sourceObjectID)
}
if len(sourceObjectIDs) == 0 {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
for _, sourceObjectID = range sourceObjectIDs {
obj, err := a.getParent(ctx, sourceObjectID, writer)
if err == nil {
return obj, nil
}
}
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
func (a *assembler) getParent(ctx context.Context, sourceObjectID oid.ID, writer ObjectWriter) (*objectSDK.Object, error) {
obj, err := a.objGetter.HeadObject(ctx, sourceObjectID)
if err != nil {
return nil, err
}
parent := obj.Parent()
if parent == nil {
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
}
if err := writer.WriteHeader(ctx, parent); err != nil {
return nil, err
}
return obj, nil
}

View file

@ -0,0 +1,87 @@
package getsvc
import (
"context"
"slices"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
func (a *assembler) assembleObjectByChildrenListRange(ctx context.Context, childrenIDs []oid.ID, writer ObjectWriter) error {
if err := a.assemblePayloadInReverseRange(ctx, writer, childrenIDs[len(childrenIDs)-1]); err != nil {
return err
}
return writer.WriteChunk(ctx, a.parentObject.Payload())
}
func (a *assembler) assemleObjectByPreviousIDInReverseRange(ctx context.Context, prevID oid.ID, writer ObjectWriter) error {
if err := a.assemblePayloadInReverseRange(ctx, writer, prevID); err != nil {
return err
}
if err := writer.WriteChunk(ctx, a.parentObject.Payload()); err != nil { // last part
return err
}
return nil
}
func (a *assembler) assemblePayloadByObjectIDsRange(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, partRanges []objectSDK.Range) error {
for i := range partIDs {
_, err := a.getChildObject(ctx, partIDs[i], &partRanges[i], false, writer)
if err != nil {
return err
}
}
return nil
}
func (a *assembler) assemblePayloadInReverseRange(ctx context.Context, writer ObjectWriter, prevID oid.ID) error {
chain, rngs, err := a.buildChainRange(ctx, prevID)
if err != nil {
return err
}
slices.Reverse(chain)
slices.Reverse(rngs)
return a.assemblePayloadByObjectIDsRange(ctx, writer, chain, rngs)
}
func (a *assembler) buildChainRange(ctx context.Context, prevID oid.ID) ([]oid.ID, []objectSDK.Range, error) {
var (
chain []oid.ID
rngs []objectSDK.Range
from = a.rng.GetOffset()
to = from + a.rng.GetLength()
hasPrev = true
)
// fill the chain end-to-start
for hasPrev && from < a.currentOffset {
head, err := a.objGetter.HeadObject(ctx, prevID)
if err != nil {
return nil, nil, err
}
if !a.isChild(head) {
return nil, nil, errParentAddressDiffers
}
nextOffset := a.currentOffset - head.PayloadSize()
clampedFrom := max(from, nextOffset)
clampedTo := min(to, a.currentOffset)
if clampedFrom < clampedTo {
index := len(rngs)
rngs = append(rngs, objectSDK.Range{})
rngs[index].SetOffset(clampedFrom - nextOffset)
rngs[index].SetLength(clampedTo - clampedFrom)
id, _ := head.ID()
chain = append(chain, id)
}
a.currentOffset = nextOffset
prevID, hasPrev = head.PreviousID()
}
return chain, rngs, nil
}