[#559] Remove multipart objects using tombstones

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
Marina Biryukova 2024-11-22 12:32:35 +03:00
parent 51322cccdf
commit f215d200e8
13 changed files with 322 additions and 23 deletions

View file

@ -32,6 +32,7 @@ import (
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
@ -184,6 +185,11 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas
features := &layer.FeatureSettingsMock{} features := &layer.FeatureSettingsMock{}
pool, err := ants.NewPool(1)
if err != nil {
return nil, err
}
layerCfg := &layer.Config{ layerCfg := &layer.Config{
Cache: layer.NewCache(cacheCfg), Cache: layer.NewCache(cacheCfg),
AnonKey: layer.AnonymousKey{Key: key}, AnonKey: layer.AnonymousKey{Key: key},
@ -191,6 +197,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas
TreeService: treeMock, TreeService: treeMock,
Features: features, Features: features,
GateOwner: owner, GateOwner: owner,
WorkerPool: pool,
} }
var pp netmap.PlacementPolicy var pp netmap.PlacementPolicy

View file

@ -13,6 +13,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
@ -170,6 +171,9 @@ type PrmObjectCreate struct {
// Sets max buffer size to read payload. // Sets max buffer size to read payload.
BufferMaxSize uint64 BufferMaxSize uint64
// Object type (optional).
Type object.Type
} }
// CreateObjectResult is a result parameter of FrostFS.CreateObject operation. // CreateObjectResult is a result parameter of FrostFS.CreateObject operation.
@ -344,4 +348,7 @@ type FrostFS interface {
// NetworkInfo returns parameters of FrostFS network. // NetworkInfo returns parameters of FrostFS network.
NetworkInfo(context.Context) (netmap.NetworkInfo, error) NetworkInfo(context.Context) (netmap.NetworkInfo, error)
// Relations returns implementation of relations.Relations interface.
Relations() relations.Relations
} }

View file

@ -24,6 +24,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
@ -35,6 +36,14 @@ type FeatureSettingsMock struct {
md5Enabled bool md5Enabled bool
} }
func (k *FeatureSettingsMock) TombstoneLifetime() uint64 {
return 1
}
func (k *FeatureSettingsMock) TombstoneMembersSize() int {
return 2
}
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 { func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
return 0 return 0
} }
@ -262,7 +271,11 @@ func (t *TestFrostFS) RangeObject(ctx context.Context, prm frostfs.PrmObjectRang
return io.NopCloser(bytes.NewReader(payload)), nil return io.NopCloser(bytes.NewReader(payload)), nil
} }
func (t *TestFrostFS) CreateObject(_ context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) { func (t *TestFrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
if prm.Type == object.TypeTombstone {
return t.createTombstone(ctx, prm)
}
b := make([]byte, 32) b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil { if _, err := io.ReadFull(rand.Reader, b); err != nil {
return nil, err return nil, err
@ -338,6 +351,35 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm frostfs.PrmObjectCreat
}, nil }, nil
} }
func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
payload, err := io.ReadAll(prm.Payload)
if err != nil {
return nil, err
}
var tomb object.Tombstone
err = tomb.Unmarshal(payload)
if err != nil {
return nil, err
}
for _, objID := range tomb.Members() {
prmDelete := frostfs.PrmObjectDelete{
PrmAuth: prm.PrmAuth,
Container: prm.Container,
Object: objID,
}
if err = t.DeleteObject(ctx, prmDelete); err != nil {
return nil, err
}
}
return &frostfs.CreateObjectResult{
CreationEpoch: t.currentEpoch,
}, nil
}
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error { func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
var addr oid.Address var addr oid.Address
addr.SetContainer(prm.Container) addr.SetContainer(prm.Container)
@ -459,6 +501,10 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatc
return newID, nil return newID, nil
} }
func (t *TestFrostFS) Relations() relations.Relations {
return &RelationsMock{}
}
func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error { func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error {
list, ok := t.chains[prm.ContainerID.EncodeToString()] list, ok := t.chains[prm.ContainerID.EncodeToString()]
if !ok { if !ok {
@ -499,3 +545,25 @@ func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
} }
return false return false
} }
type RelationsMock struct{}
func (r *RelationsMock) GetSplitInfo(context.Context, cid.ID, oid.ID, relations.Tokens) (*object.SplitInfo, error) {
return nil, relations.ErrNoSplitInfo
}
func (r *RelationsMock) ListChildrenByLinker(context.Context, cid.ID, oid.ID, relations.Tokens) ([]oid.ID, error) {
return nil, nil
}
func (r *RelationsMock) GetLeftSibling(context.Context, cid.ID, oid.ID, relations.Tokens) (oid.ID, error) {
return oid.ID{}, nil
}
func (r *RelationsMock) FindSiblingBySplitID(context.Context, cid.ID, *object.SplitID, relations.Tokens) ([]oid.ID, error) {
return nil, nil
}
func (r *RelationsMock) FindSiblingByParentID(_ context.Context, _ cid.ID, _ oid.ID, _ relations.Tokens) ([]oid.ID, error) {
return nil, nil
}

