[#559] Return error from multipart deleting #574

Merged
alexvanin merged 1 commit from mbiryukova/frostfs-s3-gw:feature/multipart_tombstone into master 2024-12-13 11:23:57 +00:00
6 changed files with 86 additions and 27 deletions

View file

@ -55,13 +55,17 @@ func TestDeleteMultipartAllParts(t *testing.T) {
// unversioned bucket // unversioned bucket
createTestBucket(hc, bktName) createTestBucket(hc, bktName)
multipartUpload(hc, bktName, objName, nil, objLen, partSize) multipartUpload(hc, bktName, objName, nil, objLen, partSize)
hc.tp.ClearTombstoneOIDCount()
deleteObject(t, hc, bktName, objName, emptyVersion) deleteObject(t, hc, bktName, objName, emptyVersion)
require.Empty(t, hc.tp.Objects()) require.Empty(t, hc.tp.Objects())
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
// encrypted multipart // encrypted multipart
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize) multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
hc.tp.ClearTombstoneOIDCount()
deleteObject(t, hc, bktName, objName, emptyVersion) deleteObject(t, hc, bktName, objName, emptyVersion)
require.Empty(t, hc.tp.Objects()) require.Empty(t, hc.tp.Objects())
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
// versions bucket // versions bucket
createTestBucket(hc, bktName2) createTestBucket(hc, bktName2)
@ -69,8 +73,11 @@ func TestDeleteMultipartAllParts(t *testing.T) {
multipartUpload(hc, bktName2, objName, nil, objLen, partSize) multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
_, hdr := getObject(hc, bktName2, objName) _, hdr := getObject(hc, bktName2, objName)
versionID := hdr.Get("X-Amz-Version-Id") versionID := hdr.Get("X-Amz-Version-Id")
hc.tp.ClearTombstoneOIDCount()
deleteObject(t, hc, bktName2, objName, emptyVersion) deleteObject(t, hc, bktName2, objName, emptyVersion)
require.Equal(t, 0, hc.tp.TombstoneOIDCount())
deleteObject(t, hc, bktName2, objName, versionID) deleteObject(t, hc, bktName2, objName, versionID)
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
require.Empty(t, hc.tp.Objects()) require.Empty(t, hc.tp.Objects())
} }

View file

@ -82,6 +82,7 @@ type TestFrostFS struct {
chains map[string][]chain.Chain chains map[string][]chain.Chain
currentEpoch uint64 currentEpoch uint64
key *keys.PrivateKey key *keys.PrivateKey
tombstoneOIDCount int
} }
func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS { func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS {
@ -373,6 +374,7 @@ func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObject
if err = t.DeleteObject(ctx, prmDelete); err != nil { if err = t.DeleteObject(ctx, prmDelete); err != nil {
return nil, err return nil, err
} }
t.tombstoneOIDCount++
} }
return &frostfs.CreateObjectResult{ return &frostfs.CreateObjectResult{
@ -380,6 +382,14 @@ func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObject
}, nil }, nil
} }
func (t *TestFrostFS) TombstoneOIDCount() int {
return t.tombstoneOIDCount
}
func (t *TestFrostFS) ClearTombstoneOIDCount() {
t.tombstoneOIDCount = 0
}
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error { func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
var addr oid.Address var addr oid.Address
addr.SetContainer(prm.Container) addr.SetContainer(prm.Container)

View file

@ -28,7 +28,6 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -763,25 +762,29 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
} }
tokens := prepareTokensParameter(ctx, bkt.Owner) tokens := prepareTokensParameter(ctx, bkt.Owner)
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens) members := make([]oid.ID, 0)
if err != nil { // First gateway tries to delete all object parts.
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()), // In case of errors, abort multipart removal.
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
}
members := append(oids, nodeVersion.OID)
for _, part := range parts { for _, part := range parts {
oids, err = relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens) oids, err := n.getMembers(ctx, bkt.CID, part.OID, tokens)
if err != nil { if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()), return err
zap.String("oid", part.OID.EncodeToString()), zap.Error(err)) }
members = append(members, oids...)
} }
members = append(members, append(oids, part.OID)...) if err = n.putTombstones(ctx, bkt, networkInfo, members); err != nil {
return fmt.Errorf("put tombstones with parts: %w", err)
} }
n.putTombstones(ctx, bkt, networkInfo, members) // If all parts were removed successfully, remove multipart linking object.
return nil // Do not delete this object first, because gateway won't be able to find parts.
members, err = n.getMembers(ctx, bkt.CID, nodeVersion.OID, tokens)
if err != nil {
return err
}
return n.putTombstones(ctx, bkt, networkInfo, members)
} }
// DeleteObjects from the storage. // DeleteObjects from the storage.

View file

@ -586,12 +586,16 @@ func (n *Layer) deleteUploadedParts(ctx context.Context, bkt *data.BucketInfo, p
if err != nil { if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()), n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Error(err)) zap.String("oid", info.OID.EncodeToString()), zap.Error(err))
continue
} }
members = append(members, append(oids, info.OID)...) members = append(members, append(oids, info.OID)...)
} }
} }
n.putTombstones(ctx, bkt, networkInfo, members) err := n.putTombstones(ctx, bkt, networkInfo, members)
if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToPutTombstones, zap.Error(err))
}
} }
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) { func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {

View file

@ -13,6 +13,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -21,23 +23,39 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
) )
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) { func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) error {
if len(members) == 0 {
return nil
}
var wg sync.WaitGroup var wg sync.WaitGroup
tombstoneMembersSize := n.features.TombstoneMembersSize() tombstoneMembersSize := n.features.TombstoneMembersSize()
tombstoneLifetime := n.features.TombstoneLifetime() tombstoneLifetime := n.features.TombstoneLifetime()
tombstonesCount := len(members) / tombstoneMembersSize
if len(members)%tombstoneMembersSize != 0 {
tombstonesCount++
}
errCh := make(chan error, tombstonesCount)
for i := 0; i < len(members); i += tombstoneMembersSize { for i := 0; i < tombstonesCount; i++ {
end := tombstoneMembersSize * (i + 1) end := tombstoneMembersSize * (i + 1)
if end > len(members) { if end > len(members) {
end = len(members) end = len(members)
} }
n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg) n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg, errCh)
} }
wg.Wait() wg.Wait()
close(errCh)
if err := <-errCh; err != nil {
return err
} }
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup) { return nil
}
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup, errCh chan<- error) {
tomb := object.NewTombstone() tomb := object.NewTombstone()
tomb.SetExpirationEpoch(expEpoch) tomb.SetExpirationEpoch(expEpoch)
tomb.SetMembers(members) tomb.SetMembers(members)
@ -48,11 +66,13 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me
if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil { if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err)) n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err))
errCh <- fmt.Errorf("put tombstone object: %w", err)
} }
}) })
if err != nil { if err != nil {
wg.Done() wg.Done()
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err)) n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
errCh <- fmt.Errorf("submit task to pool: %w", err)
} }
} }
@ -78,6 +98,20 @@ func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone,
return err return err
} }
func (n *Layer) getMembers(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), cnrID, objID, tokens)
if err != nil {
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
return nil, fmt.Errorf("failed to list all object relations '%s': %w", objID.EncodeToString(), err)
}
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", cnrID.EncodeToString()),
zap.String("oid", objID.EncodeToString()), zap.Error(err))
return nil, nil
}
return append(oids, objID), nil
}
func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens { func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens {
tokens := relations.Tokens{} tokens := relations.Tokens{}

View file

@ -182,4 +182,5 @@ const (
FailedToCreateWorkerPool = "failed to create worker pool" FailedToCreateWorkerPool = "failed to create worker pool"
FailedToListAllObjectRelations = "failed to list all object relations" FailedToListAllObjectRelations = "failed to list all object relations"
WarnInvalidTypeTLSTerminationHeader = "invalid type of value of tls termination header" WarnInvalidTypeTLSTerminationHeader = "invalid type of value of tls termination header"
FailedToPutTombstones = "failed to put tombstones"
) )