[#1616] getsvc: Move range assembling to a separate file
Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
This commit is contained in:
parent
c0a341a7f6
commit
6410542d19
2 changed files with 129 additions and 76 deletions
|
@ -59,15 +59,23 @@ func (a *assembler) Assemble(ctx context.Context, writer ObjectWriter) (*objectS
|
||||||
if previousID == nil && len(childrenIDs) == 0 {
|
if previousID == nil && len(childrenIDs) == 0 {
|
||||||
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
return nil, objectSDK.NewSplitInfoError(a.splitInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(childrenIDs) > 0 {
|
if len(childrenIDs) > 0 {
|
||||||
if err := a.assembleObjectByChildrenList(ctx, childrenIDs, writer); err != nil {
|
if a.rng != nil {
|
||||||
return nil, err
|
err = a.assembleObjectByChildrenListRange(ctx, childrenIDs, writer)
|
||||||
|
} else {
|
||||||
|
err = a.assembleObjectByChildrenList(ctx, childrenIDs, writer)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err := a.assemleObjectByPreviousIDInReverse(ctx, *previousID, writer); err != nil {
|
if a.rng != nil {
|
||||||
return nil, err
|
err = a.assemleObjectByPreviousIDInReverseRange(ctx, *previousID, writer)
|
||||||
|
} else {
|
||||||
|
err = a.assemleObjectByPreviousIDInReverse(ctx, *previousID, writer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return a.parentObject, nil
|
return a.parentObject, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -153,26 +161,16 @@ func (a *assembler) getChildObject(ctx context.Context, id oid.ID, rng *objectSD
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer ObjectWriter) error {
|
func (a *assembler) assembleObjectByChildrenList(ctx context.Context, childrenIDs []oid.ID, writer ObjectWriter) error {
|
||||||
if a.rng == nil {
|
|
||||||
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
|
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return a.assemblePayloadByObjectIDs(ctx, writer, childrenIDs, nil, true)
|
return a.assemblePayloadByObjectIDs(ctx, writer, childrenIDs, true)
|
||||||
}
|
|
||||||
|
|
||||||
if err := a.assemblePayloadInReverse(ctx, writer, childrenIDs[len(childrenIDs)-1]); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return writer.WriteChunk(ctx, a.parentObject.Payload())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer ObjectWriter) error {
|
func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prevID oid.ID, writer ObjectWriter) error {
|
||||||
if a.rng == nil {
|
|
||||||
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
|
if err := writer.WriteHeader(ctx, a.parentObject.CutPayload()); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if err := a.assemblePayloadInReverse(ctx, writer, prevID); err != nil {
|
if err := a.assemblePayloadInReverse(ctx, writer, prevID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -182,16 +180,9 @@ func (a *assembler) assemleObjectByPreviousIDInReverse(ctx context.Context, prev
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, partRanges []objectSDK.Range, verifyIsChild bool) error {
|
func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, verifyIsChild bool) error {
|
||||||
withRng := len(partRanges) > 0 && a.rng != nil
|
|
||||||
|
|
||||||
for i := range partIDs {
|
for i := range partIDs {
|
||||||
var r *objectSDK.Range
|
_, err := a.getChildObject(ctx, partIDs[i], nil, verifyIsChild, writer)
|
||||||
if withRng {
|
|
||||||
r = &partRanges[i]
|
|
||||||
}
|
|
||||||
|
|
||||||
_, err := a.getChildObject(ctx, partIDs[i], r, verifyIsChild, writer)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -200,22 +191,16 @@ func (a *assembler) assemblePayloadByObjectIDs(ctx context.Context, writer Objec
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer ObjectWriter, prevID oid.ID) error {
|
func (a *assembler) assemblePayloadInReverse(ctx context.Context, writer ObjectWriter, prevID oid.ID) error {
|
||||||
chain, rngs, err := a.buildChain(ctx, prevID)
|
chain, err := a.buildChain(ctx, prevID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
reverseRngs := len(rngs) > 0
|
|
||||||
|
|
||||||
for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 {
|
for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 {
|
||||||
chain[left], chain[right] = chain[right], chain[left]
|
chain[left], chain[right] = chain[right], chain[left]
|
||||||
|
|
||||||
if reverseRngs {
|
|
||||||
rngs[left], rngs[right] = rngs[right], rngs[left]
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return a.assemblePayloadByObjectIDs(ctx, writer, chain, rngs, false)
|
return a.assemblePayloadByObjectIDs(ctx, writer, chain, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *assembler) isChild(obj *objectSDK.Object) bool {
|
func (a *assembler) isChild(obj *objectSDK.Object) bool {
|
||||||
|
@ -223,63 +208,28 @@ func (a *assembler) isChild(obj *objectSDK.Object) bool {
|
||||||
return parent == nil || equalAddresses(a.addr, object.AddressOf(parent))
|
return parent == nil || equalAddresses(a.addr, object.AddressOf(parent))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *assembler) buildChain(ctx context.Context, prevID oid.ID) ([]oid.ID, []objectSDK.Range, error) {
|
func (a *assembler) buildChain(ctx context.Context, prevID oid.ID) ([]oid.ID, error) {
|
||||||
var (
|
var (
|
||||||
chain []oid.ID
|
chain []oid.ID
|
||||||
rngs []objectSDK.Range
|
|
||||||
from = a.rng.GetOffset()
|
|
||||||
to = from + a.rng.GetLength()
|
|
||||||
|
|
||||||
hasPrev = true
|
hasPrev = true
|
||||||
)
|
)
|
||||||
|
|
||||||
// fill the chain end-to-start
|
// fill the chain end-to-start
|
||||||
for hasPrev {
|
for hasPrev {
|
||||||
// check that only for "range" requests,
|
|
||||||
// for `GET` it stops via the false `withPrev`
|
|
||||||
if a.rng != nil && a.currentOffset <= from {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
head, err := a.objGetter.HeadObject(ctx, prevID)
|
head, err := a.objGetter.HeadObject(ctx, prevID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if !a.isChild(head) {
|
if !a.isChild(head) {
|
||||||
return nil, nil, errParentAddressDiffers
|
return nil, errParentAddressDiffers
|
||||||
}
|
}
|
||||||
|
|
||||||
if a.rng != nil {
|
|
||||||
sz := head.PayloadSize()
|
|
||||||
|
|
||||||
a.currentOffset -= sz
|
|
||||||
|
|
||||||
if a.currentOffset < to {
|
|
||||||
off := uint64(0)
|
|
||||||
if from > a.currentOffset {
|
|
||||||
off = from - a.currentOffset
|
|
||||||
sz -= from - a.currentOffset
|
|
||||||
}
|
|
||||||
|
|
||||||
if to < a.currentOffset+off+sz {
|
|
||||||
sz = to - off - a.currentOffset
|
|
||||||
}
|
|
||||||
|
|
||||||
index := len(rngs)
|
|
||||||
rngs = append(rngs, objectSDK.Range{})
|
|
||||||
rngs[index].SetOffset(off)
|
|
||||||
rngs[index].SetLength(sz)
|
|
||||||
|
|
||||||
id, _ := head.ID()
|
id, _ := head.ID()
|
||||||
chain = append(chain, id)
|
chain = append(chain, id)
|
||||||
}
|
|
||||||
} else {
|
|
||||||
id, _ := head.ID()
|
|
||||||
chain = append(chain, id)
|
|
||||||
}
|
|
||||||
|
|
||||||
prevID, hasPrev = head.PreviousID()
|
prevID, hasPrev = head.PreviousID()
|
||||||
}
|
}
|
||||||
|
|
||||||
return chain, rngs, nil
|
return chain, nil
|
||||||
}
|
}
|
||||||
|
|
103
pkg/services/object/get/assembler_range.go
Normal file
103
pkg/services/object/get/assembler_range.go
Normal file
|
@ -0,0 +1,103 @@
|
||||||
|
package getsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (a *assembler) assembleObjectByChildrenListRange(ctx context.Context, childrenIDs []oid.ID, writer ObjectWriter) error {
|
||||||
|
if err := a.assemblePayloadInReverseRange(ctx, writer, childrenIDs[len(childrenIDs)-1]); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return writer.WriteChunk(ctx, a.parentObject.Payload())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assembler) assemleObjectByPreviousIDInReverseRange(ctx context.Context, prevID oid.ID, writer ObjectWriter) error {
|
||||||
|
if err := a.assemblePayloadInReverseRange(ctx, writer, prevID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := writer.WriteChunk(ctx, a.parentObject.Payload()); err != nil { // last part
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assembler) assemblePayloadByObjectIDsRange(ctx context.Context, writer ObjectWriter, partIDs []oid.ID, partRanges []objectSDK.Range) error {
|
||||||
|
for i := range partIDs {
|
||||||
|
_, err := a.getChildObject(ctx, partIDs[i], &partRanges[i], false, writer)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assembler) assemblePayloadInReverseRange(ctx context.Context, writer ObjectWriter, prevID oid.ID) error {
|
||||||
|
chain, rngs, err := a.buildChainRange(ctx, prevID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for left, right := 0, len(chain)-1; left < right; left, right = left+1, right-1 {
|
||||||
|
chain[left], chain[right] = chain[right], chain[left]
|
||||||
|
rngs[left], rngs[right] = rngs[right], rngs[left]
|
||||||
|
}
|
||||||
|
|
||||||
|
return a.assemblePayloadByObjectIDsRange(ctx, writer, chain, rngs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *assembler) buildChainRange(ctx context.Context, prevID oid.ID) ([]oid.ID, []objectSDK.Range, error) {
|
||||||
|
var (
|
||||||
|
chain []oid.ID
|
||||||
|
rngs []objectSDK.Range
|
||||||
|
from = a.rng.GetOffset()
|
||||||
|
to = from + a.rng.GetLength()
|
||||||
|
|
||||||
|
hasPrev = true
|
||||||
|
)
|
||||||
|
|
||||||
|
// fill the chain end-to-start
|
||||||
|
for hasPrev {
|
||||||
|
if a.currentOffset <= from {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
head, err := a.objGetter.HeadObject(ctx, prevID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
if !a.isChild(head) {
|
||||||
|
return nil, nil, errParentAddressDiffers
|
||||||
|
}
|
||||||
|
|
||||||
|
sz := head.PayloadSize()
|
||||||
|
|
||||||
|
a.currentOffset -= sz
|
||||||
|
|
||||||
|
if a.currentOffset < to {
|
||||||
|
off := uint64(0)
|
||||||
|
if from > a.currentOffset {
|
||||||
|
off = from - a.currentOffset
|
||||||
|
sz -= from - a.currentOffset
|
||||||
|
}
|
||||||
|
|
||||||
|
if to < a.currentOffset+off+sz {
|
||||||
|
sz = to - off - a.currentOffset
|
||||||
|
}
|
||||||
|
|
||||||
|
index := len(rngs)
|
||||||
|
rngs = append(rngs, objectSDK.Range{})
|
||||||
|
rngs[index].SetOffset(off)
|
||||||
|
rngs[index].SetLength(sz)
|
||||||
|
|
||||||
|
id, _ := head.ID()
|
||||||
|
chain = append(chain, id)
|
||||||
|
}
|
||||||
|
|
||||||
|
prevID, hasPrev = head.PreviousID()
|
||||||
|
}
|
||||||
|
|
||||||
|
return chain, rngs, nil
|
||||||
|
}
|
Loading…
Add table
Reference in a new issue