From 6c0b29e3e36189344547fed50b59024552b050b4 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 11 Nov 2021 16:58:07 +0300 Subject: [PATCH] [#922] storage engine: Prevent any operations after first Close call Make `BlockExecution` / `ResumeExecution` to not release per-shard worker pools. Make `StorageEngine.Close` to block these methods and any data-related operations. It is still releases the pools. Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/engine/control.go | 24 ++++++---- .../engine/control_test.go | 46 +++++++++++++++++++ .../engine/engine_test.go | 10 ++++ .../engine/inhume_test.go | 2 +- 4 files changed, 73 insertions(+), 9 deletions(-) create mode 100644 pkg/local_object_storage/engine/control_test.go diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index 3e3f44184f..f575365112 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -42,7 +42,7 @@ func (e *StorageEngine) Init() error { var errClosed = errors.New("storage engine is closed") // Close releases all StorageEngine's components. Waits for all data-related operations to complete. -// After the call, all the next ones will fail until the ResumeExecution call. +// After the call, all the next ones will fail. // // The method is supposed to be called when the application exits. func (e *StorageEngine) Close() error { @@ -50,12 +50,14 @@ func (e *StorageEngine) Close() error { } // closes all shards. Never returns an error, shard errors are logged. -func (e *StorageEngine) close() error { +func (e *StorageEngine) close(releasePools bool) error { e.mtx.RLock() defer e.mtx.RUnlock() - for _, p := range e.shardPools { - p.Release() + if releasePools { + for _, p := range e.shardPools { + p.Release() + } } for id, sh := range e.shards { @@ -85,7 +87,8 @@ func (e *StorageEngine) execIfNotBlocked(op func() error) error { } // sets the flag of blocking execution of all data operations according to err: -// * err != nil, then blocks the execution. If exec wasn't blocked, calls close method. +// * err != nil, then blocks the execution. If exec wasn't blocked, calls close method +// (if err == errClosed => additionally releases pools and does not allow to resume executions). // * otherwise, resumes execution. If exec was blocked, calls open method. // // Can be called concurrently with exec. In this case it waits for all executions to complete. @@ -95,6 +98,11 @@ func (e *StorageEngine) setBlockExecErr(err error) error { prevErr := e.blockExec.err + wasClosed := errors.Is(prevErr, errClosed) + if wasClosed { + return errClosed + } + e.blockExec.err = err if err == nil { @@ -102,7 +110,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error { return e.open() } } else if prevErr == nil { // ok -> block - return e.close() + return e.close(errors.Is(err, errClosed)) } // otherwise do nothing @@ -115,7 +123,7 @@ func (e *StorageEngine) setBlockExecErr(err error) error { // // Сan be called regardless of the fact of the previous blocking. If execution wasn't blocked, releases all resources // similar to Close. Can be called concurrently with Close and any data related method (waits for all executions -// to complete). +// to complete). Returns error if any Close has been called before. // // Must not be called concurrently with either Open or Init. // @@ -130,7 +138,7 @@ func (e *StorageEngine) BlockExecution(err error) error { // // Сan be called regardless of the fact of the previous blocking. If execution was blocked, prepares all resources // similar to Open. Can be called concurrently with Close and any data related method (waits for all executions -// to complete). +// to complete). Returns error if any Close has been called before. // // Must not be called concurrently with either Open or Init. func (e *StorageEngine) ResumeExecution() error { diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go new file mode 100644 index 0000000000..a48964fcf3 --- /dev/null +++ b/pkg/local_object_storage/engine/control_test.go @@ -0,0 +1,46 @@ +package engine + +import ( + "errors" + "testing" + + cidtest "github.com/nspcc-dev/neofs-api-go/pkg/container/id/test" + "github.com/stretchr/testify/require" +) + +func TestExecBlocks(t *testing.T) { + e := testNewEngineWithShardNum(t, 2) // number doesn't matter in this test, 2 is several but not many + defer e.Close() + + // put some object + obj := generateRawObjectWithCID(t, cidtest.Generate()).Object() + + addr := obj.Address() + + require.NoError(t, Put(e, obj)) + + // block executions + errBlock := errors.New("block exec err") + + require.NoError(t, e.BlockExecution(errBlock)) + + // try to exec some op + _, err := Head(e, addr) + require.ErrorIs(t, err, errBlock) + + // resume executions + require.NoError(t, e.ResumeExecution()) + + _, err = Head(e, addr) // can be any data-related op + require.NoError(t, err) + + // close + require.NoError(t, e.Close()) + + // try exec after close + _, err = Head(e, addr) + require.Error(t, err) + + // try to resume + require.Error(t, e.ResumeExecution()) +} diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index a1fcc16303..9f7c21132f 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -116,3 +116,13 @@ func addAttribute(obj *object.RawObject, key, val string) { attrs = append(attrs, attr) obj.SetAttributes(attrs...) } + +func testNewEngineWithShardNum(t *testing.T, num int) *StorageEngine { + shards := make([]*shard.Shard, 0, num) + + for i := 0; i < num; i++ { + shards = append(shards, testNewShard(t, i)) + } + + return testNewEngineWithShards(shards...) +} diff --git a/pkg/local_object_storage/engine/inhume_test.go b/pkg/local_object_storage/engine/inhume_test.go index 15ea9b9712..47fa3e9807 100644 --- a/pkg/local_object_storage/engine/inhume_test.go +++ b/pkg/local_object_storage/engine/inhume_test.go @@ -34,7 +34,7 @@ func TestStorageEngine_Inhume(t *testing.T) { link.SetSplitID(splitID) t.Run("delete small object", func(t *testing.T) { - e := testNewEngineWithShards(testNewShard(t, 1)) + e := testNewEngineWithShardNum(t, 1) defer e.Close() err := Put(e, parent.Object())