[#1559] local_object_storage: Provide readOnly flag to Open

We should be able to reopen storage in readonly in runtime.

Signed-off-by: Evgenii Stratonikov <evgeniy@nspcc.ru>
This commit is contained in:
Evgenii Stratonikov 2022-06-28 16:42:50 +03:00 committed by fyrchik
parent e38b0aa4ba
commit 1e786233bf
20 changed files with 70 additions and 49 deletions

View file

@ -80,7 +80,7 @@ func objectInspectCmd(cmd *cobra.Command, _ []string) {
blz := blobovnicza.New( blz := blobovnicza.New(
blobovnicza.WithPath(vPath), blobovnicza.WithPath(vPath),
blobovnicza.ReadOnly()) blobovnicza.WithReadOnly(true))
common.ExitOnErr(cmd, blz.Open()) common.ExitOnErr(cmd, blz.Open())
defer blz.Close() defer blz.Close()

View file

@ -60,7 +60,7 @@ var Command = &cobra.Command{
blz := blobovnicza.New( blz := blobovnicza.New(
blobovnicza.WithPath(vPath), blobovnicza.WithPath(vPath),
blobovnicza.ReadOnly(), blobovnicza.WithReadOnly(true),
) )
common.ExitOnErr(cmd, blz.Open()) common.ExitOnErr(cmd, blz.Open())

View file

@ -106,9 +106,9 @@ func WithLogger(l *logger.Logger) Option {
} }
} }
// ReadOnly returns an option to open Blobovnicza in read-only mode. // WithReadOnly returns an option to open Blobovnicza in read-only mode.
func ReadOnly() Option { func WithReadOnly(ro bool) Option {
return func(c *cfg) { return func(c *cfg) {
c.boltOptions.ReadOnly = true c.boltOptions.ReadOnly = ro
} }
} }

View file

@ -910,6 +910,7 @@ func (b *blobovniczas) openBlobovnicza(p string) (*blobovnicza.Blobovnicza, erro
} }
blz := blobovnicza.New(append(b.blzOpts, blz := blobovnicza.New(append(b.blzOpts,
blobovnicza.WithReadOnly(b.readOnly),
blobovnicza.WithPath(filepath.Join(b.blzRootPath, p)), blobovnicza.WithPath(filepath.Join(b.blzRootPath, p)),
)...) )...)

View file

@ -44,6 +44,8 @@ type cfg struct {
blzRootPath string blzRootPath string
readOnly bool
blzOpts []blobovnicza.Option blzOpts []blobovnicza.Option
} }

View file

@ -24,7 +24,7 @@ func TestCompression(t *testing.T) {
WithRootPath(dir), WithRootPath(dir),
WithSmallSizeLimit(smallSizeLimit), WithSmallSizeLimit(smallSizeLimit),
WithBlobovniczaShallowWidth(1)) // default width is 16, slow init WithBlobovniczaShallowWidth(1)) // default width is 16, slow init
require.NoError(t, bs.Open()) require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init()) require.NoError(t, bs.Init())
return bs return bs
} }
@ -90,7 +90,7 @@ func TestBlobstor_needsCompression(t *testing.T) {
WithSmallSizeLimit(smallSizeLimit), WithSmallSizeLimit(smallSizeLimit),
WithBlobovniczaShallowWidth(1), WithBlobovniczaShallowWidth(1),
WithUncompressableContentTypes(ct)) WithUncompressableContentTypes(ct))
require.NoError(t, bs.Open()) require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init()) require.NoError(t, bs.Init())
return bs return bs
} }

View file

@ -6,9 +6,11 @@ import (
) )
// Open opens BlobStor. // Open opens BlobStor.
func (b *BlobStor) Open() error { func (b *BlobStor) Open(readOnly bool) error {
b.log.Debug("opening...") b.log.Debug("opening...")
b.blobovniczas.readOnly = readOnly
return nil return nil
} }

View file

