node: Use Context
in Blobovniczas.Iterate()
#403
10 changed files with 41 additions and 25 deletions
|
@ -1,6 +1,7 @@
|
||||||
package blobovnicza
|
package blobovnicza
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
@ -33,6 +34,6 @@ func listFunc(cmd *cobra.Command, _ []string) {
|
||||||
blz := openBlobovnicza(cmd)
|
blz := openBlobovnicza(cmd)
|
||||||
defer blz.Close()
|
defer blz.Close()
|
||||||
|
|
||||||
err := blobovnicza.IterateAddresses(blz, wAddr)
|
err := blobovnicza.IterateAddresses(context.Background(), blz, wAddr)
|
||||||
common.ExitOnErr(cmd, common.Errf("blobovnicza iterator failure: %w", err))
|
common.ExitOnErr(cmd, common.Errf("blobovnicza iterator failure: %w", err))
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package blobovnicza
|
package blobovnicza
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -117,12 +118,17 @@ type IterateRes struct {
|
||||||
// Returns handler's errors directly. Returns nil after iterating finish.
|
// Returns handler's errors directly. Returns nil after iterating finish.
|
||||||
//
|
//
|
||||||
// Handler should not retain object data. Handler must not be nil.
|
// Handler should not retain object data. Handler must not be nil.
|
||||||
func (b *Blobovnicza) Iterate(prm IteratePrm) (IterateRes, error) {
|
func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes, error) {
|
||||||
var elem IterationElement
|
var elem IterationElement
|
||||||
|
|
||||||
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
return tx.ForEach(func(name []byte, buck *bbolt.Bucket) error {
|
return tx.ForEach(func(name []byte, buck *bbolt.Bucket) error {
|
||||||
return buck.ForEach(func(k, v []byte) error {
|
return buck.ForEach(func(k, v []byte) error {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
if prm.decodeAddresses {
|
if prm.decodeAddresses {
|
||||||
if err := addressFromKey(&elem.addr, k); err != nil {
|
if err := addressFromKey(&elem.addr, k); err != nil {
|
||||||
if prm.ignoreErrors {
|
if prm.ignoreErrors {
|
||||||
|
@ -147,7 +153,7 @@ func (b *Blobovnicza) Iterate(prm IteratePrm) (IterateRes, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// IterateAddresses is a helper function which iterates over Blobovnicza and passes addresses of the objects to f.
|
// IterateAddresses is a helper function which iterates over Blobovnicza and passes addresses of the objects to f.
|
||||||
func IterateAddresses(blz *Blobovnicza, f func(oid.Address) error) error {
|
func IterateAddresses(ctx context.Context, blz *Blobovnicza, f func(oid.Address) error) error {
|
||||||
var prm IteratePrm
|
var prm IteratePrm
|
||||||
|
|
||||||
prm.DecodeAddresses()
|
prm.DecodeAddresses()
|
||||||
|
@ -157,7 +163,7 @@ func IterateAddresses(blz *Blobovnicza, f func(oid.Address) error) error {
|
||||||
return f(elem.Address())
|
return f(elem.Address())
|
||||||
})
|
})
|
||||||
|
|
||||||
_, err := blz.Iterate(prm)
|
_, err := blz.Iterate(ctx, prm)
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package blobovnicza
|
package blobovnicza
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -33,22 +34,22 @@ func TestBlobovniczaIterate(t *testing.T) {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = b.Iterate(IteratePrm{handler: inc})
|
_, err = b.Iterate(context.Background(), IteratePrm{handler: inc})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.ElementsMatch(t, seen, data)
|
require.ElementsMatch(t, seen, data)
|
||||||
|
|
||||||
seen = seen[:0]
|
seen = seen[:0]
|
||||||
_, err = b.Iterate(IteratePrm{handler: inc, decodeAddresses: true})
|
_, err = b.Iterate(context.Background(), IteratePrm{handler: inc, decodeAddresses: true})
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
|
|
||||||
seen = seen[:0]
|
seen = seen[:0]
|
||||||
_, err = b.Iterate(IteratePrm{handler: inc, decodeAddresses: true, ignoreErrors: true})
|
_, err = b.Iterate(context.Background(), IteratePrm{handler: inc, decodeAddresses: true, ignoreErrors: true})
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.ElementsMatch(t, seen, data[:1])
|
require.ElementsMatch(t, seen, data[:1])
|
||||||
|
|
||||||
seen = seen[:0]
|
seen = seen[:0]
|
||||||
expectedErr := errors.New("stop iteration")
|
expectedErr := errors.New("stop iteration")
|
||||||
_, err = b.Iterate(IteratePrm{
|
_, err = b.Iterate(context.Background(), IteratePrm{
|
||||||
decodeAddresses: true,
|
decodeAddresses: true,
|
||||||
handler: func(IterationElement) error { return expectedErr },
|
handler: func(IterationElement) error { return expectedErr },
|
||||||
ignoreErrors: true,
|
ignoreErrors: true,
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package blobovniczatree
|
package blobovniczatree
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
|
@ -26,7 +27,7 @@ func (b *Blobovniczas) Init() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.iterateLeaves(func(p string) (bool, error) {
|
return b.iterateLeaves(context.TODO(), func(p string) (bool, error) {
|
||||||
blz, err := b.openBlobovniczaNoCache(p)
|
blz, err := b.openBlobovniczaNoCache(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return true, err
|
return true, err
|
||||||
|
|
|
@ -48,7 +48,7 @@ func (b *Blobovniczas) Delete(ctx context.Context, prm common.DeletePrm) (res co
|
||||||
activeCache := make(map[string]struct{})
|
activeCache := make(map[string]struct{})
|
||||||
objectFound := false
|
objectFound := false
|
||||||
|
|
||||||
err = b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) {
|
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||||
dirPath := filepath.Dir(p)
|
dirPath := filepath.Dir(p)
|
||||||
|
|
||||||
// don't process active blobovnicza of the level twice
|
// don't process active blobovnicza of the level twice
|
||||||
|
|
|
@ -40,7 +40,7 @@ func (b *Blobovniczas) Exists(ctx context.Context, prm common.ExistsPrm) (common
|
||||||
gPrm.SetAddress(prm.Address)
|
gPrm.SetAddress(prm.Address)
|
||||||
|
|
||||||
var found bool
|
var found bool
|
||||||
err := b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) {
|
err := b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||||
dirPath := filepath.Dir(p)
|
dirPath := filepath.Dir(p)
|
||||||
|
|
||||||
_, ok := activeCache[dirPath]
|
_, ok := activeCache[dirPath]
|
||||||
|
|
|
@ -46,7 +46,7 @@ func (b *Blobovniczas) Get(ctx context.Context, prm common.GetPrm) (res common.G
|
||||||
|
|
||||||
activeCache := make(map[string]struct{})
|
activeCache := make(map[string]struct{})
|
||||||
|
|
||||||
err = b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) {
|
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||||
dirPath := filepath.Dir(p)
|
dirPath := filepath.Dir(p)
|
||||||
|
|
||||||
_, ok := activeCache[dirPath]
|
_, ok := activeCache[dirPath]
|
||||||
|
|
|
@ -46,7 +46,7 @@ func (b *Blobovniczas) GetRange(ctx context.Context, prm common.GetRangePrm) (re
|
||||||
activeCache := make(map[string]struct{})
|
activeCache := make(map[string]struct{})
|
||||||
objectFound := false
|
objectFound := false
|
||||||
|
|
||||||
err = b.iterateSortedLeaves(&prm.Address, func(p string) (bool, error) {
|
err = b.iterateSortedLeaves(ctx, &prm.Address, func(p string) (bool, error) {
|
||||||
dirPath := filepath.Dir(p)
|
dirPath := filepath.Dir(p)
|
||||||
|
|
||||||
_, ok := activeCache[dirPath]
|
_, ok := activeCache[dirPath]
|
||||||
|
|
|
@ -12,8 +12,8 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
// Iterate iterates over all objects in b.
|
// Iterate iterates over all objects in b.
|
||||||
func (b *Blobovniczas) Iterate(_ context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
func (b *Blobovniczas) Iterate(ctx context.Context, prm common.IteratePrm) (common.IterateRes, error) {
|
||||||
return common.IterateRes{}, b.iterateBlobovniczas(prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
|
return common.IterateRes{}, b.iterateBlobovniczas(ctx, prm.IgnoreErrors, func(p string, blz *blobovnicza.Blobovnicza) error {
|
||||||
var subPrm blobovnicza.IteratePrm
|
var subPrm blobovnicza.IteratePrm
|
||||||
subPrm.SetHandler(func(elem blobovnicza.IterationElement) error {
|
subPrm.SetHandler(func(elem blobovnicza.IterationElement) error {
|
||||||
data, err := b.compression.Decompress(elem.ObjectData())
|
data, err := b.compression.Decompress(elem.ObjectData())
|
||||||
|
@ -40,14 +40,14 @@ func (b *Blobovniczas) Iterate(_ context.Context, prm common.IteratePrm) (common
|
||||||
})
|
})
|
||||||
subPrm.DecodeAddresses()
|
subPrm.DecodeAddresses()
|
||||||
|
|
||||||
_, err := blz.Iterate(subPrm)
|
_, err := blz.Iterate(ctx, subPrm)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterator over all Blobovniczas in unsorted order. Break on f's error return.
|
// iterator over all Blobovniczas in unsorted order. Break on f's error return.
|
||||||
func (b *Blobovniczas) iterateBlobovniczas(ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
|
func (b *Blobovniczas) iterateBlobovniczas(ctx context.Context, ignoreErrors bool, f func(string, *blobovnicza.Blobovnicza) error) error {
|
||||||
return b.iterateLeaves(func(p string) (bool, error) {
|
return b.iterateLeaves(ctx, func(p string) (bool, error) {
|
||||||
blz, err := b.openBlobovnicza(p)
|
blz, err := b.openBlobovnicza(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ignoreErrors {
|
if ignoreErrors {
|
||||||
|
@ -63,8 +63,9 @@ func (b *Blobovniczas) iterateBlobovniczas(ignoreErrors bool, f func(string, *bl
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterator over the paths of Blobovniczas sorted by weight.
|
// iterator over the paths of Blobovniczas sorted by weight.
|
||||||
func (b *Blobovniczas) iterateSortedLeaves(addr *oid.Address, f func(string) (bool, error)) error {
|
func (b *Blobovniczas) iterateSortedLeaves(ctx context.Context, addr *oid.Address, f func(string) (bool, error)) error {
|
||||||
_, err := b.iterateSorted(
|
_, err := b.iterateSorted(
|
||||||
|
ctx,
|
||||||
addr,
|
addr,
|
||||||
make([]string, 0, b.blzShallowDepth),
|
make([]string, 0, b.blzShallowDepth),
|
||||||
b.blzShallowDepth,
|
b.blzShallowDepth,
|
||||||
|
@ -75,13 +76,14 @@ func (b *Blobovniczas) iterateSortedLeaves(addr *oid.Address, f func(string) (bo
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterator over directories with Blobovniczas sorted by weight.
|
// iterator over directories with Blobovniczas sorted by weight.
|
||||||
func (b *Blobovniczas) iterateDeepest(addr oid.Address, f func(string) (bool, error)) error {
|
func (b *Blobovniczas) iterateDeepest(ctx context.Context, addr oid.Address, f func(string) (bool, error)) error {
|
||||||
depth := b.blzShallowDepth
|
depth := b.blzShallowDepth
|
||||||
if depth > 0 {
|
if depth > 0 {
|
||||||
depth--
|
depth--
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := b.iterateSorted(
|
_, err := b.iterateSorted(
|
||||||
|
ctx,
|
||||||
&addr,
|
&addr,
|
||||||
make([]string, 0, depth),
|
make([]string, 0, depth),
|
||||||
depth,
|
depth,
|
||||||
|
@ -92,7 +94,7 @@ func (b *Blobovniczas) iterateDeepest(addr oid.Address, f func(string) (bool, er
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterator over particular level of directories.
|
// iterator over particular level of directories.
|
||||||
func (b *Blobovniczas) iterateSorted(addr *oid.Address, curPath []string, execDepth uint64, f func([]string) (bool, error)) (bool, error) {
|
func (b *Blobovniczas) iterateSorted(ctx context.Context, addr *oid.Address, curPath []string, execDepth uint64, f func([]string) (bool, error)) (bool, error) {
|
||||||
indices := indexSlice(b.blzShallowWidth)
|
indices := indexSlice(b.blzShallowWidth)
|
||||||
|
|
||||||
hrw.SortSliceByValue(indices, addressHash(addr, filepath.Join(curPath...)))
|
hrw.SortSliceByValue(indices, addressHash(addr, filepath.Join(curPath...)))
|
||||||
|
@ -100,6 +102,11 @@ func (b *Blobovniczas) iterateSorted(addr *oid.Address, curPath []string, execDe
|
||||||
exec := uint64(len(curPath)) == execDepth
|
exec := uint64(len(curPath)) == execDepth
|
||||||
|
|
||||||
for i := range indices {
|
for i := range indices {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false, ctx.Err()
|
||||||
|
default:
|
||||||
|
}
|
||||||
if i == 0 {
|
if i == 0 {
|
||||||
curPath = append(curPath, u64ToHexString(indices[i]))
|
curPath = append(curPath, u64ToHexString(indices[i]))
|
||||||
} else {
|
} else {
|
||||||
|
@ -112,7 +119,7 @@ func (b *Blobovniczas) iterateSorted(addr *oid.Address, curPath []string, execDe
|
||||||
} else if stop {
|
} else if stop {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
} else if stop, err := b.iterateSorted(addr, curPath, execDepth, f); err != nil {
|
} else if stop, err := b.iterateSorted(ctx, addr, curPath, execDepth, f); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
} else if stop {
|
} else if stop {
|
||||||
return true, nil
|
return true, nil
|
||||||
|
@ -123,8 +130,8 @@ func (b *Blobovniczas) iterateSorted(addr *oid.Address, curPath []string, execDe
|
||||||
}
|
}
|
||||||
|
|
||||||
// iterator over the paths of Blobovniczas in random order.
|
// iterator over the paths of Blobovniczas in random order.
|
||||||
func (b *Blobovniczas) iterateLeaves(f func(string) (bool, error)) error {
|
func (b *Blobovniczas) iterateLeaves(ctx context.Context, f func(string) (bool, error)) error {
|
||||||
return b.iterateSortedLeaves(nil, f)
|
return b.iterateSortedLeaves(ctx, nil, f)
|
||||||
}
|
}
|
||||||
|
|
||||||
// makes slice of uint64 values from 0 to number-1.
|
// makes slice of uint64 values from 0 to number-1.
|
||||||
|
|
|
@ -45,7 +45,7 @@ func (b *Blobovniczas) Put(ctx context.Context, prm common.PutPrm) (common.PutRe
|
||||||
PutPrm: putPrm,
|
PutPrm: putPrm,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := b.iterateDeepest(prm.Address, it.iterate); err != nil {
|
if err := b.iterateDeepest(ctx, prm.Address, it.iterate); err != nil {
|
||||||
return common.PutRes{}, err
|
return common.PutRes{}, err
|
||||||
} else if it.ID == nil {
|
} else if it.ID == nil {
|
||||||
if it.AllFull {
|
if it.AllFull {
|
||||||
|
|
Loading…
Reference in a new issue