From 64bde68fb97fed2a77187be52056b8d34ca52137 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Wed, 15 Mar 2023 04:07:27 +0300 Subject: [PATCH] [#67] node: Accept expired locked objects Allow replication of any (expired too) locked object. Information about object locking is considered to be presented on the _container nodes_. Signed-off-by: Pavel Karpy --- cmd/frostfs-node/object.go | 8 +++++++ pkg/core/object/fmt.go | 33 ++++++++++++++++++++++++++++- pkg/core/object/fmt_test.go | 34 ++++++++++++++++++++++++++++-- pkg/services/object/put/local.go | 2 ++ pkg/services/object/put/service.go | 1 + 5 files changed, 75 insertions(+), 3 deletions(-) diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 8ff63f419..5e0307362 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -554,6 +554,10 @@ type engineWithNotifications struct { defaultTopic string } +func (e engineWithNotifications) IsLocked(address oid.Address) (bool, error) { + return e.base.IsLocked(address) +} + func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error { return e.base.Delete(tombstone, toDelete) } @@ -587,6 +591,10 @@ type engineWithoutNotifications struct { engine *engine.StorageEngine } +func (e engineWithoutNotifications) IsLocked(address oid.Address) (bool, error) { + return e.IsLocked(address) +} + func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error { var prm engine.InhumePrm diff --git a/pkg/core/object/fmt.go b/pkg/core/object/fmt.go index b3bafe6c8..804f0bd40 100644 --- a/pkg/core/object/fmt.go +++ b/pkg/core/object/fmt.go @@ -27,6 +27,7 @@ type FormatValidatorOption func(*cfg) type cfg struct { netState netmap.State + e LockSource } // DeleteHandler is an interface of delete queue processor. @@ -38,6 +39,12 @@ type DeleteHandler interface { DeleteObjects(oid.Address, ...oid.Address) error } +// LockSource is a source of lock relations between the objects. +type LockSource interface { + // IsLocked must clarify object's lock status. + IsLocked(address oid.Address) (bool, error) +} + // Locker is an object lock storage interface. type Locker interface { // Lock list of objects as locked by locker in the specified container. @@ -319,7 +326,24 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error { } if exp < v.netState.CurrentEpoch() { - return errExpired + // an object could be expired but locked; + // put such an object is a correct operation + + cID, _ := obj.ContainerID() + oID, _ := obj.ID() + + var addr oid.Address + addr.SetContainer(cID) + addr.SetObject(oID) + + locked, err := v.e.IsLocked(addr) + if err != nil { + return fmt.Errorf("locking status check for an expired object: %w", err) + } + + if !locked { + return errExpired + } } return nil @@ -380,3 +404,10 @@ func WithNetState(netState netmap.State) FormatValidatorOption { c.netState = netState } } + +// WithLockSource return option to set the Storage Engine. +func WithLockSource(e LockSource) FormatValidatorOption { + return func(c *cfg) { + c.e = e + } +} diff --git a/pkg/core/object/fmt_test.go b/pkg/core/object/fmt_test.go index 3f2ea4634..563c7827d 100644 --- a/pkg/core/object/fmt_test.go +++ b/pkg/core/object/fmt_test.go @@ -36,13 +36,26 @@ func (s testNetState) CurrentEpoch() uint64 { return s.epoch } +type testLockSource struct { + m map[oid.Address]bool +} + +func (t testLockSource) IsLocked(address oid.Address) (bool, error) { + return t.m[address], nil +} + func TestFormatValidator_Validate(t *testing.T) { const curEpoch = 13 + ls := testLockSource{ + m: make(map[oid.Address]bool), + } + v := NewFormatValidator( WithNetState(testNetState{ epoch: curEpoch, }), + WithLockSource(ls), ) ownerKey, err := keys.NewPrivateKey() @@ -229,8 +242,25 @@ func TestFormatValidator_Validate(t *testing.T) { t.Run("expired object", func(t *testing.T) { val := strconv.FormatUint(curEpoch-1, 10) - err := v.Validate(fn(val), false) - require.ErrorIs(t, err, errExpired) + obj := fn(val) + + t.Run("non-locked", func(t *testing.T) { + err := v.Validate(obj, false) + require.ErrorIs(t, err, errExpired) + }) + + t.Run("locked", func(t *testing.T) { + var addr oid.Address + oID, _ := obj.ID() + cID, _ := obj.ContainerID() + + addr.SetContainer(cID) + addr.SetObject(oID) + ls.m[addr] = true + + err := v.Validate(obj, false) + require.NoError(t, err) + }) }) t.Run("alive object", func(t *testing.T) { diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go index 009df95c3..f344f77e9 100644 --- a/pkg/services/object/put/local.go +++ b/pkg/services/object/put/local.go @@ -20,6 +20,8 @@ type ObjectStorage interface { // Lock must lock passed objects // and return any appeared error. Lock(locker oid.Address, toLock []oid.ID) error + // IsLocked must clarify object's lock status. + IsLocked(oid.Address) (bool, error) } type localTarget struct { diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 338e5485b..b74c97d49 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -101,6 +101,7 @@ func WithMaxSizeSource(v MaxSizeSource) Option { func WithObjectStorage(v ObjectStorage) Option { return func(c *cfg) { c.localStore = v + c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithLockSource(v)) } } -- 2.45.2