xk6-frostfs/internal/registry/obj_selector.go
Vladimir Domnich 37e27f6791 [#23] Implement deletion of objects
1. Added a simple lock mechanism to reset the obj selector. This prevents
   most concurrency issues when multiple VUs try to reset the selector.
2. Added logic to delete objects to grpc and s3 scenarios.
3. Added registry support to http scenario.
4. Deletion logic was not implemented for the http scenario, because
   the http gateway does not provide a web method to delete objects.

Signed-off-by: Vladimir Domnich <v.domnich@yadro.com>
2022-10-03 17:34:20 +03:00

147 lines
3.7 KiB
Go

package registry
import (
"encoding/json"
"sync"
"time"
"go.etcd.io/bbolt"
)
// ObjFilter describes the criteria an object has to meet in order to be
// picked or counted by an object selector.
type ObjFilter struct {
// Status, when non-empty, must be equal to the status of the object.
// An empty Status accepts objects with any status.
Status string
// Age, when non-zero, is the minimal age of the object in seconds, measured
// from the CreatedAt timestamp of the object. Zero disables the age check.
Age int
}
// ObjSelector iterates forward over the objects of a registry that match
// a filter and can count all matching objects.
type ObjSelector struct {
// Database the objects are read from; it is taken from the registry the
// selector was created for.
boltDB *bbolt.DB
// Criteria applied to every object visited by NextObject and Count.
filter *ObjFilter
// Guards lastId and lockedUntil.
mu sync.Mutex
// Id decoded from the key of the object most recently returned by NextObject.
// Zero means that the next scan starts from the first object.
lastId uint64
// UTC date and time before which the selector is locked for both iteration
// and resetting. This lock prevents concurrency issues when some VUs are
// selecting objects while another VU resets the selector and attempts to
// select the same objects.
lockedUntil time.Time
}
// NewObjSelector builds a selector over the objects stored in the given
// registry, restricted to the objects accepted by filter. A fresh selector
// starts scanning from the first object and is not locked.
func NewObjSelector(registry *ObjRegistry, filter *ObjFilter) *ObjSelector {
	return &ObjSelector{
		boltDB: registry.boltDB,
		filter: filter,
	}
}
// NextObject returns the next object from the registry that matches filter of
// the selector. NextObject only roams forward from the current position of the
// selector. If there are no objects that match the filter, then returns nil.
//
// A nil object together with a nil error is also returned when the registry
// bucket does not exist or while the selector is still locked after Reset.
// The returned error is the one reported by the read transaction.
func (o *ObjSelector) NextObject() (*ObjectInfo, error) {
	var foundObj *ObjectInfo
	err := o.boltDB.View(func(tx *bbolt.Tx) error {
		b := tx.Bucket([]byte(bucketName))
		if b == nil {
			return nil
		}

		c := b.Cursor()

		// We use mutex so that multiple VUs won't attempt to modify lastId simultaneously
		// TODO: consider singleton channel that will produce those ids on demand
		o.mu.Lock()
		defer o.mu.Unlock()

		if time.Now().UTC().Before(o.lockedUntil) {
			return nil
		}

		// Establish the start position for searching the next object:
		// If we should go from the beginning (lastId=0), then we start from the first element
		// Otherwise we start from the key right after the lastId
		var keyBytes, objBytes []byte
		if o.lastId == 0 {
			keyBytes, objBytes = c.First()
		} else {
			// Seek stops at lastId itself or, when that key no longer exists
			// (e.g. the object was deleted from the registry), at the first key
			// that follows it. Step forward only in the former case, otherwise
			// an object that was never visited would be skipped.
			keyBytes, objBytes = c.Seek(encodeId(o.lastId))
			if keyBytes != nil && decodeId(keyBytes) == o.lastId {
				keyBytes, objBytes = c.Next()
			}
		}

		// Iterate over objects to find the next object matching the filter
		for ; keyBytes != nil; keyBytes, objBytes = c.Next() {
			if objBytes == nil {
				continue
			}

			// Decode every candidate into a fresh value so that fields missing
			// from its JSON cannot inherit values of a previously decoded object.
			var obj ObjectInfo
			if err := json.Unmarshal(objBytes, &obj); err != nil {
				// Ignore malformed objects for now. Maybe it should be panic?
				continue
			}

			// If we reached an object that matches filter, stop iterating
			if o.filter.match(obj) {
				foundObj = &obj
				break
			}
		}

		// Remember the key of the found object. When the scan ran off the end
		// of the bucket the position is left untouched, so a later call
		// re-examines the same tail.
		if keyBytes != nil {
			o.lastId = decodeId(keyBytes)
		}
		return nil
	})
	return foundObj, err
}
// Reset rewinds the selector so that the next scan starts from the first
// object of the registry. After a successful reset the selector stays locked
// for lockTime seconds to prevent concurrency issues between VUs.
//
// Reset returns false and changes nothing if the selector is still locked by
// a previous reset; otherwise it returns true.
func (o *ObjSelector) Reset(lockTime int) bool {
	o.mu.Lock()
	defer o.mu.Unlock()

	if stillLocked := time.Now().UTC().Before(o.lockedUntil); stillLocked {
		return false
	}

	lockDuration := time.Duration(lockTime) * time.Second
	o.lastId = 0
	o.lockedUntil = time.Now().UTC().Add(lockDuration)
	return true
}
// Count walks over the whole registry bucket and returns how many stored
// objects satisfy the filter of the selector. Entries without a value and
// entries that cannot be decoded are skipped. If the bucket does not exist,
// the result is zero. The error is the one reported by the read transaction.
func (o *ObjSelector) Count() (int, error) {
	matched := 0

	// countMatching is invoked for every entry of the bucket; it never fails,
	// so a single broken entry cannot abort the whole walk.
	countMatching := func(_, raw []byte) error {
		if raw == nil {
			return nil
		}

		var info ObjectInfo
		if json.Unmarshal(raw, &info) != nil {
			// Ignore malformed objects
			return nil
		}

		if o.filter.match(info) {
			matched++
		}
		return nil
	}

	err := o.boltDB.View(func(tx *bbolt.Tx) error {
		if bucket := tx.Bucket([]byte(bucketName)); bucket != nil {
			return bucket.ForEach(countMatching)
		}
		return nil
	})
	return matched, err
}
// match reports whether the object satisfies the filter.
//
// A nil filter imposes no restrictions and matches every object. Otherwise:
//   - a non-empty Status must be equal to the status of the object;
//   - a non-zero Age requires at least that many seconds to have passed
//     since the CreatedAt timestamp of the object.
func (f *ObjFilter) match(o ObjectInfo) bool {
	// A selector may be created without a filter; treat that as "match all"
	// instead of dereferencing a nil pointer.
	if f == nil {
		return true
	}
	if f.Status != "" && f.Status != o.Status {
		return false
	}
	if f.Age != 0 {
		objAge := time.Since(o.CreatedAt).Seconds()
		if objAge < float64(f.Age) {
			return false
		}
	}
	return true
}