forked from TrueCloudLab/frostfs-s3-gw
[#601] Use tombstone batching during delete-objects
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
619385836d
commit
0e68222817
9 changed files with 1180 additions and 218 deletions
|
@ -225,6 +225,9 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
|
|||
}
|
||||
if deletedObj.DeleteMarkerVersionID != "" {
|
||||
deletedObj.DeleteMarker = true
|
||||
if obj.VersionID != "" {
|
||||
deletedObj.DeleteMarkerVersionID = obj.VersionID
|
||||
}
|
||||
}
|
||||
response.DeletedObjects = append(response.DeletedObjects, deletedObj)
|
||||
}
|
||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -50,35 +50,69 @@ func TestDeleteMultipartAllParts(t *testing.T) {
|
|||
partSize := layer.UploadMinSize
|
||||
objLen := 6 * partSize
|
||||
|
||||
bktName, bktName2, objName := "bucket", "bucket2", "object"
|
||||
t.Run("single delete", func(t *testing.T) {
|
||||
bktName, bktName2, objName := "bucket", "bucket2", "object"
|
||||
|
||||
// unversioned bucket
|
||||
createTestBucket(hc, bktName)
|
||||
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObject(t, hc, bktName, objName, emptyVersion)
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
// unversioned bucket
|
||||
createTestBucket(hc, bktName)
|
||||
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObject(t, hc, bktName, objName, emptyVersion)
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
|
||||
// encrypted multipart
|
||||
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObject(t, hc, bktName, objName, emptyVersion)
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
// encrypted multipart
|
||||
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObject(t, hc, bktName, objName, emptyVersion)
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
|
||||
// versions bucket
|
||||
createTestBucket(hc, bktName2)
|
||||
putBucketVersioning(t, hc, bktName2, true)
|
||||
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
|
||||
_, hdr := getObject(hc, bktName2, objName)
|
||||
versionID := hdr.Get("X-Amz-Version-Id")
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObject(t, hc, bktName2, objName, emptyVersion)
|
||||
require.Equal(t, 0, hc.tp.TombstoneOIDCount())
|
||||
deleteObject(t, hc, bktName2, objName, versionID)
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
// versions bucket
|
||||
createTestBucket(hc, bktName2)
|
||||
putBucketVersioning(t, hc, bktName2, true)
|
||||
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
|
||||
_, hdr := getObject(hc, bktName2, objName)
|
||||
versionID := hdr.Get("X-Amz-Version-Id")
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObject(t, hc, bktName2, objName, emptyVersion)
|
||||
require.Equal(t, 0, hc.tp.TombstoneOIDCount())
|
||||
deleteObject(t, hc, bktName2, objName, versionID)
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
})
|
||||
|
||||
t.Run("multi delete", func(t *testing.T) {
|
||||
bktName, bktName2, objName := "bucket-multi", "bucket2-multi", "object"
|
||||
|
||||
// unversioned bucket
|
||||
createTestBucket(hc, bktName)
|
||||
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
|
||||
// encrypted multipart
|
||||
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
|
||||
// versions bucket
|
||||
createTestBucket(hc, bktName2)
|
||||
putBucketVersioning(t, hc, bktName2, true)
|
||||
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
|
||||
_, hdr := getObject(hc, bktName2, objName)
|
||||
versionID := hdr.Get("X-Amz-Version-Id")
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObjects(t, hc, bktName2, [][2]string{{objName, emptyVersion}})
|
||||
require.Equal(t, 0, hc.tp.TombstoneOIDCount())
|
||||
deleteObjects(t, hc, bktName2, [][2]string{{objName, versionID}})
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
})
|
||||
}
|
||||
|
||||
func TestSpecialMultipartName(t *testing.T) {
|
||||
|
|
77
api/layer/delete_test.go
Normal file
77
api/layer/delete_test.go
Normal file
|
@ -0,0 +1,77 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDelete(t *testing.T) {
|
||||
tc := prepareContext(t)
|
||||
|
||||
netInfo := netmap.NetworkInfo{}
|
||||
settings := &data.BucketSettings{Versioning: data.VersioningSuspended}
|
||||
|
||||
t.Run("single delete", func(t *testing.T) {
|
||||
obj1 := tc.putObjectNamed(tc.bktInfo, "obj1", []byte("obj1"))
|
||||
obj2 := tc.putObjectNamed(tc.bktInfo, "obj2", []byte("obj2"))
|
||||
|
||||
cnrID := tc.bktInfo.CID.EncodeToString()
|
||||
|
||||
tmock := tc.layer.treeService.(*TreeServiceMock)
|
||||
list := tmock.versions[cnrID][obj1.Name]
|
||||
tmock.versions[cnrID][obj1.Name] = append(list, tmock.versions[cnrID][obj2.Name]...)
|
||||
|
||||
tc.testFrostFS.SetObjectError(obj2.Address(), errors.New("obj error"))
|
||||
|
||||
prm := &DeleteObjectParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
Objects: []*VersionedObject{{Name: obj1.Name}},
|
||||
Settings: settings,
|
||||
NetworkInfo: netInfo,
|
||||
IsMultiple: false,
|
||||
}
|
||||
|
||||
vos := tc.layer.DeleteObjects(tc.ctx, prm)
|
||||
require.Len(t, vos, 1)
|
||||
require.Error(t, vos[0].Error)
|
||||
|
||||
list = tmock.versions[cnrID][obj1.Name]
|
||||
require.Len(t, list, 1)
|
||||
require.False(t, list[0].IsDeleteMarker)
|
||||
require.Equal(t, obj2.Name, list[0].FilePath)
|
||||
})
|
||||
|
||||
t.Run("multiple delete", func(t *testing.T) {
|
||||
obj3 := tc.putObjectNamed(tc.bktInfo, "obj3", []byte("obj3"))
|
||||
obj4 := tc.putObjectNamed(tc.bktInfo, "obj4", []byte("obj4"))
|
||||
|
||||
cnrID := tc.bktInfo.CID.EncodeToString()
|
||||
|
||||
tmock := tc.layer.treeService.(*TreeServiceMock)
|
||||
list := tmock.versions[cnrID][obj3.Name]
|
||||
tmock.versions[cnrID][obj3.Name] = append(list, tmock.versions[cnrID][obj4.Name]...)
|
||||
|
||||
tc.testFrostFS.SetObjectError(obj4.Address(), errors.New("obj error"))
|
||||
|
||||
prm := &DeleteObjectParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
Objects: []*VersionedObject{{Name: obj3.Name}},
|
||||
Settings: settings,
|
||||
NetworkInfo: netInfo,
|
||||
IsMultiple: true,
|
||||
}
|
||||
|
||||
vos := tc.layer.DeleteObjects(tc.ctx, prm)
|
||||
require.Len(t, vos, 1)
|
||||
require.Error(t, vos[0].Error)
|
||||
|
||||
list = tmock.versions[cnrID][obj3.Name]
|
||||
require.Len(t, list, 1)
|
||||
require.False(t, list[0].IsDeleteMarker)
|
||||
require.Equal(t, obj4.Name, list[0].FilePath)
|
||||
})
|
||||
}
|
|
@ -516,7 +516,9 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatc
|
|||
}
|
||||
|
||||
func (t *TestFrostFS) Relations() relations.Relations {
|
||||
return &RelationsMock{}
|
||||
return &RelationsMock{
|
||||
objectErrors: t.objectErrors,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error {
|
||||
|
@ -560,9 +562,16 @@ func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
type RelationsMock struct{}
|
||||
type RelationsMock struct {
|
||||
objectErrors map[string]error
|
||||
}
|
||||
|
||||
func (r *RelationsMock) GetSplitInfo(_ context.Context, cnrID cid.ID, objID oid.ID, _ relations.Tokens) (*object.SplitInfo, error) {
|
||||
addr := newAddress(cnrID, objID)
|
||||
if err := r.objectErrors[addr.EncodeToString()]; err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
func (r *RelationsMock) GetSplitInfo(context.Context, cid.ID, oid.ID, relations.Tokens) (*object.SplitInfo, error) {
|
||||
return nil, relations.ErrNoSplitInfo
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
|
@ -28,6 +29,7 @@ import (
|
|||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
|
@ -818,14 +820,222 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
|
|||
|
||||
// DeleteObjects from the storage.
|
||||
func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*VersionedObject {
|
||||
for i, obj := range p.Objects {
|
||||
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj, p.NetworkInfo)
|
||||
if p.IsMultiple && p.Objects[i].Error != nil {
|
||||
n.reqLogger(ctx).Error(logs.CouldntDeleteObject, zap.String("object", obj.String()), zap.Error(p.Objects[i].Error))
|
||||
if !p.IsMultiple {
|
||||
for i, obj := range p.Objects {
|
||||
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj, p.NetworkInfo)
|
||||
}
|
||||
return p.Objects
|
||||
}
|
||||
|
||||
return p.Objects
|
||||
maxLifetime := n.features.TombstoneLifetime()
|
||||
inputCh := make(chan tombstoneData, len(p.Objects))
|
||||
outputCh := n.submitPutTombstoneMultipleDelete(ctx, p.BktInfo, p.NetworkInfo.CurrentEpoch()+maxLifetime, inputCh)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(1)
|
||||
|
||||
resObjects := make([]*VersionedObject, 0, len(p.Objects))
|
||||
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case r, ok := <-outputCh:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if r.err != nil && !client.IsErrObjectAlreadyRemoved(r.err) && !client.IsErrObjectNotFound(r.err) {
|
||||
n.reqLogger(ctx).Error(logs.FailedToPutTombstones, zap.Error(r.err))
|
||||
|
||||
for _, obj := range r.objs {
|
||||
obj.obj.Error = r.err
|
||||
resObjects = append(resObjects, obj.obj)
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
LOOP:
|
||||
for _, obj := range r.objs {
|
||||
for _, node := range obj.nodes {
|
||||
if err := n.treeService.RemoveVersion(ctx, p.BktInfo, node.ID); err != nil {
|
||||
n.reqLogger(ctx).Error(logs.CouldntDeleteObject, zap.String("object", node.OID.EncodeToString()), zap.Error(err))
|
||||
obj.obj.Error = r.err
|
||||
resObjects = append(resObjects, obj.obj)
|
||||
continue LOOP
|
||||
}
|
||||
if !node.IsDeleteMarker {
|
||||
n.cache.DeleteObject(newAddress(p.BktInfo.CID, node.OID))
|
||||
}
|
||||
}
|
||||
|
||||
if obj.needDeleteMarker && obj.obj.Error == nil {
|
||||
obj.obj = n.createDeleteMarker(ctx, p, obj.obj)
|
||||
}
|
||||
|
||||
resObjects = append(resObjects, obj.obj)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
tokens := prepareTokensParameter(ctx, p.BktInfo.Owner)
|
||||
for _, obj := range p.Objects {
|
||||
n.deleteObjectUsingBatchTombstone(ctx, p, obj, tokens, inputCh)
|
||||
}
|
||||
|
||||
close(inputCh)
|
||||
wg.Wait()
|
||||
|
||||
return resObjects
|
||||
}
|
||||
|
||||
// deleteObjectUsingBatchTombstone schedule object removing.
|
||||
// Method logic structure is similar to Layer.deleteObject.
|
||||
func (n *Layer) deleteObjectUsingBatchTombstone(ctx context.Context, p *DeleteObjectParams, obj *VersionedObject, tokens relations.Tokens, inputCh chan<- tombstoneData) {
|
||||
if len(obj.VersionID) != 0 || p.Settings.Unversioned() {
|
||||
var nodeVersions []*data.NodeVersion
|
||||
if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, p.BktInfo, obj); obj.Error != nil {
|
||||
inputCh <- tombstoneData{obj: n.handleNotFoundError(p.BktInfo, obj)}
|
||||
return
|
||||
}
|
||||
|
||||
for _, nodeVersion := range nodeVersions {
|
||||
if nodeVersion.IsDeleteMarker {
|
||||
obj.DeleteMarkVersion = obj.VersionID
|
||||
}
|
||||
}
|
||||
if !n.removeOldVersionUsingBatchTombstone(ctx, p.BktInfo, obj, nodeVersions, p.NetworkInfo, tokens, inputCh, false) {
|
||||
return
|
||||
}
|
||||
|
||||
n.cache.DeleteObjectName(p.BktInfo.CID, p.BktInfo.Name, obj.Name)
|
||||
return
|
||||
}
|
||||
|
||||
lastVersion, err := n.getLastNodeVersion(ctx, p.BktInfo, obj)
|
||||
if err != nil {
|
||||
obj.Error = err
|
||||
inputCh <- tombstoneData{obj: n.handleNotFoundError(p.BktInfo, obj)}
|
||||
return
|
||||
}
|
||||
|
||||
if p.Settings.VersioningSuspended() {
|
||||
obj.VersionID = data.UnversionedObjectVersionID
|
||||
|
||||
var nodeVersions []*data.NodeVersion
|
||||
if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, p.BktInfo, obj); obj.Error != nil {
|
||||
if !isNotFoundError(obj.Error) {
|
||||
inputCh <- tombstoneData{obj: obj}
|
||||
return
|
||||
}
|
||||
obj.Error = nil
|
||||
}
|
||||
|
||||
for i, nodeVersion := range nodeVersions {
|
||||
if nodeVersion.ID == lastVersion.ID && nodeVersion.IsDeleteMarker {
|
||||
nodeVersions = append(nodeVersions[:i], nodeVersions[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if lastVersion.IsDeleteMarker {
|
||||
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
|
||||
}
|
||||
|
||||
if !n.removeOldVersionUsingBatchTombstone(ctx, p.BktInfo, obj, nodeVersions, p.NetworkInfo, tokens, inputCh, !lastVersion.IsDeleteMarker) {
|
||||
return
|
||||
}
|
||||
|
||||
n.cache.DeleteObjectName(p.BktInfo.CID, p.BktInfo.Name, obj.Name)
|
||||
return
|
||||
}
|
||||
|
||||
if lastVersion.IsDeleteMarker {
|
||||
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
|
||||
inputCh <- tombstoneData{obj: obj}
|
||||
return
|
||||
}
|
||||
|
||||
inputCh <- tombstoneData{obj: n.createDeleteMarker(ctx, p, obj)}
|
||||
n.cache.DeleteObjectName(p.BktInfo.CID, p.BktInfo.Name, obj.Name)
|
||||
}
|
||||
|
||||
func (n *Layer) createDeleteMarker(ctx context.Context, p *DeleteObjectParams, obj *VersionedObject) *VersionedObject {
|
||||
randOID, err := getRandomOID()
|
||||
if err != nil {
|
||||
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
|
||||
return obj
|
||||
}
|
||||
|
||||
obj.DeleteMarkVersion = randOID.EncodeToString()
|
||||
now := TimeNow(ctx)
|
||||
newVersion := &data.NodeVersion{
|
||||
BaseNodeVersion: data.BaseNodeVersion{
|
||||
OID: randOID,
|
||||
FilePath: obj.Name,
|
||||
Created: &now,
|
||||
Owner: &n.gateOwner,
|
||||
IsDeleteMarker: true,
|
||||
CreationEpoch: p.NetworkInfo.CurrentEpoch(),
|
||||
},
|
||||
IsUnversioned: p.Settings.VersioningSuspended(),
|
||||
}
|
||||
|
||||
_, obj.Error = n.treeService.AddVersion(ctx, p.BktInfo, newVersion)
|
||||
return obj
|
||||
}
|
||||
|
||||
func (n *Layer) removeOldVersionUsingBatchTombstone(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeVersions []*data.NodeVersion, networkInfo netmap.NetworkInfo, tokens relations.Tokens, in chan<- tombstoneData, needDeleteMarker bool) bool {
|
||||
td := tombstoneData{
|
||||
needDeleteMarker: needDeleteMarker,
|
||||
obj: obj,
|
||||
nodes: make([]*data.NodeVersion, 0, len(nodeVersions)),
|
||||
members: make([]oid.ID, 0, len(nodeVersions)),
|
||||
}
|
||||
|
||||
for _, nodeVersion := range nodeVersions {
|
||||
if nodeVersion.IsDeleteMarker {
|
||||
td.nodes = append(td.nodes, nodeVersion)
|
||||
continue
|
||||
}
|
||||
|
||||
if nodeVersion.IsCombined {
|
||||
err := n.removeCombinedObject(ctx, bkt, nodeVersion, networkInfo)
|
||||
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
|
||||
obj.Error = err
|
||||
in <- td
|
||||
return false
|
||||
}
|
||||
|
||||
td.nodes = append(td.nodes, nodeVersion)
|
||||
continue
|
||||
}
|
||||
|
||||
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
|
||||
if err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
|
||||
|
||||
if client.IsErrObjectAlreadyRemoved(err) || client.IsErrObjectNotFound(err) {
|
||||
td.nodes = append(td.nodes, nodeVersion)
|
||||
continue
|
||||
}
|
||||
|
||||
obj.Error = err
|
||||
in <- td
|
||||
return false
|
||||
}
|
||||
|
||||
td.nodes = append(td.nodes, nodeVersion)
|
||||
td.members = append(td.members, append(oids, nodeVersion.OID)...)
|
||||
}
|
||||
|
||||
in <- td
|
||||
return true
|
||||
}
|
||||
|
||||
func (n *Layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) {
|
||||
|
|
|
@ -76,6 +76,93 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me
|
|||
}
|
||||
}
|
||||
|
||||
type tombstoneData struct {
|
||||
needDeleteMarker bool
|
||||
obj *VersionedObject
|
||||
nodes []*data.NodeVersion
|
||||
members []oid.ID
|
||||
}
|
||||
|
||||
type tombstoneResp struct {
|
||||
objs []tombstoneRespObj
|
||||
err error
|
||||
}
|
||||
|
||||
type tombstoneRespObj struct {
|
||||
needDeleteMarker bool
|
||||
obj *VersionedObject
|
||||
nodes []*data.NodeVersion
|
||||
}
|
||||
|
||||
func (n *Layer) submitPutTombstoneMultipleDelete(ctx context.Context, bkt *data.BucketInfo, expEpoch uint64, ch <-chan tombstoneData) <-chan tombstoneResp {
|
||||
res := make(chan tombstoneResp, cap(ch))
|
||||
maxMembers := n.features.TombstoneMembersSize()
|
||||
|
||||
go func() {
|
||||
var wg sync.WaitGroup
|
||||
defer func() {
|
||||
wg.Wait()
|
||||
close(res)
|
||||
}()
|
||||
|
||||
var tr tombstoneResp
|
||||
var members []oid.ID
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case td, ok := <-ch:
|
||||
if !ok {
|
||||
if len(members) != 0 {
|
||||
n.submitPutTombstoneBatch(ctx, bkt, expEpoch, res, &wg, tr, members)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if len(td.members) == 0 {
|
||||
res <- tombstoneResp{objs: []tombstoneRespObj{{needDeleteMarker: td.needDeleteMarker, obj: td.obj, nodes: td.nodes}}, err: td.obj.Error}
|
||||
continue
|
||||
}
|
||||
|
||||
members = append(members, td.members...)
|
||||
tr.objs = append(tr.objs, tombstoneRespObj{needDeleteMarker: td.needDeleteMarker, obj: td.obj, nodes: td.nodes})
|
||||
|
||||
if len(members) > maxMembers {
|
||||
n.submitPutTombstoneBatch(ctx, bkt, expEpoch, res, &wg, tr, members)
|
||||
tr = tombstoneResp{}
|
||||
members = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return res
|
||||
}
|
||||
|
||||
func (n *Layer) submitPutTombstoneBatch(ctx context.Context, bkt *data.BucketInfo, expEpoch uint64, res chan<- tombstoneResp, wg *sync.WaitGroup, tr tombstoneResp, members []oid.ID) {
|
||||
tomb := object.NewTombstone()
|
||||
tomb.SetExpirationEpoch(expEpoch)
|
||||
tomb.SetMembers(members)
|
||||
|
||||
wg.Add(1)
|
||||
err := n.workerPool.Submit(func() {
|
||||
defer wg.Done()
|
||||
|
||||
if tr.err = n.putTombstoneObject(ctx, tomb, bkt); tr.err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(tr.err))
|
||||
}
|
||||
res <- tr
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
|
||||
tr.err = fmt.Errorf("submit task to pool: %w", err)
|
||||
res <- tr
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo) error {
|
||||
payload, err := tomb.Marshal()
|
||||
if err != nil {
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
@ -248,6 +249,7 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo
|
|||
versions, ok := cnrVersionsMap[newVersion.FilePath]
|
||||
if !ok {
|
||||
cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion}
|
||||
newVersion.ID = rand.Uint64()
|
||||
return newVersion.ID, nil
|
||||
}
|
||||
|
||||
|
@ -258,6 +260,8 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo
|
|||
if len(versions) != 0 {
|
||||
newVersion.ID = versions[len(versions)-1].ID + 1
|
||||
newVersion.Timestamp = versions[len(versions)-1].Timestamp + 1
|
||||
} else {
|
||||
newVersion.ID = rand.Uint64()
|
||||
}
|
||||
|
||||
result := versions
|
||||
|
|
|
@ -17,14 +17,19 @@ import (
|
|||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/panjf2000/ants/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (tc *testContext) putObject(content []byte) *data.ObjectInfo {
|
||||
return tc.putObjectNamed(tc.bktInfo, tc.obj, content)
|
||||
}
|
||||
|
||||
func (tc *testContext) putObjectNamed(bktInfo *data.BucketInfo, objName string, content []byte) *data.ObjectInfo {
|
||||
extObjInfo, err := tc.layer.PutObject(tc.ctx, &PutObjectParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
Object: tc.obj,
|
||||
BktInfo: bktInfo,
|
||||
Object: objName,
|
||||
Size: ptr(uint64(len(content))),
|
||||
Reader: bytes.NewReader(content),
|
||||
Header: make(map[string]string),
|
||||
|
@ -170,12 +175,16 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
|
|||
var owner user.ID
|
||||
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
||||
|
||||
antPool, err := ants.NewPool(10)
|
||||
require.NoError(t, err)
|
||||
|
||||
layerCfg := &Config{
|
||||
Cache: NewCache(config),
|
||||
AnonKey: AnonymousKey{Key: key},
|
||||
TreeService: NewTreeService(),
|
||||
Features: &FeatureSettingsMock{},
|
||||
GateOwner: owner,
|
||||
WorkerPool: antPool,
|
||||
}
|
||||
|
||||
return &testContext{
|
||||
|
|
Loading…
Add table
Reference in a new issue