feature/add_mocks #117
12 changed files with 1005 additions and 113 deletions
|
@ -16,6 +16,7 @@ import (
|
|||
|
||||
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||
|
@ -600,10 +601,10 @@ func (a *app) reqNamespace(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
|||
}
|
||||
}
|
||||
|
||||
func (a *app) AppParams() *utils.AppParams {
|
||||
return &utils.AppParams{
|
||||
func (a *app) AppParams() *handler.AppParams {
|
||||
return &handler.AppParams{
|
||||
Logger: a.log,
|
||||
Pool: a.pool,
|
||||
FrostFS: frostfs.NewFrostFS(a.pool),
|
||||
Owner: a.owner,
|
||||
Resolver: a.resolver,
|
||||
Cache: cache.NewBucketCache(getCacheOptions(a.cfg, a.log)),
|
||||
|
|
260
internal/frostfs/frostfs.go
Normal file
260
internal/frostfs/frostfs.go
Normal file
|
@ -0,0 +1,260 @@
|
|||
package frostfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// FrostFS represents virtual connection to the FrostFS network.
|
||||
// It is used to provide an interface to dependent packages
|
||||
// which work with FrostFS.
|
||||
type FrostFS struct {
|
||||
pool *pool.Pool
|
||||
}
|
||||
|
||||
// NewFrostFS creates new FrostFS using provided pool.Pool.
|
||||
func NewFrostFS(p *pool.Pool) *FrostFS {
|
||||
return &FrostFS{
|
||||
pool: p,
|
||||
}
|
||||
}
|
||||
|
||||
// Container implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) Container(ctx context.Context, layerPrm handler.PrmContainer) (*container.Container, error) {
|
||||
prm := pool.PrmContainerGet{
|
||||
ContainerID: layerPrm.ContainerID,
|
||||
}
|
||||
|
||||
res, err := x.pool.GetContainer(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("read container via connection pool", err)
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
// CreateObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) CreateObject(ctx context.Context, prm handler.PrmObjectCreate) (oid.ID, error) {
|
||||
var prmPut pool.PrmObjectPut
|
||||
prmPut.SetHeader(*prm.Object)
|
||||
prmPut.SetPayload(prm.Payload)
|
||||
prmPut.SetClientCut(prm.ClientCut)
|
||||
prmPut.WithoutHomomorphicHash(prm.WithoutHomomorphicHash)
|
||||
prmPut.SetBufferMaxSize(prm.BufferMaxSize)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmPut.UseBearer(*prm.BearerToken)
|
||||
}
|
||||
|
||||
idObj, err := x.pool.PutObject(ctx, prmPut)
|
||||
return idObj, handleObjectError("save object via connection pool", err)
|
||||
}
|
||||
|
||||
// wraps io.ReadCloser and transforms Read errors related to access violation
|
||||
// to frostfs.ErrAccessDenied.
|
||||
type payloadReader struct {
|
||||
io.ReadCloser
|
||||
}
|
||||
|
||||
func (x payloadReader) Read(p []byte) (int, error) {
|
||||
n, err := x.ReadCloser.Read(p)
|
||||
if err != nil && errors.Is(err, io.EOF) {
|
||||
return n, err
|
||||
}
|
||||
return n, handleObjectError("read payload", err)
|
||||
}
|
||||
|
||||
// ReadObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*handler.ObjectPart, error) {
|
||||
var prmGet pool.PrmObjectGet
|
||||
prmGet.SetAddress(prm.Address)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmGet.UseBearer(*prm.BearerToken)
|
||||
}
|
||||
|
||||
if prm.WithHeader {
|
||||
if prm.WithPayload {
|
||||
res, err := x.pool.GetObject(ctx, prmGet)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init full object reading via connection pool", err)
|
||||
}
|
||||
|
||||
defer res.Payload.Close()
|
||||
|
||||
payload, err := io.ReadAll(res.Payload)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("read full object payload", err)
|
||||
}
|
||||
|
||||
res.Header.SetPayload(payload)
|
||||
|
||||
return &handler.ObjectPart{
|
||||
Head: &res.Header,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var prmHead pool.PrmObjectHead
|
||||
prmHead.SetAddress(prm.Address)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmHead.UseBearer(*prm.BearerToken)
|
||||
}
|
||||
|
||||
hdr, err := x.pool.HeadObject(ctx, prmHead)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("read object header via connection pool", err)
|
||||
}
|
||||
|
||||
return &handler.ObjectPart{
|
||||
Head: &hdr,
|
||||
}, nil
|
||||
} else if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 {
|
||||
res, err := x.pool.GetObject(ctx, prmGet)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init full payload range reading via connection pool", err)
|
||||
}
|
||||
|
||||
return &handler.ObjectPart{
|
||||
Payload: res.Payload,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var prmRange pool.PrmObjectRange
|
||||
prmRange.SetAddress(prm.Address)
|
||||
prmRange.SetOffset(prm.PayloadRange[0])
|
||||
prmRange.SetLength(prm.PayloadRange[1])
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmRange.UseBearer(*prm.BearerToken)
|
||||
}
|
||||
|
||||
res, err := x.pool.ObjectRange(ctx, prmRange)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init payload range reading via connection pool", err)
|
||||
}
|
||||
|
||||
return &handler.ObjectPart{
|
||||
Payload: payloadReader{&res},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SearchObjects implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) SearchObjects(ctx context.Context, prm handler.PrmObjectSearch) (handler.ResObjectSearch, error) {
|
||||
var prmSearch pool.PrmObjectSearch
|
||||
prmSearch.SetContainerID(prm.Container)
|
||||
prmSearch.SetFilters(prm.Filters)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmSearch.UseBearer(*prm.BearerToken)
|
||||
}
|
||||
|
||||
res, err := x.pool.SearchObjects(ctx, prmSearch)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init object search via connection pool", err)
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
// GetEpochDurations implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) GetEpochDurations(ctx context.Context) (*utils.EpochDurations, error) {
|
||||
networkInfo, err := x.pool.NetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := &utils.EpochDurations{
|
||||
CurrentEpoch: networkInfo.CurrentEpoch(),
|
||||
MsPerBlock: networkInfo.MsPerBlock(),
|
||||
BlockPerEpoch: networkInfo.EpochDuration(),
|
||||
}
|
||||
|
||||
if res.BlockPerEpoch == 0 {
|
||||
return nil, fmt.Errorf("EpochDuration is empty")
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// ResolverFrostFS represents virtual connection to the FrostFS network.
|
||||
// It implements resolver.FrostFS.
|
||||
type ResolverFrostFS struct {
|
||||
pool *pool.Pool
|
||||
}
|
||||
|
||||
// NewResolverFrostFS creates new ResolverFrostFS using provided pool.Pool.
|
||||
func NewResolverFrostFS(p *pool.Pool) *ResolverFrostFS {
|
||||
return &ResolverFrostFS{pool: p}
|
||||
}
|
||||
|
||||
// SystemDNS implements resolver.FrostFS interface method.
|
||||
func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
|
||||
networkInfo, err := x.pool.NetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return "", handleObjectError("read network info via client", err)
|
||||
}
|
||||
|
||||
domain := networkInfo.RawNetworkParameter("SystemDNS")
|
||||
if domain == nil {
|
||||
return "", errors.New("system DNS parameter not found or empty")
|
||||
}
|
||||
|
||||
return string(domain), nil
|
||||
}
|
||||
|
||||
func handleObjectError(msg string, err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if reason, ok := IsErrObjectAccessDenied(err); ok {
|
||||
return fmt.Errorf("%s: %w: %s", msg, handler.ErrAccessDenied, reason)
|
||||
}
|
||||
|
||||
if IsTimeoutError(err) {
|
||||
return fmt.Errorf("%s: %w: %s", msg, handler.ErrGatewayTimeout, err.Error())
|
||||
}
|
||||
|
||||
return fmt.Errorf("%s: %w", msg, err)
|
||||
}
|
||||
|
||||
func UnwrapErr(err error) error {
|
||||
unwrappedErr := errors.Unwrap(err)
|
||||
for unwrappedErr != nil {
|
||||
err = unwrappedErr
|
||||
unwrappedErr = errors.Unwrap(err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func IsErrObjectAccessDenied(err error) (string, bool) {
|
||||
err = UnwrapErr(err)
|
||||
switch err := err.(type) {
|
||||
default:
|
||||
return "", false
|
||||
case *apistatus.ObjectAccessDenied:
|
||||
return err.Reason(), true
|
||||
}
|
||||
}
|
||||
|
||||
func IsTimeoutError(err error) bool {
|
||||
if strings.Contains(err.Error(), "timeout") ||
|
||||
errors.Is(err, context.DeadlineExceeded) {
|
||||
return true
|
||||
}
|
||||
|
||||
return status.Code(UnwrapErr(err)) == codes.DeadlineExceeded
|
||||
}
|
|
@ -17,7 +17,6 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -46,19 +45,20 @@ func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
|||
h.byAttribute(c, h.receiveFile)
|
||||
}
|
||||
|
||||
func (h *Handler) search(ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
||||
func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
|
||||
filters := object.NewSearchFilters()
|
||||
filters.AddRootFilter()
|
||||
filters.AddFilter(key, val, op)
|
||||
|
||||
var prm pool.PrmObjectSearch
|
||||
prm.SetContainerID(*cid)
|
||||
prm.SetFilters(filters)
|
||||
if btoken := bearerToken(ctx); btoken != nil {
|
||||
prm.UseBearer(*btoken)
|
||||
prm := PrmObjectSearch{
|
||||
PrmAuth: PrmAuth{
|
||||
BearerToken: bearerToken(ctx),
|
||||
},
|
||||
Container: *cnrID,
|
||||
Filters: filters,
|
||||
}
|
||||
|
||||
return h.pool.SearchObjects(ctx, prm)
|
||||
return h.frostfs.SearchObjects(ctx, prm)
|
||||
}
|
||||
|
||||
func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
|
||||
|
@ -153,18 +153,21 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
}
|
||||
|
||||
func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
|
||||
var prm pool.PrmObjectGet
|
||||
prm.SetAddress(addr)
|
||||
if btoken != nil {
|
||||
prm.UseBearer(*btoken)
|
||||
prm := PrmObjectRead{
|
||||
PrmAuth: PrmAuth{
|
||||
BearerToken: btoken,
|
||||
},
|
||||
Address: addr,
|
||||
WithHeader: true,
|
||||
WithPayload: true,
|
||||
}
|
||||
|
||||
resGet, err := h.pool.GetObject(ctx, prm)
|
||||
resGet, err := h.frostfs.ReadObject(ctx, prm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get FrostFS object: %v", err)
|
||||
}
|
||||
|
||||
objWriter, err := h.addObjectToZip(zipWriter, &resGet.Header)
|
||||
objWriter, err := h.addObjectToZip(zipWriter, resGet.Head)
|
||||
if err != nil {
|
||||
return fmt.Errorf("zip create header: %v", err)
|
||||
}
|
||||
|
|
260
internal/handler/frostfs_mock.go
Normal file
260
internal/handler/frostfs_mock.go
Normal file
|
@ -0,0 +1,260 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"crypto/sha256"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
)
|
||||
|
||||
type TestFrostFS struct {
|
||||
objects map[string]*object.Object
|
||||
containers map[string]*container.Container
|
||||
accessList map[string]bool
|
||||
key *keys.PrivateKey
|
||||
}
|
||||
|
||||
func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS {
|
||||
return &TestFrostFS{
|
||||
objects: make(map[string]*object.Object),
|
||||
containers: make(map[string]*container.Container),
|
||||
accessList: make(map[string]bool),
|
||||
key: key,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) ContainerID(name string) (*cid.ID, error) {
|
||||
for id, cnr := range t.containers {
|
||||
if container.Name(*cnr) == name {
|
||||
var cnrID cid.ID
|
||||
return &cnrID, cnrID.DecodeString(id)
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("not found")
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) SetContainer(cnrID cid.ID, cnr *container.Container) {
|
||||
t.containers[cnrID.EncodeToString()] = cnr
|
||||
}
|
||||
|
||||
// AllowUserOperation grants access to object operations.
|
||||
// Empty userID and objID means any user and object respectively.
|
||||
func (t *TestFrostFS) AllowUserOperation(cnrID cid.ID, userID user.ID, op acl.Op, objID oid.ID) {
|
||||
t.accessList[fmt.Sprintf("%s/%s/%s/%s", cnrID, userID, op, objID)] = true
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) Container(_ context.Context, prm PrmContainer) (*container.Container, error) {
|
||||
for k, v := range t.containers {
|
||||
if k == prm.ContainerID.EncodeToString() {
|
||||
return v, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("container not found %s", prm.ContainerID)
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) requestOwner(btoken *bearer.Token) user.ID {
|
||||
if btoken != nil {
|
||||
return bearer.ResolveIssuer(*btoken)
|
||||
}
|
||||
|
||||
var owner user.ID
|
||||
user.IDFromKey(&owner, t.key.PrivateKey.PublicKey)
|
||||
return owner
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) ReadObject(_ context.Context, prm PrmObjectRead) (*ObjectPart, error) {
|
||||
sAddr := prm.Address.EncodeToString()
|
||||
|
||||
if obj, ok := t.objects[sAddr]; ok {
|
||||
owner := t.requestOwner(prm.BearerToken)
|
||||
|
||||
if !t.isAllowed(prm.Address.Container(), owner, acl.OpObjectGet, prm.Address.Object()) {
|
||||
return nil, ErrAccessDenied
|
||||
}
|
||||
|
||||
payload := obj.Payload()
|
||||
|
||||
if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 {
|
||||
off := prm.PayloadRange[0]
|
||||
payload = payload[off : off+prm.PayloadRange[1]]
|
||||
}
|
||||
|
||||
return &ObjectPart{
|
||||
Head: obj,
|
||||
Payload: io.NopCloser(bytes.NewReader(payload)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, prm.Address)
|
||||
}
|
||||
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
|
||||
b := make([]byte, 32)
|
||||
if _, err := io.ReadFull(rand.Reader, b); err != nil {
|
||||
return oid.ID{}, err
|
||||
}
|
||||
var id oid.ID
|
||||
id.SetSHA256(sha256.Sum256(b))
|
||||
prm.Object.SetID(id)
|
||||
|
||||
attrs := prm.Object.Attributes()
|
||||
if prm.ClientCut {
|
||||
a := object.NewAttribute()
|
||||
a.SetKey("s3-client-cut")
|
||||
a.SetValue("true")
|
||||
attrs = append(attrs, *a)
|
||||
}
|
||||
|
||||
prm.Object.SetAttributes(attrs...)
|
||||
|
||||
if prm.Payload != nil {
|
||||
all, err := io.ReadAll(prm.Payload)
|
||||
if err != nil {
|
||||
return oid.ID{}, err
|
||||
}
|
||||
prm.Object.SetPayload(all)
|
||||
prm.Object.SetPayloadSize(uint64(len(all)))
|
||||
var hash checksum.Checksum
|
||||
checksum.Calculate(&hash, checksum.SHA256, all)
|
||||
prm.Object.SetPayloadChecksum(hash)
|
||||
}
|
||||
|
||||
cnrID, _ := prm.Object.ContainerID()
|
||||
objID, _ := prm.Object.ID()
|
||||
|
||||
owner := t.requestOwner(prm.BearerToken)
|
||||
|
||||
if !t.isAllowed(cnrID, owner, acl.OpObjectPut, objID) {
|
||||
return oid.ID{}, ErrAccessDenied
|
||||
}
|
||||
|
||||
addr := newAddress(cnrID, objID)
|
||||
t.objects[addr.EncodeToString()] = prm.Object
|
||||
return objID, nil
|
||||
}
|
||||
|
||||
type resObjectSearchMock struct {
|
||||
res []oid.ID
|
||||
}
|
||||
|
||||
func (r *resObjectSearchMock) Read(buf []oid.ID) (int, error) {
|
||||
for i := range buf {
|
||||
if i > len(r.res)-1 {
|
||||
return len(r.res), io.EOF
|
||||
}
|
||||
buf[i] = r.res[i]
|
||||
}
|
||||
|
||||
r.res = r.res[len(buf):]
|
||||
|
||||
return len(buf), nil
|
||||
}
|
||||
|
||||
func (r *resObjectSearchMock) Iterate(f func(oid.ID) bool) error {
|
||||
for _, id := range r.res {
|
||||
if f(id) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *resObjectSearchMock) Close() {}
|
||||
|
||||
func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
|
||||
if !t.isAllowed(prm.Container, t.requestOwner(prm.BearerToken), acl.OpObjectSearch, oid.ID{}) {
|
||||
return nil, ErrAccessDenied
|
||||
}
|
||||
|
||||
cidStr := prm.Container.EncodeToString()
|
||||
var res []oid.ID
|
||||
|
||||
if len(prm.Filters) == 1 { // match root filter
|
||||
for k, v := range t.objects {
|
||||
if strings.Contains(k, cidStr) {
|
||||
id, _ := v.ID()
|
||||
res = append(res, id)
|
||||
}
|
||||
}
|
||||
return &resObjectSearchMock{res: res}, nil
|
||||
}
|
||||
|
||||
filter := prm.Filters[1]
|
||||
if len(prm.Filters) != 2 ||
|
||||
filter.Operation() != object.MatchCommonPrefix && filter.Operation() != object.MatchStringEqual {
|
||||
return nil, fmt.Errorf("usupported filters")
|
||||
}
|
||||
|
||||
for k, v := range t.objects {
|
||||
if strings.Contains(k, cidStr) && isMatched(v.Attributes(), filter) {
|
||||
id, _ := v.ID()
|
||||
res = append(res, id)
|
||||
}
|
||||
}
|
||||
|
||||
return &resObjectSearchMock{res: res}, nil
|
||||
}
|
||||
|
||||
func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
|
||||
for _, attr := range attributes {
|
||||
if attr.Key() == filter.Header() {
|
||||
switch filter.Operation() {
|
||||
case object.MatchStringEqual:
|
||||
return attr.Value() == filter.Value()
|
||||
case object.MatchCommonPrefix:
|
||||
return strings.HasPrefix(attr.Value(), filter.Value())
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) GetEpochDurations(context.Context) (*utils.EpochDurations, error) {
|
||||
return &utils.EpochDurations{
|
||||
CurrentEpoch: 10,
|
||||
MsPerBlock: 1000,
|
||||
BlockPerEpoch: 100,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) isAllowed(cnrID cid.ID, userID user.ID, op acl.Op, objID oid.ID) bool {
|
||||
keysToCheck := []string{
|
||||
fmt.Sprintf("%s/%s/%s/%s", cnrID, userID, op, objID),
|
||||
fmt.Sprintf("%s/%s/%s/%s", cnrID, userID, op, oid.ID{}),
|
||||
fmt.Sprintf("%s/%s/%s/%s", cnrID, user.ID{}, op, objID),
|
||||
fmt.Sprintf("%s/%s/%s/%s", cnrID, user.ID{}, op, oid.ID{}),
|
||||
}
|
||||
|
||||
for _, key := range keysToCheck {
|
||||
if t.accessList[key] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(cnr)
|
||||
addr.SetObject(obj)
|
||||
return addr
|
||||
}
|
|
@ -12,16 +12,15 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
|
@ -35,20 +34,125 @@ type Config interface {
|
|||
NamespaceHeader() string
|
||||
}
|
||||
|
||||
// PrmContainer groups parameters of FrostFS.Container operation.
|
||||
type PrmContainer struct {
|
||||
// Container identifier.
|
||||
ContainerID cid.ID
|
||||
}
|
||||
|
||||
// PrmAuth groups authentication parameters for the FrostFS operation.
|
||||
type PrmAuth struct {
|
||||
// Bearer token to be used for the operation. Overlaps PrivateKey. Optional.
|
||||
BearerToken *bearer.Token
|
||||
}
|
||||
|
||||
// PrmObjectRead groups parameters of FrostFS.ReadObject operation.
|
||||
type PrmObjectRead struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
// Address to read the object header from.
|
||||
Address oid.Address
|
||||
|
||||
// Flag to read object header.
|
||||
WithHeader bool
|
||||
|
||||
// Flag to read object payload. False overlaps payload range.
|
||||
WithPayload bool
|
||||
|
||||
// Offset-length range of the object payload to be read.
|
||||
PayloadRange [2]uint64
|
||||
}
|
||||
|
||||
// ObjectPart represents partially read FrostFS object.
|
||||
type ObjectPart struct {
|
||||
// Object header with optional in-memory payload part.
|
||||
Head *object.Object
|
||||
|
||||
// Object payload part encapsulated in io.Reader primitive.
|
||||
// Returns ErrAccessDenied on read access violation.
|
||||
Payload io.ReadCloser
|
||||
}
|
||||
|
||||
// PrmObjectCreate groups parameters of FrostFS.CreateObject operation.
|
||||
type PrmObjectCreate struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
Object *object.Object
|
||||
|
||||
// Object payload encapsulated in io.Reader primitive.
|
||||
Payload io.Reader
|
||||
|
||||
// Enables client side object preparing.
|
||||
ClientCut bool
|
||||
|
||||
// Disables using Tillich-Zémor hash for payload.
|
||||
WithoutHomomorphicHash bool
|
||||
|
||||
// Sets max buffer size to read payload.
|
||||
BufferMaxSize uint64
|
||||
}
|
||||
|
||||
// PrmObjectSearch groups parameters of FrostFS.sear SearchObjects operation.
|
||||
type PrmObjectSearch struct {
|
||||
// Authentication parameters.
|
||||
PrmAuth
|
||||
|
||||
// Container to select the objects from.
|
||||
Container cid.ID
|
||||
|
||||
Filters object.SearchFilters
|
||||
}
|
||||
|
||||
type ResObjectSearch interface {
|
||||
Read(buf []oid.ID) (int, error)
|
||||
Iterate(f func(oid.ID) bool) error
|
||||
Close()
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrAccessDenied is returned from FrostFS in case of access violation.
|
||||
ErrAccessDenied = errors.New("access denied")
|
||||
// ErrGatewayTimeout is returned from FrostFS in case of timeout, deadline exceeded etc.
|
||||
ErrGatewayTimeout = errors.New("gateway timeout")
|
||||
)
|
||||
|
||||
// FrostFS represents virtual connection to FrostFS network.
|
||||
type FrostFS interface {
|
||||
Container(context.Context, PrmContainer) (*container.Container, error)
|
||||
ReadObject(context.Context, PrmObjectRead) (*ObjectPart, error)
|
||||
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
|
||||
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
|
||||
utils.EpochInfoFetcher
|
||||
}
|
||||
|
||||
type ContainerResolver interface {
|
||||
Resolve(ctx context.Context, name string) (*cid.ID, error)
|
||||
}
|
||||
|
||||
type Handler struct {
|
||||
log *zap.Logger
|
||||
pool *pool.Pool
|
||||
frostfs FrostFS
|
||||
ownerID *user.ID
|
||||
config Config
|
||||
containerResolver *resolver.ContainerResolver
|
||||
containerResolver ContainerResolver
|
||||
tree *tree.Tree
|
||||
cache *cache.BucketCache
|
||||
}
|
||||
|
||||
func New(params *utils.AppParams, config Config, tree *tree.Tree) *Handler {
|
||||
type AppParams struct {
|
||||
Logger *zap.Logger
|
||||
FrostFS FrostFS
|
||||
Owner *user.ID
|
||||
Resolver ContainerResolver
|
||||
Cache *cache.BucketCache
|
||||
}
|
||||
|
||||
func New(params *AppParams, config Config, tree *tree.Tree) *Handler {
|
||||
return &Handler{
|
||||
log: params.Logger,
|
||||
pool: params.Pool,
|
||||
frostfs: params.FrostFS,
|
||||
ownerID: params.Owner,
|
||||
config: config,
|
||||
containerResolver: params.Resolver,
|
||||
|
@ -235,8 +339,8 @@ func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *
|
|||
}
|
||||
|
||||
func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.BucketInfo, error) {
|
||||
prm := pool.PrmContainerGet{ContainerID: cnrID}
|
||||
res, err := h.pool.GetContainer(ctx, prm)
|
||||
prm := PrmContainer{ContainerID: cnrID}
|
||||
res, err := h.frostfs.Container(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get frostfs container '%s': %w", cnrID.String(), err)
|
||||
}
|
||||
|
@ -246,12 +350,12 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
|
|||
Name: cnrID.EncodeToString(),
|
||||
}
|
||||
|
||||
if domain := container.ReadDomain(res); domain.Name() != "" {
|
||||
if domain := container.ReadDomain(*res); domain.Name() != "" {
|
||||
bktInfo.Name = domain.Name()
|
||||
bktInfo.Zone = domain.Zone()
|
||||
}
|
||||
|
||||
bktInfo.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(res)
|
||||
bktInfo.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(*res)
|
||||
|
||||
return bktInfo, err
|
||||
}
|
||||
|
|
296
internal/handler/handler_test.go
Normal file
296
internal/handler/handler_test.go
Normal file
|
@ -0,0 +1,296 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"archive/zip"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"mime/multipart"
|
||||
"net/http"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type treeClientMock struct {
|
||||
}
|
||||
|
||||
func (t *treeClientMock) GetNodes(context.Context, *tree.GetNodesParams) ([]tree.NodeResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type configMock struct {
|
||||
}
|
||||
|
||||
func (c *configMock) DefaultTimestamp() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *configMock) ZipCompression() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *configMock) ClientCut() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *configMock) BufferMaxSizeForPut() uint64 {
|
||||
return 0
|
||||
}
|
||||
|
||||
func (c *configMock) NamespaceHeader() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
type handlerContext struct {
|
||||
key *keys.PrivateKey
|
||||
owner user.ID
|
||||
|
||||
h *Handler
|
||||
frostfs *TestFrostFS
|
||||
tree *treeClientMock
|
||||
cfg *configMock
|
||||
}
|
||||
|
||||
func (hc *handlerContext) Handler() *Handler {
|
||||
return hc.h
|
||||
}
|
||||
|
||||
func prepareHandlerContext() (*handlerContext, error) {
|
||||
logger, err := zap.NewDevelopment()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
key, err := keys.NewPrivateKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var owner user.ID
|
||||
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
||||
|
||||
testFrostFS := NewTestFrostFS(key)
|
||||
|
||||
testResolver := &resolver.Resolver{Name: "test_resolver"}
|
||||
testResolver.SetResolveFunc(func(_ context.Context, name string) (*cid.ID, error) {
|
||||
return testFrostFS.ContainerID(name)
|
||||
})
|
||||
|
||||
params := &AppParams{
|
||||
Logger: logger,
|
||||
FrostFS: testFrostFS,
|
||||
Owner: &owner,
|
||||
Resolver: testResolver,
|
||||
Cache: cache.NewBucketCache(&cache.Config{
|
||||
Size: 1,
|
||||
Lifetime: 1,
|
||||
Logger: logger,
|
||||
}),
|
||||
}
|
||||
|
||||
treeMock := &treeClientMock{}
|
||||
cfgMock := &configMock{}
|
||||
|
||||
handler := New(params, cfgMock, tree.NewTree(treeMock))
|
||||
|
||||
return &handlerContext{
|
||||
key: key,
|
||||
owner: owner,
|
||||
h: handler,
|
||||
frostfs: testFrostFS,
|
||||
tree: treeMock,
|
||||
cfg: cfgMock,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (hc *handlerContext) prepareContainer(name string, basicACL acl.Basic) (cid.ID, *container.Container, error) {
|
||||
var pp netmap.PlacementPolicy
|
||||
err := pp.DecodeString("REP 1")
|
||||
if err != nil {
|
||||
return cid.ID{}, nil, err
|
||||
}
|
||||
|
||||
var cnr container.Container
|
||||
cnr.Init()
|
||||
cnr.SetOwner(hc.owner)
|
||||
cnr.SetPlacementPolicy(pp)
|
||||
cnr.SetBasicACL(basicACL)
|
||||
|
||||
var domain container.Domain
|
||||
domain.SetName(name)
|
||||
container.WriteDomain(&cnr, domain)
|
||||
container.SetName(&cnr, name)
|
||||
container.SetCreationTime(&cnr, time.Now())
|
||||
|
||||
cnrID := cidtest.ID()
|
||||
|
||||
for op := acl.OpObjectGet; op < acl.OpObjectHash; op++ {
|
||||
hc.frostfs.AllowUserOperation(cnrID, hc.owner, op, oid.ID{})
|
||||
if basicACL.IsOpAllowed(op, acl.RoleOthers) {
|
||||
hc.frostfs.AllowUserOperation(cnrID, user.ID{}, op, oid.ID{})
|
||||
}
|
||||
}
|
||||
|
||||
return cnrID, &cnr, nil
|
||||
}
|
||||
|
||||
func TestBasic(t *testing.T) {
|
||||
hc, err := prepareHandlerContext()
|
||||
require.NoError(t, err)
|
||||
|
||||
bktName := "bucket"
|
||||
cnrID, cnr, err := hc.prepareContainer(bktName, acl.PublicRWExtended)
|
||||
require.NoError(t, err)
|
||||
hc.frostfs.SetContainer(cnrID, cnr)
|
||||
|
||||
ctx := context.Background()
|
||||
ctx = middleware.SetNamespace(ctx, "")
|
||||
|
||||
content := "hello"
|
||||
r, err := prepareUploadRequest(ctx, cnrID.EncodeToString(), content)
|
||||
require.NoError(t, err)
|
||||
|
||||
hc.Handler().Upload(r)
|
||||
require.Equal(t, r.Response.StatusCode(), http.StatusOK)
|
||||
|
||||
var putRes putResponse
|
||||
err = json.Unmarshal(r.Response.Body(), &putRes)
|
||||
require.NoError(t, err)
|
||||
|
||||
obj := hc.frostfs.objects[putRes.ContainerID+"/"+putRes.ObjectID]
|
||||
attr := object.NewAttribute()
|
||||
attr.SetKey(object.AttributeFilePath)
|
||||
attr.SetValue(objFileName)
|
||||
obj.SetAttributes(append(obj.Attributes(), *attr)...)
|
||||
|
||||
t.Run("get", func(t *testing.T) {
|
||||
r = prepareGetRequest(ctx, cnrID.EncodeToString(), putRes.ObjectID)
|
||||
hc.Handler().DownloadByAddressOrBucketName(r)
|
||||
require.Equal(t, content, string(r.Response.Body()))
|
||||
})
|
||||
|
||||
t.Run("head", func(t *testing.T) {
|
||||
r = prepareGetRequest(ctx, cnrID.EncodeToString(), putRes.ObjectID)
|
||||
hc.Handler().HeadByAddressOrBucketName(r)
|
||||
require.Equal(t, putRes.ObjectID, string(r.Response.Header.Peek(hdrObjectID)))
|
||||
require.Equal(t, putRes.ContainerID, string(r.Response.Header.Peek(hdrContainerID)))
|
||||
})
|
||||
|
||||
t.Run("get by attribute", func(t *testing.T) {
|
||||
r = prepareGetByAttributeRequest(ctx, bktName, keyAttr, valAttr)
|
||||
hc.Handler().DownloadByAttribute(r)
|
||||
require.Equal(t, content, string(r.Response.Body()))
|
||||
})
|
||||
|
||||
t.Run("head by attribute", func(t *testing.T) {
|
||||
r = prepareGetByAttributeRequest(ctx, bktName, keyAttr, valAttr)
|
||||
hc.Handler().HeadByAttribute(r)
|
||||
require.Equal(t, putRes.ObjectID, string(r.Response.Header.Peek(hdrObjectID)))
|
||||
require.Equal(t, putRes.ContainerID, string(r.Response.Header.Peek(hdrContainerID)))
|
||||
})
|
||||
|
||||
t.Run("zip", func(t *testing.T) {
|
||||
r = prepareGetZipped(ctx, bktName, "")
|
||||
hc.Handler().DownloadZipped(r)
|
||||
|
||||
readerAt := bytes.NewReader(r.Response.Body())
|
||||
zipReader, err := zip.NewReader(readerAt, int64(len(r.Response.Body())))
|
||||
require.NoError(t, err)
|
||||
require.Len(t, zipReader.File, 1)
|
||||
require.Equal(t, objFileName, zipReader.File[0].Name)
|
||||
f, err := zipReader.File[0].Open()
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
inErr := f.Close()
|
||||
require.NoError(t, inErr)
|
||||
}()
|
||||
data, err := io.ReadAll(f)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, content, string(data))
|
||||
})
|
||||
}
|
||||
|
||||
func prepareUploadRequest(ctx context.Context, bucket, content string) (*fasthttp.RequestCtx, error) {
|
||||
r := new(fasthttp.RequestCtx)
|
||||
utils.SetContextToRequest(ctx, r)
|
||||
r.SetUserValue("cid", bucket)
|
||||
return r, fillMultipartBody(r, content)
|
||||
}
|
||||
|
||||
func prepareGetRequest(ctx context.Context, bucket, objID string) *fasthttp.RequestCtx {
|
||||
r := new(fasthttp.RequestCtx)
|
||||
utils.SetContextToRequest(ctx, r)
|
||||
r.SetUserValue("cid", bucket)
|
||||
r.SetUserValue("oid", objID)
|
||||
return r
|
||||
}
|
||||
|
||||
func prepareGetByAttributeRequest(ctx context.Context, bucket, attrKey, attrVal string) *fasthttp.RequestCtx {
|
||||
r := new(fasthttp.RequestCtx)
|
||||
utils.SetContextToRequest(ctx, r)
|
||||
r.SetUserValue("cid", bucket)
|
||||
r.SetUserValue("attr_key", attrKey)
|
||||
r.SetUserValue("attr_val", attrVal)
|
||||
return r
|
||||
}
|
||||
|
||||
func prepareGetZipped(ctx context.Context, bucket, prefix string) *fasthttp.RequestCtx {
|
||||
r := new(fasthttp.RequestCtx)
|
||||
utils.SetContextToRequest(ctx, r)
|
||||
r.SetUserValue("cid", bucket)
|
||||
r.SetUserValue("prefix", prefix)
|
||||
return r
|
||||
}
|
||||
|
||||
const (
|
||||
keyAttr = "User-Attribute"
|
||||
valAttr = "user value"
|
||||
objFileName = "newFile.txt"
|
||||
)
|
||||
|
||||
func fillMultipartBody(r *fasthttp.RequestCtx, content string) error {
|
||||
attributes := map[string]string{
|
||||
object.AttributeFileName: objFileName,
|
||||
keyAttr: valAttr,
|
||||
}
|
||||
|
||||
var buff bytes.Buffer
|
||||
w := multipart.NewWriter(&buff)
|
||||
fw, err := w.CreateFormFile("file", attributes[object.AttributeFileName])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if _, err = io.Copy(fw, bytes.NewBufferString(content)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = w.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.Request.SetBodyStream(&buff, buff.Len())
|
||||
r.Request.Header.Set("Content-Type", w.FormDataContentType())
|
||||
r.Request.Header.Set("X-Attribute-"+keyAttr, valAttr)
|
||||
|
||||
return nil
|
||||
}
|
|
@ -11,7 +11,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -30,21 +29,23 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
|
|||
|
||||
btoken := bearerToken(ctx)
|
||||
|
||||
var prm pool.PrmObjectHead
|
||||
prm.SetAddress(objectAddress)
|
||||
if btoken != nil {
|
||||
prm.UseBearer(*btoken)
|
||||
prm := PrmObjectRead{
|
||||
PrmAuth: PrmAuth{
|
||||
BearerToken: btoken,
|
||||
},
|
||||
Address: objectAddress,
|
||||
WithHeader: true,
|
||||
}
|
||||
|
||||
obj, err := h.pool.HeadObject(ctx, prm)
|
||||
obj, err := h.frostfs.ReadObject(ctx, prm)
|
||||
if err != nil {
|
||||
req.handleFrostFSErr(err, start)
|
||||
return
|
||||
}
|
||||
|
||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
|
||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.Head.PayloadSize(), 10))
|
||||
var contentType string
|
||||
for _, attr := range obj.Attributes() {
|
||||
for _, attr := range obj.Head.Attributes() {
|
||||
key := attr.Key()
|
||||
val := attr.Value()
|
||||
if !isValidToken(key) || !isValidValue(val) {
|
||||
|
@ -70,22 +71,24 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
|
|||
}
|
||||
}
|
||||
|
||||
idsToResponse(&req.Response, &obj)
|
||||
idsToResponse(&req.Response, obj.Head)
|
||||
|
||||
if len(contentType) == 0 {
|
||||
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
|
||||
var prmRange pool.PrmObjectRange
|
||||
prmRange.SetAddress(objectAddress)
|
||||
prmRange.SetLength(sz)
|
||||
if btoken != nil {
|
||||
prmRange.UseBearer(*btoken)
|
||||
contentType, _, err = readContentType(obj.Head.PayloadSize(), func(sz uint64) (io.Reader, error) {
|
||||
prmRange := PrmObjectRead{
|
||||
PrmAuth: PrmAuth{
|
||||
BearerToken: btoken,
|
||||
},
|
||||
Address: objectAddress,
|
||||
WithPayload: true,
|
||||
PayloadRange: [2]uint64{0, sz},
|
||||
}
|
||||
|
||||
resObj, err := h.pool.ObjectRange(ctx, prmRange)
|
||||
resObj, err := h.frostfs.ReadObject(ctx, prmRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &resObj, nil
|
||||
return resObj.Payload, nil
|
||||
})
|
||||
if err != nil && err != io.EOF {
|
||||
req.handleFrostFSErr(err, start)
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -56,13 +55,16 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
filename string
|
||||
)
|
||||
|
||||
var prm pool.PrmObjectGet
|
||||
prm.SetAddress(objectAddress)
|
||||
if btoken := bearerToken(ctx); btoken != nil {
|
||||
prm.UseBearer(*btoken)
|
||||
prm := PrmObjectRead{
|
||||
PrmAuth: PrmAuth{
|
||||
BearerToken: bearerToken(ctx),
|
||||
},
|
||||
Address: objectAddress,
|
||||
WithHeader: true,
|
||||
WithPayload: true,
|
||||
}
|
||||
|
||||
rObj, err := h.pool.GetObject(ctx, prm)
|
||||
rObj, err := h.frostfs.ReadObject(ctx, prm)
|
||||
if err != nil {
|
||||
req.handleFrostFSErr(err, start)
|
||||
return
|
||||
|
@ -74,11 +76,11 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
dis = "attachment"
|
||||
}
|
||||
|
||||
payloadSize := rObj.Header.PayloadSize()
|
||||
payloadSize := rObj.Head.PayloadSize()
|
||||
|
||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||
var contentType string
|
||||
for _, attr := range rObj.Header.Attributes() {
|
||||
for _, attr := range rObj.Head.Attributes() {
|
||||
key := attr.Key()
|
||||
val := attr.Value()
|
||||
if !isValidToken(key) || !isValidValue(val) {
|
||||
|
@ -107,7 +109,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
}
|
||||
}
|
||||
|
||||
idsToResponse(&req.Response, &rObj.Header)
|
||||
idsToResponse(&req.Response, rObj.Head)
|
||||
|
||||
if len(contentType) == 0 {
|
||||
// determine the Content-Type from the payload head
|
||||
|
|
|
@ -15,7 +15,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -98,7 +97,7 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
if err = utils.PrepareExpirationHeader(req, h.pool, filtered, now); err != nil {
|
||||
if err = utils.PrepareExpirationHeader(req, h.frostfs, filtered, now); err != nil {
|
||||
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err))
|
||||
response.Error(req, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
|
@ -132,19 +131,18 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
|||
obj.SetOwnerID(*h.ownerID)
|
||||
obj.SetAttributes(attributes...)
|
||||
|
||||
var prm pool.PrmObjectPut
|
||||
prm.SetHeader(*obj)
|
||||
prm.SetPayload(file)
|
||||
prm.SetClientCut(h.config.ClientCut())
|
||||
prm.SetBufferMaxSize(h.config.BufferMaxSizeForPut())
|
||||
prm.WithoutHomomorphicHash(bktInfo.HomomorphicHashDisabled)
|
||||
|
||||
bt := h.fetchBearerToken(ctx)
|
||||
if bt != nil {
|
||||
prm.UseBearer(*bt)
|
||||
prm := PrmObjectCreate{
|
||||
PrmAuth: PrmAuth{
|
||||
BearerToken: h.fetchBearerToken(ctx),
|
||||
},
|
||||
Object: obj,
|
||||
Payload: file,
|
||||
ClientCut: h.config.ClientCut(),
|
||||
WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled,
|
||||
BufferMaxSize: h.config.BufferMaxSizeForPut(),
|
||||
}
|
||||
|
||||
if idObj, err = h.pool.PutObject(ctx, prm); err != nil {
|
||||
if idObj, err = h.frostfs.CreateObject(ctx, prm); err != nil {
|
||||
h.handlePutFrostFSErr(req, err)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -11,10 +11,18 @@ import (
|
|||
"time"
|
||||
"unicode"
|
||||
"unicode/utf8"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
)
|
||||
|
||||
type EpochDurations struct {
|
||||
CurrentEpoch uint64
|
||||
MsPerBlock int64
|
||||
BlockPerEpoch uint64
|
||||
}
|
||||
|
||||
type EpochInfoFetcher interface {
|
||||
GetEpochDurations(context.Context) (*EpochDurations, error)
|
||||
}
|
||||
|
||||
const (
|
||||
UserAttributeHeaderPrefix = "X-Attribute-"
|
||||
)
|
||||
|
@ -151,7 +159,7 @@ func title(str string) string {
|
|||
return string(r0) + str[size:]
|
||||
}
|
||||
|
||||
func PrepareExpirationHeader(ctx context.Context, p *pool.Pool, headers map[string]string, now time.Time) error {
|
||||
func PrepareExpirationHeader(ctx context.Context, epochFetcher EpochInfoFetcher, headers map[string]string, now time.Time) error {
|
||||
formatsNum := 0
|
||||
index := -1
|
||||
for i, transformer := range transformers {
|
||||
|
@ -165,7 +173,7 @@ func PrepareExpirationHeader(ctx context.Context, p *pool.Pool, headers map[stri
|
|||
case 0:
|
||||
return nil
|
||||
case 1:
|
||||
epochDuration, err := GetEpochDurations(ctx, p)
|
||||
epochDuration, err := epochFetcher.GetEpochDurations(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("couldn't get epoch durations from network info: %w", err)
|
||||
}
|
||||
|
|
|
@ -1,17 +0,0 @@
|
|||
package utils
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type AppParams struct {
|
||||
Logger *zap.Logger
|
||||
Pool *pool.Pool
|
||||
Owner *user.ID
|
||||
Resolver *resolver.ContainerResolver
|
||||
Cache *cache.BucketCache
|
||||
}
|
|
@ -2,36 +2,10 @@ package utils
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"github.com/valyala/fasthttp"
|
||||
)
|
||||
|
||||
type EpochDurations struct {
|
||||
CurrentEpoch uint64
|
||||
MsPerBlock int64
|
||||
BlockPerEpoch uint64
|
||||
}
|
||||
|
||||
func GetEpochDurations(ctx context.Context, p *pool.Pool) (*EpochDurations, error) {
|
||||
networkInfo, err := p.NetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := &EpochDurations{
|
||||
CurrentEpoch: networkInfo.CurrentEpoch(),
|
||||
MsPerBlock: networkInfo.MsPerBlock(),
|
||||
BlockPerEpoch: networkInfo.EpochDuration(),
|
||||
}
|
||||
|
||||
if res.BlockPerEpoch == 0 {
|
||||
return nil, fmt.Errorf("EpochDuration is empty")
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// SetContextToRequest adds new context to fasthttp request.
|
||||
func SetContextToRequest(ctx context.Context, c *fasthttp.RequestCtx) {
|
||||
c.SetUserValue("context", ctx)
|
||||
|
|
Loading…
Reference in a new issue