[#1634] node: Do not return expired objects
If an object has not been marked for removal by the GC in the current epoch yet but has already expired, respond with `ErrObjectNotFound` api status. Also, optimize shard iteration: a node must stop any iteration if the object is found but gonna be removed soon. All the checks are performed by the Metabase. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
9aba0ba512
commit
156ba85326
28 changed files with 230 additions and 36 deletions
|
@ -12,6 +12,7 @@ Changelog for NeoFS Node
|
|||
### Fixed
|
||||
|
||||
- Losing request context in eACL response checks (#1595)
|
||||
- Do not return expired objects that have not been handled by the GC yet (#1634)
|
||||
- Setting CID field in `neofs-cli acl extended create` (#1650)
|
||||
- `neofs-ir` no longer hangs if it cannot bind to the control endpoint (#1643)
|
||||
- Do not require `lifetime` flag in `session create` CLI command (#1655)
|
||||
|
|
8
pkg/core/object/errors.go
Normal file
8
pkg/core/object/errors.go
Normal file
|
@ -0,0 +1,8 @@
|
|||
package object
|
||||
|
||||
import "errors"
|
||||
|
||||
// ErrObjectIsExpired is returned when the requested object's
|
||||
// epoch is less than the current one. Such objects are considered
|
||||
// as removed and should not be returned from the Storage Engine.
|
||||
var ErrObjectIsExpired = errors.New("object is expired")
|
|
@ -74,7 +74,7 @@ func (e *StorageEngine) delete(prm DeletePrm) (DeleteRes, error) {
|
|||
resExists, err := sh.Exists(existsPrm)
|
||||
if err != nil {
|
||||
_, ok := err.(*objectSDK.SplitInfoError)
|
||||
if ok || shard.IsErrRemoved(err) {
|
||||
if ok || shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
|
||||
return true
|
||||
}
|
||||
if !shard.IsErrNotFound(err) {
|
||||
|
|
|
@ -26,6 +26,11 @@ func (e *StorageEngine) exists(addr oid.Address) (bool, error) {
|
|||
if ok {
|
||||
return true
|
||||
}
|
||||
|
||||
if shard.IsErrObjectExpired(err) {
|
||||
return true
|
||||
}
|
||||
|
||||
if !shard.IsErrNotFound(err) {
|
||||
e.reportShardError(sh, "could not check existence of object in shard", err)
|
||||
}
|
||||
|
|
|
@ -113,6 +113,11 @@ func (e *StorageEngine) get(prm GetPrm) (GetRes, error) {
|
|||
outError = err
|
||||
|
||||
return true // stop, return it back
|
||||
case shard.IsErrObjectExpired(err):
|
||||
// object is found but should not
|
||||
// be returned
|
||||
outError = errNotFound
|
||||
return true
|
||||
default:
|
||||
e.reportShardError(sh, "could not get object from shard", err)
|
||||
return false
|
||||
|
|
|
@ -111,6 +111,14 @@ func (e *StorageEngine) head(prm HeadPrm) (HeadRes, error) {
|
|||
outError = err
|
||||
|
||||
return true // stop, return it back
|
||||
case shard.IsErrObjectExpired(err):
|
||||
var notFoundErr apistatus.ObjectNotFound
|
||||
|
||||
// object is found but should not
|
||||
// be returned
|
||||
outError = notFoundErr
|
||||
|
||||
return true
|
||||
default:
|
||||
e.reportShardError(sh, "could not head object from shard", err)
|
||||
return false
|
||||
|
|
|
@ -132,7 +132,7 @@ func (e *StorageEngine) inhumeAddr(addr oid.Address, prm shard.InhumePrm, checkE
|
|||
existPrm.SetAddress(addr)
|
||||
exRes, err := sh.Exists(existPrm)
|
||||
if err != nil {
|
||||
if shard.IsErrRemoved(err) {
|
||||
if shard.IsErrRemoved(err) || shard.IsErrObjectExpired(err) {
|
||||
// inhumed once - no need to be inhumed again
|
||||
status = 3
|
||||
return true
|
||||
|
|
|
@ -72,6 +72,12 @@ func (e *StorageEngine) lockSingle(idCnr cid.ID, locker, locked oid.ID, checkExi
|
|||
if err != nil {
|
||||
var siErr *objectSDK.SplitInfoError
|
||||
if !errors.As(err, &siErr) {
|
||||
if shard.IsErrObjectExpired(err) {
|
||||
// object is already expired =>
|
||||
// do not lock it
|
||||
return true
|
||||
}
|
||||
|
||||
e.reportShardError(sh, "could not check locked object for presence in shard", err)
|
||||
return
|
||||
}
|
||||
|
|
|
@ -76,6 +76,12 @@ func (e *StorageEngine) put(prm PutPrm) (PutRes, error) {
|
|||
|
||||
exists, err := sh.Exists(existPrm)
|
||||
if err != nil {
|
||||
if shard.IsErrObjectExpired(err) {
|
||||
// object is already found but
|
||||
// expired => do nothing with it
|
||||
finished = true
|
||||
}
|
||||
|
||||
return // this is not ErrAlreadyRemoved error so we can go to the next shard
|
||||
}
|
||||
|
||||
|
|
|
@ -3,14 +3,17 @@ package meta_test
|
|||
import (
|
||||
"math"
|
||||
"os"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/checksum"
|
||||
checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
|
||||
usertest "github.com/nspcc-dev/neofs-sdk-go/user/test"
|
||||
|
@ -19,9 +22,13 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type epochState struct{}
|
||||
type epochState struct{ e uint64 }
|
||||
|
||||
func (s epochState) CurrentEpoch() uint64 {
|
||||
if s.e != 0 {
|
||||
return s.e
|
||||
}
|
||||
|
||||
return math.MaxUint64
|
||||
}
|
||||
|
||||
|
@ -97,3 +104,26 @@ func addAttribute(obj *object.Object, key, val string) {
|
|||
attrs = append(attrs, attr)
|
||||
obj.SetAttributes(attrs...)
|
||||
}
|
||||
|
||||
func checkExpiredObjects(t *testing.T, db *meta.DB, f func(exp, nonExp *objectSDK.Object)) {
|
||||
expObj := generateObject(t)
|
||||
setExpiration(expObj, currEpoch)
|
||||
|
||||
require.NoError(t, metaPut(db, expObj, nil))
|
||||
|
||||
nonExpObj := generateObject(t)
|
||||
setExpiration(nonExpObj, currEpoch+1)
|
||||
|
||||
require.NoError(t, metaPut(db, nonExpObj, nil))
|
||||
|
||||
f(expObj, nonExpObj)
|
||||
}
|
||||
|
||||
func setExpiration(o *objectSDK.Object, epoch uint64) {
|
||||
var attr objectSDK.Attribute
|
||||
|
||||
attr.SetKey(objectV2.SysAttributeExpEpoch)
|
||||
attr.SetValue(strconv.FormatUint(epoch, 10))
|
||||
|
||||
o.SetAttributes(append(o.Attributes(), attr)...)
|
||||
}
|
||||
|
|
|
@ -58,9 +58,10 @@ func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
|
|||
|
||||
func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) error {
|
||||
refCounter := make(referenceCounter, len(addrs))
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
for i := range addrs {
|
||||
err := db.delete(tx, addrs[i], refCounter)
|
||||
err := db.delete(tx, addrs[i], refCounter, currEpoch)
|
||||
if err != nil {
|
||||
return err // maybe log and continue?
|
||||
}
|
||||
|
@ -78,7 +79,7 @@ func (db *DB) deleteGroup(tx *bbolt.Tx, addrs []oid.Address) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter) error {
|
||||
func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter, currEpoch uint64) error {
|
||||
// remove record from the garbage bucket
|
||||
garbageBKT := tx.Bucket(garbageBucketName)
|
||||
if garbageBKT != nil {
|
||||
|
@ -89,7 +90,7 @@ func (db *DB) delete(tx *bbolt.Tx, addr oid.Address, refCounter referenceCounter
|
|||
}
|
||||
|
||||
// unmarshal object, work only with physically stored (raw == true) objects
|
||||
obj, err := db.get(tx, addr, false, true)
|
||||
obj, err := db.get(tx, addr, false, true, currEpoch)
|
||||
if err != nil {
|
||||
if errors.As(err, new(apistatus.ObjectNotFound)) {
|
||||
return nil
|
||||
|
|
|
@ -118,6 +118,17 @@ func TestGraveOnlyDelete(t *testing.T) {
|
|||
require.NoError(t, metaDelete(db, addr))
|
||||
}
|
||||
|
||||
func TestExpiredObject(t *testing.T) {
|
||||
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
|
||||
|
||||
checkExpiredObjects(t, db, func(exp, nonExp *objectSDK.Object) {
|
||||
// removing expired object should be error-free
|
||||
require.NoError(t, metaDelete(db, object.AddressOf(exp)))
|
||||
|
||||
require.NoError(t, metaDelete(db, object.AddressOf(nonExp)))
|
||||
})
|
||||
}
|
||||
|
||||
func metaDelete(db *meta.DB, addrs ...oid.Address) error {
|
||||
var deletePrm meta.DeletePrm
|
||||
deletePrm.SetAddresses(addrs...)
|
||||
|
|
|
@ -3,7 +3,10 @@ package meta
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -37,12 +40,15 @@ func (p ExistsRes) Exists() bool {
|
|||
// returns true if addr is in primary index or false if it is not.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
|
||||
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
||||
func (db *DB) Exists(prm ExistsPrm) (res ExistsRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res.exists, err = db.exists(tx, prm.addr)
|
||||
res.exists, err = db.exists(tx, prm.addr, currEpoch)
|
||||
|
||||
return err
|
||||
})
|
||||
|
@ -50,9 +56,9 @@ func (db *DB) Exists(prm ExistsPrm) (res ExistsRes, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address) (exists bool, err error) {
|
||||
// check graveyard first
|
||||
switch inGraveyard(tx, addr) {
|
||||
func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists bool, err error) {
|
||||
// check graveyard and object expiration first
|
||||
switch objectStatus(tx, addr, currEpoch) {
|
||||
case 1:
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
||||
|
@ -61,6 +67,8 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address) (exists bool, err error) {
|
|||
var errRemoved apistatus.ObjectAlreadyRemoved
|
||||
|
||||
return false, errRemoved
|
||||
case 3:
|
||||
return false, object.ErrObjectIsExpired
|
||||
}
|
||||
|
||||
objKey := objectKey(addr.Object())
|
||||
|
@ -86,11 +94,36 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address) (exists bool, err error) {
|
|||
return firstIrregularObjectType(tx, cnr, objKey) != objectSDK.TypeRegular, nil
|
||||
}
|
||||
|
||||
// inGraveyard returns:
|
||||
// * 0 if object is not marked for deletion;
|
||||
// objectStatus returns:
|
||||
// * 0 if object is available;
|
||||
// * 1 if object with GC mark;
|
||||
// * 2 if object is covered with tombstone.
|
||||
func inGraveyard(tx *bbolt.Tx, addr oid.Address) uint8 {
|
||||
// * 2 if object is covered with tombstone;
|
||||
// * 3 if object is expired.
|
||||
func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 {
|
||||
// we check only if the object is expired in the current
|
||||
// epoch since it is considered the only corner case: the
|
||||
// GC is expected to collect all the objects that have
|
||||
// expired previously for less than the one epoch duration
|
||||
|
||||
rawOID := []byte(addr.Object().EncodeToString())
|
||||
var expired bool
|
||||
|
||||
// bucket with objects that have expiration attr
|
||||
expirationBucket := tx.Bucket(attributeBucketName(addr.Container(), objectV2.SysAttributeExpEpoch))
|
||||
if expirationBucket != nil {
|
||||
// bucket that contains objects that expire in the current epoch
|
||||
currEpochBkt := expirationBucket.Bucket([]byte(strconv.FormatUint(currEpoch, 10)))
|
||||
if currEpochBkt != nil {
|
||||
if currEpochBkt.Get(rawOID) != nil {
|
||||
expired = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if expired {
|
||||
return 3
|
||||
}
|
||||
|
||||
graveyardBkt := tx.Bucket(graveyardBucketName)
|
||||
garbageBkt := tx.Bucket(garbageBucketName)
|
||||
addrKey := addressKey(addr)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -12,8 +13,10 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const currEpoch = 1000
|
||||
|
||||
func TestDB_Exists(t *testing.T) {
|
||||
db := newDB(t)
|
||||
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
|
||||
|
||||
t.Run("no object", func(t *testing.T) {
|
||||
nonExist := generateObject(t)
|
||||
|
@ -171,4 +174,15 @@ func TestDB_Exists(t *testing.T) {
|
|||
require.NoError(t, err)
|
||||
require.False(t, exists)
|
||||
})
|
||||
|
||||
t.Run("expired object", func(t *testing.T) {
|
||||
checkExpiredObjects(t, db, func(exp, nonExp *objectSDK.Object) {
|
||||
gotObj, err := metaExists(db, object.AddressOf(exp))
|
||||
require.False(t, gotObj)
|
||||
require.ErrorIs(t, err, object.ErrObjectIsExpired)
|
||||
|
||||
gotObj, err = metaExists(db, object.AddressOf(nonExp))
|
||||
require.True(t, gotObj)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package meta
|
|||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
|
||||
|
@ -44,12 +45,15 @@ func (r GetRes) Header() *objectSDK.Object {
|
|||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if object is missing in DB.
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
|
||||
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
||||
func (db *DB) Get(prm GetPrm) (res GetRes, err error) {
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res.hdr, err = db.get(tx, prm.addr, true, prm.raw)
|
||||
res.hdr, err = db.get(tx, prm.addr, true, prm.raw, currEpoch)
|
||||
|
||||
return err
|
||||
})
|
||||
|
@ -57,11 +61,11 @@ func (db *DB) Get(prm GetPrm) (res GetRes, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (db *DB) get(tx *bbolt.Tx, addr oid.Address, checkGraveyard, raw bool) (*objectSDK.Object, error) {
|
||||
func (db *DB) get(tx *bbolt.Tx, addr oid.Address, checkStatus, raw bool, currEpoch uint64) (*objectSDK.Object, error) {
|
||||
key := objectKey(addr.Object())
|
||||
|
||||
if checkGraveyard {
|
||||
switch inGraveyard(tx, addr) {
|
||||
if checkStatus {
|
||||
switch objectStatus(tx, addr, currEpoch) {
|
||||
case 1:
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
||||
|
@ -70,6 +74,8 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, checkGraveyard, raw bool) (*ob
|
|||
var errRemoved apistatus.ObjectAlreadyRemoved
|
||||
|
||||
return nil, errRemoved
|
||||
case 3:
|
||||
return nil, object.ErrObjectIsExpired
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ import (
|
|||
)
|
||||
|
||||
func TestDB_Get(t *testing.T) {
|
||||
db := newDB(t)
|
||||
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
|
||||
|
||||
raw := generateObject(t)
|
||||
|
||||
|
@ -135,6 +135,18 @@ func TestDB_Get(t *testing.T) {
|
|||
_, err = metaGet(db, obj, false)
|
||||
require.ErrorAs(t, err, new(apistatus.ObjectNotFound))
|
||||
})
|
||||
|
||||
t.Run("expired object", func(t *testing.T) {
|
||||
checkExpiredObjects(t, db, func(exp, nonExp *objectSDK.Object) {
|
||||
gotExp, err := metaGet(db, object.AddressOf(exp), false)
|
||||
require.Nil(t, gotExp)
|
||||
require.ErrorIs(t, err, object.ErrObjectIsExpired)
|
||||
|
||||
gotNonExp, err := metaGet(db, object.AddressOf(nonExp), false)
|
||||
require.NoError(t, err)
|
||||
require.True(t, binaryEqual(gotNonExp, nonExp.CutPayload()))
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
// binary equal is used when object contains empty lists in the structure and
|
||||
|
|
|
@ -85,6 +85,8 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
|
|||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
garbageBKT := tx.Bucket(garbageBucketName)
|
||||
|
||||
|
@ -142,7 +144,7 @@ func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
|
|||
lockWasChecked = true
|
||||
}
|
||||
|
||||
obj, err := db.get(tx, prm.target[i], false, true)
|
||||
obj, err := db.get(tx, prm.target[i], false, true, currEpoch)
|
||||
|
||||
// if object is stored and it is regular object then update bucket
|
||||
// with container size estimations
|
||||
|
|
|
@ -53,12 +53,15 @@ var (
|
|||
// Big objects have nil blobovniczaID.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
|
||||
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
||||
func (db *DB) Put(prm PutPrm) (res PutRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
return db.put(tx, prm.obj, prm.id, nil)
|
||||
return db.put(tx, prm.obj, prm.id, nil, currEpoch)
|
||||
})
|
||||
if err == nil {
|
||||
storagelog.Write(db.log,
|
||||
|
@ -69,7 +72,9 @@ func (db *DB) Put(prm PutPrm) (res PutRes, err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (db *DB) put(tx *bbolt.Tx, obj *objectSDK.Object, id *blobovnicza.ID, si *objectSDK.SplitInfo) error {
|
||||
func (db *DB) put(
|
||||
tx *bbolt.Tx, obj *objectSDK.Object, id *blobovnicza.ID,
|
||||
si *objectSDK.SplitInfo, currEpoch uint64) error {
|
||||
cnr, ok := obj.ContainerID()
|
||||
if !ok {
|
||||
return errors.New("missing container in object")
|
||||
|
@ -77,7 +82,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *objectSDK.Object, id *blobovnicza.ID, si *o
|
|||
|
||||
isParent := si != nil
|
||||
|
||||
exists, err := db.exists(tx, object.AddressOf(obj))
|
||||
exists, err := db.exists(tx, object.AddressOf(obj), currEpoch)
|
||||
|
||||
if errors.As(err, &splitInfoError) {
|
||||
exists = true // object exists, however it is virtual
|
||||
|
@ -111,7 +116,7 @@ func (db *DB) put(tx *bbolt.Tx, obj *objectSDK.Object, id *blobovnicza.ID, si *o
|
|||
return err
|
||||
}
|
||||
|
||||
err = db.put(tx, par, id, parentSI)
|
||||
err = db.put(tx, par, id, parentSI, currEpoch)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -63,14 +63,16 @@ func (db *DB) Select(prm SelectPrm) (res SelectRes, err error) {
|
|||
return res, nil
|
||||
}
|
||||
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
return res, db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res.addrList, err = db.selectObjects(tx, prm.cnr, prm.filters)
|
||||
res.addrList, err = db.selectObjects(tx, prm.cnr, prm.filters, currEpoch)
|
||||
|
||||
return err
|
||||
})
|
||||
}
|
||||
|
||||
func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters) ([]oid.Address, error) {
|
||||
func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters, currEpoch uint64) ([]oid.Address, error) {
|
||||
group, err := groupFilters(fs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -112,11 +114,11 @@ func (db *DB) selectObjects(tx *bbolt.Tx, cnr cid.ID, fs object.SearchFilters) (
|
|||
return nil, err
|
||||
}
|
||||
|
||||
if inGraveyard(tx, addr) > 0 {
|
||||
if objectStatus(tx, addr, currEpoch) > 0 {
|
||||
continue // ignore removed objects
|
||||
}
|
||||
|
||||
if !db.matchSlowFilters(tx, addr, group.slowFilters) {
|
||||
if !db.matchSlowFilters(tx, addr, group.slowFilters, currEpoch) {
|
||||
continue // ignore objects with unmatched slow filters
|
||||
}
|
||||
|
||||
|
@ -163,10 +165,11 @@ func (db *DB) selectFastFilter(
|
|||
fNum int, // index of filter
|
||||
) {
|
||||
prefix := cnr.EncodeToString() + "/"
|
||||
currEpoch := db.epochState.CurrentEpoch()
|
||||
|
||||
switch f.Header() {
|
||||
case v2object.FilterHeaderObjectID:
|
||||
db.selectObjectID(tx, f, cnr, to, fNum)
|
||||
db.selectObjectID(tx, f, cnr, to, fNum, currEpoch)
|
||||
case v2object.FilterHeaderOwnerID:
|
||||
bucketName := ownerBucketName(cnr)
|
||||
db.selectFromFKBT(tx, bucketName, f, prefix, to, fNum)
|
||||
|
@ -407,6 +410,7 @@ func (db *DB) selectObjectID(
|
|||
cnr cid.ID,
|
||||
to map[string]int, // resulting cache
|
||||
fNum int, // index of filter
|
||||
currEpoch uint64,
|
||||
) {
|
||||
prefix := cnr.EncodeToString() + "/"
|
||||
|
||||
|
@ -423,7 +427,7 @@ func (db *DB) selectObjectID(
|
|||
return
|
||||
}
|
||||
|
||||
ok, err := db.exists(tx, addr)
|
||||
ok, err := db.exists(tx, addr, currEpoch)
|
||||
if (err == nil && ok) || errors.As(err, &splitInfoError) {
|
||||
markAddressInCache(to, fNum, addrStr)
|
||||
}
|
||||
|
@ -463,12 +467,12 @@ func (db *DB) selectObjectID(
|
|||
}
|
||||
|
||||
// matchSlowFilters return true if object header is matched by all slow filters.
|
||||
func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f object.SearchFilters) bool {
|
||||
func (db *DB) matchSlowFilters(tx *bbolt.Tx, addr oid.Address, f object.SearchFilters, currEpoch uint64) bool {
|
||||
if len(f) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
obj, err := db.get(tx, addr, true, false)
|
||||
obj, err := db.get(tx, addr, true, false, currEpoch)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -805,6 +805,23 @@ func BenchmarkSelect(b *testing.B) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestExpiredObjects(t *testing.T) {
|
||||
db := newDB(t, meta.WithEpochState(epochState{currEpoch}))
|
||||
|
||||
checkExpiredObjects(t, db, func(exp, nonExp *objectSDK.Object) {
|
||||
cidExp, _ := exp.ContainerID()
|
||||
cidNonExp, _ := nonExp.ContainerID()
|
||||
|
||||
objs, err := metaSelect(db, cidExp, objectSDK.SearchFilters{})
|
||||
require.NoError(t, err)
|
||||
require.Empty(t, objs) // expired object should not be returned
|
||||
|
||||
objs, err = metaSelect(db, cidNonExp, objectSDK.SearchFilters{})
|
||||
require.NoError(t, err)
|
||||
require.NotEmpty(t, objs)
|
||||
})
|
||||
}
|
||||
|
||||
func benchmarkSelect(b *testing.B, db *meta.DB, cid cidSDK.ID, fs objectSDK.SearchFilters, expected int) {
|
||||
var prm meta.SelectPrm
|
||||
prm.SetContainerID(cid)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package shard
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
|
@ -208,7 +209,7 @@ func (s *Shard) refillMetabase() error {
|
|||
mPrm.SetBlobovniczaID(blzID)
|
||||
|
||||
_, err := s.metaBase.Put(mPrm)
|
||||
if err != nil && !meta.IsErrRemoved(err) {
|
||||
if err != nil && !meta.IsErrRemoved(err) && !errors.Is(err, object.ErrObjectIsExpired) {
|
||||
return err
|
||||
}
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package shard
|
|||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
)
|
||||
|
||||
|
@ -23,3 +24,9 @@ func IsErrRemoved(err error) bool {
|
|||
func IsErrOutOfRange(err error) bool {
|
||||
return errors.As(err, new(apistatus.ObjectOutOfRange))
|
||||
}
|
||||
|
||||
// IsErrObjectExpired checks if an error returned by Shard corresponds to
|
||||
// expired object.
|
||||
func IsErrObjectExpired(err error) bool {
|
||||
return errors.Is(err, object.ErrObjectIsExpired)
|
||||
}
|
||||
|
|
|
@ -32,6 +32,7 @@ func (p ExistsRes) Exists() bool {
|
|||
// unambiguously determine the presence of an object.
|
||||
//
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been marked as removed.
|
||||
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
||||
func (s *Shard) Exists(prm ExistsPrm) (ExistsRes, error) {
|
||||
var exists bool
|
||||
var err error
|
||||
|
|
|
@ -59,6 +59,7 @@ func (r GetRes) HasMeta() bool {
|
|||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing in shard.
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
||||
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
||||
func (s *Shard) Get(prm GetPrm) (GetRes, error) {
|
||||
var big, small storFetcher
|
||||
|
||||
|
|
|
@ -45,6 +45,7 @@ func (r HeadRes) Object() *objectSDK.Object {
|
|||
//
|
||||
// Returns an error of type apistatus.ObjectNotFound if object is missing in Shard.
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
||||
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
||||
func (s *Shard) Head(prm HeadPrm) (HeadRes, error) {
|
||||
// object can be saved in write-cache (if enabled) or in metabase
|
||||
|
||||
|
|
|
@ -64,6 +64,7 @@ func (r RngRes) HasMeta() bool {
|
|||
// Returns ErrRangeOutOfBounds if the requested object range is out of bounds.
|
||||
// Returns an error of type apistatus.ObjectNotFound if the requested object is missing.
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if the requested object has been marked as removed in shard.
|
||||
// Returns the object.ErrObjectIsExpired if the object is presented but already expired.
|
||||
func (s *Shard) GetRange(prm RngPrm) (RngRes, error) {
|
||||
var big, small storFetcher
|
||||
|
||||
|
|
|
@ -122,7 +122,7 @@ func (s *Shard) Restore(prm RestorePrm) (RestoreRes, error) {
|
|||
|
||||
putPrm.SetObject(obj)
|
||||
_, err = s.Put(putPrm)
|
||||
if err != nil {
|
||||
if err != nil && !IsErrObjectExpired(err) && !IsErrRemoved(err) {
|
||||
return RestoreRes{}, err
|
||||
}
|
||||
|
||||
|
|
|
@ -1,9 +1,13 @@
|
|||
package writecache
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree"
|
||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
"go.etcd.io/bbolt"
|
||||
)
|
||||
|
@ -63,7 +67,11 @@ func (c *cache) isFlushed(addr oid.Address) bool {
|
|||
existsPrm.SetAddress(addr)
|
||||
|
||||
mRes, err := c.metabase.Exists(existsPrm)
|
||||
if err != nil || !mRes.Exists() {
|
||||
if err != nil {
|
||||
return errors.Is(err, object.ErrObjectIsExpired) || errors.As(err, new(apistatus.ObjectAlreadyRemoved))
|
||||
}
|
||||
|
||||
if !mRes.Exists() {
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue