From 156ba8532667e8e89812a3a102396bc9b40eec83 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 27 Jul 2022 21:38:28 +0300 Subject: [PATCH] [#1634] node: Do not return expired objects If an object has not been marked for removal by the GC in the current epoch yet but has already expired, respond with `ErrObjectNotFound` api status. Also, optimize shard iteration: a node must stop any iteration if the object is found but gonna be removed soon. All the checks are performed by the Metabase. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + pkg/core/object/errors.go | 8 +++ pkg/local_object_storage/engine/delete.go | 2 +- pkg/local_object_storage/engine/exists.go | 5 ++ pkg/local_object_storage/engine/get.go | 5 ++ pkg/local_object_storage/engine/head.go | 8 +++ pkg/local_object_storage/engine/inhume.go | 2 +- pkg/local_object_storage/engine/lock.go | 6 +++ pkg/local_object_storage/engine/put.go | 6 +++ pkg/local_object_storage/metabase/db_test.go | 32 +++++++++++- pkg/local_object_storage/metabase/delete.go | 7 +-- .../metabase/delete_test.go | 11 +++++ pkg/local_object_storage/metabase/exists.go | 49 ++++++++++++++++--- .../metabase/exists_test.go | 16 +++++- pkg/local_object_storage/metabase/get.go | 14 ++++-- pkg/local_object_storage/metabase/get_test.go | 14 +++++- pkg/local_object_storage/metabase/inhume.go | 4 +- pkg/local_object_storage/metabase/put.go | 13 +++-- pkg/local_object_storage/metabase/select.go | 20 +++++--- .../metabase/select_test.go | 17 +++++++ pkg/local_object_storage/shard/control.go | 3 +- pkg/local_object_storage/shard/errors.go | 7 +++ pkg/local_object_storage/shard/exists.go | 1 + pkg/local_object_storage/shard/get.go | 1 + pkg/local_object_storage/shard/head.go | 1 + pkg/local_object_storage/shard/range.go | 1 + pkg/local_object_storage/shard/restore.go | 2 +- pkg/local_object_storage/writecache/init.go | 10 +++- 28 files changed, 230 insertions(+), 36 deletions(-) create mode 100644 pkg/core/object/errors.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 750414213f..59c392b623 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ Changelog for NeoFS Node ### Fixed - Losing request context in eACL response checks (#1595) +- Do not return expired objects that have not been handled by the GC yet (#1634) - Setting CID field in `neofs-cli acl extended create` (#1650) - `neofs-ir` no longer hangs if it cannot bind to the control endpoint (#1643) - Do not require `lifetime` flag in `session create` CLI command (#1655) diff --git a/pkg/core/object/errors.go b/pkg/core/object/errors.go new file mode 100644 index 0000000000..6ef6b1a002 --- /dev/null +++ b/pkg/core/object/errors.go @@ -0,0 +1,8 @@ +package object + +import "errors" + +// ErrObjectIsExpired is returned when the requested object's +// epoch is less than the current one. Such objects are considered +// as removed and should not be returned from the Storage Engine. +var ErrObjectIsExpired = errors.New("object is expired") diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index ccf64ca6dc..6fff7b6617 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -74,7 +74,7 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) { resExists, err := sh.Exists(existsPrm) if err != nil { _, ok := err.(*objectSDK.SplitInfoError) - if ok || shard.IsErrRemoved(err) { + if ok || shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) { return true } if !shard.IsErrNotFound(err) { diff --git a/pkg/local_object_storage/engine/exists.go b/pkg/local_object_storage/engine/exists.go index a0d8549ec5..d4027994ec 100644 --- a/pkg/local_object_storage/engine/exists.go +++ b/pkg/local_object_storage/engine/exists.go @@ -26,6 +26,11 @@ func (e *StorageEngine) exists(addr oid.Address) (bool, error) { if ok { return true } + + if shard.IsErrObjectExpired(err) { + return true + } + if !shard.IsErrNotFound(err) { e.reportShardError(sh, "could not check existence of object in shard", err) } diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index b11e72cdf4..809912cb86 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -113,6 +113,11 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) { outError = err return true // stop, return it back + case shard.IsErrObjectExpired(err): + // object is found but should not + // be returned + outError = errNotFound + return true default: e.reportShardError(sh, "could not get object from shard", err) return false diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index 381b5c3b4f..eed783db1f 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -111,6 +111,14 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) { outError = err return true // stop, return it back + case shard.IsErrObjectExpired(err): + var notFoundErr apistatus.ObjectNotFound + + // object is found but should not + // be returned + outError = notFoundErr + + return true default: e.reportShardError(sh, "could not head object from shard", err) return false diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index c0c3cd4463..890d476a69 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -132,7 +132,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE existPrm.SetAddress(addr) exRes, err := sh.Exists(existPrm) if err != nil { - if shard.IsErrRemoved(err) { + if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) { // inhumed once - no need to be inhumed again status = 3 return true diff --git a/pkg/local_object_storage/engine/lock.go b/pkg/local_object_storage/engine/lock.go index 52023b9943..1dbdd2d8c8 100644 --- a/pkg/local_object_storage/engine/lock.go +++ b/pkg/local_object_storage/engine/lock.go @@ -72,6 +72,12 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExi if err != nil { var siErr *objectSDK.SplitInfoError if !errors.As(err, &siErr) { + if shard.IsErrObjectExpired(err) { + // object is already expired => + // do not lock it + return true + } + e.reportShardError(sh, "could not check locked object for presence in shard", err) return } diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 078223aa67..613a4eb5ff 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -76,6 +76,12 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) { exists, err := sh.Exists(existPrm) if err != nil { + if shard.IsErrObjectExpired(err) { + // object is already found but + // expired => do nothing with it + finished = true + } + return // this is not ErrAlreadyRemoved error so we can go to the next shard } diff --git a/pkg/local_object_storage/metabase/db_test.go b/pkg/local_object_storage/metabase/db_test.go index dee0bef5f3..435a1b2db7 100644 --- a/pkg/local_object_storage/metabase/db_test.go +++ b/pkg/local_object_storage/metabase/db_test.go @@ -3,14 +3,17 @@ package meta_test import ( "math" "os" + "strconv" "testing" + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-sdk-go/checksum" checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/nspcc-dev/neofs-sdk-go/object" + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" @@ -19,9 +22,13 @@ import ( "github.com/stretchr/testify/require" ) -type epochState struct{} +type epochState struct{ e uint64 } func (s epochState) CurrentEpoch() uint64 { + if s.e != 0 { + return s.e + } + return math.MaxUint64 } @@ -97,3 +104,26 @@ func addAttribute(obj *object.Object, key, val string) { attrs = append(attrs, attr) obj.SetAttributes(attrs...) } + +func checkExpiredObjects(t *testing.T, db *meta.DB, f func(exp, nonExp *objectSDK.Object)) { + expObj := generateObject(t) + setExpiration(expObj, currEpoch) + + require.NoError(t, metaPut(db, expObj, nil)) + + nonExpObj := generateObject(t) + setExpiration(nonExpObj, currEpoch+1) + + require.NoError(t, metaPut(db, nonExpObj, nil)) + + f(expObj, nonExpObj) +} + +func setExpiration(o *objectSDK.Object, epoch uint64) { + var attr objectSDK.Attribute + + attr.SetKey(objectV2.SysAttributeExpEpoch) + attr.SetValue(strconv.FormatUint(epoch, 10)) + + o.SetAttributes(append(o.Attributes(), attr)...) +} diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 69ee63a244..0c7218bf55 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -58,9 +58,10 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) { func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) error { refCounter := make(referenceCounter, len(addrs)) + currEpoch := db.epochState.CurrentEpoch() for i := range addrs { - err := db.delete(tx, addrs[i], refCounter) + err := db.delete(tx, addrs[i], refCounter, currEpoch) if err != nil { return err // maybe log and continue? } @@ -78,7 +79,7 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) error { return nil } -func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter) error { +func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) error { // remove record from the garbage bucket garbageBKT := tx.Bucket(garbageBucketName) if garbageBKT != nil { @@ -89,7 +90,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter } // unmarshal object, work only with physically stored (raw == true) objects - obj, err := db.get(tx, addr, false, true) + obj, err := db.get(tx, addr, false, true, currEpoch) if err != nil { if errors.As(err, new(apistatus.ObjectNotFound)) { return nil diff --git a/pkg/local_object_storage/metabase/delete_test.go b/pkg/local_object_storage/metabase/delete_test.go index bdb4860b35..dc54b8f426 100644 --- a/pkg/local_object_storage/metabase/delete_test.go +++ b/pkg/local_object_storage/metabase/delete_test.go @@ -118,6 +118,17 @@ func TestGraveOnlyDelete(t *testing.T) { require.NoError(t, metaDelete(db, addr)) } +func TestExpiredObject(t *testing.T) { + db := newDB(t, meta.WithEpochState(epochState{currEpoch})) + + checkExpiredObjects(t, db, func(exp, nonExp *objectSDK.Object) { + // removing expired object should be error-free + require.NoError(t, metaDelete(db, object.AddressOf(exp))) + + require.NoError(t, metaDelete(db, object.AddressOf(nonExp))) + }) +} + func metaDelete(db *meta.DB, addrs ...oid.Address) error { var deletePrm meta.DeletePrm deletePrm.SetAddresses(addrs...) diff --git a/pkg/local_object_storage/metabase/exists.go b/pkg/local_object_storage/metabase/exists.go index 5e4454d555..893b8effa5 100644 --- a/pkg/local_object_storage/metabase/exists.go +++ b/pkg/local_object_storage/metabase/exists.go @@ -3,7 +3,10 @@ package meta import ( "errors" "fmt" + "strconv" + objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" + "github.com/nspcc-dev/neofs-node/pkg/core/object" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" @@ -37,12 +40,15 @@ func (p ExistsRes) Exists() bool { // returns true if addr is in primary index or false if it is not. // // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard. +// Returns the object.ErrObjectIsExpired if the object is presented but already expired. func (db *DB) Exists(prm ExistsPrm) (res ExistsRes, err error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() + currEpoch := db.epochState.CurrentEpoch() + err = db.boltDB.View(func(tx *bbolt.Tx) error { - res.exists, err = db.exists(tx, prm.addr) + res.exists, err = db.exists(tx, prm.addr, currEpoch) return err }) @@ -50,9 +56,9 @@ func (db *DB) Exists(prm ExistsPrm) (res ExistsRes, err error) { return } -func (db *DB) exists(tx *bbolt.Tx, addr oid.Address) (exists bool, err error) { - // check graveyard first - switch inGraveyard(tx, addr) { +func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists bool, err error) { + // check graveyard and object expiration first + switch objectStatus(tx, addr, currEpoch) { case 1: var errNotFound apistatus.ObjectNotFound @@ -61,6 +67,8 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address) (exists bool, err error) { var errRemoved apistatus.ObjectAlreadyRemoved return false, errRemoved + case 3: + return false, object.ErrObjectIsExpired } objKey := objectKey(addr.Object()) @@ -86,11 +94,36 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address) (exists bool, err error) { return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil } -// inGraveyard returns: -// * 0 if object is not marked for deletion; +// objectStatus returns: +// * 0 if object is available; // * 1 if object with GC mark; -// * 2 if object is covered with tombstone. -func inGraveyard(tx *bbolt.Tx, addr oid.Address) uint8 { +// * 2 if object is covered with tombstone; +// * 3 if object is expired. +func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 { + // we check only if the object is expired in the current + // epoch since it is considered the only corner case: the + // GC is expected to collect all the objects that have + // expired previously for less than the one epoch duration + + rawOID := []byte(addr.Object().EncodeToString()) + var expired bool + + // bucket with objects that have expiration attr + expirationBucket := tx.Bucket(attributeBucketName(addr.Container(), objectV2.SysAttributeExpEpoch)) + if expirationBucket != nil { + // bucket that contains objects that expire in the current epoch + currEpochBkt := expirationBucket.Bucket([]byte(strconv.FormatUint(currEpoch, 10))) + if currEpochBkt != nil { + if currEpochBkt.Get(rawOID) != nil { + expired = true + } + } + } + + if expired { + return 3 + } + graveyardBkt := tx.Bucket(graveyardBucketName) garbageBkt := tx.Bucket(garbageBucketName) addrKey := addressKey(addr) diff --git a/pkg/local_object_storage/metabase/exists_test.go b/pkg/local_object_storage/metabase/exists_test.go index 155d04be9a..93afd977cc 100644 --- a/pkg/local_object_storage/metabase/exists_test.go +++ b/pkg/local_object_storage/metabase/exists_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/nspcc-dev/neofs-node/pkg/core/object" + meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" @@ -12,8 +13,10 @@ import ( "github.com/stretchr/testify/require" ) +const currEpoch = 1000 + func TestDB_Exists(t *testing.T) { - db := newDB(t) + db := newDB(t, meta.WithEpochState(epochState{currEpoch})) t.Run("no object", func(t *testing.T) { nonExist := generateObject(t) @@ -171,4 +174,15 @@ func TestDB_Exists(t *testing.T) { require.NoError(t, err) require.False(t, exists) }) + + t.Run("expired object", func(t *testing.T) { + checkExpiredObjects(t, db, func(exp, nonExp *objectSDK.Object) { + gotObj, err := metaExists(db, object.AddressOf(exp)) + require.False(t, gotObj) + require.ErrorIs(t, err, object.ErrObjectIsExpired) + + gotObj, err = metaExists(db, object.AddressOf(nonExp)) + require.True(t, gotObj) + }) + }) } diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index 60ca44ed93..2b67823dc2 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -3,6 +3,7 @@ package meta import ( "fmt" + "github.com/nspcc-dev/neofs-node/pkg/core/object" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" @@ -44,12 +45,15 @@ func (r GetRes) Header() *objectSDK.Object { // // Returns an error of type apistatus.ObjectNotFound if object is missing in DB. // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard. +// Returns the object.ErrObjectIsExpired if the object is presented but already expired. func (db *DB) Get(prm GetPrm) (res GetRes, err error) { db.modeMtx.Lock() defer db.modeMtx.Unlock() + currEpoch := db.epochState.CurrentEpoch() + err = db.boltDB.View(func(tx *bbolt.Tx) error { - res.hdr, err = db.get(tx, prm.addr, true, prm.raw) + res.hdr, err = db.get(tx, prm.addr, true, prm.raw, currEpoch) return err }) @@ -57,11 +61,11 @@ func (db *DB) Get(prm GetPrm) (res GetRes, err error) { return } -func (db *DB) get(tx *bbolt.Tx, addr oid.Address, checkGraveyard, raw bool) (*objectSDK.Object, error) { +func (db *DB) get(tx *bbolt.Tx, addr oid.Address, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) { key := objectKey(addr.Object()) - if checkGraveyard { - switch inGraveyard(tx, addr) { + if checkStatus { + switch objectStatus(tx, addr, currEpoch) { case 1: var errNotFound apistatus.ObjectNotFound @@ -70,6 +74,8 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, checkGraveyard, raw bool) (*ob var errRemoved apistatus.ObjectAlreadyRemoved return nil, errRemoved + case 3: + return nil, object.ErrObjectIsExpired } } diff --git a/pkg/local_object_storage/metabase/get_test.go b/pkg/local_object_storage/metabase/get_test.go index 633354cfbf..a645a0b687 100644 --- a/pkg/local_object_storage/metabase/get_test.go +++ b/pkg/local_object_storage/metabase/get_test.go @@ -17,7 +17,7 @@ import ( ) func TestDB_Get(t *testing.T) { - db := newDB(t) + db := newDB(t, meta.WithEpochState(epochState{currEpoch})) raw := generateObject(t) @@ -135,6 +135,18 @@ func TestDB_Get(t *testing.T) { _, err = metaGet(db, obj, false) require.ErrorAs(t, err, new(apistatus.ObjectNotFound)) }) + + t.Run("expired object", func(t *testing.T) { + checkExpiredObjects(t, db, func(exp, nonExp *objectSDK.Object) { + gotExp, err := metaGet(db, object.AddressOf(exp), false) + require.Nil(t, gotExp) + require.ErrorIs(t, err, object.ErrObjectIsExpired) + + gotNonExp, err := metaGet(db, object.AddressOf(nonExp), false) + require.NoError(t, err) + require.True(t, binaryEqual(gotNonExp, nonExp.CutPayload())) + }) + }) } // binary equal is used when object contains empty lists in the structure and diff --git a/pkg/local_object_storage/metabase/inhume.go b/pkg/local_object_storage/metabase/inhume.go index 23454528ff..f03f660bfd 100644 --- a/pkg/local_object_storage/metabase/inhume.go +++ b/pkg/local_object_storage/metabase/inhume.go @@ -85,6 +85,8 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() + currEpoch := db.epochState.CurrentEpoch() + err = db.boltDB.Update(func(tx *bbolt.Tx) error { garbageBKT := tx.Bucket(garbageBucketName) @@ -142,7 +144,7 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) { lockWasChecked = true } - obj, err := db.get(tx, prm.target[i], false, true) + obj, err := db.get(tx, prm.target[i], false, true, currEpoch) // if object is stored and it is regular object then update bucket // with container size estimations diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index 5f4c204a60..f04250a064 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -53,12 +53,15 @@ var ( // Big objects have nil blobovniczaID. // // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard. +// Returns the object.ErrObjectIsExpired if the object is presented but already expired. func (db *DB) Put(prm PutPrm) (res PutRes, err error) { db.modeMtx.RLock() defer db.modeMtx.RUnlock() + currEpoch := db.epochState.CurrentEpoch() + err = db.boltDB.Batch(func(tx *bbolt.Tx) error { - return db.put(tx, prm.obj, prm.id, nil) + return db.put(tx, prm.obj, prm.id, nil, currEpoch) }) if err == nil { storagelog.Write(db.log, @@ -69,7 +72,9 @@ func (db *DB) Put(prm PutPrm) (res PutRes, err error) { return } -func (db *DB) put(tx *bbolt.Tx, obj *objectSDK.Object, id *blobovnicza.ID, si *objectSDK.SplitInfo) error { +func (db *DB) put( + tx *bbolt.Tx, obj *objectSDK.Object, id *blobovnicza.ID, + si *objectSDK.SplitInfo, currEpoch uint64) error { cnr, ok := obj.ContainerID() if !ok { return errors.New("missing container in object") @@ -77,7 +82,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *objectSDK.Object, id *blobovnicza.ID, si *o isParent := si != nil - exists, err := db.exists(tx, object.AddressOf(obj)) + exists, err := db.exists(tx, object.AddressOf(obj), currEpoch) if errors.As(err, &splitInfoError) { exists = true // object exists, however it is virtual @@ -111,7 +116,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *objectSDK.Object, id *blobovnicza.ID, si *o return err } - err = db.put(tx, par, id, parentSI) + err = db.put(tx, par, id, parentSI, currEpoch) if err != nil { return err } diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index 57ea443101..a8b40b1523 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -63,14 +63,16 @@ func (db *DB) Select(prm SelectPrm) (res SelectRes, err error) { return res, nil } + currEpoch := db.epochState.CurrentEpoch() + return res, db.boltDB.View(func(tx *bbolt.Tx) error { - res.addrList, err = db.selectObjects(tx, prm.cnr, prm.filters) + res.addrList, err = db.selectObjects(tx, prm.cnr, prm.filters, currEpoch) return err }) } -func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters) ([]oid.Address, error) { +func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, currEpoch uint64) ([]oid.Address, error) { group, err := groupFilters(fs) if err != nil { return nil, err @@ -112,11 +114,11 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters) ( return nil, err } - if inGraveyard(tx, addr) > 0 { + if objectStatus(tx, addr, currEpoch) > 0 { continue // ignore removed objects } - if !db.matchSlowFilters(tx, addr, group.slowFilters) { + if !db.matchSlowFilters(tx, addr, group.slowFilters, currEpoch) { continue // ignore objects with unmatched slow filters } @@ -163,10 +165,11 @@ func (db *DB) selectFastFilter( fNum int, // index of filter ) { prefix := cnr.EncodeToString() + "/" + currEpoch := db.epochState.CurrentEpoch() switch f.Header() { case v2object.FilterHeaderObjectID: - db.selectObjectID(tx, f, cnr, to, fNum) + db.selectObjectID(tx, f, cnr, to, fNum, currEpoch) case v2object.FilterHeaderOwnerID: bucketName := ownerBucketName(cnr) db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum) @@ -407,6 +410,7 @@ func (db *DB) selectObjectID( cnr cid.ID, to map[string]int, // resulting cache fNum int, // index of filter + currEpoch uint64, ) { prefix := cnr.EncodeToString() + "/" @@ -423,7 +427,7 @@ func (db *DB) selectObjectID( return } - ok, err := db.exists(tx, addr) + ok, err := db.exists(tx, addr, currEpoch) if (err == nil && ok) || errors.As(err, &splitInfoError) { markAddressInCache(to, fNum, addrStr) } @@ -463,12 +467,12 @@ func (db *DB) selectObjectID( } // matchSlowFilters return true if object header is matched by all slow filters. -func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f object.SearchFilters) bool { +func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f object.SearchFilters, currEpoch uint64) bool { if len(f) == 0 { return true } - obj, err := db.get(tx, addr, true, false) + obj, err := db.get(tx, addr, true, false, currEpoch) if err != nil { return false } diff --git a/pkg/local_object_storage/metabase/select_test.go b/pkg/local_object_storage/metabase/select_test.go index 710758ba59..295f39c620 100644 --- a/pkg/local_object_storage/metabase/select_test.go +++ b/pkg/local_object_storage/metabase/select_test.go @@ -805,6 +805,23 @@ func BenchmarkSelect(b *testing.B) { }) } +func TestExpiredObjects(t *testing.T) { + db := newDB(t, meta.WithEpochState(epochState{currEpoch})) + + checkExpiredObjects(t, db, func(exp, nonExp *objectSDK.Object) { + cidExp, _ := exp.ContainerID() + cidNonExp, _ := nonExp.ContainerID() + + objs, err := metaSelect(db, cidExp, objectSDK.SearchFilters{}) + require.NoError(t, err) + require.Empty(t, objs) // expired object should not be returned + + objs, err = metaSelect(db, cidNonExp, objectSDK.SearchFilters{}) + require.NoError(t, err) + require.NotEmpty(t, objs) + }) +} + func benchmarkSelect(b *testing.B, db *meta.DB, cid cidSDK.ID, fs objectSDK.SearchFilters, expected int) { var prm meta.SelectPrm prm.SetContainerID(cid) diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index fe5c115f41..4e27b9b4ac 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -1,6 +1,7 @@ package shard import ( + "errors" "fmt" "github.com/nspcc-dev/neofs-node/pkg/core/object" @@ -208,7 +209,7 @@ func (s *Shard) refillMetabase() error { mPrm.SetBlobovniczaID(blzID) _, err := s.metaBase.Put(mPrm) - if err != nil && !meta.IsErrRemoved(err) { + if err != nil && !meta.IsErrRemoved(err) && !errors.Is(err, object.ErrObjectIsExpired) { return err } diff --git a/pkg/local_object_storage/shard/errors.go b/pkg/local_object_storage/shard/errors.go index 5ab3ecdf84..3aa3ddc83a 100644 --- a/pkg/local_object_storage/shard/errors.go +++ b/pkg/local_object_storage/shard/errors.go @@ -3,6 +3,7 @@ package shard import ( "errors" + "github.com/nspcc-dev/neofs-node/pkg/core/object" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" ) @@ -23,3 +24,9 @@ func IsErrRemoved(err error) bool { func IsErrOutOfRange(err error) bool { return errors.As(err, new(apistatus.ObjectOutOfRange)) } + +// IsErrObjectExpired checks if an error returned by Shard corresponds to +// expired object. +func IsErrObjectExpired(err error) bool { + return errors.Is(err, object.ErrObjectIsExpired) +} diff --git a/pkg/local_object_storage/shard/exists.go b/pkg/local_object_storage/shard/exists.go index dd7594a85d..e8a6cfe53e 100644 --- a/pkg/local_object_storage/shard/exists.go +++ b/pkg/local_object_storage/shard/exists.go @@ -32,6 +32,7 @@ func (p ExistsRes) Exists() bool { // unambiguously determine the presence of an object. // // Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed. +// Returns the object.ErrObjectIsExpired if the object is presented but already expired. func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) { var exists bool var err error diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index 70ce128c44..ed78e6e829 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -59,6 +59,7 @@ func (r GetRes) HasMeta() bool { // // Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard. // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard. +// Returns the object.ErrObjectIsExpired if the object is presented but already expired. func (s *Shard) Get(prm GetPrm) (GetRes, error) { var big, small storFetcher diff --git a/pkg/local_object_storage/shard/head.go b/pkg/local_object_storage/shard/head.go index 80d85df19d..60e1db4d58 100644 --- a/pkg/local_object_storage/shard/head.go +++ b/pkg/local_object_storage/shard/head.go @@ -45,6 +45,7 @@ func (r HeadRes) Object() *objectSDK.Object { // // Returns an error of type apistatus.ObjectNotFound if object is missing in Shard. // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard. +// Returns the object.ErrObjectIsExpired if the object is presented but already expired. func (s *Shard) Head(prm HeadPrm) (HeadRes, error) { // object can be saved in write-cache (if enabled) or in metabase diff --git a/pkg/local_object_storage/shard/range.go b/pkg/local_object_storage/shard/range.go index e74e838c2b..bc59f01eb9 100644 --- a/pkg/local_object_storage/shard/range.go +++ b/pkg/local_object_storage/shard/range.go @@ -64,6 +64,7 @@ func (r RngRes) HasMeta() bool { // Returns ErrRangeOutOfBounds if the requested object range is out of bounds. // Returns an error of type apistatus.ObjectNotFound if the requested object is missing. // Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard. +// Returns the object.ErrObjectIsExpired if the object is presented but already expired. func (s *Shard) GetRange(prm RngPrm) (RngRes, error) { var big, small storFetcher diff --git a/pkg/local_object_storage/shard/restore.go b/pkg/local_object_storage/shard/restore.go index ae37b159c3..e42cfa408e 100644 --- a/pkg/local_object_storage/shard/restore.go +++ b/pkg/local_object_storage/shard/restore.go @@ -122,7 +122,7 @@ func (s *Shard) Restore(prm RestorePrm) (RestoreRes, error) { putPrm.SetObject(obj) _, err = s.Put(putPrm) - if err != nil { + if err != nil && !IsErrObjectExpired(err) && !IsErrRemoved(err) { return RestoreRes{}, err } diff --git a/pkg/local_object_storage/writecache/init.go b/pkg/local_object_storage/writecache/init.go index d089f5d612..df706346ac 100644 --- a/pkg/local_object_storage/writecache/init.go +++ b/pkg/local_object_storage/writecache/init.go @@ -1,9 +1,13 @@ package writecache import ( + "errors" + + "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "go.etcd.io/bbolt" ) @@ -63,7 +67,11 @@ func (c *cache) isFlushed(addr oid.Address) bool { existsPrm.SetAddress(addr) mRes, err := c.metabase.Exists(existsPrm) - if err != nil || !mRes.Exists() { + if err != nil { + return errors.Is(err, object.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved)) + } + + if !mRes.Exists() { return false }