diff --git a/internal/registry/obj_info.go b/internal/registry/obj_info.go index b3cae4a..c7ab55c 100644 --- a/internal/registry/obj_info.go +++ b/internal/registry/obj_info.go @@ -18,27 +18,35 @@ type ObjectInfo struct { } func (o ObjectInfo) EncodeBinary(w *io.BinWriter) { + o.encodeFilterableFields(w) w.WriteU64LE(o.Id) - w.WriteU64LE(uint64(o.CreatedAt)) w.WriteString(o.CID) w.WriteString(o.OID) w.WriteString(o.S3Bucket) w.WriteString(o.S3Key) - w.WriteString(o.Status) w.WriteString(o.PayloadHash) } +func (o ObjectInfo) encodeFilterableFields(w *io.BinWriter) { + w.WriteU64LE(uint64(o.CreatedAt)) + w.WriteString(o.Status) +} + func (o *ObjectInfo) DecodeBinary(r *io.BinReader) { + o.decodeFilterableFields(r) o.Id = r.ReadU64LE() - o.CreatedAt = int64(r.ReadU64LE()) o.CID = r.ReadString() o.OID = r.ReadString() o.S3Bucket = r.ReadString() o.S3Key = r.ReadString() - o.Status = r.ReadString() o.PayloadHash = r.ReadString() } +func (o *ObjectInfo) decodeFilterableFields(r *io.BinReader) { + o.CreatedAt = int64(r.ReadU64LE()) + o.Status = r.ReadString() +} + func (o ObjectInfo) Marshal() ([]byte, error) { w := io.NewBufBinWriter() o.EncodeBinary(w.BinWriter) diff --git a/internal/registry/obj_info_test.go b/internal/registry/obj_info_test.go index 2c5bdc5..f53f6db 100644 --- a/internal/registry/obj_info_test.go +++ b/internal/registry/obj_info_test.go @@ -87,13 +87,13 @@ func TestObjectInfoEncodeBinary(t *testing.T) { func randomObjectInfo() ObjectInfo { return ObjectInfo{ - Id: rand.Uint64(), CreatedAt: int64(rand.Uint64()), + Status: statusCreated, + Id: rand.Uint64(), CID: randString(32), OID: randString(32), S3Bucket: randString(32), S3Key: randString(32), - Status: "created", PayloadHash: randString(64), } } diff --git a/internal/registry/obj_registry.go b/internal/registry/obj_registry.go index e2368f8..3b01f32 100644 --- a/internal/registry/obj_registry.go +++ b/internal/registry/obj_registry.go @@ -4,6 +4,7 @@ import ( "context" "encoding/binary" "errors" + "fmt" "os" "time" @@ -21,8 +22,6 @@ const ( statusCreated = "created" ) -const bucketName = "_object" - // NewObjRegistry creates a new instance of object registry that stores information // about objects in the specified bolt database. As registry uses read-write // connection to the database, there may be only one instance of object registry @@ -46,7 +45,7 @@ func NewObjRegistry(ctx context.Context, dbFilePath string) *ObjRegistry { func (o *ObjRegistry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) error { return o.boltDB.Update(func(tx *bbolt.Tx) error { - b, err := tx.CreateBucketIfNotExists([]byte(bucketName)) + b, err := tx.CreateBucketIfNotExists([]byte(statusCreated)) if err != nil { return err } @@ -75,14 +74,14 @@ func (o *ObjRegistry) AddObject(cid, oid, s3Bucket, s3Key, payloadHash string) e }) } -func (o *ObjRegistry) SetObjectStatus(id uint64, newStatus string) error { +func (o *ObjRegistry) SetObjectStatus(id uint64, oldStatus, newStatus string) error { return o.boltDB.Update(func(tx *bbolt.Tx) error { - b, err := tx.CreateBucketIfNotExists([]byte(bucketName)) - if err != nil { - return err + oldB := tx.Bucket([]byte(oldStatus)) + if oldB == nil { + return fmt.Errorf("bucket doesn't exist: '%s'", oldStatus) } - objBytes := b.Get(encodeId(id)) + objBytes := oldB.Get(encodeId(id)) if objBytes == nil { return errors.New("object doesn't exist") } @@ -93,22 +92,24 @@ func (o *ObjRegistry) SetObjectStatus(id uint64, newStatus string) error { } obj.Status = newStatus - objBytes, err = obj.Marshal() + objBytes, err := obj.Marshal() if err != nil { return err } - return b.Put(encodeId(id), objBytes) + + newB, err := tx.CreateBucketIfNotExists([]byte(newStatus)) + if err != nil { + return err + } + return newB.Put(encodeId(id), objBytes) }) } func (o *ObjRegistry) DeleteObject(id uint64) error { return o.boltDB.Update(func(tx *bbolt.Tx) error { - b, err := tx.CreateBucketIfNotExists([]byte(bucketName)) - if err != nil { - return err - } - - return b.Delete(encodeId(id)) + return tx.ForEach(func(_ []byte, b *bbolt.Bucket) error { + return b.Delete(encodeId(id)) + }) }) } diff --git a/internal/registry/obj_selector.go b/internal/registry/obj_selector.go index f46368d..71f5e29 100644 --- a/internal/registry/obj_selector.go +++ b/internal/registry/obj_selector.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/nspcc-dev/neo-go/pkg/io" "go.etcd.io/bbolt" ) @@ -58,15 +59,23 @@ func (o *ObjSelector) NextObject() *ObjectInfo { func (o *ObjSelector) Count() (int, error) { var count = 0 err := o.boltDB.View(func(tx *bbolt.Tx) error { - b := tx.Bucket([]byte(bucketName)) + b := tx.Bucket([]byte(o.filter.Status)) if b == nil { return nil } + if o.filter.Age == 0 { + count = b.Stats().KeyN + return nil + } + return b.ForEach(func(_, objBytes []byte) error { if objBytes != nil { + r := io.NewBinReaderFromBuf(objBytes) + var obj ObjectInfo - if err := obj.Unmarshal(objBytes); err != nil { + obj.decodeFilterableFields(r) + if r.Err != nil { // Ignore malformed objects return nil } @@ -94,7 +103,7 @@ func (o *ObjSelector) selectLoop() { // cache the objects err := o.boltDB.View(func(tx *bbolt.Tx) error { - b := tx.Bucket([]byte(bucketName)) + b := tx.Bucket([]byte(o.filter.Status)) if b == nil { return nil }