[#559] Return error from multipart deleting #574
6 changed files with 86 additions and 27 deletions
|
@ -55,13 +55,17 @@ func TestDeleteMultipartAllParts(t *testing.T) {
|
||||||
// unversioned bucket
|
// unversioned bucket
|
||||||
createTestBucket(hc, bktName)
|
createTestBucket(hc, bktName)
|
||||||
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
|
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
|
||||||
|
hc.tp.ClearTombstoneOIDCount()
|
||||||
deleteObject(t, hc, bktName, objName, emptyVersion)
|
deleteObject(t, hc, bktName, objName, emptyVersion)
|
||||||
require.Empty(t, hc.tp.Objects())
|
require.Empty(t, hc.tp.Objects())
|
||||||
|
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||||
|
|
||||||
// encrypted multipart
|
// encrypted multipart
|
||||||
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
|
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
|
||||||
|
hc.tp.ClearTombstoneOIDCount()
|
||||||
deleteObject(t, hc, bktName, objName, emptyVersion)
|
deleteObject(t, hc, bktName, objName, emptyVersion)
|
||||||
require.Empty(t, hc.tp.Objects())
|
require.Empty(t, hc.tp.Objects())
|
||||||
|
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||||
|
|
||||||
// versions bucket
|
// versions bucket
|
||||||
createTestBucket(hc, bktName2)
|
createTestBucket(hc, bktName2)
|
||||||
|
@ -69,8 +73,11 @@ func TestDeleteMultipartAllParts(t *testing.T) {
|
||||||
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
|
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
|
||||||
_, hdr := getObject(hc, bktName2, objName)
|
_, hdr := getObject(hc, bktName2, objName)
|
||||||
versionID := hdr.Get("X-Amz-Version-Id")
|
versionID := hdr.Get("X-Amz-Version-Id")
|
||||||
|
hc.tp.ClearTombstoneOIDCount()
|
||||||
deleteObject(t, hc, bktName2, objName, emptyVersion)
|
deleteObject(t, hc, bktName2, objName, emptyVersion)
|
||||||
|
require.Equal(t, 0, hc.tp.TombstoneOIDCount())
|
||||||
deleteObject(t, hc, bktName2, objName, versionID)
|
deleteObject(t, hc, bktName2, objName, versionID)
|
||||||
|
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||||
require.Empty(t, hc.tp.Objects())
|
require.Empty(t, hc.tp.Objects())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -75,13 +75,14 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
|
||||||
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
|
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
|
||||||
|
|
||||||
type TestFrostFS struct {
|
type TestFrostFS struct {
|
||||||
objects map[string]*object.Object
|
objects map[string]*object.Object
|
||||||
objectErrors map[string]error
|
objectErrors map[string]error
|
||||||
objectPutErrors map[string]error
|
objectPutErrors map[string]error
|
||||||
containers map[string]*container.Container
|
containers map[string]*container.Container
|
||||||
chains map[string][]chain.Chain
|
chains map[string][]chain.Chain
|
||||||
currentEpoch uint64
|
currentEpoch uint64
|
||||||
key *keys.PrivateKey
|
key *keys.PrivateKey
|
||||||
|
tombstoneOIDCount int
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS {
|
func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS {
|
||||||
|
@ -373,6 +374,7 @@ func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObject
|
||||||
if err = t.DeleteObject(ctx, prmDelete); err != nil {
|
if err = t.DeleteObject(ctx, prmDelete); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
t.tombstoneOIDCount++
|
||||||
}
|
}
|
||||||
|
|
||||||
return &frostfs.CreateObjectResult{
|
return &frostfs.CreateObjectResult{
|
||||||
|
@ -380,6 +382,14 @@ func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObject
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) TombstoneOIDCount() int {
|
||||||
|
return t.tombstoneOIDCount
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TestFrostFS) ClearTombstoneOIDCount() {
|
||||||
|
t.tombstoneOIDCount = 0
|
||||||
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
|
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(prm.Container)
|
addr.SetContainer(prm.Container)
|
||||||
|
|
|
@ -28,7 +28,6 @@ import (
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
@ -763,25 +762,29 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
tokens := prepareTokensParameter(ctx, bkt.Owner)
|
tokens := prepareTokensParameter(ctx, bkt.Owner)
|
||||||
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
|
members := make([]oid.ID, 0)
|
||||||
if err != nil {
|
// First gateway tries to delete all object parts.
|
||||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
// In case of errors, abort multipart removal.
|
||||||
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
members := append(oids, nodeVersion.OID)
|
|
||||||
for _, part := range parts {
|
for _, part := range parts {
|
||||||
oids, err = relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens)
|
oids, err := n.getMembers(ctx, bkt.CID, part.OID, tokens)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
return err
|
||||||
zap.String("oid", part.OID.EncodeToString()), zap.Error(err))
|
|
||||||
}
|
}
|
||||||
|
members = append(members, oids...)
|
||||||
members = append(members, append(oids, part.OID)...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
n.putTombstones(ctx, bkt, networkInfo, members)
|
if err = n.putTombstones(ctx, bkt, networkInfo, members); err != nil {
|
||||||
return nil
|
return fmt.Errorf("put tombstones with parts: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If all parts were removed successfully, remove multipart linking object.
|
||||||
|
// Do not delete this object first, because gateway won't be able to find parts.
|
||||||
|
members, err = n.getMembers(ctx, bkt.CID, nodeVersion.OID, tokens)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return n.putTombstones(ctx, bkt, networkInfo, members)
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteObjects from the storage.
|
// DeleteObjects from the storage.
|
||||||
|
|
|
@ -586,12 +586,16 @@ func (n *Layer) deleteUploadedParts(ctx context.Context, bkt *data.BucketInfo, p
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||||
zap.String("oid", info.OID.EncodeToString()), zap.Error(err))
|
zap.String("oid", info.OID.EncodeToString()), zap.Error(err))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
members = append(members, append(oids, info.OID)...)
|
members = append(members, append(oids, info.OID)...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
n.putTombstones(ctx, bkt, networkInfo, members)
|
err := n.putTombstones(ctx, bkt, networkInfo, members)
|
||||||
|
if err != nil {
|
||||||
|
n.reqLogger(ctx).Warn(logs.FailedToPutTombstones, zap.Error(err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
||||||
|
|
|
@ -13,6 +13,8 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -21,23 +23,39 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) {
|
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) error {
|
||||||
|
if len(members) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
tombstoneMembersSize := n.features.TombstoneMembersSize()
|
tombstoneMembersSize := n.features.TombstoneMembersSize()
|
||||||
tombstoneLifetime := n.features.TombstoneLifetime()
|
tombstoneLifetime := n.features.TombstoneLifetime()
|
||||||
|
tombstonesCount := len(members) / tombstoneMembersSize
|
||||||
|
if len(members)%tombstoneMembersSize != 0 {
|
||||||
|
tombstonesCount++
|
||||||
|
}
|
||||||
|
errCh := make(chan error, tombstonesCount)
|
||||||
|
|
||||||
for i := 0; i < len(members); i += tombstoneMembersSize {
|
for i := 0; i < tombstonesCount; i++ {
|
||||||
end := tombstoneMembersSize * (i + 1)
|
end := tombstoneMembersSize * (i + 1)
|
||||||
if end > len(members) {
|
if end > len(members) {
|
||||||
end = len(members)
|
end = len(members)
|
||||||
}
|
}
|
||||||
n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
|
n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg, errCh)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
close(errCh)
|
||||||
|
|
||||||
|
if err := <-errCh; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup) {
|
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup, errCh chan<- error) {
|
||||||
tomb := object.NewTombstone()
|
tomb := object.NewTombstone()
|
||||||
tomb.SetExpirationEpoch(expEpoch)
|
tomb.SetExpirationEpoch(expEpoch)
|
||||||
tomb.SetMembers(members)
|
tomb.SetMembers(members)
|
||||||
|
@ -48,11 +66,13 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me
|
||||||
|
|
||||||
if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
|
if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
|
||||||
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err))
|
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err))
|
||||||
|
errCh <- fmt.Errorf("put tombstone object: %w", err)
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
wg.Done()
|
wg.Done()
|
||||||
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
|
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
|
||||||
|
errCh <- fmt.Errorf("submit task to pool: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,6 +98,20 @@ func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone,
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (n *Layer) getMembers(ctx context.Context, cnrID cid.ID, objID oid.ID, tokens relations.Tokens) ([]oid.ID, error) {
|
||||||
|
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), cnrID, objID, tokens)
|
||||||
|
if err != nil {
|
||||||
|
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
|
||||||
|
return nil, fmt.Errorf("failed to list all object relations '%s': %w", objID.EncodeToString(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", cnrID.EncodeToString()),
|
||||||
|
zap.String("oid", objID.EncodeToString()), zap.Error(err))
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
return append(oids, objID), nil
|
||||||
|
}
|
||||||
|
|
||||||
func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens {
|
func prepareTokensParameter(ctx context.Context, bktOwner user.ID) relations.Tokens {
|
||||||
tokens := relations.Tokens{}
|
tokens := relations.Tokens{}
|
||||||
|
|
||||||
|
|
|
@ -182,4 +182,5 @@ const (
|
||||||
FailedToCreateWorkerPool = "failed to create worker pool"
|
FailedToCreateWorkerPool = "failed to create worker pool"
|
||||||
FailedToListAllObjectRelations = "failed to list all object relations"
|
FailedToListAllObjectRelations = "failed to list all object relations"
|
||||||
WarnInvalidTypeTLSTerminationHeader = "invalid type of value of tls termination header"
|
WarnInvalidTypeTLSTerminationHeader = "invalid type of value of tls termination header"
|
||||||
|
FailedToPutTombstones = "failed to put tombstones"
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue