[#1559] local_object_storage: Fix tests and some data races
Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
parent
50e28f22f9
commit
1691364653
14 changed files with 48 additions and 4 deletions
|
@ -10,6 +10,9 @@ import (
|
|||
)
|
||||
|
||||
func (db *DB) Containers() (list []cid.ID, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
list, err = db.containers(tx)
|
||||
|
||||
|
|
|
@ -19,7 +19,8 @@ func (db *DB) Open(readOnly bool) error {
|
|||
db.log.Debug("created directory for Metabase", zap.String("path", db.info.Path))
|
||||
|
||||
if db.boltOptions == nil {
|
||||
db.boltOptions = bbolt.DefaultOptions
|
||||
opts := *bbolt.DefaultOptions
|
||||
db.boltOptions = &opts
|
||||
}
|
||||
db.boltOptions.ReadOnly = readOnly
|
||||
|
||||
|
|
|
@ -40,6 +40,9 @@ type referenceCounter map[string]*referenceNumber
|
|||
|
||||
// Delete removed object records from metabase indexes.
|
||||
func (db *DB) Delete(prm DeletePrm) (DeleteRes, error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
err := db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
return db.deleteGroup(tx, prm.addrs)
|
||||
})
|
||||
|
|
|
@ -38,6 +38,9 @@ func (p ExistsRes) Exists() bool {
|
|||
//
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
|
||||
func (db *DB) Exists(prm ExistsPrm) (res ExistsRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res.exists, err = db.exists(tx, prm.addr)
|
||||
|
||||
|
|
|
@ -45,6 +45,9 @@ func (r GetRes) Header() *objectSDK.Object {
|
|||
// Returns an error of type apistatus.ObjectNotFound if object is missing in DB.
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
|
||||
func (db *DB) Get(prm GetPrm) (res GetRes, err error) {
|
||||
db.modeMtx.Lock()
|
||||
defer db.modeMtx.Unlock()
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res.hdr, err = db.get(tx, prm.addr, true, prm.raw)
|
||||
|
||||
|
|
|
@ -82,6 +82,9 @@ var ErrLockObjectRemoval = errors.New("lock object removal")
|
|||
// NOTE: Marks any object with GC mark (despite any prohibitions on operations
|
||||
// with that object) if WithForceGCMark option has been provided.
|
||||
func (db *DB) Inhume(prm InhumePrm) (res InhumeRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
garbageBKT := tx.Bucket(garbageBucketName)
|
||||
|
||||
|
|
|
@ -61,6 +61,9 @@ func (l ListRes) Cursor() *Cursor {
|
|||
// Returns ErrEndOfListing if there are no more objects to return or count
|
||||
// parameter set to zero.
|
||||
func (db *DB) ListWithCursor(prm ListPrm) (res ListRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
result := make([]oid.Address, 0, prm.count)
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
|
|
|
@ -29,6 +29,9 @@ func bucketNameLockers(idCnr cid.ID) []byte {
|
|||
//
|
||||
// Locked list should be unique. Panics if it is empty.
|
||||
func (db *DB) Lock(cnr cid.ID, locker oid.ID, locked []oid.ID) error {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if len(locked) == 0 {
|
||||
panic("empty locked list")
|
||||
}
|
||||
|
|
|
@ -49,6 +49,9 @@ func (p MovableRes) AddressList() []oid.Address {
|
|||
// ToMoveIt marks objects to move it into another shard. This useful for
|
||||
// faster HRW fetching.
|
||||
func (db *DB) ToMoveIt(prm ToMoveItPrm) (res ToMoveItRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
toMoveIt, err := tx.CreateBucketIfNotExists(toMoveItBucketName)
|
||||
if err != nil {
|
||||
|
@ -63,6 +66,9 @@ func (db *DB) ToMoveIt(prm ToMoveItPrm) (res ToMoveItRes, err error) {
|
|||
|
||||
// DoNotMove removes `MoveIt` mark from the object.
|
||||
func (db *DB) DoNotMove(prm DoNotMovePrm) (res DoNotMoveRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
err = db.boltDB.Update(func(tx *bbolt.Tx) error {
|
||||
toMoveIt := tx.Bucket(toMoveItBucketName)
|
||||
if toMoveIt == nil {
|
||||
|
@ -77,6 +83,9 @@ func (db *DB) DoNotMove(prm DoNotMovePrm) (res DoNotMoveRes, err error) {
|
|||
|
||||
// Movable returns list of marked objects to move into other shard.
|
||||
func (db *DB) Movable(_ MovablePrm) (MovableRes, error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
var strAddrs []string
|
||||
|
||||
err := db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
|
|
|
@ -54,6 +54,9 @@ var (
|
|||
//
|
||||
// Returns an error of type apistatus.ObjectAlreadyRemoved if object has been placed in graveyard.
|
||||
func (db *DB) Put(prm PutPrm) (res PutRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
err = db.boltDB.Batch(func(tx *bbolt.Tx) error {
|
||||
return db.put(tx, prm.obj, prm.id, nil)
|
||||
})
|
||||
|
|
|
@ -56,6 +56,9 @@ func (r SelectRes) AddressList() []oid.Address {
|
|||
|
||||
// Select returns list of addresses of objects that match search filters.
|
||||
func (db *DB) Select(prm SelectPrm) (res SelectRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
if blindlyProcess(prm.filters) {
|
||||
return res, nil
|
||||
}
|
||||
|
|
|
@ -34,6 +34,9 @@ func (r IsSmallRes) BlobovniczaID() *blobovnicza.ID {
|
|||
// shallow path which is calculated from address and therefore it is not
|
||||
// indexed in metabase.
|
||||
func (db *DB) IsSmall(prm IsSmallPrm) (res IsSmallRes, err error) {
|
||||
db.modeMtx.RLock()
|
||||
defer db.modeMtx.RUnlock()
|
||||
|
||||
err = db.boltDB.View(func(tx *bbolt.Tx) error {
|
||||
res.id, err = db.isSmall(tx, prm.addr)
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@ import (
|
|||
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard"
|
||||
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -56,7 +55,8 @@ func testShardDelete(t *testing.T, hasWriteCache bool) {
|
|||
})
|
||||
|
||||
t.Run("small object", func(t *testing.T) {
|
||||
obj.SetID(oidtest.ID())
|
||||
obj := generateObjectWithCID(t, cnr)
|
||||
addAttribute(obj, "foo", "bar")
|
||||
addPayload(obj, 1<<5)
|
||||
|
||||
putPrm.SetObject(obj)
|
||||
|
|
|
@ -60,7 +60,11 @@ func (c *cache) openStore(readOnly bool) error {
|
|||
DirNameLen: 1,
|
||||
}
|
||||
|
||||
// Write-cache can be opened multiple times during `SetMode`.
|
||||
// flushed map must not be re-created in this case.
|
||||
if c.flushed == nil {
|
||||
c.flushed, _ = lru.New(lruKeysCount)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue