[#559] Remove multipart objects using tombstones #560

Merged
alexvanin merged 1 commit from mbiryukova/frostfs-s3-gw:feature/multipart_tombstone into master 2024-12-04 08:16:11 +00:00
13 changed files with 322 additions and 23 deletions

View file

@ -32,6 +32,7 @@ import (
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
@ -184,6 +185,11 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas
features := &layer.FeatureSettingsMock{} features := &layer.FeatureSettingsMock{}
pool, err := ants.NewPool(1)
if err != nil {
return nil, err
}
layerCfg := &layer.Config{ layerCfg := &layer.Config{
Cache: layer.NewCache(cacheCfg), Cache: layer.NewCache(cacheCfg),
AnonKey: layer.AnonymousKey{Key: key}, AnonKey: layer.AnonymousKey{Key: key},
@ -191,6 +197,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas
TreeService: treeMock, TreeService: treeMock,
Features: features, Features: features,
GateOwner: owner, GateOwner: owner,
WorkerPool: pool,
} }
var pp netmap.PlacementPolicy var pp netmap.PlacementPolicy

View file

@ -13,6 +13,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
@ -170,6 +171,9 @@ type PrmObjectCreate struct {
// Sets max buffer size to read payload. // Sets max buffer size to read payload.
BufferMaxSize uint64 BufferMaxSize uint64
// Object type (optional).
Type object.Type
} }
// CreateObjectResult is a result parameter of FrostFS.CreateObject operation. // CreateObjectResult is a result parameter of FrostFS.CreateObject operation.
@ -344,4 +348,7 @@ type FrostFS interface {
// NetworkInfo returns parameters of FrostFS network. // NetworkInfo returns parameters of FrostFS network.
NetworkInfo(context.Context) (netmap.NetworkInfo, error) NetworkInfo(context.Context) (netmap.NetworkInfo, error)
// Relations returns implementation of relations.Relations interface.
Relations() relations.Relations
} }

View file

@ -24,6 +24,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain" "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
@ -35,6 +36,14 @@ type FeatureSettingsMock struct {
md5Enabled bool md5Enabled bool
} }
func (k *FeatureSettingsMock) TombstoneLifetime() uint64 {
return 1
}
func (k *FeatureSettingsMock) TombstoneMembersSize() int {
return 2
}
func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 { func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
return 0 return 0
} }
@ -262,7 +271,11 @@ func (t *TestFrostFS) RangeObject(ctx context.Context, prm frostfs.PrmObjectRang
return io.NopCloser(bytes.NewReader(payload)), nil return io.NopCloser(bytes.NewReader(payload)), nil
} }
func (t *TestFrostFS) CreateObject(_ context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) { func (t *TestFrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
if prm.Type == object.TypeTombstone {
alexvanin marked this conversation as resolved Outdated

nitpick: I think this looks better in separate private

createTombstone(ctx context.Context, prm frostfs.PrmObjectCreate) {
}

This condition does not reuse any code from CreateObject so there is no point to inline this code.

nitpick: I think this looks better in separate private ``` createTombstone(ctx context.Context, prm frostfs.PrmObjectCreate) { } ``` This condition does not reuse any code from `CreateObject` so there is no point to inline this code.
return t.createTombstone(ctx, prm)
}
b := make([]byte, 32) b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil { if _, err := io.ReadFull(rand.Reader, b); err != nil {
return nil, err return nil, err
@ -338,6 +351,35 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm frostfs.PrmObjectCreat
}, nil }, nil
} }
func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
payload, err := io.ReadAll(prm.Payload)
if err != nil {
return nil, err
}
var tomb object.Tombstone
err = tomb.Unmarshal(payload)
if err != nil {
return nil, err
}
for _, objID := range tomb.Members() {
prmDelete := frostfs.PrmObjectDelete{
PrmAuth: prm.PrmAuth,
Container: prm.Container,
Object: objID,
}
if err = t.DeleteObject(ctx, prmDelete); err != nil {
return nil, err
}
}
return &frostfs.CreateObjectResult{
CreationEpoch: t.currentEpoch,
}, nil
}
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error { func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
var addr oid.Address var addr oid.Address
addr.SetContainer(prm.Container) addr.SetContainer(prm.Container)
@ -459,6 +501,10 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatc
return newID, nil return newID, nil
} }
func (t *TestFrostFS) Relations() relations.Relations {
return &RelationsMock{}
}
func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error { func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error {
list, ok := t.chains[prm.ContainerID.EncodeToString()] list, ok := t.chains[prm.ContainerID.EncodeToString()]
if !ok { if !ok {
@ -499,3 +545,25 @@ func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
} }
return false return false
} }
type RelationsMock struct{}
func (r *RelationsMock) GetSplitInfo(context.Context, cid.ID, oid.ID, relations.Tokens) (*object.SplitInfo, error) {
return nil, relations.ErrNoSplitInfo
}
func (r *RelationsMock) ListChildrenByLinker(context.Context, cid.ID, oid.ID, relations.Tokens) ([]oid.ID, error) {
return nil, nil
}
func (r *RelationsMock) GetLeftSibling(context.Context, cid.ID, oid.ID, relations.Tokens) (oid.ID, error) {
return oid.ID{}, nil
}
func (r *RelationsMock) FindSiblingBySplitID(context.Context, cid.ID, *object.SplitID, relations.Tokens) ([]oid.ID, error) {
return nil, nil
}
func (r *RelationsMock) FindSiblingByParentID(_ context.Context, _ cid.ID, _ oid.ID, _ relations.Tokens) ([]oid.ID, error) {
return nil, nil
}

