frostfs-s3-gw/internal/frostfs/frostfs.go
Roman Loginov f2274b2786 [#582] Return BucketAlreadyExists when global domain taken
A situation may occur when the global
domain is already occupied when
creating the container. The error
comes from the nns smart contract.
This error actually means that the
bucket has already been created.

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
2024-12-17 12:39:09 +00:00

507 lines
14 KiB
Go

package frostfs
import (
"context"
"errors"
"fmt"
"io"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
frosterr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ape"
objectv2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)
// FrostFS represents virtual connection to the FrostFS network.
// It is used to provide an interface to dependent packages
// which work with FrostFS.
type FrostFS struct {
pool *pool.Pool
await pool.WaitParams
owner user.ID
}
const (
defaultPollInterval = time.Second // overrides default value from pool
defaultPollTimeout = 120 * time.Second // same as default value from pool
)
// NewFrostFS creates new FrostFS using provided pool.Pool.
func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS {
await := pool.WaitParams{PollInterval: defaultPollInterval, Timeout: defaultPollTimeout}
var owner user.ID
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
return &FrostFS{
pool: p,
await: await,
owner: owner,
}
}
// TimeToEpoch implements layer.FrostFS interface method.
func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) {
if futureTime.Before(now) {
return 0, 0, fmt.Errorf("time '%s' must be in the future (after %s)",
futureTime.Format(time.RFC3339), now.Format(time.RFC3339))
}
networkInfo, err := x.pool.NetworkInfo(ctx)
if err != nil {
return 0, 0, handleObjectError("get network info via client", err)
}
epoch, err := util.TimeToEpoch(&networkInfo, now, futureTime)
if err != nil {
return 0, 0, err
}
return networkInfo.CurrentEpoch(), epoch, nil
}
// Container implements layer.FrostFS interface method.
func (x *FrostFS) Container(ctx context.Context, layerPrm frostfs.PrmContainer) (*container.Container, error) {
prm := pool.PrmContainerGet{
ContainerID: layerPrm.ContainerID,
Session: layerPrm.SessionToken,
}
res, err := x.pool.GetContainer(ctx, prm)
if err != nil {
return nil, handleObjectError("read container via connection pool", err)
}
return &res, nil
}
// CreateContainer implements layer.FrostFS interface method.
func (x *FrostFS) CreateContainer(ctx context.Context, prm frostfs.PrmContainerCreate) (*frostfs.ContainerCreateResult, error) {
var cnr container.Container
cnr.Init()
cnr.SetPlacementPolicy(prm.Policy)
cnr.SetOwner(prm.Creator)
creationTime := prm.CreationTime
if creationTime.IsZero() {
creationTime = time.Now()
}
container.SetCreationTime(&cnr, creationTime)
if prm.Name != "" {
var d container.Domain
d.SetName(prm.Name)
d.SetZone(prm.Zone)
container.WriteDomain(&cnr, d)
container.SetName(&cnr, prm.Name)
}
for i := range prm.AdditionalAttributes {
cnr.SetAttribute(prm.AdditionalAttributes[i][0], prm.AdditionalAttributes[i][1])
}
err := pool.SyncContainerWithNetwork(ctx, &cnr, x.pool)
if err != nil {
return nil, handleObjectError("sync container with the network state", err)
}
prmPut := pool.PrmContainerPut{
ClientParams: client.PrmContainerPut{
Container: &cnr,
Session: prm.SessionToken,
},
WaitParams: &x.await,
}
// send request to save the container
idCnr, err := x.pool.PutContainer(ctx, prmPut)
return &frostfs.ContainerCreateResult{
ContainerID: idCnr,
HomomorphicHashDisabled: container.IsHomomorphicHashingDisabled(cnr),
}, handleObjectError("save container via connection pool", err)
}
// AddContainerPolicyChain implements frostfs.FrostFS interface method.
func (x *FrostFS) AddContainerPolicyChain(ctx context.Context, prm frostfs.PrmAddContainerPolicyChain) error {
data, err := prm.Chain.MarshalBinary()
if err != nil {
return err
}
prmAddAPEChain := pool.PrmAddAPEChain{
Target: ape.ChainTarget{
TargetType: ape.TargetTypeContainer,
Name: prm.ContainerID.EncodeToString(),
},
Chain: ape.Chain{Raw: data},
}
err = x.pool.AddAPEChain(ctx, prmAddAPEChain)
return handleObjectError("add ape chain to container", err)
}
// UserContainers implements layer.FrostFS interface method.
func (x *FrostFS) UserContainers(ctx context.Context, layerPrm frostfs.PrmUserContainers) ([]cid.ID, error) {
prm := pool.PrmContainerList{
OwnerID: layerPrm.UserID,
Session: layerPrm.SessionToken,
}
r, err := x.pool.ListContainers(ctx, prm)
return r, handleObjectError("list user containers via connection pool", err)
}
// DeleteContainer implements layer.FrostFS interface method.
func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await}
err := x.pool.DeleteContainer(ctx, prm)
return handleObjectError("delete container via connection pool", err)
}
// CreateObject implements layer.FrostFS interface method.
func (x *FrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
attrNum := len(prm.Attributes) + 1 // + creation time
if prm.Filepath != "" {
attrNum++
}
attrs := make([]object.Attribute, 0, attrNum)
var a *object.Attribute
a = object.NewAttribute()
a.SetKey(object.AttributeTimestamp)
creationTime := prm.CreationTime
if creationTime.IsZero() {
creationTime = time.Now()
}
a.SetValue(strconv.FormatInt(creationTime.Unix(), 10))
attrs = append(attrs, *a)
for i := range prm.Attributes {
a = object.NewAttribute()
a.SetKey(prm.Attributes[i][0])
a.SetValue(prm.Attributes[i][1])
attrs = append(attrs, *a)
}
if prm.Filepath != "" {
a = object.NewAttribute()
a.SetKey(object.AttributeFilePath)
a.SetValue(prm.Filepath)
attrs = append(attrs, *a)
}
obj := object.New()
obj.SetContainerID(prm.Container)
obj.SetOwnerID(x.owner)
obj.SetAttributes(attrs...)
obj.SetPayloadSize(prm.PayloadSize)
obj.SetType(prm.Type)
if prm.BearerToken == nil && prm.PrivateKey != nil {
var owner user.ID
user.IDFromKey(&owner, prm.PrivateKey.PublicKey)
obj.SetOwnerID(owner)
}
if len(prm.Locks) > 0 {
lock := new(object.Lock)
lock.WriteMembers(prm.Locks)
objectv2.WriteLock(obj.ToV2(), (objectv2.Lock)(*lock))
}
var prmPut pool.PrmObjectPut
prmPut.SetHeader(*obj)
prmPut.SetPayload(prm.Payload)
prmPut.SetCopiesNumberVector(prm.CopiesNumber)
prmPut.SetClientCut(prm.ClientCut)
prmPut.WithoutHomomorphicHash(prm.WithoutHomomorphicHash)
prmPut.SetBufferMaxSize(prm.BufferMaxSize)
if prm.BearerToken != nil {
prmPut.UseBearer(*prm.BearerToken)
} else {
prmPut.UseKey(prm.PrivateKey)
}
res, err := x.pool.PutObject(ctx, prmPut)
if err = handleObjectError("save object via connection pool", err); err != nil {
return nil, err
}
return &frostfs.CreateObjectResult{
ObjectID: res.ObjectID,
CreationEpoch: res.Epoch,
}, nil
}
// wraps io.ReadCloser and transforms Read errors related to access violation
// to frostfs.ErrAccessDenied.
type payloadReader struct {
io.ReadCloser
}
func (x payloadReader) Read(p []byte) (int, error) {
n, err := x.ReadCloser.Read(p)
if err != nil && errors.Is(err, io.EOF) {
return n, err
}
return n, handleObjectError("read payload", err)
}
// HeadObject implements layer.FrostFS interface method.
func (x *FrostFS) HeadObject(ctx context.Context, prm frostfs.PrmObjectHead) (*object.Object, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
var prmHead pool.PrmObjectHead
prmHead.SetAddress(addr)
if prm.BearerToken != nil {
prmHead.UseBearer(*prm.BearerToken)
} else {
prmHead.UseKey(prm.PrivateKey)
}
res, err := x.pool.HeadObject(ctx, prmHead)
if err != nil {
return nil, handleObjectError("read object header via connection pool", err)
}
return &res, nil
}
// GetObject implements layer.FrostFS interface method.
func (x *FrostFS) GetObject(ctx context.Context, prm frostfs.PrmObjectGet) (*frostfs.Object, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
var prmGet pool.PrmObjectGet
prmGet.SetAddress(addr)
if prm.BearerToken != nil {
prmGet.UseBearer(*prm.BearerToken)
} else {
prmGet.UseKey(prm.PrivateKey)
}
res, err := x.pool.GetObject(ctx, prmGet)
if err != nil {
return nil, handleObjectError("init full object reading via connection pool", err)
}
return &frostfs.Object{
Header: res.Header,
Payload: res.Payload,
}, nil
}
// RangeObject implements layer.FrostFS interface method.
func (x *FrostFS) RangeObject(ctx context.Context, prm frostfs.PrmObjectRange) (io.ReadCloser, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
var prmRange pool.PrmObjectRange
prmRange.SetAddress(addr)
prmRange.SetOffset(prm.PayloadRange[0])
prmRange.SetLength(prm.PayloadRange[1])
if prm.BearerToken != nil {
prmRange.UseBearer(*prm.BearerToken)
} else {
prmRange.UseKey(prm.PrivateKey)
}
res, err := x.pool.ObjectRange(ctx, prmRange)
if err != nil {
return nil, handleObjectError("init payload range reading via connection pool", err)
}
return payloadReader{&res}, nil
}
// DeleteObject implements layer.FrostFS interface method.
func (x *FrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
var prmDelete pool.PrmObjectDelete
prmDelete.SetAddress(addr)
if prm.BearerToken != nil {
prmDelete.UseBearer(*prm.BearerToken)
} else {
prmDelete.UseKey(prm.PrivateKey)
}
err := x.pool.DeleteObject(ctx, prmDelete)
return handleObjectError("mark object removal via connection pool", err)
}
// SearchObjects implements layer.FrostFS interface method.
func (x *FrostFS) SearchObjects(ctx context.Context, prm frostfs.PrmObjectSearch) ([]oid.ID, error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
if prm.ExactAttribute[0] != "" {
filters.AddFilter(prm.ExactAttribute[0], prm.ExactAttribute[1], object.MatchStringEqual)
}
if prm.FilePrefix != "" {
filters.AddFilter(object.AttributeFileName, prm.FilePrefix, object.MatchCommonPrefix)
}
var prmSearch pool.PrmObjectSearch
prmSearch.SetContainerID(prm.Container)
prmSearch.SetFilters(filters)
if prm.BearerToken != nil {
prmSearch.UseBearer(*prm.BearerToken)
} else {
prmSearch.UseKey(prm.PrivateKey)
}
res, err := x.pool.SearchObjects(ctx, prmSearch)
if err != nil {
return nil, handleObjectError("init object search via connection pool", err)
}
defer res.Close()
var buf []oid.ID
err = res.Iterate(func(id oid.ID) bool {
buf = append(buf, id)
return false
})
return buf, handleObjectError("read object list", err)
}
// NetworkInfo implements layer.FrostFS interface method.
func (x *FrostFS) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
ni, err := x.pool.NetworkInfo(ctx)
if err != nil {
return ni, handleObjectError("get network info via connection pool", err)
}
return ni, nil
}
func (x *FrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatch) (oid.ID, error) {
var addr oid.Address
addr.SetContainer(prm.Container)
addr.SetObject(prm.Object)
var prmPatch pool.PrmObjectPatch
prmPatch.SetAddress(addr)
var rng object.Range
rng.SetOffset(prm.Offset)
rng.SetLength(prm.Length)
if prm.Length+prm.Offset > prm.ObjectSize {
rng.SetLength(prm.ObjectSize - prm.Offset)
}
prmPatch.SetRange(&rng)
prmPatch.SetPayloadReader(prm.Payload)
if prm.BearerToken != nil {
prmPatch.UseBearer(*prm.BearerToken)
} else {
prmPatch.UseKey(prm.PrivateKey)
}
res, err := x.pool.PatchObject(ctx, prmPatch)
if err != nil {
return oid.ID{}, handleObjectError("patch object via connection pool", err)
}
return res.ObjectID, nil
}
func (x *FrostFS) Relations() relations.Relations {
return x.pool
}
// ResolverFrostFS represents virtual connection to the FrostFS network.
// It implements resolver.FrostFS.
type ResolverFrostFS struct {
pool *pool.Pool
}
// NewResolverFrostFS creates new ResolverFrostFS using provided pool.Pool.
func NewResolverFrostFS(p *pool.Pool) *ResolverFrostFS {
return &ResolverFrostFS{pool: p}
}
// SystemDNS implements resolver.FrostFS interface method.
func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
networkInfo, err := x.pool.NetworkInfo(ctx)
if err != nil {
return "", handleObjectError("read network info via client", err)
}
domain := networkInfo.RawNetworkParameter("SystemDNS")
if domain == nil {
return "", errors.New("system DNS parameter not found or empty")
}
return string(domain), nil
}
func handleObjectError(msg string, err error) error {
if err == nil {
return nil
}
if reason, ok := frosterr.IsErrObjectAccessDenied(err); ok {
return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrAccessDenied, reason)
}
if frosterr.IsTimeoutError(err) {
return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrGatewayTimeout, err.Error())
}
if strings.Contains(err.Error(), "global domain is already taken") {
return fmt.Errorf("%s: %w: %s", msg, frostfs.ErrGlobalDomainIsAlreadyTaken, err.Error())
}
return fmt.Errorf("%s: %w", msg, err)
}
// PoolStatistic is a mediator which implements authmate.FrostFS through pool.Pool.
type PoolStatistic struct {
pool *pool.Pool
}
// NewPoolStatistic creates new PoolStatistic using provided pool.Pool.
func NewPoolStatistic(p *pool.Pool) *PoolStatistic {
return &PoolStatistic{pool: p}
}
// Statistic implements interface method.
func (x *PoolStatistic) Statistic() pool.Statistic {
return x.pool.Statistic()
}