bugfix/multipart_removing #370

Merged
alexvanin merged 2 commits from dkirillov/frostfs-s3-gw:bugfix/multipart_removing into support/v0.28 2024-04-18 10:14:53 +00:00
3 changed files with 80 additions and 2 deletions

View File

@ -62,6 +62,52 @@ func TestPeriodicWriter(t *testing.T) {
})
}
func TestDeleteMultipartAllParts(t *testing.T) {
hc := prepareHandlerContext(t)
partSize := layer.UploadMinSize
objLen := 6 * partSize
bktName, bktName2, objName := "bucket", "bucket2", "object"
// unversioned bucket
createTestBucket(hc, bktName)
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
deleteObject(t, hc, bktName, objName, emptyVersion)
require.Empty(t, hc.tp.Objects())
// encrypted multipart
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
deleteObject(t, hc, bktName, objName, emptyVersion)
require.Empty(t, hc.tp.Objects())
// versions bucket
createTestBucket(hc, bktName2)
putBucketVersioning(t, hc, bktName2, true)
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
_, hdr := getObject(hc, bktName2, objName)
versionID := hdr.Get("X-Amz-Version-Id")
deleteObject(t, hc, bktName2, objName, emptyVersion)
deleteObject(t, hc, bktName2, objName, versionID)
require.Empty(t, hc.tp.Objects())
}
func TestMultipartTreeSize(t *testing.T) {
hc := prepareHandlerContext(t)
partSize := layer.UploadMinSize
objLen := 6 * partSize
bktName, objName := "bucket", "object"
bktInfo := createTestBucket(hc, bktName)
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
nodeVersion, err := hc.tree.GetLatestVersion(hc.Context(), bktInfo, objName)
require.NoError(t, err)
require.EqualValues(t, objLen, nodeVersion.Size)
}
func TestMultipartUploadInvalidPart(t *testing.T) {
hc := prepareHandlerContext(t)

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/ecdsa"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"net/url"
@ -752,9 +753,40 @@ func (n *layer) removeOldVersion(ctx context.Context, bkt *data.BucketInfo, node
return obj.VersionID, nil
}
if nodeVersion.IsCombined {
return "", n.removeCombinedObject(ctx, bkt, nodeVersion)
}
return "", n.objectDelete(ctx, bkt, nodeVersion.OID)
}
func (n *layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, nodeVersion *data.NodeVersion) error {
combinedObj, err := n.objectGet(ctx, bkt, nodeVersion.OID)
if err != nil {
return fmt.Errorf("get combined object '%s': %w", nodeVersion.OID.EncodeToString(), err)
}
var parts []*data.PartInfo
if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil {
return fmt.Errorf("unmarshal combined object parts: %w", err)
}
for _, part := range parts {
if err = n.objectDelete(ctx, bkt, part.OID); err == nil {
continue
}
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
return fmt.Errorf("couldn't delete part '%s': %w", part.OID.EncodeToString(), err)
}
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", part.OID.EncodeToString()), zap.Int("part number", part.Number), zap.Error(err))
}
return n.objectDelete(ctx, bkt, nodeVersion.OID)
}
// DeleteObjects from the storage.
func (n *layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject {
for i, obj := range p.Objects {

View File

@ -315,7 +315,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
OID: id,
ETag: hex.EncodeToString(hash),
FilePath: p.Object,
Size: size,
Size: p.Size,
},
IsUnversioned: !bktSettings.VersioningEnabled(),
IsCombined: p.Header[MultipartObjectSize] != "",
@ -356,7 +356,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
Owner: n.gateOwner,
Bucket: p.BktInfo.Name,
Name: p.Object,
Size: size,
Size: size, // we don't use here p.Size to be consistent with the objectInfoFromMeta function
Created: prm.CreationTime,
Headers: p.Header,
ContentType: p.Header[api.ContentType],