View file

@ -28,9 +28,11 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap" "go.uber.org/zap"
) )
@ -44,6 +46,8 @@ type (
BufferMaxSizeForPut() uint64 BufferMaxSizeForPut() uint64
MD5Enabled() bool MD5Enabled() bool
FormContainerZone(ns string) string FormContainerZone(ns string) string
TombstoneMembersSize() int
TombstoneLifetime() uint64
} }
Layer struct { Layer struct {
@ -58,6 +62,7 @@ type (
gateKey *keys.PrivateKey gateKey *keys.PrivateKey
corsCnrInfo *data.BucketInfo corsCnrInfo *data.BucketInfo
lifecycleCnrInfo *data.BucketInfo lifecycleCnrInfo *data.BucketInfo
workerPool *ants.Pool
} }
Config struct { Config struct {
@ -71,6 +76,7 @@ type (
GateKey *keys.PrivateKey GateKey *keys.PrivateKey
CORSCnrInfo *data.BucketInfo CORSCnrInfo *data.BucketInfo
LifecycleCnrInfo *data.BucketInfo LifecycleCnrInfo *data.BucketInfo
WorkerPool *ants.Pool
} }
// AnonymousKey contains data for anonymous requests. // AnonymousKey contains data for anonymous requests.
@ -249,6 +255,7 @@ func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
gateKey: config.GateKey, gateKey: config.GateKey,
corsCnrInfo: config.CORSCnrInfo, corsCnrInfo: config.CORSCnrInfo,
lifecycleCnrInfo: config.LifecycleCnrInfo, lifecycleCnrInfo: config.LifecycleCnrInfo,
workerPool: config.WorkerPool,
} }
} }
@ -557,7 +564,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
} }
for _, nodeVersion := range nodeVersions { for _, nodeVersion := range nodeVersions {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj, networkInfo); obj.Error != nil {
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) { if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
return obj return obj
} }
@ -596,7 +603,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
} }
if !nodeVersion.IsDeleteMarker { if !nodeVersion.IsDeleteMarker {
if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil { if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj, networkInfo); obj.Error != nil {
if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) { if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
return obj return obj
} }
@ -732,19 +739,19 @@ func (n *Layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, ob
return n.getNodeVersion(ctx, objVersion) return n.getNodeVersion(ctx, objVersion)
} }
func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) { func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject, networkInfo netmap.NetworkInfo) (string, error) {
if nodeVersion.IsDeleteMarker { if nodeVersion.IsDeleteMarker {
return obj.VersionID, nil return obj.VersionID, nil
} }
if nodeVersion.IsCombined { if nodeVersion.IsCombined {
return "", n.removeCombinedObject(ctx, bkt, nodeVersion) return "", n.removeCombinedObject(ctx, bkt, nodeVersion, networkInfo)
} }
return "", n.objectDelete(ctx, bkt, nodeVersion.OID) return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
} }
func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion) error { func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, networkInfo netmap.NetworkInfo) error {
combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID) combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID)
if err != nil { if err != nil {
return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err) return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err)
@ -755,20 +762,26 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
return fmt.Errorf("unmarshal combined object parts: %w", err) return fmt.Errorf("unmarshal combined object parts: %w", err)
} }
for _, part := range parts { tokens := prepareTokensParameter(ctx, bkt.Owner)
if err = n.objectDelete(ctx, bkt, part.OID); err == nil { oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
continue if err != nil {
} n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
return fmt.Errorf("couldn't delete part '%s': %w", part.OID.EncodeToString(), err)
}
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", part.OID.EncodeToString()), zap.Int("part number", part.Number), zap.Error(err))
} }
return n.objectDelete(ctx, bkt, nodeVersion.OID) members := append(oids, nodeVersion.OID)
for _, part := range parts {
oids, err = relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens)
if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", part.OID.EncodeToString()), zap.Error(err))
}
members = append(members, append(oids, part.OID)...)
}
dkirillov marked this conversation as resolved Outdated

