[#1318] metabase: Add iteration offset

Add offset element to the iterations over deleted objects (both the
Graveyard and the Garbage buckets).

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
Pavel Karpy 2021-04-15 13:58:10 +03:00 committed by LeL
parent fe8076e60a
commit daab30c391
3 changed files with 468 additions and 49 deletions

View file

@ -1,97 +1,231 @@
package meta package meta
import ( import (
"errors" "bytes"
"fmt" "fmt"
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
"github.com/pkg/errors"
"go.etcd.io/bbolt" "go.etcd.io/bbolt"
) )
// DeletedObject represents descriptor of the object that was // GarbageObject represents descriptor of the
// marked to be deleted. // object that has been marked with GC.
type DeletedObject struct { type GarbageObject struct {
addr *addressSDK.Address addr *addressSDK.Address
} }
// Address returns buried object address. // Address returns garbage object address.
func (g *DeletedObject) Address() *addressSDK.Address { func (g GarbageObject) Address() *addressSDK.Address {
return g.addr return g.addr
} }
// Handler is a DeletedObject handling function. // GarbageHandler is a GarbageObject handling function.
type Handler func(*DeletedObject) error type GarbageHandler func(GarbageObject) error
// GarbageIterationPrm groups parameters of the garbage
// iteration process.
type GarbageIterationPrm struct {
h GarbageHandler
offset *addressSDK.Address
}
// SetHandler sets a handler that will be called on every
// GarbageObject.
func (g *GarbageIterationPrm) SetHandler(h GarbageHandler) *GarbageIterationPrm {
g.h = h
return g
}
// SetOffset sets an offset of the iteration operation.
// The handler will be applied to the next after the
// specified offset if any are left.
//
// Note: if offset is not found in db, iteration starts
// from the element that WOULD BE the following after the
// offset if offset was presented. That means that it is
// safe to delete offset element and pass if to the
// iteration once again: iteration would start from the
// next element.
//
// Nil offset means start an integration from the beginning.
func (g *GarbageIterationPrm) SetOffset(offset *addressSDK.Address) *GarbageIterationPrm {
g.offset = offset
return g
}
// IterateOverGarbage iterates over all objects // IterateOverGarbage iterates over all objects
// marked with GC mark. // marked with GC mark.
// //
// If h returns ErrInterruptIterator, nil returns immediately. // If h returns ErrInterruptIterator, nil returns immediately.
// Returns other errors of h directly. // Returns other errors of h directly.
func (db *DB) IterateOverGarbage(h Handler) error { func (db *DB) IterateOverGarbage(p *GarbageIterationPrm) error {
return db.boltDB.View(func(tx *bbolt.Tx) error { return db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateDeletedObj(tx, withGC, h) return db.iterateDeletedObj(tx, gcHandler{p.h}, p.offset)
}) })
} }
// TombstonedObject represents descriptor of the
// object that has been covered with tombstone.
type TombstonedObject struct {
addr *addressSDK.Address
tomb *addressSDK.Address
}
// Address returns tombstoned object address.
func (g TombstonedObject) Address() *addressSDK.Address {
return g.addr
}
// Tombstone returns address of a tombstone that
// covers object.
func (g TombstonedObject) Tombstone() *addressSDK.Address {
return g.tomb
}
// TombstonedHandler is a TombstonedObject handling function.
type TombstonedHandler func(object TombstonedObject) error
// GraveyardIterationPrm groups parameters of the graveyard
// iteration process.
type GraveyardIterationPrm struct {
h TombstonedHandler
offset *addressSDK.Address
}
// SetHandler sets a handler that will be called on every
// TombstonedObject.
func (g *GraveyardIterationPrm) SetHandler(h TombstonedHandler) *GraveyardIterationPrm {
g.h = h
return g
}
// SetOffset sets an offset of the iteration operation.
// The handler will be applied to the next after the
// specified offset if any are left.
//
// Note: if offset is not found in db, iteration starts
// from the element that WOULD BE the following after the
// offset if offset was presented. That means that it is
// safe to delete offset element and pass if to the
// iteration once again: iteration would start from the
// next element.
//
// Nil offset means start an integration from the beginning.
func (g *GraveyardIterationPrm) SetOffset(offset *addressSDK.Address) *GraveyardIterationPrm {
g.offset = offset
return g
}
// IterateOverGraveyard iterates over all graves in DB. // IterateOverGraveyard iterates over all graves in DB.
// //
// If h returns ErrInterruptIterator, nil returns immediately. // If h returns ErrInterruptIterator, nil returns immediately.
// Returns other errors of h directly. // Returns other errors of h directly.
func (db *DB) IterateOverGraveyard(h Handler) error { func (db *DB) IterateOverGraveyard(p *GraveyardIterationPrm) error {
return db.boltDB.View(func(tx *bbolt.Tx) error { return db.boltDB.View(func(tx *bbolt.Tx) error {
return db.iterateDeletedObj(tx, grave, h) return db.iterateDeletedObj(tx, graveyardHandler{p.h}, p.offset)
}) })
} }
type deletedType uint8 type kvHandler interface {
handleKV(k, v []byte) error
}
const ( type gcHandler struct {
_ deletedType = iota h GarbageHandler
grave }
withGC
)
func (db *DB) iterateDeletedObj(tx *bbolt.Tx, t deletedType, h Handler) error { func (g gcHandler) handleKV(k, _ []byte) error {
o, err := garbageFromKV(k)
if err != nil {
return fmt.Errorf("could not parse garbage object: %w", err)
}
return g.h(o)
}
type graveyardHandler struct {
h TombstonedHandler
}
func (g graveyardHandler) handleKV(k, v []byte) error {
o, err := graveFromKV(k, v)
if err != nil {
return fmt.Errorf("could not parse grave: %w", err)
}
return g.h(o)
}
func (db *DB) iterateDeletedObj(tx *bbolt.Tx, h kvHandler, offset *addressSDK.Address) error {
var bkt *bbolt.Bucket var bkt *bbolt.Bucket
switch t { switch t := h.(type) {
case grave: case graveyardHandler:
bkt = tx.Bucket(graveyardBucketName) bkt = tx.Bucket(graveyardBucketName)
case withGC: case gcHandler:
bkt = tx.Bucket(garbageBucketName) bkt = tx.Bucket(garbageBucketName)
default: default:
panic(fmt.Sprintf("metabase: unknown iteration object type: %d", t)) panic(fmt.Sprintf("metabase: unknown iteration object hadler: %T", t))
} }
if bkt == nil { if bkt == nil {
return nil return nil
} }
// iterate over all deleted objects c := bkt.Cursor()
err := bkt.ForEach(func(k, v []byte) error { var k, v []byte
// parse deleted object
delObj, err := deletedObjectFromKV(k, v) if offset == nil {
if err != nil { k, v = c.First()
return fmt.Errorf("could not parse Grave: %w", err) } else {
rawAddr := addressKey(offset)
k, v = c.Seek(rawAddr)
if bytes.Equal(k, rawAddr) {
// offset was found, move
// cursor to the next element
k, v = c.Next()
}
} }
// handler object for ; k != nil; k, v = c.Next() {
return h(delObj) err := h.handleKV(k, v)
}) if err != nil {
if errors.Is(err, ErrInterruptIterator) { if errors.Is(err, ErrInterruptIterator) {
err = nil return nil
} }
return err return err
} }
func deletedObjectFromKV(k, _ []byte) (*DeletedObject, error) {
addr, err := addressFromKey(k)
if err != nil {
return nil, fmt.Errorf("could not parse address: %w", err)
} }
return &DeletedObject{ return nil
}
func garbageFromKV(k []byte) (GarbageObject, error) {
addr, err := addressFromKey(k)
if err != nil {
return GarbageObject{}, fmt.Errorf("could not parse address: %w", err)
}
return GarbageObject{
addr: addr, addr: addr,
}, nil }, nil
} }
func graveFromKV(k, v []byte) (TombstonedObject, error) {
target, err := addressFromKey(k)
if err != nil {
return TombstonedObject{}, fmt.Errorf("could not parse address: %w", err)
}
tomb, err := addressFromKey(v)
if err != nil {
return TombstonedObject{}, err
}
return TombstonedObject{
addr: target,
tomb: tomb,
}, nil
}

View file

@ -9,10 +9,104 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
func TestDB_IterateDeletedObjects_EmptyDB(t *testing.T) {
db := newDB(t)
var counter int
iterGravePRM := new(meta.GraveyardIterationPrm)
err := db.IterateOverGraveyard(iterGravePRM.SetHandler(func(garbage meta.TombstonedObject) error {
counter++
return nil
}))
require.NoError(t, err)
require.Zero(t, counter)
iterGCPRM := new(meta.GarbageIterationPrm)
err = db.IterateOverGarbage(iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
counter++
return nil
}))
require.NoError(t, err)
require.Zero(t, counter)
}
func TestDB_Iterate_OffsetNotFound(t *testing.T) {
db := newDB(t)
obj1 := generateObject(t)
obj2 := generateObject(t)
addr1 := addressSDK.NewAddress()
err := addr1.Parse("AUSF6rhReoAdPVKYUZWW9o2LbtTvekn54B3JXi7pdzmn/2daLhLB7yVXbjBaKkckkuvjX22BxRYuSHy9RPxuH9PZS")
require.NoError(t, err)
addr2 := addressSDK.NewAddress()
err = addr2.Parse("CwYYr6sFLU1zK6DeBTVd8SReADUoxYobUhSrxgXYxCVn/ANYbnJoQqdjmU5Dhk3LkxYj5E9nJHQFf8LjTEcap9TxM")
require.NoError(t, err)
addr3 := addressSDK.NewAddress()
err = addr3.Parse("6ay4GfhR9RgN28d5ufg63toPetkYHGcpcW7G3b7QWSek/ANYbnJoQqdjmU5Dhk3LkxYj5E9nJHQFf8LjTEcap9TxM")
require.NoError(t, err)
obj1.SetContainerID(addr1.ContainerID())
obj1.SetID(addr1.ObjectID())
obj2.SetContainerID(addr2.ContainerID())
obj2.SetID(addr2.ObjectID())
err = putBig(db, obj1)
require.NoError(t, err)
_, err = db.Inhume(new(meta.InhumePrm).
WithAddresses(object.AddressOf(obj1)).
WithGCMark(),
)
require.NoError(t, err)
var counter int
iterGCPRM := new(meta.GarbageIterationPrm).
SetOffset(object.AddressOf(obj2)).
SetHandler(func(garbage meta.GarbageObject) error {
require.Equal(t, *garbage.Address(), *addr1)
counter++
return nil
})
err = db.IterateOverGarbage(iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
require.Equal(t, *garbage.Address(), *addr1)
counter++
return nil
}))
require.NoError(t, err)
// the second object would be put after the
// first, so it is expected that iteration
// will not receive the first object
require.Equal(t, 0, counter)
iterGCPRM.SetOffset(addr3)
err = db.IterateOverGarbage(iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
require.Equal(t, *garbage.Address(), *addr1)
counter++
return nil
}))
require.NoError(t, err)
// the third object would be put before the
// first, so it is expected that iteration
// will receive the first object
require.Equal(t, 1, counter)
}
func TestDB_IterateDeletedObjects(t *testing.T) { func TestDB_IterateDeletedObjects(t *testing.T) {
db := newDB(t) db := newDB(t)
// generate and put 2 objects // generate and put 4 objects
obj1 := generateObject(t) obj1 := generateObject(t)
obj2 := generateObject(t) obj2 := generateObject(t)
obj3 := generateObject(t) obj3 := generateObject(t)
@ -54,19 +148,25 @@ func TestDB_IterateDeletedObjects(t *testing.T) {
buriedTS, buriedGC []*addressSDK.Address buriedTS, buriedGC []*addressSDK.Address
) )
err = db.IterateOverGraveyard(func(deletedObject *meta.DeletedObject) error { iterGravePRM := new(meta.GraveyardIterationPrm)
buriedTS = append(buriedTS, deletedObject.Address())
err = db.IterateOverGraveyard(iterGravePRM.SetHandler(func(tomstoned meta.TombstonedObject) error {
require.Equal(t, *addrTombstone, *tomstoned.Tombstone())
buriedTS = append(buriedTS, tomstoned.Address())
counterAll++ counterAll++
return nil return nil
}) }))
err = db.IterateOverGarbage(func(deletedObject *meta.DeletedObject) error { iterGCPRM := new(meta.GarbageIterationPrm)
buriedGC = append(buriedGC, deletedObject.Address())
err = db.IterateOverGarbage(iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error {
buriedGC = append(buriedGC, garbage.Address())
counterAll++ counterAll++
return nil return nil
}) }))
require.NoError(t, err) require.NoError(t, err)
@ -86,6 +186,189 @@ func TestDB_IterateDeletedObjects(t *testing.T) {
require.True(t, equalAddresses(garbageExpected, buriedGC)) require.True(t, equalAddresses(garbageExpected, buriedGC))
} }
func TestDB_IterateOverGraveyard_Offset(t *testing.T) {
db := newDB(t)
// generate and put 4 objects
obj1 := generateObject(t)
obj2 := generateObject(t)
obj3 := generateObject(t)
obj4 := generateObject(t)
var err error
err = putBig(db, obj1)
require.NoError(t, err)
err = putBig(db, obj2)
require.NoError(t, err)
err = putBig(db, obj3)
require.NoError(t, err)
err = putBig(db, obj4)
require.NoError(t, err)
inhumePrm := new(meta.InhumePrm)
// inhume with tombstone
addrTombstone := generateAddress()
_, err = db.Inhume(inhumePrm.
WithAddresses(object.AddressOf(obj1), object.AddressOf(obj2), object.AddressOf(obj3), object.AddressOf(obj4)).
WithTombstoneAddress(addrTombstone),
)
require.NoError(t, err)
expectedGraveyard := []*addressSDK.Address{
object.AddressOf(obj1), object.AddressOf(obj2),
object.AddressOf(obj3), object.AddressOf(obj4),
}
var (
counter int
firstIterationSize = len(expectedGraveyard) / 2
gotGraveyard []*addressSDK.Address
)
iterGraveyardPrm := new(meta.GraveyardIterationPrm)
err = db.IterateOverGraveyard(iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error {
require.Equal(t, *addrTombstone, *tombstoned.Tombstone())
gotGraveyard = append(gotGraveyard, tombstoned.Address())
counter++
if counter == firstIterationSize {
return meta.ErrInterruptIterator
}
return nil
}))
require.NoError(t, err)
require.Equal(t, firstIterationSize, counter)
require.Equal(t, firstIterationSize, len(gotGraveyard))
// last received address is an offset
offset := gotGraveyard[len(gotGraveyard)-1]
iterGraveyardPrm.SetOffset(offset)
err = db.IterateOverGraveyard(iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error {
require.Equal(t, *addrTombstone, *tombstoned.Tombstone())
gotGraveyard = append(gotGraveyard, tombstoned.Address())
counter++
return nil
}))
require.NoError(t, err)
require.Equal(t, len(expectedGraveyard), counter)
require.True(t, equalAddresses(gotGraveyard, expectedGraveyard))
// last received object (last in db) as offset
// should lead to no iteration at all
offset = gotGraveyard[len(gotGraveyard)-1]
iterGraveyardPrm.SetOffset(offset)
iWasCalled := false
err = db.IterateOverGraveyard(iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error {
iWasCalled = true
return nil
}))
require.NoError(t, err)
require.False(t, iWasCalled)
}
func TestDB_IterateOverGarbage_Offset(t *testing.T) {
db := newDB(t)
// generate and put 4 objects
obj1 := generateObject(t)
obj2 := generateObject(t)
obj3 := generateObject(t)
obj4 := generateObject(t)
var err error
err = putBig(db, obj1)
require.NoError(t, err)
err = putBig(db, obj2)
require.NoError(t, err)
err = putBig(db, obj3)
require.NoError(t, err)
err = putBig(db, obj4)
require.NoError(t, err)
inhumePrm := new(meta.InhumePrm)
_, err = db.Inhume(inhumePrm.
WithAddresses(object.AddressOf(obj1), object.AddressOf(obj2), object.AddressOf(obj3), object.AddressOf(obj4)).
WithGCMark(),
)
require.NoError(t, err)
expectedGarbage := []*addressSDK.Address{
object.AddressOf(obj1), object.AddressOf(obj2),
object.AddressOf(obj3), object.AddressOf(obj4),
}
var (
counter int
firstIterationSize = len(expectedGarbage) / 2
gotGarbage []*addressSDK.Address
)
iterGarbagePrm := new(meta.GarbageIterationPrm)
err = db.IterateOverGarbage(iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error {
gotGarbage = append(gotGarbage, garbage.Address())
counter++
if counter == firstIterationSize {
return meta.ErrInterruptIterator
}
return nil
}))
require.NoError(t, err)
require.Equal(t, firstIterationSize, counter)
require.Equal(t, firstIterationSize, len(gotGarbage))
// last received address is an offset
offset := gotGarbage[len(gotGarbage)-1]
iterGarbagePrm.SetOffset(offset)
err = db.IterateOverGarbage(iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error {
gotGarbage = append(gotGarbage, garbage.Address())
counter++
return nil
}))
require.NoError(t, err)
require.Equal(t, len(expectedGarbage), counter)
require.True(t, equalAddresses(gotGarbage, expectedGarbage))
// last received object (last in db) as offset
// should lead to no iteration at all
offset = gotGarbage[len(gotGarbage)-1]
iterGarbagePrm.SetOffset(offset)
iWasCalled := false
err = db.IterateOverGarbage(iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error {
iWasCalled = true
return nil
}))
require.NoError(t, err)
require.False(t, iWasCalled)
}
func equalAddresses(aa1 []*addressSDK.Address, aa2 []*addressSDK.Address) bool { func equalAddresses(aa1 []*addressSDK.Address, aa2 []*addressSDK.Address) bool {
if len(aa1) != len(aa2) { if len(aa1) != len(aa2) {
return false return false

View file

@ -176,9 +176,11 @@ func (s *Shard) removeGarbage() {
buf := make([]*addressSDK.Address, 0, s.rmBatchSize) buf := make([]*addressSDK.Address, 0, s.rmBatchSize)
iterPrm := new(meta.GarbageIterationPrm)
// iterate over metabase's objects with GC mark // iterate over metabase's objects with GC mark
// (no more than s.rmBatchSize objects) // (no more than s.rmBatchSize objects)
err := s.metaBase.IterateOverGarbage(func(g *meta.DeletedObject) error { err := s.metaBase.IterateOverGarbage(iterPrm.SetHandler(func(g meta.GarbageObject) error {
buf = append(buf, g.Address()) buf = append(buf, g.Address())
if len(buf) == s.rmBatchSize { if len(buf) == s.rmBatchSize {
@ -186,7 +188,7 @@ func (s *Shard) removeGarbage() {
} }
return nil return nil
}) }))
if err != nil { if err != nil {
s.log.Warn("iterator over metabase graveyard failed", s.log.Warn("iterator over metabase graveyard failed",
zap.String("error", err.Error()), zap.String("error", err.Error()),