View file

@ -28,9 +28,11 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -44,6 +46,8 @@ type (
BufferMaxSizeForPut() uint64 BufferMaxSizeForPut() uint64
MD5Enabled() bool MD5Enabled() bool
FormContainerZone(ns string) string FormContainerZone(ns string) string
TombstoneMembersSize() int
TombstoneLifetime() uint64
} }
Layer struct { Layer struct {
@ -58,6 +62,7 @@ type (
gateKey *keys.PrivateKey gateKey *keys.PrivateKey
corsCnrInfo *data.BucketInfo corsCnrInfo *data.BucketInfo
lifecycleCnrInfo *data.BucketInfo lifecycleCnrInfo *data.BucketInfo
workerPool *ants.Pool
} }
Config struct { Config struct {
@ -71,6 +76,7 @@ type (
GateKey *keys.PrivateKey GateKey *keys.PrivateKey
CORSCnrInfo *data.BucketInfo CORSCnrInfo *data.BucketInfo
LifecycleCnrInfo *data.BucketInfo LifecycleCnrInfo *data.BucketInfo
WorkerPool *ants.Pool
} }
// AnonymousKey contains data for anonymous requests. // AnonymousKey contains data for anonymous requests.
@ -249,6 +255,7 @@ func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
gateKey: config.GateKey, gateKey: config.GateKey,
corsCnrInfo: config.CORSCnrInfo, corsCnrInfo: config.CORSCnrInfo,
lifecycleCnrInfo: config.LifecycleCnrInfo, lifecycleCnrInfo: config.LifecycleCnrInfo,
workerPool: config.WorkerPool,
} }
} }
@ -557,7 +564,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
} }
for _, nodeVersion := range nodeVersions { for _, nodeVersion := range nodeVersions {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj, networkInfo); obj.Error != nil {
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) { if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
return obj return obj
} }
@ -596,7 +603,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
} }
if !nodeVersion.IsDeleteMarker { if !nodeVersion.IsDeleteMarker {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj, networkInfo); obj.Error != nil {
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) { if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
return obj return obj
} }
@ -732,19 +739,19 @@ func (n *Layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, ob
return n.getNodeVersion(ctx, objVersion) return n.getNodeVersion(ctx, objVersion)
} }
func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) { func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject, networkInfo netmap.NetworkInfo) (string, error) {
if nodeVersion.IsDeleteMarker { if nodeVersion.IsDeleteMarker {
return obj.VersionID, nil return obj.VersionID, nil
} }
if nodeVersion.IsCombined { if nodeVersion.IsCombined {
return "", n.removeCombinedObject(ctx, bkt, nodeVersion) return "", n.removeCombinedObject(ctx, bkt, nodeVersion, networkInfo)
} }
return "", n.objectDelete(ctx, bkt, nodeVersion.OID) return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
} }
func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion) error { func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, networkInfo netmap.NetworkInfo) error {
combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID) combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID)
if err != nil { if err != nil {
return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err) return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err)
@ -755,20 +762,26 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
return fmt.Errorf("unmarshal combined object parts: %w", err) return fmt.Errorf("unmarshal combined object parts: %w", err)
} }
tokens := prepareTokensParameter(ctx, bkt.Owner)
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
}
members := append(oids, nodeVersion.OID)
for _, part := range parts { for _, part := range parts {
if err = n.objectDelete(ctx, bkt, part.OID); err == nil { oids, err = relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens)
continue if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", part.OID.EncodeToString()), zap.Error(err))
} }
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) { members = append(members, append(oids, part.OID)...)
return fmt.Errorf("couldn't delete part '%s': %w", part.OID.EncodeToString(), err)
} }
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", bkt.CID.EncodeToString()), n.putTombstones(ctx, bkt, networkInfo, members)
zap.String("oid", part.OID.EncodeToString()), zap.Int("part number", part.Number), zap.Error(err)) return nil
}
return n.objectDelete(ctx, bkt, nodeVersion.OID)
} }
// DeleteObjects from the storage. // DeleteObjects from the storage.

View file

@ -22,7 +22,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/minio/sio" "github.com/minio/sio"
"go.uber.org/zap" "go.uber.org/zap"
@ -565,16 +567,31 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
return err return err
} }
networkInfo, err := n.GetNetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info: %w", err)
}
n.deleteUploadedParts(ctx, p.Bkt, parts, networkInfo)
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
}
func (n *Layer) deleteUploadedParts(ctx context.Context, bkt *data.BucketInfo, parts PartsInfo, networkInfo netmap.NetworkInfo) {
members := make([]oid.ID, 0)
tokens := prepareTokensParameter(ctx, bkt.Owner)
for _, infos := range parts { for _, infos := range parts {
for _, info := range infos { for _, info := range infos {
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, info.OID, tokens)
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()), if err != nil {
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err)) n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Error(err))
} }
members = append(members, append(oids, info.OID)...)
} }
} }
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo) n.putTombstones(ctx, bkt, networkInfo, members)
} }
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) { func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {

91
api/layer/tombstone.go Normal file
View file

@ -0,0 +1,91 @@
package layer
import (
"bytes"
"context"
"fmt"
"strconv"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
)
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) {
var wg sync.WaitGroup
tombstoneMembersSize := n.features.TombstoneMembersSize()
tombstoneLifetime := n.features.TombstoneLifetime()
for i := 0; i < len(members); i += tombstoneMembersSize {
end := tombstoneMembersSize * (i + 1)
if end > len(members) {
end = len(members)
}
n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
}
wg.Wait()
}
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup) {
tomb := object.NewTombstone()
tomb.SetExpirationEpoch(expEpoch)
tomb.SetMembers(members)
wg.Add(1)
err := n.workerPool.Submit(func() {
defer wg.Done()
if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err))
}
})
if err != nil {
wg.Done()
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}
func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo) error {
payload, err := tomb.Marshal()
if err != nil {
return fmt.Errorf("marshal tombstone: %w", err)
}
prm := frostfs.PrmObjectCreate{
Container: bktInfo.CID,
Attributes: [][2]string{{objectV2.SysAttributeExpEpoch, strconv.FormatUint(tomb.ExpirationEpoch(), 10)}},
Payload: bytes.NewReader(payload),
CreationTime: TimeNow(ctx),
ClientCut: n.features.ClientCut(),
WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled,
BufferMaxSize: n.features.BufferMaxSizeForPut(),
Type: object.TypeTombstone,
}
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
_, err = n.frostFS.CreateObject(ctx, prm)
return err
}
func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens {
tokens := relations.Tokens{}
if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
tokens.Bearer = bd.Gate.BearerToken
}
}
return tokens
}

View file

@ -50,6 +50,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/go-chi/chi/v5/middleware" "github.com/go-chi/chi/v5/middleware"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
@ -105,6 +106,7 @@ type (
frostfsidValidation bool frostfsidValidation bool
accessbox *cid.ID accessbox *cid.ID
dialerSource *internalnet.DialerSource dialerSource *internalnet.DialerSource
workerPoolSize int
mu sync.RWMutex mu sync.RWMutex
namespaces Namespaces namespaces Namespaces
@ -125,6 +127,8 @@ type (
vhsNamespacesEnabled map[string]bool vhsNamespacesEnabled map[string]bool
retryMaxBackoff time.Duration retryMaxBackoff time.Duration
retryStrategy handler.RetryStrategy retryStrategy handler.RetryStrategy
tombstoneMembersSize int
tombstoneLifetime uint64
} }
maxClientsConfig struct { maxClientsConfig struct {
@ -249,12 +253,21 @@ func (a *App) initLayer(ctx context.Context) {
GateKey: a.key, GateKey: a.key,
CORSCnrInfo: corsCnrInfo, CORSCnrInfo: corsCnrInfo,
LifecycleCnrInfo: lifecycleCnrInfo, LifecycleCnrInfo: lifecycleCnrInfo,
WorkerPool: a.initWorkerPool(),
} }
// prepare object layer // prepare object layer
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg) a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
} }
func (a *App) initWorkerPool() *ants.Pool {
workerPool, err := ants.NewPool(a.settings.workerPoolSize)
if err != nil {
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
}
return workerPool
}
func newAppSettings(log *Logger, v *viper.Viper) *appSettings { func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
settings := &appSettings{ settings := &appSettings{
logLevel: log.lvl, logLevel: log.lvl,
@ -264,6 +277,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
reconnectInterval: fetchReconnectInterval(v), reconnectInterval: fetchReconnectInterval(v),
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled), frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
dialerSource: getDialerSource(log.logger, v), dialerSource: getDialerSource(log.logger, v),
workerPoolSize: fetchTombstoneWorkerPoolSize(v),
} }
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow) settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
@ -300,6 +314,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
httpLoggingMaxLogSize := v.GetInt(cfgHTTPLoggingMaxLogSize) httpLoggingMaxLogSize := v.GetInt(cfgHTTPLoggingMaxLogSize)
httpLoggingOutputPath := v.GetString(cfgHTTPLoggingDestination) httpLoggingOutputPath := v.GetString(cfgHTTPLoggingDestination)
httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip) httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip)
tombstoneMembersSize := fetchTombstoneMembersSize(v)
tombstoneLifetime := fetchTombstoneLifetime(v)
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -329,6 +345,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
s.vhsHeader = vhsHeader s.vhsHeader = vhsHeader
s.servernameHeader = servernameHeader s.servernameHeader = servernameHeader
s.vhsNamespacesEnabled = vhsNamespacesEnabled s.vhsNamespacesEnabled = vhsNamespacesEnabled
s.tombstoneMembersSize = tombstoneMembersSize
s.tombstoneLifetime = tombstoneLifetime
} }
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool { func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
@ -531,6 +549,18 @@ func (s *appSettings) AccessBoxContainer() (cid.ID, bool) {
return cid.ID{}, false return cid.ID{}, false
} }
func (s *appSettings) TombstoneMembersSize() int {
s.mu.RLock()
defer s.mu.RUnlock()
return s.tombstoneMembersSize
}
func (s *appSettings) TombstoneLifetime() uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.tombstoneLifetime
}
func (a *App) initAPI(ctx context.Context) { func (a *App) initAPI(ctx context.Context) {
a.initLayer(ctx) a.initLayer(ctx)
a.initHandler() a.initHandler()

View file

@ -74,6 +74,10 @@ const (
defaultRetryMaxAttempts = 4 defaultRetryMaxAttempts = 4
defaultRetryMaxBackoff = 30 * time.Second defaultRetryMaxBackoff = 30 * time.Second
defaultRetryStrategy = handler.RetryStrategyExponential defaultRetryStrategy = handler.RetryStrategyExponential
defaultTombstoneLifetime = 10
defaultTombstoneMembersSize = 100
defaultTombstoneWorkerPoolSize = 100
) )
var ( var (
@ -240,6 +244,9 @@ const ( // Settings.
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put" cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
// Sets max attempt to make successful tree request. // Sets max attempt to make successful tree request.
cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts" cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"
cfgTombstoneLifetime = "frostfs.tombstone.lifetime"
cfgTombstoneMembersSize = "frostfs.tombstone.members_size"
cfgTombstoneWorkerPoolSize = "frostfs.tombstone.worker_pool_size"
// Specifies the timeout after which unhealthy client be closed during rebalancing // Specifies the timeout after which unhealthy client be closed during rebalancing
// if it will become healthy back. // if it will become healthy back.
@ -804,6 +811,33 @@ func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) {
return attributes, nil return attributes, nil
} }
func fetchTombstoneLifetime(v *viper.Viper) uint64 {
tombstoneLifetime := v.GetUint64(cfgTombstoneLifetime)
if tombstoneLifetime <= 0 {
tombstoneLifetime = defaultTombstoneLifetime
}
return tombstoneLifetime
}
func fetchTombstoneMembersSize(v *viper.Viper) int {
tombstoneMembersSize := v.GetInt(cfgTombstoneMembersSize)
if tombstoneMembersSize <= 0 {
tombstoneMembersSize = defaultTombstoneMembersSize
}
return tombstoneMembersSize
}
func fetchTombstoneWorkerPoolSize(v *viper.Viper) int {
tombstoneWorkerPoolSize := v.GetInt(cfgTombstoneWorkerPoolSize)
if tombstoneWorkerPoolSize <= 0 {
tombstoneWorkerPoolSize = defaultTombstoneWorkerPoolSize
}
return tombstoneWorkerPoolSize
}
func newSettings() *viper.Viper { func newSettings() *viper.Viper {
v := viper.New() v := viper.New()
@ -876,6 +910,9 @@ func newSettings() *viper.Viper {
// frostfs // frostfs
v.SetDefault(cfgBufferMaxSizeForPut, 1024*1024) // 1mb v.SetDefault(cfgBufferMaxSizeForPut, 1024*1024) // 1mb
v.SetDefault(cfgTombstoneLifetime, defaultTombstoneLifetime)
v.SetDefault(cfgTombstoneMembersSize, defaultTombstoneMembersSize)
v.SetDefault(cfgTombstoneWorkerPoolSize, defaultTombstoneWorkerPoolSize)
// kludge // kludge
v.SetDefault(cfgKludgeUseDefaultXMLNS, false) v.SetDefault(cfgKludgeUseDefaultXMLNS, false)

View file

@ -163,6 +163,12 @@ S3_GW_FROSTFS_BUFFER_MAX_SIZE_FOR_PUT=1048576
S3_GW_FROSTFS_TREE_POOL_MAX_ATTEMPTS=0 S3_GW_FROSTFS_TREE_POOL_MAX_ATTEMPTS=0
# Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. # Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back.
S3_GW_FROSTFS_GRACEFUL_CLOSE_ON_SWITCH_TIMEOUT=10s S3_GW_FROSTFS_GRACEFUL_CLOSE_ON_SWITCH_TIMEOUT=10s
# Tombstone's lifetime in epochs.
S3_GW_FROSTFS_TOMBSTONE_LIFETIME=10
# Maximum number of object IDs in one tombstone.
S3_GW_FROSTFS_TOMBSTONE_MEMBERS_SIZE=100
# Maximum worker count in layer's worker pool that create tombstones.
S3_GW_FROSTFS_TOMBSTONE_WORKER_POOL_SIZE=100
# List of allowed AccessKeyID prefixes # List of allowed AccessKeyID prefixes
# If not set, S3 GW will accept all AccessKeyIDs # If not set, S3 GW will accept all AccessKeyIDs

View file

@ -199,6 +199,13 @@ frostfs:
buffer_max_size_for_put: 1048576 buffer_max_size_for_put: 1048576
# Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. # Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back.
graceful_close_on_switch_timeout: 10s graceful_close_on_switch_timeout: 10s
tombstone:
# Tombstone's lifetime in epochs.
lifetime: 10
# Maximum number of object IDs in one tombstone.
members_size: 100
# Maximum worker count in layer's worker pool that create tombstones.
worker_pool_size: 100
# List of allowed AccessKeyID prefixes # List of allowed AccessKeyID prefixes
# If the parameter is omitted, S3 GW will accept all AccessKeyIDs # If the parameter is omitted, S3 GW will accept all AccessKeyIDs

View file

@ -588,6 +588,10 @@ frostfs:
buffer_max_size_for_put: 1048576 # 1mb buffer_max_size_for_put: 1048576 # 1mb
tree_pool_max_attempts: 0 tree_pool_max_attempts: 0
graceful_close_on_switch_timeout: 10s graceful_close_on_switch_timeout: 10s
tombstone:
lifetime: 10
members_size: 100
worker_pool_size: 100
``` ```
| Parameter | Type | SIGHUP reload | Default value | Description | | Parameter | Type | SIGHUP reload | Default value | Description |
@ -597,6 +601,9 @@ frostfs:
| `buffer_max_size_for_put` | `uint64` | yes | `1048576` | Sets max buffer size for read payload in put operations. | | `buffer_max_size_for_put` | `uint64` | yes | `1048576` | Sets max buffer size for read payload in put operations. |
| `tree_pool_max_attempts` | `uint32` | no | `0` | Sets max attempt to make successful tree request. Value 0 means the number of attempts equals to number of nodes in pool. | | `tree_pool_max_attempts` | `uint32` | no | `0` | Sets max attempt to make successful tree request. Value 0 means the number of attempts equals to number of nodes in pool. |
| `graceful_close_on_switch_timeout` | `duration` | no | `10s` | Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. | | `graceful_close_on_switch_timeout` | `duration` | no | `10s` | Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. |
| `tombstone.lifetime` | `uint64` | yes | 10 | Tombstone's lifetime in epochs. |
| `tombstone.members_size` | `int` | yes | 100 | Maximum number of object IDs in one tombstone. |
| `tombstone.worker_pool_size` | `int` | no | 100 | Maximum worker count in layer's worker pool that create tombstones. |
# `resolve_bucket` section # `resolve_bucket` section

View file

@ -19,6 +19,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@ -214,6 +215,7 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate)
obj.SetOwnerID(x.owner) obj.SetOwnerID(x.owner)
obj.SetAttributes(attrs...) obj.SetAttributes(attrs...)
obj.SetPayloadSize(prm.PayloadSize) obj.SetPayloadSize(prm.PayloadSize)
obj.SetType(prm.Type)
if prm.BearerToken == nil && prm.PrivateKey != nil { if prm.BearerToken == nil && prm.PrivateKey != nil {
var owner user.ID var owner user.ID
@ -438,6 +440,10 @@ func (x *FrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatch) (
return res.ObjectID, nil return res.ObjectID, nil
} }
func (x *FrostFS) Relations() relations.Relations {
return x.pool
}
// ResolverFrostFS represents virtual connection to the FrostFS network. // ResolverFrostFS represents virtual connection to the FrostFS network.
// It implements resolver.FrostFS. // It implements resolver.FrostFS.
type ResolverFrostFS struct { type ResolverFrostFS struct {

View file

@ -178,4 +178,7 @@ const (
MultinetDialSuccess = "multinet dial successful" MultinetDialSuccess = "multinet dial successful"
MultinetDialFail = "multinet dial failed" MultinetDialFail = "multinet dial failed"
FailedToParseHTTPTime = "failed to parse http time, header is ignored" FailedToParseHTTPTime = "failed to parse http time, header is ignored"
FailedToPutTombstoneObject = "failed to put tombstone object"
FailedToCreateWorkerPool = "failed to create worker pool"
FailedToListAllObjectRelations = "failed to list all object relations"
) )