Add option to remove object on replacement in unversioned bucket

Signed-off-by: Alex Vanin <a.vanin@yadro.com>
This commit is contained in:
Alexey Vanin 2024-12-27 13:36:13 +03:00
parent e721ac82f7
commit 16d276dcb7
8 changed files with 84 additions and 7 deletions

View file

@ -63,6 +63,10 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
return ns + ".ns" return ns + ".ns"
} }
func (k *FeatureSettingsMock) RemoveOnReplace() bool {
return false
}
var _ frostfs.FrostFS = (*TestFrostFS)(nil) var _ frostfs.FrostFS = (*TestFrostFS)(nil)
type TestFrostFS struct { type TestFrostFS struct {

View file

@ -44,6 +44,7 @@ type (
BufferMaxSizeForPut() uint64 BufferMaxSizeForPut() uint64
MD5Enabled() bool MD5Enabled() bool
FormContainerZone(ns string) string FormContainerZone(ns string) string
RemoveOnReplace() bool
} }
Layer struct { Layer struct {

View file

@ -342,9 +342,25 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
newVersion.Size = createdObj.Size newVersion.Size = createdObj.Size
} }
if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil { if newVersion.IsUnversioned && n.features.RemoveOnReplace() {
n.log.Debug("trying to put new object and remove old if exists")
var oldObject *oid.ID
newVersion.ID, oldObject, err = n.treeService.AddOrReplaceVersion(ctx, p.BktInfo, newVersion)
if err != nil {
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err) return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
} }
if oldObject != nil {
err = n.objectDelete(ctx, p.BktInfo, *oldObject)
if err != nil {
n.log.Error("delete previous object version", zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", *oldObject), zap.Error(err))
}
}
} else {
newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion)
if err != nil {
return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err)
}
}
if p.Lock != nil && (p.Lock.Retention != nil || p.Lock.LegalHold != nil) { if p.Lock != nil && (p.Lock.Retention != nil || p.Lock.LegalHold != nil) {
putLockInfoPrms := &PutLockInfoParams{ putLockInfoPrms := &PutLockInfoParams{

View file

@ -46,6 +46,7 @@ type Service interface {
InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error)
GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error)
AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error)
AddOrReplaceVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, *oid.ID, error)
RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error
PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error

View file

@ -276,6 +276,46 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo
return newVersion.ID, nil return newVersion.ID, nil
} }
func (t *TreeServiceMock) AddOrReplaceVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, *oid.ID, error) {
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
if !ok {
t.versions[bktInfo.CID.EncodeToString()] = map[string][]*data.NodeVersion{
newVersion.FilePath: {newVersion},
}
return newVersion.ID, nil, nil
}
versions, ok := cnrVersionsMap[newVersion.FilePath]
if !ok {
cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion}
return newVersion.ID, nil, nil
}
sort.Slice(versions, func(i, j int) bool {
return versions[i].ID < versions[j].ID
})
if len(versions) != 0 {
newVersion.ID = versions[len(versions)-1].ID + 1
newVersion.Timestamp = versions[len(versions)-1].Timestamp + 1
}
result := versions
if newVersion.IsUnversioned {
result = make([]*data.NodeVersion, 0, len(versions))
for _, node := range versions {
if !node.IsUnversioned {
result = append(result, node)
}
}
}
cnrVersionsMap[newVersion.FilePath] = append(result, newVersion)
return newVersion.ID, nil, nil
}
func (t *TreeServiceMock) RemoveVersion(_ context.Context, bktInfo *data.BucketInfo, nodeID uint64) error { func (t *TreeServiceMock) RemoveVersion(_ context.Context, bktInfo *data.BucketInfo, nodeID uint64) error {
cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()] cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()]
if !ok { if !ok {

View file

@ -99,6 +99,7 @@ type (
frostfsidValidation bool frostfsidValidation bool
accessbox *cid.ID accessbox *cid.ID
dialerSource *internalnet.DialerSource dialerSource *internalnet.DialerSource
removeOnReplace bool
mu sync.RWMutex mu sync.RWMutex
namespaces Namespaces namespaces Namespaces
@ -239,6 +240,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
reconnectInterval: fetchReconnectInterval(v), reconnectInterval: fetchReconnectInterval(v),
frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled), frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled),
dialerSource: getDialerSource(log.logger, v), dialerSource: getDialerSource(log.logger, v),
removeOnReplace: v.GetBool(cfgRemoveOnReplace),
} }
settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow) settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow)
@ -453,6 +455,10 @@ func (s *appSettings) FormContainerZone(ns string) string {
return ns + ".ns" return ns + ".ns"
} }
func (s *appSettings) RemoveOnReplace() bool {
return s.removeOnReplace
}
func (s *appSettings) isDefaultNamespace(ns string) bool { func (s *appSettings) isDefaultNamespace(ns string) bool {
s.mu.RLock() s.mu.RLock()
namespaces := s.defaultNamespaces namespaces := s.defaultNamespaces

View file

@ -259,6 +259,7 @@ const ( // Settings.
// Enable return MD5 checksum in ETag. // Enable return MD5 checksum in ETag.
cfgMD5Enabled = "features.md5.enabled" cfgMD5Enabled = "features.md5.enabled"
cfgPolicyDenyByDefault = "features.policy.deny_by_default" cfgPolicyDenyByDefault = "features.policy.deny_by_default"
cfgRemoveOnReplace = "features.remove_on_replace"
// FrostfsID. // FrostfsID.
cfgFrostfsIDContract = "frostfsid.contract" cfgFrostfsIDContract = "frostfsid.contract"

View file

@ -1298,6 +1298,11 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre
} }
func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) { func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) {
nodeID, _, err := c.addVersion(ctx, bktInfo, versionTree, version)
return nodeID, err
}
func (c *Tree) AddOrReplaceVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, *oid.ID, error) {
return c.addVersion(ctx, bktInfo, versionTree, version) return c.addVersion(ctx, bktInfo, versionTree, version)
} }
@ -1688,7 +1693,8 @@ func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.Bucket
return getObjectTagging(nodes[isTagKV]), lockInfo, nil return getObjectTagging(nodes[isTagKV]), lockInfo, nil
} }
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) { // addVersion returns added tree node id and OID attribute of the replaced node. If no tree node were replaced, returns nil value
func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, *oid.ID, error) {
path := pathFromName(version.FilePath) path := pathFromName(version.FilePath)
meta := map[string]string{ meta := map[string]string{
oidKV: version.OID.EncodeToString(), oidKV: version.OID.EncodeToString(),
@ -1722,18 +1728,20 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID
node, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath) node, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath)
if err == nil { if err == nil {
if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParenID, meta); err != nil { if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParenID, meta); err != nil {
return 0, err return 0, nil, err
} }
return node.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID) return node.ID, &node.OID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID)
} }
if !errors.Is(err, tree.ErrNodeNotFound) { if !errors.Is(err, tree.ErrNodeNotFound) {
return 0, err return 0, nil, err
} }
} }
return c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta) nodeID, err := c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta)
return nodeID, nil, err
} }
func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error { func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {