[#601] Use tombstone batching during delete-objects
All checks were successful
/ DCO (pull_request) Successful in 35s
/ Vulncheck (pull_request) Successful in 1m3s
/ Builds (pull_request) Successful in 1m14s
/ Lint (pull_request) Successful in 2m7s
/ Tests (pull_request) Successful in 1m24s
/ OCI image (pull_request) Successful in 2m11s
All checks were successful
/ DCO (pull_request) Successful in 35s
/ Vulncheck (pull_request) Successful in 1m3s
/ Builds (pull_request) Successful in 1m14s
/ Lint (pull_request) Successful in 2m7s
/ Tests (pull_request) Successful in 1m24s
/ OCI image (pull_request) Successful in 2m11s
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
0fba02aadb
commit
0f656b1471
9 changed files with 1184 additions and 218 deletions
|
@ -223,6 +223,9 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
|
||||||
}
|
}
|
||||||
if deletedObj.DeleteMarkerVersionID != "" {
|
if deletedObj.DeleteMarkerVersionID != "" {
|
||||||
deletedObj.DeleteMarker = true
|
deletedObj.DeleteMarker = true
|
||||||
|
if obj.VersionID != "" {
|
||||||
|
deletedObj.DeleteMarkerVersionID = obj.VersionID
|
||||||
|
}
|
||||||
}
|
}
|
||||||
response.DeletedObjects = append(response.DeletedObjects, deletedObj)
|
response.DeletedObjects = append(response.DeletedObjects, deletedObj)
|
||||||
}
|
}
|
||||||
|
|
File diff suppressed because it is too large
Load diff
|
@ -50,35 +50,69 @@ func TestDeleteMultipartAllParts(t *testing.T) {
|
||||||
partSize := layer.UploadMinSize
|
partSize := layer.UploadMinSize
|
||||||
objLen := 6 * partSize
|
objLen := 6 * partSize
|
||||||
|
|
||||||
bktName, bktName2, objName := "bucket", "bucket2", "object"
|
t.Run("single delete", func(t *testing.T) {
|
||||||
|
bktName, bktName2, objName := "bucket", "bucket2", "object"
|
||||||
|
|
||||||
// unversioned bucket
|
// unversioned bucket
|
||||||
createTestBucket(hc, bktName)
|
createTestBucket(hc, bktName)
|
||||||
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
|
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
|
||||||
hc.tp.ClearTombstoneOIDCount()
|
hc.tp.ClearTombstoneOIDCount()
|
||||||
deleteObject(t, hc, bktName, objName, emptyVersion)
|
deleteObject(t, hc, bktName, objName, emptyVersion)
|
||||||
require.Empty(t, hc.tp.Objects())
|
require.Empty(t, hc.tp.Objects())
|
||||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||||
|
|
||||||
// encrypted multipart
|
// encrypted multipart
|
||||||
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
|
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
|
||||||
hc.tp.ClearTombstoneOIDCount()
|
hc.tp.ClearTombstoneOIDCount()
|
||||||
deleteObject(t, hc, bktName, objName, emptyVersion)
|
deleteObject(t, hc, bktName, objName, emptyVersion)
|
||||||
require.Empty(t, hc.tp.Objects())
|
require.Empty(t, hc.tp.Objects())
|
||||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||||
|
|
||||||
// versions bucket
|
// versions bucket
|
||||||
createTestBucket(hc, bktName2)
|
createTestBucket(hc, bktName2)
|
||||||
putBucketVersioning(t, hc, bktName2, true)
|
putBucketVersioning(t, hc, bktName2, true)
|
||||||
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
|
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
|
||||||
_, hdr := getObject(hc, bktName2, objName)
|
_, hdr := getObject(hc, bktName2, objName)
|
||||||
versionID := hdr.Get("X-Amz-Version-Id")
|
versionID := hdr.Get("X-Amz-Version-Id")
|
||||||
hc.tp.ClearTombstoneOIDCount()
|
hc.tp.ClearTombstoneOIDCount()
|
||||||
deleteObject(t, hc, bktName2, objName, emptyVersion)
|
deleteObject(t, hc, bktName2, objName, emptyVersion)
|
||||||
require.Equal(t, 0, hc.tp.TombstoneOIDCount())
|
require.Equal(t, 0, hc.tp.TombstoneOIDCount())
|
||||||
deleteObject(t, hc, bktName2, objName, versionID)
|
deleteObject(t, hc, bktName2, objName, versionID)
|
||||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||||
require.Empty(t, hc.tp.Objects())
|
require.Empty(t, hc.tp.Objects())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("multi delete", func(t *testing.T) {
|
||||||
|
bktName, bktName2, objName := "bucket-multi", "bucket2-multi", "object"
|
||||||
|
|
||||||
|
// unversioned bucket
|
||||||
|
createTestBucket(hc, bktName)
|
||||||
|
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
|
||||||
|
hc.tp.ClearTombstoneOIDCount()
|
||||||
|
deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
|
||||||
|
require.Empty(t, hc.tp.Objects())
|
||||||
|
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||||
|
|
||||||
|
// encrypted multipart
|
||||||
|
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
|
||||||
|
hc.tp.ClearTombstoneOIDCount()
|
||||||
|
deleteObjects(t, hc, bktName, [][2]string{{objName, emptyVersion}})
|
||||||
|
require.Empty(t, hc.tp.Objects())
|
||||||
|
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||||
|
|
||||||
|
// versions bucket
|
||||||
|
createTestBucket(hc, bktName2)
|
||||||
|
putBucketVersioning(t, hc, bktName2, true)
|
||||||
|
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
|
||||||
|
_, hdr := getObject(hc, bktName2, objName)
|
||||||
|
versionID := hdr.Get("X-Amz-Version-Id")
|
||||||
|
hc.tp.ClearTombstoneOIDCount()
|
||||||
|
deleteObjects(t, hc, bktName2, [][2]string{{objName, emptyVersion}})
|
||||||
|
require.Equal(t, 0, hc.tp.TombstoneOIDCount())
|
||||||
|
deleteObjects(t, hc, bktName2, [][2]string{{objName, versionID}})
|
||||||
|
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||||
|
require.Empty(t, hc.tp.Objects())
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestMultipartCopiesNumber(t *testing.T) {
|
func TestMultipartCopiesNumber(t *testing.T) {
|
||||||
|
|
80
api/layer/delete_test.go
Normal file
80
api/layer/delete_test.go
Normal file
|
@ -0,0 +1,80 @@
|
||||||
|
package layer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDelete(t *testing.T) {
|
||||||
|
t.Run("single delete", func(t *testing.T) {
|
||||||
|
tc := prepareContext(t)
|
||||||
|
netInfo := netmap.NetworkInfo{}
|
||||||
|
settings := &data.BucketSettings{Versioning: data.VersioningSuspended}
|
||||||
|
|
||||||
|
obj1 := tc.putObjectNamed(tc.bktInfo, "obj1", []byte("obj1"))
|
||||||
|
obj2 := tc.putObjectNamed(tc.bktInfo, "obj2", []byte("obj2"))
|
||||||
|
|
||||||
|
cnrID := tc.bktInfo.CID.EncodeToString()
|
||||||
|
|
||||||
|
tmock := tc.layer.treeService.(*TreeServiceMock)
|
||||||
|
list := tmock.versions[cnrID][obj1.Name]
|
||||||
|
tmock.versions[cnrID][obj1.Name] = append(list, tmock.versions[cnrID][obj2.Name]...)
|
||||||
|
|
||||||
|
tc.testFrostFS.SetObjectError(obj2.Address(), errors.New("obj error"))
|
||||||
|
|
||||||
|
prm := &DeleteObjectParams{
|
||||||
|
BktInfo: tc.bktInfo,
|
||||||
|
Objects: []*VersionedObject{{Name: obj1.Name}},
|
||||||
|
Settings: settings,
|
||||||
|
NetworkInfo: netInfo,
|
||||||
|
IsMultiple: false,
|
||||||
|
}
|
||||||
|
|
||||||
|
vos := tc.layer.DeleteObjects(tc.ctx, prm)
|
||||||
|
require.Len(t, vos, 1)
|
||||||
|
require.Error(t, vos[0].Error)
|
||||||
|
|
||||||
|
list = tmock.versions[cnrID][obj1.Name]
|
||||||
|
require.Len(t, list, 1)
|
||||||
|
require.False(t, list[0].IsDeleteMarker)
|
||||||
|
require.Equal(t, obj2.Name, list[0].FilePath)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("multiple delete", func(t *testing.T) {
|
||||||
|
tc := prepareContext(t)
|
||||||
|
netInfo := netmap.NetworkInfo{}
|
||||||
|
settings := &data.BucketSettings{Versioning: data.VersioningSuspended}
|
||||||
|
|
||||||
|
obj3 := tc.putObjectNamed(tc.bktInfo, "obj3", []byte("obj3"))
|
||||||
|
obj4 := tc.putObjectNamed(tc.bktInfo, "obj4", []byte("obj4"))
|
||||||
|
|
||||||
|
cnrID := tc.bktInfo.CID.EncodeToString()
|
||||||
|
|
||||||
|
tmock := tc.layer.treeService.(*TreeServiceMock)
|
||||||
|
list := tmock.versions[cnrID][obj3.Name]
|
||||||
|
tmock.versions[cnrID][obj3.Name] = append(list, tmock.versions[cnrID][obj4.Name]...)
|
||||||
|
|
||||||
|
tc.testFrostFS.SetObjectError(obj4.Address(), errors.New("obj error"))
|
||||||
|
|
||||||
|
prm := &DeleteObjectParams{
|
||||||
|
BktInfo: tc.bktInfo,
|
||||||
|
Objects: []*VersionedObject{{Name: obj3.Name}},
|
||||||
|
Settings: settings,
|
||||||
|
NetworkInfo: netInfo,
|
||||||
|
IsMultiple: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
vos := tc.layer.DeleteObjects(tc.ctx, prm)
|
||||||
|
require.Len(t, vos, 1)
|
||||||
|
require.Error(t, vos[0].Error)
|
||||||
|
|
||||||
|
list = tmock.versions[cnrID][obj3.Name]
|
||||||
|
require.Len(t, list, 1)
|
||||||
|
require.False(t, list[0].IsDeleteMarker)
|
||||||
|
require.Equal(t, obj4.Name, list[0].FilePath)
|
||||||
|
})
|
||||||
|
}
|
|
@ -524,7 +524,9 @@ func (t *TestFrostFS) PatchObject(ctx context.Context, prm frostfs.PrmObjectPatc
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) Relations() relations.Relations {
|
func (t *TestFrostFS) Relations() relations.Relations {
|
||||||
return &RelationsMock{}
|
return &RelationsMock{
|
||||||
|
objectErrors: t.objectErrors,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error {
|
func (t *TestFrostFS) AddContainerPolicyChain(_ context.Context, prm frostfs.PrmAddContainerPolicyChain) error {
|
||||||
|
@ -568,9 +570,16 @@ func isMatched(attributes []object.Attribute, filter object.SearchFilter) bool {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
type RelationsMock struct{}
|
type RelationsMock struct {
|
||||||
|
objectErrors map[string]error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RelationsMock) GetSplitInfo(_ context.Context, cnrID cid.ID, objID oid.ID, _ relations.Tokens) (*object.SplitInfo, error) {
|
||||||
|
addr := newAddress(cnrID, objID)
|
||||||
|
if err := r.objectErrors[addr.EncodeToString()]; err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
func (r *RelationsMock) GetSplitInfo(context.Context, cid.ID, oid.ID, relations.Tokens) (*object.SplitInfo, error) {
|
|
||||||
return nil, relations.ErrNoSplitInfo
|
return nil, relations.ErrNoSplitInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
|
@ -29,6 +30,7 @@ import (
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/relations"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
@ -844,14 +846,223 @@ func (n *Layer) DeleteObjects(ctx context.Context, p *DeleteObjectParams) []*Ver
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "layer.DeleteObjects")
|
ctx, span := tracing.StartSpanFromContext(ctx, "layer.DeleteObjects")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
for i, obj := range p.Objects {
|
if !p.IsMultiple {
|
||||||
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj, p.NetworkInfo)
|
for i, obj := range p.Objects {
|
||||||
if p.IsMultiple && p.Objects[i].Error != nil {
|
p.Objects[i] = n.deleteObject(ctx, p.BktInfo, p.Settings, obj, p.NetworkInfo)
|
||||||
n.reqLogger(ctx).Error(logs.CouldntDeleteObject, zap.String("object", obj.String()), zap.Error(p.Objects[i].Error), logs.TagField(logs.TagExternalStorage))
|
|
||||||
}
|
}
|
||||||
|
return p.Objects
|
||||||
}
|
}
|
||||||
|
|
||||||
return p.Objects
|
maxLifetime := n.features.TombstoneLifetime()
|
||||||
|
inputCh := make(chan tombstoneData, len(p.Objects))
|
||||||
|
outputCh := n.submitPutTombstoneMultipleDelete(ctx, p.BktInfo, p.NetworkInfo.CurrentEpoch()+maxLifetime, inputCh)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(1)
|
||||||
|
|
||||||
|
resObjects := make([]*VersionedObject, 0, len(p.Objects))
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case r, ok := <-outputCh:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.err != nil && !client.IsErrObjectAlreadyRemoved(r.err) && !client.IsErrObjectNotFound(r.err) {
|
||||||
|
n.reqLogger(ctx).Error(logs.FailedToPutTombstones, zap.Error(r.err), logs.TagField(logs.TagExternalStorage))
|
||||||
|
|
||||||
|
for _, obj := range r.objs {
|
||||||
|
obj.obj.Error = r.err
|
||||||
|
resObjects = append(resObjects, obj.obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
LOOP:
|
||||||
|
for _, obj := range r.objs {
|
||||||
|
for _, node := range obj.nodes {
|
||||||
|
if err := n.treeService.RemoveVersion(ctx, p.BktInfo, node.ID); err != nil {
|
||||||
|
n.reqLogger(ctx).Error(logs.CouldntDeleteObject, zap.String("object", node.OID.EncodeToString()),
|
||||||
|
zap.Error(err), logs.TagField(logs.TagExternalStorage))
|
||||||
|
obj.obj.Error = r.err
|
||||||
|
resObjects = append(resObjects, obj.obj)
|
||||||
|
continue LOOP
|
||||||
|
}
|
||||||
|
if !node.IsDeleteMarker {
|
||||||
|
n.cache.DeleteObject(newAddress(p.BktInfo.CID, node.OID))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if obj.needDeleteMarker && obj.obj.Error == nil {
|
||||||
|
obj.obj = n.createDeleteMarker(ctx, p, obj.obj)
|
||||||
|
}
|
||||||
|
|
||||||
|
resObjects = append(resObjects, obj.obj)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
tokens := prepareTokensParameter(ctx, p.BktInfo.Owner)
|
||||||
|
for _, obj := range p.Objects {
|
||||||
|
n.deleteObjectUsingBatchTombstone(ctx, p, obj, tokens, inputCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
close(inputCh)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return resObjects
|
||||||
|
}
|
||||||
|
|
||||||
|
// deleteObjectUsingBatchTombstone schedule object removing.
|
||||||
|
// Method logic structure is similar to Layer.deleteObject.
|
||||||
|
func (n *Layer) deleteObjectUsingBatchTombstone(ctx context.Context, p *DeleteObjectParams, obj *VersionedObject, tokens relations.Tokens, inputCh chan<- tombstoneData) {
|
||||||
|
if len(obj.VersionID) != 0 || p.Settings.Unversioned() {
|
||||||
|
var nodeVersions []*data.NodeVersion
|
||||||
|
if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, p.BktInfo, obj); obj.Error != nil {
|
||||||
|
inputCh <- tombstoneData{obj: n.handleNotFoundError(p.BktInfo, obj)}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, nodeVersion := range nodeVersions {
|
||||||
|
if nodeVersion.IsDeleteMarker {
|
||||||
|
obj.DeleteMarkVersion = obj.VersionID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !n.removeOldVersionUsingBatchTombstone(ctx, p.BktInfo, obj, nodeVersions, p.NetworkInfo, tokens, inputCh, false) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
n.cache.DeleteObjectName(p.BktInfo.CID, p.BktInfo.Name, obj.Name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
lastVersion, err := n.getLastNodeVersion(ctx, p.BktInfo, obj)
|
||||||
|
if err != nil {
|
||||||
|
obj.Error = err
|
||||||
|
inputCh <- tombstoneData{obj: n.handleNotFoundError(p.BktInfo, obj)}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if p.Settings.VersioningSuspended() {
|
||||||
|
obj.VersionID = data.UnversionedObjectVersionID
|
||||||
|
|
||||||
|
var nodeVersions []*data.NodeVersion
|
||||||
|
if nodeVersions, obj.Error = n.getNodeVersionsToDelete(ctx, p.BktInfo, obj); obj.Error != nil {
|
||||||
|
if !isNotFoundError(obj.Error) {
|
||||||
|
inputCh <- tombstoneData{obj: obj}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
obj.Error = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, nodeVersion := range nodeVersions {
|
||||||
|
if nodeVersion.ID == lastVersion.ID && nodeVersion.IsDeleteMarker {
|
||||||
|
nodeVersions = append(nodeVersions[:i], nodeVersions[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastVersion.IsDeleteMarker {
|
||||||
|
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
|
||||||
|
}
|
||||||
|
|
||||||
|
if !n.removeOldVersionUsingBatchTombstone(ctx, p.BktInfo, obj, nodeVersions, p.NetworkInfo, tokens, inputCh, !lastVersion.IsDeleteMarker) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
n.cache.DeleteObjectName(p.BktInfo.CID, p.BktInfo.Name, obj.Name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if lastVersion.IsDeleteMarker {
|
||||||
|
obj.DeleteMarkVersion = lastVersion.OID.EncodeToString()
|
||||||
|
inputCh <- tombstoneData{obj: obj}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
inputCh <- tombstoneData{obj: n.createDeleteMarker(ctx, p, obj)}
|
||||||
|
n.cache.DeleteObjectName(p.BktInfo.CID, p.BktInfo.Name, obj.Name)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) createDeleteMarker(ctx context.Context, p *DeleteObjectParams, obj *VersionedObject) *VersionedObject {
|
||||||
|
randOID, err := getRandomOID()
|
||||||
|
if err != nil {
|
||||||
|
obj.Error = fmt.Errorf("couldn't get random oid: %w", err)
|
||||||
|
return obj
|
||||||
|
}
|
||||||
|
|
||||||
|
obj.DeleteMarkVersion = randOID.EncodeToString()
|
||||||
|
now := TimeNow(ctx)
|
||||||
|
newVersion := &data.NodeVersion{
|
||||||
|
BaseNodeVersion: data.BaseNodeVersion{
|
||||||
|
OID: randOID,
|
||||||
|
FilePath: obj.Name,
|
||||||
|
Created: &now,
|
||||||
|
Owner: &n.gateOwner,
|
||||||
|
IsDeleteMarker: true,
|
||||||
|
CreationEpoch: p.NetworkInfo.CurrentEpoch(),
|
||||||
|
},
|
||||||
|
IsUnversioned: p.Settings.VersioningSuspended(),
|
||||||
|
}
|
||||||
|
|
||||||
|
_, obj.Error = n.treeService.AddVersion(ctx, p.BktInfo, newVersion)
|
||||||
|
return obj
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) removeOldVersionUsingBatchTombstone(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeVersions []*data.NodeVersion, networkInfo netmap.NetworkInfo, tokens relations.Tokens, in chan<- tombstoneData, needDeleteMarker bool) bool {
|
||||||
|
td := tombstoneData{
|
||||||
|
needDeleteMarker: needDeleteMarker,
|
||||||
|
obj: obj,
|
||||||
|
nodes: make([]*data.NodeVersion, 0, len(nodeVersions)),
|
||||||
|
members: make([]oid.ID, 0, len(nodeVersions)),
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, nodeVersion := range nodeVersions {
|
||||||
|
if nodeVersion.IsDeleteMarker {
|
||||||
|
td.nodes = append(td.nodes, nodeVersion)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if nodeVersion.IsCombined {
|
||||||
|
err := n.removeCombinedObject(ctx, bkt, nodeVersion, networkInfo)
|
||||||
|
if err != nil && !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
|
||||||
|
obj.Error = err
|
||||||
|
in <- td
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
td.nodes = append(td.nodes, nodeVersion)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
|
||||||
|
if err != nil {
|
||||||
|
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||||
|
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
|
||||||
|
|
||||||
|
if client.IsErrObjectAlreadyRemoved(err) || client.IsErrObjectNotFound(err) {
|
||||||
|
td.nodes = append(td.nodes, nodeVersion)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
obj.Error = err
|
||||||
|
in <- td
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
td.nodes = append(td.nodes, nodeVersion)
|
||||||
|
td.members = append(td.members, append(oids, nodeVersion.OID)...)
|
||||||
|
}
|
||||||
|
|
||||||
|
in <- td
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) {
|
func (n *Layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) {
|
||||||
|
|
|
@ -76,6 +76,93 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type tombstoneData struct {
|
||||||
|
needDeleteMarker bool
|
||||||
|
obj *VersionedObject
|
||||||
|
nodes []*data.NodeVersion
|
||||||
|
members []oid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
type tombstoneResp struct {
|
||||||
|
objs []tombstoneRespObj
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type tombstoneRespObj struct {
|
||||||
|
needDeleteMarker bool
|
||||||
|
obj *VersionedObject
|
||||||
|
nodes []*data.NodeVersion
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) submitPutTombstoneMultipleDelete(ctx context.Context, bkt *data.BucketInfo, expEpoch uint64, ch <-chan tombstoneData) <-chan tombstoneResp {
|
||||||
|
res := make(chan tombstoneResp, cap(ch))
|
||||||
|
maxMembers := n.features.TombstoneMembersSize()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
defer func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(res)
|
||||||
|
}()
|
||||||
|
|
||||||
|
var tr tombstoneResp
|
||||||
|
var members []oid.ID
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case td, ok := <-ch:
|
||||||
|
if !ok {
|
||||||
|
if len(members) != 0 {
|
||||||
|
n.submitPutTombstoneBatch(ctx, bkt, expEpoch, res, &wg, tr, members)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(td.members) == 0 {
|
||||||
|
res <- tombstoneResp{objs: []tombstoneRespObj{{needDeleteMarker: td.needDeleteMarker, obj: td.obj, nodes: td.nodes}}, err: td.obj.Error}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
members = append(members, td.members...)
|
||||||
|
tr.objs = append(tr.objs, tombstoneRespObj{needDeleteMarker: td.needDeleteMarker, obj: td.obj, nodes: td.nodes})
|
||||||
|
|
||||||
|
if len(members) > maxMembers {
|
||||||
|
n.submitPutTombstoneBatch(ctx, bkt, expEpoch, res, &wg, tr, members)
|
||||||
|
tr = tombstoneResp{}
|
||||||
|
members = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Layer) submitPutTombstoneBatch(ctx context.Context, bkt *data.BucketInfo, expEpoch uint64, res chan<- tombstoneResp, wg *sync.WaitGroup, tr tombstoneResp, members []oid.ID) {
|
||||||
|
tomb := object.NewTombstone()
|
||||||
|
tomb.SetExpirationEpoch(expEpoch)
|
||||||
|
tomb.SetMembers(members)
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
err := n.workerPool.Submit(func() {
|
||||||
|
defer wg.Done()
|
||||||
|
|
||||||
|
if tr.err = n.putTombstoneObject(ctx, tomb, bkt); tr.err != nil {
|
||||||
|
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(tr.err))
|
||||||
|
}
|
||||||
|
res <- tr
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
wg.Done()
|
||||||
|
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
|
||||||
|
tr.err = fmt.Errorf("submit task to pool: %w", err)
|
||||||
|
res <- tr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo) error {
|
func (n *Layer) putTombstoneObject(ctx context.Context, tomb *object.Tombstone, bktInfo *data.BucketInfo) error {
|
||||||
payload, err := tomb.Marshal()
|
payload, err := tomb.Marshal()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -4,6 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math/rand"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
@ -248,6 +249,7 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo
|
||||||
versions, ok := cnrVersionsMap[newVersion.FilePath]
|
versions, ok := cnrVersionsMap[newVersion.FilePath]
|
||||||
if !ok {
|
if !ok {
|
||||||
cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion}
|
cnrVersionsMap[newVersion.FilePath] = []*data.NodeVersion{newVersion}
|
||||||
|
newVersion.ID = rand.Uint64()
|
||||||
return newVersion.ID, nil
|
return newVersion.ID, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -258,6 +260,8 @@ func (t *TreeServiceMock) AddVersion(_ context.Context, bktInfo *data.BucketInfo
|
||||||
if len(versions) != 0 {
|
if len(versions) != 0 {
|
||||||
newVersion.ID = versions[len(versions)-1].ID + 1
|
newVersion.ID = versions[len(versions)-1].ID + 1
|
||||||
newVersion.Timestamp = versions[len(versions)-1].Timestamp + 1
|
newVersion.Timestamp = versions[len(versions)-1].Timestamp + 1
|
||||||
|
} else {
|
||||||
|
newVersion.ID = rand.Uint64()
|
||||||
}
|
}
|
||||||
|
|
||||||
result := versions
|
result := versions
|
||||||
|
|
|
@ -17,14 +17,19 @@ import (
|
||||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
|
"github.com/panjf2000/ants/v2"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"go.uber.org/zap/zaptest"
|
"go.uber.org/zap/zaptest"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (tc *testContext) putObject(content []byte) *data.ObjectInfo {
|
func (tc *testContext) putObject(content []byte) *data.ObjectInfo {
|
||||||
|
return tc.putObjectNamed(tc.bktInfo, tc.obj, content)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (tc *testContext) putObjectNamed(bktInfo *data.BucketInfo, objName string, content []byte) *data.ObjectInfo {
|
||||||
extObjInfo, err := tc.layer.PutObject(tc.ctx, &PutObjectParams{
|
extObjInfo, err := tc.layer.PutObject(tc.ctx, &PutObjectParams{
|
||||||
BktInfo: tc.bktInfo,
|
BktInfo: bktInfo,
|
||||||
Object: tc.obj,
|
Object: objName,
|
||||||
Size: ptr(uint64(len(content))),
|
Size: ptr(uint64(len(content))),
|
||||||
Reader: bytes.NewReader(content),
|
Reader: bytes.NewReader(content),
|
||||||
Header: make(map[string]string),
|
Header: make(map[string]string),
|
||||||
|
@ -170,12 +175,16 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext {
|
||||||
var owner user.ID
|
var owner user.ID
|
||||||
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
user.IDFromKey(&owner, key.PrivateKey.PublicKey)
|
||||||
|
|
||||||
|
antPool, err := ants.NewPool(10)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
layerCfg := &Config{
|
layerCfg := &Config{
|
||||||
Cache: NewCache(config),
|
Cache: NewCache(config),
|
||||||
AnonKey: AnonymousKey{Key: key},
|
AnonKey: AnonymousKey{Key: key},
|
||||||
TreeService: NewTreeService(),
|
TreeService: NewTreeService(),
|
||||||
Features: &FeatureSettingsMock{},
|
Features: &FeatureSettingsMock{},
|
||||||
GateOwner: owner,
|
GateOwner: owner,
|
||||||
|
WorkerPool: antPool,
|
||||||
}
|
}
|
||||||
|
|
||||||
return &testContext{
|
return &testContext{
|
||||||
|
|
Loading…
Add table
Reference in a new issue