[#559] Remove multipart objects using tombstones
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
parent
a12fea8a5b
commit
037e972424
12 changed files with 302 additions and 23 deletions
|
@ -32,6 +32,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
|
@ -184,6 +185,11 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas
|
||||||
|
|
||||||
features := &layer.FeatureSettingsMock{}
|
features := &layer.FeatureSettingsMock{}
|
||||||
|
|
||||||
|
pool, err := ants.NewPool(1)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
layerCfg := &layer.Config{
|
layerCfg := &layer.Config{
|
||||||
Cache: layer.NewCache(cacheCfg),
|
Cache: layer.NewCache(cacheCfg),
|
||||||
AnonKey: layer.AnonymousKey{Key: key},
|
AnonKey: layer.AnonymousKey{Key: key},
|
||||||
|
@ -191,6 +197,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas
|
||||||
TreeService: treeMock,
|
TreeService: treeMock,
|
||||||
Features: features,
|
Features: features,
|
||||||
GateOwner: owner,
|
GateOwner: owner,
|
||||||
|
WorkerPool: pool,
|
||||||
}
|
}
|
||||||
|
|
||||||
var pp netmap.PlacementPolicy
|
var pp netmap.PlacementPolicy
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||||
|
@ -170,6 +171,9 @@ type PrmObjectCreate struct {
|
||||||
|
|
||||||
// Sets max buffer size to read payload.
|
// Sets max buffer size to read payload.
|
||||||
BufferMaxSize uint64
|
BufferMaxSize uint64
|
||||||
|
|
||||||
|
// Object type (optional).
|
||||||
|
Type object.Type
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateObjectResult is a result parameter of FrostFS.CreateObject operation.
|
// CreateObjectResult is a result parameter of FrostFS.CreateObject operation.
|
||||||
|
@ -344,4 +348,7 @@ type FrostFS interface {
|
||||||
|
|
||||||
// NetworkInfo returns parameters of FrostFS network.
|
// NetworkInfo returns parameters of FrostFS network.
|
||||||
NetworkInfo(context.Context) (netmap.NetworkInfo, error)
|
NetworkInfo(context.Context) (netmap.NetworkInfo, error)
|
||||||
|
|
||||||
|
// Relations returns implementation of relations.Relations interface.
|
||||||
|
Relations() relations.Relations
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||||
|
@ -35,6 +36,14 @@ type FeatureSettingsMock struct {
|
||||||
md5Enabled bool
|
md5Enabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (k *FeatureSettingsMock) TombstoneLifetime() uint64 {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
func (k *FeatureSettingsMock) TombstoneMembersSize() int {
|
||||||
|
return 2
|
||||||
|
}
|
||||||
|
|
||||||
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
|
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
@ -262,7 +271,36 @@ func (t *TestFrostFS) RangeObject(ctx context.Context, prm frostfs.PrmObjectRang
|
||||||
return io.NopCloser(bytes.NewReader(payload)), nil
|
return io.NopCloser(bytes.NewReader(payload)), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) CreateObject(_ context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
|
func (t *TestFrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
|
||||||
|
if prm.Type == object.TypeTombstone {
|
||||||
|
payload, err := io.ReadAll(prm.Payload)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var tomb object.Tombstone
|
||||||
|
err = tomb.Unmarshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, objID := range tomb.Members() {
|
||||||
|
prmDelete := frostfs.PrmObjectDelete{
|
||||||
|
PrmAuth: prm.PrmAuth,
|
||||||
|
Container: prm.Container,
|
||||||
|
Object: objID,
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = t.DeleteObject(ctx, prmDelete); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &frostfs.CreateObjectResult{
|
||||||
|
CreationEpoch: t.currentEpoch,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
b := make([]byte, 32)
|
b := make([]byte, 32)
|
||||||
if _, err := io.ReadFull(rand.Reader, b); err != nil {
|
if _, err := io.ReadFull(rand.Reader, b); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -459,6 +497,10 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatc
|
||||||
return newID, nil
|
return newID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) Relations() relations.Relations {
|
||||||
|
return &RelationsMock{}
|
||||||
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error {
|
func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error {
|
||||||
list, ok := t.chains[prm.ContainerID.EncodeToString()]
|
list, ok := t.chains[prm.ContainerID.EncodeToString()]
|
||||||
if !ok {
|
if !ok {
|
||||||
|
@ -499,3 +541,25 @@ func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
|
||||||
}
|
}
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RelationsMock struct{}
|
||||||
|
|
||||||
|
func (r *RelationsMock) GetSplitInfo(context.Context, cid.ID, oid.ID, relations.Tokens) (*object.SplitInfo, error) {
|
||||||
|
return nil, relations.ErrNoSplitInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RelationsMock) ListChildrenByLinker(context.Context, cid.ID, oid.ID, relations.Tokens) ([]oid.ID, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RelationsMock) GetLeftSibling(context.Context, cid.ID, oid.ID, relations.Tokens) (oid.ID, error) {
|
||||||
|
return oid.ID{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RelationsMock) FindSiblingBySplitID(context.Context, cid.ID, *object.SplitID, relations.Tokens) ([]oid.ID, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RelationsMock) FindSiblingByParentID(_ context.Context, _ cid.ID, _ oid.ID, _ relations.Tokens) ([]oid.ID, error) {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package layer
|
package layer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
|
@ -13,8 +14,10 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
apierr "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||||
|
@ -27,10 +30,13 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -44,6 +50,8 @@ type (
|
||||||
BufferMaxSizeForPut() uint64
|
BufferMaxSizeForPut() uint64
|
||||||
MD5Enabled() bool
|
MD5Enabled() bool
|
||||||
FormContainerZone(ns string) string
|
FormContainerZone(ns string) string
|
||||||
|
TombstoneMembersSize() int
|
||||||
|
TombstoneLifetime() uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
Layer struct {
|
Layer struct {
|
||||||
|
@ -58,6 +66,7 @@ type (
|
||||||
gateKey *keys.PrivateKey
|
gateKey *keys.PrivateKey
|
||||||
corsCnrInfo *data.BucketInfo
|
corsCnrInfo *data.BucketInfo
|
||||||
lifecycleCnrInfo *data.BucketInfo
|
lifecycleCnrInfo *data.BucketInfo
|
||||||
|
workerPool *ants.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
Config struct {
|
Config struct {
|
||||||
|
@ -71,6 +80,7 @@ type (
|
||||||
GateKey *keys.PrivateKey
|
GateKey *keys.PrivateKey
|
||||||
CORSCnrInfo *data.BucketInfo
|
CORSCnrInfo *data.BucketInfo
|
||||||
LifecycleCnrInfo *data.BucketInfo
|
LifecycleCnrInfo *data.BucketInfo
|
||||||
|
WorkerPool *ants.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// AnonymousKey contains data for anonymous requests.
|
// AnonymousKey contains data for anonymous requests.
|
||||||
|
@ -249,6 +259,7 @@ func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
|
||||||
gateKey: config.GateKey,
|
gateKey: config.GateKey,
|
||||||
corsCnrInfo: config.CORSCnrInfo,
|
corsCnrInfo: config.CORSCnrInfo,
|
||||||
lifecycleCnrInfo: config.LifecycleCnrInfo,
|
lifecycleCnrInfo: config.LifecycleCnrInfo,
|
||||||
|
workerPool: config.WorkerPool,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -557,7 +568,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, nodeVersion := range nodeVersions {
|
for _, nodeVersion := range nodeVersions {
|
||||||
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
|
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj, networkInfo); obj.Error != nil {
|
||||||
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
|
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
@ -596,7 +607,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
|
||||||
}
|
}
|
||||||
|
|
||||||
if !nodeVersion.IsDeleteMarker {
|
if !nodeVersion.IsDeleteMarker {
|
||||||
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
|
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj, networkInfo); obj.Error != nil {
|
||||||
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
|
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
|
||||||
return obj
|
return obj
|
||||||
}
|
}
|
||||||
|
@ -732,19 +743,19 @@ func (n *Layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, ob
|
||||||
return n.getNodeVersion(ctx, objVersion)
|
return n.getNodeVersion(ctx, objVersion)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
|
func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject, networkInfo netmap.NetworkInfo) (string, error) {
|
||||||
if nodeVersion.IsDeleteMarker {
|
if nodeVersion.IsDeleteMarker {
|
||||||
return obj.VersionID, nil
|
return obj.VersionID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if nodeVersion.IsCombined {
|
if nodeVersion.IsCombined {
|
||||||
return "", n.removeCombinedObject(ctx, bkt, nodeVersion)
|
return "", n.removeCombinedObject(ctx, bkt, nodeVersion, networkInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
|
return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion) error {
|
func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, networkInfo netmap.NetworkInfo) error {
|
||||||
combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID)
|
combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err)
|
return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err)
|
||||||
|
@ -755,20 +766,83 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
|
||||||
return fmt.Errorf("unmarshal combined object parts: %w", err)
|
return fmt.Errorf("unmarshal combined object parts: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tokens := prepareTokensParameter(ctx, bkt.Owner)
|
||||||
|
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
|
||||||
|
if err != nil {
|
||||||
|
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||||
|
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
members := append(oids, nodeVersion.OID)
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
if err = n.objectDelete(ctx, bkt, part.OID); err == nil {
|
oids, err = relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens)
|
||||||
continue
|
if err != nil {
|
||||||
|
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||||
|
zap.String("oid", part.OID.EncodeToString()), zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
|
members = append(members, append(oids, part.OID)...)
|
||||||
return fmt.Errorf("couldn't delete part '%s': %w", part.OID.EncodeToString(), err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", bkt.CID.EncodeToString()),
|
n.putTombstones(ctx, bkt, networkInfo, members)
|
||||||
zap.String("oid", part.OID.EncodeToString()), zap.Int("part number", part.Number), zap.Error(err))
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
tombstoneMembersSize := n.features.TombstoneMembersSize()
|
||||||
|
tombstoneLifetime := n.features.TombstoneLifetime()
|
||||||
|
|
||||||
|
for i := 0; i < len(members); i += tombstoneMembersSize {
|
||||||
|
end := tombstoneMembersSize * (i + 1)
|
||||||
|
if end > len(members) {
|
||||||
|
end = len(members)
|
||||||
|
}
|
||||||
|
n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
|
||||||
}
|
}
|
||||||
|
|
||||||
return n.objectDelete(ctx, bkt, nodeVersion.OID)
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup) {
|
||||||
|
tomb := object.NewTombstone()
|
||||||
|
tomb.SetExpirationEpoch(expEpoch)
|
||||||
|
tomb.SetMembers(members)
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
err := n.workerPool.Submit(func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
|
||||||
|
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
wg.Done()
|
||||||
|
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo) error {
|
||||||
|
payload, err := tomb.Marshal()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal tombstone: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := frostfs.PrmObjectCreate{
|
||||||
|
Container: bktInfo.CID,
|
||||||
|
Attributes: [][2]string{{objectV2.SysAttributeExpEpoch, strconv.FormatUint(tomb.ExpirationEpoch(), 10)}},
|
||||||
|
Payload: bytes.NewReader(payload),
|
||||||
|
CreationTime: TimeNow(ctx),
|
||||||
|
ClientCut: n.features.ClientCut(),
|
||||||
|
WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled,
|
||||||
|
BufferMaxSize: n.features.BufferMaxSizeForPut(),
|
||||||
|
Type: object.TypeTombstone,
|
||||||
|
}
|
||||||
|
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
|
||||||
|
|
||||||
|
_, err = n.frostFS.CreateObject(ctx, prm)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteObjects from the storage.
|
// DeleteObjects from the storage.
|
||||||
|
|
|
@ -21,8 +21,12 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/minio/sio"
|
"github.com/minio/sio"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -559,16 +563,43 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
networkInfo, err := n.GetNetworkInfo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get network info: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
n.deleteUploadedParts(ctx, p.Bkt, parts, networkInfo)
|
||||||
|
|
||||||
|
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) deleteUploadedParts(ctx context.Context, bkt *data.BucketInfo, parts PartsInfo, networkInfo netmap.NetworkInfo) {
|
||||||
|
members := make([]oid.ID, 0)
|
||||||
|
tokens := prepareTokensParameter(ctx, bkt.Owner)
|
||||||
for _, infos := range parts {
|
for _, infos := range parts {
|
||||||
for _, info := range infos {
|
for _, info := range infos {
|
||||||
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
|
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, info.OID, tokens)
|
||||||
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
|
if err != nil {
|
||||||
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
|
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||||
|
zap.String("oid", info.OID.EncodeToString()), zap.Error(err))
|
||||||
}
|
}
|
||||||
|
members = append(members, append(oids, info.OID)...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
|
n.putTombstones(ctx, bkt, networkInfo, members)
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens {
|
||||||
|
tokens := relations.Tokens{}
|
||||||
|
|
||||||
|
if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
|
||||||
|
if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
|
||||||
|
tokens.Bearer = bd.Gate.BearerToken
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tokens
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
||||||
|
|
|
@ -50,6 +50,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/go-chi/chi/v5/middleware"
|
"github.com/go-chi/chi/v5/middleware"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"golang.org/x/exp/slices"
|
"golang.org/x/exp/slices"
|
||||||
|
@ -99,6 +100,7 @@ type (
|
||||||
frostfsidValidation bool
|
frostfsidValidation bool
|
||||||
accessbox *cid.ID
|
accessbox *cid.ID
|
||||||
dialerSource *internalnet.DialerSource
|
dialerSource *internalnet.DialerSource
|
||||||
|
workerPoolSize int
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
namespaces Namespaces
|
namespaces Namespaces
|
||||||
|
@ -119,6 +121,8 @@ type (
|
||||||
vhsNamespacesEnabled map[string]bool
|
vhsNamespacesEnabled map[string]bool
|
||||||
retryMaxBackoff time.Duration
|
retryMaxBackoff time.Duration
|
||||||
retryStrategy handler.RetryStrategy
|
retryStrategy handler.RetryStrategy
|
||||||
|
tombstoneMembersSize int
|
||||||
|
tombstoneLifetime uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
maxClientsConfig struct {
|
maxClientsConfig struct {
|
||||||
|
@ -224,12 +228,21 @@ func (a *App) initLayer(ctx context.Context) {
|
||||||
GateKey: a.key,
|
GateKey: a.key,
|
||||||
CORSCnrInfo: corsCnrInfo,
|
CORSCnrInfo: corsCnrInfo,
|
||||||
LifecycleCnrInfo: lifecycleCnrInfo,
|
LifecycleCnrInfo: lifecycleCnrInfo,
|
||||||
|
WorkerPool: a.initWorkerPool(),
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepare object layer
|
// prepare object layer
|
||||||
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
|
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *App) initWorkerPool() *ants.Pool {
|
||||||
|
workerPool, err := ants.NewPool(a.settings.workerPoolSize)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
|
||||||
|
}
|
||||||
|
return workerPool
|
||||||
|
}
|
||||||
|
|
||||||
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||||
settings := &appSettings{
|
settings := &appSettings{
|
||||||
logLevel: log.lvl,
|
logLevel: log.lvl,
|
||||||
|
@ -239,6 +252,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||||
reconnectInterval: fetchReconnectInterval(v),
|
reconnectInterval: fetchReconnectInterval(v),
|
||||||
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
|
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
|
||||||
dialerSource: getDialerSource(log.logger, v),
|
dialerSource: getDialerSource(log.logger, v),
|
||||||
|
workerPoolSize: fetchTombstoneWorkerPoolSize(v),
|
||||||
}
|
}
|
||||||
|
|
||||||
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
|
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
|
||||||
|
@ -275,6 +289,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
||||||
httpLoggingMaxLogSize := v.GetInt(cfgHTTPLoggingMaxLogSize)
|
httpLoggingMaxLogSize := v.GetInt(cfgHTTPLoggingMaxLogSize)
|
||||||
httpLoggingOutputPath := v.GetString(cfgHTTPLoggingDestination)
|
httpLoggingOutputPath := v.GetString(cfgHTTPLoggingDestination)
|
||||||
httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip)
|
httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip)
|
||||||
|
tombstoneMembersSize := fetchTombstoneMembersSize(v)
|
||||||
|
tombstoneLifetime := fetchTombstoneLifetime(v)
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
@ -304,6 +320,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
||||||
s.vhsHeader = vhsHeader
|
s.vhsHeader = vhsHeader
|
||||||
s.servernameHeader = servernameHeader
|
s.servernameHeader = servernameHeader
|
||||||
s.vhsNamespacesEnabled = vhsNamespacesEnabled
|
s.vhsNamespacesEnabled = vhsNamespacesEnabled
|
||||||
|
s.tombstoneMembersSize = tombstoneMembersSize
|
||||||
|
s.tombstoneLifetime = tombstoneLifetime
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
|
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
|
||||||
|
@ -506,6 +524,18 @@ func (s *appSettings) AccessBoxContainer() (cid.ID, bool) {
|
||||||
return cid.ID{}, false
|
return cid.ID{}, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) TombstoneMembersSize() int {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.tombstoneMembersSize
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) TombstoneLifetime() uint64 {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.tombstoneLifetime
|
||||||
|
}
|
||||||
|
|
||||||
func (a *App) initAPI(ctx context.Context) {
|
func (a *App) initAPI(ctx context.Context) {
|
||||||
a.initLayer(ctx)
|
a.initLayer(ctx)
|
||||||
a.initHandler()
|
a.initHandler()
|
||||||
|
|
|
@ -74,6 +74,10 @@ const (
|
||||||
defaultRetryMaxAttempts = 4
|
defaultRetryMaxAttempts = 4
|
||||||
defaultRetryMaxBackoff = 30 * time.Second
|
defaultRetryMaxBackoff = 30 * time.Second
|
||||||
defaultRetryStrategy = handler.RetryStrategyExponential
|
defaultRetryStrategy = handler.RetryStrategyExponential
|
||||||
|
|
||||||
|
defaultTombstoneLifetime = uint64(10)
|
||||||
|
defaultTombstoneMembersSize = 100
|
||||||
|
defaultTombstoneWorkerPoolSize = 100
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -240,6 +244,9 @@ const ( // Settings.
|
||||||
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
|
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
|
||||||
// Sets max attempt to make successful tree request.
|
// Sets max attempt to make successful tree request.
|
||||||
cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"
|
cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"
|
||||||
|
cfgTombstoneLifetime = "frostfs.tombstone.lifetime"
|
||||||
|
cfgTombstoneMembersSize = "frostfs.tombstone.members_size"
|
||||||
|
cfgTombstoneWorkerPoolSize = "frostfs.tombstone.worker_pool_size"
|
||||||
|
|
||||||
// Specifies the timeout after which unhealthy client be closed during rebalancing
|
// Specifies the timeout after which unhealthy client be closed during rebalancing
|
||||||
// if it will become healthy back.
|
// if it will become healthy back.
|
||||||
|
@ -804,6 +811,33 @@ func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) {
|
||||||
return attributes, nil
|
return attributes, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func fetchTombstoneLifetime(v *viper.Viper) uint64 {
|
||||||
|
tombstoneLifetime := v.GetUint64(cfgTombstoneLifetime)
|
||||||
|
if tombstoneLifetime <= 0 {
|
||||||
|
tombstoneLifetime = defaultTombstoneLifetime
|
||||||
|
}
|
||||||
|
|
||||||
|
return tombstoneLifetime
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchTombstoneMembersSize(v *viper.Viper) int {
|
||||||
|
tombstoneMembersSize := v.GetInt(cfgTombstoneMembersSize)
|
||||||
|
if tombstoneMembersSize <= 0 {
|
||||||
|
tombstoneMembersSize = defaultTombstoneMembersSize
|
||||||
|
}
|
||||||
|
|
||||||
|
return tombstoneMembersSize
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchTombstoneWorkerPoolSize(v *viper.Viper) int {
|
||||||
|
tombstoneWorkerPoolSize := v.GetInt(cfgTombstoneWorkerPoolSize)
|
||||||
|
if tombstoneWorkerPoolSize <= 0 {
|
||||||
|
tombstoneWorkerPoolSize = defaultTombstoneWorkerPoolSize
|
||||||
|
}
|
||||||
|
|
||||||
|
return tombstoneWorkerPoolSize
|
||||||
|
}
|
||||||
|
|
||||||
func newSettings() *viper.Viper {
|
func newSettings() *viper.Viper {
|
||||||
v := viper.New()
|
v := viper.New()
|
||||||
|
|
||||||
|
@ -876,6 +910,9 @@ func newSettings() *viper.Viper {
|
||||||
|
|
||||||
// frostfs
|
// frostfs
|
||||||
v.SetDefault(cfgBufferMaxSizeForPut, 1024*1024) // 1mb
|
v.SetDefault(cfgBufferMaxSizeForPut, 1024*1024) // 1mb
|
||||||
|
v.SetDefault(cfgTombstoneLifetime, defaultTombstoneLifetime)
|
||||||
|
v.SetDefault(cfgTombstoneMembersSize, defaultTombstoneMembersSize)
|
||||||
|
v.SetDefault(cfgTombstoneWorkerPoolSize, defaultTombstoneWorkerPoolSize)
|
||||||
|
|
||||||
// kludge
|
// kludge
|
||||||
v.SetDefault(cfgKludgeUseDefaultXMLNS, false)
|
v.SetDefault(cfgKludgeUseDefaultXMLNS, false)
|
||||||
|
|
|
@ -163,6 +163,12 @@ S3_GW_FROSTFS_BUFFER_MAX_SIZE_FOR_PUT=1048576
|
||||||
S3_GW_FROSTFS_TREE_POOL_MAX_ATTEMPTS=0
|
S3_GW_FROSTFS_TREE_POOL_MAX_ATTEMPTS=0
|
||||||
# Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back.
|
# Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back.
|
||||||
S3_GW_FROSTFS_GRACEFUL_CLOSE_ON_SWITCH_TIMEOUT=10s
|
S3_GW_FROSTFS_GRACEFUL_CLOSE_ON_SWITCH_TIMEOUT=10s
|
||||||
|
# Tombstone's lifetime in epochs.
|
||||||
|
S3_GW_FROSTFS_TOMBSTONE_LIFETIME=10
|
||||||
|
# Maximum number of object IDs in one tombstone.
|
||||||
|
S3_GW_FROSTFS_TOMBSTONE_MEMBERS_COUNT=100
|
||||||
|
# Maximum worker count in layer's worker pool that create tombstones.
|
||||||
|
S3_GW_FROSTFS_TOMBSTONE_WORKER_POOL_SIZE=100
|
||||||
|
|
||||||
# List of allowed AccessKeyID prefixes
|
# List of allowed AccessKeyID prefixes
|
||||||
# If not set, S3 GW will accept all AccessKeyIDs
|
# If not set, S3 GW will accept all AccessKeyIDs
|
||||||
|
|
|
@ -199,6 +199,13 @@ frostfs:
|
||||||
buffer_max_size_for_put: 1048576
|
buffer_max_size_for_put: 1048576
|
||||||
# Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back.
|
# Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back.
|
||||||
graceful_close_on_switch_timeout: 10s
|
graceful_close_on_switch_timeout: 10s
|
||||||
|
tombstone:
|
||||||
|
# Tombstone's lifetime in epochs.
|
||||||
|
lifetime: 10
|
||||||
|
# Maximum number of object IDs in one tombstone.
|
||||||
|
members_count: 100
|
||||||
|
# Maximum worker count in layer's worker pool that create tombstones.
|
||||||
|
worker_pool_size: 100
|
||||||
|
|
||||||
# List of allowed AccessKeyID prefixes
|
# List of allowed AccessKeyID prefixes
|
||||||
# If the parameter is omitted, S3 GW will accept all AccessKeyIDs
|
# If the parameter is omitted, S3 GW will accept all AccessKeyIDs
|
||||||
|
|
|
@ -588,6 +588,10 @@ frostfs:
|
||||||
buffer_max_size_for_put: 1048576 # 1mb
|
buffer_max_size_for_put: 1048576 # 1mb
|
||||||
tree_pool_max_attempts: 0
|
tree_pool_max_attempts: 0
|
||||||
graceful_close_on_switch_timeout: 10s
|
graceful_close_on_switch_timeout: 10s
|
||||||
|
tombstone:
|
||||||
|
lifetime: 10
|
||||||
|
members_count: 100
|
||||||
|
worker_pool_size: 100
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
@ -597,6 +601,9 @@ frostfs:
|
||||||
| `buffer_max_size_for_put` | `uint64` | yes | `1048576` | Sets max buffer size for read payload in put operations. |
|
| `buffer_max_size_for_put` | `uint64` | yes | `1048576` | Sets max buffer size for read payload in put operations. |
|
||||||
| `tree_pool_max_attempts` | `uint32` | no | `0` | Sets max attempt to make successful tree request. Value 0 means the number of attempts equals to number of nodes in pool. |
|
| `tree_pool_max_attempts` | `uint32` | no | `0` | Sets max attempt to make successful tree request. Value 0 means the number of attempts equals to number of nodes in pool. |
|
||||||
| `graceful_close_on_switch_timeout` | `duration` | no | `10s` | Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. |
|
| `graceful_close_on_switch_timeout` | `duration` | no | `10s` | Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. |
|
||||||
|
| `tombstone.lifetime` | `uint64` | yes | 10 | Tombstone's lifetime in epochs. |
|
||||||
|
| `tombstone.members_count` | `int` | yes | 100 | Maximum number of object IDs in one tombstone. |
|
||||||
|
| `tombstone.worker_pool_size` | `int` | no | 100 | Maximum worker count in layer's worker pool that create tombstones. |
|
||||||
|
|
||||||
# `resolve_bucket` section
|
# `resolve_bucket` section
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
@ -214,6 +215,7 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate)
|
||||||
obj.SetOwnerID(x.owner)
|
obj.SetOwnerID(x.owner)
|
||||||
obj.SetAttributes(attrs...)
|
obj.SetAttributes(attrs...)
|
||||||
obj.SetPayloadSize(prm.PayloadSize)
|
obj.SetPayloadSize(prm.PayloadSize)
|
||||||
|
obj.SetType(prm.Type)
|
||||||
|
|
||||||
if prm.BearerToken == nil && prm.PrivateKey != nil {
|
if prm.BearerToken == nil && prm.PrivateKey != nil {
|
||||||
var owner user.ID
|
var owner user.ID
|
||||||
|
@ -438,6 +440,10 @@ func (x *FrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatch) (
|
||||||
return res.ObjectID, nil
|
return res.ObjectID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *FrostFS) Relations() relations.Relations {
|
||||||
|
return x.pool
|
||||||
|
}
|
||||||
|
|
||||||
// ResolverFrostFS represents virtual connection to the FrostFS network.
|
// ResolverFrostFS represents virtual connection to the FrostFS network.
|
||||||
// It implements resolver.FrostFS.
|
// It implements resolver.FrostFS.
|
||||||
type ResolverFrostFS struct {
|
type ResolverFrostFS struct {
|
||||||
|
|
|
@ -177,4 +177,7 @@ const (
|
||||||
MultinetConfigWontBeUpdated = "multinet config won't be updated"
|
MultinetConfigWontBeUpdated = "multinet config won't be updated"
|
||||||
MultinetDialSuccess = "multinet dial successful"
|
MultinetDialSuccess = "multinet dial successful"
|
||||||
MultinetDialFail = "multinet dial failed"
|
MultinetDialFail = "multinet dial failed"
|
||||||
|
FailedToPutTombstoneObject = "failed to put tombstone object"
|
||||||
|
FailedToCreateWorkerPool = "failed to create worker pool"
|
||||||
|
FailedToListAllObjectRelations = "failed to list all object relations"
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue