feature/add_mocks #117

Merged
alexvanin merged 2 commits from dkirillov/frostfs-http-gw:feature/add_mocks into master 2024-09-04 19:50:57 +00:00
13 changed files with 1006 additions and 114 deletions

View file

@ -16,6 +16,7 @@ import (
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
@ -600,10 +601,10 @@ func (a *app) reqNamespace(h fasthttp.RequestHandler) fasthttp.RequestHandler {
}
}
func (a *app) AppParams() *utils.AppParams {
return &utils.AppParams{
func (a *app) AppParams() *handler.AppParams {
return &handler.AppParams{
Logger: a.log,
Pool: a.pool,
FrostFS: frostfs.NewFrostFS(a.pool),
Owner: a.owner,
Resolver: a.resolver,
Cache: cache.NewBucketCache(getCacheOptions(a.cfg, a.log)),

View file

@ -506,7 +506,7 @@ func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, o
func putObject(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID, content string, attributes map[string]string) oid.ID {
obj := object.New()
obj.SetContainerID(CID)
obj.SetOwnerID(&ownerID)
obj.SetOwnerID(ownerID)
var attrs []object.Attribute
for key, val := range attributes {

260
internal/frostfs/frostfs.go Normal file
View file

@ -0,0 +1,260 @@
package frostfs
import (
"context"
"errors"
"fmt"
"io"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// FrostFS represents virtual connection to the FrostFS network.
// It is used to provide an interface to dependent packages
// which work with FrostFS.
type FrostFS struct {
pool *pool.Pool
}
// NewFrostFS creates new FrostFS using provided pool.Pool.
func NewFrostFS(p *pool.Pool) *FrostFS {
return &FrostFS{
pool: p,
}
}
// Container implements frostfs.FrostFS interface method.
func (x *FrostFS) Container(ctx context.Context, layerPrm handler.PrmContainer) (*container.Container, error) {
prm := pool.PrmContainerGet{
ContainerID: layerPrm.ContainerID,
}
res, err := x.pool.GetContainer(ctx, prm)
if err != nil {
return nil, handleObjectError("read container via connection pool", err)
}
return &res, nil
}
// CreateObject implements frostfs.FrostFS interface method.
func (x *FrostFS) CreateObject(ctx context.Context, prm handler.PrmObjectCreate) (oid.ID, error) {
var prmPut pool.PrmObjectPut
prmPut.SetHeader(*prm.Object)
prmPut.SetPayload(prm.Payload)
prmPut.SetClientCut(prm.ClientCut)
prmPut.WithoutHomomorphicHash(prm.WithoutHomomorphicHash)
prmPut.SetBufferMaxSize(prm.BufferMaxSize)
if prm.BearerToken != nil {
prmPut.UseBearer(*prm.BearerToken)
}
idObj, err := x.pool.PutObject(ctx, prmPut)
return idObj, handleObjectError("save object via connection pool", err)
}
// wraps io.ReadCloser and transforms Read errors related to access violation
// to frostfs.ErrAccessDenied.
type payloadReader struct {
io.ReadCloser
}
func (x payloadReader) Read(p []byte) (int, error) {
n, err := x.ReadCloser.Read(p)
if err != nil && errors.Is(err, io.EOF) {
return n, err
}
return n, handleObjectError("read payload", err)
}
// ReadObject implements frostfs.FrostFS interface method.
func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*handler.ObjectPart, error) {
var prmGet pool.PrmObjectGet
prmGet.SetAddress(prm.Address)
if prm.BearerToken != nil {
prmGet.UseBearer(*prm.BearerToken)
}
if prm.WithHeader {
if prm.WithPayload {
res, err := x.pool.GetObject(ctx, prmGet)
if err != nil {
return nil, handleObjectError("init full object reading via connection pool", err)
}
defer res.Payload.Close()
payload, err := io.ReadAll(res.Payload)
if err != nil {
return nil, handleObjectError("read full object payload", err)
}
res.Header.SetPayload(payload)
return &handler.ObjectPart{
Head: &res.Header,
}, nil
}
var prmHead pool.PrmObjectHead
prmHead.SetAddress(prm.Address)
if prm.BearerToken != nil {
prmHead.UseBearer(*prm.BearerToken)
}
hdr, err := x.pool.HeadObject(ctx, prmHead)
if err != nil {
return nil, handleObjectError("read object header via connection pool", err)
}
return &handler.ObjectPart{
Head: &hdr,
}, nil
} else if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 {
res, err := x.pool.GetObject(ctx, prmGet)
if err != nil {
return nil, handleObjectError("init full payload range reading via connection pool", err)
}
return &handler.ObjectPart{
Payload: res.Payload,
}, nil
}
var prmRange pool.PrmObjectRange
prmRange.SetAddress(prm.Address)
prmRange.SetOffset(prm.PayloadRange[0])
prmRange.SetLength(prm.PayloadRange[1])
if prm.BearerToken != nil {
prmRange.UseBearer(*prm.BearerToken)
}
res, err := x.pool.ObjectRange(ctx, prmRange)
if err != nil {
return nil, handleObjectError("init payload range reading via connection pool", err)
}
return &handler.ObjectPart{
Payload: payloadReader{&res},
}, nil
}
// SearchObjects implements frostfs.FrostFS interface method.
func (x *FrostFS) SearchObjects(ctx context.Context, prm handler.PrmObjectSearch) (handler.ResObjectSearch, error) {
var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(prm.Container)
prmSearch.SetFilters(prm.Filters)
if prm.BearerToken != nil {
prmSearch.UseBearer(*prm.BearerToken)
}
res, err := x.pool.SearchObjects(ctx, prmSearch)
if err != nil {
return nil, handleObjectError("init object search via connection pool", err)
}
return &res, nil
}
// GetEpochDurations implements frostfs.FrostFS interface method.
func (x *FrostFS) GetEpochDurations(ctx context.Context) (*utils.EpochDurations, error) {
networkInfo, err := x.pool.NetworkInfo(ctx)
if err != nil {
return nil, err
}
res := &utils.EpochDurations{
CurrentEpoch: networkInfo.CurrentEpoch(),
MsPerBlock: networkInfo.MsPerBlock(),
BlockPerEpoch: networkInfo.EpochDuration(),
}
if res.BlockPerEpoch == 0 {
return nil, fmt.Errorf("EpochDuration is empty")
}
return res, nil
}
// ResolverFrostFS represents virtual connection to the FrostFS network.
// It implements resolver.FrostFS.
type ResolverFrostFS struct {
pool *pool.Pool
}
// NewResolverFrostFS creates new ResolverFrostFS using provided pool.Pool.
func NewResolverFrostFS(p *pool.Pool) *ResolverFrostFS {
return &ResolverFrostFS{pool: p}
}
// SystemDNS implements resolver.FrostFS interface method.
func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
networkInfo, err := x.pool.NetworkInfo(ctx)
if err != nil {
return "", handleObjectError("read network info via client", err)
}
domain := networkInfo.RawNetworkParameter("SystemDNS")
if domain == nil {
return "", errors.New("system DNS parameter not found or empty")
}
return string(domain), nil
}
func handleObjectError(msg string, err error) error {
if err == nil {
return nil
}
if reason, ok := IsErrObjectAccessDenied(err); ok {
return fmt.Errorf("%s: %w: %s", msg, handler.ErrAccessDenied, reason)
}
if IsTimeoutError(err) {
return fmt.Errorf("%s: %w: %s", msg, handler.ErrGatewayTimeout, err.Error())
}
return fmt.Errorf("%s: %w", msg, err)
}
func UnwrapErr(err error) error {
unwrappedErr := errors.Unwrap(err)
for unwrappedErr != nil {
err = unwrappedErr
unwrappedErr = errors.Unwrap(err)
}
return err
}
func IsErrObjectAccessDenied(err error) (string, bool) {
err = UnwrapErr(err)
switch err := err.(type) {
default:
return "", false
case *apistatus.ObjectAccessDenied:
return err.Reason(), true
}
}
func IsTimeoutError(err error) bool {
if strings.Contains(err.Error(), "timeout") ||
errors.Is(err, context.DeadlineExceeded) {
return true
}
return status.Code(UnwrapErr(err)) == codes.DeadlineExceeded
}

View file

@ -17,7 +17,6 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
@ -46,19 +45,20 @@ func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
h.byAttribute(c, h.receiveFile)
}
func (h *Handler) search(ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
func (h *Handler) search(ctx context.Context, cnrID *cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
filters.AddFilter(key, val, op)
var prm pool.PrmObjectSearch
prm.SetContainerID(*cid)
prm.SetFilters(filters)
if btoken := bearerToken(ctx); btoken != nil {
prm.UseBearer(*btoken)
prm := PrmObjectSearch{
PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx),
},
Container: *cnrID,
Filters: filters,
}
return h.pool.SearchObjects(ctx, prm)
return h.frostfs.SearchObjects(ctx, prm)
}
func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
@ -153,18 +153,21 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
}
func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
var prm pool.PrmObjectGet
prm.SetAddress(addr)
if btoken != nil {
prm.UseBearer(*btoken)
prm := PrmObjectRead{
PrmAuth: PrmAuth{
BearerToken: btoken,
},
Address: addr,
WithHeader: true,
WithPayload: true,
}
resGet, err := h.pool.GetObject(ctx, prm)
resGet, err := h.frostfs.ReadObject(ctx, prm)
if err != nil {
return fmt.Errorf("get FrostFS object: %v", err)
}
objWriter, err := h.addObjectToZip(zipWriter, &resGet.Header)
objWriter, err := h.addObjectToZip(zipWriter, resGet.Head)
if err != nil {
return fmt.Errorf("zip create header: %v", err)
}

View file

@ -0,0 +1,260 @@
package handler
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha256"
"fmt"
"io"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)
type TestFrostFS struct {
objects map[string]*object.Object
containers map[string]*container.Container
accessList map[string]bool
key *keys.PrivateKey
}
func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS {
return &TestFrostFS{
objects: make(map[string]*object.Object),
containers: make(map[string]*container.Container),
accessList: make(map[string]bool),
key: key,
}
}
func (t *TestFrostFS) ContainerID(name string) (*cid.ID, error) {
for id, cnr := range t.containers {
if container.Name(*cnr) == name {
var cnrID cid.ID
return &cnrID, cnrID.DecodeString(id)
}
}
return nil, fmt.Errorf("not found")
}
func (t *TestFrostFS) SetContainer(cnrID cid.ID, cnr *container.Container) {
t.containers[cnrID.EncodeToString()] = cnr
}
// AllowUserOperation grants access to object operations.
// Empty userID and objID means any user and object respectively.
func (t *TestFrostFS) AllowUserOperation(cnrID cid.ID, userID user.ID, op acl.Op, objID oid.ID) {
t.accessList[fmt.Sprintf("%s/%s/%s/%s", cnrID, userID, op, objID)] = true
}
func (t *TestFrostFS) Container(_ context.Context, prm PrmContainer) (*container.Container, error) {
for k, v := range t.containers {
if k == prm.ContainerID.EncodeToString() {
return v, nil
}
}
return nil, fmt.Errorf("container not found %s", prm.ContainerID)
}
func (t *TestFrostFS) requestOwner(btoken *bearer.Token) user.ID {
if btoken != nil {
return bearer.ResolveIssuer(*btoken)
}
var owner user.ID
user.IDFromKey(&owner, t.key.PrivateKey.PublicKey)
return owner
}
func (t *TestFrostFS) ReadObject(_ context.Context, prm PrmObjectRead) (*ObjectPart, error) {
sAddr := prm.Address.EncodeToString()
if obj, ok := t.objects[sAddr]; ok {
owner := t.requestOwner(prm.BearerToken)
if !t.isAllowed(prm.Address.Container(), owner, acl.OpObjectGet, prm.Address.Object()) {
return nil, ErrAccessDenied
}
payload := obj.Payload()
if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 {
off := prm.PayloadRange[0]
payload = payload[off : off+prm.PayloadRange[1]]
}
return &ObjectPart{
Head: obj,
Payload: io.NopCloser(bytes.NewReader(payload)),
}, nil
}
return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, prm.Address)
}
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil {
return oid.ID{}, err
}
var id oid.ID
id.SetSHA256(sha256.Sum256(b))
prm.Object.SetID(id)
attrs := prm.Object.Attributes()
if prm.ClientCut {
a := object.NewAttribute()
a.SetKey("s3-client-cut")
a.SetValue("true")
attrs = append(attrs, *a)
}
prm.Object.SetAttributes(attrs...)
if prm.Payload != nil {
all, err := io.ReadAll(prm.Payload)
if err != nil {
return oid.ID{}, err
}
prm.Object.SetPayload(all)
prm.Object.SetPayloadSize(uint64(len(all)))
var hash checksum.Checksum
checksum.Calculate(&hash, checksum.SHA256, all)
prm.Object.SetPayloadChecksum(hash)
}
cnrID, _ := prm.Object.ContainerID()
objID, _ := prm.Object.ID()
owner := t.requestOwner(prm.BearerToken)
if !t.isAllowed(cnrID, owner, acl.OpObjectPut, objID) {
return oid.ID{}, ErrAccessDenied
}
addr := newAddress(cnrID, objID)
t.objects[addr.EncodeToString()] = prm.Object
return objID, nil
}
type resObjectSearchMock struct {
res []oid.ID
}
func (r *resObjectSearchMock) Read(buf []oid.ID) (int, error) {
for i := range buf {
if i > len(r.res)-1 {
return len(r.res), io.EOF
}
buf[i] = r.res[i]
}
r.res = r.res[len(buf):]
return len(buf), nil
}
func (r *resObjectSearchMock) Iterate(f func(oid.ID) bool) error {
for _, id := range r.res {
if f(id) {
return nil
}
}
return nil
}
func (r *resObjectSearchMock) Close() {}
func (t *TestFrostFS) SearchObjects(_ context.Context, prm PrmObjectSearch) (ResObjectSearch, error) {
if !t.isAllowed(prm.Container, t.requestOwner(prm.BearerToken), acl.OpObjectSearch, oid.ID{}) {
return nil, ErrAccessDenied
}
cidStr := prm.Container.EncodeToString()
var res []oid.ID
if len(prm.Filters) == 1 { // match root filter
for k, v := range t.objects {
if strings.Contains(k, cidStr) {
id, _ := v.ID()
res = append(res, id)
}
}
return &resObjectSearchMock{res: res}, nil
}
filter := prm.Filters[1]
if len(prm.Filters) != 2 ||
filter.Operation() != object.MatchCommonPrefix && filter.Operation() != object.MatchStringEqual {
return nil, fmt.Errorf("usupported filters")
}
for k, v := range t.objects {
if strings.Contains(k, cidStr) && isMatched(v.Attributes(), filter) {
id, _ := v.ID()
res = append(res, id)
}
}
return &resObjectSearchMock{res: res}, nil
}
func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
for _, attr := range attributes {
if attr.Key() == filter.Header() {
switch filter.Operation() {
case object.MatchStringEqual:
return attr.Value() == filter.Value()
case object.MatchCommonPrefix:
return strings.HasPrefix(attr.Value(), filter.Value())
default:
return false
}
}
}
return false
}
func (t *TestFrostFS) GetEpochDurations(context.Context) (*utils.EpochDurations, error) {
return &utils.EpochDurations{
CurrentEpoch: 10,
MsPerBlock: 1000,
BlockPerEpoch: 100,
}, nil
}
func (t *TestFrostFS) isAllowed(cnrID cid.ID, userID user.ID, op acl.Op, objID oid.ID) bool {
keysToCheck := []string{
fmt.Sprintf("%s/%s/%s/%s", cnrID, userID, op, objID),
fmt.Sprintf("%s/%s/%s/%s", cnrID, userID, op, oid.ID{}),
fmt.Sprintf("%s/%s/%s/%s", cnrID, user.ID{}, op, objID),
fmt.Sprintf("%s/%s/%s/%s", cnrID, user.ID{}, op, oid.ID{}),
}
for _, key := range keysToCheck {
if t.accessList[key] {
return true
}
}
return false
}
func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
var addr oid.Address
addr.SetContainer(cnr)
addr.SetObject(obj)
return addr
}

