diff --git a/api/handler/multipart_upload_test.go b/api/handler/multipart_upload_test.go index 370ce4f7..003cf065 100644 --- a/api/handler/multipart_upload_test.go +++ b/api/handler/multipart_upload_test.go @@ -62,6 +62,36 @@ func TestPeriodicWriter(t *testing.T) { }) } +func TestDeleteMultipartAllParts(t *testing.T) { + hc := prepareHandlerContext(t) + + partSize := layer.UploadMinSize + objLen := 6 * partSize + + bktName, bktName2, objName := "bucket", "bucket2", "object" + + // unversioned bucket + createTestBucket(hc, bktName) + multipartUpload(hc, bktName, objName, nil, objLen, partSize) + deleteObject(t, hc, bktName, objName, emptyVersion) + require.Empty(t, hc.tp.Objects()) + + // encrypted multipart + multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize) + deleteObject(t, hc, bktName, objName, emptyVersion) + require.Empty(t, hc.tp.Objects()) + + // versions bucket + createTestBucket(hc, bktName2) + putBucketVersioning(t, hc, bktName2, true) + multipartUpload(hc, bktName2, objName, nil, objLen, partSize) + _, hdr := getObject(hc, bktName2, objName) + versionID := hdr.Get("X-Amz-Version-Id") + deleteObject(t, hc, bktName2, objName, emptyVersion) + deleteObject(t, hc, bktName2, objName, versionID) + require.Empty(t, hc.tp.Objects()) +} + func TestMultipartUploadInvalidPart(t *testing.T) { hc := prepareHandlerContext(t) diff --git a/api/layer/layer.go b/api/layer/layer.go index 25465cdc..dd8c2aac 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -4,6 +4,7 @@ import ( "context" "crypto/ecdsa" "crypto/rand" + "encoding/json" "fmt" "io" "net/url" @@ -752,9 +753,40 @@ func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, node return obj.VersionID, nil } + if nodeVersion.IsCombined { + return "", n.removeCombinedObject(ctx, bkt, nodeVersion) + } + return "", n.objectDelete(ctx, bkt, nodeVersion.OID) } +func (n *layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion) error { + combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID) + if err != nil { + return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err) + } + + var parts []*data.PartInfo + if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil { + return fmt.Errorf("unmarshal combined object parts: %w", err) + } + + for _, part := range parts { + if err = n.objectDelete(ctx, bkt, part.OID); err == nil { + continue + } + + if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) { + return fmt.Errorf("couldn't delete part '%s': %w", part.OID.EncodeToString(), err) + } + + n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", bkt.CID.EncodeToString()), + zap.String("oid", part.OID.EncodeToString()), zap.Int("part number", part.Number), zap.Error(err)) + } + + return n.objectDelete(ctx, bkt, nodeVersion.OID) +} + // DeleteObjects from the storage. func (n *layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject { for i, obj := range p.Objects {