Roman Loginov
8b3252cbd0
APE may return the Access Denied status when a quota is exceeded. In this situation, the appropriate error must be returned. The Conflict status is used because this error is modeled on the LimitExceeded error from AWS IAM: https://docs.aws.amazon.com/IAM/latest/APIReference/API_CreateUser.html#API_CreateUser_Errors. Signed-off-by: Roman Loginov <r.loginov@yadro.com>
519 lines
14 KiB
Go
519 lines
14 KiB
Go
package frostfs
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
|
|
frosterr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/errors"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/util"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape"
|
|
objectv2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
)
|
|
|
|
// FrostFS represents virtual connection to the FrostFS network.
|
|
// It is used to provide an interface to dependent packages
|
|
// which work with FrostFS.
|
|
type FrostFS struct {
|
|
pool *pool.Pool
|
|
await pool.WaitParams
|
|
owner user.ID
|
|
}
|
|
|
|
// Default wait parameters used to build pool.WaitParams.
const (
	// defaultPollInterval overrides the default value from pool.
	defaultPollInterval = time.Second
	// defaultPollTimeout is the same as the default value from pool.
	defaultPollTimeout = 2 * time.Minute
)
|
|
|
|
// NewFrostFS creates new FrostFS using provided pool.Pool.
|
|
func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS {
|
|
await := pool.WaitParams{PollInterval: defaultPollInterval, Timeout: defaultPollTimeout}
|
|
|
|
var owner user.ID
|
|
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
|
|
|
return &FrostFS{
|
|
pool: p,
|
|
await: await,
|
|
owner: owner,
|
|
}
|
|
}
|
|
|
|
// TimeToEpoch implements layer.FrostFS interface method.
|
|
func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) {
|
|
if futureTime.Before(now) {
|
|
return 0, 0, fmt.Errorf("time '%s' must be in the future (after %s)",
|
|
futureTime.Format(time.RFC3339), now.Format(time.RFC3339))
|
|
}
|
|
|
|
networkInfo, err := x.pool.NetworkInfo(ctx)
|
|
if err != nil {
|
|
return 0, 0, handleObjectError("get network info via client", err)
|
|
}
|
|
|
|
epoch, err := util.TimeToEpoch(&networkInfo, now, futureTime)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
return networkInfo.CurrentEpoch(), epoch, nil
|
|
}
|
|
|
|
// Container implements layer.FrostFS interface method.
|
|
func (x *FrostFS) Container(ctx context.Context, layerPrm frostfs.PrmContainer) (*container.Container, error) {
|
|
prm := pool.PrmContainerGet{
|
|
ContainerID: layerPrm.ContainerID,
|
|
Session: layerPrm.SessionToken,
|
|
}
|
|
|
|
res, err := x.pool.GetContainer(ctx, prm)
|
|
if err != nil {
|
|
return nil, handleObjectError("read container via connection pool", err)
|
|
}
|
|
|
|
return &res, nil
|
|
}
|
|
|
|
// CreateContainer implements layer.FrostFS interface method.
|
|
func (x *FrostFS) CreateContainer(ctx context.Context, prm frostfs.PrmContainerCreate) (*frostfs.ContainerCreateResult, error) {
|
|
var cnr container.Container
|
|
cnr.Init()
|
|
cnr.SetPlacementPolicy(prm.Policy)
|
|
cnr.SetOwner(prm.Creator)
|
|
|
|
creationTime := prm.CreationTime
|
|
if creationTime.IsZero() {
|
|
creationTime = time.Now()
|
|
}
|
|
container.SetCreationTime(&cnr, creationTime)
|
|
|
|
if prm.Name != "" {
|
|
var d container.Domain
|
|
d.SetName(prm.Name)
|
|
d.SetZone(prm.Zone)
|
|
|
|
container.WriteDomain(&cnr, d)
|
|
container.SetName(&cnr, prm.Name)
|
|
}
|
|
|
|
for i := range prm.AdditionalAttributes {
|
|
cnr.SetAttribute(prm.AdditionalAttributes[i][0], prm.AdditionalAttributes[i][1])
|
|
}
|
|
|
|
err := pool.SyncContainerWithNetwork(ctx, &cnr, x.pool)
|
|
if err != nil {
|
|
return nil, handleObjectError("sync container with the network state", err)
|
|
}
|
|
|
|
prmPut := pool.PrmContainerPut{
|
|
ClientParams: client.PrmContainerPut{
|
|
Container: &cnr,
|
|
Session: prm.SessionToken,
|
|
},
|
|
WaitParams: &x.await,
|
|
}
|
|
|
|
// send request to save the container
|
|
idCnr, err := x.pool.PutContainer(ctx, prmPut)
|
|
return &frostfs.ContainerCreateResult{
|
|
ContainerID: idCnr,
|
|
HomomorphicHashDisabled: container.IsHomomorphicHashingDisabled(cnr),
|
|
}, handleObjectError("save container via connection pool", err)
|
|
}
|
|
|
|
// AddContainerPolicyChain implements frostfs.FrostFS interface method.
|
|
func (x *FrostFS) AddContainerPolicyChain(ctx context.Context, prm frostfs.PrmAddContainerPolicyChain) error {
|
|
data, err := prm.Chain.MarshalBinary()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
prmAddAPEChain := pool.PrmAddAPEChain{
|
|
Target: ape.ChainTarget{
|
|
TargetType: ape.TargetTypeContainer,
|
|
Name: prm.ContainerID.EncodeToString(),
|
|
},
|
|
Chain: ape.Chain{Raw: data},
|
|
}
|
|
|
|
err = x.pool.AddAPEChain(ctx, prmAddAPEChain)
|
|
return handleObjectError("add ape chain to container", err)
|
|
}
|
|
|
|
// UserContainers implements layer.FrostFS interface method.
|
|
func (x *FrostFS) UserContainers(ctx context.Context, layerPrm frostfs.PrmUserContainers) ([]cid.ID, error) {
|
|
prm := pool.PrmContainerList{
|
|
OwnerID: layerPrm.UserID,
|
|
Session: layerPrm.SessionToken,
|
|
}
|
|
|
|
r, err := x.pool.ListContainers(ctx, prm)
|
|
return r, handleObjectError("list user containers via connection pool", err)
|
|
}
|
|
|
|
// DeleteContainer implements layer.FrostFS interface method.
|
|
func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
|
|
prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await}
|
|
|
|
err := x.pool.DeleteContainer(ctx, prm)
|
|
return handleObjectError("delete container via connection pool", err)
|
|
}
|
|
|
|
// CreateObject implements layer.FrostFS interface method.
|
|
func (x *FrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
|
|
attrNum := len(prm.Attributes) + 1 // + creation time
|
|
|
|
if prm.Filepath != "" {
|
|
attrNum++
|
|
}
|
|
|
|
attrs := make([]object.Attribute, 0, attrNum)
|
|
var a *object.Attribute
|
|
|
|
a = object.NewAttribute()
|
|
a.SetKey(object.AttributeTimestamp)
|
|
|
|
creationTime := prm.CreationTime
|
|
if creationTime.IsZero() {
|
|
creationTime = time.Now()
|
|
}
|
|
a.SetValue(strconv.FormatInt(creationTime.Unix(), 10))
|
|
|
|
attrs = append(attrs, *a)
|
|
|
|
for i := range prm.Attributes {
|
|
a = object.NewAttribute()
|
|
a.SetKey(prm.Attributes[i][0])
|
|
a.SetValue(prm.Attributes[i][1])
|
|
attrs = append(attrs, *a)
|
|
}
|
|
|
|
if prm.Filepath != "" {
|
|
a = object.NewAttribute()
|
|
a.SetKey(object.AttributeFilePath)
|
|
a.SetValue(prm.Filepath)
|
|
attrs = append(attrs, *a)
|
|
}
|
|
|
|
obj := object.New()
|
|
obj.SetContainerID(prm.Container)
|
|
obj.SetOwnerID(x.owner)
|
|
obj.SetAttributes(attrs...)
|
|
obj.SetPayloadSize(prm.PayloadSize)
|
|
obj.SetType(prm.Type)
|
|
|
|
if prm.BearerToken == nil && prm.PrivateKey != nil {
|
|
var owner user.ID
|
|
user.IDFromKey(&owner, prm.PrivateKey.PublicKey)
|
|
obj.SetOwnerID(owner)
|
|
}
|
|
|
|
if len(prm.Locks) > 0 {
|
|
lock := new(object.Lock)
|
|
lock.WriteMembers(prm.Locks)
|
|
objectv2.WriteLock(obj.ToV2(), (objectv2.Lock)(*lock))
|
|
}
|
|
|
|
var prmPut pool.PrmObjectPut
|
|
prmPut.SetHeader(*obj)
|
|
prmPut.SetPayload(prm.Payload)
|
|
prmPut.SetCopiesNumberVector(prm.CopiesNumber)
|
|
prmPut.SetClientCut(prm.ClientCut)
|
|
prmPut.WithoutHomomorphicHash(prm.WithoutHomomorphicHash)
|
|
prmPut.SetBufferMaxSize(prm.BufferMaxSize)
|
|
|
|
if prm.BearerToken != nil {
|
|
prmPut.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmPut.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
res, err := x.pool.PutObject(ctx, prmPut)
|
|
if err = handleObjectError("save object via connection pool", err); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &frostfs.CreateObjectResult{
|
|
ObjectID: res.ObjectID,
|
|
CreationEpoch: res.Epoch,
|
|
}, nil
|
|
}
|
|
|
|
// payloadReader wraps an io.ReadCloser so that errors produced by Read
// are passed through handleObjectError; in particular, access violation
// errors are transformed to frostfs.ErrAccessDenied.
type payloadReader struct {
	io.ReadCloser
}
|
|
|
|
func (x payloadReader) Read(p []byte) (int, error) {
|
|
n, err := x.ReadCloser.Read(p)
|
|
if err != nil && errors.Is(err, io.EOF) {
|
|
return n, err
|
|
}
|
|
return n, handleObjectError("read payload", err)
|
|
}
|
|
|
|
// HeadObject implements layer.FrostFS interface method.
|
|
func (x *FrostFS) HeadObject(ctx context.Context, prm frostfs.PrmObjectHead) (*object.Object, error) {
|
|
var addr oid.Address
|
|
addr.SetContainer(prm.Container)
|
|
addr.SetObject(prm.Object)
|
|
|
|
var prmHead pool.PrmObjectHead
|
|
prmHead.SetAddress(addr)
|
|
|
|
if prm.BearerToken != nil {
|
|
prmHead.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmHead.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
res, err := x.pool.HeadObject(ctx, prmHead)
|
|
if err != nil {
|
|
return nil, handleObjectError("read object header via connection pool", err)
|
|
}
|
|
|
|
return &res, nil
|
|
}
|
|
|
|
// GetObject implements layer.FrostFS interface method.
|
|
func (x *FrostFS) GetObject(ctx context.Context, prm frostfs.PrmObjectGet) (*frostfs.Object, error) {
|
|
var addr oid.Address
|
|
addr.SetContainer(prm.Container)
|
|
addr.SetObject(prm.Object)
|
|
|
|
var prmGet pool.PrmObjectGet
|
|
prmGet.SetAddress(addr)
|
|
|
|
if prm.BearerToken != nil {
|
|
prmGet.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmGet.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
res, err := x.pool.GetObject(ctx, prmGet)
|
|
if err != nil {
|
|
return nil, handleObjectError("init full object reading via connection pool", err)
|
|
}
|
|
|
|
return &frostfs.Object{
|
|
Header: res.Header,
|
|
Payload: res.Payload,
|
|
}, nil
|
|
}
|
|
|
|
// RangeObject implements layer.FrostFS interface method.
|
|
func (x *FrostFS) RangeObject(ctx context.Context, prm frostfs.PrmObjectRange) (io.ReadCloser, error) {
|
|
var addr oid.Address
|
|
addr.SetContainer(prm.Container)
|
|
addr.SetObject(prm.Object)
|
|
|
|
var prmRange pool.PrmObjectRange
|
|
prmRange.SetAddress(addr)
|
|
prmRange.SetOffset(prm.PayloadRange[0])
|
|
prmRange.SetLength(prm.PayloadRange[1])
|
|
|
|
if prm.BearerToken != nil {
|
|
prmRange.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmRange.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
res, err := x.pool.ObjectRange(ctx, prmRange)
|
|
if err != nil {
|
|
return nil, handleObjectError("init payload range reading via connection pool", err)
|
|
}
|
|
|
|
return payloadReader{&res}, nil
|
|
}
|
|
|
|
// DeleteObject implements layer.FrostFS interface method.
|
|
func (x *FrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
|
|
var addr oid.Address
|
|
addr.SetContainer(prm.Container)
|
|
addr.SetObject(prm.Object)
|
|
|
|
var prmDelete pool.PrmObjectDelete
|
|
prmDelete.SetAddress(addr)
|
|
|
|
if prm.BearerToken != nil {
|
|
prmDelete.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmDelete.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
err := x.pool.DeleteObject(ctx, prmDelete)
|
|
return handleObjectError("mark object removal via connection pool", err)
|
|
}
|
|
|
|
// SearchObjects implements layer.FrostFS interface method.
|
|
func (x *FrostFS) SearchObjects(ctx context.Context, prm frostfs.PrmObjectSearch) ([]oid.ID, error) {
|
|
filters := object.NewSearchFilters()
|
|
filters.AddRootFilter()
|
|
|
|
if prm.ExactAttribute[0] != "" {
|
|
filters.AddFilter(prm.ExactAttribute[0], prm.ExactAttribute[1], object.MatchStringEqual)
|
|
}
|
|
|
|
if prm.FilePrefix != "" {
|
|
filters.AddFilter(object.AttributeFileName, prm.FilePrefix, object.MatchCommonPrefix)
|
|
}
|
|
|
|
var prmSearch pool.PrmObjectSearch
|
|
prmSearch.SetContainerID(prm.Container)
|
|
prmSearch.SetFilters(filters)
|
|
|
|
if prm.BearerToken != nil {
|
|
prmSearch.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmSearch.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
res, err := x.pool.SearchObjects(ctx, prmSearch)
|
|
if err != nil {
|
|
return nil, handleObjectError("init object search via connection pool", err)
|
|
}
|
|
defer res.Close()
|
|
|
|
var buf []oid.ID
|
|
err = res.Iterate(func(id oid.ID) bool {
|
|
buf = append(buf, id)
|
|
return false
|
|
})
|
|
return buf, handleObjectError("read object list", err)
|
|
}
|
|
|
|
// NetworkInfo implements layer.FrostFS interface method.
|
|
func (x *FrostFS) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
|
ni, err := x.pool.NetworkInfo(ctx)
|
|
if err != nil {
|
|
return ni, handleObjectError("get network info via connection pool", err)
|
|
}
|
|
|
|
return ni, nil
|
|
}
|
|
|
|
func (x *FrostFS) NetmapSnapshot(ctx context.Context) (netmap.NetMap, error) {
|
|
netmapSnapshot, err := x.pool.NetMapSnapshot(ctx)
|
|
if err != nil {
|
|
return netmapSnapshot, handleObjectError("get netmap via connection pool", err)
|
|
}
|
|
|
|
return netmapSnapshot, nil
|
|
}
|
|
|
|
func (x *FrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatch) (oid.ID, error) {
|
|
var addr oid.Address
|
|
addr.SetContainer(prm.Container)
|
|
addr.SetObject(prm.Object)
|
|
|
|
var prmPatch pool.PrmObjectPatch
|
|
prmPatch.SetAddress(addr)
|
|
|
|
var rng object.Range
|
|
rng.SetOffset(prm.Offset)
|
|
rng.SetLength(prm.Length)
|
|
if prm.Length+prm.Offset > prm.ObjectSize {
|
|
rng.SetLength(prm.ObjectSize - prm.Offset)
|
|
}
|
|
|
|
prmPatch.SetRange(&rng)
|
|
prmPatch.SetPayloadReader(prm.Payload)
|
|
|
|
if prm.BearerToken != nil {
|
|
prmPatch.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmPatch.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
res, err := x.pool.PatchObject(ctx, prmPatch)
|
|
if err != nil {
|
|
return oid.ID{}, handleObjectError("patch object via connection pool", err)
|
|
}
|
|
|
|
return res.ObjectID, nil
|
|
}
|
|
|
|
func (x *FrostFS) Relations() relations.Relations {
|
|
return x.pool
|
|
}
|
|
|
|
// ResolverFrostFS represents virtual connection to the FrostFS network.
|
|
// It implements resolver.FrostFS.
|
|
type ResolverFrostFS struct {
|
|
pool *pool.Pool
|
|
}
|
|
|
|
// NewResolverFrostFS creates new ResolverFrostFS using provided pool.Pool.
|
|
func NewResolverFrostFS(p *pool.Pool) *ResolverFrostFS {
|
|
return &ResolverFrostFS{pool: p}
|
|
}
|
|
|
|
// SystemDNS implements resolver.FrostFS interface method.
|
|
func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
|
|
networkInfo, err := x.pool.NetworkInfo(ctx)
|
|
if err != nil {
|
|
return "", handleObjectError("read network info via client", err)
|
|
}
|
|
|
|
domain := networkInfo.RawNetworkParameter("SystemDNS")
|
|
if domain == nil {
|
|
return "", errors.New("system DNS parameter not found or empty")
|
|
}
|
|
|
|
return string(domain), nil
|
|
}
|
|
|
|
func handleObjectError(msg string, err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
if reason, ok := frosterr.IsErrObjectAccessDenied(err); ok {
|
|
if strings.Contains(reason, "limit reached") {
|
|
return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrQuotaLimitReached, reason)
|
|
}
|
|
return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrAccessDenied, reason)
|
|
}
|
|
|
|
if frosterr.IsTimeoutError(err) {
|
|
return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrGatewayTimeout, err.Error())
|
|
}
|
|
|
|
if strings.Contains(err.Error(), "global domain is already taken") {
|
|
return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrGlobalDomainIsAlreadyTaken, err.Error())
|
|
}
|
|
|
|
return fmt.Errorf("%s: %w", msg, err)
|
|
}
|
|
|
|
// PoolStatistic is a mediator which implements authmate.FrostFS through pool.Pool.
|
|
type PoolStatistic struct {
|
|
pool *pool.Pool
|
|
}
|
|
|
|
// NewPoolStatistic creates new PoolStatistic using provided pool.Pool.
|
|
func NewPoolStatistic(p *pool.Pool) *PoolStatistic {
|
|
return &PoolStatistic{pool: p}
|
|
}
|
|
|
|
// Statistic implements interface method.
|
|
func (x *PoolStatistic) Statistic() pool.Statistic {
|
|
return x.pool.Statistic()
|
|
}
|