xk6-frostfs/internal/registry/obj_registry.go
Vladimir Domnich b1ec6d562c [#19] Stop object iteration after all objects were processed
At the moment we don't need logic that swings back to beginning of registry when
all objects have been processed. So, for now we can stop iterating and return an
error when selector reaches the end of registry.

Signed-off-by: Vladimir Domnich <v.domnich@yadro.com>
2022-09-23 13:36:27 +03:00

149 lines
3.3 KiB
Go

package registry
import (
"encoding/binary"
"encoding/json"
"errors"
"os"
"time"
"go.etcd.io/bbolt"
)
type ObjRegistry struct {
boltDB *bbolt.DB
objSelector *ObjSelector
}
const (
// Indicates that an object was created, but its data wasn't verified yet.
statusCreated = "created"
)
const bucketName = "_object"
// ObjectInfo represents information about neoFS object that has been created
// via gRPC/HTTP/S3 API.
type ObjectInfo struct {
Id uint64 // Identifier in bolt DB
CID string // Container ID in gRPC/HTTP
OID string // Object ID in gRPC/HTTP
S3Bucket string // Bucket name in S3
S3Key string // Object key in S3
Status string // Status of the object
PayloadHash string // SHA256 hash of object payload that can be used for verification
}
// NewModuleInstance implements the modules.Module interface and returns
// a new instance for each VU.
func NewObjRegistry(dbFilePath string) *ObjRegistry {
options := bbolt.Options{Timeout: 100 * time.Millisecond}
boltDB, err := bbolt.Open(dbFilePath, os.ModePerm, &options)
if err != nil {
panic(err)
}
objSelector := ObjSelector{boltDB: boltDB, objStatus: statusCreated}
objRepository := &ObjRegistry{boltDB: boltDB, objSelector: &objSelector}
return objRepository
}
func (o *ObjRegistry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) error {
return o.boltDB.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return err
}
id, err := b.NextSequence()
if err != nil {
return err
}
object := ObjectInfo{
Id: id,
CID: cid,
OID: oid,
S3Bucket: s3Bucket,
S3Key: s3Key,
PayloadHash: payloadHash,
Status: statusCreated,
}
objectJson, err := json.Marshal(object)
if err != nil {
return err
}
return b.Put(encodeId(id), objectJson)
})
}
func (o *ObjRegistry) SetObjectStatus(id uint64, newStatus string) error {
return o.boltDB.Update(func(tx *bbolt.Tx) error {
b, err := tx.CreateBucketIfNotExists([]byte(bucketName))
if err != nil {
return err
}
objBytes := b.Get(encodeId(id))
if objBytes == nil {
return errors.New("object doesn't exist")
}
obj := new(ObjectInfo)
if err := json.Unmarshal(objBytes, &obj); err != nil {
return err
}
obj.Status = newStatus
objBytes, err = json.Marshal(obj)
if err != nil {
return err
}
return b.Put(encodeId(id), objBytes)
})
}
func (o *ObjRegistry) GetObjectCountInStatus(status string) (int, error) {
var objCount = 0
err := o.boltDB.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte(bucketName))
if b == nil {
return nil
}
return b.ForEach(func(_, objBytes []byte) error {
if objBytes != nil {
var obj ObjectInfo
if err := json.Unmarshal(objBytes, &obj); err != nil {
// Ignore malformed objects
return nil
}
if obj.Status == status {
objCount++
}
}
return nil
})
})
return objCount, err
}
func (o *ObjRegistry) NextObjectToVerify() (*ObjectInfo, error) {
return o.objSelector.NextObject()
}
func (o *ObjRegistry) Close() error {
return o.boltDB.Close()
}
func encodeId(id uint64) []byte {
idBytes := make([]byte, 8)
binary.BigEndian.PutUint64(idBytes, id)
return idBytes
}
func decodeId(idBytes []byte) uint64 {
return binary.BigEndian.Uint64(idBytes)
}