[#559] Remove multipart objects using tombstones #560
13 changed files with 322 additions and 23 deletions
@@ -32,6 +32,7 @@ import (
     "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
     "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
     "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
+    "github.com/panjf2000/ants/v2"
     "github.com/stretchr/testify/require"
     "go.uber.org/zap"
     "golang.org/x/exp/slices"
@@ -184,6 +185,11 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas
 
     features := &layer.FeatureSettingsMock{}
 
+    pool, err := ants.NewPool(1)
+    if err != nil {
+        return nil, err
+    }
+
     layerCfg := &layer.Config{
         Cache:   layer.NewCache(cacheCfg),
         AnonKey: layer.AnonymousKey{Key: key},
@@ -191,6 +197,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas
         TreeService: treeMock,
         Features:    features,
         GateOwner:   owner,
+        WorkerPool:  pool,
     }
 
     var pp netmap.PlacementPolicy
@@ -13,6 +13,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
     "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
@@ -170,6 +171,9 @@ type PrmObjectCreate struct {
 
     // Sets max buffer size to read payload.
     BufferMaxSize uint64
+
+    // Object type (optional).
+    Type object.Type
 }
 
 // CreateObjectResult is a result parameter of FrostFS.CreateObject operation.
@@ -344,4 +348,7 @@ type FrostFS interface {
 
     // NetworkInfo returns parameters of FrostFS network.
     NetworkInfo(context.Context) (netmap.NetworkInfo, error)
+
+    // Relations returns implementation of relations.Relations interface.
+    Relations() relations.Relations
 }
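As an aside, here is a minimal sketch of how a caller can combine the new `Relations()` accessor with `relations.ListAllRelations` to gather every object ID related to a stored object before adding it to a tombstone. The helper name and wiring below are illustrative, not part of the PR; the PR does this inside the layer's delete paths.

```go
// Illustrative sketch only. Assumes the FrostFS interface above and the
// relations helpers from frostfs-sdk-go; collectMembers is a hypothetical name.
package example

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
)

// collectMembers returns the object itself plus every related object ID
// (for example, chunks of a client-cut upload), so that all of them can
// become members of a single tombstone.
func collectMembers(ctx context.Context, fs frostfs.FrostFS, cnr cid.ID, obj oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
	oids, err := relations.ListAllRelations(ctx, fs.Relations(), cnr, obj, tokens)
	if err != nil {
		return nil, err
	}
	return append(oids, obj), nil
}
```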
@@ -24,6 +24,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
     oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
     "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
@@ -35,6 +36,14 @@ type FeatureSettingsMock struct {
     md5Enabled bool
 }
 
+func (k *FeatureSettingsMock) TombstoneLifetime() uint64 {
+    return 1
+}
+
+func (k *FeatureSettingsMock) TombstoneMembersSize() int {
+    return 2
+}
+
 func (k *FeatureSettingsMock) BufferMaxSizeForPut() uint64 {
     return 0
 }
@@ -262,7 +271,11 @@ func (t *TestFrostFS) RangeObject(ctx context.Context, prm frostfs.PrmObjectRang
     return io.NopCloser(bytes.NewReader(payload)), nil
 }
 
-func (t *TestFrostFS) CreateObject(_ context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
+func (t *TestFrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
+    if prm.Type == object.TypeTombstone {
+        return t.createTombstone(ctx, prm)
+    }
+
     b := make([]byte, 32)
     if _, err := io.ReadFull(rand.Reader, b); err != nil {
         return nil, err

alexvanin marked this conversation as resolved (outdated).

@@ -338,6 +351,35 @@ func (t *TestFrostFS) CreateObject(_ context.Context, prm frostfs.PrmObjectCreat
     }, nil
 }
 
+func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObjectCreate) (*frostfs.CreateObjectResult, error) {
+    payload, err := io.ReadAll(prm.Payload)
+    if err != nil {
+        return nil, err
+    }
+
+    var tomb object.Tombstone
+    err = tomb.Unmarshal(payload)
+    if err != nil {
+        return nil, err
+    }
+
+    for _, objID := range tomb.Members() {
+        prmDelete := frostfs.PrmObjectDelete{
+            PrmAuth:   prm.PrmAuth,
+            Container: prm.Container,
+            Object:    objID,
+        }
+
+        if err = t.DeleteObject(ctx, prmDelete); err != nil {
+            return nil, err
+        }
+    }
+
+    return &frostfs.CreateObjectResult{
+        CreationEpoch: t.currentEpoch,
+    }, nil
+}
+
 func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
     var addr oid.Address
     addr.SetContainer(prm.Container)
@@ -459,6 +501,10 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatc
     return newID, nil
 }
 
+func (t *TestFrostFS) Relations() relations.Relations {
+    return &RelationsMock{}
+}
+
 func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error {
     list, ok := t.chains[prm.ContainerID.EncodeToString()]
     if !ok {
@@ -499,3 +545,25 @@ func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
     }
     return false
 }
+
+type RelationsMock struct{}
+
+func (r *RelationsMock) GetSplitInfo(context.Context, cid.ID, oid.ID, relations.Tokens) (*object.SplitInfo, error) {
+    return nil, relations.ErrNoSplitInfo
+}
+
+func (r *RelationsMock) ListChildrenByLinker(context.Context, cid.ID, oid.ID, relations.Tokens) ([]oid.ID, error) {
+    return nil, nil
+}
+
+func (r *RelationsMock) GetLeftSibling(context.Context, cid.ID, oid.ID, relations.Tokens) (oid.ID, error) {
+    return oid.ID{}, nil
+}
+
+func (r *RelationsMock) FindSiblingBySplitID(context.Context, cid.ID, *object.SplitID, relations.Tokens) ([]oid.ID, error) {
+    return nil, nil
+}
+
+func (r *RelationsMock) FindSiblingByParentID(_ context.Context, _ cid.ID, _ oid.ID, _ relations.Tokens) ([]oid.ID, error) {
+    return nil, nil
+}
@@ -28,9 +28,11 @@ import (
     cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
     "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
+    "github.com/panjf2000/ants/v2"
     "go.uber.org/zap"
 )
 
@@ -44,6 +46,8 @@ type (
         BufferMaxSizeForPut() uint64
         MD5Enabled() bool
         FormContainerZone(ns string) string
+        TombstoneMembersSize() int
+        TombstoneLifetime() uint64
     }
 
     Layer struct {
@@ -58,6 +62,7 @@ type (
         gateKey          *keys.PrivateKey
         corsCnrInfo      *data.BucketInfo
         lifecycleCnrInfo *data.BucketInfo
+        workerPool       *ants.Pool
     }
 
     Config struct {
@@ -71,6 +76,7 @@ type (
         GateKey          *keys.PrivateKey
         CORSCnrInfo      *data.BucketInfo
         LifecycleCnrInfo *data.BucketInfo
+        WorkerPool       *ants.Pool
     }
 
     // AnonymousKey contains data for anonymous requests.
@@ -249,6 +255,7 @@ func NewLayer(log *zap.Logger, frostFS frostfs.FrostFS, config *Config) *Layer {
         gateKey:          config.GateKey,
         corsCnrInfo:      config.CORSCnrInfo,
         lifecycleCnrInfo: config.LifecycleCnrInfo,
+        workerPool:       config.WorkerPool,
     }
 }
 
@@ -557,7 +564,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
     }
 
     for _, nodeVersion := range nodeVersions {
-        if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
+        if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj, networkInfo); obj.Error != nil {
             if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
                 return obj
             }
@@ -596,7 +603,7 @@ func (n *Layer) deleteObject(ctx context.Context, bkt *data.BucketInfo, settings
     }
 
     if !nodeVersion.IsDeleteMarker {
-        if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj); obj.Error != nil {
+        if obj.DeleteMarkVersion, obj.Error = n.removeOldVersion(ctx, bkt, nodeVersion, obj, networkInfo); obj.Error != nil {
            if !client.IsErrObjectAlreadyRemoved(obj.Error) && !client.IsErrObjectNotFound(obj.Error) {
                return obj
            }
@@ -732,19 +739,19 @@ func (n *Layer) getLastNodeVersion(ctx context.Context, bkt *data.BucketInfo, ob
     return n.getNodeVersion(ctx, objVersion)
 }
 
-func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject) (string, error) {
+func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, obj *VersionedObject, networkInfo netmap.NetworkInfo) (string, error) {
     if nodeVersion.IsDeleteMarker {
         return obj.VersionID, nil
     }
 
     if nodeVersion.IsCombined {
-        return "", n.removeCombinedObject(ctx, bkt, nodeVersion)
+        return "", n.removeCombinedObject(ctx, bkt, nodeVersion, networkInfo)
     }
 
     return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
 }
 
-func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion) error {
+func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, networkInfo netmap.NetworkInfo) error {
     combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID)
     if err != nil {
         return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err)
@@ -755,20 +762,26 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
         return fmt.Errorf("unmarshal combined object parts: %w", err)
     }
 
-    for _, part := range parts {
-        if err = n.objectDelete(ctx, bkt, part.OID); err == nil {
-            continue
-        }
-
-        if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
-            return fmt.Errorf("couldn't delete part '%s': %w", part.OID.EncodeToString(), err)
-        }
-
-        n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", bkt.CID.EncodeToString()),
-            zap.String("oid", part.OID.EncodeToString()), zap.Int("part number", part.Number), zap.Error(err))
+    tokens := prepareTokensParameter(ctx, bkt.Owner)
+    oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
+    if err != nil {
+        n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
+            zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
+    }
+
+    members := append(oids, nodeVersion.OID)
+    for _, part := range parts {
+        oids, err = relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens)
+        if err != nil {
+            n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
+                zap.String("oid", part.OID.EncodeToString()), zap.Error(err))
+        }
+
+        members = append(members, append(oids, part.OID)...)
     }
 
-    return n.objectDelete(ctx, bkt, nodeVersion.OID)
+    n.putTombstones(ctx, bkt, networkInfo, members)
+    return nil
 }
 
 // DeleteObjects from the storage.

dkirillov marked this conversation as resolved (outdated). dkirillov commented:

What about something like this:

```diff
diff --git a/api/layer/layer.go b/api/layer/layer.go
index 51e7edd8..d5bdca41 100644
--- a/api/layer/layer.go
+++ b/api/layer/layer.go
@@ -777,14 +777,14 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
 func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) {
 	var wg sync.WaitGroup
-	i := 0
 	tombstoneMembersSize := n.features.TombstoneMembersSize()
-	for ; i < len(members)/tombstoneMembersSize; i++ {
-		n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:tombstoneMembersSize*(i+1)], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
-	}
-	if len(members)%tombstoneMembersSize != 0 {
-		n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
+	for i := 0; i < len(members); i += tombstoneMembersSize {
+		end := tombstoneMembersSize * (i + 1)
+		if end > len(members) {
+			end = len(members)
+		}
+		n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
 	}
 	wg.Wait()
```
@@ -22,7 +22,9 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/tree"
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
     "github.com/minio/sio"
     "go.uber.org/zap"
@@ -565,16 +567,31 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
         return err
     }
 
+    networkInfo, err := n.GetNetworkInfo(ctx)
+    if err != nil {
+        return fmt.Errorf("get network info: %w", err)
+    }
+
+    n.deleteUploadedParts(ctx, p.Bkt, parts, networkInfo)
+
+    return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
+}
+
+func (n *Layer) deleteUploadedParts(ctx context.Context, bkt *data.BucketInfo, parts PartsInfo, networkInfo netmap.NetworkInfo) {
+    members := make([]oid.ID, 0)
+    tokens := prepareTokensParameter(ctx, bkt.Owner)
     for _, infos := range parts {
         for _, info := range infos {
-            if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
-                n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
-                    zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
+            oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, info.OID, tokens)
+            if err != nil {
+                n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
+                    zap.String("oid", info.OID.EncodeToString()), zap.Error(err))
             }
+            members = append(members, append(oids, info.OID)...)
         }
     }
 
-    return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
+    n.putTombstones(ctx, bkt, networkInfo, members)
 }
 
 func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
api/layer/tombstone.go (new file, 91 lines)

@@ -0,0 +1,91 @@
+package layer
+
+import (
+    "bytes"
+    "context"
+    "fmt"
+    "strconv"
+    "sync"
+
+    objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
+    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
+    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/frostfs"
+    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
+    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
+    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
+    "go.uber.org/zap"
+)
+
+func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) {
+    var wg sync.WaitGroup
+    tombstoneMembersSize := n.features.TombstoneMembersSize()
+    tombstoneLifetime := n.features.TombstoneLifetime()
+
+    for i := 0; i < len(members); i += tombstoneMembersSize {
+        end := tombstoneMembersSize * (i + 1)
+        if end > len(members) {
+            end = len(members)
+        }
+        n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
+    }
+
+    wg.Wait()
+}
+
+func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup) {
+    tomb := object.NewTombstone()
+    tomb.SetExpirationEpoch(expEpoch)
+    tomb.SetMembers(members)
+
+    wg.Add(1)
+    err := n.workerPool.Submit(func() {
+        defer wg.Done()
+
+        if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
+            n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err))
+        }
+    })
+    if err != nil {
+        wg.Done()
+        n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
+    }
+}
+
+func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo) error {
+    payload, err := tomb.Marshal()
+    if err != nil {
+        return fmt.Errorf("marshal tombstone: %w", err)
+    }
+
+    prm := frostfs.PrmObjectCreate{
+        Container:              bktInfo.CID,
+        Attributes:             [][2]string{{objectV2.SysAttributeExpEpoch, strconv.FormatUint(tomb.ExpirationEpoch(), 10)}},
+        Payload:                bytes.NewReader(payload),
+        CreationTime:           TimeNow(ctx),
+        ClientCut:              n.features.ClientCut(),
+        WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled,
+        BufferMaxSize:          n.features.BufferMaxSizeForPut(),
+        Type:                   object.TypeTombstone,
+    }
+    n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)
+
+    _, err = n.frostFS.CreateObject(ctx, prm)
+
+    return err
+}
+
+func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens {
+    tokens := relations.Tokens{}
+
+    if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
+        if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
+            tokens.Bearer = bd.Gate.BearerToken
+        }
+    }
+
+    return tokens
+}
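For orientation, here is a small, self-contained sketch of the grouping step that `putTombstones` performs: the collected member IDs are split into chunks of at most `TombstoneMembersSize`, and one tombstone is submitted per chunk with expiration `CurrentEpoch() + TombstoneLifetime`. The helper below is illustrative only (hypothetical name, plain `i + size` upper bound); the PR's loop computes the bound as `tombstoneMembersSize * (i + 1)` and then clamps it to `len(members)`.

```go
package main

import (
	"fmt"

	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// splitIntoTombstoneBatches is a hypothetical helper: it groups object IDs
// into chunks of at most `size`, one chunk per future tombstone.
func splitIntoTombstoneBatches(members []oid.ID, size int) [][]oid.ID {
	var batches [][]oid.ID
	for i := 0; i < len(members); i += size {
		end := i + size // fixed-size upper bound for this chunk
		if end > len(members) {
			end = len(members)
		}
		batches = append(batches, members[i:end])
	}
	return batches
}

func main() {
	members := make([]oid.ID, 250) // e.g. 250 IDs collected from parts and their relations
	for i, batch := range splitIntoTombstoneBatches(members, 100) {
		fmt.Printf("tombstone %d: %d members\n", i, len(batch)) // 100, 100, 50
	}
}
```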
@@ -50,6 +50,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
     "github.com/go-chi/chi/v5/middleware"
     "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
+    "github.com/panjf2000/ants/v2"
     "github.com/spf13/viper"
     "go.uber.org/zap"
     "golang.org/x/exp/slices"
@@ -105,6 +106,7 @@ type (
     frostfsidValidation bool
     accessbox           *cid.ID
     dialerSource        *internalnet.DialerSource
+    workerPoolSize      int
 
     mu         sync.RWMutex
     namespaces Namespaces
@@ -125,6 +127,8 @@ type (
     vhsNamespacesEnabled map[string]bool
     retryMaxBackoff      time.Duration
     retryStrategy        handler.RetryStrategy
+    tombstoneMembersSize int
+    tombstoneLifetime    uint64
 }
 
 maxClientsConfig struct {
@@ -249,12 +253,21 @@ func (a *App) initLayer(ctx context.Context) {
     GateKey:          a.key,
     CORSCnrInfo:      corsCnrInfo,
     LifecycleCnrInfo: lifecycleCnrInfo,
+    WorkerPool:       a.initWorkerPool(),
 }
 
 // prepare object layer
 a.obj = layer.NewLayer(a.log, frostfs.NewFrostFS(a.pool, a.key), layerCfg)
 }
 
+func (a *App) initWorkerPool() *ants.Pool {
+    workerPool, err := ants.NewPool(a.settings.workerPoolSize)
+    if err != nil {
+        a.log.Fatal(logs.FailedToCreateWorkerPool, zap.Error(err))
+    }
+    return workerPool
+}
+
 func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
     settings := &appSettings{
         logLevel: log.lvl,

dkirillov marked this conversation as resolved (outdated). dkirillov commented: Can we use a `fetch...` function that checks the config-provided value and returns the default if it's 0 or less?

@@ -264,6 +277,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
     reconnectInterval:   fetchReconnectInterval(v),
     frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
     dialerSource:        getDialerSource(log.logger, v),
+    workerPoolSize:      fetchTombstoneWorkerPoolSize(v),
 }
 
 settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
@@ -300,6 +314,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
     httpLoggingMaxLogSize := v.GetInt(cfgHTTPLoggingMaxLogSize)
     httpLoggingOutputPath := v.GetString(cfgHTTPLoggingDestination)
     httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip)
+    tombstoneMembersSize := fetchTombstoneMembersSize(v)
+    tombstoneLifetime := fetchTombstoneLifetime(v)
 
     s.mu.Lock()
     defer s.mu.Unlock()
@@ -329,6 +345,8 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
     s.vhsHeader = vhsHeader
     s.servernameHeader = servernameHeader
     s.vhsNamespacesEnabled = vhsNamespacesEnabled
+    s.tombstoneMembersSize = tombstoneMembersSize
+    s.tombstoneLifetime = tombstoneLifetime
 }
 
 func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
@@ -531,6 +549,18 @@ func (s *appSettings) AccessBoxContainer() (cid.ID, bool) {
     return cid.ID{}, false
 }
 
+func (s *appSettings) TombstoneMembersSize() int {
+    s.mu.RLock()
+    defer s.mu.RUnlock()
+    return s.tombstoneMembersSize
+}
+
+func (s *appSettings) TombstoneLifetime() uint64 {
+    s.mu.RLock()
+    defer s.mu.RUnlock()
+    return s.tombstoneLifetime
+}
+
 func (a *App) initAPI(ctx context.Context) {
     a.initLayer(ctx)
     a.initHandler()
@@ -74,6 +74,10 @@ const (
     defaultRetryMaxAttempts = 4
     defaultRetryMaxBackoff  = 30 * time.Second
     defaultRetryStrategy    = handler.RetryStrategyExponential
+
+    defaultTombstoneLifetime       = 10
+    defaultTombstoneMembersSize    = 100
+    defaultTombstoneWorkerPoolSize = 100
 )
 
 var (

dkirillov marked this conversation as resolved (outdated). dkirillov commented: We can write just

```golang
defaultTombstoneLifetime = 10
```

@@ -239,7 +243,10 @@ const ( // Settings.
     // Sets max buffer size for read payload in put operations.
     cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
     // Sets max attempt to make successful tree request.
     cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"
+    cfgTombstoneLifetime       = "frostfs.tombstone.lifetime"
+    cfgTombstoneMembersSize    = "frostfs.tombstone.members_size"
+    cfgTombstoneWorkerPoolSize = "frostfs.tombstone.worker_pool_size"
 
     // Specifies the timeout after which unhealthy client be closed during rebalancing
     // if it will become healthy back.
@@ -804,6 +811,33 @@ func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) {
     return attributes, nil
 }
 
+func fetchTombstoneLifetime(v *viper.Viper) uint64 {
+    tombstoneLifetime := v.GetUint64(cfgTombstoneLifetime)
+    if tombstoneLifetime <= 0 {
+        tombstoneLifetime = defaultTombstoneLifetime
+    }
+
+    return tombstoneLifetime
+}
+
+func fetchTombstoneMembersSize(v *viper.Viper) int {
+    tombstoneMembersSize := v.GetInt(cfgTombstoneMembersSize)
+    if tombstoneMembersSize <= 0 {
+        tombstoneMembersSize = defaultTombstoneMembersSize
+    }
+
+    return tombstoneMembersSize
+}
+
+func fetchTombstoneWorkerPoolSize(v *viper.Viper) int {
+    tombstoneWorkerPoolSize := v.GetInt(cfgTombstoneWorkerPoolSize)
+    if tombstoneWorkerPoolSize <= 0 {
+        tombstoneWorkerPoolSize = defaultTombstoneWorkerPoolSize
+    }
+
+    return tombstoneWorkerPoolSize
+}
+
 func newSettings() *viper.Viper {
     v := viper.New()
 
@@ -876,6 +910,9 @@ func newSettings() *viper.Viper {
 
     // frostfs
     v.SetDefault(cfgBufferMaxSizeForPut, 1024*1024) // 1mb
+    v.SetDefault(cfgTombstoneLifetime, defaultTombstoneLifetime)
+    v.SetDefault(cfgTombstoneMembersSize, defaultTombstoneMembersSize)
+    v.SetDefault(cfgTombstoneWorkerPoolSize, defaultTombstoneWorkerPoolSize)
 
     // kludge
     v.SetDefault(cfgKludgeUseDefaultXMLNS, false)
@@ -163,6 +163,12 @@ S3_GW_FROSTFS_BUFFER_MAX_SIZE_FOR_PUT=1048576
 S3_GW_FROSTFS_TREE_POOL_MAX_ATTEMPTS=0
 # Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back.
 S3_GW_FROSTFS_GRACEFUL_CLOSE_ON_SWITCH_TIMEOUT=10s
+# Tombstone's lifetime in epochs.
+S3_GW_FROSTFS_TOMBSTONE_LIFETIME=10
+# Maximum number of object IDs in one tombstone.
+S3_GW_FROSTFS_TOMBSTONE_MEMBERS_SIZE=100
+# Maximum worker count in layer's worker pool that create tombstones.
+S3_GW_FROSTFS_TOMBSTONE_WORKER_POOL_SIZE=100
 
 # List of allowed AccessKeyID prefixes
 # If not set, S3 GW will accept all AccessKeyIDs

dkirillov marked this conversation as resolved (outdated). dkirillov commented: It seems the variable must be

```
S3_GW_FROSTFS_TOMBSTONE_MEMBERS_SIZE=100
```
@@ -199,6 +199,13 @@ frostfs:
   buffer_max_size_for_put: 1048576
   # Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back.
   graceful_close_on_switch_timeout: 10s
+  tombstone:
+    # Tombstone's lifetime in epochs.
+    lifetime: 10
+    # Maximum number of object IDs in one tombstone.
+    members_size: 100
+    # Maximum worker count in layer's worker pool that create tombstones.
+    worker_pool_size: 100
 
 # List of allowed AccessKeyID prefixes
 # If the parameter is omitted, S3 GW will accept all AccessKeyIDs

dkirillov marked this conversation as resolved (outdated). dkirillov commented: It seems the variable must be

```yaml
tombstone:
  members_size: 100
```
@@ -588,6 +588,10 @@ frostfs:
   buffer_max_size_for_put: 1048576 # 1mb
   tree_pool_max_attempts: 0
   graceful_close_on_switch_timeout: 10s
+  tombstone:
+    lifetime: 10
+    members_size: 100
+    worker_pool_size: 100
 ```
 
 | Parameter | Type | SIGHUP reload | Default value | Description |
@@ -597,6 +601,9 @@ frostfs:
 | `buffer_max_size_for_put` | `uint64` | yes | `1048576` | Sets max buffer size for read payload in put operations. |
 | `tree_pool_max_attempts` | `uint32` | no | `0` | Sets max attempt to make successful tree request. Value 0 means the number of attempts equals to number of nodes in pool. |
 | `graceful_close_on_switch_timeout` | `duration` | no | `10s` | Specifies the timeout after which unhealthy client be closed during rebalancing if it will become healthy back. |
+| `tombstone.lifetime` | `uint64` | yes | 10 | Tombstone's lifetime in epochs. |
+| `tombstone.members_size` | `int` | yes | 100 | Maximum number of object IDs in one tombstone. |
+| `tombstone.worker_pool_size` | `int` | no | 100 | Maximum worker count in layer's worker pool that create tombstones. |
 
 # `resolve_bucket` section
 
@@ -19,6 +19,7 @@ import (
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
     oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
+    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
     "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@@ -214,6 +215,7 @@ func (x *FrostFS) CreateObject(ctx context.Context, prm frostfs.PrmObjectCreate)
     obj.SetOwnerID(x.owner)
     obj.SetAttributes(attrs...)
     obj.SetPayloadSize(prm.PayloadSize)
+    obj.SetType(prm.Type)
 
     if prm.BearerToken == nil && prm.PrivateKey != nil {
         var owner user.ID

alexvanin marked this conversation as resolved (outdated). alexvanin commented: Is default type a regular object? As far as I understand, `prm.Type` is optional.
mbiryukova commented: Yes, default type is regular.

dkirillov marked this conversation as resolved (outdated). dkirillov commented: It seems we can unconditionally set type (0 means regular object).

@@ -438,6 +440,10 @@ func (x *FrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatch) (
     return res.ObjectID, nil
 }
 
+func (x *FrostFS) Relations() relations.Relations {
+    return x.pool
+}
+
 // ResolverFrostFS represents virtual connection to the FrostFS network.
 // It implements resolver.FrostFS.
 type ResolverFrostFS struct {
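To illustrate the exchange above (a hedged sketch, assuming the frostfs-sdk-go `object` package where, as the reviewers note, 0 means a regular object): leaving `prm.Type` unset therefore keeps the created object regular, so `SetType` can be called unconditionally.

```go
package main

import (
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)

func main() {
	// The zero value of object.Type is what an unset prm.Type carries.
	var t object.Type
	fmt.Println(t == object.TypeRegular) // true: the zero value denotes a regular object
}
```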
@@ -178,4 +178,7 @@ const (
     MultinetDialSuccess   = "multinet dial successful"
     MultinetDialFail      = "multinet dial failed"
     FailedToParseHTTPTime = "failed to parse http time, header is ignored"
+    FailedToPutTombstoneObject     = "failed to put tombstone object"
+    FailedToCreateWorkerPool       = "failed to create worker pool"
+    FailedToListAllObjectRelations = "failed to list all object relations"
 )
Resolved review comment: nitpick: I think this looks better in a separate private [method]. This condition does not reuse any code from CreateObject, so there is no point to inline this code.