diff --git a/pkg/local_object_storage/engine/container.go b/pkg/local_object_storage/engine/container.go index 17002a0d4..0ad977679 100644 --- a/pkg/local_object_storage/engine/container.go +++ b/pkg/local_object_storage/engine/container.go @@ -37,14 +37,24 @@ func (r *ListContainersRes) Containers() []*cid.ID { } // ContainerSize returns sum of estimation container sizes among all shards. -func (e *StorageEngine) ContainerSize(prm *ContainerSizePrm) *ContainerSizeRes { - if e.metrics != nil { - defer elapsed(e.metrics.AddEstimateContainerSizeDuration)() +// +// Returns empty result if executions are blocked (see BlockExecution). +func (e *StorageEngine) ContainerSize(prm *ContainerSizePrm) (res *ContainerSizeRes) { + err := e.exec(func() error { + res = e.containerSize(prm) + return nil + }) + if err != nil { + e.log.Debug("container size exec failure", + zap.String("err", err.Error()), + ) } - return &ContainerSizeRes{ - size: e.containerSize(prm.cid), + if res == nil { + res = new(ContainerSizeRes) } + + return } // ContainerSize returns sum of estimation container sizes among all shards. @@ -52,35 +62,51 @@ func ContainerSize(e *StorageEngine, id *cid.ID) uint64 { return e.ContainerSize(&ContainerSizePrm{cid: id}).Size() } -func (e *StorageEngine) containerSize(id *cid.ID) (total uint64) { +func (e *StorageEngine) containerSize(prm *ContainerSizePrm) *ContainerSizeRes { + if e.metrics != nil { + defer elapsed(e.metrics.AddEstimateContainerSizeDuration)() + } + + var res ContainerSizeRes + e.iterateOverUnsortedShards(func(s *shard.Shard) (stop bool) { - size, err := shard.ContainerSize(s, id) + size, err := shard.ContainerSize(s, prm.cid) if err != nil { e.log.Warn("can't get container size", zap.Stringer("shard_id", s.ID()), - zap.Stringer("container_id", id), + zap.Stringer("container_id", prm.cid), zap.String("error", err.Error())) return false } - total += size + res.size += size return false }) - return total + return &res } // ListContainers returns unique container IDs presented in the engine objects. -func (e *StorageEngine) ListContainers(_ *ListContainersPrm) *ListContainersRes { - if e.metrics != nil { - defer elapsed(e.metrics.AddListContainersDuration)() +// +// Returns empty result if executions are blocked (see BlockExecution). +func (e *StorageEngine) ListContainers(_ *ListContainersPrm) (res *ListContainersRes) { + err := e.exec(func() error { + res = e.listContainers() + return nil + }) + if err != nil { + e.log.Debug("list containers exec failure", + zap.String("err", err.Error()), + ) } - return &ListContainersRes{ - containers: e.listContainers(), + if res == nil { + res = new(ListContainersRes) } + + return } // ListContainers returns unique container IDs presented in the engine objects. @@ -88,7 +114,11 @@ func ListContainers(e *StorageEngine) []*cid.ID { return e.ListContainers(&ListContainersPrm{}).Containers() } -func (e *StorageEngine) listContainers() []*cid.ID { +func (e *StorageEngine) listContainers() *ListContainersRes { + if e.metrics != nil { + defer elapsed(e.metrics.AddListContainersDuration)() + } + uniqueIDs := make(map[string]*cid.ID) e.iterateOverUnsortedShards(func(s *shard.Shard) (stop bool) { @@ -116,5 +146,7 @@ func (e *StorageEngine) listContainers() []*cid.ID { result = append(result, v) } - return result + return &ListContainersRes{ + containers: result, + } } diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index 59b652e80..3b7a828cf 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -1,6 +1,7 @@ package engine import ( + "errors" "fmt" "go.uber.org/zap" @@ -8,6 +9,10 @@ import ( // Open opens all StorageEngine's components. func (e *StorageEngine) Open() error { + return e.open() +} + +func (e *StorageEngine) open() error { e.mtx.RLock() defer e.mtx.RUnlock() @@ -34,8 +39,18 @@ func (e *StorageEngine) Init() error { return nil } -// Close releases all StorageEngine's components. +var errClosed = errors.New("storage engine is closed") + +// Close releases all StorageEngine's components. Waits for all data-related operations to complete. +// After the call, all the next ones will fail until the ResumeExecution call. +// +// Еhe method is supposed to be called when the application exits. func (e *StorageEngine) Close() error { + return e.setBlockExecErr(errClosed) +} + +// closes all shards. Never returns an error, shard errors are logged. +func (e *StorageEngine) close() error { e.mtx.RLock() defer e.mtx.RUnlock() @@ -54,3 +69,70 @@ func (e *StorageEngine) Close() error { return nil } + +// executes op if execution is not blocked, otherwise returns blocking error. +// +// Can be called concurrently with setBlockExecErr. +func (e *StorageEngine) exec(op func() error) error { + e.blockExec.mtx.RLock() + defer e.blockExec.mtx.RUnlock() + + if e.blockExec.err != nil { + return e.blockExec.err + } + + return op() +} + +// sets the flag of blocking execution of all data operations according to err: +// * err != nil, then blocks the execution. If exec wasn't blocked, calls close method. +// * otherwise, resumes execution. If exec was blocked, calls open method. +// +// Can be called concurrently with exec. In this case it waits for all executions to complete. +func (e *StorageEngine) setBlockExecErr(err error) error { + e.blockExec.mtx.Lock() + defer e.blockExec.mtx.Unlock() + + prevErr := e.blockExec.err + + e.blockExec.err = err + + if err == nil { + if prevErr != nil { // block -> ok + return e.open() + } + } else if prevErr == nil { // ok -> block + return e.close() + } + + // otherwise do nothing + + return nil +} + +// BlockExecution block blocks the execution of any data-related operation. All blocked ops will return err. +// To resume the execution, use ResumeExecution method. +// +// Сan be called regardless of the fact of the previous blocking. If execution wasn't blocked, releases all resources +// similar to Close. Can be called concurrently with Close and any data related method (waits for all executions +// to complete). +// +// Must not be called concurrently with either Open or Init. +// +// Note: technically passing nil error will resume the execution, otherwise, it is recommended to call ResumeExecution +// for this. +func (e *StorageEngine) BlockExecution(err error) error { + return e.setBlockExecErr(err) +} + +// ResumeExecution resumes the execution of any data-related operation. +// To block the execution, use BlockExecution method. +// +// Сan be called regardless of the fact of the previous blocking. If execution was blocked, prepares all resources +// similar to Open. Can be called concurrently with Close and any data related method (waits for all executions +// to complete). +// +// Must not be called concurrently with either Open or Init. +func (e *StorageEngine) ResumeExecution() error { + return e.setBlockExecErr(nil) +} diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index 01444a306..2f645cbac 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -26,7 +26,18 @@ func (p *DeletePrm) WithAddresses(addr ...*objectSDK.Address) *DeletePrm { } // Delete marks the objects to be removed. -func (e *StorageEngine) Delete(prm *DeletePrm) (*DeleteRes, error) { +// +// Returns an error if executions are blocked (see BlockExecution). +func (e *StorageEngine) Delete(prm *DeletePrm) (res *DeleteRes, err error) { + err = e.exec(func() error { + res, err = e.delete(prm) + return err + }) + + return +} + +func (e *StorageEngine) delete(prm *DeletePrm) (*DeleteRes, error) { if e.metrics != nil { defer elapsed(e.metrics.AddDeleteDuration)() } diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index a78b284e7..a3fcaa71b 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -18,6 +18,12 @@ type StorageEngine struct { shards map[string]*shard.Shard shardPools map[string]util.WorkerPool + + blockExec struct { + mtx sync.RWMutex + + err error + } } // Option represents StorageEngine's constructor option. diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index e7c7fd696..3d41fea6e 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -42,7 +42,18 @@ func (r *GetRes) Object() *object.Object { // did not allow to completely read the object part. // // Returns ErrNotFound if requested object is missing in local storage. -func (e *StorageEngine) Get(prm *GetPrm) (*GetRes, error) { +// +// Returns an error if executions are blocked (see BlockExecution). +func (e *StorageEngine) Get(prm *GetPrm) (res *GetRes, err error) { + err = e.exec(func() error { + res, err = e.get(prm) + return err + }) + + return +} + +func (e *StorageEngine) get(prm *GetPrm) (*GetRes, error) { if e.metrics != nil { defer elapsed(e.metrics.AddGetDuration)() } diff --git a/pkg/local_object_storage/engine/head.go b/pkg/local_object_storage/engine/head.go index d4abcf6bc..5449a7b71 100644 --- a/pkg/local_object_storage/engine/head.go +++ b/pkg/local_object_storage/engine/head.go @@ -57,7 +57,18 @@ func (r *HeadRes) Header() *object.Object { // // Returns object.ErrNotFound if requested object is missing in local storage. // Returns object.ErrAlreadyRemoved if requested object was inhumed. -func (e *StorageEngine) Head(prm *HeadPrm) (*HeadRes, error) { +// +// Returns an error if executions are blocked (see BlockExecution). +func (e *StorageEngine) Head(prm *HeadPrm) (res *HeadRes, err error) { + err = e.exec(func() error { + res, err = e.head(prm) + return err + }) + + return +} + +func (e *StorageEngine) head(prm *HeadPrm) (*HeadRes, error) { if e.metrics != nil { defer elapsed(e.metrics.AddHeadDuration)() } diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 806442a75..b1cb62703 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -49,7 +49,18 @@ var errInhumeFailure = errors.New("inhume operation failed") // Inhume calls metabase. Inhume method to mark object as removed. It won't be // removed physically from shard until `Delete` operation. -func (e *StorageEngine) Inhume(prm *InhumePrm) (*InhumeRes, error) { +// +// Returns an error if executions are blocked (see BlockExecution). +func (e *StorageEngine) Inhume(prm *InhumePrm) (res *InhumeRes, err error) { + err = e.exec(func() error { + res, err = e.inhume(prm) + return err + }) + + return +} + +func (e *StorageEngine) inhume(prm *InhumePrm) (*InhumeRes, error) { if e.metrics != nil { defer elapsed(e.metrics.AddInhumeDuration)() } @@ -63,9 +74,9 @@ func (e *StorageEngine) Inhume(prm *InhumePrm) (*InhumeRes, error) { shPrm.MarkAsGarbage(prm.addrs[i]) } - ok := e.inhume(prm.addrs[i], shPrm, true) + ok := e.inhumeAddr(prm.addrs[i], shPrm, true) if !ok { - ok = e.inhume(prm.addrs[i], shPrm, false) + ok = e.inhumeAddr(prm.addrs[i], shPrm, false) if !ok { return nil, errInhumeFailure } @@ -75,7 +86,7 @@ func (e *StorageEngine) Inhume(prm *InhumePrm) (*InhumeRes, error) { return new(InhumeRes), nil } -func (e *StorageEngine) inhume(addr *objectSDK.Address, prm *shard.InhumePrm, checkExists bool) (ok bool) { +func (e *StorageEngine) inhumeAddr(addr *objectSDK.Address, prm *shard.InhumePrm, checkExists bool) (ok bool) { root := false e.iterateOverSortedShards(addr, func(_ int, sh *shard.Shard) (stop bool) { diff --git a/pkg/local_object_storage/engine/put.go b/pkg/local_object_storage/engine/put.go index 79a07514f..c6da03e5b 100644 --- a/pkg/local_object_storage/engine/put.go +++ b/pkg/local_object_storage/engine/put.go @@ -33,7 +33,18 @@ func (p *PutPrm) WithObject(obj *object.Object) *PutPrm { // // Returns any error encountered that // did not allow to completely save the object. -func (e *StorageEngine) Put(prm *PutPrm) (*PutRes, error) { +// +// Returns an error if executions are blocked (see BlockExecution). +func (e *StorageEngine) Put(prm *PutPrm) (res *PutRes, err error) { + err = e.exec(func() error { + res, err = e.put(prm) + return err + }) + + return +} + +func (e *StorageEngine) put(prm *PutPrm) (*PutRes, error) { if e.metrics != nil { defer elapsed(e.metrics.AddPutDuration)() } diff --git a/pkg/local_object_storage/engine/range.go b/pkg/local_object_storage/engine/range.go index 7d58318ea..97ffe3a1c 100644 --- a/pkg/local_object_storage/engine/range.go +++ b/pkg/local_object_storage/engine/range.go @@ -60,7 +60,18 @@ func (r *RngRes) Object() *object.Object { // Returns ErrNotFound if requested object is missing in local storage. // Returns ErrAlreadyRemoved if requested object is inhumed. // Returns ErrRangeOutOfBounds if requested object range is out of bounds. -func (e *StorageEngine) GetRange(prm *RngPrm) (*RngRes, error) { +// +// Returns an error if executions are blocked (see BlockExecution). +func (e *StorageEngine) GetRange(prm *RngPrm) (res *RngRes, err error) { + err = e.exec(func() error { + res, err = e.getRange(prm) + return err + }) + + return +} + +func (e *StorageEngine) getRange(prm *RngPrm) (*RngRes, error) { if e.metrics != nil { defer elapsed(e.metrics.AddRangeDuration)() } diff --git a/pkg/local_object_storage/engine/select.go b/pkg/local_object_storage/engine/select.go index 63638ec3a..5a9634558 100644 --- a/pkg/local_object_storage/engine/select.go +++ b/pkg/local_object_storage/engine/select.go @@ -47,7 +47,18 @@ func (r *SelectRes) AddressList() []*object.Address { // Select selects the objects from local storage that match select parameters. // // Returns any error encountered that did not allow to completely select the objects. -func (e *StorageEngine) Select(prm *SelectPrm) (*SelectRes, error) { +// +// Returns an error if executions are blocked (see BlockExecution). +func (e *StorageEngine) Select(prm *SelectPrm) (res *SelectRes, err error) { + err = e.exec(func() error { + res, err = e._select(prm) + return err + }) + + return +} + +func (e *StorageEngine) _select(prm *SelectPrm) (*SelectRes, error) { if e.metrics != nil { defer elapsed(e.metrics.AddSearchDuration)() } @@ -97,7 +108,18 @@ func (e *StorageEngine) Select(prm *SelectPrm) (*SelectRes, error) { // List returns `limit` available physically storage object addresses in engine. // If limit is zero, then returns all available object addresses. -func (e *StorageEngine) List(limit uint64) (*SelectRes, error) { +// +// Returns an error if executions are blocked (see BlockExecution). +func (e *StorageEngine) List(limit uint64) (res *SelectRes, err error) { + err = e.exec(func() error { + res, err = e.list(limit) + return err + }) + + return +} + +func (e *StorageEngine) list(limit uint64) (*SelectRes, error) { if e.metrics != nil { defer elapsed(e.metrics.AddListObjectsDuration)() }