[#117] Add mocked handler for tests
All checks were successful
/ DCO (pull_request) Successful in 1m22s
/ Builds (1.21) (pull_request) Successful in 1m58s
/ Builds (1.22) (pull_request) Successful in 1m53s
/ Vulncheck (pull_request) Successful in 3m32s
/ Lint (pull_request) Successful in 5m1s
/ Tests (1.21) (pull_request) Successful in 2m38s
/ Tests (1.22) (pull_request) Successful in 3m4s
All checks were successful
/ DCO (pull_request) Successful in 1m22s
/ Builds (1.21) (pull_request) Successful in 1m58s
/ Builds (1.22) (pull_request) Successful in 1m53s
/ Vulncheck (pull_request) Successful in 3m32s
/ Lint (pull_request) Successful in 5m1s
/ Tests (1.21) (pull_request) Successful in 2m38s
/ Tests (1.22) (pull_request) Successful in 3m4s
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
826dd0cdbe
commit
3741e3b003
12 changed files with 1005 additions and 113 deletions
260
internal/frostfs/frostfs.go
Normal file
260
internal/frostfs/frostfs.go
Normal file
|
@ -0,0 +1,260 @@
|
|||
package frostfs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// FrostFS represents virtual connection to the FrostFS network.
|
||||
// It is used to provide an interface to dependent packages
|
||||
// which work with FrostFS.
|
||||
type FrostFS struct {
|
||||
pool *pool.Pool
|
||||
}
|
||||
|
||||
// NewFrostFS creates new FrostFS using provided pool.Pool.
|
||||
func NewFrostFS(p *pool.Pool) *FrostFS {
|
||||
return &FrostFS{
|
||||
pool: p,
|
||||
}
|
||||
}
|
||||
|
||||
// Container implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) Container(ctx context.Context, layerPrm handler.PrmContainer) (*container.Container, error) {
|
||||
prm := pool.PrmContainerGet{
|
||||
ContainerID: layerPrm.ContainerID,
|
||||
}
|
||||
|
||||
res, err := x.pool.GetContainer(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("read container via connection pool", err)
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
// CreateObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) CreateObject(ctx context.Context, prm handler.PrmObjectCreate) (oid.ID, error) {
|
||||
var prmPut pool.PrmObjectPut
|
||||
prmPut.SetHeader(*prm.Object)
|
||||
prmPut.SetPayload(prm.Payload)
|
||||
prmPut.SetClientCut(prm.ClientCut)
|
||||
prmPut.WithoutHomomorphicHash(prm.WithoutHomomorphicHash)
|
||||
prmPut.SetBufferMaxSize(prm.BufferMaxSize)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmPut.UseBearer(*prm.BearerToken)
|
||||
}
|
||||
|
||||
idObj, err := x.pool.PutObject(ctx, prmPut)
|
||||
return idObj, handleObjectError("save object via connection pool", err)
|
||||
}
|
||||
|
||||
// wraps io.ReadCloser and transforms Read errors related to access violation
|
||||
// to frostfs.ErrAccessDenied.
|
||||
type payloadReader struct {
|
||||
io.ReadCloser
|
||||
}
|
||||
|
||||
func (x payloadReader) Read(p []byte) (int, error) {
|
||||
n, err := x.ReadCloser.Read(p)
|
||||
if err != nil && errors.Is(err, io.EOF) {
|
||||
return n, err
|
||||
}
|
||||
return n, handleObjectError("read payload", err)
|
||||
}
|
||||
|
||||
// ReadObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) ReadObject(ctx context.Context, prm handler.PrmObjectRead) (*handler.ObjectPart, error) {
|
||||
var prmGet pool.PrmObjectGet
|
||||
prmGet.SetAddress(prm.Address)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmGet.UseBearer(*prm.BearerToken)
|
||||
}
|
||||
|
||||
if prm.WithHeader {
|
||||
if prm.WithPayload {
|
||||
res, err := x.pool.GetObject(ctx, prmGet)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init full object reading via connection pool", err)
|
||||
}
|
||||
|
||||
defer res.Payload.Close()
|
||||
|
||||
payload, err := io.ReadAll(res.Payload)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("read full object payload", err)
|
||||
}
|
||||
|
||||
res.Header.SetPayload(payload)
|
||||
|
||||
return &handler.ObjectPart{
|
||||
Head: &res.Header,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var prmHead pool.PrmObjectHead
|
||||
prmHead.SetAddress(prm.Address)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmHead.UseBearer(*prm.BearerToken)
|
||||
}
|
||||
|
||||
hdr, err := x.pool.HeadObject(ctx, prmHead)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("read object header via connection pool", err)
|
||||
}
|
||||
|
||||
return &handler.ObjectPart{
|
||||
Head: &hdr,
|
||||
}, nil
|
||||
} else if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 {
|
||||
res, err := x.pool.GetObject(ctx, prmGet)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init full payload range reading via connection pool", err)
|
||||
}
|
||||
|
||||
return &handler.ObjectPart{
|
||||
Payload: res.Payload,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var prmRange pool.PrmObjectRange
|
||||
prmRange.SetAddress(prm.Address)
|
||||
prmRange.SetOffset(prm.PayloadRange[0])
|
||||
prmRange.SetLength(prm.PayloadRange[1])
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmRange.UseBearer(*prm.BearerToken)
|
||||
}
|
||||
|
||||
res, err := x.pool.ObjectRange(ctx, prmRange)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init payload range reading via connection pool", err)
|
||||
}
|
||||
|
||||
return &handler.ObjectPart{
|
||||
Payload: payloadReader{&res},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SearchObjects implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) SearchObjects(ctx context.Context, prm handler.PrmObjectSearch) (handler.ResObjectSearch, error) {
|
||||
var prmSearch pool.PrmObjectSearch
|
||||
prmSearch.SetContainerID(prm.Container)
|
||||
prmSearch.SetFilters(prm.Filters)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmSearch.UseBearer(*prm.BearerToken)
|
||||
}
|
||||
|
||||
res, err := x.pool.SearchObjects(ctx, prmSearch)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init object search via connection pool", err)
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
// GetEpochDurations implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) GetEpochDurations(ctx context.Context) (*utils.EpochDurations, error) {
|
||||
networkInfo, err := x.pool.NetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := &utils.EpochDurations{
|
||||
CurrentEpoch: networkInfo.CurrentEpoch(),
|
||||
MsPerBlock: networkInfo.MsPerBlock(),
|
||||
BlockPerEpoch: networkInfo.EpochDuration(),
|
||||
}
|
||||
|
||||
if res.BlockPerEpoch == 0 {
|
||||
return nil, fmt.Errorf("EpochDuration is empty")
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
// ResolverFrostFS represents virtual connection to the FrostFS network.
|
||||
// It implements resolver.FrostFS.
|
||||
type ResolverFrostFS struct {
|
||||
pool *pool.Pool
|
||||
}
|
||||
|
||||
// NewResolverFrostFS creates new ResolverFrostFS using provided pool.Pool.
|
||||
func NewResolverFrostFS(p *pool.Pool) *ResolverFrostFS {
|
||||
return &ResolverFrostFS{pool: p}
|
||||
}
|
||||
|
||||
// SystemDNS implements resolver.FrostFS interface method.
|
||||
func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
|
||||
networkInfo, err := x.pool.NetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return "", handleObjectError("read network info via client", err)
|
||||
}
|
||||
|
||||
domain := networkInfo.RawNetworkParameter("SystemDNS")
|
||||
if domain == nil {
|
||||
return "", errors.New("system DNS parameter not found or empty")
|
||||
}
|
||||
|
||||
return string(domain), nil
|
||||
}
|
||||
|
||||
func handleObjectError(msg string, err error) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if reason, ok := IsErrObjectAccessDenied(err); ok {
|
||||
return fmt.Errorf("%s: %w: %s", msg, handler.ErrAccessDenied, reason)
|
||||
}
|
||||
|
||||
if IsTimeoutError(err) {
|
||||
return fmt.Errorf("%s: %w: %s", msg, handler.ErrGatewayTimeout, err.Error())
|
||||
}
|
||||
|
||||
return fmt.Errorf("%s: %w", msg, err)
|
||||
}
|
||||
|
||||
func UnwrapErr(err error) error {
|
||||
unwrappedErr := errors.Unwrap(err)
|
||||
for unwrappedErr != nil {
|
||||
err = unwrappedErr
|
||||
unwrappedErr = errors.Unwrap(err)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func IsErrObjectAccessDenied(err error) (string, bool) {
|
||||
err = UnwrapErr(err)
|
||||
switch err := err.(type) {
|
||||
default:
|
||||
return "", false
|
||||
case *apistatus.ObjectAccessDenied:
|
||||
return err.Reason(), true
|
||||
}
|
||||
}
|
||||
|
||||
func IsTimeoutError(err error) bool {
|
||||
if strings.Contains(err.Error(), "timeout") ||
|
||||
errors.Is(err, context.DeadlineExceeded) {
|
||||
return true
|
||||
}
|
||||
|
||||
return status.Code(UnwrapErr(err)) == codes.DeadlineExceeded
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue