From 3495fc14cb5c989fe12b7bebf18f64c43e737e38 Mon Sep 17 00:00:00 2001 From: Anton Nikiforov Date: Sun, 28 May 2023 22:37:37 +0300 Subject: [PATCH] [#394] node: Use `Context` in `Blobovniczas.Iterate()` Signed-off-by: Anton Nikiforov --- cmd/frostfs-lens/internal/blobovnicza/list.go | 3 +- .../blobovnicza/iterate.go | 12 ++++++-- .../blobovnicza/iterate_test.go | 9 +++--- .../blobstor/blobovniczatree/control.go | 3 +- .../blobstor/blobovniczatree/delete.go | 2 +- .../blobstor/blobovniczatree/exists.go | 2 +- .../blobstor/blobovniczatree/get.go | 2 +- .../blobstor/blobovniczatree/get_range.go | 2 +- .../blobstor/blobovniczatree/iterate.go | 29 ++++++++++++------- .../blobstor/blobovniczatree/put.go | 2 +- 10 files changed, 41 insertions(+), 25 deletions(-) diff --git a/cmd/frostfs-lens/internal/blobovnicza/list.go b/cmd/frostfs-lens/internal/blobovnicza/list.go index 67242a7d..d327dbc4 100644 --- a/cmd/frostfs-lens/internal/blobovnicza/list.go +++ b/cmd/frostfs-lens/internal/blobovnicza/list.go @@ -1,6 +1,7 @@ package blobovnicza import ( + "context" "fmt" "io" @@ -33,6 +34,6 @@ func listFunc(cmd *cobra.Command, _ []string) { blz := openBlobovnicza(cmd) defer blz.Close() - err := blobovnicza.IterateAddresses(blz, wAddr) + err := blobovnicza.IterateAddresses(context.Background(), blz, wAddr) common.ExitOnErr(cmd, common.Errf("blobovnicza iterator failure: %w", err)) } diff --git a/pkg/local_object_storage/blobovnicza/iterate.go b/pkg/local_object_storage/blobovnicza/iterate.go index 1adfacbc..8f835705 100644 --- a/pkg/local_object_storage/blobovnicza/iterate.go +++ b/pkg/local_object_storage/blobovnicza/iterate.go @@ -1,6 +1,7 @@ package blobovnicza import ( + "context" "fmt" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -117,12 +118,17 @@ type IterateRes struct { // Returns handler's errors directly. Returns nil after iterating finish. // // Handler should not retain object data. Handler must not be nil. -func (b *Blobovnicza) Iterate(prm IteratePrm) (IterateRes, error) { +func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes, error) { var elem IterationElement if err := b.boltDB.View(func(tx *bbolt.Tx) error { return tx.ForEach(func(name []byte, buck *bbolt.Bucket) error { return buck.ForEach(func(k, v []byte) error { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } if prm.decodeAddresses { if err := addressFromKey(&elem.addr, k); err != nil { if prm.ignoreErrors { @@ -147,7 +153,7 @@ func (b *Blobovnicza) Iterate(prm IteratePrm) (IterateRes, error) { } // IterateAddresses is a helper function which iterates over Blobovnicza and passes addresses of the objects to f. -func IterateAddresses(blz *Blobovnicza, f func(oid.Address) error) error { +func IterateAddresses(ctx context.Context, blz *Blobovnicza, f func(oid.Address) error) error { var prm IteratePrm prm.DecodeAddresses() @@ -157,7 +163,7 @@ func IterateAddresses(blz *Blobovnicza, f func(oid.Address) error) error { return f(elem.Address()) }) - _, err := blz.Iterate(prm) + _, err := blz.Iterate(ctx, prm) return err } diff --git a/pkg/local_object_storage/blobovnicza/iterate_test.go b/pkg/local_object_storage/blobovnicza/iterate_test.go index 6ecb20c7..505685ce 100644 --- a/pkg/local_object_storage/blobovnicza/iterate_test.go +++ b/pkg/local_object_storage/blobovnicza/iterate_test.go @@ -1,6 +1,7 @@ package blobovnicza import ( + "context" "errors" "path/filepath" "testing" @@ -33,22 +34,22 @@ func TestBlobovniczaIterate(t *testing.T) { return nil } - _, err = b.Iterate(IteratePrm{handler: inc}) + _, err = b.Iterate(context.Background(), IteratePrm{handler: inc}) require.NoError(t, err) require.ElementsMatch(t, seen, data) seen = seen[:0] - _, err = b.Iterate(IteratePrm{handler: inc, decodeAddresses: true}) + _, err = b.Iterate(context.Background(), IteratePrm{handler: inc, decodeAddresses: true}) require.Error(t, err) seen = seen[:0] - _, err = b.Iterate(IteratePrm{handler: inc, decodeAddresses: true, ignoreErrors: true}) + _, err = b.Iterate(context.Background(), IteratePrm{handler: inc, decodeAddresses: true, ignoreErrors: true}) require.NoError(t, err) require.ElementsMatch(t, seen, data[:1]) seen = seen[:0] expectedErr := errors.New("stop iteration") - _, err = b.Iterate(IteratePrm{ + _, err = b.Iterate(context.Background(), IteratePrm{ decodeAddresses: true, handler: func(IterationElement) error { return expectedErr }, ignoreErrors: true, diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/control.go b/pkg/local_object_storage/blobstor/blobovniczatree/control.go index 0240c7a9..0b213563 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/control.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/control.go @@ -1,6 +1,7 @@ package blobovniczatree import ( + "context" "fmt" "path/filepath" @@ -26,7 +27,7 @@ func (b *Blobovniczas) Init() error { return nil } - return b.iterateLeaves(func(p string) (bool, error) { + return b.iterateLeaves(context.TODO(), func(p string) (bool, error) { blz, err := b.openBlobovniczaNoCache(p) if err != nil { return true, err diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/delete.go b/pkg/local_object_storage/blobstor/blobovniczatree/delete.go index f84d8fbe..5aa9062a 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/delete.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/delete.go @@ -48,7 +48,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co activeCache := make(map[string]struct{}) objectFound := false - err = b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) { + err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) { dirPath := filepath.Dir(p) // don't process active blobovnicza of the level twice diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/exists.go b/pkg/local_object_storage/blobstor/blobovniczatree/exists.go index 9d9fd4cb..e7852612 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/exists.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/exists.go @@ -40,7 +40,7 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common gPrm.SetAddress(prm.Address) var found bool - err := b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) { + err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) { dirPath := filepath.Dir(p) _, ok := activeCache[dirPath] diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/get.go b/pkg/local_object_storage/blobstor/blobovniczatree/get.go index 0b8ccb64..8955eb14 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/get.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/get.go @@ -46,7 +46,7 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G activeCache := make(map[string]struct{}) - err = b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) { + err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) { dirPath := filepath.Dir(p) _, ok := activeCache[dirPath] diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/get_range.go b/pkg/local_object_storage/blobstor/blobovniczatree/get_range.go index d6dfe51b..fb23a967 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/get_range.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/get_range.go @@ -46,7 +46,7 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re activeCache := make(map[string]struct{}) objectFound := false - err = b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) { + err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) { dirPath := filepath.Dir(p) _, ok := activeCache[dirPath] diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go b/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go index ad933da0..14071669 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/iterate.go @@ -12,8 +12,8 @@ import ( ) // Iterate iterates over all objects in b. -func (b *Blobovniczas) Iterate(_ context.Context, prm common.IteratePrm) (common.IterateRes, error) { - return common.IterateRes{}, b.iterateBlobovniczas(prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error { +func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) { + return common.IterateRes{}, b.iterateBlobovniczas(ctx, prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error { var subPrm blobovnicza.IteratePrm subPrm.SetHandler(func(elem blobovnicza.IterationElement) error { data, err := b.compression.Decompress(elem.ObjectData()) @@ -40,14 +40,14 @@ func (b *Blobovniczas) Iterate(_ context.Context, prm common.IteratePrm) (common }) subPrm.DecodeAddresses() - _, err := blz.Iterate(subPrm) + _, err := blz.Iterate(ctx, subPrm) return err }) } // iterator over all Blobovniczas in unsorted order. Break on f's error return. -func (b *Blobovniczas) iterateBlobovniczas(ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error { - return b.iterateLeaves(func(p string) (bool, error) { +func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error { + return b.iterateLeaves(ctx, func(p string) (bool, error) { blz, err := b.openBlobovnicza(p) if err != nil { if ignoreErrors { @@ -63,8 +63,9 @@ func (b *Blobovniczas) iterateBlobovniczas(ignoreErrors bool, f func(string, *bl } // iterator over the paths of Blobovniczas sorted by weight. -func (b *Blobovniczas) iterateSortedLeaves(addr *oid.Address, f func(string) (bool, error)) error { +func (b *Blobovniczas) iterateSortedLeaves(ctx context.Context, addr *oid.Address, f func(string) (bool, error)) error { _, err := b.iterateSorted( + ctx, addr, make([]string, 0, b.blzShallowDepth), b.blzShallowDepth, @@ -75,13 +76,14 @@ func (b *Blobovniczas) iterateSortedLeaves(addr *oid.Address, f func(string) (bo } // iterator over directories with Blobovniczas sorted by weight. -func (b *Blobovniczas) iterateDeepest(addr oid.Address, f func(string) (bool, error)) error { +func (b *Blobovniczas) iterateDeepest(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error { depth := b.blzShallowDepth if depth > 0 { depth-- } _, err := b.iterateSorted( + ctx, &addr, make([]string, 0, depth), depth, @@ -92,7 +94,7 @@ func (b *Blobovniczas) iterateDeepest(addr oid.Address, f func(string) (bool, er } // iterator over particular level of directories. -func (b *Blobovniczas) iterateSorted(addr *oid.Address, curPath []string, execDepth uint64, f func([]string) (bool, error)) (bool, error) { +func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, curPath []string, execDepth uint64, f func([]string) (bool, error)) (bool, error) { indices := indexSlice(b.blzShallowWidth) hrw.SortSliceByValue(indices, addressHash(addr, filepath.Join(curPath...))) @@ -100,6 +102,11 @@ func (b *Blobovniczas) iterateSorted(addr *oid.Address, curPath []string, execDe exec := uint64(len(curPath)) == execDepth for i := range indices { + select { + case <-ctx.Done(): + return false, ctx.Err() + default: + } if i == 0 { curPath = append(curPath, u64ToHexString(indices[i])) } else { @@ -112,7 +119,7 @@ func (b *Blobovniczas) iterateSorted(addr *oid.Address, curPath []string, execDe } else if stop { return true, nil } - } else if stop, err := b.iterateSorted(addr, curPath, execDepth, f); err != nil { + } else if stop, err := b.iterateSorted(ctx, addr, curPath, execDepth, f); err != nil { return false, err } else if stop { return true, nil @@ -123,8 +130,8 @@ func (b *Blobovniczas) iterateSorted(addr *oid.Address, curPath []string, execDe } // iterator over the paths of Blobovniczas in random order. -func (b *Blobovniczas) iterateLeaves(f func(string) (bool, error)) error { - return b.iterateSortedLeaves(nil, f) +func (b *Blobovniczas) iterateLeaves(ctx context.Context, f func(string) (bool, error)) error { + return b.iterateSortedLeaves(ctx, nil, f) } // makes slice of uint64 values from 0 to number-1. diff --git a/pkg/local_object_storage/blobstor/blobovniczatree/put.go b/pkg/local_object_storage/blobstor/blobovniczatree/put.go index ec302d14..4e1d6621 100644 --- a/pkg/local_object_storage/blobstor/blobovniczatree/put.go +++ b/pkg/local_object_storage/blobstor/blobovniczatree/put.go @@ -45,7 +45,7 @@ func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRe PutPrm: putPrm, } - if err := b.iterateDeepest(prm.Address, it.iterate); err != nil { + if err := b.iterateDeepest(ctx, prm.Address, it.iterate); err != nil { return common.PutRes{}, err } else if it.ID == nil { if it.AllFull {