[#559] Return error from multipart deleting #574
6 changed files with 86 additions and 27 deletions
|
@ -55,13 +55,17 @@ func TestDeleteMultipartAllParts(t *testing.T) {
|
|||
// unversioned bucket
|
||||
createTestBucket(hc, bktName)
|
||||
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObject(t, hc, bktName, objName, emptyVersion)
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
|
||||
// encrypted multipart
|
||||
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObject(t, hc, bktName, objName, emptyVersion)
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
|
||||
// versions bucket
|
||||
createTestBucket(hc, bktName2)
|
||||
|
@ -69,8 +73,11 @@ func TestDeleteMultipartAllParts(t *testing.T) {
|
|||
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
|
||||
_, hdr := getObject(hc, bktName2, objName)
|
||||
versionID := hdr.Get("X-Amz-Version-Id")
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObject(t, hc, bktName2, objName, emptyVersion)
|
||||
require.Equal(t, 0, hc.tp.TombstoneOIDCount())
|
||||
deleteObject(t, hc, bktName2, objName, versionID)
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
}
|
||||
|
||||
|
|
|
@ -75,13 +75,14 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
|
|||
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
|
||||
|
||||
type TestFrostFS struct {
|
||||
objects map[string]*object.Object
|
||||
objectErrors map[string]error
|
||||
objectPutErrors map[string]error
|
||||
containers map[string]*container.Container
|
||||
chains map[string][]chain.Chain
|
||||
currentEpoch uint64
|
||||
key *keys.PrivateKey
|
||||
objects map[string]*object.Object
|
||||
objectErrors map[string]error
|
||||
objectPutErrors map[string]error
|
||||
containers map[string]*container.Container
|
||||
chains map[string][]chain.Chain
|
||||
currentEpoch uint64
|
||||
key *keys.PrivateKey
|
||||
tombstoneOIDCount int
|
||||
}
|
||||
|
||||
func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS {
|
||||
|
@ -373,6 +374,7 @@ func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObject
|
|||
if err = t.DeleteObject(ctx, prmDelete); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.tombstoneOIDCount++
|
||||
}
|
||||
|
||||
return &frostfs.CreateObjectResult{
|
||||
|
@ -380,6 +382,14 @@ func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObject
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) TombstoneOIDCount() int {
|
||||
return t.tombstoneOIDCount
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) ClearTombstoneOIDCount() {
|
||||
t.tombstoneOIDCount = 0
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(prm.Container)
|
||||
|
|
|
@ -28,7 +28,6 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
|
@ -763,25 +762,29 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
|
|||
}
|
||||
|
||||
tokens := prepareTokensParameter(ctx, bkt.Owner)
|
||||
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
|
||||
if err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
|
||||
}
|
||||
|
||||
members := append(oids, nodeVersion.OID)
|
||||
members := make([]oid.ID, 0)
|
||||
// First gateway tries to delete all object parts.
|
||||
// In case of errors, abort multipart removal.
|
||||
for _, part := range parts {
|
||||
oids, err = relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens)
|
||||
oids, err := n.getMembers(ctx, bkt.CID, part.OID, tokens)
|
||||
if err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||
zap.String("oid", part.OID.EncodeToString()), zap.Error(err))
|
||||
return err
|
||||
}
|
||||
|
||||
members = append(members, append(oids, part.OID)...)
|
||||
members = append(members, oids...)
|
||||
}
|
||||
|
||||
n.putTombstones(ctx, bkt, networkInfo, members)
|
||||
return nil
|
||||
if err = n.putTombstones(ctx, bkt, networkInfo, members); err != nil {
|
||||
return fmt.Errorf("put tombstones with parts: %w", err)
|
||||
}
|
||||
|
||||
// If all parts were removed successfully, remove multipart linking object.
|
||||
// Do not delete this object first, because gateway won't be able to find parts.
|
||||
members, err = n.getMembers(ctx, bkt.CID, nodeVersion.OID, tokens)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dkirillov marked this conversation as resolved
Outdated
|
||||
return n.putTombstones(ctx, bkt, networkInfo, members)
|
||||
}
|
||||
|
||||
a-savchuk
commented
Both container and object IDs are Both container and object IDs are `Stringer`s. There's no need to explicitly call `EncodeToString` when formatting or logging
mbiryukova
commented
Yes, but in most places it's called, so will leave the same Yes, but in most places it's called, so will leave the same
|
||||
// DeleteObjects from the storage.
|
||||
|
|
|
@ -586,12 +586,16 @@ func (n *Layer) deleteUploadedParts(ctx context.Context, bkt *data.BucketInfo, p
|
|||
if err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||
zap.String("oid", info.OID.EncodeToString()), zap.Error(err))
|
||||
continue
|
||||
dkirillov marked this conversation as resolved
Outdated
dkirillov
commented
Shouldn't we also check error and if it isn't Shouldn't we also check error and if it isn't `object already removed` or `object not found` then return error?
mbiryukova
commented
I did it the way it was before not to change behavior I did it the way it was [before](https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/src/commit/51322cccdf54e0e7bd4fd3fb0acd5553606498f1/api/layer/multipart_upload.go#L568) not to change behavior
|
||||
}
|
||||
members = append(members, append(oids, info.OID)...)
|
||||
}
|
||||
}
|
||||
|
||||
n.putTombstones(ctx, bkt, networkInfo, members)
|
||||
err := n.putTombstones(ctx, bkt, networkInfo, members)
|
||||
if err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToPutTombstones, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
||||
|
|
|
@ -13,6 +13,8 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -21,23 +23,39 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) {
|
||||
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) error {
|
||||
if len(members) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
tombstoneMembersSize := n.features.TombstoneMembersSize()
|
||||
tombstoneLifetime := n.features.TombstoneLifetime()
|
||||
tombstonesCount := len(members) / tombstoneMembersSize
|
||||
if len(members)%tombstoneMembersSize != 0 {
|
||||
tombstonesCount++
|
||||
}
|
||||
errCh := make(chan error, tombstonesCount)
|
||||
|
||||
for i := 0; i < len(members); i += tombstoneMembersSize {
|
||||
for i := 0; i < tombstonesCount; i++ {
|
||||
end := tombstoneMembersSize * (i + 1)
|
||||
if end > len(members) {
|
||||
end = len(members)
|
||||
}
|
||||
n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
|
||||
n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg, errCh)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errCh)
|
||||
|
||||
if err := <-errCh; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup) {
|
||||
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup, errCh chan<- error) {
|
||||
tomb := object.NewTombstone()
|
||||
tomb.SetExpirationEpoch(expEpoch)
|
||||
nzinkevich
commented
It seems that we only return the first error from errCh in It seems that we only return the first error from errCh in `putTombstones`. Should we leave log warnings for the rest?
mbiryukova
commented
I can add logging of all errors from channel in I can add logging of all errors from channel in `putTombstones`
alexvanin
commented
I don't mind having logs here or log it all at once in I don't mind having logs here or log it all at once in `putTombstones`. But we should definitely log it.
|
||||
tomb.SetMembers(members)
|
||||
|
@ -48,11 +66,13 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me
|
|||
|
||||
if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err))
|
||||
errCh <- fmt.Errorf("put tombstone object: %w", err)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
|
||||
errCh <- fmt.Errorf("submit task to pool: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -78,6 +98,20 @@ func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone,
|
|||
return err
|
||||
}
|
||||
|
||||
func (n *Layer) getMembers(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
||||
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), cnrID, objID, tokens)
|
||||
if err != nil {
|
||||
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
|
||||
return nil, fmt.Errorf("failed to list all object relations '%s': %w", objID.EncodeToString(), err)
|
||||
}
|
||||
|
||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", cnrID.EncodeToString()),
|
||||
zap.String("oid", objID.EncodeToString()), zap.Error(err))
|
||||
return nil, nil
|
||||
}
|
||||
return append(oids, objID), nil
|
||||
}
|
||||
|
||||
func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens {
|
||||
tokens := relations.Tokens{}
|
||||
|
||||
|
|
|
@ -182,4 +182,5 @@ const (
|
|||
FailedToCreateWorkerPool = "failed to create worker pool"
|
||||
FailedToListAllObjectRelations = "failed to list all object relations"
|
||||
WarnInvalidTypeTLSTerminationHeader = "invalid type of value of tls termination header"
|
||||
FailedToPutTombstones = "failed to put tombstones"
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue
Can we use something like this, to simplify a little?
nodeVersion.OID
is deleted separately so that if errors occur during removal of parts, combined object is not deleted. It will allow to remove multipart againWe can leave a comment in code about this idea.
I just meant to do this in one
for
(or extract duplicated code to function)