diff --git a/pkg/local_object_storage/metabase/graveyard.go b/pkg/local_object_storage/metabase/graveyard.go index 3e96ea36d..e4c1d3070 100644 --- a/pkg/local_object_storage/metabase/graveyard.go +++ b/pkg/local_object_storage/metabase/graveyard.go @@ -1,97 +1,231 @@ package meta import ( - "errors" + "bytes" "fmt" addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address" + "github.com/pkg/errors" "go.etcd.io/bbolt" ) -// DeletedObject represents descriptor of the object that was -// marked to be deleted. -type DeletedObject struct { +// GarbageObject represents descriptor of the +// object that has been marked with GC. +type GarbageObject struct { addr *addressSDK.Address } -// Address returns buried object address. -func (g *DeletedObject) Address() *addressSDK.Address { +// Address returns garbage object address. +func (g GarbageObject) Address() *addressSDK.Address { return g.addr } -// Handler is a DeletedObject handling function. -type Handler func(*DeletedObject) error +// GarbageHandler is a GarbageObject handling function. +type GarbageHandler func(GarbageObject) error + +// GarbageIterationPrm groups parameters of the garbage +// iteration process. +type GarbageIterationPrm struct { + h GarbageHandler + offset *addressSDK.Address +} + +// SetHandler sets a handler that will be called on every +// GarbageObject. +func (g *GarbageIterationPrm) SetHandler(h GarbageHandler) *GarbageIterationPrm { + g.h = h + return g +} + +// SetOffset sets an offset of the iteration operation. +// The handler will be applied to the next after the +// specified offset if any are left. +// +// Note: if offset is not found in db, iteration starts +// from the element that WOULD BE the following after the +// offset if offset was presented. That means that it is +// safe to delete offset element and pass if to the +// iteration once again: iteration would start from the +// next element. +// +// Nil offset means start an integration from the beginning. +func (g *GarbageIterationPrm) SetOffset(offset *addressSDK.Address) *GarbageIterationPrm { + g.offset = offset + return g +} // IterateOverGarbage iterates over all objects // marked with GC mark. // // If h returns ErrInterruptIterator, nil returns immediately. // Returns other errors of h directly. -func (db *DB) IterateOverGarbage(h Handler) error { +func (db *DB) IterateOverGarbage(p *GarbageIterationPrm) error { return db.boltDB.View(func(tx *bbolt.Tx) error { - return db.iterateDeletedObj(tx, withGC, h) + return db.iterateDeletedObj(tx, gcHandler{p.h}, p.offset) }) } +// TombstonedObject represents descriptor of the +// object that has been covered with tombstone. +type TombstonedObject struct { + addr *addressSDK.Address + tomb *addressSDK.Address +} + +// Address returns tombstoned object address. +func (g TombstonedObject) Address() *addressSDK.Address { + return g.addr +} + +// Tombstone returns address of a tombstone that +// covers object. +func (g TombstonedObject) Tombstone() *addressSDK.Address { + return g.tomb +} + +// TombstonedHandler is a TombstonedObject handling function. +type TombstonedHandler func(object TombstonedObject) error + +// GraveyardIterationPrm groups parameters of the graveyard +// iteration process. +type GraveyardIterationPrm struct { + h TombstonedHandler + offset *addressSDK.Address +} + +// SetHandler sets a handler that will be called on every +// TombstonedObject. +func (g *GraveyardIterationPrm) SetHandler(h TombstonedHandler) *GraveyardIterationPrm { + g.h = h + return g +} + +// SetOffset sets an offset of the iteration operation. +// The handler will be applied to the next after the +// specified offset if any are left. +// +// Note: if offset is not found in db, iteration starts +// from the element that WOULD BE the following after the +// offset if offset was presented. That means that it is +// safe to delete offset element and pass if to the +// iteration once again: iteration would start from the +// next element. +// +// Nil offset means start an integration from the beginning. +func (g *GraveyardIterationPrm) SetOffset(offset *addressSDK.Address) *GraveyardIterationPrm { + g.offset = offset + return g +} + // IterateOverGraveyard iterates over all graves in DB. // // If h returns ErrInterruptIterator, nil returns immediately. // Returns other errors of h directly. -func (db *DB) IterateOverGraveyard(h Handler) error { +func (db *DB) IterateOverGraveyard(p *GraveyardIterationPrm) error { return db.boltDB.View(func(tx *bbolt.Tx) error { - return db.iterateDeletedObj(tx, grave, h) + return db.iterateDeletedObj(tx, graveyardHandler{p.h}, p.offset) }) } -type deletedType uint8 +type kvHandler interface { + handleKV(k, v []byte) error +} -const ( - _ deletedType = iota - grave - withGC -) +type gcHandler struct { + h GarbageHandler +} -func (db *DB) iterateDeletedObj(tx *bbolt.Tx, t deletedType, h Handler) error { +func (g gcHandler) handleKV(k, _ []byte) error { + o, err := garbageFromKV(k) + if err != nil { + return fmt.Errorf("could not parse garbage object: %w", err) + } + + return g.h(o) +} + +type graveyardHandler struct { + h TombstonedHandler +} + +func (g graveyardHandler) handleKV(k, v []byte) error { + o, err := graveFromKV(k, v) + if err != nil { + return fmt.Errorf("could not parse grave: %w", err) + } + + return g.h(o) +} + +func (db *DB) iterateDeletedObj(tx *bbolt.Tx, h kvHandler, offset *addressSDK.Address) error { var bkt *bbolt.Bucket - switch t { - case grave: + switch t := h.(type) { + case graveyardHandler: bkt = tx.Bucket(graveyardBucketName) - case withGC: + case gcHandler: bkt = tx.Bucket(garbageBucketName) default: - panic(fmt.Sprintf("metabase: unknown iteration object type: %d", t)) + panic(fmt.Sprintf("metabase: unknown iteration object hadler: %T", t)) } if bkt == nil { return nil } - // iterate over all deleted objects - err := bkt.ForEach(func(k, v []byte) error { - // parse deleted object - delObj, err := deletedObjectFromKV(k, v) - if err != nil { - return fmt.Errorf("could not parse Grave: %w", err) + c := bkt.Cursor() + var k, v []byte + + if offset == nil { + k, v = c.First() + } else { + rawAddr := addressKey(offset) + + k, v = c.Seek(rawAddr) + if bytes.Equal(k, rawAddr) { + // offset was found, move + // cursor to the next element + k, v = c.Next() } - - // handler object - return h(delObj) - }) - - if errors.Is(err, ErrInterruptIterator) { - err = nil } - return err + for ; k != nil; k, v = c.Next() { + err := h.handleKV(k, v) + if err != nil { + if errors.Is(err, ErrInterruptIterator) { + return nil + } + + return err + } + } + + return nil } -func deletedObjectFromKV(k, _ []byte) (*DeletedObject, error) { +func garbageFromKV(k []byte) (GarbageObject, error) { addr, err := addressFromKey(k) if err != nil { - return nil, fmt.Errorf("could not parse address: %w", err) + return GarbageObject{}, fmt.Errorf("could not parse address: %w", err) } - return &DeletedObject{ + return GarbageObject{ addr: addr, }, nil } + +func graveFromKV(k, v []byte) (TombstonedObject, error) { + target, err := addressFromKey(k) + if err != nil { + return TombstonedObject{}, fmt.Errorf("could not parse address: %w", err) + } + + tomb, err := addressFromKey(v) + if err != nil { + return TombstonedObject{}, err + } + + return TombstonedObject{ + addr: target, + tomb: tomb, + }, nil +} diff --git a/pkg/local_object_storage/metabase/graveyard_test.go b/pkg/local_object_storage/metabase/graveyard_test.go index 963d8135e..6030101e4 100644 --- a/pkg/local_object_storage/metabase/graveyard_test.go +++ b/pkg/local_object_storage/metabase/graveyard_test.go @@ -9,10 +9,104 @@ import ( "github.com/stretchr/testify/require" ) +func TestDB_IterateDeletedObjects_EmptyDB(t *testing.T) { + db := newDB(t) + + var counter int + iterGravePRM := new(meta.GraveyardIterationPrm) + + err := db.IterateOverGraveyard(iterGravePRM.SetHandler(func(garbage meta.TombstonedObject) error { + counter++ + return nil + })) + require.NoError(t, err) + require.Zero(t, counter) + + iterGCPRM := new(meta.GarbageIterationPrm) + + err = db.IterateOverGarbage(iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error { + counter++ + return nil + })) + require.NoError(t, err) + require.Zero(t, counter) +} + +func TestDB_Iterate_OffsetNotFound(t *testing.T) { + db := newDB(t) + + obj1 := generateObject(t) + obj2 := generateObject(t) + + addr1 := addressSDK.NewAddress() + err := addr1.Parse("AUSF6rhReoAdPVKYUZWW9o2LbtTvekn54B3JXi7pdzmn/2daLhLB7yVXbjBaKkckkuvjX22BxRYuSHy9RPxuH9PZS") + require.NoError(t, err) + + addr2 := addressSDK.NewAddress() + err = addr2.Parse("CwYYr6sFLU1zK6DeBTVd8SReADUoxYobUhSrxgXYxCVn/ANYbnJoQqdjmU5Dhk3LkxYj5E9nJHQFf8LjTEcap9TxM") + require.NoError(t, err) + + addr3 := addressSDK.NewAddress() + err = addr3.Parse("6ay4GfhR9RgN28d5ufg63toPetkYHGcpcW7G3b7QWSek/ANYbnJoQqdjmU5Dhk3LkxYj5E9nJHQFf8LjTEcap9TxM") + require.NoError(t, err) + + obj1.SetContainerID(addr1.ContainerID()) + obj1.SetID(addr1.ObjectID()) + obj2.SetContainerID(addr2.ContainerID()) + obj2.SetID(addr2.ObjectID()) + + err = putBig(db, obj1) + require.NoError(t, err) + + _, err = db.Inhume(new(meta.InhumePrm). + WithAddresses(object.AddressOf(obj1)). + WithGCMark(), + ) + require.NoError(t, err) + + var counter int + + iterGCPRM := new(meta.GarbageIterationPrm). + SetOffset(object.AddressOf(obj2)). + SetHandler(func(garbage meta.GarbageObject) error { + require.Equal(t, *garbage.Address(), *addr1) + counter++ + + return nil + }) + + err = db.IterateOverGarbage(iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error { + require.Equal(t, *garbage.Address(), *addr1) + counter++ + + return nil + })) + require.NoError(t, err) + + // the second object would be put after the + // first, so it is expected that iteration + // will not receive the first object + require.Equal(t, 0, counter) + + iterGCPRM.SetOffset(addr3) + err = db.IterateOverGarbage(iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error { + require.Equal(t, *garbage.Address(), *addr1) + counter++ + + return nil + })) + require.NoError(t, err) + + // the third object would be put before the + // first, so it is expected that iteration + // will receive the first object + require.Equal(t, 1, counter) +} + func TestDB_IterateDeletedObjects(t *testing.T) { db := newDB(t) - // generate and put 2 objects + // generate and put 4 objects obj1 := generateObject(t) obj2 := generateObject(t) obj3 := generateObject(t) @@ -54,19 +148,25 @@ func TestDB_IterateDeletedObjects(t *testing.T) { buriedTS, buriedGC []*addressSDK.Address ) - err = db.IterateOverGraveyard(func(deletedObject *meta.DeletedObject) error { - buriedTS = append(buriedTS, deletedObject.Address()) + iterGravePRM := new(meta.GraveyardIterationPrm) + + err = db.IterateOverGraveyard(iterGravePRM.SetHandler(func(tomstoned meta.TombstonedObject) error { + require.Equal(t, *addrTombstone, *tomstoned.Tombstone()) + + buriedTS = append(buriedTS, tomstoned.Address()) counterAll++ return nil - }) + })) - err = db.IterateOverGarbage(func(deletedObject *meta.DeletedObject) error { - buriedGC = append(buriedGC, deletedObject.Address()) + iterGCPRM := new(meta.GarbageIterationPrm) + + err = db.IterateOverGarbage(iterGCPRM.SetHandler(func(garbage meta.GarbageObject) error { + buriedGC = append(buriedGC, garbage.Address()) counterAll++ return nil - }) + })) require.NoError(t, err) @@ -86,6 +186,189 @@ func TestDB_IterateDeletedObjects(t *testing.T) { require.True(t, equalAddresses(garbageExpected, buriedGC)) } +func TestDB_IterateOverGraveyard_Offset(t *testing.T) { + db := newDB(t) + + // generate and put 4 objects + obj1 := generateObject(t) + obj2 := generateObject(t) + obj3 := generateObject(t) + obj4 := generateObject(t) + + var err error + + err = putBig(db, obj1) + require.NoError(t, err) + + err = putBig(db, obj2) + require.NoError(t, err) + + err = putBig(db, obj3) + require.NoError(t, err) + + err = putBig(db, obj4) + require.NoError(t, err) + + inhumePrm := new(meta.InhumePrm) + + // inhume with tombstone + addrTombstone := generateAddress() + + _, err = db.Inhume(inhumePrm. + WithAddresses(object.AddressOf(obj1), object.AddressOf(obj2), object.AddressOf(obj3), object.AddressOf(obj4)). + WithTombstoneAddress(addrTombstone), + ) + require.NoError(t, err) + + expectedGraveyard := []*addressSDK.Address{ + object.AddressOf(obj1), object.AddressOf(obj2), + object.AddressOf(obj3), object.AddressOf(obj4), + } + + var ( + counter int + firstIterationSize = len(expectedGraveyard) / 2 + + gotGraveyard []*addressSDK.Address + ) + + iterGraveyardPrm := new(meta.GraveyardIterationPrm) + + err = db.IterateOverGraveyard(iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error { + require.Equal(t, *addrTombstone, *tombstoned.Tombstone()) + + gotGraveyard = append(gotGraveyard, tombstoned.Address()) + + counter++ + if counter == firstIterationSize { + return meta.ErrInterruptIterator + } + + return nil + })) + require.NoError(t, err) + require.Equal(t, firstIterationSize, counter) + require.Equal(t, firstIterationSize, len(gotGraveyard)) + + // last received address is an offset + offset := gotGraveyard[len(gotGraveyard)-1] + iterGraveyardPrm.SetOffset(offset) + + err = db.IterateOverGraveyard(iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error { + require.Equal(t, *addrTombstone, *tombstoned.Tombstone()) + + gotGraveyard = append(gotGraveyard, tombstoned.Address()) + counter++ + + return nil + })) + require.NoError(t, err) + require.Equal(t, len(expectedGraveyard), counter) + require.True(t, equalAddresses(gotGraveyard, expectedGraveyard)) + + // last received object (last in db) as offset + // should lead to no iteration at all + offset = gotGraveyard[len(gotGraveyard)-1] + iterGraveyardPrm.SetOffset(offset) + + iWasCalled := false + + err = db.IterateOverGraveyard(iterGraveyardPrm.SetHandler(func(tombstoned meta.TombstonedObject) error { + iWasCalled = true + return nil + })) + require.NoError(t, err) + require.False(t, iWasCalled) +} + +func TestDB_IterateOverGarbage_Offset(t *testing.T) { + db := newDB(t) + + // generate and put 4 objects + obj1 := generateObject(t) + obj2 := generateObject(t) + obj3 := generateObject(t) + obj4 := generateObject(t) + + var err error + + err = putBig(db, obj1) + require.NoError(t, err) + + err = putBig(db, obj2) + require.NoError(t, err) + + err = putBig(db, obj3) + require.NoError(t, err) + + err = putBig(db, obj4) + require.NoError(t, err) + + inhumePrm := new(meta.InhumePrm) + + _, err = db.Inhume(inhumePrm. + WithAddresses(object.AddressOf(obj1), object.AddressOf(obj2), object.AddressOf(obj3), object.AddressOf(obj4)). + WithGCMark(), + ) + require.NoError(t, err) + + expectedGarbage := []*addressSDK.Address{ + object.AddressOf(obj1), object.AddressOf(obj2), + object.AddressOf(obj3), object.AddressOf(obj4), + } + + var ( + counter int + firstIterationSize = len(expectedGarbage) / 2 + + gotGarbage []*addressSDK.Address + ) + + iterGarbagePrm := new(meta.GarbageIterationPrm) + + err = db.IterateOverGarbage(iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error { + gotGarbage = append(gotGarbage, garbage.Address()) + + counter++ + if counter == firstIterationSize { + return meta.ErrInterruptIterator + } + + return nil + })) + require.NoError(t, err) + require.Equal(t, firstIterationSize, counter) + require.Equal(t, firstIterationSize, len(gotGarbage)) + + // last received address is an offset + offset := gotGarbage[len(gotGarbage)-1] + iterGarbagePrm.SetOffset(offset) + + err = db.IterateOverGarbage(iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error { + gotGarbage = append(gotGarbage, garbage.Address()) + counter++ + + return nil + })) + require.NoError(t, err) + require.Equal(t, len(expectedGarbage), counter) + require.True(t, equalAddresses(gotGarbage, expectedGarbage)) + + // last received object (last in db) as offset + // should lead to no iteration at all + offset = gotGarbage[len(gotGarbage)-1] + iterGarbagePrm.SetOffset(offset) + + iWasCalled := false + + err = db.IterateOverGarbage(iterGarbagePrm.SetHandler(func(garbage meta.GarbageObject) error { + iWasCalled = true + return nil + })) + require.NoError(t, err) + require.False(t, iWasCalled) +} + func equalAddresses(aa1 []*addressSDK.Address, aa2 []*addressSDK.Address) bool { if len(aa1) != len(aa2) { return false diff --git a/pkg/local_object_storage/shard/gc.go b/pkg/local_object_storage/shard/gc.go index 22641b415..eeb6ec8c6 100644 --- a/pkg/local_object_storage/shard/gc.go +++ b/pkg/local_object_storage/shard/gc.go @@ -176,9 +176,11 @@ func (s *Shard) removeGarbage() { buf := make([]*addressSDK.Address, 0, s.rmBatchSize) + iterPrm := new(meta.GarbageIterationPrm) + // iterate over metabase's objects with GC mark // (no more than s.rmBatchSize objects) - err := s.metaBase.IterateOverGarbage(func(g *meta.DeletedObject) error { + err := s.metaBase.IterateOverGarbage(iterPrm.SetHandler(func(g meta.GarbageObject) error { buf = append(buf, g.Address()) if len(buf) == s.rmBatchSize { @@ -186,7 +188,7 @@ func (s *Shard) removeGarbage() { } return nil - }) + })) if err != nil { s.log.Warn("iterator over metabase graveyard failed", zap.String("error", err.Error()),