From 64bde68fb97fed2a77187be52056b8d34ca52137 Mon Sep 17 00:00:00 2001
From: Pavel Karpy
Date: Wed, 15 Mar 2023 04:07:27 +0300
Subject: [PATCH] [#67] node: Accept expired locked objects
Allow replication of any (expired too) locked object. Information about
object locking is considered to be presented on the _container nodes_.
Signed-off-by: Pavel Karpy
---
cmd/frostfs-node/object.go | 8 +++++++
pkg/core/object/fmt.go | 33 ++++++++++++++++++++++++++++-
pkg/core/object/fmt_test.go | 34 ++++++++++++++++++++++++++++--
pkg/services/object/put/local.go | 2 ++
pkg/services/object/put/service.go | 1 +
5 files changed, 75 insertions(+), 3 deletions(-)
diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go
index 8ff63f419..5e0307362 100644
--- a/cmd/frostfs-node/object.go
+++ b/cmd/frostfs-node/object.go
@@ -554,6 +554,10 @@ type engineWithNotifications struct {
defaultTopic string
}
+func (e engineWithNotifications) IsLocked(address oid.Address) (bool, error) {
+ return e.base.IsLocked(address)
+}
+
func (e engineWithNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
return e.base.Delete(tombstone, toDelete)
}
@@ -587,6 +591,10 @@ type engineWithoutNotifications struct {
engine *engine.StorageEngine
}
+func (e engineWithoutNotifications) IsLocked(address oid.Address) (bool, error) {
+ return e.IsLocked(address)
+}
+
func (e engineWithoutNotifications) Delete(tombstone oid.Address, toDelete []oid.ID) error {
var prm engine.InhumePrm
diff --git a/pkg/core/object/fmt.go b/pkg/core/object/fmt.go
index b3bafe6c8..804f0bd40 100644
--- a/pkg/core/object/fmt.go
+++ b/pkg/core/object/fmt.go
@@ -27,6 +27,7 @@ type FormatValidatorOption func(*cfg)
type cfg struct {
netState netmap.State
+ e LockSource
}
// DeleteHandler is an interface of delete queue processor.
@@ -38,6 +39,12 @@ type DeleteHandler interface {
DeleteObjects(oid.Address, ...oid.Address) error
}
+// LockSource is a source of lock relations between the objects.
+type LockSource interface {
+ // IsLocked must clarify object's lock status.
+ IsLocked(address oid.Address) (bool, error)
+}
+
// Locker is an object lock storage interface.
type Locker interface {
// Lock list of objects as locked by locker in the specified container.
@@ -319,7 +326,24 @@ func (v *FormatValidator) checkExpiration(obj *object.Object) error {
}
if exp < v.netState.CurrentEpoch() {
- return errExpired
+ // an object could be expired but locked;
+ // put such an object is a correct operation
+
+ cID, _ := obj.ContainerID()
+ oID, _ := obj.ID()
+
+ var addr oid.Address
+ addr.SetContainer(cID)
+ addr.SetObject(oID)
+
+ locked, err := v.e.IsLocked(addr)
+ if err != nil {
+ return fmt.Errorf("locking status check for an expired object: %w", err)
+ }
+
+ if !locked {
+ return errExpired
+ }
}
return nil
@@ -380,3 +404,10 @@ func WithNetState(netState netmap.State) FormatValidatorOption {
c.netState = netState
}
}
+
+// WithLockSource return option to set the Storage Engine.
+func WithLockSource(e LockSource) FormatValidatorOption {
+ return func(c *cfg) {
+ c.e = e
+ }
+}
diff --git a/pkg/core/object/fmt_test.go b/pkg/core/object/fmt_test.go
index 3f2ea4634..563c7827d 100644
--- a/pkg/core/object/fmt_test.go
+++ b/pkg/core/object/fmt_test.go
@@ -36,13 +36,26 @@ func (s testNetState) CurrentEpoch() uint64 {
return s.epoch
}
+type testLockSource struct {
+ m map[oid.Address]bool
+}
+
+func (t testLockSource) IsLocked(address oid.Address) (bool, error) {
+ return t.m[address], nil
+}
+
func TestFormatValidator_Validate(t *testing.T) {
const curEpoch = 13
+ ls := testLockSource{
+ m: make(map[oid.Address]bool),
+ }
+
v := NewFormatValidator(
WithNetState(testNetState{
epoch: curEpoch,
}),
+ WithLockSource(ls),
)
ownerKey, err := keys.NewPrivateKey()
@@ -229,8 +242,25 @@ func TestFormatValidator_Validate(t *testing.T) {
t.Run("expired object", func(t *testing.T) {
val := strconv.FormatUint(curEpoch-1, 10)
- err := v.Validate(fn(val), false)
- require.ErrorIs(t, err, errExpired)
+ obj := fn(val)
+
+ t.Run("non-locked", func(t *testing.T) {
+ err := v.Validate(obj, false)
+ require.ErrorIs(t, err, errExpired)
+ })
+
+ t.Run("locked", func(t *testing.T) {
+ var addr oid.Address
+ oID, _ := obj.ID()
+ cID, _ := obj.ContainerID()
+
+ addr.SetContainer(cID)
+ addr.SetObject(oID)
+ ls.m[addr] = true
+
+ err := v.Validate(obj, false)
+ require.NoError(t, err)
+ })
})
t.Run("alive object", func(t *testing.T) {
diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go
index 009df95c3..f344f77e9 100644
--- a/pkg/services/object/put/local.go
+++ b/pkg/services/object/put/local.go
@@ -20,6 +20,8 @@ type ObjectStorage interface {
// Lock must lock passed objects
// and return any appeared error.
Lock(locker oid.Address, toLock []oid.ID) error
+ // IsLocked must clarify object's lock status.
+ IsLocked(oid.Address) (bool, error)
}
type localTarget struct {
diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go
index 338e5485b..b74c97d49 100644
--- a/pkg/services/object/put/service.go
+++ b/pkg/services/object/put/service.go
@@ -101,6 +101,7 @@ func WithMaxSizeSource(v MaxSizeSource) Option {
func WithObjectStorage(v ObjectStorage) Option {
return func(c *cfg) {
c.localStore = v
+ c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithLockSource(v))
}
}