xk6-frostfs/internal/registry/obj_selector.go
Dmitrii Stepanov 335c45c578
All checks were successful
DCO action / DCO (pull_request) Successful in 1m15s
Tests and linters / Tests (1.22) (pull_request) Successful in 3m12s
Tests and linters / Tests with -race (pull_request) Successful in 3m26s
Tests and linters / Tests (1.21) (pull_request) Successful in 3m39s
Tests and linters / Lint (pull_request) Successful in 4m21s
[#149] selector: Add read timeout
If there are no objects for 10 second, then return nil.
It is required to prevent VU iteration hang if there
are no objects pushed to registry.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2024-06-24 13:59:12 +03:00

209 lines
4.6 KiB
Go

package registry
import (
"context"
"fmt"
"time"
"github.com/nspcc-dev/neo-go/pkg/io"
"go.etcd.io/bbolt"
)
const nextObjectTimeout = 10 * time.Second
type ObjFilter struct {
Status string
Age int
}
type ObjSelector struct {
ctx context.Context
objChan chan *ObjectInfo
boltDB *bbolt.DB
filter *ObjFilter
cacheSize int
kind SelectorKind
}
// objectSelectCache is the default maximum size of a batch to select from DB.
const objectSelectCache = 1000
// NewObjSelector creates a new instance of object selector that can iterate over
// objects in the specified registry.
func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind, filter *ObjFilter) *ObjSelector {
if selectionSize <= 0 {
selectionSize = objectSelectCache
}
if filter == nil || filter.Status == "" {
panic("filtering without status is not supported")
}
objSelector := &ObjSelector{
ctx: registry.ctx,
boltDB: registry.boltDB,
filter: filter,
objChan: make(chan *ObjectInfo, selectionSize*2),
cacheSize: selectionSize,
kind: kind,
}
go objSelector.selectLoop()
return objSelector
}
// NextObject returns the next object from the registry that matches filter of
// the selector. NextObject only roams forward from the current position of the
// selector. If there are no objects that match the filter, blocks until one of
// the following happens:
// - a "new" next object is available;
// - underlying registry context is done, nil objects will be returned on the
// currently blocked and every further NextObject calls.
func (o *ObjSelector) NextObject() *ObjectInfo {
if o.kind == SelectorOneshot {
return <-o.objChan
}
select {
case <-time.After(nextObjectTimeout):
return nil
case obj := <-o.objChan:
return obj
}
}
// Count returns total number of objects that match filter of the selector.
func (o *ObjSelector) Count() (int, error) {
count := 0
err := o.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(o.filter.Status))
if b == nil {
return nil
}
if o.filter.Age == 0 {
count = b.Stats().KeyN
return nil
}
return b.ForEach(func(_, objBytes []byte) error {
if objBytes != nil {
r := io.NewBinReaderFromBuf(objBytes)
var obj ObjectInfo
obj.decodeFilterableFields(r)
if r.Err != nil {
// Ignore malformed objects
return nil
}
if o.filter.match(obj) {
count++
}
}
return nil
})
})
return count, err
}
func (o *ObjSelector) selectLoop() {
cache := make([]*ObjectInfo, 0, o.cacheSize)
var lastID uint64
defer close(o.objChan)
for {
select {
case <-o.ctx.Done():
return
default:
}
// cache the objects
err := o.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(o.filter.Status))
if b == nil {
return nil
}
c := b.Cursor()
// Establish the start position for searching the next object:
// If we should go from the beginning (lastID=0), then we start
// from the first element. Otherwise, we start from the last
// handled ID + 1.
var keyBytes, objBytes []byte
if lastID == 0 {
keyBytes, objBytes = c.First()
} else {
keyBytes, objBytes = c.Seek(encodeId(lastID))
if keyBytes != nil && decodeId(keyBytes) == lastID {
keyBytes, objBytes = c.Next()
}
}
// Iterate over objects to find the next object matching the filter.
for ; keyBytes != nil && len(cache) != o.cacheSize; keyBytes, objBytes = c.Next() {
if objBytes != nil {
var obj ObjectInfo
if err := obj.Unmarshal(objBytes); err != nil {
// Ignore malformed objects for now. Maybe it should be panic?
continue
}
if o.filter.match(obj) {
cache = append(cache, &obj)
}
}
}
if len(cache) > 0 {
lastID = cache[len(cache)-1].Id
}
return nil
})
if err != nil {
panic(fmt.Errorf("fetching objects failed: %w", err))
}
for _, obj := range cache {
select {
case <-o.ctx.Done():
return
case o.objChan <- obj:
}
}
if o.kind == SelectorOneshot && len(cache) != o.cacheSize {
return
}
if o.kind != SelectorLooped && len(cache) != o.cacheSize {
// no more objects, wait a little; the logic could be improved.
select {
case <-time.After(time.Second):
case <-o.ctx.Done():
return
}
}
if o.kind == SelectorLooped && len(cache) != o.cacheSize {
lastID = 0
}
// clean handled objects
cache = cache[:0]
}
}
func (f *ObjFilter) match(o ObjectInfo) bool {
if f.Status != "" && f.Status != o.Status {
return false
}
if f.Age != 0 {
objAge := time.Now().UTC().Unix() - o.CreatedAt
if objAge < int64(f.Age) {
return false
}
}
return true
}