fix/expired-locked-object-replication #138
5 changed files with 75 additions and 3 deletions
|
@ -554,6 +554,10 @@ type engineWithNotifications struct {
|
|||
defaultTopic string
|
||||
}
|
||||
|
||||
func (e engineWithNotifications) IsLocked(address oid.Address) (bool, error) {
|
||||
return e.base.IsLocked(address)
|
||||
}
|
||||
|
||||
func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
||||
return e.base.Delete(tombstone, toDelete)
|
||||
}
|
||||
|
@ -587,6 +591,10 @@ type engineWithoutNotifications struct {
|
|||
engine *engine.StorageEngine
|
||||
}
|
||||
|
||||
func (e engineWithoutNotifications) IsLocked(address oid.Address) (bool, error) {
|
||||
return e.IsLocked(address)
|
||||
}
|
||||
|
||||
func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
|
||||
var prm engine.InhumePrm
|
||||
|
||||
|
|
|
@ -27,6 +27,7 @@ type FormatValidatorOption func(*cfg)
|
|||
|
||||
type cfg struct {
|
||||
netState netmap.State
|
||||
e LockSource
|
||||
}
|
||||
|
||||
// DeleteHandler is an interface of delete queue processor.
|
||||
|
@ -38,6 +39,12 @@ type DeleteHandler interface {
|
|||
DeleteObjects(oid.Address, ...oid.Address) error
|
||||
}
|
||||
|
||||
// LockSource is a source of lock relations between the objects.
|
||||
type LockSource interface {
|
||||
fyrchik marked this conversation as resolved
Outdated
|
||||
// IsLocked must clarify object's lock status.
|
||||
IsLocked(address oid.Address) (bool, error)
|
||||
}
|
||||
|
||||
// Locker is an object lock storage interface.
|
||||
type Locker interface {
|
||||
// Lock list of objects as locked by locker in the specified container.
|
||||
|
@ -319,8 +326,25 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error {
|
|||
}
|
||||
|
||||
if exp < v.netState.CurrentEpoch() {
|
||||
// an object could be expired but locked;
|
||||
// put such an object is a correct operation
|
||||
|
||||
cID, _ := obj.ContainerID()
|
||||
oID, _ := obj.ID()
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(cID)
|
||||
addr.SetObject(oID)
|
||||
|
||||
locked, err := v.e.IsLocked(addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("locking status check for an expired object: %w", err)
|
||||
carpawell
commented
i can imagine that we could store an object anyway in that branch but not sure. @TrueCloudLab/storage-core-committers i can imagine that we could store an object anyway in that branch but not sure. @TrueCloudLab/storage-core-committers
fyrchik
commented
If there was some storage error, it would probably better not to store anything. If there was some storage error, it would probably better not to store anything.
Also, I would somehow mention the word `expired` in the error message (`can't check locking status for an expired object`?)
carpawell
commented
added added
|
||||
}
|
||||
|
||||
if !locked {
|
||||
return errExpired
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -380,3 +404,10 @@ func WithNetState(netState netmap.State) FormatValidatorOption {
|
|||
c.netState = netState
|
||||
}
|
||||
}
|
||||
|
||||
// WithLockSource return option to set the Storage Engine.
|
||||
func WithLockSource(e LockSource) FormatValidatorOption {
|
||||
return func(c *cfg) {
|
||||
c.e = e
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,13 +36,26 @@ func (s testNetState) CurrentEpoch() uint64 {
|
|||
return s.epoch
|
||||
}
|
||||
|
||||
type testLockSource struct {
|
||||
m map[oid.Address]bool
|
||||
}
|
||||
|
||||
func (t testLockSource) IsLocked(address oid.Address) (bool, error) {
|
||||
return t.m[address], nil
|
||||
}
|
||||
|
||||
func TestFormatValidator_Validate(t *testing.T) {
|
||||
const curEpoch = 13
|
||||
|
||||
ls := testLockSource{
|
||||
m: make(map[oid.Address]bool),
|
||||
}
|
||||
|
||||
v := NewFormatValidator(
|
||||
WithNetState(testNetState{
|
||||
epoch: curEpoch,
|
||||
}),
|
||||
WithLockSource(ls),
|
||||
)
|
||||
|
||||
ownerKey, err := keys.NewPrivateKey()
|
||||
|
@ -229,10 +242,27 @@ func TestFormatValidator_Validate(t *testing.T) {
|
|||
|
||||
t.Run("expired object", func(t *testing.T) {
|
||||
val := strconv.FormatUint(curEpoch-1, 10)
|
||||
err := v.Validate(fn(val), false)
|
||||
obj := fn(val)
|
||||
|
||||
t.Run("non-locked", func(t *testing.T) {
|
||||
err := v.Validate(obj, false)
|
||||
require.ErrorIs(t, err, errExpired)
|
||||
})
|
||||
|
||||
t.Run("locked", func(t *testing.T) {
|
||||
var addr oid.Address
|
||||
oID, _ := obj.ID()
|
||||
cID, _ := obj.ContainerID()
|
||||
|
||||
addr.SetContainer(cID)
|
||||
addr.SetObject(oID)
|
||||
ls.m[addr] = true
|
||||
|
||||
err := v.Validate(obj, false)
|
||||
require.NoError(t, err)
|
||||
})
|
||||
})
|
||||
|
||||
t.Run("alive object", func(t *testing.T) {
|
||||
val := strconv.FormatUint(curEpoch, 10)
|
||||
err := v.Validate(fn(val), true)
|
||||
|
|
|
@ -20,6 +20,8 @@ type ObjectStorage interface {
|
|||
// Lock must lock passed objects
|
||||
// and return any appeared error.
|
||||
Lock(locker oid.Address, toLock []oid.ID) error
|
||||
// IsLocked must clarify object's lock status.
|
||||
IsLocked(oid.Address) (bool, error)
|
||||
}
|
||||
|
||||
type localTarget struct {
|
||||
|
|
|
@ -101,6 +101,7 @@ func WithMaxSizeSource(v MaxSizeSource) Option {
|
|||
func WithObjectStorage(v ObjectStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.localStore = v
|
||||
c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithLockSource(v))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue
Do we really need to make this interface exportable and write comments?
we usually do such in our packages since it should be implemented in another package
in fact i would use just
StorageEngine
here but cycled imports appeared so i decided to do that as usualcc @fyrchik
We use it as an argument to the exported option, seems ok to me.