forked from TrueCloudLab/frostfs-node
[#1318] metabase: Separate buckets with TS and GC marks
It allows storing information about object in both ways at the same time: 1. Metabase should know if an object is covered by a tombstone (that is not expired yet); 2. It should be possible to physically delete objects covered by a tombstone immediately (mark with GC) but keep tombstone knowledge. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
b74fb2b932
commit
8107c8d1a9
9 changed files with 163 additions and 84 deletions
|
@ -49,6 +49,7 @@ func (db *DB) init(reset bool) error {
|
||||||
string(containerVolumeBucketName): {},
|
string(containerVolumeBucketName): {},
|
||||||
string(graveyardBucketName): {},
|
string(graveyardBucketName): {},
|
||||||
string(toMoveItBucketName): {},
|
string(toMoveItBucketName): {},
|
||||||
|
string(garbageBucketName): {},
|
||||||
}
|
}
|
||||||
|
|
||||||
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
return db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
|
|
|
@ -86,12 +86,12 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []*addressSDK.Address) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) delete(tx *bbolt.Tx, addr *addressSDK.Address, refCounter referenceCounter) error {
|
func (db *DB) delete(tx *bbolt.Tx, addr *addressSDK.Address, refCounter referenceCounter) error {
|
||||||
// remove record from graveyard
|
// remove record from the garbage bucket
|
||||||
graveyard := tx.Bucket(graveyardBucketName)
|
garbageBKT := tx.Bucket(garbageBucketName)
|
||||||
if graveyard != nil {
|
if garbageBKT != nil {
|
||||||
err := graveyard.Delete(addressKey(addr))
|
err := garbageBKT.Delete(addressKey(addr))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not remove from graveyard: %w", err)
|
return fmt.Errorf("could not remove from garbage bucket: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
|
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
@ -45,9 +46,6 @@ func TestDB_Delete(t *testing.T) {
|
||||||
err = meta.Inhume(db, object.AddressOf(child), object.AddressOf(ts))
|
err = meta.Inhume(db, object.AddressOf(child), object.AddressOf(ts))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
err = meta.Inhume(db, object.AddressOf(child), object.AddressOf(ts))
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// delete object
|
// delete object
|
||||||
err = meta.Delete(db, object.AddressOf(child))
|
err = meta.Delete(db, object.AddressOf(child))
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
@ -57,13 +55,14 @@ func TestDB_Delete(t *testing.T) {
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.Len(t, l, 0)
|
require.Len(t, l, 0)
|
||||||
|
|
||||||
// check if they removed from graveyard
|
// check if they marked as already removed
|
||||||
|
|
||||||
ok, err := meta.Exists(db, object.AddressOf(child))
|
ok, err := meta.Exists(db, object.AddressOf(child))
|
||||||
require.NoError(t, err)
|
require.Error(t, apistatus.ObjectAlreadyRemoved{})
|
||||||
require.False(t, ok)
|
require.False(t, ok)
|
||||||
|
|
||||||
ok, err = meta.Exists(db, object.AddressOf(parent))
|
ok, err = meta.Exists(db, object.AddressOf(parent))
|
||||||
require.NoError(t, err)
|
require.Error(t, apistatus.ObjectAlreadyRemoved{})
|
||||||
require.False(t, ok)
|
require.False(t, ok)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
@ -101,24 +100,37 @@ func (db *DB) exists(tx *bbolt.Tx, addr *addressSDK.Address) (exists bool, err e
|
||||||
}
|
}
|
||||||
|
|
||||||
// inGraveyard returns:
|
// inGraveyard returns:
|
||||||
// * 0 if object is not in graveyard;
|
// * 0 if object is not marked for deletion;
|
||||||
// * 1 if object is in graveyard with GC mark;
|
// * 1 if object with GC mark;
|
||||||
// * 2 if object is in graveyard with tombstone.
|
// * 2 if object is covered with tombstone.
|
||||||
func inGraveyard(tx *bbolt.Tx, addr *addressSDK.Address) uint8 {
|
func inGraveyard(tx *bbolt.Tx, addr *addressSDK.Address) uint8 {
|
||||||
graveyard := tx.Bucket(graveyardBucketName)
|
graveyard := tx.Bucket(graveyardBucketName)
|
||||||
if graveyard == nil {
|
if graveyard == nil {
|
||||||
|
// incorrect metabase state, does not make
|
||||||
|
// sense to check garbage bucket
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
val := graveyard.Get(addressKey(addr))
|
val := graveyard.Get(addressKey(addr))
|
||||||
if val == nil {
|
if val == nil {
|
||||||
|
garbageBCK := tx.Bucket(garbageBucketName)
|
||||||
|
if garbageBCK == nil {
|
||||||
|
// incorrect node state
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
if bytes.Equal(val, []byte(inhumeGCMarkValue)) {
|
val = garbageBCK.Get(addressKey(addr))
|
||||||
|
if val != nil {
|
||||||
|
// object has been marked with GC
|
||||||
return 1
|
return 1
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// neither in the graveyard
|
||||||
|
// nor was marked with GC mark
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// object in the graveyard
|
||||||
return 2
|
return 2
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
package meta
|
package meta
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
@ -9,53 +8,74 @@ import (
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Grave represents descriptor of DB's graveyard item.
|
// DeletedObject represents descriptor of the object that was
|
||||||
type Grave struct {
|
// marked to be deleted.
|
||||||
gcMark bool
|
type DeletedObject struct {
|
||||||
|
|
||||||
addr *addressSDK.Address
|
addr *addressSDK.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithGCMark returns true if grave marked for GC to be removed.
|
|
||||||
func (g *Grave) WithGCMark() bool {
|
|
||||||
return g.gcMark
|
|
||||||
}
|
|
||||||
|
|
||||||
// Address returns buried object address.
|
// Address returns buried object address.
|
||||||
func (g *Grave) Address() *addressSDK.Address {
|
func (g *DeletedObject) Address() *addressSDK.Address {
|
||||||
return g.addr
|
return g.addr
|
||||||
}
|
}
|
||||||
|
|
||||||
// GraveHandler is a Grave handling function.
|
// Handler is a DeletedObject handling function.
|
||||||
type GraveHandler func(*Grave) error
|
type Handler func(*DeletedObject) error
|
||||||
|
|
||||||
|
// IterateOverGarbage iterates over all objects
|
||||||
|
// marked with GC mark.
|
||||||
|
//
|
||||||
|
// If h returns ErrInterruptIterator, nil returns immediately.
|
||||||
|
// Returns other errors of h directly.
|
||||||
|
func (db *DB) IterateOverGarbage(h Handler) error {
|
||||||
|
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
|
return db.iterateDeletedObj(tx, withGC, h)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// IterateOverGraveyard iterates over all graves in DB.
|
// IterateOverGraveyard iterates over all graves in DB.
|
||||||
//
|
//
|
||||||
// If h returns ErrInterruptIterator, nil returns immediately.
|
// If h returns ErrInterruptIterator, nil returns immediately.
|
||||||
// Returns other errors of h directly.
|
// Returns other errors of h directly.
|
||||||
func (db *DB) IterateOverGraveyard(h GraveHandler) error {
|
func (db *DB) IterateOverGraveyard(h Handler) error {
|
||||||
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
return db.iterateOverGraveyard(tx, h)
|
return db.iterateDeletedObj(tx, grave, h)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (db *DB) iterateOverGraveyard(tx *bbolt.Tx, h GraveHandler) error {
|
type deletedType uint8
|
||||||
// get graveyard bucket
|
|
||||||
bktGraveyard := tx.Bucket(graveyardBucketName)
|
const (
|
||||||
if bktGraveyard == nil {
|
_ deletedType = iota
|
||||||
|
grave
|
||||||
|
withGC
|
||||||
|
)
|
||||||
|
|
||||||
|
func (db *DB) iterateDeletedObj(tx *bbolt.Tx, t deletedType, h Handler) error {
|
||||||
|
var bkt *bbolt.Bucket
|
||||||
|
switch t {
|
||||||
|
case grave:
|
||||||
|
bkt = tx.Bucket(graveyardBucketName)
|
||||||
|
case withGC:
|
||||||
|
bkt = tx.Bucket(garbageBucketName)
|
||||||
|
default:
|
||||||
|
panic(fmt.Sprintf("metabase: unknown iteration object type: %d", t))
|
||||||
|
}
|
||||||
|
|
||||||
|
if bkt == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterate over all graves
|
// iterate over all deleted objects
|
||||||
err := bktGraveyard.ForEach(func(k, v []byte) error {
|
err := bkt.ForEach(func(k, v []byte) error {
|
||||||
// parse Grave
|
// parse deleted object
|
||||||
g, err := graveFromKV(k, v)
|
delObj, err := deletedObjectFromKV(k, v)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not parse Grave: %w", err)
|
return fmt.Errorf("could not parse Grave: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// handler Grave
|
// handler object
|
||||||
return h(g)
|
return h(delObj)
|
||||||
})
|
})
|
||||||
|
|
||||||
if errors.Is(err, ErrInterruptIterator) {
|
if errors.Is(err, ErrInterruptIterator) {
|
||||||
|
@ -65,14 +85,13 @@ func (db *DB) iterateOverGraveyard(tx *bbolt.Tx, h GraveHandler) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func graveFromKV(k, v []byte) (*Grave, error) {
|
func deletedObjectFromKV(k, _ []byte) (*DeletedObject, error) {
|
||||||
addr, err := addressFromKey(k)
|
addr, err := addressFromKey(k)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not parse address: %w", err)
|
return nil, fmt.Errorf("could not parse address: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Grave{
|
return &DeletedObject{
|
||||||
gcMark: bytes.Equal(v, []byte(inhumeGCMarkValue)),
|
|
||||||
addr: addr,
|
addr: addr,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,12 +9,14 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestDB_IterateOverGraveyard(t *testing.T) {
|
func TestDB_IterateDeletedObjects(t *testing.T) {
|
||||||
db := newDB(t)
|
db := newDB(t)
|
||||||
|
|
||||||
// generate and put 2 objects
|
// generate and put 2 objects
|
||||||
obj1 := generateObject(t)
|
obj1 := generateObject(t)
|
||||||
obj2 := generateObject(t)
|
obj2 := generateObject(t)
|
||||||
|
obj3 := generateObject(t)
|
||||||
|
obj4 := generateObject(t)
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
@ -24,20 +26,26 @@ func TestDB_IterateOverGraveyard(t *testing.T) {
|
||||||
err = putBig(db, obj2)
|
err = putBig(db, obj2)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = putBig(db, obj3)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = putBig(db, obj4)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
inhumePrm := new(meta.InhumePrm)
|
inhumePrm := new(meta.InhumePrm)
|
||||||
|
|
||||||
// inhume with tombstone
|
// inhume with tombstone
|
||||||
addrTombstone := generateAddress()
|
addrTombstone := generateAddress()
|
||||||
|
|
||||||
_, err = db.Inhume(inhumePrm.
|
_, err = db.Inhume(inhumePrm.
|
||||||
WithAddresses(object.AddressOf(obj1)).
|
WithAddresses(object.AddressOf(obj1), object.AddressOf(obj2)).
|
||||||
WithTombstoneAddress(addrTombstone),
|
WithTombstoneAddress(addrTombstone),
|
||||||
)
|
)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// inhume with GC mark
|
// inhume with GC mark
|
||||||
_, err = db.Inhume(inhumePrm.
|
_, err = db.Inhume(inhumePrm.
|
||||||
WithAddresses(object.AddressOf(obj2)).
|
WithAddresses(object.AddressOf(obj3), object.AddressOf(obj4)).
|
||||||
WithGCMark(),
|
WithGCMark(),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -46,13 +54,15 @@ func TestDB_IterateOverGraveyard(t *testing.T) {
|
||||||
buriedTS, buriedGC []*addressSDK.Address
|
buriedTS, buriedGC []*addressSDK.Address
|
||||||
)
|
)
|
||||||
|
|
||||||
err = db.IterateOverGraveyard(func(g *meta.Grave) error {
|
err = db.IterateOverGraveyard(func(deletedObject *meta.DeletedObject) error {
|
||||||
if g.WithGCMark() {
|
buriedTS = append(buriedTS, deletedObject.Address())
|
||||||
buriedGC = append(buriedGC, g.Address())
|
counterAll++
|
||||||
} else {
|
|
||||||
buriedTS = append(buriedTS, g.Address())
|
|
||||||
}
|
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
err = db.IterateOverGarbage(func(deletedObject *meta.DeletedObject) error {
|
||||||
|
buriedGC = append(buriedGC, deletedObject.Address())
|
||||||
counterAll++
|
counterAll++
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -60,7 +70,30 @@ func TestDB_IterateOverGraveyard(t *testing.T) {
|
||||||
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
require.Equal(t, 2, counterAll)
|
require.Equal(t, 4, counterAll)
|
||||||
require.Equal(t, []*addressSDK.Address{object.AddressOf(obj1)}, buriedTS)
|
require.True(t, equalAddresses([]*addressSDK.Address{object.AddressOf(obj1), object.AddressOf(obj2)}, buriedTS))
|
||||||
require.Equal(t, []*addressSDK.Address{object.AddressOf(obj2)}, buriedGC)
|
require.True(t, equalAddresses([]*addressSDK.Address{object.AddressOf(obj3), object.AddressOf(obj4)}, buriedGC))
|
||||||
|
}
|
||||||
|
|
||||||
|
func equalAddresses(aa1 []*addressSDK.Address, aa2 []*addressSDK.Address) bool {
|
||||||
|
if len(aa1) != len(aa2) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, a1 := range aa1 {
|
||||||
|
found := false
|
||||||
|
|
||||||
|
for _, a2 := range aa2 {
|
||||||
|
if a1.String() == a2.String() {
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !found {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,8 +65,6 @@ func Inhume(db *DB, target, tomb *addressSDK.Address) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
const inhumeGCMarkValue = "GCMARK"
|
|
||||||
|
|
||||||
var errBreakBucketForEach = errors.New("bucket ForEach break")
|
var errBreakBucketForEach = errors.New("bucket ForEach break")
|
||||||
|
|
||||||
// Inhume marks objects as removed but not removes it from metabase.
|
// Inhume marks objects as removed but not removes it from metabase.
|
||||||
|
@ -75,28 +73,40 @@ var errBreakBucketForEach = errors.New("bucket ForEach break")
|
||||||
// if at least one object is locked.
|
// if at least one object is locked.
|
||||||
func (db *DB) Inhume(prm *InhumePrm) (res *InhumeRes, err error) {
|
func (db *DB) Inhume(prm *InhumePrm) (res *InhumeRes, err error) {
|
||||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||||
graveyard, err := tx.CreateBucketIfNotExists(graveyardBucketName)
|
var (
|
||||||
if err != nil {
|
// target bucket of the operation, one of the:
|
||||||
return err
|
// 1. Graveyard if Inhume was called with a Tombstone
|
||||||
}
|
// 2. Garbage if Inhume was called with a GC mark
|
||||||
|
bkt *bbolt.Bucket
|
||||||
|
// value that will be put in the bucket, one of the:
|
||||||
|
// 1. tombstone address if Inhume was called with
|
||||||
|
// a Tombstone
|
||||||
|
// 2. zeroValue if Inhume was called with a GC mark
|
||||||
|
value []byte
|
||||||
|
)
|
||||||
|
|
||||||
var tombKey []byte
|
|
||||||
if prm.tomb != nil {
|
if prm.tomb != nil {
|
||||||
tombKey = addressKey(prm.tomb)
|
bkt = tx.Bucket(graveyardBucketName)
|
||||||
|
tombKey := addressKey(prm.tomb)
|
||||||
|
|
||||||
// it is forbidden to have a tomb-on-tomb in NeoFS,
|
// it is forbidden to have a tomb-on-tomb in NeoFS,
|
||||||
// so graveyard keys must not be addresses of tombstones
|
// so graveyard keys must not be addresses of tombstones
|
||||||
|
data := bkt.Get(tombKey)
|
||||||
// tombstones can be marked for GC in graveyard, so exclude this case
|
if data != nil {
|
||||||
data := graveyard.Get(tombKey)
|
err := bkt.Delete(tombKey)
|
||||||
if data != nil && !bytes.Equal(data, []byte(inhumeGCMarkValue)) {
|
|
||||||
err := graveyard.Delete(tombKey)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not remove grave with tombstone key: %w", err)
|
return fmt.Errorf("could not remove grave with tombstone key: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
value = tombKey
|
||||||
} else {
|
} else {
|
||||||
tombKey = []byte(inhumeGCMarkValue)
|
bkt, err = tx.CreateBucketIfNotExists(garbageBucketName)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
value = zeroValue
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range prm.target {
|
for i := range prm.target {
|
||||||
|
@ -128,7 +138,7 @@ func (db *DB) Inhume(prm *InhumePrm) (res *InhumeRes, err error) {
|
||||||
|
|
||||||
// iterate over graveyard and check if target address
|
// iterate over graveyard and check if target address
|
||||||
// is the address of tombstone in graveyard.
|
// is the address of tombstone in graveyard.
|
||||||
err = graveyard.ForEach(func(k, v []byte) error {
|
err = bkt.ForEach(func(k, v []byte) error {
|
||||||
// check if graveyard has record with key corresponding
|
// check if graveyard has record with key corresponding
|
||||||
// to tombstone address (at least one)
|
// to tombstone address (at least one)
|
||||||
targetIsTomb = bytes.Equal(v, targetKey)
|
targetIsTomb = bytes.Equal(v, targetKey)
|
||||||
|
@ -158,7 +168,7 @@ func (db *DB) Inhume(prm *InhumePrm) (res *InhumeRes, err error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// consider checking if target is already in graveyard?
|
// consider checking if target is already in graveyard?
|
||||||
err = graveyard.Put(targetKey, tombKey)
|
err = bkt.Put(targetKey, value)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,14 @@ bytes. Check it later.
|
||||||
const invalidBase58String = "_"
|
const invalidBase58String = "_"
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
// graveyardBucketName stores rows with the objects that have been
|
||||||
|
// covered with Tombstone objects. That objects should not be returned
|
||||||
|
// from the node and should not be accepted by the node from other
|
||||||
|
// nodes.
|
||||||
graveyardBucketName = []byte(invalidBase58String + "Graveyard")
|
graveyardBucketName = []byte(invalidBase58String + "Graveyard")
|
||||||
|
// garbageBucketName stores rows with the objects that should be physically
|
||||||
|
// deleted by the node (Garbage Collector routine).
|
||||||
|
garbageBucketName = []byte(invalidBase58String + "Garbage")
|
||||||
toMoveItBucketName = []byte(invalidBase58String + "ToMoveIt")
|
toMoveItBucketName = []byte(invalidBase58String + "ToMoveIt")
|
||||||
containerVolumeBucketName = []byte(invalidBase58String + "ContainerSize")
|
containerVolumeBucketName = []byte(invalidBase58String + "ContainerSize")
|
||||||
|
|
||||||
|
|
|
@ -166,7 +166,7 @@ func (gc *gc) stop() {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterates over metabase graveyard and deletes objects
|
// iterates over metabase and deletes objects
|
||||||
// with GC-marked graves.
|
// with GC-marked graves.
|
||||||
// Does nothing if shard is in "read-only" mode.
|
// Does nothing if shard is in "read-only" mode.
|
||||||
func (s *Shard) removeGarbage() {
|
func (s *Shard) removeGarbage() {
|
||||||
|
@ -176,12 +176,10 @@ func (s *Shard) removeGarbage() {
|
||||||
|
|
||||||
buf := make([]*addressSDK.Address, 0, s.rmBatchSize)
|
buf := make([]*addressSDK.Address, 0, s.rmBatchSize)
|
||||||
|
|
||||||
// iterate over metabase graveyard and accumulate
|
// iterate over metabase's objects with GC mark
|
||||||
// objects with GC mark (no more the s.rmBatchSize objects)
|
// (no more than s.rmBatchSize objects)
|
||||||
err := s.metaBase.IterateOverGraveyard(func(g *meta.Grave) error {
|
err := s.metaBase.IterateOverGarbage(func(g *meta.DeletedObject) error {
|
||||||
if g.WithGCMark() {
|
|
||||||
buf = append(buf, g.Address())
|
buf = append(buf, g.Address())
|
||||||
}
|
|
||||||
|
|
||||||
if len(buf) == s.rmBatchSize {
|
if len(buf) == s.rmBatchSize {
|
||||||
return meta.ErrInterruptIterator
|
return meta.ErrInterruptIterator
|
||||||
|
|
Loading…
Reference in a new issue