[#559] Return error from multipart deleting #574

Merged
alexvanin merged 1 commit from mbiryukova/frostfs-s3-gw:feature/multipart_tombstone into master 2024-12-13 11:23:57 +00:00
6 changed files with 86 additions and 27 deletions

View file

@ -55,13 +55,17 @@ func TestDeleteMultipartAllParts(t *testing.T) {
// unversioned bucket
createTestBucket(hc, bktName)
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
hc.tp.ClearTombstoneOIDCount()
deleteObject(t, hc, bktName, objName, emptyVersion)
require.Empty(t, hc.tp.Objects())
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
// encrypted multipart
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
hc.tp.ClearTombstoneOIDCount()
deleteObject(t, hc, bktName, objName, emptyVersion)
require.Empty(t, hc.tp.Objects())
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
// versions bucket
createTestBucket(hc, bktName2)
@ -69,8 +73,11 @@ func TestDeleteMultipartAllParts(t *testing.T) {
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
_, hdr := getObject(hc, bktName2, objName)
versionID := hdr.Get("X-Amz-Version-Id")
hc.tp.ClearTombstoneOIDCount()
deleteObject(t, hc, bktName2, objName, emptyVersion)
require.Equal(t, 0, hc.tp.TombstoneOIDCount())
deleteObject(t, hc, bktName2, objName, versionID)
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
require.Empty(t, hc.tp.Objects())
}

View file

@ -82,6 +82,7 @@ type TestFrostFS struct {
chains map[string][]chain.Chain
currentEpoch uint64
key *keys.PrivateKey
tombstoneOIDCount int
}
func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS {
@ -373,6 +374,7 @@ func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObject
if err = t.DeleteObject(ctx, prmDelete); err != nil {
return nil, err
}
t.tombstoneOIDCount++
}
return &frostfs.CreateObjectResult{
@ -380,6 +382,14 @@ func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObject
}, nil
}
func (t *TestFrostFS) TombstoneOIDCount() int {
return t.tombstoneOIDCount
}
func (t *TestFrostFS) ClearTombstoneOIDCount() {
t.tombstoneOIDCount = 0
}
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
var addr oid.Address
addr.SetContainer(prm.Container)

View file

@ -28,7 +28,6 @@ import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -763,25 +762,29 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
}
tokens := prepareTokensParameter(ctx, bkt.Owner)
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
}
members := append(oids, nodeVersion.OID)
members := make([]oid.ID, 0)
// First gateway tries to delete all object parts.
// In case of errors, abort multipart removal.
for _, part := range parts {
oids, err = relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens)
oids, err := n.getMembers(ctx, bkt.CID, part.OID, tokens)
if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", part.OID.EncodeToString()), zap.Error(err))
return err
}
members = append(members, oids...)
}
members = append(members, append(oids, part.OID)...)
if err = n.putTombstones(ctx, bkt, networkInfo, members); err != nil {
return fmt.Errorf("put tombstones with parts: %w", err)
}
n.putTombstones(ctx, bkt, networkInfo, members)
return nil
// If all parts were removed successfully, remove multipart linking object.
// Do not delete this object first, because gateway won't be able to find parts.
members, err = n.getMembers(ctx, bkt.CID, nodeVersion.OID, tokens)
if err != nil {
return err
}
dkirillov marked this conversation as resolved Outdated

Can we use something like this, to simplify a little?

diff --git a/api/layer/layer.go b/api/layer/layer.go
index e764d887..e16eb71b 100644
--- a/api/layer/layer.go
+++ b/api/layer/layer.go
@@ -762,38 +762,28 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
 		return fmt.Errorf("unmarshal combined object parts: %w", err)
 	}
 
+	ids := make([]oid.ID, 1, len(parts)+1)
+	ids[0] = nodeVersion.OID
+	for i, part := range parts {
+		ids[i+1] = part.OID
+	}
+
 	tokens := prepareTokensParameter(ctx, bkt.Owner)
 	members := make([]oid.ID, 0)
-	for _, part := range parts {
-		oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens)
+	for _, id := range ids {
+		oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, id, tokens)
 		if err == nil {
-			members = append(members, append(oids, part.OID)...)
+			members = append(members, append(oids, id)...)
 			continue
 		}
 
 		if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
-			return fmt.Errorf("failed to list all object relations '%s': %w", part.OID.EncodeToString(), err)
-		}
-
-		n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
-			zap.String("oid", part.OID.EncodeToString()), zap.Error(err))
-	}
-
-	if err = n.putTombstones(ctx, bkt, networkInfo, members); err != nil {
-		return fmt.Errorf("put tombstones with parts: %w", err)
-	}
-
-	oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
-	if err != nil {
-		if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
-			return fmt.Errorf("failed to list all object relations '%s': %w", nodeVersion.OID.EncodeToString(), err)
+			return fmt.Errorf("failed to list all object relations '%s': %w", id.EncodeToString(), err)
 		}
 
 		n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
-			zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
-		return nil
+			zap.String("oid", id.EncodeToString()), zap.Error(err))
 	}
-	members = append(oids, nodeVersion.OID)
 
 	return n.putTombstones(ctx, bkt, networkInfo, members)
 }

Can we use something like this, to simplify a little? ```diff diff --git a/api/layer/layer.go b/api/layer/layer.go index e764d887..e16eb71b 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -762,38 +762,28 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo, return fmt.Errorf("unmarshal combined object parts: %w", err) } + ids := make([]oid.ID, 1, len(parts)+1) + ids[0] = nodeVersion.OID + for i, part := range parts { + ids[i+1] = part.OID + } + tokens := prepareTokensParameter(ctx, bkt.Owner) members := make([]oid.ID, 0) - for _, part := range parts { - oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens) + for _, id := range ids { + oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, id, tokens) if err == nil { - members = append(members, append(oids, part.OID)...) + members = append(members, append(oids, id)...) continue } if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) { - return fmt.Errorf("failed to list all object relations '%s': %w", part.OID.EncodeToString(), err) - } - - n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()), - zap.String("oid", part.OID.EncodeToString()), zap.Error(err)) - } - - if err = n.putTombstones(ctx, bkt, networkInfo, members); err != nil { - return fmt.Errorf("put tombstones with parts: %w", err) - } - - oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens) - if err != nil { - if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) { - return fmt.Errorf("failed to list all object relations '%s': %w", nodeVersion.OID.EncodeToString(), err) + return fmt.Errorf("failed to list all object relations '%s': %w", id.EncodeToString(), err) } n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()), - zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err)) - return nil + zap.String("oid", id.EncodeToString()), zap.Error(err)) } - members = append(oids, nodeVersion.OID) return n.putTombstones(ctx, bkt, networkInfo, members) } ```

nodeVersion.OID is deleted separately so that if errors occur during removal of parts, combined object is not deleted. It will allow to remove multipart again

`nodeVersion.OID` is deleted separately so that if errors occur during removal of parts, combined object is not deleted. It will allow to remove multipart again

We can leave a comment in code about this idea.

// First gateway tries to delete all object parts.
// In case of errors, abort multipart removal.
for _, part := range parts {
...
// If all parts were removed successfully, remove multipart linking object.
// Do not delete this object first, because gateway won't be able to find parts.
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
We can leave a comment in code about this idea. ```go // First gateway tries to delete all object parts. // In case of errors, abort multipart removal. for _, part := range parts { ... // If all parts were removed successfully, remove multipart linking object. // Do not delete this object first, because gateway won't be able to find parts. oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens) ```

I just meant to do this in one for (or extract duplicated code to function)

I just meant to do this in one `for` (or extract duplicated code to function)
return n.putTombstones(ctx, bkt, networkInfo, members)
}

Both container and object IDs are Stringers. There's no need to explicitly call EncodeToString when formatting or logging

Both container and object IDs are `Stringer`s. There's no need to explicitly call `EncodeToString` when formatting or logging

Yes, but in most places it's called, so will leave the same

Yes, but in most places it's called, so will leave the same
// DeleteObjects from the storage.

View file

@ -586,12 +586,16 @@ func (n *Layer) deleteUploadedParts(ctx context.Context, bkt *data.BucketInfo, p
if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Error(err))
continue
dkirillov marked this conversation as resolved Outdated

Shouldn't we also check error and if it isn't object already removed or object not found then return error?

Shouldn't we also check error and if it isn't `object already removed` or `object not found` then return error?

I did it the way it was before not to change behavior

I did it the way it was [before](https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/commit/51322cccdf54e0e7bd4fd3fb0acd5553606498f1/api/layer/multipart_upload.go#L568) not to change behavior
}
members = append(members, append(oids, info.OID)...)
}
}
n.putTombstones(ctx, bkt, networkInfo, members)
err := n.putTombstones(ctx, bkt, networkInfo, members)
if err != nil {
n.reqLogger(ctx).Warn(logs.FailedToPutTombstones, zap.Error(err))
}
}
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {

View file

@ -13,6 +13,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -21,23 +23,39 @@ import (
"go.uber.org/zap"
)
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) {
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) error {
if len(members) == 0 {
return nil
}
var wg sync.WaitGroup
tombstoneMembersSize := n.features.TombstoneMembersSize()
tombstoneLifetime := n.features.TombstoneLifetime()
tombstonesCount := len(members) / tombstoneMembersSize
if len(members)%tombstoneMembersSize != 0 {
tombstonesCount++
}
errCh := make(chan error, tombstonesCount)
for i := 0; i < len(members); i += tombstoneMembersSize {
for i := 0; i < tombstonesCount; i++ {
end := tombstoneMembersSize * (i + 1)
if end > len(members) {
end = len(members)
}
n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg, errCh)
}
wg.Wait()
close(errCh)
if err := <-errCh; err != nil {
return err
}
return nil
}
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup) {
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup, errCh chan<- error) {
tomb := object.NewTombstone()
tomb.SetExpirationEpoch(expEpoch)

It seems that we only return the first error from errCh in putTombstones. Should we leave log warnings for the rest?

It seems that we only return the first error from errCh in `putTombstones`. Should we leave log warnings for the rest?

I can add logging of all errors from channel in putTombstones

I can add logging of all errors from channel in `putTombstones`

I don't mind having logs here or log it all at once in putTombstones. But we should definitely log it.

I don't mind having logs here or log it all at once in `putTombstones`. But we should definitely log it.
tomb.SetMembers(members)
@ -48,11 +66,13 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me
if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err))
errCh <- fmt.Errorf("put tombstone object: %w", err)
}
})
if err != nil {
wg.Done()
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
errCh <- fmt.Errorf("submit task to pool: %w", err)
}
}
@ -78,6 +98,20 @@ func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone,
return err
}
func (n *Layer) getMembers(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), cnrID, objID, tokens)
if err != nil {
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
return nil, fmt.Errorf("failed to list all object relations '%s': %w", objID.EncodeToString(), err)
}
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", cnrID.EncodeToString()),
zap.String("oid", objID.EncodeToString()), zap.Error(err))
return nil, nil
}
return append(oids, objID), nil
}
func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens {
tokens := relations.Tokens{}

View file

@ -182,4 +182,5 @@ const (
FailedToCreateWorkerPool = "failed to create worker pool"
FailedToListAllObjectRelations = "failed to list all object relations"
WarnInvalidTypeTLSTerminationHeader = "invalid type of value of tls termination header"
FailedToPutTombstones = "failed to put tombstones"
)