diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go b/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go index 9918801b92..ad933da0b0 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go @@ -1,6 +1,7 @@ package blobovniczatree import ( + "context" "fmt" "path/filepath" @@ -11,7 +12,7 @@ import ( ) // Iterate iterates over all objects in b. -func (b *Blobovniczas) Iterate(prm common.IteratePrm) (common.IterateRes, error) { +func (b *Blobovniczas) Iterate(_ context.Context, prm common.IteratePrm) (common.IterateRes, error) { return common.IterateRes{}, b.iterateBlobovniczas(prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error { var subPrm blobovnicza.IteratePrm subPrm.SetHandler(func(elem blobovnicza.IterationElement) error { diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index 801d32c1ef..c5d187f302 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -25,5 +25,5 @@ type Storage interface { Exists(context.Context, ExistsPrm) (ExistsRes, error) Put(context.Context, PutPrm) (PutRes, error) Delete(context.Context, DeletePrm) (DeleteRes, error) - Iterate(IteratePrm) (IterateRes, error) + Iterate(context.Context, IteratePrm) (IterateRes, error) } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 75f63193a0..b0879f68ec 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -100,11 +100,11 @@ func addressFromString(s string) (oid.Address, error) { } // Iterate iterates over all stored objects. -func (t *FSTree) Iterate(prm common.IteratePrm) (common.IterateRes, error) { - return common.IterateRes{}, t.iterate(0, []string{t.RootPath}, prm) +func (t *FSTree) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) { + return common.IterateRes{}, t.iterate(ctx, 0, []string{t.RootPath}, prm) } -func (t *FSTree) iterate(depth uint64, curPath []string, prm common.IteratePrm) error { +func (t *FSTree) iterate(ctx context.Context, depth uint64, curPath []string, prm common.IteratePrm) error { curName := strings.Join(curPath[1:], "") des, err := os.ReadDir(filepath.Join(curPath...)) if err != nil { @@ -119,10 +119,15 @@ func (t *FSTree) iterate(depth uint64, curPath []string, prm common.IteratePrm) curPath = append(curPath, "") for i := range des { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } curPath[l] = des[i].Name() if !isLast && des[i].IsDir() { - err := t.iterate(depth+1, curPath, prm) + err := t.iterate(ctx, depth+1, curPath, prm) if err != nil { // Must be error from handler in case errors are ignored. // Need to report. diff --git a/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go b/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go index 83ada9607a..34622c8579 100644 --- a/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go +++ b/pkg/local_object_storage/blobstor/internal/blobstortest/iterate.go @@ -49,7 +49,7 @@ func runTestNormalHandler(t *testing.T, s common.Storage, objects []objectDesc) return nil } - _, err := s.Iterate(iterPrm) + _, err := s.Iterate(context.Background(), iterPrm) require.NoError(t, err) require.Equal(t, len(objects), len(seen)) for i := range objects { @@ -72,7 +72,7 @@ func runTestLazyHandler(t *testing.T, s common.Storage, objects []objectDesc) { return nil } - _, err := s.Iterate(iterPrm) + _, err := s.Iterate(context.Background(), iterPrm) require.NoError(t, err) require.Equal(t, len(objects), len(seen)) for i := range objects { @@ -107,7 +107,7 @@ func runTestIgnoreLogicalErrors(t *testing.T, s common.Storage, objects []object return nil } - _, err := s.Iterate(iterPrm) + _, err := s.Iterate(context.Background(), iterPrm) require.Equal(t, err, logicErr) require.Equal(t, len(objects)/2, len(seen)) for i := range objects { diff --git a/pkg/local_object_storage/blobstor/iterate.go b/pkg/local_object_storage/blobstor/iterate.go index 2c37ee7764..4e52f0abf0 100644 --- a/pkg/local_object_storage/blobstor/iterate.go +++ b/pkg/local_object_storage/blobstor/iterate.go @@ -1,6 +1,7 @@ package blobstor import ( + "context" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -16,12 +17,12 @@ import ( // did not allow to completely iterate over the storage. // // If handler returns an error, method wraps and returns it immediately. -func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) { +func (b *BlobStor) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) { b.modeMtx.RLock() defer b.modeMtx.RUnlock() for i := range b.storage { - _, err := b.storage[i].Storage.Iterate(prm) + _, err := b.storage[i].Storage.Iterate(ctx, prm) if err != nil && !prm.IgnoreErrors { return common.IterateRes{}, fmt.Errorf("blobstor iterator failure: %w", err) } @@ -31,7 +32,7 @@ func (b *BlobStor) Iterate(prm common.IteratePrm) (common.IterateRes, error) { // IterateBinaryObjects is a helper function which iterates over BlobStor and passes binary objects to f. // Errors related to object reading and unmarshaling are logged and skipped. -func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error { +func IterateBinaryObjects(ctx context.Context, blz *BlobStor, f func(addr oid.Address, data []byte, descriptor []byte) error) error { var prm common.IteratePrm prm.Handler = func(elem common.IterationElement) error { @@ -45,7 +46,7 @@ func IterateBinaryObjects(blz *BlobStor, f func(addr oid.Address, data []byte, d return nil } - _, err := blz.Iterate(prm) + _, err := blz.Iterate(ctx, prm) return err } diff --git a/pkg/local_object_storage/blobstor/iterate_test.go b/pkg/local_object_storage/blobstor/iterate_test.go index 6488ff5fc7..c35869655d 100644 --- a/pkg/local_object_storage/blobstor/iterate_test.go +++ b/pkg/local_object_storage/blobstor/iterate_test.go @@ -68,7 +68,7 @@ func TestIterateObjects(t *testing.T) { require.NoError(t, err) } - err := IterateBinaryObjects(blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { + err := IterateBinaryObjects(context.Background(), blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { v, ok := mObjs[string(data)] require.True(t, ok) diff --git a/pkg/local_object_storage/blobstor/memstore/memstore.go b/pkg/local_object_storage/blobstor/memstore/memstore.go index e435cfef40..b6cca2551c 100644 --- a/pkg/local_object_storage/blobstor/memstore/memstore.go +++ b/pkg/local_object_storage/blobstor/memstore/memstore.go @@ -126,7 +126,7 @@ func (s *memstoreImpl) Delete(_ context.Context, req common.DeletePrm) (common.D return common.DeleteRes{}, logicerr.Wrap(apistatus.ObjectNotFound{}) } -func (s *memstoreImpl) Iterate(req common.IteratePrm) (common.IterateRes, error) { +func (s *memstoreImpl) Iterate(_ context.Context, req common.IteratePrm) (common.IterateRes, error) { s.mu.RLock() defer s.mu.RUnlock() for k, v := range s.objs { diff --git a/pkg/local_object_storage/blobstor/perf_test.go b/pkg/local_object_storage/blobstor/perf_test.go index f219825302..5245146cb9 100644 --- a/pkg/local_object_storage/blobstor/perf_test.go +++ b/pkg/local_object_storage/blobstor/perf_test.go @@ -207,7 +207,7 @@ func BenchmarkSubstorageIteratePerf(b *testing.B) { // Benchmark iterate cnt := 0 b.ResetTimer() - _, err := st.Iterate(common.IteratePrm{ + _, err := st.Iterate(context.Background(), common.IteratePrm{ Handler: func(elem common.IterationElement) error { cnt++ return nil diff --git a/pkg/local_object_storage/blobstor/teststore/teststore.go b/pkg/local_object_storage/blobstor/teststore/teststore.go index 24d742fdaf..c7179eb459 100644 --- a/pkg/local_object_storage/blobstor/teststore/teststore.go +++ b/pkg/local_object_storage/blobstor/teststore/teststore.go @@ -202,14 +202,14 @@ func (s *TestStore) Delete(ctx context.Context, req common.DeletePrm) (common.De } } -func (s *TestStore) Iterate(req common.IteratePrm) (common.IterateRes, error) { +func (s *TestStore) Iterate(ctx context.Context, req common.IteratePrm) (common.IterateRes, error) { s.mu.RLock() defer s.mu.RUnlock() switch { case s.overrides.Iterate != nil: return s.overrides.Iterate(req) case s.st != nil: - return s.st.Iterate(req) + return s.st.Iterate(ctx, req) default: panic(fmt.Sprintf("unexpected storage call: Iterate(%+v)", req)) } diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index f885451af7..e8e2bd4d7d 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -170,7 +170,7 @@ func (s *Shard) refillMetabase(ctx context.Context) error { obj := objectSDK.New() - err = blobstor.IterateBinaryObjects(s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { + err = blobstor.IterateBinaryObjects(ctx, s.blobStor, func(addr oid.Address, data []byte, descriptor []byte) error { if err := obj.Unmarshal(data); err != nil { s.log.Warn(logs.ShardCouldNotUnmarshalObject, zap.Stringer("address", addr), diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 28409f6094..767435ebf3 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -35,6 +35,17 @@ const ( // runFlushLoop starts background workers which periodically flush objects to the blobstor. func (c *cache) runFlushLoop() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + ch := c.closeCh + c.wg.Add(1) + go func() { + <-ch + cancel() + c.wg.Done() + }() + for i := 0; i < c.workersCount; i++ { c.wg.Add(1) go c.workerFlushSmall() @@ -42,7 +53,7 @@ func (c *cache) runFlushLoop() { c.wg.Add(1) go func() { - c.workerFlushBig(context.TODO()) + c.workerFlushBig(ctx) c.wg.Done() }() @@ -211,7 +222,7 @@ func (c *cache) flushFSTree(ctx context.Context, ignoreErrors bool) error { return nil } - _, err := c.fsTree.Iterate(prm) + _, err := c.fsTree.Iterate(ctx, prm) return err }