[#370] Fix removing combined object

Port #364

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-04-17 14:17:55 +03:00
parent 486b8a4284
commit 399a6d6d65
2 changed files with 62 additions and 0 deletions

View file

@ -62,6 +62,36 @@ func TestPeriodicWriter(t *testing.T) {
})
}
func TestDeleteMultipartAllParts(t *testing.T) {
hc := prepareHandlerContext(t)
partSize := layer.UploadMinSize
objLen := 6 * partSize
bktName, bktName2, objName := "bucket", "bucket2", "object"
// unversioned bucket
createTestBucket(hc, bktName)
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
deleteObject(t, hc, bktName, objName, emptyVersion)
require.Empty(t, hc.tp.Objects())
// encrypted multipart
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
deleteObject(t, hc, bktName, objName, emptyVersion)
require.Empty(t, hc.tp.Objects())
// versions bucket
createTestBucket(hc, bktName2)
putBucketVersioning(t, hc, bktName2, true)
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
_, hdr := getObject(hc, bktName2, objName)
versionID := hdr.Get("X-Amz-Version-Id")
deleteObject(t, hc, bktName2, objName, emptyVersion)
deleteObject(t, hc, bktName2, objName, versionID)
require.Empty(t, hc.tp.Objects())
}
func TestMultipartUploadInvalidPart(t *testing.T) {
hc := prepareHandlerContext(t)

View file

@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"net/url"
@ -752,9 +753,40 @@ func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, node
return obj.VersionID, nil
}
if nodeVersion.IsCombined {
return "", n.removeCombinedObject(ctx, bkt, nodeVersion)
}
return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
}
func (n *layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion) error {
combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID)
if err != nil {
return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err)
}
var parts []*data.PartInfo
if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil {
return fmt.Errorf("unmarshal combined object parts: %w", err)
}
for _, part := range parts {
if err = n.objectDelete(ctx, bkt, part.OID); err == nil {
continue
}
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
return fmt.Errorf("couldn't delete part '%s': %w", part.OID.EncodeToString(), err)
}
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", part.OID.EncodeToString()), zap.Int("part number", part.Number), zap.Error(err))
}
return n.objectDelete(ctx, bkt, nodeVersion.OID)
}
// DeleteObjects from the storage.
func (n *layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject {
for i, obj := range p.Objects {