[#1176] metabase: Ignore locked objects in IterateExpired
Make `DB.IterateExpired` to not pass locked objects to the handler. The method is used by GC, therefore it will not consider them as candidates for deletion. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
9508633a7e
commit
dba3e58dc5
3 changed files with 31 additions and 1 deletions
|
@ -38,7 +38,8 @@ type ExpiredObjectHandler func(*ExpiredObject) error
|
||||||
var ErrInterruptIterator = errors.New("iterator is interrupted")
|
var ErrInterruptIterator = errors.New("iterator is interrupted")
|
||||||
|
|
||||||
// IterateExpired iterates over all objects in DB which are out of date
|
// IterateExpired iterates over all objects in DB which are out of date
|
||||||
// relative to epoch.
|
// relative to epoch. Locked objects are not included (do not confuse
|
||||||
|
// with objects of type LOCK).
|
||||||
//
|
//
|
||||||
// If h returns ErrInterruptIterator, nil returns immediately.
|
// If h returns ErrInterruptIterator, nil returns immediately.
|
||||||
// Returns other errors of h directly.
|
// Returns other errors of h directly.
|
||||||
|
@ -83,6 +84,14 @@ func (db *DB) iterateExpired(tx *bbolt.Tx, epoch uint64, h ExpiredObjectHandler)
|
||||||
return fmt.Errorf("could not parse container ID of expired bucket: %w", err)
|
return fmt.Errorf("could not parse container ID of expired bucket: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ignore locked objects.
|
||||||
|
//
|
||||||
|
// To slightly optimize performance we can check only REGULAR objects
|
||||||
|
// (only they can be locked), but it's more reliable.
|
||||||
|
if objectLocked(tx, *cnrID, *id) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
addr := addressSDK.NewAddress()
|
addr := addressSDK.NewAddress()
|
||||||
addr.SetContainerID(cnrID)
|
addr.SetContainerID(cnrID)
|
||||||
addr.SetObjectID(id)
|
addr.SetObjectID(id)
|
||||||
|
|
|
@ -9,6 +9,8 @@ import (
|
||||||
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
|
||||||
"github.com/nspcc-dev/neofs-sdk-go/object"
|
"github.com/nspcc-dev/neofs-sdk-go/object"
|
||||||
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
addressSDK "github.com/nspcc-dev/neofs-sdk-go/object/address"
|
||||||
|
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||||
|
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -30,11 +32,17 @@ func TestDB_IterateExpired(t *testing.T) {
|
||||||
mExpired[typ] = putWithExpiration(t, db, typ, epoch-1)
|
mExpired[typ] = putWithExpiration(t, db, typ, epoch-1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
expiredLocked := putWithExpiration(t, db, object.TypeRegular, epoch-1)
|
||||||
|
|
||||||
|
require.NoError(t, db.Lock(*expiredLocked.ContainerID(), *oidtest.ID(), []oid.ID{*expiredLocked.ObjectID()}))
|
||||||
|
|
||||||
err := db.IterateExpired(epoch, func(exp *meta.ExpiredObject) error {
|
err := db.IterateExpired(epoch, func(exp *meta.ExpiredObject) error {
|
||||||
if addr, ok := mAlive[exp.Type()]; ok {
|
if addr, ok := mAlive[exp.Type()]; ok {
|
||||||
require.NotEqual(t, addr, exp.Address())
|
require.NotEqual(t, addr, exp.Address())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
require.NotEqual(t, expiredLocked, exp.Address())
|
||||||
|
|
||||||
addr, ok := mExpired[exp.Type()]
|
addr, ok := mExpired[exp.Type()]
|
||||||
require.True(t, ok)
|
require.True(t, ok)
|
||||||
require.Equal(t, addr, exp.Address())
|
require.Equal(t, addr, exp.Address())
|
||||||
|
|
|
@ -97,3 +97,16 @@ func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// checks if specified object is locked in the specified container.
|
||||||
|
func objectLocked(tx *bbolt.Tx, idCnr cid.ID, idObj oid.ID) bool {
|
||||||
|
bucketLocked := tx.Bucket(bucketNameLocked)
|
||||||
|
if bucketLocked != nil {
|
||||||
|
bucketLockedContainer := bucketLocked.Bucket([]byte(idCnr.String()))
|
||||||
|
if bucketLockedContainer != nil {
|
||||||
|
return bucketLockedContainer.Get(objectKey(&idObj)) != nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue