242 lines
5.3 KiB
Go
242 lines
5.3 KiB
Go
|
package frostfs
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"encoding/json"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"time"
|
||
|
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||
|
)
|
||
|
|
||
|
// PartInfo is upload information about part.
|
||
|
type PartInfo struct {
|
||
|
Key string `json:"key"`
|
||
|
UploadID string `json:"uploadId"`
|
||
|
Number int `json:"number"`
|
||
|
OID oid.ID `json:"oid"`
|
||
|
Size uint64 `json:"size"`
|
||
|
ETag string `json:"etag"`
|
||
|
MD5 string `json:"md5"`
|
||
|
Created time.Time `json:"created"`
|
||
|
}
|
||
|
|
||
|
type GetFrostFSParams struct {
|
||
|
// payload range
|
||
|
Off, Ln uint64
|
||
|
Addr oid.Address
|
||
|
}
|
||
|
|
||
|
type PartObj struct {
|
||
|
OID oid.ID
|
||
|
Size uint64
|
||
|
}
|
||
|
|
||
|
type readerInitiator interface {
|
||
|
InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.ReadCloser, error)
|
||
|
}
|
||
|
|
||
|
// MultiObjectReader implements io.Reader of payloads of the object list stored in the FrostFS network.
|
||
|
type MultiObjectReader struct {
|
||
|
ctx context.Context
|
||
|
|
||
|
layer readerInitiator
|
||
|
|
||
|
startPartOffset uint64
|
||
|
endPartLength uint64
|
||
|
|
||
|
prm GetFrostFSParams
|
||
|
|
||
|
curIndex int
|
||
|
curReader io.ReadCloser
|
||
|
|
||
|
parts []PartObj
|
||
|
}
|
||
|
|
||
|
type MultiObjectReaderConfig struct {
|
||
|
Initiator readerInitiator
|
||
|
|
||
|
// the offset of complete object and total size to read
|
||
|
Off, Ln uint64
|
||
|
|
||
|
Addr oid.Address
|
||
|
Parts []PartObj
|
||
|
}
|
||
|
|
||
|
// Errors returned by NewMultiObjectReader when the requested range or the
// part list is invalid.
var (
	// errOffsetIsOutOfRange: the requested offset is not inside any part.
	errOffsetIsOutOfRange = errors.New("offset is out of payload range")
	// errLengthIsOutOfRange: the requested range ends beyond the last part.
	errLengthIsOutOfRange = errors.New("length is out of payload range")
	// errEmptyPartsList: no parts were supplied.
	errEmptyPartsList = errors.New("empty parts list")
	// errorZeroRangeLength: a non-zero offset was given with a zero length.
	errorZeroRangeLength = errors.New("zero range length")
)
|
||
|
|
||
|
func (x *FrostFS) InitMultiObjectReader(ctx context.Context, p handler.PrmInitMultiObjectReader) (io.Reader, error) {
|
||
|
combinedObj, err := x.GetObject(ctx, handler.PrmObjectGet{
|
||
|
PrmAuth: handler.PrmAuth{BearerToken: p.Bearer},
|
||
|
Address: p.Addr,
|
||
|
})
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("get combined object '%s': %w", p.Addr.Object().EncodeToString(), err)
|
||
|
}
|
||
|
|
||
|
var parts []*PartInfo
|
||
|
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
|
||
|
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
|
||
|
}
|
||
|
|
||
|
objParts := make([]PartObj, len(parts))
|
||
|
for i, part := range parts {
|
||
|
objParts[i] = PartObj{
|
||
|
OID: part.OID,
|
||
|
Size: part.Size,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return NewMultiObjectReader(ctx, MultiObjectReaderConfig{
|
||
|
Initiator: x,
|
||
|
Off: p.Off,
|
||
|
Ln: p.Ln,
|
||
|
Parts: objParts,
|
||
|
Addr: p.Addr,
|
||
|
})
|
||
|
}
|
||
|
|
||
|
func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*MultiObjectReader, error) {
|
||
|
if len(cfg.Parts) == 0 {
|
||
|
return nil, errEmptyPartsList
|
||
|
}
|
||
|
|
||
|
r := &MultiObjectReader{
|
||
|
ctx: ctx,
|
||
|
layer: cfg.Initiator,
|
||
|
prm: GetFrostFSParams{
|
||
|
Addr: cfg.Addr,
|
||
|
},
|
||
|
parts: cfg.Parts,
|
||
|
}
|
||
|
|
||
|
if cfg.Off+cfg.Ln == 0 {
|
||
|
return r, nil
|
||
|
}
|
||
|
|
||
|
if cfg.Off > 0 && cfg.Ln == 0 {
|
||
|
return nil, errorZeroRangeLength
|
||
|
}
|
||
|
|
||
|
startPartIndex, startPartOffset := findStartPart(cfg)
|
||
|
if startPartIndex == -1 {
|
||
|
return nil, errOffsetIsOutOfRange
|
||
|
}
|
||
|
r.startPartOffset = startPartOffset
|
||
|
|
||
|
endPartIndex, endPartLength := findEndPart(cfg)
|
||
|
if endPartIndex == -1 {
|
||
|
return nil, errLengthIsOutOfRange
|
||
|
}
|
||
|
r.endPartLength = endPartLength
|
||
|
|
||
|
r.parts = cfg.Parts[startPartIndex : endPartIndex+1]
|
||
|
|
||
|
return r, nil
|
||
|
}
|
||
|
|
||
|
func findStartPart(cfg MultiObjectReaderConfig) (index int, offset uint64) {
|
||
|
position := cfg.Off
|
||
|
for i, part := range cfg.Parts {
|
||
|
// Strict inequality when searching for start position to avoid reading zero length part.
|
||
|
if position < part.Size {
|
||
|
return i, position
|
||
|
}
|
||
|
position -= part.Size
|
||
|
}
|
||
|
|
||
|
return -1, 0
|
||
|
}
|
||
|
|
||
|
func findEndPart(cfg MultiObjectReaderConfig) (index int, length uint64) {
|
||
|
position := cfg.Off + cfg.Ln
|
||
|
for i, part := range cfg.Parts {
|
||
|
// Non-strict inequality when searching for end position to avoid out of payload range error.
|
||
|
if position <= part.Size {
|
||
|
return i, position
|
||
|
}
|
||
|
position -= part.Size
|
||
|
}
|
||
|
|
||
|
return -1, 0
|
||
|
}
|
||
|
|
||
|
func (x *MultiObjectReader) Read(p []byte) (n int, err error) {
|
||
|
if x.curReader != nil {
|
||
|
n, err = x.curReader.Read(p)
|
||
|
if err != nil {
|
||
|
if closeErr := x.curReader.Close(); closeErr != nil {
|
||
|
return n, fmt.Errorf("%w (close err: %v)", err, closeErr)
|
||
|
}
|
||
|
}
|
||
|
if !errors.Is(err, io.EOF) {
|
||
|
return n, err
|
||
|
}
|
||
|
|
||
|
x.curIndex++
|
||
|
}
|
||
|
|
||
|
if x.curIndex == len(x.parts) {
|
||
|
return n, io.EOF
|
||
|
}
|
||
|
|
||
|
x.prm.Addr.SetObject(x.parts[x.curIndex].OID)
|
||
|
|
||
|
if x.curIndex == 0 {
|
||
|
x.prm.Off = x.startPartOffset
|
||
|
x.prm.Ln = x.parts[x.curIndex].Size - x.startPartOffset
|
||
|
}
|
||
|
|
||
|
if x.curIndex == len(x.parts)-1 {
|
||
|
x.prm.Ln = x.endPartLength - x.prm.Off
|
||
|
}
|
||
|
|
||
|
x.curReader, err = x.layer.InitFrostFSObjectPayloadReader(x.ctx, x.prm)
|
||
|
if err != nil {
|
||
|
return n, fmt.Errorf("init payload reader for the next part: %w", err)
|
||
|
}
|
||
|
|
||
|
x.prm.Off = 0
|
||
|
x.prm.Ln = 0
|
||
|
|
||
|
next, err := x.Read(p[n:])
|
||
|
|
||
|
return n + next, err
|
||
|
}
|
||
|
|
||
|
// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object.
|
||
|
// Zero range corresponds to full payload (panics if only offset is set).
|
||
|
func (x *FrostFS) InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.ReadCloser, error) {
|
||
|
var prmAuth handler.PrmAuth
|
||
|
|
||
|
if p.Off+p.Ln != 0 {
|
||
|
prm := handler.PrmObjectRange{
|
||
|
PrmAuth: prmAuth,
|
||
|
PayloadRange: [2]uint64{p.Off, p.Ln},
|
||
|
Address: p.Addr,
|
||
|
}
|
||
|
|
||
|
return x.RangeObject(ctx, prm)
|
||
|
}
|
||
|
|
||
|
prm := handler.PrmObjectGet{
|
||
|
PrmAuth: prmAuth,
|
||
|
Address: p.Addr,
|
||
|
}
|
||
|
|
||
|
res, err := x.GetObject(ctx, prm)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return res.Payload, nil
|
||
|
}
|