fix/expired-locked-object-replication #138
5 changed files with 75 additions and 3 deletions
|
@ -554,6 +554,10 @@ type engineWithNotifications struct {
|
||||||
defaultTopic string
|
defaultTopic string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e engineWithNotifications) IsLocked(address oid.Address) (bool, error) {
|
||||||
|
return e.base.IsLocked(address)
|
||||||
|
}
|
||||||
|
|
||||||
func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
||||||
return e.base.Delete(tombstone, toDelete)
|
return e.base.Delete(tombstone, toDelete)
|
||||||
}
|
}
|
||||||
|
@ -587,6 +591,10 @@ type engineWithoutNotifications struct {
|
||||||
engine *engine.StorageEngine
|
engine *engine.StorageEngine
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (e engineWithoutNotifications) IsLocked(address oid.Address) (bool, error) {
|
||||||
|
return e.IsLocked(address)
|
||||||
|
}
|
||||||
|
|
||||||
func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
||||||
var prm engine.InhumePrm
|
var prm engine.InhumePrm
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,7 @@ type FormatValidatorOption func(*cfg)
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
netState netmap.State
|
netState netmap.State
|
||||||
|
e LockSource
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteHandler is an interface of delete queue processor.
|
// DeleteHandler is an interface of delete queue processor.
|
||||||
|
@ -38,6 +39,12 @@ type DeleteHandler interface {
|
||||||
DeleteObjects(oid.Address, ...oid.Address) error
|
DeleteObjects(oid.Address, ...oid.Address) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LockSource is a source of lock relations between the objects.
|
||||||
|
type LockSource interface {
|
||||||
fyrchik marked this conversation as resolved
Outdated
|
|||||||
|
// IsLocked must clarify object's lock status.
|
||||||
|
IsLocked(address oid.Address) (bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
// Locker is an object lock storage interface.
|
// Locker is an object lock storage interface.
|
||||||
type Locker interface {
|
type Locker interface {
|
||||||
// Lock list of objects as locked by locker in the specified container.
|
// Lock list of objects as locked by locker in the specified container.
|
||||||
|
@ -319,7 +326,24 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if exp < v.netState.CurrentEpoch() {
|
if exp < v.netState.CurrentEpoch() {
|
||||||
return errExpired
|
// an object could be expired but locked;
|
||||||
|
// put such an object is a correct operation
|
||||||
|
|
||||||
|
cID, _ := obj.ContainerID()
|
||||||
|
oID, _ := obj.ID()
|
||||||
|
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(cID)
|
||||||
|
addr.SetObject(oID)
|
||||||
|
|
||||||
|
locked, err := v.e.IsLocked(addr)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("locking status check for an expired object: %w", err)
|
||||||
carpawell
commented
i can imagine that we could store an object anyway in that branch but not sure. @TrueCloudLab/storage-core-committers i can imagine that we could store an object anyway in that branch but not sure. @TrueCloudLab/storage-core-committers
fyrchik
commented
If there was some storage error, it would probably better not to store anything. If there was some storage error, it would probably better not to store anything.
Also, I would somehow mention the word `expired` in the error message (`can't check locking status for an expired object`?)
carpawell
commented
added added
|
|||||||
|
}
|
||||||
|
|
||||||
|
if !locked {
|
||||||
|
return errExpired
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -380,3 +404,10 @@ func WithNetState(netState netmap.State) FormatValidatorOption {
|
||||||
c.netState = netState
|
c.netState = netState
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithLockSource return option to set the Storage Engine.
|
||||||
|
func WithLockSource(e LockSource) FormatValidatorOption {
|
||||||
|
return func(c *cfg) {
|
||||||
|
c.e = e
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -36,13 +36,26 @@ func (s testNetState) CurrentEpoch() uint64 {
|
||||||
return s.epoch
|
return s.epoch
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testLockSource struct {
|
||||||
|
m map[oid.Address]bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t testLockSource) IsLocked(address oid.Address) (bool, error) {
|
||||||
|
return t.m[address], nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestFormatValidator_Validate(t *testing.T) {
|
func TestFormatValidator_Validate(t *testing.T) {
|
||||||
const curEpoch = 13
|
const curEpoch = 13
|
||||||
|
|
||||||
|
ls := testLockSource{
|
||||||
|
m: make(map[oid.Address]bool),
|
||||||
|
}
|
||||||
|
|
||||||
v := NewFormatValidator(
|
v := NewFormatValidator(
|
||||||
WithNetState(testNetState{
|
WithNetState(testNetState{
|
||||||
epoch: curEpoch,
|
epoch: curEpoch,
|
||||||
}),
|
}),
|
||||||
|
WithLockSource(ls),
|
||||||
)
|
)
|
||||||
|
|
||||||
ownerKey, err := keys.NewPrivateKey()
|
ownerKey, err := keys.NewPrivateKey()
|
||||||
|
@ -229,8 +242,25 @@ func TestFormatValidator_Validate(t *testing.T) {
|
||||||
|
|
||||||
t.Run("expired object", func(t *testing.T) {
|
t.Run("expired object", func(t *testing.T) {
|
||||||
val := strconv.FormatUint(curEpoch-1, 10)
|
val := strconv.FormatUint(curEpoch-1, 10)
|
||||||
err := v.Validate(fn(val), false)
|
obj := fn(val)
|
||||||
require.ErrorIs(t, err, errExpired)
|
|
||||||
|
t.Run("non-locked", func(t *testing.T) {
|
||||||
|
err := v.Validate(obj, false)
|
||||||
|
require.ErrorIs(t, err, errExpired)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("locked", func(t *testing.T) {
|
||||||
|
var addr oid.Address
|
||||||
|
oID, _ := obj.ID()
|
||||||
|
cID, _ := obj.ContainerID()
|
||||||
|
|
||||||
|
addr.SetContainer(cID)
|
||||||
|
addr.SetObject(oID)
|
||||||
|
ls.m[addr] = true
|
||||||
|
|
||||||
|
err := v.Validate(obj, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("alive object", func(t *testing.T) {
|
t.Run("alive object", func(t *testing.T) {
|
||||||
|
|
|
@ -20,6 +20,8 @@ type ObjectStorage interface {
|
||||||
// Lock must lock passed objects
|
// Lock must lock passed objects
|
||||||
// and return any appeared error.
|
// and return any appeared error.
|
||||||
Lock(locker oid.Address, toLock []oid.ID) error
|
Lock(locker oid.Address, toLock []oid.ID) error
|
||||||
|
// IsLocked must clarify object's lock status.
|
||||||
|
IsLocked(oid.Address) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type localTarget struct {
|
type localTarget struct {
|
||||||
|
|
|
@ -101,6 +101,7 @@ func WithMaxSizeSource(v MaxSizeSource) Option {
|
||||||
func WithObjectStorage(v ObjectStorage) Option {
|
func WithObjectStorage(v ObjectStorage) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.localStore = v
|
c.localStore = v
|
||||||
|
c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithLockSource(v))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue
Do we really need to make this interface exportable and write comments?
we usually do such in our packages since it should be implemented in another package
in fact i would use just
StorageEngine
here but cycled imports appeared so i decided to do that as usualcc @fyrchik
We use it as an argument to the exported option, seems ok to me.