frostfs-s3-gw/internal/frostfs/frostfs.go
Roman Loginov 8b3252cbd0
All checks were successful
/ Vulncheck (push) Successful in 52s
/ Builds (push) Successful in 1m29s
/ OCI image (push) Successful in 2m8s
/ Lint (push) Successful in 2m9s
/ Tests (push) Successful in 1m48s
[#589] Add LimitExceeded error
The Access Denied status may be received
from APE due to exceeding the quota. In
this situation, you need to return the
appropriate error. The Conflict status is
used because this error was made based on
the LimitExceeded error from aws iam error
https://docs.aws.amazon.com/IAM/latest/APIReference/API_CreateUser.html#API_CreateUser_Errors.

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
2025-01-17 06:31:08 +00:00

519 lines
14 KiB
Go

package frostfs
import (
"context"
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
frosterr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape"
objectv2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)
// FrostFS represents virtual connection to the FrostFS network.
// It is used to provide an interface to dependent packages
// which work with FrostFS.
type FrostFS struct {
// pool is the connection pool every request of this type is sent through.
pool *pool.Pool
// await holds the wait parameters passed along with container put and
// delete requests.
await pool.WaitParams
// owner is the user ID derived from the key given to NewFrostFS; it is
// the default owner set on objects created by CreateObject.
owner user.ID
}
// Polling settings that NewFrostFS puts into the pool.WaitParams of every
// FrostFS instance.
const (
defaultPollInterval = time.Second // overrides default value from pool
defaultPollTimeout = 120 * time.Second // same as default value from pool
)
// NewFrostFS creates new FrostFS using provided pool.Pool.
//
// The owner ID of the returned instance is derived from the public part of
// key, and its wait parameters are filled from defaultPollInterval and
// defaultPollTimeout.
func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS {
	fs := &FrostFS{
		pool: p,
		await: pool.WaitParams{
			PollInterval: defaultPollInterval,
			Timeout:      defaultPollTimeout,
		},
	}
	user.IDFromKey(&fs.owner, key.PrivateKey.PublicKey)

	return fs
}
// TimeToEpoch implements layer.FrostFS interface method.
//
// It returns the current epoch reported by the network followed by the epoch
// util.TimeToEpoch computes for futureTime. A futureTime that lies before now
// is rejected before any network request is made.
func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) {
	if futureTime.Before(now) {
		const layout = time.RFC3339
		return 0, 0, fmt.Errorf("time '%s' must be in the future (after %s)",
			futureTime.Format(layout), now.Format(layout))
	}

	ni, err := x.pool.NetworkInfo(ctx)
	if err != nil {
		return 0, 0, handleObjectError("get network info via client", err)
	}

	target, err := util.TimeToEpoch(&ni, now, futureTime)
	if err != nil {
		return 0, 0, err
	}

	current := ni.CurrentEpoch()
	return current, target, nil
}
// Container implements layer.FrostFS interface method.
//
// It reads the container with ID layerPrm.ContainerID through the pool,
// forwarding layerPrm.SessionToken as the request session. On failure the
// result is nil and the error is passed through handleObjectError.
func (x *FrostFS) Container(ctx context.Context, layerPrm frostfs.PrmContainer) (*container.Container, error) {
	var prm pool.PrmContainerGet
	prm.ContainerID = layerPrm.ContainerID
	prm.Session = layerPrm.SessionToken

	cnr, err := x.pool.GetContainer(ctx, prm)
	if err != nil {
		return nil, handleObjectError("read container via connection pool", err)
	}

	return &cnr, nil
}
// CreateContainer implements layer.FrostFS interface method.
//
// The container is assembled from prm: placement policy, owner (prm.Creator),
// creation time (time.Now() when prm.CreationTime is zero), an optional
// name/zone domain when prm.Name is set, and any additional attributes. It is
// then synced with the network state and saved through the pool, with x.await
// passed as the wait parameters.
//
// On any failure the returned result is nil and the error is passed through
// handleObjectError; a result is returned only together with a nil error.
func (x *FrostFS) CreateContainer(ctx context.Context, prm frostfs.PrmContainerCreate) (*frostfs.ContainerCreateResult, error) {
	var cnr container.Container
	cnr.Init()
	cnr.SetPlacementPolicy(prm.Policy)
	cnr.SetOwner(prm.Creator)

	creationTime := prm.CreationTime
	if creationTime.IsZero() {
		creationTime = time.Now()
	}
	container.SetCreationTime(&cnr, creationTime)

	if prm.Name != "" {
		var d container.Domain
		d.SetName(prm.Name)
		d.SetZone(prm.Zone)

		container.WriteDomain(&cnr, d)
		container.SetName(&cnr, prm.Name)
	}

	for i := range prm.AdditionalAttributes {
		cnr.SetAttribute(prm.AdditionalAttributes[i][0], prm.AdditionalAttributes[i][1])
	}

	if err := pool.SyncContainerWithNetwork(ctx, &cnr, x.pool); err != nil {
		return nil, handleObjectError("sync container with the network state", err)
	}

	prmPut := pool.PrmContainerPut{
		ClientParams: client.PrmContainerPut{
			Container: &cnr,
			Session:   prm.SessionToken,
		},
		WaitParams: &x.await,
	}

	// send request to save the container
	idCnr, err := x.pool.PutContainer(ctx, prmPut)
	if err != nil {
		// Do not hand back a result carrying a zero container ID next to an error.
		return nil, handleObjectError("save container via connection pool", err)
	}

	return &frostfs.ContainerCreateResult{
		ContainerID:             idCnr,
		HomomorphicHashDisabled: container.IsHomomorphicHashingDisabled(cnr),
	}, nil
}
// AddContainerPolicyChain implements frostfs.FrostFS interface method.
//
// prm.Chain is serialized with MarshalBinary and added through the pool as an
// APE chain whose target is the container prm.ContainerID. A marshalling
// error is returned as is; a pool error is passed through handleObjectError.
func (x *FrostFS) AddContainerPolicyChain(ctx context.Context, prm frostfs.PrmAddContainerPolicyChain) error {
	raw, err := prm.Chain.MarshalBinary()
	if err != nil {
		return err
	}

	target := ape.ChainTarget{
		TargetType: ape.TargetTypeContainer,
		Name:       prm.ContainerID.EncodeToString(),
	}
	req := pool.PrmAddAPEChain{
		Target: target,
		Chain:  ape.Chain{Raw: raw},
	}

	if err = x.pool.AddAPEChain(ctx, req); err != nil {
		return handleObjectError("add ape chain to container", err)
	}

	return nil
}
// UserContainers implements layer.FrostFS interface method.
//
// It lists the containers owned by layerPrm.UserID through the pool. Whatever
// list the pool returns is handed back unchanged, alongside the error passed
// through handleObjectError.
func (x *FrostFS) UserContainers(ctx context.Context, layerPrm frostfs.PrmUserContainers) ([]cid.ID, error) {
	ids, err := x.pool.ListContainers(ctx, pool.PrmContainerList{
		OwnerID: layerPrm.UserID,
		Session: layerPrm.SessionToken,
	})

	return ids, handleObjectError("list user containers via connection pool", err)
}
// DeleteContainer implements layer.FrostFS interface method.
//
// It asks the pool to delete container id using the given session token and
// x.await as the wait parameters.
func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
	var prm pool.PrmContainerDelete
	prm.ContainerID = id
	prm.Session = token
	prm.WaitParams = &x.await

	if err := x.pool.DeleteContainer(ctx, prm); err != nil {
		return handleObjectError("delete container via connection pool", err)
	}

	return nil
}
// CreateObject implements layer.FrostFS interface method.
//
// The object header carries, in this order: a timestamp attribute
// (prm.CreationTime, or time.Now() when it is zero), every pair from
// prm.Attributes, and a file path attribute when prm.Filepath is set.
// The owner is x.owner unless the request has no bearer token but does have a
// private key, in which case the owner is derived from that key.
// The request is authorized with the bearer token when present, otherwise
// with prm.PrivateKey.
func (x *FrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
	// One slot for the timestamp, one per user attribute, one more for the file path.
	capacity := len(prm.Attributes) + 1
	if prm.Filepath != "" {
		capacity++
	}

	attrs := make([]object.Attribute, 0, capacity)
	appendAttr := func(key, value string) {
		attr := object.NewAttribute()
		attr.SetKey(key)
		attr.SetValue(value)
		attrs = append(attrs, *attr)
	}

	created := prm.CreationTime
	if created.IsZero() {
		created = time.Now()
	}
	appendAttr(object.AttributeTimestamp, strconv.FormatInt(created.Unix(), 10))

	for i := range prm.Attributes {
		appendAttr(prm.Attributes[i][0], prm.Attributes[i][1])
	}

	if prm.Filepath != "" {
		appendAttr(object.AttributeFilePath, prm.Filepath)
	}

	// The gateway owner is the default; a request key without a bearer token replaces it.
	owner := x.owner
	if prm.BearerToken == nil && prm.PrivateKey != nil {
		var keyOwner user.ID
		user.IDFromKey(&keyOwner, prm.PrivateKey.PublicKey)
		owner = keyOwner
	}

	obj := object.New()
	obj.SetContainerID(prm.Container)
	obj.SetOwnerID(owner)
	obj.SetAttributes(attrs...)
	obj.SetPayloadSize(prm.PayloadSize)
	obj.SetType(prm.Type)

	if len(prm.Locks) > 0 {
		var lock object.Lock
		lock.WriteMembers(prm.Locks)
		objectv2.WriteLock(obj.ToV2(), objectv2.Lock(lock))
	}

	var prmPut pool.PrmObjectPut
	prmPut.SetHeader(*obj)
	prmPut.SetPayload(prm.Payload)
	prmPut.SetCopiesNumberVector(prm.CopiesNumber)
	prmPut.SetClientCut(prm.ClientCut)
	prmPut.WithoutHomomorphicHash(prm.WithoutHomomorphicHash)
	prmPut.SetBufferMaxSize(prm.BufferMaxSize)

	if token := prm.BearerToken; token != nil {
		prmPut.UseBearer(*token)
	} else {
		prmPut.UseKey(prm.PrivateKey)
	}

	res, err := x.pool.PutObject(ctx, prmPut)
	if err != nil {
		return nil, handleObjectError("save object via connection pool", err)
	}

	result := &frostfs.CreateObjectResult{
		ObjectID:      res.ObjectID,
		CreationEpoch: res.Epoch,
	}
	return result, nil
}
// payloadReader wraps io.ReadCloser and transforms Read errors related to
// access violation to frostfs.ErrAccessDenied.
//
// Only Read is overridden; Close comes from the embedded io.ReadCloser.
type payloadReader struct {
io.ReadCloser
}
// Read reads from the wrapped reader. A nil error and io.EOF are returned
// untouched so callers can still detect the end of the stream; every other
// error is passed through handleObjectError.
func (x payloadReader) Read(p []byte) (int, error) {
	n, err := x.ReadCloser.Read(p)

	switch {
	case err == nil:
		return n, nil
	case errors.Is(err, io.EOF):
		return n, err
	default:
		return n, handleObjectError("read payload", err)
	}
}
// HeadObject implements layer.FrostFS interface method.
//
// It requests the header of object prm.Object in container prm.Container
// through the pool. The request is authorized with the bearer token when
// present, otherwise with prm.PrivateKey.
func (x *FrostFS) HeadObject(ctx context.Context, prm frostfs.PrmObjectHead) (*object.Object, error) {
	var target oid.Address
	target.SetContainer(prm.Container)
	target.SetObject(prm.Object)

	var req pool.PrmObjectHead
	req.SetAddress(target)

	if token := prm.BearerToken; token != nil {
		req.UseBearer(*token)
	} else {
		req.UseKey(prm.PrivateKey)
	}

	hdr, err := x.pool.HeadObject(ctx, req)
	if err != nil {
		return nil, handleObjectError("read object header via connection pool", err)
	}

	return &hdr, nil
}
// GetObject implements layer.FrostFS interface method.
//
// It starts reading object prm.Object from container prm.Container through
// the pool and returns the header and payload of the pool response. The
// request is authorized with the bearer token when present, otherwise with
// prm.PrivateKey.
func (x *FrostFS) GetObject(ctx context.Context, prm frostfs.PrmObjectGet) (*frostfs.Object, error) {
	var target oid.Address
	target.SetContainer(prm.Container)
	target.SetObject(prm.Object)

	var req pool.PrmObjectGet
	req.SetAddress(target)

	if token := prm.BearerToken; token != nil {
		req.UseBearer(*token)
	} else {
		req.UseKey(prm.PrivateKey)
	}

	res, err := x.pool.GetObject(ctx, req)
	if err != nil {
		return nil, handleObjectError("init full object reading via connection pool", err)
	}

	obj := &frostfs.Object{
		Header:  res.Header,
		Payload: res.Payload,
	}
	return obj, nil
}
// RangeObject implements layer.FrostFS interface method.
//
// prm.PayloadRange[0] is used as the offset and prm.PayloadRange[1] as the
// length of the requested payload range. The returned reader is a
// payloadReader wrapped around the pool response.
func (x *FrostFS) RangeObject(ctx context.Context, prm frostfs.PrmObjectRange) (io.ReadCloser, error) {
	var target oid.Address
	target.SetContainer(prm.Container)
	target.SetObject(prm.Object)

	offset, length := prm.PayloadRange[0], prm.PayloadRange[1]

	var req pool.PrmObjectRange
	req.SetAddress(target)
	req.SetOffset(offset)
	req.SetLength(length)

	if token := prm.BearerToken; token != nil {
		req.UseBearer(*token)
	} else {
		req.UseKey(prm.PrivateKey)
	}

	res, err := x.pool.ObjectRange(ctx, req)
	if err != nil {
		return nil, handleObjectError("init payload range reading via connection pool", err)
	}

	return payloadReader{ReadCloser: &res}, nil
}
// DeleteObject implements layer.FrostFS interface method.
//
// It sends a delete request for object prm.Object in container prm.Container
// through the pool. The request is authorized with the bearer token when
// present, otherwise with prm.PrivateKey.
func (x *FrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
	var target oid.Address
	target.SetContainer(prm.Container)
	target.SetObject(prm.Object)

	var req pool.PrmObjectDelete
	req.SetAddress(target)

	if token := prm.BearerToken; token != nil {
		req.UseBearer(*token)
	} else {
		req.UseKey(prm.PrivateKey)
	}

	if err := x.pool.DeleteObject(ctx, req); err != nil {
		return handleObjectError("mark object removal via connection pool", err)
	}

	return nil
}
// SearchObjects implements layer.FrostFS interface method.
//
// The search always carries the root filter. An exact-match filter is added
// when prm.ExactAttribute[0] is non-empty, and a common-prefix filter on the
// file name attribute is added when prm.FilePrefix is non-empty. IDs
// collected before an iteration error are returned together with that error.
func (x *FrostFS) SearchObjects(ctx context.Context, prm frostfs.PrmObjectSearch) ([]oid.ID, error) {
	filters := object.NewSearchFilters()
	filters.AddRootFilter()

	if key := prm.ExactAttribute[0]; key != "" {
		filters.AddFilter(key, prm.ExactAttribute[1], object.MatchStringEqual)
	}

	if prefix := prm.FilePrefix; prefix != "" {
		filters.AddFilter(object.AttributeFileName, prefix, object.MatchCommonPrefix)
	}

	var req pool.PrmObjectSearch
	req.SetContainerID(prm.Container)
	req.SetFilters(filters)

	if token := prm.BearerToken; token != nil {
		req.UseBearer(*token)
	} else {
		req.UseKey(prm.PrivateKey)
	}

	res, err := x.pool.SearchObjects(ctx, req)
	if err != nil {
		return nil, handleObjectError("init object search via connection pool", err)
	}
	defer res.Close()

	var ids []oid.ID
	// NOTE(review): returning false presumably tells Iterate to keep going — confirm against the pool docs.
	collect := func(id oid.ID) bool {
		ids = append(ids, id)
		return false
	}
	err = res.Iterate(collect)

	return ids, handleObjectError("read object list", err)
}
// NetworkInfo implements layer.FrostFS interface method.
//
// It returns the network info obtained from the pool; on failure the value
// the pool returned is handed back together with the error passed through
// handleObjectError.
func (x *FrostFS) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
	info, err := x.pool.NetworkInfo(ctx)
	if err == nil {
		return info, nil
	}

	return info, handleObjectError("get network info via connection pool", err)
}
// NetmapSnapshot returns the netmap snapshot obtained from the pool; on
// failure the value the pool returned is handed back together with the error
// passed through handleObjectError.
func (x *FrostFS) NetmapSnapshot(ctx context.Context) (netmap.NetMap, error) {
	snapshot, err := x.pool.NetMapSnapshot(ctx)
	if err == nil {
		return snapshot, nil
	}

	return snapshot, handleObjectError("get netmap via connection pool", err)
}
// PatchObject patches object prm.Object in container prm.Container through
// the pool and returns the object ID from the pool response.
//
// The patched range starts at prm.Offset. Its length is prm.Length, or
// prm.ObjectSize - prm.Offset when prm.Length+prm.Offset exceeds
// prm.ObjectSize. The request is authorized with the bearer token when
// present, otherwise with prm.PrivateKey.
func (x *FrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatch) (oid.ID, error) {
	var target oid.Address
	target.SetContainer(prm.Container)
	target.SetObject(prm.Object)

	// NOTE(review): if these are unsigned and Offset can exceed ObjectSize, the
	// subtraction below wraps around — confirm callers validate the offset.
	length := prm.Length
	if prm.Length+prm.Offset > prm.ObjectSize {
		length = prm.ObjectSize - prm.Offset
	}

	var rng object.Range
	rng.SetOffset(prm.Offset)
	rng.SetLength(length)

	var req pool.PrmObjectPatch
	req.SetAddress(target)
	req.SetRange(&rng)
	req.SetPayloadReader(prm.Payload)

	if token := prm.BearerToken; token != nil {
		req.UseBearer(*token)
	} else {
		req.UseKey(prm.PrivateKey)
	}

	res, err := x.pool.PatchObject(ctx, req)
	if err != nil {
		return oid.ID{}, handleObjectError("patch object via connection pool", err)
	}

	return res.ObjectID, nil
}
// Relations exposes the underlying pool as a relations.Relations value.
func (x *FrostFS) Relations() relations.Relations {
	var rel relations.Relations = x.pool
	return rel
}
// ResolverFrostFS represents virtual connection to the FrostFS network.
// It implements resolver.FrostFS.
type ResolverFrostFS struct {
// pool is the connection pool used to read the network info.
pool *pool.Pool
}
// NewResolverFrostFS creates new ResolverFrostFS using provided pool.Pool.
func NewResolverFrostFS(p *pool.Pool) *ResolverFrostFS {
	r := new(ResolverFrostFS)
	r.pool = p

	return r
}
// SystemDNS implements resolver.FrostFS interface method.
//
// It reads the network info through the pool and returns the raw "SystemDNS"
// network parameter as a string. An error is returned when the network info
// cannot be read, or when the parameter is missing or has an empty value.
func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
	networkInfo, err := x.pool.NetworkInfo(ctx)
	if err != nil {
		return "", handleObjectError("read network info via client", err)
	}

	domain := networkInfo.RawNetworkParameter("SystemDNS")
	// A length check covers both a missing (nil) and a present-but-empty value,
	// which is what the error message promises.
	if len(domain) == 0 {
		return "", errors.New("system DNS parameter not found or empty")
	}

	return string(domain), nil
}
// handleObjectError prefixes err with msg and maps it onto the gateway's
// sentinel errors. A nil err yields nil. The checks run in this order:
//   - access denied whose reason contains "limit reached" -> frostfs.ErrQuotaLimitReached
//   - any other access denied                             -> frostfs.ErrAccessDenied
//   - timeout                                             -> frostfs.ErrGatewayTimeout
//   - message contains "global domain is already taken"   -> frostfs.ErrGlobalDomainIsAlreadyTaken
//
// In those cases the sentinel is what gets wrapped, and the original reason
// or message is appended as plain text. Any other error is wrapped as is.
func handleObjectError(msg string, err error) error {
	if err == nil {
		return nil
	}

	reason, denied := frosterr.IsErrObjectAccessDenied(err)

	switch {
	case denied && strings.Contains(reason, "limit reached"):
		return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrQuotaLimitReached, reason)
	case denied:
		return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrAccessDenied, reason)
	case frosterr.IsTimeoutError(err):
		return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrGatewayTimeout, err.Error())
	case strings.Contains(err.Error(), "global domain is already taken"):
		return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrGlobalDomainIsAlreadyTaken, err.Error())
	default:
		return fmt.Errorf("%s: %w", msg, err)
	}
}
// PoolStatistic is a mediator which implements authmate.FrostFS through pool.Pool.
// NOTE(review): only a Statistic method is defined on this type in this
// file — confirm which interface it is meant to satisfy.
type PoolStatistic struct {
// pool is the connection pool whose statistics are exposed.
pool *pool.Pool
}
// NewPoolStatistic creates new PoolStatistic using provided pool.Pool.
func NewPoolStatistic(p *pool.Pool) *PoolStatistic {
	ps := new(PoolStatistic)
	ps.pool = p

	return ps
}
// Statistic implements interface method.
// It returns whatever the underlying pool reports from its Statistic method.
func (x *PoolStatistic) Statistic() pool.Statistic {
	stat := x.pool.Statistic()
	return stat
}