View file

@ -12,16 +12,15 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
@ -35,20 +34,125 @@ type Config interface {
NamespaceHeader() string
}
// PrmContainer groups parameters of FrostFS.Container operation.
type PrmContainer struct {
// Container identifier.
ContainerID cid.ID
}
// PrmAuth groups authentication parameters for the FrostFS operation.
type PrmAuth struct {
// Bearer token to be used for the operation. Overlaps PrivateKey. Optional.
BearerToken *bearer.Token
}
// PrmObjectRead groups parameters of FrostFS.ReadObject operation.
type PrmObjectRead struct {
// Authentication parameters.
PrmAuth
// Address to read the object header from.
Address oid.Address
// Flag to read object header.
WithHeader bool
// Flag to read object payload. False overlaps payload range.
WithPayload bool
// Offset-length range of the object payload to be read.
PayloadRange [2]uint64
}
// ObjectPart represents partially read FrostFS object.
type ObjectPart struct {
// Object header with optional in-memory payload part.
Head *object.Object
// Object payload part encapsulated in io.Reader primitive.
// Returns ErrAccessDenied on read access violation.
Payload io.ReadCloser
}
// PrmObjectCreate groups parameters of FrostFS.CreateObject operation.
type PrmObjectCreate struct {
// Authentication parameters.
PrmAuth
Object *object.Object
// Object payload encapsulated in io.Reader primitive.
Payload io.Reader
// Enables client side object preparing.
ClientCut bool
// Disables using Tillich-Zémor hash for payload.
WithoutHomomorphicHash bool
// Sets max buffer size to read payload.
BufferMaxSize uint64
}
// PrmObjectSearch groups parameters of FrostFS.sear SearchObjects operation.
type PrmObjectSearch struct {
// Authentication parameters.
PrmAuth
// Container to select the objects from.
Container cid.ID
Filters object.SearchFilters
}
type ResObjectSearch interface {
Read(buf []oid.ID) (int, error)
Iterate(f func(oid.ID) bool) error
Close()
}
var (
// ErrAccessDenied is returned from FrostFS in case of access violation.
ErrAccessDenied = errors.New("access denied")
// ErrGatewayTimeout is returned from FrostFS in case of timeout, deadline exceeded etc.
ErrGatewayTimeout = errors.New("gateway timeout")
)
// FrostFS represents virtual connection to FrostFS network.
type FrostFS interface {
Container(context.Context, PrmContainer) (*container.Container, error)
ReadObject(context.Context, PrmObjectRead) (*ObjectPart, error)
CreateObject(context.Context, PrmObjectCreate) (oid.ID, error)
SearchObjects(context.Context, PrmObjectSearch) (ResObjectSearch, error)
utils.EpochInfoFetcher
}
type ContainerResolver interface {
Resolve(ctx context.Context, name string) (*cid.ID, error)
}
type Handler struct {
log *zap.Logger
pool *pool.Pool
frostfs FrostFS
ownerID *user.ID
config Config
containerResolver *resolver.ContainerResolver
containerResolver ContainerResolver
tree *tree.Tree
cache *cache.BucketCache
}
func New(params *utils.AppParams, config Config, tree *tree.Tree) *Handler {
type AppParams struct {
Logger *zap.Logger
FrostFS FrostFS
Owner *user.ID
Resolver ContainerResolver
Cache *cache.BucketCache
}
func New(params *AppParams, config Config, tree *tree.Tree) *Handler {
return &Handler{
log: params.Logger,
pool: params.Pool,
frostfs: params.FrostFS,
ownerID: params.Owner,
config: config,
containerResolver: params.Resolver,
@ -235,8 +339,8 @@ func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *
}
func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.BucketInfo, error) {
prm := pool.PrmContainerGet{ContainerID: cnrID}
res, err := h.pool.GetContainer(ctx, prm)
prm := PrmContainer{ContainerID: cnrID}
res, err := h.frostfs.Container(ctx, prm)
if err != nil {
return nil, fmt.Errorf("get frostfs container '%s': %w", cnrID.String(), err)
}
@ -246,12 +350,12 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
Name: cnrID.EncodeToString(),
}
if domain := container.ReadDomain(res); domain.Name() != "" {
if domain := container.ReadDomain(*res); domain.Name() != "" {
bktInfo.Name = domain.Name()
bktInfo.Zone = domain.Zone()
}
bktInfo.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(res)
bktInfo.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(*res)
return bktInfo, err
}

View file

@ -0,0 +1,296 @@
package handler
import (
"archive/zip"
"bytes"
"context"
"encoding/json"
"io"
"mime/multipart"
"net/http"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/stretchr/testify/require"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
type treeClientMock struct {
}
func (t *treeClientMock) GetNodes(context.Context, *tree.GetNodesParams) ([]tree.NodeResponse, error) {
return nil, nil
}
type configMock struct {
}
func (c *configMock) DefaultTimestamp() bool {
return false
}
func (c *configMock) ZipCompression() bool {
return false
}
func (c *configMock) ClientCut() bool {
return false
}
func (c *configMock) BufferMaxSizeForPut() uint64 {
return 0
}
func (c *configMock) NamespaceHeader() string {
return ""
}
type handlerContext struct {
key *keys.PrivateKey
owner user.ID
h *Handler
frostfs *TestFrostFS
tree *treeClientMock
cfg *configMock
}
func (hc *handlerContext) Handler() *Handler {
return hc.h
}
func prepareHandlerContext() (*handlerContext, error) {
logger, err := zap.NewDevelopment()
if err != nil {
return nil, err
}
key, err := keys.NewPrivateKey()
if err != nil {
return nil, err
}
var owner user.ID
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
testFrostFS := NewTestFrostFS(key)
testResolver := &resolver.Resolver{Name: "test_resolver"}
testResolver.SetResolveFunc(func(_ context.Context, name string) (*cid.ID, error) {
return testFrostFS.ContainerID(name)
})
params := &AppParams{
Logger: logger,
FrostFS: testFrostFS,
Owner: &owner,
Resolver: testResolver,
Cache: cache.NewBucketCache(&cache.Config{
Size: 1,
Lifetime: 1,
Logger: logger,
}),
}
treeMock := &treeClientMock{}
cfgMock := &configMock{}
handler := New(params, cfgMock, tree.NewTree(treeMock))
return &handlerContext{
key: key,
owner: owner,
h: handler,
frostfs: testFrostFS,
tree: treeMock,
cfg: cfgMock,
}, nil
}
func (hc *handlerContext) prepareContainer(name string, basicACL acl.Basic) (cid.ID, *container.Container, error) {
var pp netmap.PlacementPolicy
err := pp.DecodeString("REP 1")
if err != nil {
return cid.ID{}, nil, err
}
var cnr container.Container
cnr.Init()
cnr.SetOwner(hc.owner)
cnr.SetPlacementPolicy(pp)
cnr.SetBasicACL(basicACL)
var domain container.Domain
domain.SetName(name)
container.WriteDomain(&cnr, domain)
container.SetName(&cnr, name)
container.SetCreationTime(&cnr, time.Now())
cnrID := cidtest.ID()
for op := acl.OpObjectGet; op < acl.OpObjectHash; op++ {
hc.frostfs.AllowUserOperation(cnrID, hc.owner, op, oid.ID{})
if basicACL.IsOpAllowed(op, acl.RoleOthers) {
hc.frostfs.AllowUserOperation(cnrID, user.ID{}, op, oid.ID{})
}
}
return cnrID, &cnr, nil
}
func TestBasic(t *testing.T) {
hc, err := prepareHandlerContext()
require.NoError(t, err)
bktName := "bucket"
cnrID, cnr, err := hc.prepareContainer(bktName, acl.PublicRWExtended)
require.NoError(t, err)
hc.frostfs.SetContainer(cnrID, cnr)
ctx := context.Background()
ctx = middleware.SetNamespace(ctx, "")
content := "hello"
r, err := prepareUploadRequest(ctx, cnrID.EncodeToString(), content)
require.NoError(t, err)
hc.Handler().Upload(r)
require.Equal(t, r.Response.StatusCode(), http.StatusOK)
var putRes putResponse
err = json.Unmarshal(r.Response.Body(), &putRes)
require.NoError(t, err)
obj := hc.frostfs.objects[putRes.ContainerID+"/"+putRes.ObjectID]
attr := object.NewAttribute()
attr.SetKey(object.AttributeFilePath)
attr.SetValue(objFileName)
obj.SetAttributes(append(obj.Attributes(), *attr)...)
t.Run("get", func(t *testing.T) {
r = prepareGetRequest(ctx, cnrID.EncodeToString(), putRes.ObjectID)
hc.Handler().DownloadByAddressOrBucketName(r)
require.Equal(t, content, string(r.Response.Body()))
})
t.Run("head", func(t *testing.T) {
r = prepareGetRequest(ctx, cnrID.EncodeToString(), putRes.ObjectID)
hc.Handler().HeadByAddressOrBucketName(r)
require.Equal(t, putRes.ObjectID, string(r.Response.Header.Peek(hdrObjectID)))
require.Equal(t, putRes.ContainerID, string(r.Response.Header.Peek(hdrContainerID)))
})
t.Run("get by attribute", func(t *testing.T) {
r = prepareGetByAttributeRequest(ctx, bktName, keyAttr, valAttr)
hc.Handler().DownloadByAttribute(r)
require.Equal(t, content, string(r.Response.Body()))
})
t.Run("head by attribute", func(t *testing.T) {
r = prepareGetByAttributeRequest(ctx, bktName, keyAttr, valAttr)
hc.Handler().HeadByAttribute(r)
require.Equal(t, putRes.ObjectID, string(r.Response.Header.Peek(hdrObjectID)))
require.Equal(t, putRes.ContainerID, string(r.Response.Header.Peek(hdrContainerID)))
})
t.Run("zip", func(t *testing.T) {
r = prepareGetZipped(ctx, bktName, "")
hc.Handler().DownloadZipped(r)
readerAt := bytes.NewReader(r.Response.Body())
zipReader, err := zip.NewReader(readerAt, int64(len(r.Response.Body())))
require.NoError(t, err)
require.Len(t, zipReader.File, 1)
require.Equal(t, objFileName, zipReader.File[0].Name)
f, err := zipReader.File[0].Open()
require.NoError(t, err)
defer func() {
inErr := f.Close()
require.NoError(t, inErr)
}()
data, err := io.ReadAll(f)
require.NoError(t, err)
require.Equal(t, content, string(data))
})
}
func prepareUploadRequest(ctx context.Context, bucket, content string) (*fasthttp.RequestCtx, error) {
r := new(fasthttp.RequestCtx)
utils.SetContextToRequest(ctx, r)
r.SetUserValue("cid", bucket)
return r, fillMultipartBody(r, content)
}
func prepareGetRequest(ctx context.Context, bucket, objID string) *fasthttp.RequestCtx {
r := new(fasthttp.RequestCtx)
utils.SetContextToRequest(ctx, r)
r.SetUserValue("cid", bucket)
r.SetUserValue("oid", objID)
return r
}
func prepareGetByAttributeRequest(ctx context.Context, bucket, attrKey, attrVal string) *fasthttp.RequestCtx {
r := new(fasthttp.RequestCtx)
utils.SetContextToRequest(ctx, r)
r.SetUserValue("cid", bucket)
r.SetUserValue("attr_key", attrKey)
r.SetUserValue("attr_val", attrVal)
return r
}
func prepareGetZipped(ctx context.Context, bucket, prefix string) *fasthttp.RequestCtx {
r := new(fasthttp.RequestCtx)
utils.SetContextToRequest(ctx, r)
r.SetUserValue("cid", bucket)
r.SetUserValue("prefix", prefix)
return r
}
const (
keyAttr = "User-Attribute"
valAttr = "user value"
objFileName = "newFile.txt"
)
func fillMultipartBody(r *fasthttp.RequestCtx, content string) error {
attributes := map[string]string{
object.AttributeFileName: objFileName,
keyAttr: valAttr,
}
var buff bytes.Buffer
w := multipart.NewWriter(&buff)
fw, err := w.CreateFormFile("file", attributes[object.AttributeFileName])
if err != nil {
return err
}
if _, err = io.Copy(fw, bytes.NewBufferString(content)); err != nil {
return err
}
if err = w.Close(); err != nil {
return err
}
r.Request.SetBodyStream(&buff, buff.Len())
r.Request.Header.Set("Content-Type", w.FormDataContentType())
r.Request.Header.Set("X-Attribute-"+keyAttr, valAttr)
return nil
}

View file

@ -11,7 +11,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
@ -30,21 +29,23 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
btoken := bearerToken(ctx)
var prm pool.PrmObjectHead
prm.SetAddress(objectAddress)
if btoken != nil {
prm.UseBearer(*btoken)
prm := PrmObjectRead{
PrmAuth: PrmAuth{
BearerToken: btoken,
},
Address: objectAddress,
WithHeader: true,
}
obj, err := h.pool.HeadObject(ctx, prm)
obj, err := h.frostfs.ReadObject(ctx, prm)
if err != nil {
req.handleFrostFSErr(err, start)
return
}
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.Head.PayloadSize(), 10))
var contentType string
for _, attr := range obj.Attributes() {
for _, attr := range obj.Head.Attributes() {
key := attr.Key()
val := attr.Value()
if !isValidToken(key) || !isValidValue(val) {
@ -70,22 +71,24 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid
}
}
idsToResponse(&req.Response, &obj)
idsToResponse(&req.Response, obj.Head)
if len(contentType) == 0 {
contentType, _, err = readContentType(obj.PayloadSize(), func(sz uint64) (io.Reader, error) {
var prmRange pool.PrmObjectRange
prmRange.SetAddress(objectAddress)
prmRange.SetLength(sz)
if btoken != nil {
prmRange.UseBearer(*btoken)
contentType, _, err = readContentType(obj.Head.PayloadSize(), func(sz uint64) (io.Reader, error) {
prmRange := PrmObjectRead{
PrmAuth: PrmAuth{
BearerToken: btoken,
},
Address: objectAddress,
WithPayload: true,
PayloadRange: [2]uint64{0, sz},
}
resObj, err := h.pool.ObjectRange(ctx, prmRange)
resObj, err := h.frostfs.ReadObject(ctx, prmRange)
if err != nil {
return nil, err
}
return &resObj, nil
return resObj.Payload, nil
})
if err != nil && err != io.EOF {
req.handleFrostFSErr(err, start)

View file

@ -14,7 +14,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
@ -56,13 +55,16 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
filename string
)
var prm pool.PrmObjectGet
prm.SetAddress(objectAddress)
if btoken := bearerToken(ctx); btoken != nil {
prm.UseBearer(*btoken)
prm := PrmObjectRead{
PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx),
},
Address: objectAddress,
WithHeader: true,
WithPayload: true,
}
rObj, err := h.pool.GetObject(ctx, prm)
rObj, err := h.frostfs.ReadObject(ctx, prm)
if err != nil {
req.handleFrostFSErr(err, start)
return
@ -74,11 +76,11 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
dis = "attachment"
}
payloadSize := rObj.Header.PayloadSize()
payloadSize := rObj.Head.PayloadSize()
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
var contentType string
for _, attr := range rObj.Header.Attributes() {
for _, attr := range rObj.Head.Attributes() {
key := attr.Key()
val := attr.Value()
if !isValidToken(key) || !isValidValue(val) {
@ -107,7 +109,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
}
}
idsToResponse(&req.Response, &rObj.Header)
idsToResponse(&req.Response, rObj.Head)
if len(contentType) == 0 {
// determine the Content-Type from the payload head

View file

@ -15,7 +15,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
@ -98,7 +97,7 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
}
}
if err = utils.PrepareExpirationHeader(req, h.pool, filtered, now); err != nil {
if err = utils.PrepareExpirationHeader(req, h.frostfs, filtered, now); err != nil {
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err))
response.Error(req, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
return
@ -132,19 +131,18 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
obj.SetOwnerID(*h.ownerID)
obj.SetAttributes(attributes...)
var prm pool.PrmObjectPut
prm.SetHeader(*obj)
prm.SetPayload(file)
prm.SetClientCut(h.config.ClientCut())
prm.SetBufferMaxSize(h.config.BufferMaxSizeForPut())
prm.WithoutHomomorphicHash(bktInfo.HomomorphicHashDisabled)
bt := h.fetchBearerToken(ctx)
if bt != nil {
prm.UseBearer(*bt)
prm := PrmObjectCreate{
PrmAuth: PrmAuth{
BearerToken: h.fetchBearerToken(ctx),
},
Object: obj,
Payload: file,
ClientCut: h.config.ClientCut(),
WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled,
BufferMaxSize: h.config.BufferMaxSizeForPut(),
}
if idObj, err = h.pool.PutObject(ctx, prm); err != nil {
if idObj, err = h.frostfs.CreateObject(ctx, prm); err != nil {
h.handlePutFrostFSErr(req, err)
return
}

View file

@ -11,10 +11,18 @@ import (
"time"
"unicode"
"unicode/utf8"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
)
type EpochDurations struct {
CurrentEpoch uint64
MsPerBlock int64
BlockPerEpoch uint64
}
type EpochInfoFetcher interface {
GetEpochDurations(context.Context) (*EpochDurations, error)
}
const (
UserAttributeHeaderPrefix = "X-Attribute-"
)
@ -151,7 +159,7 @@ func title(str string) string {
return string(r0) + str[size:]
}
func PrepareExpirationHeader(ctx context.Context, p *pool.Pool, headers map[string]string, now time.Time) error {
func PrepareExpirationHeader(ctx context.Context, epochFetcher EpochInfoFetcher, headers map[string]string, now time.Time) error {
formatsNum := 0
index := -1
for i, transformer := range transformers {
@ -165,7 +173,7 @@ func PrepareExpirationHeader(ctx context.Context, p *pool.Pool, headers map[stri
case 0:
return nil
case 1:
epochDuration, err := GetEpochDurations(ctx, p)
epochDuration, err := epochFetcher.GetEpochDurations(ctx)
if err != nil {
return fmt.Errorf("couldn't get epoch durations from network info: %w", err)
}

View file

@ -1,17 +0,0 @@
package utils
import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
)
type AppParams struct {
Logger *zap.Logger
Pool *pool.Pool
Owner *user.ID
Resolver *resolver.ContainerResolver
Cache *cache.BucketCache
}

View file

@ -2,36 +2,10 @@ package utils
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"github.com/valyala/fasthttp"
)
type EpochDurations struct {
CurrentEpoch uint64
MsPerBlock int64
BlockPerEpoch uint64
}
func GetEpochDurations(ctx context.Context, p *pool.Pool) (*EpochDurations, error) {
networkInfo, err := p.NetworkInfo(ctx)
if err != nil {
return nil, err
}
res := &EpochDurations{
CurrentEpoch: networkInfo.CurrentEpoch(),
MsPerBlock: networkInfo.MsPerBlock(),
BlockPerEpoch: networkInfo.EpochDuration(),
}
if res.BlockPerEpoch == 0 {
return nil, fmt.Errorf("EpochDuration is empty")
}
return res, nil
}
// SetContextToRequest adds new context to fasthttp request.
func SetContextToRequest(ctx context.Context, c *fasthttp.RequestCtx) {
c.SetUserValue("context", ctx)