fix/expired-locked-object-replication #138
5 changed files with 75 additions and 3 deletions
|
@ -554,6 +554,10 @@ type engineWithNotifications struct {
|
||||||
defaultTopic string
|
defaultTopic string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e engineWithNotifications) IsLocked(address oid.Address) (bool, error) {
|
||||||
|
return e.base.IsLocked(address)
|
||||||
|
}
|
||||||
|
|
||||||
func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
||||||
return e.base.Delete(tombstone, toDelete)
|
return e.base.Delete(tombstone, toDelete)
|
||||||
}
|
}
|
||||||
|
@ -587,6 +591,10 @@ type engineWithoutNotifications struct {
|
||||||
engine *engine.StorageEngine
|
engine *engine.StorageEngine
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e engineWithoutNotifications) IsLocked(address oid.Address) (bool, error) {
|
||||||
|
return e.IsLocked(address)
|
||||||
|
}
|
||||||
|
|
||||||
func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
||||||
var prm engine.InhumePrm
|
var prm engine.InhumePrm
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ type FormatValidatorOption func(*cfg)
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
netState netmap.State
|
netState netmap.State
|
||||||
|
e LockSource
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteHandler is an interface of delete queue processor.
|
// DeleteHandler is an interface of delete queue processor.
|
||||||
|
@ -38,6 +39,12 @@ type DeleteHandler interface {
|
||||||
DeleteObjects(oid.Address, ...oid.Address) error
|
DeleteObjects(oid.Address, ...oid.Address) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LockSource is a source of lock relations between the objects.
|
||||||
|
type LockSource interface {
|
||||||
|
// IsLocked must clarify object's lock status.
|
||||||
|
IsLocked(address oid.Address) (bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
// Locker is an object lock storage interface.
|
// Locker is an object lock storage interface.
|
||||||
type Locker interface {
|
type Locker interface {
|
||||||
// Lock list of objects as locked by locker in the specified container.
|
// Lock list of objects as locked by locker in the specified container.
|
||||||
|
@ -319,7 +326,24 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if exp < v.netState.CurrentEpoch() {
|
if exp < v.netState.CurrentEpoch() {
|
||||||
return errExpired
|
// an object could be expired but locked;
|
||||||
|
// put such an object is a correct operation
|
||||||
|
|
||||||
|
cID, _ := obj.ContainerID()
|
||||||
|
oID, _ := obj.ID()
|
||||||
|
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(cID)
|
||||||
|
addr.SetObject(oID)
|
||||||
|
|
||||||
|
locked, err := v.e.IsLocked(addr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("locking status check for an expired object: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !locked {
|
||||||
|
return errExpired
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -380,3 +404,10 @@ func WithNetState(netState netmap.State) FormatValidatorOption {
|
||||||
c.netState = netState
|
c.netState = netState
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithLockSource return option to set the Storage Engine.
|
||||||
|
func WithLockSource(e LockSource) FormatValidatorOption {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.e = e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -36,13 +36,26 @@ func (s testNetState) CurrentEpoch() uint64 {
|
||||||
return s.epoch
|
return s.epoch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testLockSource struct {
|
||||||
|
m map[oid.Address]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t testLockSource) IsLocked(address oid.Address) (bool, error) {
|
||||||
|
return t.m[address], nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestFormatValidator_Validate(t *testing.T) {
|
func TestFormatValidator_Validate(t *testing.T) {
|
||||||
const curEpoch = 13
|
const curEpoch = 13
|
||||||
|
|
||||||
|
ls := testLockSource{
|
||||||
|
m: make(map[oid.Address]bool),
|
||||||
|
}
|
||||||
|
|
||||||
v := NewFormatValidator(
|
v := NewFormatValidator(
|
||||||
WithNetState(testNetState{
|
WithNetState(testNetState{
|
||||||
epoch: curEpoch,
|
epoch: curEpoch,
|
||||||
}),
|
}),
|
||||||
|
WithLockSource(ls),
|
||||||
)
|
)
|
||||||
|
|
||||||
ownerKey, err := keys.NewPrivateKey()
|
ownerKey, err := keys.NewPrivateKey()
|
||||||
|
@ -229,8 +242,25 @@ func TestFormatValidator_Validate(t *testing.T) {
|
||||||
|
|
||||||
t.Run("expired object", func(t *testing.T) {
|
t.Run("expired object", func(t *testing.T) {
|
||||||
val := strconv.FormatUint(curEpoch-1, 10)
|
val := strconv.FormatUint(curEpoch-1, 10)
|
||||||
err := v.Validate(fn(val), false)
|
obj := fn(val)
|
||||||
require.ErrorIs(t, err, errExpired)
|
|
||||||
|
t.Run("non-locked", func(t *testing.T) {
|
||||||
|
err := v.Validate(obj, false)
|
||||||
|
require.ErrorIs(t, err, errExpired)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("locked", func(t *testing.T) {
|
||||||
|
var addr oid.Address
|
||||||
|
oID, _ := obj.ID()
|
||||||
|
cID, _ := obj.ContainerID()
|
||||||
|
|
||||||
|
addr.SetContainer(cID)
|
||||||
|
addr.SetObject(oID)
|
||||||
|
ls.m[addr] = true
|
||||||
|
|
||||||
|
err := v.Validate(obj, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("alive object", func(t *testing.T) {
|
t.Run("alive object", func(t *testing.T) {
|
||||||
|
|
|
@ -20,6 +20,8 @@ type ObjectStorage interface {
|
||||||
// Lock must lock passed objects
|
// Lock must lock passed objects
|
||||||
// and return any appeared error.
|
// and return any appeared error.
|
||||||
Lock(locker oid.Address, toLock []oid.ID) error
|
Lock(locker oid.Address, toLock []oid.ID) error
|
||||||
|
// IsLocked must clarify object's lock status.
|
||||||
|
IsLocked(oid.Address) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type localTarget struct {
|
type localTarget struct {
|
||||||
|
|
|
@ -101,6 +101,7 @@ func WithMaxSizeSource(v MaxSizeSource) Option {
|
||||||
func WithObjectStorage(v ObjectStorage) Option {
|
func WithObjectStorage(v ObjectStorage) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.localStore = v
|
c.localStore = v
|
||||||
|
c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithLockSource(v))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue