[#67] node: Accept expired locked objects

Allow replication of any (expired too) locked object. Information about
object locking is considered to be presented on the _container nodes_.

Signed-off-by: Pavel Karpy <p.karpy@yadro.com>
This commit is contained in:
Pavel Karpy 2023-03-15 04:07:27 +03:00
parent f006f3b342
commit 64bde68fb9
5 changed files with 75 additions and 3 deletions

View file

@ -554,6 +554,10 @@ type engineWithNotifications struct {
defaultTopic string defaultTopic string
} }
func (e engineWithNotifications) IsLocked(address oid.Address) (bool, error) {
return e.base.IsLocked(address)
}
func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error { func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
return e.base.Delete(tombstone, toDelete) return e.base.Delete(tombstone, toDelete)
} }
@ -587,6 +591,10 @@ type engineWithoutNotifications struct {
engine *engine.StorageEngine engine *engine.StorageEngine
} }
func (e engineWithoutNotifications) IsLocked(address oid.Address) (bool, error) {
return e.IsLocked(address)
}
func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error { func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
var prm engine.InhumePrm var prm engine.InhumePrm

View file

@ -27,6 +27,7 @@ type FormatValidatorOption func(*cfg)
type cfg struct { type cfg struct {
netState netmap.State netState netmap.State
e LockSource
} }
// DeleteHandler is an interface of delete queue processor. // DeleteHandler is an interface of delete queue processor.
@ -38,6 +39,12 @@ type DeleteHandler interface {
DeleteObjects(oid.Address, ...oid.Address) error DeleteObjects(oid.Address, ...oid.Address) error
} }
// LockSource is a source of lock relations between the objects.
type LockSource interface {
// IsLocked must clarify object's lock status.
IsLocked(address oid.Address) (bool, error)
}
// Locker is an object lock storage interface. // Locker is an object lock storage interface.
type Locker interface { type Locker interface {
// Lock list of objects as locked by locker in the specified container. // Lock list of objects as locked by locker in the specified container.
@ -319,7 +326,24 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error {
} }
if exp < v.netState.CurrentEpoch() { if exp < v.netState.CurrentEpoch() {
return errExpired // an object could be expired but locked;
// put such an object is a correct operation
cID, _ := obj.ContainerID()
oID, _ := obj.ID()
var addr oid.Address
addr.SetContainer(cID)
addr.SetObject(oID)
locked, err := v.e.IsLocked(addr)
if err != nil {
return fmt.Errorf("locking status check for an expired object: %w", err)
}
if !locked {
return errExpired
}
} }
return nil return nil
@ -380,3 +404,10 @@ func WithNetState(netState netmap.State) FormatValidatorOption {
c.netState = netState c.netState = netState
} }
} }
// WithLockSource return option to set the Storage Engine.
func WithLockSource(e LockSource) FormatValidatorOption {
return func(c *cfg) {
c.e = e
}
}

View file

@ -36,13 +36,26 @@ func (s testNetState) CurrentEpoch() uint64 {
return s.epoch return s.epoch
} }
type testLockSource struct {
m map[oid.Address]bool
}
func (t testLockSource) IsLocked(address oid.Address) (bool, error) {
return t.m[address], nil
}
func TestFormatValidator_Validate(t *testing.T) { func TestFormatValidator_Validate(t *testing.T) {
const curEpoch = 13 const curEpoch = 13
ls := testLockSource{
m: make(map[oid.Address]bool),
}
v := NewFormatValidator( v := NewFormatValidator(
WithNetState(testNetState{ WithNetState(testNetState{
epoch: curEpoch, epoch: curEpoch,
}), }),
WithLockSource(ls),
) )
ownerKey, err := keys.NewPrivateKey() ownerKey, err := keys.NewPrivateKey()
@ -229,8 +242,25 @@ func TestFormatValidator_Validate(t *testing.T) {
t.Run("expired object", func(t *testing.T) { t.Run("expired object", func(t *testing.T) {
val := strconv.FormatUint(curEpoch-1, 10) val := strconv.FormatUint(curEpoch-1, 10)
err := v.Validate(fn(val), false) obj := fn(val)
require.ErrorIs(t, err, errExpired)
t.Run("non-locked", func(t *testing.T) {
err := v.Validate(obj, false)
require.ErrorIs(t, err, errExpired)
})
t.Run("locked", func(t *testing.T) {
var addr oid.Address
oID, _ := obj.ID()
cID, _ := obj.ContainerID()
addr.SetContainer(cID)
addr.SetObject(oID)
ls.m[addr] = true
err := v.Validate(obj, false)
require.NoError(t, err)
})
}) })
t.Run("alive object", func(t *testing.T) { t.Run("alive object", func(t *testing.T) {

View file

@ -20,6 +20,8 @@ type ObjectStorage interface {
// Lock must lock passed objects // Lock must lock passed objects
// and return any appeared error. // and return any appeared error.
Lock(locker oid.Address, toLock []oid.ID) error Lock(locker oid.Address, toLock []oid.ID) error
// IsLocked must clarify object's lock status.
IsLocked(oid.Address) (bool, error)
} }
type localTarget struct { type localTarget struct {

View file

@ -101,6 +101,7 @@ func WithMaxSizeSource(v MaxSizeSource) Option {
func WithObjectStorage(v ObjectStorage) Option { func WithObjectStorage(v ObjectStorage) Option {
return func(c *cfg) { return func(c *cfg) {
c.localStore = v c.localStore = v
c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithLockSource(v))
} }
} }