461 lines
12 KiB
Go
461 lines
12 KiB
Go
package frostfs
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math"
|
|
"strconv"
|
|
"time"
|
|
|
|
objectv2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
|
errorsFrost "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/errors"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
|
)
|
|
|
|
// FrostFS represents virtual connection to the FrostFS network.
|
|
// It is used to provide an interface to dependent packages
|
|
// which work with FrostFS.
|
|
type FrostFS struct {
|
|
pool *pool.Pool
|
|
await pool.WaitParams
|
|
owner user.ID
|
|
}
|
|
|
|
// Polling parameters that NewFrostFS places into pool.WaitParams.
const (
	// defaultPollInterval overrides default value from pool.
	defaultPollInterval = time.Second
	// defaultPollTimeout is the same as default value from pool.
	defaultPollTimeout = 2 * time.Minute
)
|
|
|
|
// NewFrostFS creates new FrostFS using provided pool.Pool.
|
|
func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS {
|
|
await := pool.WaitParams{PollInterval: defaultPollInterval, Timeout: defaultPollTimeout}
|
|
|
|
var owner user.ID
|
|
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
|
|
|
return &FrostFS{
|
|
pool: p,
|
|
await: await,
|
|
owner: owner,
|
|
}
|
|
}
|
|
|
|
// TimeToEpoch implements layer.FrostFS interface method.
|
|
func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) {
|
|
dur := futureTime.Sub(now)
|
|
if dur < 0 {
|
|
return 0, 0, fmt.Errorf("time '%s' must be in the future (after %s)",
|
|
futureTime.Format(time.RFC3339), now.Format(time.RFC3339))
|
|
}
|
|
|
|
networkInfo, err := x.pool.NetworkInfo(ctx)
|
|
if err != nil {
|
|
return 0, 0, handleObjectError("get network info via client", err)
|
|
}
|
|
|
|
durEpoch := networkInfo.EpochDuration()
|
|
if durEpoch == 0 {
|
|
return 0, 0, errors.New("epoch duration is missing or zero")
|
|
}
|
|
|
|
curr := networkInfo.CurrentEpoch()
|
|
msPerEpoch := durEpoch * uint64(networkInfo.MsPerBlock())
|
|
|
|
epochLifetime := uint64(dur.Milliseconds()) / msPerEpoch
|
|
if uint64(dur.Milliseconds())%msPerEpoch != 0 {
|
|
epochLifetime++
|
|
}
|
|
|
|
var epoch uint64
|
|
if epochLifetime >= math.MaxUint64-curr {
|
|
epoch = math.MaxUint64
|
|
} else {
|
|
epoch = curr + epochLifetime
|
|
}
|
|
|
|
return curr, epoch, nil
|
|
}
|
|
|
|
// Container implements layer.FrostFS interface method.
|
|
func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*container.Container, error) {
|
|
prm := pool.PrmContainerGet{
|
|
ContainerID: layerPrm.ContainerID,
|
|
Session: layerPrm.SessionToken,
|
|
}
|
|
|
|
res, err := x.pool.GetContainer(ctx, prm)
|
|
if err != nil {
|
|
return nil, handleObjectError("read container via connection pool", err)
|
|
}
|
|
|
|
return &res, nil
|
|
}
|
|
|
|
// CreateContainer implements layer.FrostFS interface method.
|
|
func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*layer.ContainerCreateResult, error) {
|
|
var cnr container.Container
|
|
cnr.Init()
|
|
cnr.SetPlacementPolicy(prm.Policy)
|
|
cnr.SetOwner(prm.Creator)
|
|
cnr.SetBasicACL(prm.BasicACL)
|
|
|
|
creationTime := prm.CreationTime
|
|
if creationTime.IsZero() {
|
|
creationTime = time.Now()
|
|
}
|
|
container.SetCreationTime(&cnr, creationTime)
|
|
|
|
if prm.Name != "" {
|
|
var d container.Domain
|
|
d.SetName(prm.Name)
|
|
d.SetZone(prm.Zone)
|
|
|
|
container.WriteDomain(&cnr, d)
|
|
container.SetName(&cnr, prm.Name)
|
|
}
|
|
|
|
for i := range prm.AdditionalAttributes {
|
|
cnr.SetAttribute(prm.AdditionalAttributes[i][0], prm.AdditionalAttributes[i][1])
|
|
}
|
|
|
|
err := pool.SyncContainerWithNetwork(ctx, &cnr, x.pool)
|
|
if err != nil {
|
|
return nil, handleObjectError("sync container with the network state", err)
|
|
}
|
|
|
|
prmPut := pool.PrmContainerPut{
|
|
ClientParams: client.PrmContainerPut{
|
|
Container: &cnr,
|
|
Session: prm.SessionToken,
|
|
},
|
|
WaitParams: &x.await,
|
|
}
|
|
|
|
// send request to save the container
|
|
idCnr, err := x.pool.PutContainer(ctx, prmPut)
|
|
return &layer.ContainerCreateResult{
|
|
ContainerID: idCnr,
|
|
HomomorphicHashDisabled: container.IsHomomorphicHashingDisabled(cnr),
|
|
}, handleObjectError("save container via connection pool", err)
|
|
}
|
|
|
|
// UserContainers implements layer.FrostFS interface method.
|
|
func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserContainers) ([]cid.ID, error) {
|
|
prm := pool.PrmContainerList{
|
|
OwnerID: layerPrm.UserID,
|
|
Session: layerPrm.SessionToken,
|
|
}
|
|
|
|
r, err := x.pool.ListContainers(ctx, prm)
|
|
return r, handleObjectError("list user containers via connection pool", err)
|
|
}
|
|
|
|
// DeleteContainer implements layer.FrostFS interface method.
|
|
func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
|
|
prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await}
|
|
|
|
err := x.pool.DeleteContainer(ctx, prm)
|
|
return handleObjectError("delete container via connection pool", err)
|
|
}
|
|
|
|
// CreateObject implements layer.FrostFS interface method.
|
|
func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (*layer.CreateObjectResult, error) {
|
|
attrNum := len(prm.Attributes) + 1 // + creation time
|
|
|
|
if prm.Filepath != "" {
|
|
attrNum++
|
|
}
|
|
|
|
attrs := make([]object.Attribute, 0, attrNum)
|
|
var a *object.Attribute
|
|
|
|
a = object.NewAttribute()
|
|
a.SetKey(object.AttributeTimestamp)
|
|
|
|
creationTime := prm.CreationTime
|
|
if creationTime.IsZero() {
|
|
creationTime = time.Now()
|
|
}
|
|
a.SetValue(strconv.FormatInt(creationTime.Unix(), 10))
|
|
|
|
attrs = append(attrs, *a)
|
|
|
|
for i := range prm.Attributes {
|
|
a = object.NewAttribute()
|
|
a.SetKey(prm.Attributes[i][0])
|
|
a.SetValue(prm.Attributes[i][1])
|
|
attrs = append(attrs, *a)
|
|
}
|
|
|
|
if prm.Filepath != "" {
|
|
a = object.NewAttribute()
|
|
a.SetKey(object.AttributeFilePath)
|
|
a.SetValue(prm.Filepath)
|
|
attrs = append(attrs, *a)
|
|
}
|
|
|
|
obj := object.New()
|
|
obj.SetContainerID(prm.Container)
|
|
obj.SetOwnerID(x.owner)
|
|
obj.SetAttributes(attrs...)
|
|
obj.SetPayloadSize(prm.PayloadSize)
|
|
|
|
if prm.BearerToken == nil && prm.PrivateKey != nil {
|
|
var owner user.ID
|
|
user.IDFromKey(&owner, prm.PrivateKey.PublicKey)
|
|
obj.SetOwnerID(owner)
|
|
}
|
|
|
|
if len(prm.Locks) > 0 {
|
|
lock := new(object.Lock)
|
|
lock.WriteMembers(prm.Locks)
|
|
objectv2.WriteLock(obj.ToV2(), (objectv2.Lock)(*lock))
|
|
}
|
|
|
|
var prmPut pool.PrmObjectPut
|
|
prmPut.SetHeader(*obj)
|
|
prmPut.SetPayload(prm.Payload)
|
|
prmPut.SetCopiesNumberVector(prm.CopiesNumber)
|
|
prmPut.SetClientCut(prm.ClientCut)
|
|
prmPut.WithoutHomomorphicHash(prm.WithoutHomomorphicHash)
|
|
prmPut.SetBufferMaxSize(prm.BufferMaxSize)
|
|
|
|
if prm.BearerToken != nil {
|
|
prmPut.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmPut.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
res, err := x.pool.PutObject(ctx, prmPut)
|
|
if err = handleObjectError("save object via connection pool", err); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &layer.CreateObjectResult{
|
|
ObjectID: res.ObjectID,
|
|
CreationEpoch: res.Epoch,
|
|
}, nil
|
|
}
|
|
|
|
// wraps io.ReadCloser and transforms Read errors related to access violation
|
|
// to frostfs.ErrAccessDenied.
|
|
type payloadReader struct {
|
|
io.ReadCloser
|
|
}
|
|
|
|
func (x payloadReader) Read(p []byte) (int, error) {
|
|
n, err := x.ReadCloser.Read(p)
|
|
if err != nil && errors.Is(err, io.EOF) {
|
|
return n, err
|
|
}
|
|
return n, handleObjectError("read payload", err)
|
|
}
|
|
|
|
// HeadObject implements layer.FrostFS interface method.
|
|
func (x *FrostFS) HeadObject(ctx context.Context, prm layer.PrmObjectHead) (*object.Object, error) {
|
|
var addr oid.Address
|
|
addr.SetContainer(prm.Container)
|
|
addr.SetObject(prm.Object)
|
|
|
|
var prmHead pool.PrmObjectHead
|
|
prmHead.SetAddress(addr)
|
|
|
|
if prm.BearerToken != nil {
|
|
prmHead.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmHead.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
res, err := x.pool.HeadObject(ctx, prmHead)
|
|
if err != nil {
|
|
return nil, handleObjectError("read object header via connection pool", err)
|
|
}
|
|
|
|
return &res, nil
|
|
}
|
|
|
|
// GetObject implements layer.FrostFS interface method.
|
|
func (x *FrostFS) GetObject(ctx context.Context, prm layer.PrmObjectGet) (*layer.Object, error) {
|
|
var addr oid.Address
|
|
addr.SetContainer(prm.Container)
|
|
addr.SetObject(prm.Object)
|
|
|
|
var prmGet pool.PrmObjectGet
|
|
prmGet.SetAddress(addr)
|
|
|
|
if prm.BearerToken != nil {
|
|
prmGet.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmGet.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
res, err := x.pool.GetObject(ctx, prmGet)
|
|
if err != nil {
|
|
return nil, handleObjectError("init full object reading via connection pool", err)
|
|
}
|
|
|
|
return &layer.Object{
|
|
Header: res.Header,
|
|
Payload: res.Payload,
|
|
}, nil
|
|
}
|
|
|
|
// RangeObject implements layer.FrostFS interface method.
|
|
func (x *FrostFS) RangeObject(ctx context.Context, prm layer.PrmObjectRange) (io.ReadCloser, error) {
|
|
var addr oid.Address
|
|
addr.SetContainer(prm.Container)
|
|
addr.SetObject(prm.Object)
|
|
|
|
var prmRange pool.PrmObjectRange
|
|
prmRange.SetAddress(addr)
|
|
prmRange.SetOffset(prm.PayloadRange[0])
|
|
prmRange.SetLength(prm.PayloadRange[1])
|
|
|
|
if prm.BearerToken != nil {
|
|
prmRange.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmRange.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
res, err := x.pool.ObjectRange(ctx, prmRange)
|
|
if err != nil {
|
|
return nil, handleObjectError("init payload range reading via connection pool", err)
|
|
}
|
|
|
|
return payloadReader{&res}, nil
|
|
}
|
|
|
|
// DeleteObject implements layer.FrostFS interface method.
|
|
func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) error {
|
|
var addr oid.Address
|
|
addr.SetContainer(prm.Container)
|
|
addr.SetObject(prm.Object)
|
|
|
|
var prmDelete pool.PrmObjectDelete
|
|
prmDelete.SetAddress(addr)
|
|
|
|
if prm.BearerToken != nil {
|
|
prmDelete.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmDelete.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
err := x.pool.DeleteObject(ctx, prmDelete)
|
|
return handleObjectError("mark object removal via connection pool", err)
|
|
}
|
|
|
|
// SearchObjects implements layer.FrostFS interface method.
|
|
func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch) ([]oid.ID, error) {
|
|
filters := object.NewSearchFilters()
|
|
filters.AddRootFilter()
|
|
|
|
if prm.ExactAttribute[0] != "" {
|
|
filters.AddFilter(prm.ExactAttribute[0], prm.ExactAttribute[1], object.MatchStringEqual)
|
|
}
|
|
|
|
if prm.FilePrefix != "" {
|
|
filters.AddFilter(object.AttributeFileName, prm.FilePrefix, object.MatchCommonPrefix)
|
|
}
|
|
|
|
var prmSearch pool.PrmObjectSearch
|
|
prmSearch.SetContainerID(prm.Container)
|
|
prmSearch.SetFilters(filters)
|
|
|
|
if prm.BearerToken != nil {
|
|
prmSearch.UseBearer(*prm.BearerToken)
|
|
} else {
|
|
prmSearch.UseKey(prm.PrivateKey)
|
|
}
|
|
|
|
res, err := x.pool.SearchObjects(ctx, prmSearch)
|
|
if err != nil {
|
|
return nil, handleObjectError("init object search via connection pool", err)
|
|
}
|
|
defer res.Close()
|
|
|
|
var buf []oid.ID
|
|
err = res.Iterate(func(id oid.ID) bool {
|
|
buf = append(buf, id)
|
|
return false
|
|
})
|
|
return buf, handleObjectError("read object list", err)
|
|
}
|
|
|
|
// NetworkInfo implements layer.FrostFS interface method.
|
|
func (x *FrostFS) NetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
|
ni, err := x.pool.NetworkInfo(ctx)
|
|
if err != nil {
|
|
return ni, handleObjectError("get network info via connection pool", err)
|
|
}
|
|
|
|
return ni, nil
|
|
}
|
|
|
|
// ResolverFrostFS represents virtual connection to the FrostFS network.
|
|
// It implements resolver.FrostFS.
|
|
type ResolverFrostFS struct {
|
|
pool *pool.Pool
|
|
}
|
|
|
|
// NewResolverFrostFS creates new ResolverFrostFS using provided pool.Pool.
|
|
func NewResolverFrostFS(p *pool.Pool) *ResolverFrostFS {
|
|
return &ResolverFrostFS{pool: p}
|
|
}
|
|
|
|
// SystemDNS implements resolver.FrostFS interface method.
|
|
func (x *ResolverFrostFS) SystemDNS(ctx context.Context) (string, error) {
|
|
networkInfo, err := x.pool.NetworkInfo(ctx)
|
|
if err != nil {
|
|
return "", handleObjectError("read network info via client", err)
|
|
}
|
|
|
|
domain := networkInfo.RawNetworkParameter("SystemDNS")
|
|
if domain == nil {
|
|
return "", errors.New("system DNS parameter not found or empty")
|
|
}
|
|
|
|
return string(domain), nil
|
|
}
|
|
|
|
func handleObjectError(msg string, err error) error {
|
|
if err == nil {
|
|
return nil
|
|
}
|
|
|
|
if reason, ok := errorsFrost.IsErrObjectAccessDenied(err); ok {
|
|
return fmt.Errorf("%s: %w: %s", msg, layer.ErrAccessDenied, reason)
|
|
}
|
|
|
|
if errorsFrost.IsTimeoutError(err) {
|
|
return fmt.Errorf("%s: %w: %s", msg, layer.ErrGatewayTimeout, err.Error())
|
|
}
|
|
|
|
return fmt.Errorf("%s: %w", msg, err)
|
|
}
|
|
|
|
// PoolStatistic is a mediator which implements authmate.FrostFS through pool.Pool.
|
|
type PoolStatistic struct {
|
|
pool *pool.Pool
|
|
}
|
|
|
|
// NewPoolStatistic creates new PoolStatistic using provided pool.Pool.
|
|
func NewPoolStatistic(p *pool.Pool) *PoolStatistic {
|
|
return &PoolStatistic{pool: p}
|
|
}
|
|
|
|
// Statistic implements interface method.
|
|
func (x *PoolStatistic) Statistic() pool.Statistic {
|
|
return x.pool.Statistic()
|
|
}
|