What about something like this:

diff --git a/api/layer/layer.go b/api/layer/layer.go
index 51e7edd8..d5bdca41 100644
--- a/api/layer/layer.go
+++ b/api/layer/layer.go
@@ -777,14 +777,14 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
 
 func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) {
 	var wg sync.WaitGroup
-	i := 0
 	tombstoneMembersSize := n.features.TombstoneMembersSize()
-	for ; i < len(members)/tombstoneMembersSize; i++ {
-		n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:tombstoneMembersSize*(i+1)], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
-	}
 
-	if len(members)%tombstoneMembersSize != 0 {
-		n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
+	for i := 0; i < len(members); i += tombstoneMembersSize {
+		end := tombstoneMembersSize * (i + 1)
+		if end > len(members) {
+			end = len(members)
+		}
+		n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
 	}
 
 	wg.Wait()

What about something like this: ```diff diff --git a/api/layer/layer.go b/api/layer/layer.go index 51e7edd8..d5bdca41 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -777,14 +777,14 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) { var wg sync.WaitGroup - i := 0 tombstoneMembersSize := n.features.TombstoneMembersSize() - for ; i < len(members)/tombstoneMembersSize; i++ { - n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:tombstoneMembersSize*(i+1)], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg) - } - if len(members)%tombstoneMembersSize != 0 { - n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg) + for i := 0; i < len(members); i += tombstoneMembersSize { + end := tombstoneMembersSize * (i + 1) + if end > len(members) { + end = len(members) + } + n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg) } wg.Wait() ```
n.putTombstones(ctx, bkt, networkInfo, members)
return nil
} }
// DeleteObjects from the storage. // DeleteObjects from the storage.

View file

@ -22,7 +22,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/minio/sio" "github.com/minio/sio"
"go.uber.org/zap" "go.uber.org/zap"
@ -565,16 +567,31 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
return err return err
} }
networkInfo, err := n.GetNetworkInfo(ctx)
if err != nil {
return fmt.Errorf("get network info: %w", err)
}
n.deleteUploadedParts(ctx, p.Bkt, parts, networkInfo)
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
}
func (n *Layer) deleteUploadedParts(ctx context.Context, bkt *data.BucketInfo, parts PartsInfo, networkInfo netmap.NetworkInfo) {
members := make([]oid.ID, 0)
tokens := prepareTokensParameter(ctx, bkt.Owner)
for _, infos := range parts { for _, infos := range parts {
for _, info := range infos { for _, info := range infos {
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, info.OID, tokens)
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()), if err != nil {
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err)) n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Error(err))
} }
members = append(members, append(oids, info.OID)...)
} }
} }
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo) n.putTombstones(ctx, bkt, networkInfo, members)
} }
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) { func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {

91
api/layer/tombstone.go Normal file
View file

@ -0,0 +1,91 @@
package layer
import (
"bytes"
"context"
"fmt"
"strconv"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap"
)
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) {
var wg sync.WaitGroup
tombstoneMembersSize := n.features.TombstoneMembersSize()
tombstoneLifetime := n.features.TombstoneLifetime()
for i := 0; i < len(members); i += tombstoneMembersSize {
end := tombstoneMembersSize * (i + 1)
if end > len(members) {
end = len(members)
}
n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
}
wg.Wait()
}
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup) {
tomb := object.NewTombstone()
tomb.SetExpirationEpoch(expEpoch)
tomb.SetMembers(members)
wg.Add(1)
err := n.workerPool.Submit(func() {
defer wg.Done()
if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err))
}
})
if err != nil {
wg.Done()
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
}
}
func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo) error {
payload, err := tomb.Marshal()
if err != nil {
return fmt.Errorf("marshal tombstone: %w", err)
}
prm := frostfs.PrmObjectCreate{
Container: bktInfo.CID,
Attributes: [][2]string{{objectV2.SysAttributeExpEpoch, strconv.FormatUint(tomb.ExpirationEpoch(), 10)}},
Payload: bytes.NewReader(payload),
CreationTime: TimeNow(ctx),
ClientCut: n.features.ClientCut(),
WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled,
BufferMaxSize: n.features.BufferMaxSizeForPut(),
Type: object.TypeTombstone,
}
n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
_, err = n.frostFS.CreateObject(ctx, prm)
return err
}
func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens {
tokens := relations.Tokens{}
if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
tokens.Bearer = bd.Gate.BearerToken
}
}
return tokens
}

View file

