diff --git a/api/layer/frostfs_mock.go b/api/layer/frostfs_mock.go index f61214a6..6e8a7ad6 100644 --- a/api/layer/frostfs_mock.go +++ b/api/layer/frostfs_mock.go @@ -63,6 +63,10 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string { return ns + ".ns" } +func (k *FeatureSettingsMock) RemoveOnReplace() bool { + return false +} + var _ frostfs.FrostFS = (*TestFrostFS)(nil) type TestFrostFS struct { diff --git a/api/layer/layer.go b/api/layer/layer.go index 12d064c1..09aed98d 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -44,6 +44,7 @@ type ( BufferMaxSizeForPut() uint64 MD5Enabled() bool FormContainerZone(ns string) string + RemoveOnReplace() bool } Layer struct { diff --git a/api/layer/object.go b/api/layer/object.go index 555a0cbf..a19f5bcd 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -342,8 +342,24 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend newVersion.Size = createdObj.Size } - if newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion); err != nil { - return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err) + if newVersion.IsUnversioned && n.features.RemoveOnReplace() { + n.log.Debug("trying to put new object and remove old if exists") + var oldObject *oid.ID + newVersion.ID, oldObject, err = n.treeService.AddOrReplaceVersion(ctx, p.BktInfo, newVersion) + if err != nil { + return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err) + } + if oldObject != nil { + err = n.objectDelete(ctx, p.BktInfo, *oldObject) + if err != nil { + n.log.Error("delete previous object version", zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", *oldObject), zap.Error(err)) + } + } + } else { + newVersion.ID, err = n.treeService.AddVersion(ctx, p.BktInfo, newVersion) + if err != nil { + return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err) + } } if p.Lock != nil && (p.Lock.Retention != nil || p.Lock.LegalHold != nil) { diff --git a/api/layer/tree/tree_service.go b/api/layer/tree/tree_service.go index db079b10..3a3a343b 100644 --- a/api/layer/tree/tree_service.go +++ b/api/layer/tree/tree_service.go @@ -46,6 +46,7 @@ type Service interface { InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) GetUnversioned(ctx context.Context, bktInfo *data.BucketInfo, objectName string) (*data.NodeVersion, error) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, error) + AddOrReplaceVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, *oid.ID, error) RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 577f4068..675e17c3 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -276,6 +276,46 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo return newVersion.ID, nil } +func (t *TreeServiceMock) AddOrReplaceVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, *oid.ID, error) { + cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()] + if !ok { + t.versions[bktInfo.CID.EncodeToString()] = map[string][]*data.NodeVersion{ + newVersion.FilePath: {newVersion}, + } + return newVersion.ID, nil, nil + } + + versions, ok := cnrVersionsMap[newVersion.FilePath] + if !ok { + cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion} + return newVersion.ID, nil, nil + } + + sort.Slice(versions, func(i, j int) bool { + return versions[i].ID < versions[j].ID + }) + + if len(versions) != 0 { + newVersion.ID = versions[len(versions)-1].ID + 1 + newVersion.Timestamp = versions[len(versions)-1].Timestamp + 1 + } + + result := versions + + if newVersion.IsUnversioned { + result = make([]*data.NodeVersion, 0, len(versions)) + for _, node := range versions { + if !node.IsUnversioned { + result = append(result, node) + } + } + } + + cnrVersionsMap[newVersion.FilePath] = append(result, newVersion) + + return newVersion.ID, nil, nil +} + func (t *TreeServiceMock) RemoveVersion(_ context.Context, bktInfo *data.BucketInfo, nodeID uint64) error { cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()] if !ok { diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 3c96eb62..a56f2e65 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -99,6 +99,7 @@ type ( frostfsidValidation bool accessbox *cid.ID dialerSource *internalnet.DialerSource + removeOnReplace bool mu sync.RWMutex namespaces Namespaces @@ -239,6 +240,7 @@ func newAppSettings(log *Logger, v *viper.Viper) *appSettings { reconnectInterval: fetchReconnectInterval(v), frostfsidValidation: v.GetBool(cfgFrostfsIDValidationEnabled), dialerSource: getDialerSource(log.logger, v), + removeOnReplace: v.GetBool(cfgRemoveOnReplace), } settings.resolveZoneList = v.GetStringSlice(cfgResolveBucketAllow) @@ -453,6 +455,10 @@ func (s *appSettings) FormContainerZone(ns string) string { return ns + ".ns" } +func (s *appSettings) RemoveOnReplace() bool { + return s.removeOnReplace +} + func (s *appSettings) isDefaultNamespace(ns string) bool { s.mu.RLock() namespaces := s.defaultNamespaces diff --git a/cmd/s3-gw/app_settings.go b/cmd/s3-gw/app_settings.go index abe5f3f8..d5c6a989 100644 --- a/cmd/s3-gw/app_settings.go +++ b/cmd/s3-gw/app_settings.go @@ -259,6 +259,7 @@ const ( // Settings. // Enable return MD5 checksum in ETag. cfgMD5Enabled = "features.md5.enabled" cfgPolicyDenyByDefault = "features.policy.deny_by_default" + cfgRemoveOnReplace = "features.remove_on_replace" // FrostfsID. cfgFrostfsIDContract = "frostfsid.contract" diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index 438b1297..5921e980 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -1298,6 +1298,11 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre } func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, error) { + nodeID, _, err := c.addVersion(ctx, bktInfo, versionTree, version) + return nodeID, err +} + +func (c *Tree) AddOrReplaceVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, *oid.ID, error) { return c.addVersion(ctx, bktInfo, versionTree, version) } @@ -1688,7 +1693,8 @@ func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.Bucket return getObjectTagging(nodes[isTagKV]), lockInfo, nil } -func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, error) { +// addVersion returns added tree node id and OID attribute of the replaced node. If no tree node were replaced, returns nil value +func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, *oid.ID, error) { path := pathFromName(version.FilePath) meta := map[string]string{ oidKV: version.OID.EncodeToString(), @@ -1722,18 +1728,20 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID node, err := c.getUnversioned(ctx, bktInfo, treeID, version.FilePath) if err == nil { if err = c.service.MoveNode(ctx, bktInfo, treeID, node.ID, node.ParenID, meta); err != nil { - return 0, err + return 0, nil, err } - return node.ID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID) + return node.ID, &node.OID, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, node.ID) } if !errors.Is(err, tree.ErrNodeNotFound) { - return 0, err + return 0, nil, err } } - return c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta) + nodeID, err := c.service.AddNodeByPath(ctx, bktInfo, treeID, path[:len(path)-1], meta) + + return nodeID, nil, err } func (c *Tree) clearOutdatedVersionInfo(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {