// Source file: xk6-frostfs/internal/registry/obj_registry.go

package registry
import (
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"time"
"go.etcd.io/bbolt"
)
// ObjRegistry stores information about objects in a bolt database.
// Instances are created with NewObjRegistry and released with Close.
type ObjRegistry struct {
// ctx is the cancellable context derived from the one passed to NewObjRegistry.
ctx context.Context
// cancel cancels ctx; Close calls it before closing the database.
cancel context.CancelFunc
// boltDB is the open bolt database handle that backs the registry.
boltDB *bbolt.DB
}
const (
// statusCreated marks an object that was created but whose data has not been
// verified yet. AddObject also uses this value as the name of the bolt bucket
// in which newly added objects are stored.
statusCreated = "created"
)
// NewObjRegistry opens (creating it if necessary) the bolt database at
// dbFilePath and returns an object registry backed by it.
//
// The registry holds a read-write connection to the database, so only one
// registry instance may exist per database file at a time. Opening waits up
// to 100ms for the database file lock.
//
// The returned registry keeps a cancellable context derived from ctx; Close
// cancels it. NewObjRegistry panics if the database cannot be opened.
//
// NOTE(review): panicking here and creating the file with os.ModePerm both
// look deliberate for a load-testing tool, but are worth confirming.
func NewObjRegistry(ctx context.Context, dbFilePath string) *ObjRegistry {
	db, err := bbolt.Open(dbFilePath, os.ModePerm, &bbolt.Options{
		Timeout: 100 * time.Millisecond,
	})
	if err != nil {
		panic(err)
	}

	registryCtx, stop := context.WithCancel(ctx)
	return &ObjRegistry{
		ctx:    registryCtx,
		cancel: stop,
		boltDB: db,
	}
}
// AddObject records a new object in the registry with status "created".
//
// The object is assigned the next sequence number of the "created" bucket as
// its id and is stamped with the current UTC time in Unix seconds. The bucket
// is created on first use. Any error from the database or from marshalling
// the object is returned unchanged.
func (o *ObjRegistry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) error {
	store := func(tx *bbolt.Tx) error {
		bucket, err := tx.CreateBucketIfNotExists([]byte(statusCreated))
		if err != nil {
			return err
		}

		seq, err := bucket.NextSequence()
		if err != nil {
			return err
		}

		info := ObjectInfo{
			Id:          seq,
			CreatedAt:   time.Now().UTC().Unix(),
			CID:         cid,
			OID:         oid,
			S3Bucket:    s3Bucket,
			S3Key:       s3Key,
			PayloadHash: payloadHash,
			Status:      statusCreated,
		}

		encoded, err := info.Marshal()
		if err != nil {
			return err
		}

		// The record is keyed by its big-endian encoded id.
		return bucket.Put(encodeId(seq), encoded)
	}

	return o.boltDB.Batch(store)
}
// SetObjectStatus moves the object with the given id from the bucket named
// oldStatus to the bucket named newStatus and updates its Status field to
// newStatus. The destination bucket is created if it does not exist.
//
// It returns an error if the oldStatus bucket does not exist, if no object
// with the given id is stored in it, or if decoding, encoding or any database
// operation fails. All changes happen in one transaction, so on error the
// object stays where it was.
func (o *ObjRegistry) SetObjectStatus(id uint64, oldStatus, newStatus string) error {
	return o.boltDB.Batch(func(tx *bbolt.Tx) error {
		oldB := tx.Bucket([]byte(oldStatus))
		if oldB == nil {
			return fmt.Errorf("bucket doesn't exist: '%s'", oldStatus)
		}

		key := encodeId(id)
		objBytes := oldB.Get(key)
		if objBytes == nil {
			return errors.New("object doesn't exist")
		}

		// Decode and re-encode before mutating anything: the slice returned by
		// Get points into bolt-managed memory, so it is read in full before the
		// bucket is modified, and a corrupt record is rejected up front.
		obj := new(ObjectInfo)
		if err := obj.Unmarshal(objBytes); err != nil {
			return fmt.Errorf("unmarshal object %d: %w", id, err)
		}
		obj.Status = newStatus
		updated, err := obj.Marshal()
		if err != nil {
			return fmt.Errorf("marshal object %d: %w", id, err)
		}

		if err := oldB.Delete(key); err != nil {
			return fmt.Errorf("bucket.Delete: %w", err)
		}

		newB, err := tx.CreateBucketIfNotExists([]byte(newStatus))
		if err != nil {
			return fmt.Errorf("create bucket '%s': %w", newStatus, err)
		}
		return newB.Put(key, updated)
	})
}
// DeleteObject removes the object with the given id from every top-level
// bucket in the database, whatever status bucket it currently lives in.
// The first error returned by a bucket delete aborts the operation and is
// returned to the caller.
func (o *ObjRegistry) DeleteObject(id uint64) error {
	key := encodeId(id)

	// Applied to each top-level bucket; the bucket name is irrelevant here.
	removeFrom := func(_ []byte, bucket *bbolt.Bucket) error {
		return bucket.Delete(key)
	}

	return o.boltDB.Batch(func(tx *bbolt.Tx) error {
		return tx.ForEach(removeFrom)
	})
}
// Close cancels the registry's context and then closes the underlying bolt
// database, returning the error (if any) from closing the database.
func (o *ObjRegistry) Close() error {
	// Cancel first so anything bound to the registry context is signalled
	// before the database handle goes away.
	o.cancel()

	err := o.boltDB.Close()
	return err
}
// encodeId returns id as an 8-byte big-endian slice, the form used for keys
// in the registry's bolt buckets.
func encodeId(id uint64) []byte {
	var buf [8]byte
	binary.BigEndian.PutUint64(buf[:], id)
	return buf[:]
}
// decodeId interprets the first 8 bytes of idBytes as a big-endian uint64,
// reversing encodeId. It panics if idBytes holds fewer than 8 bytes.
func decodeId(idBytes []byte) uint64 {
	id := binary.BigEndian.Uint64(idBytes)
	return id
}