forked from TrueCloudLab/frostfs-node
Dmitrii Stepanov
e3d9dd6ee8
DB value is only valid while the tx is alive. But handler may to run something in other goroutine. Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
189 lines
4.8 KiB
Go
189 lines
4.8 KiB
Go
package blobovnicza
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"fmt"
|
|
"math"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"go.etcd.io/bbolt"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
// iterateAllDataBuckets iterates all buckets in db
|
|
//
|
|
// If the maximum size of the object (b.objSizeLimit) has been changed to lower value,
|
|
// then there may be more buckets than the current limit of the object size.
|
|
func (b *Blobovnicza) iterateAllDataBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error {
|
|
return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) {
|
|
buck := tx.Bucket(key)
|
|
if buck == nil {
|
|
return true, nil
|
|
}
|
|
|
|
return f(lower, upper, buck)
|
|
})
|
|
}
|
|
|
|
func (b *Blobovnicza) iterateBucketKeys(useObjLimitBound bool, f func(uint64, uint64, []byte) (bool, error)) error {
|
|
return b.iterateBounds(useObjLimitBound, func(lower, upper uint64) (bool, error) {
|
|
return f(lower, upper, bucketKeyFromBounds(upper))
|
|
})
|
|
}
|
|
|
|
func (b *Blobovnicza) iterateBounds(useObjLimitBound bool, f func(uint64, uint64) (bool, error)) error {
|
|
var objLimitBound uint64 = math.MaxUint64
|
|
if useObjLimitBound {
|
|
objLimitBound = upperPowerOfTwo(b.objSizeLimit)
|
|
}
|
|
|
|
for upper := firstBucketBound; upper <= max(objLimitBound, firstBucketBound); upper *= 2 {
|
|
var lower uint64
|
|
|
|
if upper != firstBucketBound {
|
|
lower = upper/2 + 1
|
|
}
|
|
|
|
if stop, err := f(lower, upper); err != nil {
|
|
return err
|
|
} else if stop {
|
|
break
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func max(a, b uint64) uint64 {
|
|
if a > b {
|
|
return a
|
|
}
|
|
|
|
return b
|
|
}
|
|
|
|
// IterationElement represents a unit of elements through which Iterate operation passes.
|
|
type IterationElement struct {
|
|
addr oid.Address
|
|
|
|
data []byte
|
|
}
|
|
|
|
// ObjectData returns stored object in a binary representation.
|
|
func (x IterationElement) ObjectData() []byte {
|
|
return x.data
|
|
}
|
|
|
|
// Address returns address of the stored object.
|
|
func (x IterationElement) Address() oid.Address {
|
|
return x.addr
|
|
}
|
|
|
|
// IterationHandler is a generic processor of IterationElement.
|
|
type IterationHandler func(IterationElement) error
|
|
|
|
// IteratePrm groups the parameters of Iterate operation.
|
|
type IteratePrm struct {
|
|
decodeAddresses bool
|
|
|
|
withoutData bool
|
|
|
|
handler IterationHandler
|
|
|
|
ignoreErrors bool
|
|
}
|
|
|
|
// DecodeAddresses sets flag to unmarshal object addresses.
|
|
func (x *IteratePrm) DecodeAddresses() {
|
|
x.decodeAddresses = true
|
|
}
|
|
|
|
// WithoutData sets flag to not read data of the objects.
|
|
func (x *IteratePrm) WithoutData() {
|
|
x.withoutData = true
|
|
}
|
|
|
|
// SetHandler sets handler to be called iteratively.
|
|
func (x *IteratePrm) SetHandler(h IterationHandler) {
|
|
x.handler = h
|
|
}
|
|
|
|
// IgnoreErrors makes all errors to be ignored.
|
|
func (x *IteratePrm) IgnoreErrors() {
|
|
x.ignoreErrors = true
|
|
}
|
|
|
|
// IterateRes groups the resulting values of Iterate operation.
|
|
type IterateRes struct{}
|
|
|
|
// Iterate goes through all stored objects, and passes IterationElement to parameterized handler until error return.
|
|
//
|
|
// Decodes object addresses if DecodeAddresses was called. Don't read object data if WithoutData was called.
|
|
//
|
|
// Returns handler's errors directly. Returns nil after iterating finish.
|
|
//
|
|
// Handler should not retain object data. Handler must not be nil.
|
|
func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes, error) {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Iterate",
|
|
trace.WithAttributes(
|
|
attribute.String("path", b.path),
|
|
attribute.Bool("decode_addresses", prm.decodeAddresses),
|
|
attribute.Bool("without_data", prm.withoutData),
|
|
attribute.Bool("ignore_errors", prm.ignoreErrors),
|
|
))
|
|
defer span.End()
|
|
|
|
var elem IterationElement
|
|
|
|
if err := b.boltDB.View(func(tx *bbolt.Tx) error {
|
|
return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error {
|
|
if isNonDataBucket(bucketName) {
|
|
return nil
|
|
}
|
|
return buck.ForEach(func(k, v []byte) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
}
|
|
if prm.decodeAddresses {
|
|
if err := addressFromKey(&elem.addr, k); err != nil {
|
|
if prm.ignoreErrors {
|
|
return nil
|
|
}
|
|
return fmt.Errorf("could not decode address key: %w", err)
|
|
}
|
|
}
|
|
|
|
if !prm.withoutData {
|
|
elem.data = bytes.Clone(v)
|
|
}
|
|
|
|
return prm.handler(elem)
|
|
})
|
|
})
|
|
}); err != nil {
|
|
return IterateRes{}, err
|
|
}
|
|
|
|
return IterateRes{}, nil
|
|
}
|
|
|
|
// IterateAddresses is a helper function which iterates over Blobovnicza and passes addresses of the objects to f.
|
|
func IterateAddresses(ctx context.Context, blz *Blobovnicza, f func(oid.Address) error) error {
|
|
var prm IteratePrm
|
|
|
|
prm.DecodeAddresses()
|
|
prm.WithoutData()
|
|
|
|
prm.SetHandler(func(elem IterationElement) error {
|
|
return f(elem.Address())
|
|
})
|
|
|
|
_, err := blz.Iterate(ctx, prm)
|
|
|
|
return err
|
|
}
|