forked from TrueCloudLab/frostfs-node
[#791] blobovnicza: Implement method to iterate over object addresses
In previous implementation `Blobovnicza.Iterate` op decoded object data only and passed it to the handler. There is a need to iterate over all addresses of the stored objects. Add `DecodeAddresses` and `WithoutData` methods of `IteratePrm` type. Add `Address` method to `IterationElement` type. Make `Iterate` to decode object addresses if `DecodeAddress` was called and not read the data if `WithoutData` was called. Implement `IterateAddresses` helper function to simplify the code. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
8c59ade4ed
commit
5e2ca0d04b
2 changed files with 59 additions and 3 deletions
|
@ -3,6 +3,7 @@ package blobovnicza
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/pkg/object"
|
||||||
"go.etcd.io/bbolt"
|
"go.etcd.io/bbolt"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -58,6 +59,8 @@ func max(a, b uint64) uint64 {
|
||||||
|
|
||||||
// IterationElement represents a unit of elements through which Iterate operation passes.
|
// IterationElement represents a unit of elements through which Iterate operation passes.
|
||||||
type IterationElement struct {
|
type IterationElement struct {
|
||||||
|
addr *object.Address
|
||||||
|
|
||||||
data []byte
|
data []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,14 +69,33 @@ func (x IterationElement) ObjectData() []byte {
|
||||||
return x.data
|
return x.data
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Address returns address of the stored object.
|
||||||
|
func (x IterationElement) Address() *object.Address {
|
||||||
|
return x.addr
|
||||||
|
}
|
||||||
|
|
||||||
// IterationHandler is a generic processor of IterationElement.
|
// IterationHandler is a generic processor of IterationElement.
|
||||||
type IterationHandler func(IterationElement) error
|
type IterationHandler func(IterationElement) error
|
||||||
|
|
||||||
// IteratePrm groups the parameters of Iterate operation.
|
// IteratePrm groups the parameters of Iterate operation.
|
||||||
type IteratePrm struct {
|
type IteratePrm struct {
|
||||||
|
decodeAddresses bool
|
||||||
|
|
||||||
|
withoutData bool
|
||||||
|
|
||||||
handler IterationHandler
|
handler IterationHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DecodeAddresses sets flag to unmarshal object addresses.
|
||||||
|
func (x *IteratePrm) DecodeAddresses() {
|
||||||
|
x.decodeAddresses = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// WithoutData sets flag to not read data of the objects.
|
||||||
|
func (x *IteratePrm) WithoutData() {
|
||||||
|
x.withoutData = true
|
||||||
|
}
|
||||||
|
|
||||||
// SetHandler sets handler to be called iteratively.
|
// SetHandler sets handler to be called iteratively.
|
||||||
func (x *IteratePrm) SetHandler(h IterationHandler) {
|
func (x *IteratePrm) SetHandler(h IterationHandler) {
|
||||||
x.handler = h
|
x.handler = h
|
||||||
|
@ -83,8 +105,9 @@ func (x *IteratePrm) SetHandler(h IterationHandler) {
|
||||||
type IterateRes struct {
|
type IterateRes struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate goes through all stored objects, and passes their headers
|
// Iterate goes through all stored objects, and passes IterationElement to parameterized handler until error return.
|
||||||
// to parameterized handler until error return.
|
//
|
||||||
|
// Decodes object addresses if DecodeAddresses was called. Don't read object data if WithoutData was called.
|
||||||
//
|
//
|
||||||
// Returns handler's errors directly. Returns nil after iterating finish.
|
// Returns handler's errors directly. Returns nil after iterating finish.
|
||||||
//
|
//
|
||||||
|
@ -95,7 +118,20 @@ func (b *Blobovnicza) Iterate(prm IteratePrm) (*IterateRes, error) {
|
||||||
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
||||||
return b.iterateBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
|
return b.iterateBuckets(tx, func(lower, upper uint64, buck *bbolt.Bucket) (bool, error) {
|
||||||
err := buck.ForEach(func(k, v []byte) error {
|
err := buck.ForEach(func(k, v []byte) error {
|
||||||
|
if prm.decodeAddresses {
|
||||||
|
if elem.addr == nil {
|
||||||
|
elem.addr = object.NewAddress()
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := addressFromKey(elem.addr, k); err != nil {
|
||||||
|
return fmt.Errorf("could not decode address key: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !prm.withoutData {
|
||||||
elem.data = v
|
elem.data = v
|
||||||
|
}
|
||||||
|
|
||||||
return prm.handler(elem)
|
return prm.handler(elem)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -120,3 +156,19 @@ func IterateObjects(blz *Blobovnicza, f func([]byte) error) error {
|
||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IterateAddresses is a helper function which iterates over Blobovnicza and passes addresses of the objects to f.
|
||||||
|
func IterateAddresses(blz *Blobovnicza, f func(*object.Address) error) error {
|
||||||
|
var prm IteratePrm
|
||||||
|
|
||||||
|
prm.DecodeAddresses()
|
||||||
|
prm.WithoutData()
|
||||||
|
|
||||||
|
prm.SetHandler(func(elem IterationElement) error {
|
||||||
|
return f(elem.Address())
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := blz.Iterate(prm)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
|
@ -87,3 +87,7 @@ func (b *Blobovnicza) Put(prm *PutPrm) (*PutRes, error) {
|
||||||
func addressKey(addr *objectSDK.Address) []byte {
|
func addressKey(addr *objectSDK.Address) []byte {
|
||||||
return []byte(addr.String())
|
return []byte(addr.String())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func addressFromKey(dst *objectSDK.Address, data []byte) error {
|
||||||
|
return dst.Parse(string(data))
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue