diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index 341071dc6..b97fc5856 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -45,10 +45,7 @@ const ( defaultMaxCacheSize = 1 << 30 // 1 GiB ) -var ( - defaultBucket = []byte{0} - dummyCanceler context.CancelFunc = func() {} -) +var dummyCanceler context.CancelFunc = func() {} // New creates new writecache instance. func New(opts ...Option) Cache { diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 10e4d68f0..83933375b 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -1,12 +1,8 @@ package writecache import ( - "bytes" "context" "errors" - "fmt" - "os" - "path/filepath" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -20,8 +16,6 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -239,83 +233,3 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error { func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return c.flushFSTree(ctx, ignoreErrors) } - -type batchItem struct { - data []byte - address string -} - -func (c *cache) flushAndDropBBoltDB(ctx context.Context) error { - _, err := os.Stat(filepath.Join(c.path, dbName)) - if err != nil && os.IsNotExist(err) { - return nil - } - if err != nil { - return fmt.Errorf("could not check write-cache database existence: %w", err) - } - db, err := OpenDB(c.path, true, os.OpenFile) - if err != nil { - return fmt.Errorf("could not open write-cache database: %w", err) - } - defer func() { - _ = db.Close() - }() - - var last string - for { - batch, err := c.readNextDBBatch(db, last) - if err != nil { - return err - } - if len(batch) == 0 { - break - } - for _, item := range batch { - var obj objectSDK.Object - if err := obj.Unmarshal(item.data); err != nil { - return fmt.Errorf("unmarshal object from database: %w", err) - } - if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { - return fmt.Errorf("flush object from database: %w", err) - } - } - last = batch[len(batch)-1].address - } - if err := db.Close(); err != nil { - return fmt.Errorf("close write-cache database: %w", err) - } - if err := os.Remove(filepath.Join(c.path, dbName)); err != nil { - return fmt.Errorf("remove write-cache database: %w", err) - } - return nil -} - -func (c *cache) readNextDBBatch(db *bbolt.DB, last string) ([]batchItem, error) { - const batchSize = 100 - var batch []batchItem - err := db.View(func(tx *bbolt.Tx) error { - var addr oid.Address - - b := tx.Bucket(defaultBucket) - cs := b.Cursor() - for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() { - sa := string(k) - if sa == last { - continue - } - if err := addr.DecodeString(sa); err != nil { - return fmt.Errorf("decode address from database: %w", err) - } - - batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) - if len(batch) == batchSize { - return errIterationCompleted - } - } - return nil - }) - if err == nil || errors.Is(err, errIterationCompleted) { - return batch, nil - } - return nil, err -} diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index e708a529e..6aface7a5 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -16,8 +16,6 @@ import ( "go.uber.org/zap" ) -const dbName = "small.bolt" - func (c *cache) openStore(mod mode.ComponentMode) error { err := util.MkdirAllX(c.path, os.ModePerm) if err != nil { diff --git a/pkg/local_object_storage/writecache/upgrade.go b/pkg/local_object_storage/writecache/upgrade.go new file mode 100644 index 000000000..3a100f1a3 --- /dev/null +++ b/pkg/local_object_storage/writecache/upgrade.go @@ -0,0 +1,110 @@ +package writecache + +import ( + "bytes" + "context" + "errors" + "fmt" + "io/fs" + "os" + "path/filepath" + "time" + + objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.etcd.io/bbolt" +) + +const dbName = "small.bolt" + +var defaultBucket = []byte{0} + +func (c *cache) flushAndDropBBoltDB(ctx context.Context) error { + _, err := os.Stat(filepath.Join(c.path, dbName)) + if err != nil && os.IsNotExist(err) { + return nil + } + if err != nil { + return fmt.Errorf("could not check write-cache database existence: %w", err) + } + db, err := OpenDB(c.path, true, os.OpenFile) + if err != nil { + return fmt.Errorf("could not open write-cache database: %w", err) + } + defer func() { + _ = db.Close() + }() + + var last string + for { + batch, err := c.readNextDBBatch(db, last) + if err != nil { + return err + } + if len(batch) == 0 { + break + } + for _, item := range batch { + var obj objectSDK.Object + if err := obj.Unmarshal(item.data); err != nil { + return fmt.Errorf("unmarshal object from database: %w", err) + } + if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { + return fmt.Errorf("flush object from database: %w", err) + } + } + last = batch[len(batch)-1].address + } + if err := db.Close(); err != nil { + return fmt.Errorf("close write-cache database: %w", err) + } + if err := os.Remove(filepath.Join(c.path, dbName)); err != nil { + return fmt.Errorf("remove write-cache database: %w", err) + } + return nil +} + +type batchItem struct { + data []byte + address string +} + +func (c *cache) readNextDBBatch(db *bbolt.DB, last string) ([]batchItem, error) { + const batchSize = 100 + var batch []batchItem + err := db.View(func(tx *bbolt.Tx) error { + var addr oid.Address + + b := tx.Bucket(defaultBucket) + cs := b.Cursor() + for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() { + sa := string(k) + if sa == last { + continue + } + if err := addr.DecodeString(sa); err != nil { + return fmt.Errorf("decode address from database: %w", err) + } + + batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) + if len(batch) == batchSize { + return errIterationCompleted + } + } + return nil + }) + if err == nil || errors.Is(err, errIterationCompleted) { + return batch, nil + } + return nil, err +} + +// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true. +func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) { + return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{ + NoFreelistSync: true, + ReadOnly: ro, + Timeout: 100 * time.Millisecond, + OpenFile: openFile, + }) +} diff --git a/pkg/local_object_storage/writecache/util.go b/pkg/local_object_storage/writecache/util.go deleted file mode 100644 index 0ed4a954e..000000000 --- a/pkg/local_object_storage/writecache/util.go +++ /dev/null @@ -1,20 +0,0 @@ -package writecache - -import ( - "io/fs" - "os" - "path/filepath" - "time" - - "go.etcd.io/bbolt" -) - -// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true. -func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) { - return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{ - NoFreelistSync: true, - ReadOnly: ro, - Timeout: 100 * time.Millisecond, - OpenFile: openFile, - }) -}