[#559] Return error from multipart deleting

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
Marina Biryukova 2024-12-11 07:05:57 +03:00 committed by Alexey Vanin
parent 04b8fc2b5f
commit df1af2d2c9
6 changed files with 86 additions and 27 deletions

View file

@ -55,13 +55,17 @@ func TestDeleteMultipartAllParts(t *testing.T) {
// unversioned bucket
createTestBucket(hc, bktName)
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
hc.tp.ClearTombstoneOIDCount()
deleteObject(t, hc, bktName, objName, emptyVersion)
require.Empty(t, hc.tp.Objects())
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
// encrypted multipart
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
hc.tp.ClearTombstoneOIDCount()
deleteObject(t, hc, bktName, objName, emptyVersion)
require.Empty(t, hc.tp.Objects())
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
// versions bucket
createTestBucket(hc, bktName2)
@ -69,8 +73,11 @@ func TestDeleteMultipartAllParts(t *testing.T) {
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
_, hdr := getObject(hc, bktName2, objName)
versionID := hdr.Get("X-Amz-Version-Id")
hc.tp.ClearTombstoneOIDCount()
deleteObject(t, hc, bktName2, objName, emptyVersion)
require.Equal(t, 0, hc.tp.TombstoneOIDCount())
deleteObject(t, hc, bktName2, objName, versionID)
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
require.Empty(t, hc.tp.Objects())
}

View file

@ -75,13 +75,14 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
type TestFrostFS struct {
objects map[string]*object.Object
objectErrors map[string]error
objectPutErrors map[string]error
containers map[string]*container.Container
chains map[string][]chain.Chain
currentEpoch uint64
key *keys.PrivateKey
objects map[string]*object.Object
objectErrors map[string]error
objectPutErrors map[string]error
containers map[string]*container.Container
chains map[string][]chain.Chain
currentEpoch uint64
key *keys.PrivateKey
tombstoneOIDCount int
}
func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS {
@ -373,6 +374,7 @@ func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObject
if err = t.DeleteObject(ctx, prmDelete); err != nil {
return nil, err
}
t.tombstoneOIDCount++
}
return &frostfs.CreateObjectResult{
@ -380,6 +382,14 @@ func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObject
}, nil
}
func (t *TestFrostFS) TombstoneOIDCount() int {
return t.tombstoneOIDCount
}
func (t *TestFrostFS) ClearTombstoneOIDCount() {
t.tombstoneOIDCount = 0
}
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
var addr oid.Address
addr.SetContainer(prm.Container)

View file

@ -28,7 +28,6 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -763,25 +762,29 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
}
tokens := prepareTokensParameter(ctx, bkt.Owner)
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
}
members := append(oids, nodeVersion.OID)
members := make([]oid.ID, 0)
// First gateway tries to delete all object parts.
// In case of errors, abort multipart removal.
for _, part := range parts {
oids, err = relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens)
oids, err := n.getMembers(ctx, bkt.CID, part.OID, tokens)
if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", part.OID.EncodeToString()), zap.Error(err))
return err
}
members = append(members, append(oids, part.OID)...)
members = append(members, oids...)
}
n.putTombstones(ctx, bkt, networkInfo, members)
return nil
if err = n.putTombstones(ctx, bkt, networkInfo, members); err != nil {
return fmt.Errorf("put tombstones with parts: %w", err)
}
// If all parts were removed successfully, remove multipart linking object.
// Do not delete this object first, because gateway won't be able to find parts.
members, err = n.getMembers(ctx, bkt.CID, nodeVersion.OID, tokens)
if err != nil {
return err
}
return n.putTombstones(ctx, bkt, networkInfo, members)
}
// DeleteObjects from the storage.

View file

@ -586,12 +586,16 @@ func (n *Layer) deleteUploadedParts(ctx context.Context, bkt *data.BucketInfo, p
if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Error(err))
continue
}
members = append(members, append(oids, info.OID)...)
}
}
n.putTombstones(ctx, bkt, networkInfo, members)
err := n.putTombstones(ctx, bkt, networkInfo, members)
if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToPutTombstones, zap.Error(err))
}
}
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {

View file

@ -13,6 +13,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -21,23 +23,39 @@ import (
"go.uber.org/zap"
)
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) {
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) error {
if len(members) == 0 {
return nil
}
var wg sync.WaitGroup
tombstoneMembersSize := n.features.TombstoneMembersSize()
tombstoneLifetime := n.features.TombstoneLifetime()
tombstonesCount := len(members) / tombstoneMembersSize
if len(members)%tombstoneMembersSize != 0 {
tombstonesCount++
}
errCh := make(chan error, tombstonesCount)
for i := 0; i < len(members); i += tombstoneMembersSize {
for i := 0; i < tombstonesCount; i++ {
end := tombstoneMembersSize * (i + 1)
if end > len(members) {
end = len(members)
}
n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg, errCh)
}
wg.Wait()
close(errCh)
if err := <-errCh; err != nil {
return err
}
return nil
}
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup) {
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup, errCh chan<- error) {
tomb := object.NewTombstone()
tomb.SetExpirationEpoch(expEpoch)
tomb.SetMembers(members)
@ -48,11 +66,13 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me
if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err))
errCh <- fmt.Errorf("put tombstone object: %w", err)
}
})
if err != nil {
wg.Done()
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
errCh <- fmt.Errorf("submit task to pool: %w", err)
}
}
@ -78,6 +98,20 @@ func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone,
return err
}
func (n *Layer) getMembers(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), cnrID, objID, tokens)
if err != nil {
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
return nil, fmt.Errorf("failed to list all object relations '%s': %w", objID.EncodeToString(), err)
}
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", cnrID.EncodeToString()),
zap.String("oid", objID.EncodeToString()), zap.Error(err))
return nil, nil
}
return append(oids, objID), nil
}
func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens {
tokens := relations.Tokens{}

View file

@ -182,4 +182,5 @@ const (
FailedToCreateWorkerPool = "failed to create worker pool"
FailedToListAllObjectRelations = "failed to list all object relations"
WarnInvalidTypeTLSTerminationHeader = "invalid type of value of tls termination header"
FailedToPutTombstones = "failed to put tombstones"
)