fix/expired-locked-object-replication #138
5 changed files with 75 additions and 3 deletions
|
@ -554,6 +554,10 @@ type engineWithNotifications struct {
|
|||
defaultTopic string
|
||||
}
|
||||
|
||||
func (e engineWithNotifications) IsLocked(address oid.Address) (bool, error) {
|
||||
return e.base.IsLocked(address)
|
||||
}
|
||||
|
||||
func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
||||
return e.base.Delete(tombstone, toDelete)
|
||||
}
|
||||
|
@ -587,6 +591,10 @@ type engineWithoutNotifications struct {
|
|||
engine *engine.StorageEngine
|
||||
}
|
||||
|
||||
func (e engineWithoutNotifications) IsLocked(address oid.Address) (bool, error) {
|
||||
return e.IsLocked(address)
|
||||
}
|
||||
|
||||
func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
||||
var prm engine.InhumePrm
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ type FormatValidatorOption func(*cfg)
|
|||
|
||||
type cfg struct {
|
||||
netState netmap.State
|
||||
e LockSource
|
||||
}
|
||||
|
||||
// DeleteHandler is an interface of delete queue processor.
|
||||
|
@ -38,6 +39,12 @@ type DeleteHandler interface {
|
|||
DeleteObjects(oid.Address, ...oid.Address) error
|
||||
}
|
||||
|
||||
// LockSource is a source of lock relations between the objects.
|
||||
type LockSource interface {
|
||||
// IsLocked must clarify object's lock status.
|
||||
IsLocked(address oid.Address) (bool, error)
|
||||
}
|
||||
|
||||
// Locker is an object lock storage interface.
|
||||
type Locker interface {
|
||||
// Lock list of objects as locked by locker in the specified container.
|
||||
|
@ -319,7 +326,24 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error {
|
|||
}
|
||||
|
||||
if exp < v.netState.CurrentEpoch() {
|
||||
return errExpired
|
||||
// an object could be expired but locked;
|
||||
// put such an object is a correct operation
|
||||
|
||||
cID, _ := obj.ContainerID()
|
||||
oID, _ := obj.ID()
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(cID)
|
||||
addr.SetObject(oID)
|
||||
|
||||
locked, err := v.e.IsLocked(addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("locking status check for an expired object: %w", err)
|
||||
}
|
||||
|
||||
if !locked {
|
||||
return errExpired
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -380,3 +404,10 @@ func WithNetState(netState netmap.State) FormatValidatorOption {
|
|||
c.netState = netState
|
||||
}
|
||||
}
|
||||
|
||||
// WithLockSource return option to set the Storage Engine.
|
||||
func WithLockSource(e LockSource) FormatValidatorOption {
|
||||
return func(c *cfg) {
|
||||
c.e = e
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,13 +36,26 @@ func (s testNetState) CurrentEpoch() uint64 {
|
|||
return s.epoch
|
||||
}
|
||||
|
||||
type testLockSource struct {
|
||||
m map[oid.Address]bool
|
||||
}
|
||||
|
||||
func (t testLockSource) IsLocked(address oid.Address) (bool, error) {
|
||||
return t.m[address], nil
|
||||
}
|
||||
|
||||
func TestFormatValidator_Validate(t *testing.T) {
|
||||
const curEpoch = 13
|
||||
|
||||
ls := testLockSource{
|
||||
m: make(map[oid.Address]bool),
|
||||
}
|
||||
|
||||
v := NewFormatValidator(
|
||||
WithNetState(testNetState{
|
||||
epoch: curEpoch,
|
||||
}),
|
||||
WithLockSource(ls),
|
||||
)
|
||||
|
||||
ownerKey, err := keys.NewPrivateKey()
|
||||
|
@ -229,8 +242,25 @@ func TestFormatValidator_Validate(t *testing.T) {
|
|||
|
||||
t.Run("expired object", func(t *testing.T) {
|
||||
val := strconv.FormatUint(curEpoch-1, 10)
|
||||
err := v.Validate(fn(val), false)
|
||||
require.ErrorIs(t, err, errExpired)
|
||||
obj := fn(val)
|
||||
|
||||
t.Run("non-locked", func(t *testing.T) {
|
||||
err := v.Validate(obj, false)
|
||||
require.ErrorIs(t, err, errExpired)
|
||||
})
|
||||
|
||||
t.Run("locked", func(t *testing.T) {
|
||||
var addr oid.Address
|
||||
oID, _ := obj.ID()
|
||||
cID, _ := obj.ContainerID()
|
||||
|
||||
addr.SetContainer(cID)
|
||||
addr.SetObject(oID)
|
||||
ls.m[addr] = true
|
||||
|
||||
err := v.Validate(obj, false)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("alive object", func(t *testing.T) {
|
||||
|
|
|
@ -20,6 +20,8 @@ type ObjectStorage interface {
|
|||
// Lock must lock passed objects
|
||||
// and return any appeared error.
|
||||
Lock(locker oid.Address, toLock []oid.ID) error
|
||||
// IsLocked must clarify object's lock status.
|
||||
IsLocked(oid.Address) (bool, error)
|
||||
}
|
||||
|
||||
type localTarget struct {
|
||||
|
|
|
@ -101,6 +101,7 @@ func WithMaxSizeSource(v MaxSizeSource) Option {
|
|||
func WithObjectStorage(v ObjectStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.localStore = v
|
||||
c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithLockSource(v))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue