node: Use Context in Blobovniczas.Iterate() #403

Merged
fyrchik merged 1 commit from acid-ant/frostfs-node:bugfix/394-blob-iter-ctx into master 2023-05-31 10:09:23 +00:00
10 changed files with 41 additions and 25 deletions

View file

@ -1,6 +1,7 @@
package blobovnicza package blobovnicza
import ( import (
"context"
"fmt" "fmt"
"io" "io"
@ -33,6 +34,6 @@ func listFunc(cmd *cobra.Command, _ []string) {
blz := openBlobovnicza(cmd) blz := openBlobovnicza(cmd)
defer blz.Close() defer blz.Close()
err := blobovnicza.IterateAddresses(blz, wAddr) err := blobovnicza.IterateAddresses(context.Background(), blz, wAddr)
common.ExitOnErr(cmd, common.Errf("blobovnicza iterator failure: %w", err)) common.ExitOnErr(cmd, common.Errf("blobovnicza iterator failure: %w", err))
} }

View file

@ -1,6 +1,7 @@
package blobovnicza package blobovnicza
import ( import (
"context"
"fmt" "fmt"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -117,12 +118,17 @@ type IterateRes struct {
// Returns handler's errors directly. Returns nil after iterating finish. // Returns handler's errors directly. Returns nil after iterating finish.
// //
// Handler should not retain object data. Handler must not be nil. // Handler should not retain object data. Handler must not be nil.
func (b *Blobovnicza) Iterate(prm IteratePrm) (IterateRes, error) { func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes, error) {
var elem IterationElement var elem IterationElement
if err := b.boltDB.View(func(tx *bbolt.Tx) error { if err := b.boltDB.View(func(tx *bbolt.Tx) error {
return tx.ForEach(func(name []byte, buck *bbolt.Bucket) error { return tx.ForEach(func(name []byte, buck *bbolt.Bucket) error {
return buck.ForEach(func(k, v []byte) error { return buck.ForEach(func(k, v []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if prm.decodeAddresses { if prm.decodeAddresses {
if err := addressFromKey(&elem.addr, k); err != nil { if err := addressFromKey(&elem.addr, k); err != nil {
if prm.ignoreErrors { if prm.ignoreErrors {
@ -147,7 +153,7 @@ func (b *Blobovnicza) Iterate(prm IteratePrm) (IterateRes, error) {
} }
// IterateAddresses is a helper function which iterates over Blobovnicza and passes addresses of the objects to f. // IterateAddresses is a helper function which iterates over Blobovnicza and passes addresses of the objects to f.
func IterateAddresses(blz *Blobovnicza, f func(oid.Address) error) error { func IterateAddresses(ctx context.Context, blz *Blobovnicza, f func(oid.Address) error) error {
var prm IteratePrm var prm IteratePrm
prm.DecodeAddresses() prm.DecodeAddresses()
@ -157,7 +163,7 @@ func IterateAddresses(blz *Blobovnicza, f func(oid.Address) error) error {
return f(elem.Address()) return f(elem.Address())
}) })
_, err := blz.Iterate(prm) _, err := blz.Iterate(ctx, prm)
return err return err
} }

View file

@ -1,6 +1,7 @@
package blobovnicza package blobovnicza
import ( import (
"context"
"errors" "errors"
"path/filepath" "path/filepath"
"testing" "testing"
@ -33,22 +34,22 @@ func TestBlobovniczaIterate(t *testing.T) {
return nil return nil
} }
_, err = b.Iterate(IteratePrm{handler: inc}) _, err = b.Iterate(context.Background(), IteratePrm{handler: inc})
require.NoError(t, err) require.NoError(t, err)
require.ElementsMatch(t, seen, data) require.ElementsMatch(t, seen, data)
seen = seen[:0] seen = seen[:0]
_, err = b.Iterate(IteratePrm{handler: inc, decodeAddresses: true}) _, err = b.Iterate(context.Background(), IteratePrm{handler: inc, decodeAddresses: true})
require.Error(t, err) require.Error(t, err)
seen = seen[:0] seen = seen[:0]
_, err = b.Iterate(IteratePrm{handler: inc, decodeAddresses: true, ignoreErrors: true}) _, err = b.Iterate(context.Background(), IteratePrm{handler: inc, decodeAddresses: true, ignoreErrors: true})
require.NoError(t, err) require.NoError(t, err)
require.ElementsMatch(t, seen, data[:1]) require.ElementsMatch(t, seen, data[:1])
seen = seen[:0] seen = seen[:0]
expectedErr := errors.New("stop iteration") expectedErr := errors.New("stop iteration")
_, err = b.Iterate(IteratePrm{ _, err = b.Iterate(context.Background(), IteratePrm{
decodeAddresses: true, decodeAddresses: true,
handler: func(IterationElement) error { return expectedErr }, handler: func(IterationElement) error { return expectedErr },
ignoreErrors: true, ignoreErrors: true,

View file

@ -1,6 +1,7 @@
package blobovniczatree package blobovniczatree
import ( import (
"context"
"fmt" "fmt"
"path/filepath" "path/filepath"
@ -26,7 +27,7 @@ func (b *Blobovniczas) Init() error {
return nil return nil
} }
return b.iterateLeaves(func(p string) (bool, error) { return b.iterateLeaves(context.TODO(), func(p string) (bool, error) {
blz, err := b.openBlobovniczaNoCache(p) blz, err := b.openBlobovniczaNoCache(p)
if err != nil { if err != nil {
return true, err return true, err

View file

@ -48,7 +48,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
activeCache := make(map[string]struct{}) activeCache := make(map[string]struct{})
objectFound := false objectFound := false
err = b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) { err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
dirPath := filepath.Dir(p) dirPath := filepath.Dir(p)
// don't process active blobovnicza of the level twice // don't process active blobovnicza of the level twice

View file

@ -40,7 +40,7 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
gPrm.SetAddress(prm.Address) gPrm.SetAddress(prm.Address)
var found bool var found bool
err := b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) { err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
dirPath := filepath.Dir(p) dirPath := filepath.Dir(p)
_, ok := activeCache[dirPath] _, ok := activeCache[dirPath]

View file

@ -46,7 +46,7 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
activeCache := make(map[string]struct{}) activeCache := make(map[string]struct{})
err = b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) { err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
dirPath := filepath.Dir(p) dirPath := filepath.Dir(p)
_, ok := activeCache[dirPath] _, ok := activeCache[dirPath]

View file

@ -46,7 +46,7 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
activeCache := make(map[string]struct{}) activeCache := make(map[string]struct{})
objectFound := false objectFound := false
err = b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) { err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
dirPath := filepath.Dir(p) dirPath := filepath.Dir(p)
_, ok := activeCache[dirPath] _, ok := activeCache[dirPath]

View file

@ -12,8 +12,8 @@ import (
) )
// Iterate iterates over all objects in b. // Iterate iterates over all objects in b.
func (b *Blobovniczas) Iterate(_ context.Context, prm common.IteratePrm) (common.IterateRes, error) { func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
return common.IterateRes{}, b.iterateBlobovniczas(prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error { return common.IterateRes{}, b.iterateBlobovniczas(ctx, prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
var subPrm blobovnicza.IteratePrm var subPrm blobovnicza.IteratePrm
subPrm.SetHandler(func(elem blobovnicza.IterationElement) error { subPrm.SetHandler(func(elem blobovnicza.IterationElement) error {
data, err := b.compression.Decompress(elem.ObjectData()) data, err := b.compression.Decompress(elem.ObjectData())
@ -40,14 +40,14 @@ func (b *Blobovniczas) Iterate(_ context.Context, prm common.IteratePrm) (common
}) })
subPrm.DecodeAddresses() subPrm.DecodeAddresses()
_, err := blz.Iterate(subPrm) _, err := blz.Iterate(ctx, subPrm)
return err return err
}) })
} }
// iterator over all Blobovniczas in unsorted order. Break on f's error return. // iterator over all Blobovniczas in unsorted order. Break on f's error return.
func (b *Blobovniczas) iterateBlobovniczas(ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error { func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
return b.iterateLeaves(func(p string) (bool, error) { return b.iterateLeaves(ctx, func(p string) (bool, error) {
blz, err := b.openBlobovnicza(p) blz, err := b.openBlobovnicza(p)
if err != nil { if err != nil {
if ignoreErrors { if ignoreErrors {
@ -63,8 +63,9 @@ func (b *Blobovniczas) iterateBlobovniczas(ignoreErrors bool, f func(string, *bl
} }
// iterator over the paths of Blobovniczas sorted by weight. // iterator over the paths of Blobovniczas sorted by weight.
func (b *Blobovniczas) iterateSortedLeaves(addr *oid.Address, f func(string) (bool, error)) error { func (b *Blobovniczas) iterateSortedLeaves(ctx context.Context, addr *oid.Address, f func(string) (bool, error)) error {
_, err := b.iterateSorted( _, err := b.iterateSorted(
ctx,
addr, addr,
make([]string, 0, b.blzShallowDepth), make([]string, 0, b.blzShallowDepth),
b.blzShallowDepth, b.blzShallowDepth,
@ -75,13 +76,14 @@ func (b *Blobovniczas) iterateSortedLeaves(addr *oid.Address, f func(string) (bo
} }
// iterator over directories with Blobovniczas sorted by weight. // iterator over directories with Blobovniczas sorted by weight.
func (b *Blobovniczas) iterateDeepest(addr oid.Address, f func(string) (bool, error)) error { func (b *Blobovniczas) iterateDeepest(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error {
depth := b.blzShallowDepth depth := b.blzShallowDepth
if depth > 0 { if depth > 0 {
depth-- depth--
} }
_, err := b.iterateSorted( _, err := b.iterateSorted(
ctx,
&addr, &addr,
make([]string, 0, depth), make([]string, 0, depth),
depth, depth,
@ -92,7 +94,7 @@ func (b *Blobovniczas) iterateDeepest(addr oid.Address, f func(string) (bool, er
} }
// iterator over particular level of directories. // iterator over particular level of directories.
func (b *Blobovniczas) iterateSorted(addr *oid.Address, curPath []string, execDepth uint64, f func([]string) (bool, error)) (bool, error) { func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, curPath []string, execDepth uint64, f func([]string) (bool, error)) (bool, error) {
indices := indexSlice(b.blzShallowWidth) indices := indexSlice(b.blzShallowWidth)
hrw.SortSliceByValue(indices, addressHash(addr, filepath.Join(curPath...))) hrw.SortSliceByValue(indices, addressHash(addr, filepath.Join(curPath...)))
@ -100,6 +102,11 @@ func (b *Blobovniczas) iterateSorted(addr *oid.Address, curPath []string, execDe
exec := uint64(len(curPath)) == execDepth exec := uint64(len(curPath)) == execDepth
for i := range indices { for i := range indices {
select {
case <-ctx.Done():
return false, ctx.Err()
default:
}
if i == 0 { if i == 0 {
curPath = append(curPath, u64ToHexString(indices[i])) curPath = append(curPath, u64ToHexString(indices[i]))
} else { } else {
@ -112,7 +119,7 @@ func (b *Blobovniczas) iterateSorted(addr *oid.Address, curPath []string, execDe
} else if stop { } else if stop {
return true, nil return true, nil
} }
} else if stop, err := b.iterateSorted(addr, curPath, execDepth, f); err != nil { } else if stop, err := b.iterateSorted(ctx, addr, curPath, execDepth, f); err != nil {
return false, err return false, err
} else if stop { } else if stop {
return true, nil return true, nil
@ -123,8 +130,8 @@ func (b *Blobovniczas) iterateSorted(addr *oid.Address, curPath []string, execDe
} }
// iterator over the paths of Blobovniczas in random order. // iterator over the paths of Blobovniczas in random order.
func (b *Blobovniczas) iterateLeaves(f func(string) (bool, error)) error { func (b *Blobovniczas) iterateLeaves(ctx context.Context, f func(string) (bool, error)) error {
return b.iterateSortedLeaves(nil, f) return b.iterateSortedLeaves(ctx, nil, f)
} }
// makes slice of uint64 values from 0 to number-1. // makes slice of uint64 values from 0 to number-1.

View file

@ -45,7 +45,7 @@ func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRe
PutPrm: putPrm, PutPrm: putPrm,
} }
if err := b.iterateDeepest(prm.Address, it.iterate); err != nil { if err := b.iterateDeepest(ctx, prm.Address, it.iterate); err != nil {
return common.PutRes{}, err return common.PutRes{}, err
} else if it.ID == nil { } else if it.ID == nil {
if it.AllFull { if it.AllFull {