[#895] metabase: Do not delete GC mark for virtual objects
All checks were successful
DCO action / DCO (pull_request) Successful in 4m45s
Vulncheck / Vulncheck (pull_request) Successful in 6m54s
Build / Build Components (1.21) (pull_request) Successful in 8m13s
Build / Build Components (1.20) (pull_request) Successful in 8m18s
Tests and linters / Lint (pull_request) Successful in 9m50s
Tests and linters / Staticcheck (pull_request) Successful in 9m59s
Tests and linters / Tests (1.20) (pull_request) Successful in 10m39s
Tests and linters / Tests (1.21) (pull_request) Successful in 11m17s
Tests and linters / Tests with -race (pull_request) Successful in 4m26s
All checks were successful
DCO action / DCO (pull_request) Successful in 4m45s
Vulncheck / Vulncheck (pull_request) Successful in 6m54s
Build / Build Components (1.21) (pull_request) Successful in 8m13s
Build / Build Components (1.20) (pull_request) Successful in 8m18s
Tests and linters / Lint (pull_request) Successful in 9m50s
Tests and linters / Staticcheck (pull_request) Successful in 9m59s
Tests and linters / Tests (1.20) (pull_request) Successful in 10m39s
Tests and linters / Tests (1.21) (pull_request) Successful in 11m17s
Tests and linters / Tests with -race (pull_request) Successful in 4m26s
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
f20b336b47
commit
50e648e3b1
4 changed files with 124 additions and 10 deletions
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/internal/testutil"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/shard"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger/test"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||||
|
@ -85,6 +86,97 @@ func TestDeleteBigObject(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeleteBigObjectWithoutGC(t *testing.T) {
|
||||||
|
t.Parallel()
|
||||||
|
|
||||||
|
cnr := cidtest.ID()
|
||||||
|
parentID := oidtest.ID()
|
||||||
|
splitID := objectSDK.NewSplitID()
|
||||||
|
|
||||||
|
parent := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
parent.SetID(parentID)
|
||||||
|
parent.SetPayload(nil)
|
||||||
|
|
||||||
|
const childCount = 3
|
||||||
|
children := make([]*objectSDK.Object, childCount)
|
||||||
|
childIDs := make([]oid.ID, childCount)
|
||||||
|
for i := range children {
|
||||||
|
children[i] = testutil.GenerateObjectWithCID(cnr)
|
||||||
|
if i != 0 {
|
||||||
|
children[i].SetPreviousID(childIDs[i-1])
|
||||||
|
}
|
||||||
|
if i == len(children)-1 {
|
||||||
|
children[i].SetParent(parent)
|
||||||
|
}
|
||||||
|
children[i].SetSplitID(splitID)
|
||||||
|
children[i].SetPayload([]byte{byte(i), byte(i + 1), byte(i + 2)})
|
||||||
|
childIDs[i], _ = children[i].ID()
|
||||||
|
}
|
||||||
|
|
||||||
|
link := testutil.GenerateObjectWithCID(cnr)
|
||||||
|
link.SetParent(parent)
|
||||||
|
link.SetParentID(parentID)
|
||||||
|
link.SetSplitID(splitID)
|
||||||
|
link.SetChildren(childIDs...)
|
||||||
|
|
||||||
|
s1 := testNewShard(t, 1, shard.WithDisabledGC())
|
||||||
|
|
||||||
|
e := testNewEngine(t).setInitializedShards(t, s1).engine
|
||||||
|
e.log = test.NewLogger(t)
|
||||||
|
defer e.Close(context.Background())
|
||||||
|
|
||||||
|
for i := range children {
|
||||||
|
require.NoError(t, Put(context.Background(), e, children[i]))
|
||||||
|
}
|
||||||
|
require.NoError(t, Put(context.Background(), e, link))
|
||||||
|
|
||||||
|
addrParent := object.AddressOf(parent)
|
||||||
|
checkGetError[*objectSDK.SplitInfoError](t, e, addrParent, true)
|
||||||
|
|
||||||
|
addrLink := object.AddressOf(link)
|
||||||
|
checkGetError[error](t, e, addrLink, false)
|
||||||
|
|
||||||
|
for i := range children {
|
||||||
|
checkGetError[error](t, e, object.AddressOf(children[i]), false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete logical
|
||||||
|
var deletePrm DeletePrm
|
||||||
|
deletePrm.WithForceRemoval()
|
||||||
|
deletePrm.WithAddress(addrParent)
|
||||||
|
|
||||||
|
_, err := e.Delete(context.Background(), deletePrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
|
||||||
|
checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)
|
||||||
|
for i := range children {
|
||||||
|
checkGetError[*apistatus.ObjectNotFound](t, e, object.AddressOf(children[i]), true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete physical
|
||||||
|
var delPrm shard.DeletePrm
|
||||||
|
delPrm.SetAddresses(addrParent)
|
||||||
|
_, err = s1.Delete(context.Background(), delPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
delPrm.SetAddresses(addrLink)
|
||||||
|
_, err = s1.Delete(context.Background(), delPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
for i := range children {
|
||||||
|
delPrm.SetAddresses(object.AddressOf(children[i]))
|
||||||
|
_, err = s1.Delete(context.Background(), delPrm)
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
checkGetError[*apistatus.ObjectNotFound](t, e, addrParent, true)
|
||||||
|
checkGetError[*apistatus.ObjectNotFound](t, e, addrLink, true)
|
||||||
|
for i := range children {
|
||||||
|
checkGetError[*apistatus.ObjectNotFound](t, e, object.AddressOf(children[i]), true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func checkGetError[E error](t *testing.T, e *StorageEngine, addr oid.Address, shouldFail bool) {
|
func checkGetError[E error](t *testing.T, e *StorageEngine, addr oid.Address, shouldFail bool) {
|
||||||
var getPrm GetPrm
|
var getPrm GetPrm
|
||||||
getPrm.WithAddress(addr)
|
getPrm.WithAddress(addr)
|
||||||
|
|
|
@ -181,12 +181,12 @@ func newTestStorages(root string, smallSize uint64) ([]blobstor.SubStorage, *tes
|
||||||
}, smallFileStorage, largeFileStorage
|
}, smallFileStorage, largeFileStorage
|
||||||
}
|
}
|
||||||
|
|
||||||
func testNewShard(t testing.TB, id int) *shard.Shard {
|
func testNewShard(t testing.TB, id int, opts ...shard.Option) *shard.Shard {
|
||||||
sid, err := generateShardID()
|
sid, err := generateShardID()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t, id)...)
|
shardOpts := append([]shard.Option{shard.WithID(sid)}, testDefaultShardOptions(t, id)...)
|
||||||
s := shard.New(shardOpts...)
|
s := shard.New(append(shardOpts, opts...)...)
|
||||||
|
|
||||||
require.NoError(t, s.Open(context.Background()))
|
require.NoError(t, s.Open(context.Background()))
|
||||||
require.NoError(t, s.Init(context.Background()))
|
require.NoError(t, s.Init(context.Background()))
|
||||||
|
|
|
@ -251,26 +251,27 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
||||||
|
|
||||||
removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0
|
removeAvailableObject := inGraveyardWithKey(addrKey, graveyardBKT, garbageBKT) == 0
|
||||||
|
|
||||||
// remove record from the garbage bucket
|
|
||||||
if garbageBKT != nil {
|
|
||||||
err := garbageBKT.Delete(addrKey)
|
|
||||||
if err != nil {
|
|
||||||
return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// unmarshal object, work only with physically stored (raw == true) objects
|
// unmarshal object, work only with physically stored (raw == true) objects
|
||||||
obj, err := db.get(tx, addr, key, false, true, currEpoch)
|
obj, err := db.get(tx, addr, key, false, true, currEpoch)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var siErr *objectSDK.SplitInfoError
|
var siErr *objectSDK.SplitInfoError
|
||||||
|
|
||||||
if client.IsErrObjectNotFound(err) || errors.As(err, &siErr) {
|
if client.IsErrObjectNotFound(err) || errors.As(err, &siErr) {
|
||||||
|
// if object is virtual (parent) then do nothing, it will be deleted with last child
|
||||||
return deleteSingleResult{}, nil
|
return deleteSingleResult{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return deleteSingleResult{}, err
|
return deleteSingleResult{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// remove record from the garbage bucket
|
||||||
|
if garbageBKT != nil {
|
||||||
|
err := garbageBKT.Delete(addrKey)
|
||||||
|
if err != nil {
|
||||||
|
return deleteSingleResult{}, fmt.Errorf("could not remove from garbage bucket: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// if object is an only link to a parent, then remove parent
|
// if object is an only link to a parent, then remove parent
|
||||||
if parent := obj.Parent(); parent != nil {
|
if parent := obj.Parent(); parent != nil {
|
||||||
parAddr := object.AddressOf(parent)
|
parAddr := object.AddressOf(parent)
|
||||||
|
@ -327,6 +328,19 @@ func (db *DB) deleteObject(
|
||||||
return fmt.Errorf("can't remove fake bucket tree indexes: %w", err)
|
return fmt.Errorf("can't remove fake bucket tree indexes: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if isParent {
|
||||||
|
// remove record from the garbage bucket, because regular object deletion does nothing for virtual object
|
||||||
|
garbageBKT := tx.Bucket(garbageBucketName)
|
||||||
|
if garbageBKT != nil {
|
||||||
|
key := make([]byte, addressKeySize)
|
||||||
|
addrKey := addressKey(object.AddressOf(obj), key)
|
||||||
|
err := garbageBKT.Delete(addrKey)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not remove from garbage bucket: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -371,6 +371,14 @@ func WithRebuildWorkerLimiter(l RebuildWorkerLimiter) Option {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithDisabledGC disables GC.
|
||||||
|
// For testing purposes only.
|
||||||
|
func WithDisabledGC() Option {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.gcCfg.testHookRemover = func(ctx context.Context) gcRunResult { return gcRunResult{} }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// WithZeroSizeCallback returns option to set zero-size containers callback.
|
// WithZeroSizeCallback returns option to set zero-size containers callback.
|
||||||
func WithZeroSizeCallback(cb EmptyContainersCallback) Option {
|
func WithZeroSizeCallback(cb EmptyContainersCallback) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
|
|
Loading…
Reference in a new issue