@ -21,7 +21,7 @@ func TestExists(t *testing.T) {
b := New(WithRootPath(dir), b := New(WithRootPath(dir),
WithSmallSizeLimit(smallSizeLimit), WithSmallSizeLimit(smallSizeLimit),
WithBlobovniczaShallowWidth(1)) // default width is 16, slow init WithBlobovniczaShallowWidth(1)) // default width is 16, slow init
require.NoError(t, b.Open()) require.NoError(t, b.Open(false))
require.NoError(t, b.Init()) require.NoError(t, b.Init())
objects := []*objectSDK.Object{ objects := []*objectSDK.Object{

View file

@ -34,7 +34,7 @@ func TestIterateObjects(t *testing.T) {
defer os.RemoveAll(p) defer os.RemoveAll(p)
// open Blobstor // open Blobstor
require.NoError(t, blobStor.Open()) require.NoError(t, blobStor.Open(false))
// initialize Blobstor // initialize Blobstor
require.NoError(t, blobStor.Init()) require.NoError(t, blobStor.Init())
@ -111,7 +111,7 @@ func TestIterate_IgnoreErrors(t *testing.T) {
WithBlobovniczaShallowWidth(1), WithBlobovniczaShallowWidth(1),
WithBlobovniczaShallowDepth(1)} WithBlobovniczaShallowDepth(1)}
bs := New(bsOpts...) bs := New(bsOpts...)
require.NoError(t, bs.Open()) require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init()) require.NoError(t, bs.Init())
addrs := make([]oid.Address, objCount) addrs := make([]oid.Address, objCount)
@ -148,7 +148,7 @@ func TestIterate_IgnoreErrors(t *testing.T) {
// Increase width to have blobovnicza which is definitely empty. // Increase width to have blobovnicza which is definitely empty.
b := New(append(bsOpts, WithBlobovniczaShallowWidth(2))...) b := New(append(bsOpts, WithBlobovniczaShallowWidth(2))...)
require.NoError(t, b.Open()) require.NoError(t, b.Open(false))
require.NoError(t, b.Init()) require.NoError(t, b.Init())
var p string var p string
@ -163,7 +163,7 @@ func TestIterate_IgnoreErrors(t *testing.T) {
require.NoError(t, os.Chmod(p, 0)) require.NoError(t, os.Chmod(p, 0))
require.NoError(t, b.Close()) require.NoError(t, b.Close())
require.NoError(t, bs.Open()) require.NoError(t, bs.Open(false))
require.NoError(t, bs.Init()) require.NoError(t, bs.Init())
var prm IteratePrm var prm IteratePrm

View file

@ -10,7 +10,7 @@ import (
) )
// Open boltDB instance for metabase. // Open boltDB instance for metabase.
func (db *DB) Open() error { func (db *DB) Open(readOnly bool) error {
err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission) err := util.MkdirAllX(filepath.Dir(db.info.Path), db.info.Permission)
if err != nil { if err != nil {
return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err) return fmt.Errorf("can't create dir %s for metabase: %w", db.info.Path, err)
@ -18,6 +18,11 @@ func (db *DB) Open() error {
db.log.Debug("created directory for Metabase", zap.String("path", db.info.Path)) db.log.Debug("created directory for Metabase", zap.String("path", db.info.Path))
if db.boltOptions == nil {
db.boltOptions = bbolt.DefaultOptions
}
db.boltOptions.ReadOnly = readOnly
db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions) db.boltDB, err = bbolt.Open(db.info.Path, db.info.Permission, db.boltOptions)
if err != nil { if err != nil {
return fmt.Errorf("can't open boltDB database: %w", err) return fmt.Errorf("can't open boltDB database: %w", err)

View file

@ -39,7 +39,7 @@ func newDB(t testing.TB, opts ...meta.Option) *meta.DB {
bdb := meta.New(append([]meta.Option{meta.WithPath(path), meta.WithPermissions(0600)}, bdb := meta.New(append([]meta.Option{meta.WithPath(path), meta.WithPermissions(0600)},
opts...)...) opts...)...)
require.NoError(t, bdb.Open()) require.NoError(t, bdb.Open(false))
require.NoError(t, bdb.Init()) require.NoError(t, bdb.Init())
t.Cleanup(func() { t.Cleanup(func() {

View file

@ -36,13 +36,13 @@ func TestVersion(t *testing.T) {
} }
t.Run("simple", func(t *testing.T) { t.Run("simple", func(t *testing.T) {
db := newDB(t) db := newDB(t)
require.NoError(t, db.Open()) require.NoError(t, db.Open(false))
require.NoError(t, db.Init()) require.NoError(t, db.Init())
check(t, db) check(t, db)
require.NoError(t, db.Close()) require.NoError(t, db.Close())
t.Run("reopen", func(t *testing.T) { t.Run("reopen", func(t *testing.T) {
require.NoError(t, db.Open()) require.NoError(t, db.Open(false))
require.NoError(t, db.Init()) require.NoError(t, db.Init())
check(t, db) check(t, db)
require.NoError(t, db.Close()) require.NoError(t, db.Close())
@ -50,29 +50,29 @@ func TestVersion(t *testing.T) {
}) })
t.Run("old data", func(t *testing.T) { t.Run("old data", func(t *testing.T) {
db := newDB(t) db := newDB(t)
require.NoError(t, db.Open()) require.NoError(t, db.Open(false))
require.NoError(t, db.WriteShardID([]byte{1, 2, 3, 4})) require.NoError(t, db.WriteShardID([]byte{1, 2, 3, 4}))
require.NoError(t, db.Close()) require.NoError(t, db.Close())
require.NoError(t, db.Open()) require.NoError(t, db.Open(false))
require.NoError(t, db.Init()) require.NoError(t, db.Init())
check(t, db) check(t, db)
require.NoError(t, db.Close()) require.NoError(t, db.Close())
}) })
t.Run("invalid version", func(t *testing.T) { t.Run("invalid version", func(t *testing.T) {
db := newDB(t) db := newDB(t)
require.NoError(t, db.Open()) require.NoError(t, db.Open(false))
require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error { require.NoError(t, db.boltDB.Update(func(tx *bbolt.Tx) error {
return updateVersion(tx, version+1) return updateVersion(tx, version+1)
})) }))
require.NoError(t, db.Close()) require.NoError(t, db.Close())
require.NoError(t, db.Open()) require.NoError(t, db.Open(false))
require.Error(t, db.Init()) require.Error(t, db.Init())
require.NoError(t, db.Close()) require.NoError(t, db.Close())
t.Run("reset", func(t *testing.T) { t.Run("reset", func(t *testing.T) {
require.NoError(t, db.Open()) require.NoError(t, db.Open(false))
require.NoError(t, db.Reset()) require.NoError(t, db.Reset())
check(t, db) check(t, db)
require.NoError(t, db.Close()) require.NoError(t, db.Close())

View file

@ -58,14 +58,14 @@ func NewBoltForest(opts ...Option) ForestStorage {
return &b return &b
} }
func (t *boltForest) Init() error { return nil } func (t *boltForest) Open(readOnly bool) error {
func (t *boltForest) Open() error {
err := util.MkdirAllX(filepath.Dir(t.path), t.perm) err := util.MkdirAllX(filepath.Dir(t.path), t.perm)
if err != nil { if err != nil {
return fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err) return fmt.Errorf("can't create dir %s for the pilorama: %w", t.path, err)
} }
opts := *bbolt.DefaultOptions opts := *bbolt.DefaultOptions
opts.ReadOnly = readOnly
opts.NoSync = t.noSync opts.NoSync = t.noSync
opts.Timeout = 100 * time.Millisecond opts.Timeout = 100 * time.Millisecond
@ -77,6 +77,12 @@ func (t *boltForest) Open() error {
t.db.MaxBatchSize = t.maxBatchSize t.db.MaxBatchSize = t.maxBatchSize
t.db.MaxBatchDelay = t.maxBatchDelay t.db.MaxBatchDelay = t.maxBatchDelay
return nil
}
func (t *boltForest) Init() error {
if t.db.IsReadOnly() {
return nil
}
return t.db.Update(func(tx *bbolt.Tx) error { return t.db.Update(func(tx *bbolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(dataBucket) _, err := tx.CreateBucketIfNotExists(dataBucket)
if err != nil { if err != nil {
@ -89,7 +95,12 @@ func (t *boltForest) Open() error {
return nil return nil
}) })
} }
func (t *boltForest) Close() error { return t.db.Close() } func (t *boltForest) Close() error {
if t.db != nil {
return t.db.Close()
}
return nil
}
// TreeMove implements the Forest interface. // TreeMove implements the Forest interface.
func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error) { func (t *boltForest) TreeMove(d CIDDescriptor, treeID string, m *Move) (*LogMove, error) {

View file

@ -108,7 +108,7 @@ func (f *memoryForest) Init() error {
return nil return nil
} }
func (f *memoryForest) Open() error { func (f *memoryForest) Open(bool) error {
return nil return nil
} }

View file

@ -18,8 +18,8 @@ var providers = []struct {
}{ }{
{"inmemory", func(t testing.TB) Forest { {"inmemory", func(t testing.TB) Forest {
f := NewMemoryForest() f := NewMemoryForest()
require.NoError(t, f.Open(false))
require.NoError(t, f.Init()) require.NoError(t, f.Init())
require.NoError(t, f.Open())
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, f.Close()) require.NoError(t, f.Close())
}) })
@ -32,8 +32,8 @@ var providers = []struct {
require.NoError(t, err) require.NoError(t, err)
f := NewBoltForest(WithPath(filepath.Join(tmpDir, "test.db"))) f := NewBoltForest(WithPath(filepath.Join(tmpDir, "test.db")))
require.NoError(t, f.Open(false))
require.NoError(t, f.Init()) require.NoError(t, f.Init())
require.NoError(t, f.Open())
t.Cleanup(func() { t.Cleanup(func() {
require.NoError(t, f.Close()) require.NoError(t, f.Close())
require.NoError(t, os.RemoveAll(tmpDir)) require.NoError(t, os.RemoveAll(tmpDir))

View file

@ -39,7 +39,7 @@ type ForestStorage interface {
// DumpInfo returns information about the pilorama. // DumpInfo returns information about the pilorama.
DumpInfo() Info DumpInfo() Info
Init() error Init() error
Open() error Open(bool) error
Close() error Close() error
Forest Forest
} }

View file

@ -14,20 +14,20 @@ import (
// Open opens all Shard's components. // Open opens all Shard's components.
func (s *Shard) Open() error { func (s *Shard) Open() error {
components := []interface{ Open() error }{ components := []interface{ Open(bool) error }{
s.blobStor, s.metaBase, s.blobStor, s.metaBase,
} }
if s.pilorama != nil {
components = append(components, s.pilorama)
}
if s.hasWriteCache() { if s.hasWriteCache() {
components = append(components, s.writeCache) components = append(components, s.writeCache)
} }
if s.pilorama != nil {
components = append(components, s.pilorama)
}
for _, component := range components { for _, component := range components {
if err := component.Open(); err != nil { if err := component.Open(false); err != nil {
return fmt.Errorf("could not open %T: %w", component, err) return fmt.Errorf("could not open %T: %w", component, err)
} }
} }
@ -48,14 +48,14 @@ func (s *Shard) Init() error {
s.blobStor.Init, fMetabase, s.blobStor.Init, fMetabase,
} }
if s.pilorama != nil {
components = append(components, s.pilorama.Init)
}
if s.hasWriteCache() { if s.hasWriteCache() {
components = append(components, s.writeCache.Init) components = append(components, s.writeCache.Init)
} }
if s.pilorama != nil {
components = append(components, s.pilorama.Init)
}
for _, component := range components { for _, component := range components {
if err := component(); err != nil { if err := component(); err != nil {
return fmt.Errorf("could not initialize %T: %w", component, err) return fmt.Errorf("could not initialize %T: %w", component, err)
@ -162,14 +162,14 @@ func (s *Shard) refillMetabase() error {
func (s *Shard) Close() error { func (s *Shard) Close() error {
components := []interface{ Close() error }{} components := []interface{ Close() error }{}
if s.hasWriteCache() {
components = append(components, s.writeCache)
}
if s.pilorama != nil { if s.pilorama != nil {
components = append(components, s.pilorama) components = append(components, s.pilorama)
} }
if s.hasWriteCache() {
components = append(components, s.writeCache)
}
components = append(components, s.blobStor, s.metaBase) components = append(components, s.blobStor, s.metaBase)
for _, component := range components { for _, component := range components {

View file

@ -27,7 +27,7 @@ func (s *Shard) ID() *ID {
// UpdateID reads shard ID saved in the metabase and updates it if it is missing. // UpdateID reads shard ID saved in the metabase and updates it if it is missing.
func (s *Shard) UpdateID() (err error) { func (s *Shard) UpdateID() (err error) {
if err = s.metaBase.Open(); err != nil { if err = s.metaBase.Open(false); err != nil {
return err return err
} }
defer func() { defer func() {

View file

@ -27,13 +27,13 @@ const lruKeysCount = 256 * 1024 * 8
const dbName = "small.bolt" const dbName = "small.bolt"
func (c *cache) openStore() error { func (c *cache) openStore(readOnly bool) error {
err := util.MkdirAllX(c.path, os.ModePerm) err := util.MkdirAllX(c.path, os.ModePerm)
if err != nil { if err != nil {
return err return err
} }
c.db, err = OpenDB(c.path, false) c.db, err = OpenDB(c.path, readOnly)
if err != nil { if err != nil {
return fmt.Errorf("could not open database: %w", err) return fmt.Errorf("could not open database: %w", err)
} }

View file

@ -29,7 +29,7 @@ type Cache interface {
DumpInfo() Info DumpInfo() Info
Init() error Init() error
Open() error Open(readOnly bool) error
Close() error Close() error
} }
@ -125,8 +125,8 @@ func (c *cache) DumpInfo() Info {
} }
// Open opens and initializes database. Reads object counters from the ObjectCounters instance. // Open opens and initializes database. Reads object counters from the ObjectCounters instance.
func (c *cache) Open() error { func (c *cache) Open(readOnly bool) error {
err := c.openStore() err := c.openStore(readOnly)
if err != nil { if err != nil {
return err return err
} }