package frostfs import ( "context" "encoding/json" "errors" "fmt" "io" "time" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) // PartInfo is upload information about part. type PartInfo struct { Key string `json:"key"` UploadID string `json:"uploadId"` Number int `json:"number"` OID oid.ID `json:"oid"` Size uint64 `json:"size"` ETag string `json:"etag"` MD5 string `json:"md5"` Created time.Time `json:"created"` } type GetFrostFSParams struct { // payload range Off, Ln uint64 Addr oid.Address } type PartObj struct { OID oid.ID Size uint64 } type readerInitiator interface { InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.ReadCloser, error) } // MultiObjectReader implements io.Reader of payloads of the object list stored in the FrostFS network. type MultiObjectReader struct { ctx context.Context layer readerInitiator startPartOffset uint64 endPartLength uint64 prm GetFrostFSParams curIndex int curReader io.ReadCloser parts []PartObj } type MultiObjectReaderConfig struct { Initiator readerInitiator // the offset of complete object and total size to read Off, Ln uint64 Addr oid.Address Parts []PartObj } var ( errOffsetIsOutOfRange = errors.New("offset is out of payload range") errLengthIsOutOfRange = errors.New("length is out of payload range") errEmptyPartsList = errors.New("empty parts list") errorZeroRangeLength = errors.New("zero range length") ) func (x *FrostFS) InitMultiObjectReader(ctx context.Context, p handler.PrmInitMultiObjectReader) (io.Reader, error) { combinedObj, err := x.GetObject(ctx, handler.PrmObjectGet{ PrmAuth: handler.PrmAuth{BearerToken: p.Bearer}, Address: p.Addr, }) if err != nil { return nil, fmt.Errorf("get combined object '%s': %w", p.Addr.Object().EncodeToString(), err) } var parts []*PartInfo if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil { return nil, fmt.Errorf("unmarshal combined object parts: %w", err) } objParts := make([]PartObj, len(parts)) for i, part := range parts { objParts[i] = PartObj{ OID: part.OID, Size: part.Size, } } return NewMultiObjectReader(ctx, MultiObjectReaderConfig{ Initiator: x, Off: p.Off, Ln: p.Ln, Parts: objParts, Addr: p.Addr, }) } func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*MultiObjectReader, error) { if len(cfg.Parts) == 0 { return nil, errEmptyPartsList } r := &MultiObjectReader{ ctx: ctx, layer: cfg.Initiator, prm: GetFrostFSParams{ Addr: cfg.Addr, }, parts: cfg.Parts, } if cfg.Off+cfg.Ln == 0 { return r, nil } if cfg.Off > 0 && cfg.Ln == 0 { return nil, errorZeroRangeLength } startPartIndex, startPartOffset := findStartPart(cfg) if startPartIndex == -1 { return nil, errOffsetIsOutOfRange } r.startPartOffset = startPartOffset endPartIndex, endPartLength := findEndPart(cfg) if endPartIndex == -1 { return nil, errLengthIsOutOfRange } r.endPartLength = endPartLength r.parts = cfg.Parts[startPartIndex : endPartIndex+1] return r, nil } func findStartPart(cfg MultiObjectReaderConfig) (index int, offset uint64) { position := cfg.Off for i, part := range cfg.Parts { // Strict inequality when searching for start position to avoid reading zero length part. if position < part.Size { return i, position } position -= part.Size } return -1, 0 } func findEndPart(cfg MultiObjectReaderConfig) (index int, length uint64) { position := cfg.Off + cfg.Ln for i, part := range cfg.Parts { // Non-strict inequality when searching for end position to avoid out of payload range error. if position <= part.Size { return i, position } position -= part.Size } return -1, 0 } func (x *MultiObjectReader) Read(p []byte) (n int, err error) { if x.curReader != nil { n, err = x.curReader.Read(p) if err != nil { if closeErr := x.curReader.Close(); closeErr != nil { return n, fmt.Errorf("%w (close err: %v)", err, closeErr) } } if !errors.Is(err, io.EOF) { return n, err } x.curIndex++ } if x.curIndex == len(x.parts) { return n, io.EOF } x.prm.Addr.SetObject(x.parts[x.curIndex].OID) if x.curIndex == 0 { x.prm.Off = x.startPartOffset x.prm.Ln = x.parts[x.curIndex].Size - x.startPartOffset } if x.curIndex == len(x.parts)-1 { x.prm.Ln = x.endPartLength - x.prm.Off } x.curReader, err = x.layer.InitFrostFSObjectPayloadReader(x.ctx, x.prm) if err != nil { return n, fmt.Errorf("init payload reader for the next part: %w", err) } x.prm.Off = 0 x.prm.Ln = 0 next, err := x.Read(p[n:]) return n + next, err } // InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object. // Zero range corresponds to full payload (panics if only offset is set). func (x *FrostFS) InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.ReadCloser, error) { var prmAuth handler.PrmAuth if p.Off+p.Ln != 0 { prm := handler.PrmObjectRange{ PrmAuth: prmAuth, PayloadRange: [2]uint64{p.Off, p.Ln}, Address: p.Addr, } return x.RangeObject(ctx, prm) } prm := handler.PrmObjectGet{ PrmAuth: prmAuth, Address: p.Addr, } res, err := x.GetObject(ctx, prm) if err != nil { return nil, err } return res.Payload, nil }