[#363] Fix removing combined object
/ Vulncheck (pull_request) Failing after 1m36s Details
/ DCO (pull_request) Successful in 17m8s Details
/ Builds (1.20) (pull_request) Successful in 30m42s Details
/ Builds (1.21) (pull_request) Successful in 30m39s Details
/ Lint (pull_request) Successful in 31m19s Details
/ Tests (1.20) (pull_request) Successful in 30m33s Details
/ Tests (1.21) (pull_request) Successful in 23m46s Details

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
Denis Kirillov 2024-04-11 18:01:05 +03:00
parent 65a8e2dadc
commit 765015f1e8
2 changed files with 62 additions and 0 deletions

View File

@ -38,6 +38,36 @@ func TestMultipartUploadInvalidPart(t *testing.T) {
assertS3Error(hc.t, w, s3Errors.GetAPIError(s3Errors.ErrEntityTooSmall))
}
func TestDeleteMultipartAllParts(t *testing.T) {
hc := prepareHandlerContext(t)
partSize := layer.UploadMinSize
objLen := 6 * partSize
bktName, bktName2, objName := "bucket", "bucket2", "object"
// unversioned bucket
createTestBucket(hc, bktName)
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
deleteObject(t, hc, bktName, objName, emptyVersion)
require.Empty(t, hc.tp.Objects())
// encrypted multipart
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
deleteObject(t, hc, bktName, objName, emptyVersion)
require.Empty(t, hc.tp.Objects())
// versions bucket
createTestBucket(hc, bktName2)
putBucketVersioning(t, hc, bktName2, true)
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
_, hdr := getObject(hc, bktName2, objName)
versionID := hdr.Get("X-Amz-Version-Id")
deleteObject(t, hc, bktName2, objName, emptyVersion)
deleteObject(t, hc, bktName2, objName, versionID)
require.Empty(t, hc.tp.Objects())
}
func TestMultipartReUploadPart(t *testing.T) {
hc := prepareHandlerContext(t)

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"crypto/rand"
"encoding/json"
"encoding/xml"
"fmt"
"io"
@ -791,9 +792,40 @@ func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, node
return obj.VersionID, nil
}
if nodeVersion.IsCombined {
return "", n.removeCombinedObject(ctx, bkt, nodeVersion)
}
return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
}
func (n *layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion) error {
combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID)
if err != nil {
return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err)
}
var parts []*data.PartInfo
if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil {
return fmt.Errorf("unmarshal combined object parts: %w", err)
}
for _, part := range parts {
if err = n.objectDelete(ctx, bkt, part.OID); err == nil {
continue
}
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
return fmt.Errorf("couldn't delete part '%s': %w", part.OID.EncodeToString(), err)
}
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", part.OID.EncodeToString()), zap.Int("part number", part.Number), zap.Error(err))
}
return n.objectDelete(ctx, bkt, nodeVersion.OID)
}
// DeleteObjects from the storage.
func (n *layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject {
for i, obj := range p.Objects {