Evgenii Stratonikov
b66b5a2f37
It is the heaviest function executed during the setup stage. The culprit is the linear dependency between its execution time and the number of objects in the registry. The solution is to store objects by status. While the optimization doesn't work for objects with no status, a status is currently provided by all scenarios. Signed-off-by: Evgenii Stratonikov <e.stratonikov@yadro.com>
129 lines
2.8 KiB
Go
129 lines
2.8 KiB
Go
package registry
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"time"
|
|
|
|
"go.etcd.io/bbolt"
|
|
)
|
|
|
|
type ObjRegistry struct {
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
boltDB *bbolt.DB
|
|
}
|
|
|
|
// statusCreated marks an object that has been created but whose data
// has not been verified yet. It is also used as the name of the bucket
// new objects are stored in.
const statusCreated = "created"
|
|
|
|
// NewObjRegistry creates a new instance of object registry that stores information
|
|
// about objects in the specified bolt database. As registry uses read-write
|
|
// connection to the database, there may be only one instance of object registry
|
|
// per database file at a time.
|
|
func NewObjRegistry(ctx context.Context, dbFilePath string) *ObjRegistry {
|
|
options := bbolt.Options{Timeout: 100 * time.Millisecond}
|
|
boltDB, err := bbolt.Open(dbFilePath, os.ModePerm, &options)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
objRepository := &ObjRegistry{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
boltDB: boltDB,
|
|
}
|
|
return objRepository
|
|
}
|
|
|
|
func (o *ObjRegistry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) error {
|
|
return o.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
b, err := tx.CreateBucketIfNotExists([]byte(statusCreated))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
id, err := b.NextSequence()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
object := ObjectInfo{
|
|
Id: id,
|
|
CreatedAt: time.Now().UTC().Unix(),
|
|
CID: cid,
|
|
OID: oid,
|
|
S3Bucket: s3Bucket,
|
|
S3Key: s3Key,
|
|
PayloadHash: payloadHash,
|
|
Status: statusCreated,
|
|
}
|
|
objBytes, err := object.Marshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return b.Put(encodeId(id), objBytes)
|
|
})
|
|
}
|
|
|
|
func (o *ObjRegistry) SetObjectStatus(id uint64, oldStatus, newStatus string) error {
|
|
return o.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
oldB := tx.Bucket([]byte(oldStatus))
|
|
if oldB == nil {
|
|
return fmt.Errorf("bucket doesn't exist: '%s'", oldStatus)
|
|
}
|
|
|
|
objBytes := oldB.Get(encodeId(id))
|
|
if objBytes == nil {
|
|
return errors.New("object doesn't exist")
|
|
}
|
|
|
|
obj := new(ObjectInfo)
|
|
if err := obj.Unmarshal(objBytes); err != nil {
|
|
return err
|
|
}
|
|
obj.Status = newStatus
|
|
|
|
objBytes, err := obj.Marshal()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
newB, err := tx.CreateBucketIfNotExists([]byte(newStatus))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return newB.Put(encodeId(id), objBytes)
|
|
})
|
|
}
|
|
|
|
func (o *ObjRegistry) DeleteObject(id uint64) error {
|
|
return o.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
return tx.ForEach(func(_ []byte, b *bbolt.Bucket) error {
|
|
return b.Delete(encodeId(id))
|
|
})
|
|
})
|
|
}
|
|
|
|
func (o *ObjRegistry) Close() error {
|
|
o.cancel()
|
|
return o.boltDB.Close()
|
|
}
|
|
|
|
// encodeId converts an object id into its database key: 8 bytes in
// big-endian order, so that byte-wise key ordering matches numeric ordering.
func encodeId(id uint64) []byte {
	var key [8]byte
	binary.BigEndian.PutUint64(key[:], id)
	return key[:]
}
|
|
|
|
// decodeId is the inverse of encodeId: it interprets the first 8 bytes of
// idBytes as a big-endian unsigned integer. It panics if idBytes is shorter
// than 8 bytes.
func decodeId(idBytes []byte) uint64 {
	id := binary.BigEndian.Uint64(idBytes)
	return id
}
|