2022-09-21 05:45:23 +00:00
|
|
|
package registry
|
|
|
|
|
|
|
|
import (
|
2022-10-26 19:29:17 +00:00
|
|
|
"context"
|
2022-09-21 05:45:23 +00:00
|
|
|
"encoding/binary"
|
2022-09-22 16:57:21 +00:00
|
|
|
"errors"
|
2022-09-21 05:45:23 +00:00
|
|
|
"os"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"go.etcd.io/bbolt"
|
|
|
|
)
|
|
|
|
|
|
|
|
type ObjRegistry struct {
|
2022-10-26 19:29:17 +00:00
|
|
|
ctx context.Context
|
|
|
|
cancel context.CancelFunc
|
2022-09-26 18:05:28 +00:00
|
|
|
boltDB *bbolt.DB
|
2022-09-21 05:45:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
const (
	// statusCreated marks an object that has been registered, but whose data
	// has not been verified yet.
	statusCreated = "created"
)
|
|
|
|
|
|
|
|
// bucketName is the name of the bolt bucket that holds the object records.
const bucketName = "_object"
|
|
|
|
|
2022-09-26 18:05:28 +00:00
|
|
|
// NewObjRegistry creates a new instance of object registry that stores information
|
|
|
|
// about objects in the specified bolt database. As registry uses read-write
|
|
|
|
// connection to the database, there may be only one instance of object registry
|
|
|
|
// per database file at a time.
|
2022-10-26 19:29:17 +00:00
|
|
|
func NewObjRegistry(ctx context.Context, dbFilePath string) *ObjRegistry {
|
2022-09-21 05:45:23 +00:00
|
|
|
options := bbolt.Options{Timeout: 100 * time.Millisecond}
|
|
|
|
boltDB, err := bbolt.Open(dbFilePath, os.ModePerm, &options)
|
|
|
|
if err != nil {
|
|
|
|
panic(err)
|
|
|
|
}
|
|
|
|
|
2022-10-26 19:29:17 +00:00
|
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
|
|
|
|
|
|
objRepository := &ObjRegistry{
|
|
|
|
ctx: ctx,
|
|
|
|
cancel: cancel,
|
|
|
|
boltDB: boltDB,
|
|
|
|
}
|
2022-09-21 05:45:23 +00:00
|
|
|
return objRepository
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o *ObjRegistry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) error {
|
|
|
|
return o.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
|
|
b, err := tx.CreateBucketIfNotExists([]byte(bucketName))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
id, err := b.NextSequence()
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
object := ObjectInfo{
|
|
|
|
Id: id,
|
2023-03-23 13:32:25 +00:00
|
|
|
CreatedAt: time.Now().UTC().Unix(),
|
2022-09-21 05:45:23 +00:00
|
|
|
CID: cid,
|
|
|
|
OID: oid,
|
|
|
|
S3Bucket: s3Bucket,
|
|
|
|
S3Key: s3Key,
|
|
|
|
PayloadHash: payloadHash,
|
|
|
|
Status: statusCreated,
|
|
|
|
}
|
2023-03-23 13:32:25 +00:00
|
|
|
objBytes, err := object.Marshal()
|
2022-09-21 05:45:23 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2023-03-23 13:32:25 +00:00
|
|
|
return b.Put(encodeId(id), objBytes)
|
2022-09-21 05:45:23 +00:00
|
|
|
})
|
|
|
|
}
|
|
|
|
|
|
|
|
func (o *ObjRegistry) SetObjectStatus(id uint64, newStatus string) error {
|
|
|
|
return o.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
|
|
b, err := tx.CreateBucketIfNotExists([]byte(bucketName))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
objBytes := b.Get(encodeId(id))
|
|
|
|
if objBytes == nil {
|
2022-09-22 16:57:21 +00:00
|
|
|
return errors.New("object doesn't exist")
|
2022-09-21 05:45:23 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
obj := new(ObjectInfo)
|
2023-03-23 13:32:25 +00:00
|
|
|
if err := obj.Unmarshal(objBytes); err != nil {
|
2022-09-21 05:45:23 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
obj.Status = newStatus
|
|
|
|
|
2023-03-23 13:32:25 +00:00
|
|
|
objBytes, err = obj.Marshal()
|
2022-09-21 05:45:23 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return b.Put(encodeId(id), objBytes)
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2022-09-27 16:01:43 +00:00
|
|
|
func (o *ObjRegistry) DeleteObject(id uint64) error {
|
|
|
|
return o.boltDB.Update(func(tx *bbolt.Tx) error {
|
|
|
|
b, err := tx.CreateBucketIfNotExists([]byte(bucketName))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return b.Delete(encodeId(id))
|
|
|
|
})
|
|
|
|
}
|
|
|
|
|
2022-09-21 05:45:23 +00:00
|
|
|
func (o *ObjRegistry) Close() error {
|
2022-10-26 19:29:17 +00:00
|
|
|
o.cancel()
|
2022-09-21 05:45:23 +00:00
|
|
|
return o.boltDB.Close()
|
|
|
|
}
|
|
|
|
|
|
|
|
// encodeId returns id as an 8-byte big-endian slice, the form used for record
// keys in the bucket.
func encodeId(id uint64) []byte {
	var key [8]byte
	binary.BigEndian.PutUint64(key[:], id)
	return key[:]
}
|
|
|
|
|
|
|
|
// decodeId is the inverse of encodeId: it interprets the first 8 bytes of
// idBytes as a big-endian uint64. It panics if idBytes is shorter than 8 bytes.
func decodeId(idBytes []byte) uint64 {
	id := binary.BigEndian.Uint64(idBytes)
	return id
}
|