forked from TrueCloudLab/xk6-frostfs
212 lines
4.7 KiB
Go
212 lines
4.7 KiB
Go
package registry
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/nspcc-dev/neo-go/pkg/io"
|
|
"go.etcd.io/bbolt"
|
|
)
|
|
|
|
// nextObjectTimeout is how long NextObject waits for an object before
// returning nil when the selector kind is not SelectorOneshot.
const nextObjectTimeout = 10 * time.Second
|
|
|
|
// ObjFilter describes which registry objects a selector should return.
type ObjFilter struct {
	// Status is the object status to select. It is also used as the name of
	// the bolt bucket that is scanned, and NewObjSelector panics when it is
	// empty.
	Status string
	// Age is the minimum object age; it is compared against the difference
	// between the current Unix time (seconds) and the object's CreatedAt.
	// Zero disables the age check.
	Age int
}
|
|
|
|
type ObjSelector struct {
|
|
ctx context.Context
|
|
objChan chan *ObjectInfo
|
|
boltDB *bbolt.DB
|
|
filter *ObjFilter
|
|
cacheSize int
|
|
kind SelectorKind
|
|
// Sync synchronizes VU used for deletion.
|
|
Sync sync.WaitGroup
|
|
}
|
|
|
|
// objectSelectCache is the default maximum size of a batch to select from DB.
// NewObjSelector falls back to it when the requested selection size is not
// positive.
const objectSelectCache = 1000
|
|
|
|
// NewObjSelector creates a new instance of object selector that can iterate over
|
|
// objects in the specified registry.
|
|
func NewObjSelector(registry *ObjRegistry, selectionSize int, kind SelectorKind, filter *ObjFilter) *ObjSelector {
|
|
if selectionSize <= 0 {
|
|
selectionSize = objectSelectCache
|
|
}
|
|
if filter == nil || filter.Status == "" {
|
|
panic("filtering without status is not supported")
|
|
}
|
|
objSelector := &ObjSelector{
|
|
ctx: registry.ctx,
|
|
boltDB: registry.boltDB,
|
|
filter: filter,
|
|
objChan: make(chan *ObjectInfo, selectionSize*2),
|
|
cacheSize: selectionSize,
|
|
kind: kind,
|
|
}
|
|
|
|
go objSelector.selectLoop()
|
|
|
|
return objSelector
|
|
}
|
|
|
|
// NextObject returns the next object from the registry that matches filter of
|
|
// the selector. NextObject only roams forward from the current position of the
|
|
// selector. If there are no objects that match the filter, blocks until one of
|
|
// the following happens:
|
|
// - a "new" next object is available;
|
|
// - underlying registry context is done, nil objects will be returned on the
|
|
// currently blocked and every further NextObject calls.
|
|
func (o *ObjSelector) NextObject() *ObjectInfo {
|
|
if o.kind == SelectorOneshot {
|
|
return <-o.objChan
|
|
}
|
|
|
|
select {
|
|
case <-time.After(nextObjectTimeout):
|
|
return nil
|
|
case obj := <-o.objChan:
|
|
return obj
|
|
}
|
|
}
|
|
|
|
// Count returns total number of objects that match filter of the selector.
|
|
func (o *ObjSelector) Count() (int, error) {
|
|
count := 0
|
|
err := o.boltDB.View(func(tx *bbolt.Tx) error {
|
|
b := tx.Bucket([]byte(o.filter.Status))
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
|
|
if o.filter.Age == 0 {
|
|
count = b.Stats().KeyN
|
|
return nil
|
|
}
|
|
|
|
return b.ForEach(func(_, objBytes []byte) error {
|
|
if objBytes != nil {
|
|
r := io.NewBinReaderFromBuf(objBytes)
|
|
|
|
var obj ObjectInfo
|
|
obj.decodeFilterableFields(r)
|
|
if r.Err != nil {
|
|
// Ignore malformed objects
|
|
return nil
|
|
}
|
|
if o.filter.match(obj) {
|
|
count++
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
})
|
|
return count, err
|
|
}
|
|
|
|
func (o *ObjSelector) selectLoop() {
|
|
cache := make([]*ObjectInfo, 0, o.cacheSize)
|
|
var lastID uint64
|
|
defer close(o.objChan)
|
|
|
|
for {
|
|
select {
|
|
case <-o.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
// cache the objects
|
|
err := o.boltDB.View(func(tx *bbolt.Tx) error {
|
|
b := tx.Bucket([]byte(o.filter.Status))
|
|
if b == nil {
|
|
return nil
|
|
}
|
|
|
|
c := b.Cursor()
|
|
|
|
// Establish the start position for searching the next object:
|
|
// If we should go from the beginning (lastID=0), then we start
|
|
// from the first element. Otherwise, we start from the last
|
|
// handled ID + 1.
|
|
var keyBytes, objBytes []byte
|
|
if lastID == 0 {
|
|
keyBytes, objBytes = c.First()
|
|
} else {
|
|
keyBytes, objBytes = c.Seek(encodeId(lastID))
|
|
if keyBytes != nil && decodeId(keyBytes) == lastID {
|
|
keyBytes, objBytes = c.Next()
|
|
}
|
|
}
|
|
|
|
// Iterate over objects to find the next object matching the filter.
|
|
for ; keyBytes != nil && len(cache) != o.cacheSize; keyBytes, objBytes = c.Next() {
|
|
if objBytes != nil {
|
|
var obj ObjectInfo
|
|
if err := obj.Unmarshal(objBytes); err != nil {
|
|
// Ignore malformed objects for now. Maybe it should be panic?
|
|
continue
|
|
}
|
|
|
|
if o.filter.match(obj) {
|
|
cache = append(cache, &obj)
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(cache) > 0 {
|
|
lastID = cache[len(cache)-1].Id
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
panic(fmt.Errorf("fetching objects failed: %w", err))
|
|
}
|
|
|
|
for _, obj := range cache {
|
|
select {
|
|
case <-o.ctx.Done():
|
|
return
|
|
case o.objChan <- obj:
|
|
}
|
|
}
|
|
|
|
if o.kind == SelectorOneshot && len(cache) != o.cacheSize {
|
|
return
|
|
}
|
|
|
|
if o.kind != SelectorLooped && len(cache) != o.cacheSize {
|
|
// no more objects, wait a little; the logic could be improved.
|
|
select {
|
|
case <-time.After(time.Second):
|
|
case <-o.ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
|
|
if o.kind == SelectorLooped && len(cache) != o.cacheSize {
|
|
lastID = 0
|
|
}
|
|
|
|
// clean handled objects
|
|
cache = cache[:0]
|
|
}
|
|
}
|
|
|
|
func (f *ObjFilter) match(o ObjectInfo) bool {
|
|
if f.Status != "" && f.Status != o.Status {
|
|
return false
|
|
}
|
|
if f.Age != 0 {
|
|
objAge := time.Now().UTC().Unix() - o.CreatedAt
|
|
if objAge < int64(f.Age) {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|