forked from TrueCloudLab/xk6-frostfs
Evgenii Stratonikov
b66b5a2f37
It is the heaviest function executed during the setup stage. The culprit is the linear dependency between its execution time and the number of objects in the registry. The solution is to store objects by status. While the optimization doesn't work for objects with no status, a status is currently provided by all scenarios. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
185 lines
4.1 KiB
Go
185 lines
4.1 KiB
Go
package registry
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
|
"go.etcd.io/bbolt"
|
|
)
|
|
|
|
// ObjFilter describes which registry objects a selector should consider.
type ObjFilter struct {
	// Status is the object status to select. It is also used as the name of
	// the bbolt bucket the selector reads from. An empty Status disables the
	// status comparison performed by match.
	Status string
	// Age is the minimum object age in seconds, measured as the current Unix
	// time minus the object's CreatedAt. Zero disables the age check.
	Age int
}
|
|
|
|
type ObjSelector struct {
|
|
ctx context.Context
|
|
objChan chan *ObjectInfo
|
|
boltDB *bbolt.DB
|
|
filter *ObjFilter
|
|
cacheSize int
|
|
}
|
|
|
|
const (
	// objectSelectCache is the batch size used by NewObjSelector when the
	// caller passes a non-positive selection size.
	objectSelectCache = 1000
)
|
|
|
|
// NewObjSelector creates a new instance of object selector that can iterate over
|
|
// objects in the specified registry.
|
|
func NewObjSelector(registry *ObjRegistry, selectionSize int, filter *ObjFilter) *ObjSelector {
|
|
if selectionSize <= 0 {
|
|
selectionSize = objectSelectCache
|
|
}
|
|
objSelector := &ObjSelector{
|
|
ctx: registry.ctx,
|
|
boltDB: registry.boltDB,
|
|
filter: filter,
|
|
objChan: make(chan *ObjectInfo, selectionSize*2),
|
|
cacheSize: selectionSize,
|
|
}
|
|
|
|
go objSelector.selectLoop()
|
|
|
|
return objSelector
|
|
}
|
|
|
|
// NextObject returns the next object from the registry that matches filter of
|
|
// the selector. NextObject only roams forward from the current position of the
|
|
// selector. If there are no objects that match the filter, blocks until one of
|
|
// the following happens:
|
|
// - a "new" next object is available;
|
|
// - underlying registry context is done, nil objects will be returned on the
|
|
// currently blocked and every further NextObject calls.
|
|
func (o *ObjSelector) NextObject() *ObjectInfo {
|
|
return <-o.objChan
|
|
}
|
|
|
|
// Count returns total number of objects that match filter of the selector.
|
|
func (o *ObjSelector) Count() (int, error) {
|
|
var count = 0
|
|
err := o.boltDB.View(func(tx *bbolt.Tx) error {
|
|
b := tx.Bucket([]byte(o.filter.Status))
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
|
|
if o.filter.Age == 0 {
|
|
count = b.Stats().KeyN
|
|
return nil
|
|
}
|
|
|
|
return b.ForEach(func(_, objBytes []byte) error {
|
|
if objBytes != nil {
|
|
r := io.NewBinReaderFromBuf(objBytes)
|
|
|
|
var obj ObjectInfo
|
|
obj.decodeFilterableFields(r)
|
|
if r.Err != nil {
|
|
// Ignore malformed objects
|
|
return nil
|
|
}
|
|
if o.filter.match(obj) {
|
|
count++
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
})
|
|
return count, err
|
|
}
|
|
|
|
func (o *ObjSelector) selectLoop() {
|
|
cache := make([]*ObjectInfo, 0, o.cacheSize)
|
|
var lastID uint64
|
|
defer close(o.objChan)
|
|
|
|
for {
|
|
select {
|
|
case <-o.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
// cache the objects
|
|
err := o.boltDB.View(func(tx *bbolt.Tx) error {
|
|
b := tx.Bucket([]byte(o.filter.Status))
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
|
|
c := b.Cursor()
|
|
|
|
// Establish the start position for searching the next object:
|
|
// If we should go from the beginning (lastID=0), then we start
|
|
// from the first element. Otherwise, we start from the last
|
|
// handled ID + 1.
|
|
var keyBytes, objBytes []byte
|
|
if lastID == 0 {
|
|
keyBytes, objBytes = c.First()
|
|
} else {
|
|
keyBytes, objBytes = c.Seek(encodeId(lastID))
|
|
if keyBytes != nil && decodeId(keyBytes) == lastID {
|
|
keyBytes, objBytes = c.Next()
|
|
}
|
|
}
|
|
|
|
// Iterate over objects to find the next object matching the filter.
|
|
for ; keyBytes != nil && len(cache) != o.cacheSize; keyBytes, objBytes = c.Next() {
|
|
if objBytes != nil {
|
|
var obj ObjectInfo
|
|
if err := obj.Unmarshal(objBytes); err != nil {
|
|
// Ignore malformed objects for now. Maybe it should be panic?
|
|
continue
|
|
}
|
|
|
|
if o.filter.match(obj) {
|
|
cache = append(cache, &obj)
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(cache) > 0 {
|
|
lastID = cache[len(cache)-1].Id
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
panic(fmt.Errorf("fetching objects failed: %w", err))
|
|
}
|
|
|
|
for _, obj := range cache {
|
|
select {
|
|
case <-o.ctx.Done():
|
|
return
|
|
case o.objChan <- obj:
|
|
}
|
|
}
|
|
|
|
if len(cache) != o.cacheSize {
|
|
// no more objects, wait a little; the logic could be improved.
|
|
select {
|
|
case <-time.After(time.Second * time.Duration(o.filter.Age/2)):
|
|
case <-o.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
// clean handled objects
|
|
cache = cache[:0]
|
|
}
|
|
}
|
|
|
|
func (f *ObjFilter) match(o ObjectInfo) bool {
|
|
if f.Status != "" && f.Status != o.Status {
|
|
return false
|
|
}
|
|
if f.Age != 0 {
|
|
objAge := time.Now().UTC().Unix() - o.CreatedAt
|
|
if objAge < int64(f.Age) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|