diff --git a/api/data/tree.go b/api/data/tree.go index f3580fc6..3fae4f86 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -20,6 +20,12 @@ type NodeVersion struct { IsCombined bool } +// OIDInfo represent OID to delete. +type OIDInfo struct { + ID oid.ID + IsCombined bool +} + // ExtendedNodeVersion contains additional node info to be able to sort versions by timestamp. type ExtendedNodeVersion struct { NodeVersion *NodeVersion diff --git a/api/handler/delete_test.go b/api/handler/delete_test.go index 8e44a72f..20ddac5b 100644 --- a/api/handler/delete_test.go +++ b/api/handler/delete_test.go @@ -493,6 +493,33 @@ func TestRemovalOnReplace(t *testing.T) { require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2) } +func TestRemovalOnReplaceMultipart(t *testing.T) { + hc := prepareHandlerContext(t) + + bktName, objName := "bucket", "object" + bktInfo := createTestBucket(hc, bktName) + + multipartUpload(hc, bktName, objName, nil, 10, 10) + require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 2) + + multipartUpload(hc, bktName, objName, nil, 10, 10) + require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 4) + + hc.layerFeatures.SetRemoveOnReplace(true) + + multipartUpload(hc, bktName, objName, nil, 10, 10) + time.Sleep(time.Second) + require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 4) + + putObject(hc, bktName, objName) + time.Sleep(time.Second) + require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 3) + + multipartUpload(hc, bktName, objName, nil, 10, 10) + time.Sleep(time.Second) + require.Len(t, hc.MockedPool().AllObjects(bktInfo.CID), 4) +} + func createBucketAndObject(tc *handlerContext, bktName, objName string) (*data.BucketInfo, *data.ObjectInfo) { bktInfo := createTestBucket(tc, bktName) diff --git a/api/layer/layer.go b/api/layer/layer.go index b40758fd..bcb8af58 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -72,7 +72,7 @@ type ( removalParams struct { Auth frostfs.PrmAuth BktInfo *data.BucketInfo - OIDs []oid.ID + OIDsInfo []data.OIDInfo RequestID string TraceID string } @@ -733,8 +733,17 @@ func (n *Layer) removalRoutine(ctx context.Context) { } reqCtx, cancel := context.WithTimeout(ctx, n.features.RemoveOnReplaceTimeout()) - for _, objID := range prm.OIDs { - if err := n.objectDeleteBase(reqCtx, prm.BktInfo, objID, prm.Auth); err != nil { + for _, oidInfo := range prm.OIDsInfo { + if oidInfo.IsCombined { + networkInfo, err := n.GetNetworkInfo(ctx) + if err == nil { + err = n.removeCombinedObject(reqCtx, prm.BktInfo, oidInfo.ID, networkInfo, prm.Auth) + } + if err != nil { + n.log.Warn(logs.FailedToRemoveOldUnversionedCombinedObject, zap.String("request_id", prm.RequestID), + zap.String("trace_id", prm.TraceID), zap.Error(err), logs.TagField(logs.TagExternalStorage)) + } + } else if err := n.objectDeleteBase(reqCtx, prm.BktInfo, oidInfo.ID, prm.Auth); err != nil { n.log.Warn(logs.FailedToRemoveOldUnversionedObject, zap.String("request_id", prm.RequestID), zap.String("trace_id", prm.TraceID), zap.Error(err), logs.TagField(logs.TagExternalStorage)) } @@ -744,7 +753,7 @@ func (n *Layer) removalRoutine(ctx context.Context) { } } -func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDs []oid.ID) { +func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDsInfo []data.OIDInfo) { if !n.features.RemoveOnReplace() { return } @@ -753,7 +762,7 @@ func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDs [] prm := removalParams{ Auth: frostfs.PrmAuth{}, BktInfo: bktInfo, - OIDs: OIDs, + OIDsInfo: OIDsInfo, RequestID: reqInfo.RequestID, TraceID: reqInfo.TraceID, } @@ -763,9 +772,9 @@ func (n *Layer) tryRemove(ctx context.Context, bktInfo *data.BucketInfo, OIDs [] select { case n.removalChan <- prm: default: - oidsStr := make([]string, len(OIDs)) - for i, d := range OIDs { - oidsStr[i] = d.EncodeToString() + oidsStr := make([]string, len(OIDsInfo)) + for i, d := range OIDsInfo { + oidsStr[i] = d.ID.EncodeToString() } n.reqLogger(ctx).Debug(logs.FailedToQueueOldUnversionedObjectToDelete, @@ -876,16 +885,16 @@ func (n *Layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, node } if nodeVersion.IsCombined { - return "", n.removeCombinedObject(ctx, bkt, nodeVersion, networkInfo) + return "", n.removeCombinedObject(ctx, bkt, nodeVersion.OID, networkInfo, frostfs.PrmAuth{}) } return "", n.objectDelete(ctx, bkt, nodeVersion.OID) } -func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion, networkInfo netmap.NetworkInfo) error { - combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID) +func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, objID oid.ID, networkInfo netmap.NetworkInfo, prmAuth frostfs.PrmAuth) error { + combinedObj, err := n.objectGetBase(ctx, bkt, objID, prmAuth) if err != nil { - return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err) + return fmt.Errorf("get combined object '%s': %w", objID.EncodeToString(), err) } var parts []*data.PartInfo @@ -893,7 +902,7 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, return fmt.Errorf("unmarshal combined object parts: %w", err) } - tokens := prepareTokensParameter(ctx, bkt.Owner) + tokens := prepareTokensParameterBase(ctx, bkt.Owner, prmAuth) members := make([]oid.ID, 0) // First gateway tries to delete all object parts. // In case of errors, abort multipart removal. @@ -905,18 +914,18 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, members = append(members, oids...) } - if err = n.putTombstones(ctx, bkt, networkInfo, members); err != nil { + if err = n.putTombstonesBase(ctx, bkt, networkInfo, members, prmAuth); err != nil { return fmt.Errorf("put tombstones with parts: %w", err) } // If all parts were removed successfully, remove multipart linking object. // Do not delete this object first, because gateway won't be able to find parts. - members, err = n.getMembers(ctx, bkt.CID, nodeVersion.OID, tokens) + members, err = n.getMembers(ctx, bkt.CID, objID, tokens) if err != nil { return err } - return n.putTombstones(ctx, bkt, networkInfo, members) + return n.putTombstonesBase(ctx, bkt, networkInfo, members, prmAuth) } // DeleteObjects from the storage. diff --git a/api/layer/tombstone.go b/api/layer/tombstone.go index 5aac279d..ab19536b 100644 --- a/api/layer/tombstone.go +++ b/api/layer/tombstone.go @@ -24,6 +24,10 @@ import ( ) func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) error { + return n.putTombstonesBase(ctx, bkt, networkInfo, members, frostfs.PrmAuth{}) +} + +func (n *Layer) putTombstonesBase(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID, prmAuth frostfs.PrmAuth) error { if len(members) == 0 { return nil } @@ -42,7 +46,7 @@ func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, network if end > len(members) { end = len(members) } - n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg, errCh) + n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg, prmAuth, errCh) } wg.Wait() @@ -55,7 +59,7 @@ func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, network return nil } -func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup, errCh chan<- error) { +func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup, prmAuth frostfs.PrmAuth, errCh chan<- error) { tomb := object.NewTombstone() tomb.SetExpirationEpoch(expEpoch) tomb.SetMembers(members) @@ -64,7 +68,7 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me err := n.workerPool.Submit(func() { defer wg.Done() - if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil { + if err := n.putTombstoneObject(ctx, tomb, bkt, prmAuth); err != nil { n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err), logs.TagField(logs.TagExternalStorage)) errCh <- fmt.Errorf("put tombstone object: %w", err) } @@ -76,7 +80,7 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me } } -func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo) error { +func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo, prmAuth frostfs.PrmAuth) error { payload, err := tomb.Marshal() if err != nil { return fmt.Errorf("marshal tombstone: %w", err) @@ -91,6 +95,7 @@ func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled, BufferMaxSize: n.features.BufferMaxSizeForPut(), Type: object.TypeTombstone, + PrmAuth: prmAuth, } n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner) @@ -113,8 +118,17 @@ func (n *Layer) getMembers(ctx context.Context, cnrID cid.ID, objID oid.ID, toke } func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens { + return prepareTokensParameterBase(ctx, bktOwner, frostfs.PrmAuth{}) +} + +func prepareTokensParameterBase(ctx context.Context, bktOwner user.ID, prmAuth frostfs.PrmAuth) relations.Tokens { tokens := relations.Tokens{} + if prmAuth.BearerToken != nil { + tokens.Bearer = prmAuth.BearerToken + return tokens + } + if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil { if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) { tokens.Bearer = bd.Gate.BearerToken diff --git a/api/layer/tree/tree_service.go b/api/layer/tree/tree_service.go index 1f4a9ccd..b457a6c2 100644 --- a/api/layer/tree/tree_service.go +++ b/api/layer/tree/tree_service.go @@ -49,7 +49,7 @@ type Service interface { // AddVersion creates new version in tree. // Returns new node id and object ids of old versions (OIDS) that must be deleted. // OIDs can be returned even if error is not nil. - AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error) + AddVersion(ctx context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []data.OIDInfo, error) RemoveVersion(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64) error PutLock(ctx context.Context, bktInfo *data.BucketInfo, nodeID uint64, lock *data.LockInfo) error diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 146411c8..bb34997b 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -240,7 +240,7 @@ func (t *TreeServiceMock) GetUnversioned(_ context.Context, bktInfo *data.Bucket return nil, tree.ErrNodeNotFound } -func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []oid.ID, error) { +func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo, newVersion *data.NodeVersion) (uint64, []data.OIDInfo, error) { cnrVersionsMap, ok := t.versions[bktInfo.CID.EncodeToString()] if !ok { t.versions[bktInfo.CID.EncodeToString()] = map[string][]*data.NodeVersion{ @@ -266,7 +266,7 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo result := versions - var oldUnversionedIDs []oid.ID + var oldUnversionedIDs []data.OIDInfo if newVersion.IsUnversioned { result = make([]*data.NodeVersion, 0, len(versions)) @@ -274,7 +274,7 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo if !node.IsUnversioned { result = append(result, node) } else { - oldUnversionedIDs = append(oldUnversionedIDs, node.OID) + oldUnversionedIDs = append(oldUnversionedIDs, data.OIDInfo{ID: node.OID, IsCombined: node.IsCombined}) } } } diff --git a/internal/logs/logs.go b/internal/logs/logs.go index c8966126..36a2e7e4 100644 --- a/internal/logs/logs.go +++ b/internal/logs/logs.go @@ -181,6 +181,7 @@ const ( CouldNotFetchObjectMeta = "could not fetch object meta" FailedToDeleteObject = "failed to delete object" FailedToRemoveOldUnversionedObject = "failed to remove old unversioned object" + FailedToRemoveOldUnversionedCombinedObject = "failed to remove old unversioned combined object" CouldntDeleteLifecycleObject = "couldn't delete lifecycle configuration object" CouldntGetCORSObjectVersions = "couldn't get cors object versions" ) diff --git a/pkg/service/tree/tree.go b/pkg/service/tree/tree.go index f82809b7..cbfd677a 100644 --- a/pkg/service/tree/tree.go +++ b/pkg/service/tree/tree.go @@ -1331,7 +1331,7 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre return nodes, nil } -func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, []oid.ID, error) { +func (c *Tree) AddVersion(ctx context.Context, bktInfo *data.BucketInfo, version *data.NodeVersion) (uint64, []data.OIDInfo, error) { ctx, span := tracing.StartSpanFromContext(ctx, "tree.AddVersion") defer span.End() @@ -1768,7 +1768,7 @@ func (c *Tree) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.Bucket return getObjectTagging(nodes[isTagKV]), lockInfo, nil } -func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, []oid.ID, error) { +func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID string, version *data.NodeVersion) (uint64, []data.OIDInfo, error) { path := pathFromName(version.FilePath) meta := map[string]string{ oidKV: version.OID.EncodeToString(), @@ -1806,9 +1806,12 @@ func (c *Tree) addVersion(ctx context.Context, bktInfo *data.BucketInfo, treeID return 0, nil, err } - oldOIDs := make([]oid.ID, len(nodes)) + oldOIDs := make([]data.OIDInfo, len(nodes)) for i, oldNode := range nodes { - oldOIDs[i] = oldNode.OID + oldOIDs[i] = data.OIDInfo{ + ID: oldNode.OID, + IsCombined: oldNode.IsCombined, + } } return node.ID, oldOIDs, c.clearOutdatedVersionInfo(ctx, bktInfo, treeID, nodes)