diff --git a/pkg/local_object_storage/metabase/iterators.go b/pkg/local_object_storage/metabase/iterators.go new file mode 100644 index 000000000..7e1dc0e12 --- /dev/null +++ b/pkg/local_object_storage/metabase/iterators.go @@ -0,0 +1,111 @@ +package meta + +import ( + "strconv" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/pkg/errors" + "go.etcd.io/bbolt" +) + +// ExpiredObject is a descriptor of expired object from DB. +type ExpiredObject struct { + typ object.Type + + addr *object.Address +} + +// Type returns type of the expired object. +func (e *ExpiredObject) Type() object.Type { + return e.typ +} + +// Address returns address of the expired object. +func (e *ExpiredObject) Address() *object.Address { + return e.addr +} + +// ExpiredObjectHandler is an ExpiredObject handling function. +type ExpiredObjectHandler func(*ExpiredObject) error + +// ErrInterruptIterator is returned by iteration handlers +// as a "break" keyword. +var ErrInterruptIterator = errors.New("iterator is interrupted") + +// IterateExpired iterates over all objects in DB which are out of date +// relative to epoch. +// +// If h returns ErrInterruptIterator, nil returns immediately. +// Returns other errors of h directly. +func (db *DB) IterateExpired(epoch uint64, h ExpiredObjectHandler) error { + return db.boltDB.View(func(tx *bbolt.Tx) error { + return db.iterateExpired(tx, epoch, h) + }) +} + +func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler) error { + err := tx.ForEach(func(name []byte, b *bbolt.Bucket) error { + cidBytes := cidFromAttributeBucket(name, objectV2.SysAttributeExpEpoch) + if cidBytes == nil { + return nil + } + + return b.ForEach(func(expKey, _ []byte) error { + bktExpired := b.Bucket(expKey) + if bktExpired == nil { + return nil + } + + return bktExpired.ForEach(func(idKey, _ []byte) error { + expiresAt, err := strconv.ParseUint(string(expKey), 10, 64) + if err != nil { + return errors.Wrap(err, "could not parse expiration epoch") + } else if expiresAt >= epoch { + return nil + } + + id := object.NewID() + + err = id.Parse(string(idKey)) + if err != nil { + return errors.Wrap(err, "could not parse ID of expired object") + } + + cid := container.NewID() + + err = cid.Parse(string(cidBytes)) + if err != nil { + return errors.Wrap(err, "could not parse container ID of expired bucket") + } + + addr := object.NewAddress() + addr.SetContainerID(cid) + addr.SetObjectID(id) + + return h(&ExpiredObject{ + typ: objectType(tx, cid, idKey), + addr: addr, + }) + }) + }) + }) + + if errors.Is(err, ErrInterruptIterator) { + err = nil + } + + return err +} + +func objectType(tx *bbolt.Tx, cid *container.ID, oidBytes []byte) object.Type { + switch { + default: + return object.TypeRegular + case inBucket(tx, tombstoneBucketName(cid), oidBytes): + return object.TypeTombstone + case inBucket(tx, storageGroupBucketName(cid), oidBytes): + return object.TypeStorageGroup + } +} diff --git a/pkg/local_object_storage/metabase/iterators_test.go b/pkg/local_object_storage/metabase/iterators_test.go new file mode 100644 index 000000000..16ea5075b --- /dev/null +++ b/pkg/local_object_storage/metabase/iterators_test.go @@ -0,0 +1,58 @@ +package meta_test + +import ( + "strconv" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg/object" + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + "github.com/stretchr/testify/require" +) + +func TestDB_IterateExpired(t *testing.T) { + db := newDB(t) + defer releaseDB(db) + + const epoch = 13 + + mAlive := map[object.Type]*object.Address{} + mExpired := map[object.Type]*object.Address{} + + for _, typ := range []object.Type{ + object.TypeRegular, + object.TypeTombstone, + object.TypeStorageGroup, + } { + mAlive[typ] = putWithExpiration(t, db, typ, epoch) + mExpired[typ] = putWithExpiration(t, db, typ, epoch-1) + } + + err := db.IterateExpired(epoch, func(exp *meta.ExpiredObject) error { + if addr, ok := mAlive[exp.Type()]; ok { + require.NotEqual(t, addr, exp.Address()) + } + + addr, ok := mExpired[exp.Type()] + require.True(t, ok) + require.Equal(t, addr, exp.Address()) + + delete(mExpired, exp.Type()) + + return nil + }) + require.NoError(t, err) + + require.Empty(t, mExpired) +} + +func putWithExpiration(t *testing.T, db *meta.DB, typ object.Type, expiresAt uint64) *object.Address { + raw := generateRawObject(t) + raw.SetType(typ) + addAttribute(raw, objectV2.SysAttributeExpEpoch, strconv.FormatUint(expiresAt, 10)) + + obj := raw.Object() + require.NoError(t, putBig(db, obj)) + + return obj.Address() +} diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index 8aa1964e6..9489c1cce 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -1,6 +1,7 @@ package meta import ( + "bytes" "strings" "github.com/nspcc-dev/neofs-api-go/pkg/container" @@ -67,6 +68,16 @@ func attributeBucketName(cid *container.ID, attributeKey string) []byte { return []byte(sb.String()) } +// returns from attributeBucketName result, nil otherwise. +func cidFromAttributeBucket(val []byte, attributeKey string) []byte { + suffix := []byte(userAttributePostfix + attributeKey) + if !bytes.HasSuffix(val, suffix) { + return nil + } + + return val[:len(val)-len(suffix)] +} + // payloadHashBucketName returns _payloadhash. func payloadHashBucketName(cid *container.ID) []byte { return []byte(cid.String() + payloadHashPostfix)