[#142] Fix multipart-objects download
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
This commit is contained in:
parent
7e80f0cce6
commit
ea38b4e6c8
16 changed files with 761 additions and 197 deletions
|
@ -18,11 +18,10 @@ import (
|
|||
|
||||
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
|
@ -401,7 +400,7 @@ func (a *app) setHealthStatus() {
|
|||
}
|
||||
|
||||
func (a *app) Serve() {
|
||||
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(services.NewPoolWrapper(a.treePool)))
|
||||
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)))
|
||||
|
||||
// Configure router.
|
||||
a.configureRouter(handler)
|
||||
|
|
1
go.mod
1
go.mod
|
@ -9,6 +9,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||
github.com/bluele/gcache v0.0.2
|
||||
github.com/fasthttp/router v1.4.1
|
||||
github.com/minio/sio v0.4.1
|
||||
github.com/nspcc-dev/neo-go v0.106.2
|
||||
github.com/prometheus/client_golang v1.19.0
|
||||
github.com/prometheus/client_model v0.5.0
|
||||
|
|
2
go.sum
2
go.sum
|
@ -576,6 +576,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
|
|||
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
|
||||
github.com/maxbrunsfeld/counterfeiter/v6 v6.2.2/go.mod h1:eD9eIE7cdwcMi9rYluz88Jz2VyhSmden33/aXg4oVIY=
|
||||
github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs=
|
||||
github.com/minio/sio v0.4.1 h1:EMe3YBC1nf+sRQia65Rutxi+Z554XPV0dt8BIBA+a/0=
|
||||
github.com/minio/sio v0.4.1/go.mod h1:oBSjJeGbBdRMZZwna07sX9EFzZy+ywu5aofRiV1g79I=
|
||||
github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4=
|
||||
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
|
||||
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
|
||||
|
|
41
internal/data/object.go
Normal file
41
internal/data/object.go
Normal file
|
@ -0,0 +1,41 @@
|
|||
package data
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
type (
|
||||
ObjectInfo struct {
|
||||
ID oid.ID
|
||||
CID cid.ID
|
||||
|
||||
Bucket string
|
||||
Name string
|
||||
Size uint64
|
||||
Headers map[string]string
|
||||
}
|
||||
|
||||
// PartInfo is upload information about part.
|
||||
PartInfo struct {
|
||||
Key string `json:"key"`
|
||||
UploadID string `json:"uploadId"`
|
||||
Number int `json:"number"`
|
||||
OID oid.ID `json:"oid"`
|
||||
Size uint64 `json:"size"`
|
||||
ETag string `json:"etag"`
|
||||
MD5 string `json:"md5"`
|
||||
Created time.Time `json:"created"`
|
||||
}
|
||||
)
|
||||
|
||||
// Address returns object address.
|
||||
func (o *ObjectInfo) Address() oid.Address {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(o.CID)
|
||||
addr.SetObject(o.ID)
|
||||
|
||||
return addr
|
||||
}
|
|
@ -11,6 +11,7 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
|
@ -45,13 +46,13 @@ func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
|||
h.byAttribute(c, h.receiveFile)
|
||||
}
|
||||
|
||||
func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
|
||||
func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op object.SearchMatchType) (frostfs.ResObjectSearch, error) {
|
||||
filters := object.NewSearchFilters()
|
||||
filters.AddRootFilter()
|
||||
filters.AddFilter(key, val, op)
|
||||
|
||||
prm := PrmObjectSearch{
|
||||
PrmAuth: PrmAuth{
|
||||
prm := frostfs.PrmObjectSearch{
|
||||
PrmAuth: frostfs.PrmAuth{
|
||||
BearerToken: bearerToken(ctx),
|
||||
},
|
||||
Container: *cnrID,
|
||||
|
@ -153,8 +154,8 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
}
|
||||
|
||||
func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
|
||||
prm := PrmObjectGet{
|
||||
PrmAuth: PrmAuth{
|
||||
prm := frostfs.PrmObjectGet{
|
||||
PrmAuth: frostfs.PrmAuth{
|
||||
BearerToken: btoken,
|
||||
},
|
||||
Address: addr,
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"io"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
|
@ -58,7 +59,7 @@ func (t *TestFrostFS) AllowUserOperation(cnrID cid.ID, userID user.ID, op acl.Op
|
|||
t.accessList[fmt.Sprintf("%s/%s/%s/%s", cnrID, userID, op, objID)] = true
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) Container(_ context.Context, prm PrmContainer) (*container.Container, error) {
|
||||
func (t *TestFrostFS) Container(_ context.Context, prm frostfs.PrmContainer) (*container.Container, error) {
|
||||
for k, v := range t.containers {
|
||||
if k == prm.ContainerID.EncodeToString() {
|
||||
return v, nil
|
||||
|
@ -85,7 +86,7 @@ func (t *TestFrostFS) retrieveObject(addr oid.Address, btoken *bearer.Token) (*o
|
|||
owner := t.requestOwner(btoken)
|
||||
|
||||
if !t.isAllowed(addr.Container(), owner, acl.OpObjectGet, addr.Object()) {
|
||||
return nil, ErrAccessDenied
|
||||
return nil, frostfs.ErrAccessDenied
|
||||
}
|
||||
|
||||
return obj, nil
|
||||
|
@ -94,23 +95,23 @@ func (t *TestFrostFS) retrieveObject(addr oid.Address, btoken *bearer.Token) (*o
|
|||
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) HeadObject(_ context.Context, prm PrmObjectHead) (*object.Object, error) {
|
||||
func (t *TestFrostFS) HeadObject(_ context.Context, prm frostfs.PrmObjectHead) (*object.Object, error) {
|
||||
return t.retrieveObject(prm.Address, prm.BearerToken)
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) GetObject(_ context.Context, prm PrmObjectGet) (*Object, error) {
|
||||
func (t *TestFrostFS) GetObject(_ context.Context, prm frostfs.PrmObjectGet) (*frostfs.Object, error) {
|
||||
obj, err := t.retrieveObject(prm.Address, prm.BearerToken)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &Object{
|
||||
return &frostfs.Object{
|
||||
Header: *obj,
|
||||
Payload: io.NopCloser(bytes.NewReader(obj.Payload())),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) RangeObject(_ context.Context, prm PrmObjectRange) (io.ReadCloser, error) {
|
||||
func (t *TestFrostFS) RangeObject(_ context.Context, prm frostfs.PrmObjectRange) (io.ReadCloser, error) {
|
||||
obj, err := t.retrieveObject(prm.Address, prm.BearerToken)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -121,7 +122,7 @@ func (t *TestFrostFS) RangeObject(_ context.Context, prm PrmObjectRange) (io.Rea
|
|||
return io.NopCloser(bytes.NewReader(payload)), nil
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
|
||||
func (t *TestFrostFS) CreateObject(_ context.Context, prm frostfs.PrmObjectCreate) (oid.ID, error) {
|
||||
b := make([]byte, 32)
|
||||
if _, err := io.ReadFull(rand.Reader, b); err != nil {
|
||||
return oid.ID{}, err
|
||||
|
@ -158,7 +159,7 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.
|
|||
owner := t.requestOwner(prm.BearerToken)
|
||||
|
||||
if !t.isAllowed(cnrID, owner, acl.OpObjectPut, objID) {
|
||||
return oid.ID{}, ErrAccessDenied
|
||||
return oid.ID{}, frostfs.ErrAccessDenied
|
||||
}
|
||||
|
||||
addr := newAddress(cnrID, objID)
|
||||
|
@ -195,9 +196,9 @@ func (r *resObjectSearchMock) Iterate(f func(oid.ID) bool) error {
|
|||
|
||||
func (r *resObjectSearchMock) Close() {}
|
||||
|
||||
func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
|
||||
func (t *TestFrostFS) SearchObjects(_ context.Context, prm frostfs.PrmObjectSearch) (frostfs.ResObjectSearch, error) {
|
||||
if !t.isAllowed(prm.Container, t.requestOwner(prm.BearerToken), acl.OpObjectSearch, oid.ID{}) {
|
||||
return nil, ErrAccessDenied
|
||||
return nil, frostfs.ErrAccessDenied
|
||||
}
|
||||
|
||||
cidStr := prm.Container.EncodeToString()
|
||||
|
|
|
@ -12,10 +12,10 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
|
@ -34,110 +34,14 @@ type Config interface {
|
|||
NamespaceHeader() string
|
||||
}
|
||||
|
||||
// PrmContainer groups parameters of FrostFS.Container operation.
|
||||
type PrmContainer struct {
|
||||
// Container identifier.
|
||||
ContainerID cid.ID
|
||||
}
|
||||
|
||||
// PrmAuth groups authentication parameters for the FrostFS operation.
|
||||
type PrmAuth struct {
|
||||
// Bearer token to be used for the operation. Overlaps PrivateKey. Optional.
|
||||
BearerToken *bearer.Token
|
||||
}
|
||||
|
||||
// PrmObjectHead groups parameters of FrostFS.HeadObject operation.
|
||||
type PrmObjectHead struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
// Address to read the object header from.
|
||||
Address oid.Address
|
||||
}
|
||||
|
||||
// PrmObjectGet groups parameters of FrostFS.GetObject operation.
|
||||
type PrmObjectGet struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
// Address to read the object header from.
|
||||
Address oid.Address
|
||||
}
|
||||
|
||||
// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
|
||||
type PrmObjectRange struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
// Address to read the object header from.
|
||||
Address oid.Address
|
||||
|
||||
// Offset-length range of the object payload to be read.
|
||||
PayloadRange [2]uint64
|
||||
}
|
||||
|
||||
// Object represents FrostFS object.
|
||||
type Object struct {
|
||||
// Object header (doesn't contain payload).
|
||||
Header object.Object
|
||||
|
||||
// Object payload part encapsulated in io.Reader primitive.
|
||||
// Returns ErrAccessDenied on read access violation.
|
||||
Payload io.ReadCloser
|
||||
}
|
||||
|
||||
// PrmObjectCreate groups parameters of FrostFS.CreateObject operation.
|
||||
type PrmObjectCreate struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
Object *object.Object
|
||||
|
||||
// Object payload encapsulated in io.Reader primitive.
|
||||
Payload io.Reader
|
||||
|
||||
// Enables client side object preparing.
|
||||
ClientCut bool
|
||||
|
||||
// Disables using Tillich-Zémor hash for payload.
|
||||
WithoutHomomorphicHash bool
|
||||
|
||||
// Sets max buffer size to read payload.
|
||||
BufferMaxSize uint64
|
||||
}
|
||||
|
||||
// PrmObjectSearch groups parameters of FrostFS.sear SearchObjects operation.
|
||||
type PrmObjectSearch struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
// Container to select the objects from.
|
||||
Container cid.ID
|
||||
|
||||
Filters object.SearchFilters
|
||||
}
|
||||
|
||||
type ResObjectSearch interface {
|
||||
Read(buf []oid.ID) (int, error)
|
||||
Iterate(f func(oid.ID) bool) error
|
||||
Close()
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrAccessDenied is returned from FrostFS in case of access violation.
|
||||
ErrAccessDenied = errors.New("access denied")
|
||||
// ErrGatewayTimeout is returned from FrostFS in case of timeout, deadline exceeded etc.
|
||||
ErrGatewayTimeout = errors.New("gateway timeout")
|
||||
)
|
||||
|
||||
// FrostFS represents virtual connection to FrostFS network.
|
||||
type FrostFS interface {
|
||||
Container(context.Context, PrmContainer) (*container.Container, error)
|
||||
HeadObject(context.Context, PrmObjectHead) (*object.Object, error)
|
||||
GetObject(context.Context, PrmObjectGet) (*Object, error)
|
||||
RangeObject(context.Context, PrmObjectRange) (io.ReadCloser, error)
|
||||
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
||||
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
||||
Container(context.Context, frostfs.PrmContainer) (*container.Container, error)
|
||||
HeadObject(context.Context, frostfs.PrmObjectHead) (*object.Object, error)
|
||||
GetObject(context.Context, frostfs.PrmObjectGet) (*frostfs.Object, error)
|
||||
RangeObject(context.Context, frostfs.PrmObjectRange) (io.ReadCloser, error)
|
||||
CreateObject(context.Context, frostfs.PrmObjectCreate) (oid.ID, error)
|
||||
SearchObjects(context.Context, frostfs.PrmObjectSearch) (frostfs.ResObjectSearch, error)
|
||||
utils.EpochInfoFetcher
|
||||
}
|
||||
|
||||
|
@ -177,7 +81,7 @@ func New(params *AppParams, config Config, tree *tree.Tree) *Handler {
|
|||
|
||||
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// prepares request and object address to it.
|
||||
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) {
|
||||
var (
|
||||
idCnr, _ = c.UserValue("cid").(string)
|
||||
idObj, _ = c.UserValue("oid").(string)
|
||||
|
@ -203,12 +107,12 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
|||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(*objID)
|
||||
|
||||
f(ctx, *h.newRequest(c, log), addr)
|
||||
f(ctx, *h.newRequest(c, log), addr, bktInfo)
|
||||
}
|
||||
|
||||
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// prepares request and object address to it.
|
||||
func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) {
|
||||
var (
|
||||
bucketname = req.UserValue("cid").(string)
|
||||
key = req.UserValue("oid").(string)
|
||||
|
@ -250,11 +154,11 @@ func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context,
|
|||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(foundOid.OID)
|
||||
|
||||
f(ctx, *h.newRequest(req, log), addr)
|
||||
f(ctx, *h.newRequest(req, log), addr, bktInfo)
|
||||
}
|
||||
|
||||
// byAttribute is a wrapper similar to byAddress.
|
||||
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address, *data.BucketInfo)) {
|
||||
scid, _ := c.UserValue("cid").(string)
|
||||
key, _ := c.UserValue("attr_key").(string)
|
||||
val, _ := c.UserValue("attr_val").(string)
|
||||
|
@ -311,7 +215,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
|
|||
addrObj.SetContainer(bktInfo.CID)
|
||||
addrObj.SetObject(buf[0])
|
||||
|
||||
f(ctx, *h.newRequest(c, log), addrObj)
|
||||
f(ctx, *h.newRequest(c, log), addrObj, bktInfo)
|
||||
}
|
||||
|
||||
// resolveContainer decode container id, if it's not a valid container id
|
||||
|
@ -359,7 +263,7 @@ func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *
|
|||
}
|
||||
|
||||
func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.BucketInfo, error) {
|
||||
prm := PrmContainer{ContainerID: cnrID}
|
||||
prm := frostfs.PrmContainer{ContainerID: cnrID}
|
||||
res, err := h.frostfs.Container(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get frostfs container '%s': %w", cnrID.String(), err)
|
||||
|
|
|
@ -7,7 +7,9 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -24,13 +26,13 @@ const (
|
|||
hdrContainerID = "X-Container-Id"
|
||||
)
|
||||
|
||||
func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) {
|
||||
func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address, _ *data.BucketInfo) {
|
||||
var start = time.Now()
|
||||
|
||||
btoken := bearerToken(ctx)
|
||||
|
||||
prm := PrmObjectHead{
|
||||
PrmAuth: PrmAuth{
|
||||
prm := frostfs.PrmObjectHead{
|
||||
PrmAuth: frostfs.PrmAuth{
|
||||
BearerToken: btoken,
|
||||
},
|
||||
Address: objectAddress,
|
||||
|
@ -74,8 +76,8 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
|
|||
|
||||
if len(contentType) == 0 {
|
||||
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
|
||||
prmRange := PrmObjectRange{
|
||||
PrmAuth: PrmAuth{
|
||||
prmRange := frostfs.PrmObjectRange{
|
||||
PrmAuth: frostfs.PrmAuth{
|
||||
BearerToken: btoken,
|
||||
},
|
||||
Address: objectAddress,
|
||||
|
|
|
@ -1,13 +1,30 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/minio/sio"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
FrostFSSystemMetadataPrefix = "S3-"
|
||||
AttributeEncryptionAlgorithm = FrostFSSystemMetadataPrefix + "Algorithm"
|
||||
AttributeHMACSalt = FrostFSSystemMetadataPrefix + "HMAC-Salt"
|
||||
AttributeHMACKey = FrostFSSystemMetadataPrefix + "HMAC-Key"
|
||||
AttributeMultipartObjectSize = FrostFSSystemMetadataPrefix + "Multipart-Object-Size"
|
||||
)
|
||||
|
||||
// MultipartFile provides standard ReadCloser interface and also allows one to
|
||||
// get file name, it's used for multipart uploads.
|
||||
type MultipartFile interface {
|
||||
|
@ -45,3 +62,145 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
|
|||
return part, nil
|
||||
}
|
||||
}
|
||||
|
||||
type getParams struct {
|
||||
// payload range
|
||||
off, ln uint64
|
||||
|
||||
objInfo *data.ObjectInfo
|
||||
bktInfo *data.BucketInfo
|
||||
}
|
||||
|
||||
// getPayload returns initial payload if object is not multipart else composes new reader with parts data.
|
||||
func (h *Handler) getPayload(p getPayloadParams) (io.ReadCloser, uint64, error) {
|
||||
sizeValue, ok := p.attrs[AttributeMultipartObjectSize]
|
||||
if !ok {
|
||||
return p.obj.Payload, p.obj.Header.PayloadSize(), nil
|
||||
}
|
||||
cid, ok := p.obj.Header.ContainerID()
|
||||
if !ok {
|
||||
return nil, 0, errors.New("no container id set")
|
||||
}
|
||||
oid, ok := p.obj.Header.ID()
|
||||
if !ok {
|
||||
return nil, 0, errors.New("no object id set")
|
||||
}
|
||||
size, err := strconv.ParseUint(sizeValue, 10, 64)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
ctx := p.req.RequestCtx
|
||||
params := getParams{
|
||||
off: 0,
|
||||
ln: 0,
|
||||
objInfo: &data.ObjectInfo{
|
||||
ID: oid,
|
||||
CID: cid,
|
||||
Bucket: p.bktinfo.Name,
|
||||
Name: p.attrs["FilePath"],
|
||||
Size: size,
|
||||
Headers: p.attrs,
|
||||
},
|
||||
bktInfo: p.bktinfo,
|
||||
}
|
||||
payload, err := h.initMultipartReader(ctx, params)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
return io.NopCloser(payload), size, nil
|
||||
}
|
||||
|
||||
func (h *Handler) initMultipartReader(ctx context.Context, p getParams) (io.Reader, error) {
|
||||
combinedObj, err := h.frostfs.GetObject(ctx, frostfs.PrmObjectGet{
|
||||
PrmAuth: frostfs.PrmAuth{BearerToken: bearerToken(ctx)},
|
||||
Address: p.objInfo.Address(),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get combined object '%s': %w", p.objInfo.ID.EncodeToString(), err)
|
||||
}
|
||||
|
||||
var parts []*data.PartInfo
|
||||
if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
|
||||
}
|
||||
|
||||
isEncrypted := FormEncryptionInfo(p.objInfo.Headers).Enabled
|
||||
objParts := make([]frostfs.PartObj, len(parts))
|
||||
for i, part := range parts {
|
||||
size := part.Size
|
||||
if isEncrypted {
|
||||
if size, err = sio.EncryptedSize(part.Size); err != nil {
|
||||
return nil, fmt.Errorf("compute encrypted size: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
objParts[i] = frostfs.PartObj{
|
||||
OID: part.OID,
|
||||
Size: size,
|
||||
}
|
||||
}
|
||||
|
||||
return frostfs.NewMultiObjectReader(ctx, frostfs.MultiObjectReaderConfig{
|
||||
Handler: h,
|
||||
Off: p.off,
|
||||
Ln: p.ln,
|
||||
Parts: objParts,
|
||||
BktInfo: p.bktInfo,
|
||||
Log: h.log,
|
||||
})
|
||||
}
|
||||
|
||||
// InitFrostFSObjectPayloadReader initializes payload reader of the FrostFS object.
|
||||
// Zero range corresponds to full payload (panics if only offset is set).
|
||||
func (h *Handler) InitFrostFSObjectPayloadReader(ctx context.Context, p frostfs.GetFrostFSParams) (io.Reader, error) {
|
||||
var prmAuth frostfs.PrmAuth
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(p.BktInfo.CID)
|
||||
addr.SetObject(p.Oid)
|
||||
|
||||
if p.Off+p.Ln != 0 {
|
||||
prm := frostfs.PrmObjectRange{
|
||||
PrmAuth: prmAuth,
|
||||
Container: p.BktInfo.CID,
|
||||
Object: p.Oid,
|
||||
PayloadRange: [2]uint64{p.Off, p.Ln},
|
||||
Address: addr,
|
||||
}
|
||||
|
||||
return h.frostfs.RangeObject(ctx, prm)
|
||||
}
|
||||
|
||||
prm := frostfs.PrmObjectGet{
|
||||
PrmAuth: prmAuth,
|
||||
Container: p.BktInfo.CID,
|
||||
Object: p.Oid,
|
||||
Address: addr,
|
||||
}
|
||||
|
||||
res, err := h.frostfs.GetObject(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Payload, nil
|
||||
}
|
||||
|
||||
// ObjectEncryption stores parsed object encryption headers.
|
||||
type ObjectEncryption struct {
|
||||
Enabled bool
|
||||
Algorithm string
|
||||
HMACKey string
|
||||
HMACSalt string
|
||||
}
|
||||
|
||||
func FormEncryptionInfo(headers map[string]string) ObjectEncryption {
|
||||
algorithm := headers[AttributeEncryptionAlgorithm]
|
||||
return ObjectEncryption{
|
||||
Enabled: len(algorithm) > 0,
|
||||
Algorithm: algorithm,
|
||||
HMACKey: headers[AttributeHMACKey],
|
||||
HMACSalt: headers[AttributeHMACSalt],
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,11 +5,12 @@ import (
|
|||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"path"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -47,19 +48,24 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (str
|
|||
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
||||
}
|
||||
|
||||
func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oid.Address) {
|
||||
type getPayloadParams struct {
|
||||
obj *frostfs.Object
|
||||
req request
|
||||
bktinfo *data.BucketInfo
|
||||
attrs map[string]string
|
||||
}
|
||||
|
||||
func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address, bktInfo *data.BucketInfo) {
|
||||
var (
|
||||
err error
|
||||
dis = "inline"
|
||||
shouldDownload = req.QueryArgs().GetBool("download")
|
||||
start = time.Now()
|
||||
filename string
|
||||
)
|
||||
|
||||
prm := PrmObjectGet{
|
||||
PrmAuth: PrmAuth{
|
||||
prm := frostfs.PrmObjectGet{
|
||||
PrmAuth: frostfs.PrmAuth{
|
||||
BearerToken: bearerToken(ctx),
|
||||
},
|
||||
Address: objectAddress,
|
||||
Address: objAddress,
|
||||
}
|
||||
|
||||
rObj, err := h.frostfs.GetObject(ctx, prm)
|
||||
|
@ -70,51 +76,40 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
|
||||
// we can't close reader in this function, so how to do it?
|
||||
|
||||
if req.Request.URI().QueryArgs().GetBool("download") {
|
||||
dis = "attachment"
|
||||
}
|
||||
attrs := makeAttributesMap(rObj.Header.Attributes())
|
||||
req.setAttributes(attrs)
|
||||
|
||||
payloadSize := rObj.Header.PayloadSize()
|
||||
req.setIDs(rObj.Header)
|
||||
req.setDisposition(shouldDownload, attrs[object.AttributeFileName])
|
||||
|
||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||
var contentType string
|
||||
for _, attr := range rObj.Header.Attributes() {
|
||||
key := attr.Key()
|
||||
val := attr.Value()
|
||||
if !isValidToken(key) || !isValidValue(val) {
|
||||
continue
|
||||
}
|
||||
|
||||
key = utils.BackwardTransformIfSystem(key)
|
||||
|
||||
req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
||||
switch key {
|
||||
case object.AttributeFileName:
|
||||
filename = val
|
||||
case object.AttributeTimestamp:
|
||||
value, err := strconv.ParseInt(val, 10, 64)
|
||||
if err != nil {
|
||||
req.log.Info(logs.CouldntParseCreationDate,
|
||||
zap.String("key", key),
|
||||
zap.String("val", val),
|
||||
if err = req.setTimestamp(attrs[object.AttributeTimestamp]); err != nil {
|
||||
req.log.Error(logs.CouldntParseCreationDate,
|
||||
zap.String("val", attrs[object.AttributeTimestamp]),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
req.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||
case object.AttributeContentType:
|
||||
contentType = val
|
||||
}
|
||||
response.Error(req.RequestCtx, "failed to convert timestamp: "+err.Error(), fasthttp.StatusInternalServerError)
|
||||
}
|
||||
|
||||
idsToResponse(&req.Response, &rObj.Header)
|
||||
payloadParams := getPayloadParams{
|
||||
obj: rObj,
|
||||
req: req,
|
||||
bktinfo: bktInfo,
|
||||
attrs: attrs,
|
||||
}
|
||||
|
||||
payload, payloadSize, err := h.getPayload(payloadParams)
|
||||
if err != nil {
|
||||
req.handleFrostFSErr(err, start)
|
||||
return
|
||||
}
|
||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||
|
||||
contentType := attrs[object.AttributeContentType]
|
||||
if len(contentType) == 0 {
|
||||
// determine the Content-Type from the payload head
|
||||
var payloadHead []byte
|
||||
|
||||
contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) {
|
||||
return rObj.Payload, nil
|
||||
return payload, nil
|
||||
})
|
||||
if err != nil && err != io.EOF {
|
||||
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err))
|
||||
|
@ -126,16 +121,27 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
var headReader io.Reader = bytes.NewReader(payloadHead)
|
||||
|
||||
if err != io.EOF { // otherwise, we've already read full payload
|
||||
headReader = io.MultiReader(headReader, rObj.Payload)
|
||||
headReader = io.MultiReader(headReader, payload)
|
||||
}
|
||||
|
||||
// note: we could do with io.Reader, but SetBodyStream below closes body stream
|
||||
// if it implements io.Closer and that's useful for us.
|
||||
rObj.Payload = readCloser{headReader, rObj.Payload}
|
||||
payload = readCloser{headReader, payload}
|
||||
}
|
||||
req.SetContentType(contentType)
|
||||
|
||||
req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||
|
||||
req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
|
||||
req.Response.SetBodyStream(payload, int(payloadSize))
|
||||
}
|
||||
|
||||
func makeAttributesMap(attrs []object.Attribute) map[string]string {
|
||||
attributes := make(map[string]string)
|
||||
for _, attr := range attrs {
|
||||
if !isValidToken(attr.Key()) || !isValidValue(attr.Value()) {
|
||||
continue
|
||||
}
|
||||
key := utils.BackwardTransformIfSystem(attr.Key())
|
||||
|
||||
attributes[key] = attr.Value()
|
||||
}
|
||||
return attributes
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
|
@ -131,8 +132,8 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
|||
obj.SetOwnerID(*h.ownerID)
|
||||
obj.SetAttributes(attributes...)
|
||||
|
||||
prm := PrmObjectCreate{
|
||||
PrmAuth: PrmAuth{
|
||||
prm := frostfs.PrmObjectCreate{
|
||||
PrmAuth: frostfs.PrmAuth{
|
||||
BearerToken: h.fetchBearerToken(ctx),
|
||||
},
|
||||
Object: obj,
|
||||
|
|
|
@ -2,14 +2,19 @@ package handler
|
|||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"path"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -31,6 +36,43 @@ func (r *request) handleFrostFSErr(err error, start time.Time) {
|
|||
response.Error(r.RequestCtx, msg, statusCode)
|
||||
}
|
||||
|
||||
func (r *request) setAttributes(attrs map[string]string) {
|
||||
for key, val := range attrs {
|
||||
r.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *request) setIDs(obj object.Object) {
|
||||
objID, _ := obj.ID()
|
||||
cnrID, _ := obj.ContainerID()
|
||||
r.Response.Header.Set(hdrObjectID, objID.String())
|
||||
r.Response.Header.Set(hdrOwnerID, obj.OwnerID().String())
|
||||
r.Response.Header.Set(hdrContainerID, cnrID.String())
|
||||
}
|
||||
|
||||
func (r *request) setDisposition(shouldDownload bool, filename string) {
|
||||
const (
|
||||
inlineDisposition = "inline"
|
||||
attachmentDisposition = "attachment"
|
||||
)
|
||||
dis := inlineDisposition
|
||||
if shouldDownload {
|
||||
dis = attachmentDisposition
|
||||
}
|
||||
r.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||
}
|
||||
|
||||
func (r *request) setTimestamp(timestamp string) error {
|
||||
value, err := strconv.ParseInt(timestamp, 10, 64)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func bearerToken(ctx context.Context) *bearer.Token {
|
||||
if tkn, err := tokens.LoadBearerToken(ctx); err == nil {
|
||||
return tkn
|
||||
|
|
|
@ -7,15 +7,115 @@ import (
|
|||
"io"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
)
|
||||
|
||||
type ResObjectSearch interface {
|
||||
Read(buf []oid.ID) (int, error)
|
||||
Iterate(f func(oid.ID) bool) error
|
||||
Close()
|
||||
}
|
||||
|
||||
// Object represents FrostFS object.
|
||||
type Object struct {
|
||||
// Object header (doesn't contain payload).
|
||||
Header object.Object
|
||||
|
||||
// Object payload part encapsulated in io.Reader primitive.
|
||||
// Returns ErrAccessDenied on read access violation.
|
||||
Payload io.ReadCloser
|
||||
}
|
||||
|
||||
type (
|
||||
// PrmContainer groups parameters of FrostFS.Container operation.
|
||||
PrmContainer struct {
|
||||
// Container identifier.
|
||||
ContainerID cid.ID
|
||||
}
|
||||
// PrmAuth groups authentication parameters for the FrostFS operation.
|
||||
PrmAuth struct {
|
||||
// Bearer token to be used for the operation. Overlaps PrivateKey. Optional.
|
||||
BearerToken *bearer.Token
|
||||
}
|
||||
// PrmObjectHead groups parameters of FrostFS.HeadObject operation.
|
||||
PrmObjectHead struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
// Address to read the object header from.
|
||||
Address oid.Address
|
||||
}
|
||||
// PrmObjectGet groups parameters of FrostFS.GetObject operation.
|
||||
PrmObjectGet struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
// Address to read the object header from.
|
||||
Address oid.Address
|
||||
Container cid.ID
|
||||
Object oid.ID
|
||||
}
|
||||
// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
|
||||
PrmObjectRange struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
// Address to read the object header from.
|
||||
Address oid.Address
|
||||
|
||||
// Offset-length range of the object payload to be read.
|
||||
PayloadRange [2]uint64
|
||||
|
||||
Container cid.ID
|
||||
|
||||
Object oid.ID
|
||||
}
|
||||
// PrmObjectCreate groups parameters of FrostFS.CreateObject operation.
|
||||
PrmObjectCreate struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
Object *object.Object
|
||||
|
||||
// Object payload encapsulated in io.Reader primitive.
|
||||
Payload io.Reader
|
||||
|
||||
// Enables client side object preparing.
|
||||
ClientCut bool
|
||||
|
||||
// Disables using Tillich-Zémor hash for payload.
|
||||
WithoutHomomorphicHash bool
|
||||
|
||||
// Sets max buffer size to read payload.
|
||||
BufferMaxSize uint64
|
||||
}
|
||||
// PrmObjectSearch groups parameters of FrostFS.sear SearchObjects operation.
|
||||
PrmObjectSearch struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
// Container to select the objects from.
|
||||
Container cid.ID
|
||||
|
||||
Filters object.SearchFilters
|
||||
}
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrAccessDenied is returned from FrostFS in case of access violation.
|
||||
ErrAccessDenied = errors.New("access denied")
|
||||
// ErrGatewayTimeout is returned from FrostFS in case of timeout, deadline exceeded etc.
|
||||
ErrGatewayTimeout = errors.New("gateway timeout")
|
||||
)
|
||||
|
||||
// FrostFS represents virtual connection to the FrostFS network.
|
||||
|
@ -33,9 +133,9 @@ func NewFrostFS(p *pool.Pool) *FrostFS {
|
|||
}
|
||||
|
||||
// Container implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) Container(ctx context.Context, layerPrm handler.PrmContainer) (*container.Container, error) {
|
||||
func (x *FrostFS) Container(ctx context.Context, containerPrm PrmContainer) (*container.Container, error) {
|
||||
prm := pool.PrmContainerGet{
|
||||
ContainerID: layerPrm.ContainerID,
|
||||
ContainerID: containerPrm.ContainerID,
|
||||
}
|
||||
|
||||
res, err := x.pool.GetContainer(ctx, prm)
|
||||
|
@ -47,7 +147,7 @@ func (x *FrostFS) Container(ctx context.Context, layerPrm handler.PrmContainer)
|
|||
}
|
||||
|
||||
// CreateObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) CreateObject(ctx context.Context, prm handler.PrmObjectCreate) (oid.ID, error) {
|
||||
func (x *FrostFS) CreateObject(ctx context.Context, prm PrmObjectCreate) (oid.ID, error) {
|
||||
var prmPut pool.PrmObjectPut
|
||||
prmPut.SetHeader(*prm.Object)
|
||||
prmPut.SetPayload(prm.Payload)
|
||||
|
@ -78,7 +178,7 @@ func (x payloadReader) Read(p []byte) (int, error) {
|
|||
}
|
||||
|
||||
// HeadObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) HeadObject(ctx context.Context, prm handler.PrmObjectHead) (*object.Object, error) {
|
||||
func (x *FrostFS) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error) {
|
||||
var prmHead pool.PrmObjectHead
|
||||
prmHead.SetAddress(prm.Address)
|
||||
|
||||
|
@ -95,7 +195,7 @@ func (x *FrostFS) HeadObject(ctx context.Context, prm handler.PrmObjectHead) (*o
|
|||
}
|
||||
|
||||
// GetObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) GetObject(ctx context.Context, prm handler.PrmObjectGet) (*handler.Object, error) {
|
||||
func (x *FrostFS) GetObject(ctx context.Context, prm PrmObjectGet) (*Object, error) {
|
||||
var prmGet pool.PrmObjectGet
|
||||
prmGet.SetAddress(prm.Address)
|
||||
|
||||
|
@ -108,14 +208,14 @@ func (x *FrostFS) GetObject(ctx context.Context, prm handler.PrmObjectGet) (*han
|
|||
return nil, handleObjectError("init full object reading via connection pool", err)
|
||||
}
|
||||
|
||||
return &handler.Object{
|
||||
return &Object{
|
||||
Header: res.Header,
|
||||
Payload: res.Payload,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// RangeObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) RangeObject(ctx context.Context, prm handler.PrmObjectRange) (io.ReadCloser, error) {
|
||||
func (x *FrostFS) RangeObject(ctx context.Context, prm PrmObjectRange) (io.ReadCloser, error) {
|
||||
var prmRange pool.PrmObjectRange
|
||||
prmRange.SetAddress(prm.Address)
|
||||
prmRange.SetOffset(prm.PayloadRange[0])
|
||||
|
@ -134,7 +234,7 @@ func (x *FrostFS) RangeObject(ctx context.Context, prm handler.PrmObjectRange) (
|
|||
}
|
||||
|
||||
// SearchObjects implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) SearchObjects(ctx context.Context, prm handler.PrmObjectSearch) (handler.ResObjectSearch, error) {
|
||||
func (x *FrostFS) SearchObjects(ctx context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
|
||||
var prmSearch pool.PrmObjectSearch
|
||||
prmSearch.SetContainerID(prm.Container)
|
||||
prmSearch.SetFilters(prm.Filters)
|
||||
|
@ -202,11 +302,11 @@ func handleObjectError(msg string, err error) error {
|
|||
}
|
||||
|
||||
if reason, ok := IsErrObjectAccessDenied(err); ok {
|
||||
return fmt.Errorf("%s: %w: %s", msg, handler.ErrAccessDenied, reason)
|
||||
return fmt.Errorf("%s: %w: %s", msg, ErrAccessDenied, reason)
|
||||
}
|
||||
|
||||
if IsTimeoutError(err) {
|
||||
return fmt.Errorf("%s: %w: %s", msg, handler.ErrGatewayTimeout, err.Error())
|
||||
return fmt.Errorf("%s: %w: %s", msg, ErrGatewayTimeout, err.Error())
|
||||
}
|
||||
|
||||
return fmt.Errorf("%s: %w", msg, err)
|
168
internal/service/frostfs/multi_object_reader.go
Normal file
168
internal/service/frostfs/multi_object_reader.go
Normal file
|
@ -0,0 +1,168 @@
|
|||
package frostfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type GetFrostFSParams struct {
|
||||
// payload range
|
||||
Off, Ln uint64
|
||||
|
||||
Oid oid.ID
|
||||
BktInfo *data.BucketInfo
|
||||
}
|
||||
|
||||
type PartObj struct {
|
||||
OID oid.ID
|
||||
Size uint64
|
||||
}
|
||||
|
||||
type readerInitiator interface {
|
||||
InitFrostFSObjectPayloadReader(ctx context.Context, p GetFrostFSParams) (io.Reader, error)
|
||||
}
|
||||
|
||||
// MultiObjectReader implements io.Reader of payloads of the object list stored in the FrostFS network.
|
||||
type MultiObjectReader struct {
|
||||
ctx context.Context
|
||||
log *zap.Logger
|
||||
|
||||
layer readerInitiator
|
||||
|
||||
startPartOffset uint64
|
||||
endPartLength uint64
|
||||
|
||||
prm GetFrostFSParams
|
||||
|
||||
curIndex int
|
||||
curReader io.Reader
|
||||
|
||||
parts []PartObj
|
||||
}
|
||||
|
||||
type MultiObjectReaderConfig struct {
|
||||
Handler readerInitiator
|
||||
Log *zap.Logger
|
||||
|
||||
// the offset of complete object and total size to read
|
||||
Off, Ln uint64
|
||||
|
||||
BktInfo *data.BucketInfo
|
||||
Parts []PartObj
|
||||
}
|
||||
|
||||
var (
|
||||
errOffsetIsOutOfRange = errors.New("offset is out of payload range")
|
||||
errLengthIsOutOfRange = errors.New("length is out of payload range")
|
||||
errEmptyPartsList = errors.New("empty parts list")
|
||||
errorZeroRangeLength = errors.New("zero range length")
|
||||
)
|
||||
|
||||
func NewMultiObjectReader(ctx context.Context, cfg MultiObjectReaderConfig) (*MultiObjectReader, error) {
|
||||
if len(cfg.Parts) == 0 {
|
||||
return nil, errEmptyPartsList
|
||||
}
|
||||
|
||||
r := &MultiObjectReader{
|
||||
ctx: ctx,
|
||||
layer: cfg.Handler,
|
||||
prm: GetFrostFSParams{
|
||||
BktInfo: cfg.BktInfo,
|
||||
},
|
||||
parts: cfg.Parts,
|
||||
log: cfg.Log,
|
||||
}
|
||||
|
||||
if cfg.Off+cfg.Ln == 0 {
|
||||
return r, nil
|
||||
}
|
||||
|
||||
if cfg.Off > 0 && cfg.Ln == 0 {
|
||||
return nil, errorZeroRangeLength
|
||||
}
|
||||
|
||||
startPartIndex, startPartOffset := findStartPart(cfg)
|
||||
if startPartIndex == -1 {
|
||||
return nil, errOffsetIsOutOfRange
|
||||
}
|
||||
r.startPartOffset = startPartOffset
|
||||
|
||||
endPartIndex, endPartLength := findEndPart(cfg)
|
||||
if endPartIndex == -1 {
|
||||
return nil, errLengthIsOutOfRange
|
||||
}
|
||||
r.endPartLength = endPartLength
|
||||
|
||||
r.parts = cfg.Parts[startPartIndex : endPartIndex+1]
|
||||
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func findStartPart(cfg MultiObjectReaderConfig) (index int, offset uint64) {
|
||||
position := cfg.Off
|
||||
for i, part := range cfg.Parts {
|
||||
// Strict inequality when searching for start position to avoid reading zero length part.
|
||||
if position < part.Size {
|
||||
return i, position
|
||||
}
|
||||
position -= part.Size
|
||||
}
|
||||
|
||||
return -1, 0
|
||||
}
|
||||
|
||||
func findEndPart(cfg MultiObjectReaderConfig) (index int, length uint64) {
|
||||
position := cfg.Off + cfg.Ln
|
||||
for i, part := range cfg.Parts {
|
||||
// Non-strict inequality when searching for end position to avoid out of payload range error.
|
||||
if position <= part.Size {
|
||||
return i, position
|
||||
}
|
||||
position -= part.Size
|
||||
}
|
||||
|
||||
return -1, 0
|
||||
}
|
||||
|
||||
func (x *MultiObjectReader) Read(p []byte) (n int, err error) {
|
||||
if x.curReader != nil {
|
||||
n, err = x.curReader.Read(p)
|
||||
if !errors.Is(err, io.EOF) {
|
||||
return n, err
|
||||
}
|
||||
x.curIndex++
|
||||
}
|
||||
|
||||
if x.curIndex == len(x.parts) {
|
||||
return n, io.EOF
|
||||
}
|
||||
|
||||
x.prm.Oid = x.parts[x.curIndex].OID
|
||||
|
||||
if x.curIndex == 0 {
|
||||
x.prm.Off = x.startPartOffset
|
||||
x.prm.Ln = x.parts[x.curIndex].Size - x.startPartOffset
|
||||
}
|
||||
|
||||
if x.curIndex == len(x.parts)-1 {
|
||||
x.prm.Ln = x.endPartLength - x.prm.Off
|
||||
}
|
||||
|
||||
x.curReader, err = x.layer.InitFrostFSObjectPayloadReader(x.ctx, x.prm)
|
||||
if err != nil {
|
||||
return n, fmt.Errorf("init payload reader for the next part: %w", err)
|
||||
}
|
||||
|
||||
x.prm.Off = 0
|
||||
x.prm.Ln = 0
|
||||
|
||||
next, err := x.Read(p[n:])
|
||||
|
||||
return n + next, err
|
||||
}
|
137
internal/service/frostfs/multi_object_reader_test.go
Normal file
137
internal/service/frostfs/multi_object_reader_test.go
Normal file
|
@ -0,0 +1,137 @@
|
|||
package frostfs
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type readerInitiatorMock struct {
|
||||
parts map[oid.ID][]byte
|
||||
}
|
||||
|
||||
func (r *readerInitiatorMock) InitFrostFSObjectPayloadReader(_ context.Context, p GetFrostFSParams) (io.Reader, error) {
|
||||
partPayload, ok := r.parts[p.Oid]
|
||||
if !ok {
|
||||
return nil, errors.New("part not found")
|
||||
}
|
||||
|
||||
if p.Off+p.Ln == 0 {
|
||||
return bytes.NewReader(partPayload), nil
|
||||
}
|
||||
|
||||
if p.Off > uint64(len(partPayload)-1) {
|
||||
return nil, fmt.Errorf("invalid offset: %d/%d", p.Off, len(partPayload))
|
||||
}
|
||||
|
||||
if p.Off+p.Ln > uint64(len(partPayload)) {
|
||||
return nil, fmt.Errorf("invalid range: %d-%d/%d", p.Off, p.Off+p.Ln, len(partPayload))
|
||||
}
|
||||
|
||||
return bytes.NewReader(partPayload[p.Off : p.Off+p.Ln]), nil
|
||||
}
|
||||
|
||||
func prepareDataReader() ([]byte, []PartObj, *readerInitiatorMock) {
|
||||
mockInitReader := &readerInitiatorMock{
|
||||
parts: map[oid.ID][]byte{
|
||||
oidtest.ID(): []byte("first part 1"),
|
||||
oidtest.ID(): []byte("second part 2"),
|
||||
oidtest.ID(): []byte("third part 3"),
|
||||
},
|
||||
}
|
||||
|
||||
var fullPayload []byte
|
||||
parts := make([]PartObj, 0, len(mockInitReader.parts))
|
||||
for id, payload := range mockInitReader.parts {
|
||||
parts = append(parts, PartObj{OID: id, Size: uint64(len(payload))})
|
||||
fullPayload = append(fullPayload, payload...)
|
||||
}
|
||||
|
||||
return fullPayload, parts, mockInitReader
|
||||
}
|
||||
|
||||
func TestMultiReader(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
fullPayload, parts, mockInitReader := prepareDataReader()
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
off uint64
|
||||
ln uint64
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "simple read all",
|
||||
},
|
||||
{
|
||||
name: "simple read with length",
|
||||
ln: uint64(len(fullPayload)),
|
||||
},
|
||||
{
|
||||
name: "middle of parts",
|
||||
off: parts[0].Size + 2,
|
||||
ln: 4,
|
||||
},
|
||||
{
|
||||
name: "first and second",
|
||||
off: parts[0].Size - 4,
|
||||
ln: 8,
|
||||
},
|
||||
{
|
||||
name: "first and third",
|
||||
off: parts[0].Size - 4,
|
||||
ln: parts[1].Size + 8,
|
||||
},
|
||||
{
|
||||
name: "second part",
|
||||
off: parts[0].Size,
|
||||
ln: parts[1].Size,
|
||||
},
|
||||
{
|
||||
name: "second and third",
|
||||
off: parts[0].Size,
|
||||
ln: parts[1].Size + parts[2].Size,
|
||||
},
|
||||
{
|
||||
name: "offset out of range",
|
||||
off: uint64(len(fullPayload) + 1),
|
||||
ln: 1,
|
||||
err: errOffsetIsOutOfRange,
|
||||
},
|
||||
{
|
||||
name: "zero length",
|
||||
off: parts[1].Size + 1,
|
||||
ln: 0,
|
||||
err: errorZeroRangeLength,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
multiReader, err := NewMultiObjectReader(ctx, MultiObjectReaderConfig{
|
||||
Handler: mockInitReader,
|
||||
Parts: parts,
|
||||
Off: tc.off,
|
||||
Ln: tc.ln,
|
||||
})
|
||||
require.ErrorIs(t, err, tc.err)
|
||||
|
||||
if tc.err == nil {
|
||||
off := tc.off
|
||||
ln := tc.ln
|
||||
if off+ln == 0 {
|
||||
ln = uint64(len(fullPayload))
|
||||
}
|
||||
data, err := io.ReadAll(multiReader)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, fullPayload[off:off+ln], data)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package services
|
||||
package frostfs
|
||||
|
||||
import (
|
||||
"context"
|
Loading…
Reference in a new issue