[#56] node: Allow reading expired locked object

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
This commit is contained in:
Pavel Karpy 2023-01-30 13:23:44 +03:00 committed by fyrchik
parent 3beef10f89
commit 337049b2ce
3 changed files with 27 additions and 0 deletions

View file

@ -20,6 +20,7 @@ Changelog for FrostFS Node
- Pilorama now can merge multiple batches into one (#2231) - Pilorama now can merge multiple batches into one (#2231)
- Storage engine now can start even when some shard components are unavailable (#2238) - Storage engine now can start even when some shard components are unavailable (#2238)
- `neofs-cli` buffer for object put increased from 4 KiB to 3 MiB (#2243) - `neofs-cli` buffer for object put increased from 4 KiB to 3 MiB (#2243)
- Expired locked object is available for reading (#56)
### Fixed ### Fixed
- Increase payload size metric on shards' `put` operation (#1794) - Increase payload size metric on shards' `put` operation (#1794)

View file

@ -100,6 +100,11 @@ func (db *DB) exists(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) (exists b
// - 2 if object is covered with tombstone; // - 2 if object is covered with tombstone;
// - 3 if object is expired. // - 3 if object is expired.
func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 { func objectStatus(tx *bbolt.Tx, addr oid.Address, currEpoch uint64) uint8 {
// locked object could not be removed/marked with GC/expired
if objectLocked(tx, addr.Container(), addr.Object()) {
return 0
}
// we check only if the object is expired in the current // we check only if the object is expired in the current
// epoch since it is considered the only corner case: the // epoch since it is considered the only corner case: the
// GC is expected to collect all the objects that have // GC is expected to collect all the objects that have

View file

@ -169,6 +169,27 @@ func TestDB_Lock(t *testing.T) {
}) })
} }
func TestDB_Lock_Expired(t *testing.T) {
es := &epochState{e: 123}
db := newDB(t, meta.WithEpochState(es))
// put an object
addr := putWithExpiration(t, db, object.TypeRegular, 124)
// expire the obj
es.e = 125
_, err := metaGet(db, addr, false)
require.ErrorIs(t, err, meta.ErrObjectIsExpired)
// lock the obj
require.NoError(t, db.Lock(addr.Container(), oidtest.ID(), []oid.ID{addr.Object()}))
// object is expired but locked, thus, must be available
_, err = metaGet(db, addr, false)
require.NoError(t, err)
}
func TestDB_IsLocked(t *testing.T) { func TestDB_IsLocked(t *testing.T) {
db := newDB(t) db := newDB(t)