package blobovnicza import ( "bytes" "context" "fmt" "math" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" ) // iterateAllDataBuckets iterates all buckets in db // // If the maximum size of the object (b.objSizeLimit) has been changed to lower value, // then there may be more buckets than the current limit of the object size. func (b *Blobovnicza) iterateAllDataBuckets(tx *bbolt.Tx, f func(uint64, uint64, *bbolt.Bucket) (bool, error)) error { return b.iterateBucketKeys(false, func(lower uint64, upper uint64, key []byte) (bool, error) { buck := tx.Bucket(key) if buck == nil { return true, nil } return f(lower, upper, buck) }) } func (b *Blobovnicza) iterateBucketKeys(useObjLimitBound bool, f func(uint64, uint64, []byte) (bool, error)) error { return b.iterateBounds(useObjLimitBound, func(lower, upper uint64) (bool, error) { return f(lower, upper, bucketKeyFromBounds(upper)) }) } func (b *Blobovnicza) iterateBounds(useObjLimitBound bool, f func(uint64, uint64) (bool, error)) error { var objLimitBound uint64 = math.MaxUint64 if useObjLimitBound { objLimitBound = upperPowerOfTwo(b.objSizeLimit) } for upper := firstBucketBound; upper <= max(objLimitBound, firstBucketBound); upper *= 2 { var lower uint64 if upper != firstBucketBound { lower = upper/2 + 1 } if stop, err := f(lower, upper); err != nil { return err } else if stop { break } } return nil } // IterationElement represents a unit of elements through which Iterate operation passes. type IterationElement struct { addr oid.Address data []byte } // ObjectData returns stored object in a binary representation. func (x IterationElement) ObjectData() []byte { return x.data } // Address returns address of the stored object. func (x IterationElement) Address() oid.Address { return x.addr } // IterationHandler is a generic processor of IterationElement. type IterationHandler func(IterationElement) error // IteratePrm groups the parameters of Iterate operation. type IteratePrm struct { decodeAddresses bool withoutData bool handler IterationHandler ignoreErrors bool } // DecodeAddresses sets flag to unmarshal object addresses. func (x *IteratePrm) DecodeAddresses() { x.decodeAddresses = true } // WithoutData sets flag to not read data of the objects. func (x *IteratePrm) WithoutData() { x.withoutData = true } // SetHandler sets handler to be called iteratively. func (x *IteratePrm) SetHandler(h IterationHandler) { x.handler = h } // IgnoreErrors makes all errors to be ignored. func (x *IteratePrm) IgnoreErrors() { x.ignoreErrors = true } // IterateRes groups the resulting values of Iterate operation. type IterateRes struct{} // Iterate goes through all stored objects, and passes IterationElement to parameterized handler until error return. // // Decodes object addresses if DecodeAddresses was called. Don't read object data if WithoutData was called. // // Returns handler's errors directly. Returns nil after iterating finish. // // Handler should not retain object data. Handler must not be nil. func (b *Blobovnicza) Iterate(ctx context.Context, prm IteratePrm) (IterateRes, error) { ctx, span := tracing.StartSpanFromContext(ctx, "Blobovnicza.Iterate", trace.WithAttributes( attribute.String("path", b.path), attribute.Bool("decode_addresses", prm.decodeAddresses), attribute.Bool("without_data", prm.withoutData), attribute.Bool("ignore_errors", prm.ignoreErrors), )) defer span.End() b.controlMtx.RLock() defer b.controlMtx.RUnlock() var elem IterationElement if err := b.boltDB.View(func(tx *bbolt.Tx) error { return tx.ForEach(func(bucketName []byte, buck *bbolt.Bucket) error { if isNonDataBucket(bucketName) { return nil } return buck.ForEach(func(k, v []byte) error { select { case <-ctx.Done(): return ctx.Err() default: } if prm.decodeAddresses { if err := addressFromKey(&elem.addr, k); err != nil { if prm.ignoreErrors { return nil } return fmt.Errorf("could not decode address key: %w", err) } } if !prm.withoutData { elem.data = bytes.Clone(v) } return prm.handler(elem) }) }) }); err != nil { return IterateRes{}, err } return IterateRes{}, nil } // IterateAddresses is a helper function which iterates over Blobovnicza and passes addresses of the objects to f. func IterateAddresses(ctx context.Context, blz *Blobovnicza, f func(oid.Address) error) error { var prm IteratePrm prm.DecodeAddresses() prm.WithoutData() prm.SetHandler(func(elem IterationElement) error { return f(elem.Address()) }) _, err := blz.Iterate(ctx, prm) return err }