@ -50,6 +50,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/go-chi/chi/v5/middleware" "github.com/go-chi/chi/v5/middleware"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/panjf2000/ants/v2"
"github.com/spf13/viper" "github.com/spf13/viper"
"go.uber.org/zap" "go.uber.org/zap"
"golang.org/x/exp/slices" "golang.org/x/exp/slices"
@ -105,6 +106,7 @@ type (
frostfsidValidation bool frostfsidValidation bool
accessbox *cid.ID accessbox *cid.ID
dialerSource *internalnet.DialerSource dialerSource *internalnet.DialerSource
workerPoolSize int
mu sync.RWMutex mu sync.RWMutex
namespaces Namespaces namespaces Namespaces
@ -125,6 +127,8 @@ type (
vhsNamespacesEnabled map[string]bool vhsNamespacesEnabled map[string]bool
retryMaxBackoff time.Duration retryMaxBackoff time.Duration
retryStrategy handler.RetryStrategy retryStrategy handler.RetryStrategy
tombstoneMembersSize int
tombstoneLifetime uint64
} }
maxClientsConfig struct { maxClientsConfig struct {
@ -249,12 +253,21 @@ func (a *App) initLayer(ctx context.Context) {
GateKey: a.key, GateKey: a.key,
CORSCnrInfo: corsCnrInfo, CORSCnrInfo: corsCnrInfo,
dkirillov marked this conversation as resolved Outdated

Can we use fetch... function that check config provided value and will return default if it's 0 or less?

Can we use `fetch...` function that check config provided value and will return default if it's 0 or less?
LifecycleCnrInfo: lifecycleCnrInfo, LifecycleCnrInfo: lifecycleCnrInfo,
WorkerPool: a.initWorkerPool(),
} }
// prepare object layer // prepare object layer
a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg) a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
} }
func (a *App) initWorkerPool() *ants.Pool {
workerPool, err := ants.NewPool(a.settings.workerPoolSize)
if err != nil {
a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
}
return workerPool
}
func newAppSettings(log *Logger, v *viper.Viper) *appSettings { func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
settings := &appSettings{ settings := &appSettings{
logLevel: log.lvl, logLevel: log.lvl,
@ -264,6 +277,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
reconnectInterval: fetchReconnectInterval(v), reconnectInterval: fetchReconnectInterval(v),
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled), frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
dialerSource: getDialerSource(log.logger, v), dialerSource: getDialerSource(log.logger, v),
workerPoolSize: fetchTombstoneWorkerPoolSize(v),
} }
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow) settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
@ -300,6 +314,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
httpLoggingMaxLogSize := v.GetInt(cfgHTTPLoggingMaxLogSize) httpLoggingMaxLogSize := v.GetInt(cfgHTTPLoggingMaxLogSize)
httpLoggingOutputPath := v.GetString(cfgHTTPLoggingDestination) httpLoggingOutputPath := v.GetString(cfgHTTPLoggingDestination)
httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip) httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip)
tombstoneMembersSize := fetchTombstoneMembersSize(v)
tombstoneLifetime := fetchTombstoneLifetime(v)
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -329,6 +345,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
s.vhsHeader = vhsHeader s.vhsHeader = vhsHeader
s.servernameHeader = servernameHeader s.servernameHeader = servernameHeader
s.vhsNamespacesEnabled = vhsNamespacesEnabled s.vhsNamespacesEnabled = vhsNamespacesEnabled
s.tombstoneMembersSize = tombstoneMembersSize
s.tombstoneLifetime = tombstoneLifetime
} }
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool { func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
@ -531,6 +549,18 @@ func (s *appSettings) AccessBoxContainer() (cid.ID, bool) {
return cid.ID{}, false return cid.ID{}, false
} }
func (s *appSettings) TombstoneMembersSize() int {
s.mu.RLock()
defer s.mu.RUnlock()
return s.tombstoneMembersSize
}
func (s *appSettings) TombstoneLifetime() uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.tombstoneLifetime
}
func (a *App) initAPI(ctx context.Context) { func (a *App) initAPI(ctx context.Context) {
a.initLayer(ctx) a.initLayer(ctx)
a.initHandler() a.initHandler()

View file

@ -74,6 +74,10 @@ const (
defaultRetryMaxAttempts = 4 defaultRetryMaxAttempts = 4
defaultRetryMaxBackoff = 30 * time.Second defaultRetryMaxBackoff = 30 * time.Second
defaultRetryStrategy = handler.RetryStrategyExponential defaultRetryStrategy = handler.RetryStrategyExponential
defaultTombstoneLifetime = 10
dkirillov marked this conversation as resolved Outdated

We can write just

defaultTombstoneLifetime       = 10
We can write just ```golang defaultTombstoneLifetime = 10 ```
defaultTombstoneMembersSize = 100
defaultTombstoneWorkerPoolSize = 100
) )
var ( var (
@ -239,7 +243,10 @@ const ( // Settings.
// Sets max buffer size for read payload in put operations. // Sets max buffer size for read payload in put operations.
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put" cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
// Sets max attempt to make successful tree request. // Sets max attempt to make successful tree request.
cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts" cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"
cfgTombstoneLifetime = "frostfs.tombstone.lifetime"
cfgTombstoneMembersSize = "frostfs.tombstone.members_size"
cfgTombstoneWorkerPoolSize = "frostfs.tombstone.worker_pool_size"
// Specifies the timeout after which unhealthy client be closed during rebalancing // Specifies the timeout after which unhealthy client be closed during rebalancing
// if it will become healthy back. // if it will become healthy back.
@ -804,6 +811,33 @@ func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) {
return attributes, nil return attributes, nil
} }
func fetchTombstoneLifetime(v *viper.Viper) uint64 {
tombstoneLifetime := v.GetUint64(cfgTombstoneLifetime)
if tombstoneLifetime <= 0 {
tombstoneLifetime = defaultTombstoneLifetime
}
return tombstoneLifetime
}
func fetchTombstoneMembersSize(v *viper.Viper) int {
tombstoneMembersSize := v.GetInt(cfgTombstoneMembersSize)
if tombstoneMembersSize <= 0 {
tombstoneMembersSize = defaultTombstoneMembersSize
}
return tombstoneMembersSize
}
func fetchTombstoneWorkerPoolSize(v *viper.Viper) int {
tombstoneWorkerPoolSize := v.GetInt(cfgTombstoneWorkerPoolSize)
if tombstoneWorkerPoolSize <= 0 {
tombstoneWorkerPoolSize = defaultTombstoneWorkerPoolSize
}
return tombstoneWorkerPoolSize
}
func newSettings() *viper.Viper { func newSettings() *viper.Viper {
v := viper.New() v := viper.New()
@ -876,6 +910,9 @@ func newSettings() *viper.Viper {
// frostfs // frostfs
v.SetDefault(cfgBufferMaxSizeForPut, 1024*1024) // 1mb v.SetDefault(cfgBufferMaxSizeForPut, 1024*1024) // 1mb
v.SetDefault(cfgTombstoneLifetime, defaultTombstoneLifetime)
v.SetDefault(cfgTombstoneMembersSize, defaultTombstoneMembersSize)
v.SetDefault(cfgTombstoneWorkerPoolSize, defaultTombstoneWorkerPoolSize)
// kludge // kludge
v.SetDefault(cfgKludgeUseDefaultXMLNS, false) v.SetDefault(cfgKludgeUseDefaultXMLNS, false)

View file

@ -163,6 +163,12 @@ S3_GW_FROSTFS_BUFFER_MAX_SIZE_FOR_PUT=1048576
S3_GW_FROSTFS_TREE_POOL_MAX_ATTEMPTS=0 S3_GW_FROSTFS_TREE_POOL_MAX_ATTEMPTS=0
# Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. # Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back.
S3_GW_FROSTFS_GRACEFUL_CLOSE_ON_SWITCH_TIMEOUT=10s S3_GW_FROSTFS_GRACEFUL_CLOSE_ON_SWITCH_TIMEOUT=10s
# Tombstone's lifetime in epochs.
S3_GW_FROSTFS_TOMBSTONE_LIFETIME=10
# Maximum number of object IDs in one tombstone.
S3_GW_FROSTFS_TOMBSTONE_MEMBERS_SIZE=100
dkirillov marked this conversation as resolved Outdated

It seems the variable must be

S3_GW_FROSTFS_TOMBSTONE_MEMBERS_SIZE=100
It seems the variable must be ``` S3_GW_FROSTFS_TOMBSTONE_MEMBERS_SIZE=100 ```
# Maximum worker count in layer's worker pool that create tombstones.
S3_GW_FROSTFS_TOMBSTONE_WORKER_POOL_SIZE=100
# List of allowed AccessKeyID prefixes # List of allowed AccessKeyID prefixes
# If not set, S3 GW will accept all AccessKeyIDs # If not set, S3 GW will accept all AccessKeyIDs

View file

@ -199,6 +199,13 @@ frostfs:
buffer_max_size_for_put: 1048576 buffer_max_size_for_put: 1048576
# Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. # Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back.
graceful_close_on_switch_timeout: 10s graceful_close_on_switch_timeout: 10s
tombstone:
# Tombstone's lifetime in epochs.
lifetime: 10
# Maximum number of object IDs in one tombstone.
members_size: 100
dkirillov marked this conversation as resolved Outdated

It seems the variable must be

tombstone:
  members_size: 100
It seems the variable must be ```yaml tombstone: members_size: 100 ```
# Maximum worker count in layer's worker pool that create tombstones.
worker_pool_size: 100
# List of allowed AccessKeyID prefixes # List of allowed AccessKeyID prefixes
# If the parameter is omitted, S3 GW will accept all AccessKeyIDs # If the parameter is omitted, S3 GW will accept all AccessKeyIDs

View file

@ -588,6 +588,10 @@ frostfs:
buffer_max_size_for_put: 1048576 # 1mb buffer_max_size_for_put: 1048576 # 1mb
tree_pool_max_attempts: 0 tree_pool_max_attempts: 0
graceful_close_on_switch_timeout: 10s graceful_close_on_switch_timeout: 10s
tombstone:
lifetime: 10
members_size: 100
worker_pool_size: 100
``` ```
| Parameter | Type | SIGHUP reload | Default value | Description | | Parameter | Type | SIGHUP reload | Default value | Description |
@ -597,6 +601,9 @@ frostfs:
| `buffer_max_size_for_put` | `uint64` | yes | `1048576` | Sets max buffer size for read payload in put operations. | | `buffer_max_size_for_put` | `uint64` | yes | `1048576` | Sets max buffer size for read payload in put operations. |
| `tree_pool_max_attempts` | `uint32` | no | `0` | Sets max attempt to make successful tree request. Value 0 means the number of attempts equals to number of nodes in pool. | | `tree_pool_max_attempts` | `uint32` | no | `0` | Sets max attempt to make successful tree request. Value 0 means the number of attempts equals to number of nodes in pool. |
| `graceful_close_on_switch_timeout` | `duration` | no | `10s` | Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. | | `graceful_close_on_switch_timeout` | `duration` | no | `10s` | Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. |
| `tombstone.lifetime` | `uint64` | yes | 10 | Tombstone's lifetime in epochs. |
| `tombstone.members_size` | `int` | yes | 100 | Maximum number of object IDs in one tombstone. |
| `tombstone.worker_pool_size` | `int` | no | 100 | Maximum worker count in layer's worker pool that create tombstones. |
# `resolve_bucket` section # `resolve_bucket` section

View file

@ -19,6 +19,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@ -214,6 +215,7 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate)
obj.SetOwnerID(x.owner) obj.SetOwnerID(x.owner)
obj.SetAttributes(attrs...) obj.SetAttributes(attrs...)
obj.SetPayloadSize(prm.PayloadSize) obj.SetPayloadSize(prm.PayloadSize)
obj.SetType(prm.Type)
alexvanin marked this conversation as resolved Outdated

Is default type a regular object? As far as I understand, prm.Type is optional.

Is default type a regular object? As far as I understand, `prm.Type` is optional.

Yes, default type is regular

Yes, default type is regular
dkirillov marked this conversation as resolved Outdated

It seems we can unconditionally set type (0 means regular object)

It seems we can unconditionally set type (0 means regular object)
if prm.BearerToken == nil && prm.PrivateKey != nil { if prm.BearerToken == nil && prm.PrivateKey != nil {
var owner user.ID var owner user.ID
@ -438,6 +440,10 @@ func (x *FrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatch) (
return res.ObjectID, nil return res.ObjectID, nil
} }
func (x *FrostFS) Relations() relations.Relations {
return x.pool
}
// ResolverFrostFS represents virtual connection to the FrostFS network. // ResolverFrostFS represents virtual connection to the FrostFS network.
// It implements resolver.FrostFS. // It implements resolver.FrostFS.
type ResolverFrostFS struct { type ResolverFrostFS struct {

View file

@ -178,4 +178,7 @@ const (
MultinetDialSuccess = "multinet dial successful" MultinetDialSuccess = "multinet dial successful"
MultinetDialFail = "multinet dial failed" MultinetDialFail = "multinet dial failed"
FailedToParseHTTPTime = "failed to parse http time, header is ignored" FailedToParseHTTPTime = "failed to parse http time, header is ignored"
FailedToPutTombstoneObject = "failed to put tombstone object"
FailedToCreateWorkerPool = "failed to create worker pool"
FailedToListAllObjectRelations = "failed to list all object relations"
) )