package registry

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"os"
	"time"

	"go.etcd.io/bbolt"
)

// ObjRegistry keeps object records in a bolt database. Records are grouped
// into top-level buckets named after the object's status and keyed by the
// big-endian encoding of the object's id.
type ObjRegistry struct {
	// ctx is derived from the context passed to NewObjRegistry; cancel
	// cancels it and is invoked by Close.
	ctx    context.Context
	cancel context.CancelFunc
	boltDB *bbolt.DB
}

const (
	// Indicates that an object was created, but its data wasn't verified yet.
	statusCreated = "created"
)

// NewObjRegistry creates a new instance of object registry that stores information
// about objects in the specified bolt database. As registry uses read-write
// connection to the database, there may be only one instance of object registry
// per database file at a time.
//
// The database is opened with a 100ms timeout. NewObjRegistry panics if the
// database cannot be opened. Call Close to release the database when the
// registry is no longer needed.
func NewObjRegistry(ctx context.Context, dbFilePath string) *ObjRegistry {
	options := bbolt.Options{Timeout: 100 * time.Millisecond}

	// NOTE(review): os.ModePerm requests mode 0777 for a newly created
	// database file; confirm whether a narrower mode is intended.
	boltDB, err := bbolt.Open(dbFilePath, os.ModePerm, &options)
	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(ctx)
	return &ObjRegistry{
		ctx:    ctx,
		cancel: cancel,
		boltDB: boltDB,
	}
}

// AddObject stores a new object record in the statusCreated bucket, creating
// the bucket if needed. The record's id is taken from the bucket's sequence,
// its creation time is the current UTC Unix timestamp and its status is
// statusCreated.
//
// It returns any error produced while creating the bucket, obtaining the
// sequence number, marshalling the record or writing it.
func (o *ObjRegistry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) error {
	return o.boltDB.Update(func(tx *bbolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte(statusCreated))
		if err != nil {
			return err
		}

		id, err := b.NextSequence()
		if err != nil {
			return err
		}

		object := ObjectInfo{
			Id:          id,
			CreatedAt:   time.Now().UTC().Unix(),
			CID:         cid,
			OID:         oid,
			S3Bucket:    s3Bucket,
			S3Key:       s3Key,
			PayloadHash: payloadHash,
			Status:      statusCreated,
		}
		objBytes, err := object.Marshal()
		if err != nil {
			return err
		}

		return b.Put(encodeId(id), objBytes)
	})
}

// SetObjectStatus moves the record with the given id from the oldStatus
// bucket to the newStatus bucket and updates the record's Status field to
// newStatus. The newStatus bucket is created if it does not exist.
//
// It returns an error if the oldStatus bucket does not exist, if the bucket
// holds no record for id, or if decoding, encoding, deleting or writing the
// record fails. All steps run in a single read-write transaction.
func (o *ObjRegistry) SetObjectStatus(id uint64, oldStatus, newStatus string) error {
	return o.boltDB.Update(func(tx *bbolt.Tx) error {
		oldB := tx.Bucket([]byte(oldStatus))
		if oldB == nil {
			return fmt.Errorf("bucket doesn't exist: '%s'", oldStatus)
		}

		key := encodeId(id)
		objBytes := oldB.Get(key)
		if objBytes == nil {
			return errors.New("object doesn't exist")
		}

		// Decode and re-encode the record before touching the bucket, so
		// the slice returned by Get is fully consumed before its key is
		// removed.
		obj := new(ObjectInfo)
		if err := obj.Unmarshal(objBytes); err != nil {
			return err
		}
		obj.Status = newStatus
		updated, err := obj.Marshal()
		if err != nil {
			return err
		}

		if err := oldB.Delete(key); err != nil {
			return fmt.Errorf("bucket.Delete: %w", err)
		}

		newB, err := tx.CreateBucketIfNotExists([]byte(newStatus))
		if err != nil {
			return err
		}
		return newB.Put(key, updated)
	})
}

// DeleteObject removes the record with the given id from every top-level
// bucket of the database. It stops at, and returns, the first error reported
// by a bucket's Delete.
func (o *ObjRegistry) DeleteObject(id uint64) error {
	// The key is the same for every bucket, so encode it once.
	key := encodeId(id)
	return o.boltDB.Update(func(tx *bbolt.Tx) error {
		return tx.ForEach(func(_ []byte, b *bbolt.Bucket) error {
			return b.Delete(key)
		})
	})
}

// Close cancels the registry's context and closes the underlying database,
// returning the error from the database close, if any.
func (o *ObjRegistry) Close() error {
	o.cancel()
	return o.boltDB.Close()
}

// encodeId returns id as an 8-byte big-endian slice, the form used for
// record keys.
func encodeId(id uint64) []byte {
	idBytes := make([]byte, 8)
	binary.BigEndian.PutUint64(idBytes, id)
	return idBytes
}

// decodeId is the inverse of encodeId: it reads a big-endian uint64 from the
// first 8 bytes of idBytes. It panics if idBytes is shorter than 8 bytes.
func decodeId(idBytes []byte) uint64 {
	return binary.BigEndian.Uint64(idBytes)
}