[#378] metabase: Implement iterator over expired objects
Implement `DB.IterateExpired` method that iterates over the objects in metabase that are expired at particular epoch. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
a9a1acc880
commit
182df23859
3 changed files with 180 additions and 0 deletions
111
pkg/local_object_storage/metabase/iterators.go
Normal file
111
pkg/local_object_storage/metabase/iterators.go
Normal file
|
@ -0,0 +1,111 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/pkg/errors"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
||||
// ExpiredObject is a descriptor of expired object from DB.
|
||||
type ExpiredObject struct {
|
||||
typ object.Type
|
||||
|
||||
addr *object.Address
|
||||
}
|
||||
|
||||
// Type returns type of the expired object.
|
||||
func (e *ExpiredObject) Type() object.Type {
|
||||
return e.typ
|
||||
}
|
||||
|
||||
// Address returns address of the expired object.
|
||||
func (e *ExpiredObject) Address() *object.Address {
|
||||
return e.addr
|
||||
}
|
||||
|
||||
// ExpiredObjectHandler is an ExpiredObject handling function.
|
||||
type ExpiredObjectHandler func(*ExpiredObject) error
|
||||
|
||||
// ErrInterruptIterator is returned by iteration handlers
|
||||
// as a "break" keyword.
|
||||
var ErrInterruptIterator = errors.New("iterator is interrupted")
|
||||
|
||||
// IterateExpired iterates over all objects in DB which are out of date
|
||||
// relative to epoch.
|
||||
//
|
||||
// If h returns ErrInterruptIterator, nil returns immediately.
|
||||
// Returns other errors of h directly.
|
||||
func (db *DB) IterateExpired(epoch uint64, h ExpiredObjectHandler) error {
|
||||
return db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
return db.iterateExpired(tx, epoch, h)
|
||||
})
|
||||
}
|
||||
|
||||
func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error {
|
||||
err := tx.ForEach(func(name []byte, b *bbolt.Bucket) error {
|
||||
cidBytes := cidFromAttributeBucket(name, objectV2.SysAttributeExpEpoch)
|
||||
if cidBytes == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return b.ForEach(func(expKey, _ []byte) error {
|
||||
bktExpired := b.Bucket(expKey)
|
||||
if bktExpired == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return bktExpired.ForEach(func(idKey, _ []byte) error {
|
||||
expiresAt, err := strconv.ParseUint(string(expKey), 10, 64)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not parse expiration epoch")
|
||||
} else if expiresAt >= epoch {
|
||||
return nil
|
||||
}
|
||||
|
||||
id := object.NewID()
|
||||
|
||||
err = id.Parse(string(idKey))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not parse ID of expired object")
|
||||
}
|
||||
|
||||
cid := container.NewID()
|
||||
|
||||
err = cid.Parse(string(cidBytes))
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "could not parse container ID of expired bucket")
|
||||
}
|
||||
|
||||
addr := object.NewAddress()
|
||||
addr.SetContainerID(cid)
|
||||
addr.SetObjectID(id)
|
||||
|
||||
return h(&ExpiredObject{
|
||||
typ: objectType(tx, cid, idKey),
|
||||
addr: addr,
|
||||
})
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
if errors.Is(err, ErrInterruptIterator) {
|
||||
err = nil
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func objectType(tx *bbolt.Tx, cid *container.ID, oidBytes []byte) object.Type {
|
||||
switch {
|
||||
default:
|
||||
return object.TypeRegular
|
||||
case inBucket(tx, tombstoneBucketName(cid), oidBytes):
|
||||
return object.TypeTombstone
|
||||
case inBucket(tx, storageGroupBucketName(cid), oidBytes):
|
||||
return object.TypeStorageGroup
|
||||
}
|
||||
}
|
58
pkg/local_object_storage/metabase/iterators_test.go
Normal file
58
pkg/local_object_storage/metabase/iterators_test.go
Normal file
|
@ -0,0 +1,58 @@
|
|||
package meta_test
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDB_IterateExpired(t *testing.T) {
|
||||
db := newDB(t)
|
||||
defer releaseDB(db)
|
||||
|
||||
const epoch = 13
|
||||
|
||||
mAlive := map[object.Type]*object.Address{}
|
||||
mExpired := map[object.Type]*object.Address{}
|
||||
|
||||
for _, typ := range []object.Type{
|
||||
object.TypeRegular,
|
||||
object.TypeTombstone,
|
||||
object.TypeStorageGroup,
|
||||
} {
|
||||
mAlive[typ] = putWithExpiration(t, db, typ, epoch)
|
||||
mExpired[typ] = putWithExpiration(t, db, typ, epoch-1)
|
||||
}
|
||||
|
||||
err := db.IterateExpired(epoch, func(exp *meta.ExpiredObject) error {
|
||||
if addr, ok := mAlive[exp.Type()]; ok {
|
||||
require.NotEqual(t, addr, exp.Address())
|
||||
}
|
||||
|
||||
addr, ok := mExpired[exp.Type()]
|
||||
require.True(t, ok)
|
||||
require.Equal(t, addr, exp.Address())
|
||||
|
||||
delete(mExpired, exp.Type())
|
||||
|
||||
return nil
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Empty(t, mExpired)
|
||||
}
|
||||
|
||||
func putWithExpiration(t *testing.T, db *meta.DB, typ object.Type, expiresAt uint64) *object.Address {
|
||||
raw := generateRawObject(t)
|
||||
raw.SetType(typ)
|
||||
addAttribute(raw, objectV2.SysAttributeExpEpoch, strconv.FormatUint(expiresAt, 10))
|
||||
|
||||
obj := raw.Object()
|
||||
require.NoError(t, putBig(db, obj))
|
||||
|
||||
return obj.Address()
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
package meta
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"strings"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
|
@ -67,6 +68,16 @@ func attributeBucketName(cid *container.ID, attributeKey string) []byte {
|
|||
return []byte(sb.String())
|
||||
}
|
||||
|
||||
// returns <CID> from attributeBucketName result, nil otherwise.
|
||||
func cidFromAttributeBucket(val []byte, attributeKey string) []byte {
|
||||
suffix := []byte(userAttributePostfix + attributeKey)
|
||||
if !bytes.HasSuffix(val, suffix) {
|
||||
return nil
|
||||
}
|
||||
|
||||
return val[:len(val)-len(suffix)]
|
||||
}
|
||||
|
||||
// payloadHashBucketName returns <CID>_payloadhash.
|
||||
func payloadHashBucketName(cid *container.ID) []byte {
|
||||
return []byte(cid.String() + payloadHashPostfix)
|
||||
|
|
Loading…
Reference in a new issue