diff --git a/pkg/local_object_storage/writecache/cache.go b/pkg/local_object_storage/writecache/cache.go index e829d013c..abda5dfbc 100644 --- a/pkg/local_object_storage/writecache/cache.go +++ b/pkg/local_object_storage/writecache/cache.go @@ -45,7 +45,10 @@ const ( defaultMaxCacheSize = 1 << 30 // 1 GiB ) -var dummyCanceler context.CancelFunc = func() {} +var ( + defaultBucket = []byte{0} + dummyCanceler context.CancelFunc = func() {} +) // New creates new writecache instance. func New(opts ...Option) Cache { diff --git a/pkg/local_object_storage/writecache/flush.go b/pkg/local_object_storage/writecache/flush.go index 6257ed5dd..ec2169700 100644 --- a/pkg/local_object_storage/writecache/flush.go +++ b/pkg/local_object_storage/writecache/flush.go @@ -1,8 +1,12 @@ package writecache import ( + "bytes" "context" "errors" + "fmt" + "os" + "path/filepath" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" @@ -16,6 +20,8 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "go.etcd.io/bbolt" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "go.uber.org/zap" @@ -233,3 +239,83 @@ func (c *cache) Flush(ctx context.Context, ignoreErrors, seal bool) error { func (c *cache) flush(ctx context.Context, ignoreErrors bool) error { return c.flushFSTree(ctx, ignoreErrors) } + +type batchItem struct { + data []byte + address string +} + +func (c *cache) flushAndDropBBoltDB(ctx context.Context) error { + _, err := os.Stat(filepath.Join(c.path, dbName)) + if err != nil && os.IsNotExist(err) { + return nil + } + if err != nil { + return fmt.Errorf("could not check write-cache database existence: %w", err) + } + db, err := OpenDB(c.path, true, os.OpenFile) + if err != nil { + return fmt.Errorf("could not open write-cache database: %w", err) + } + defer func() { + _ = db.Close() + }() + + var last string + for { + batch, err := c.readNextDBBatch(db, last) + if err != nil { + return err + } + if len(batch) == 0 { + break + } + for _, item := range batch { + var obj objectSDK.Object + if err := obj.Unmarshal(item.data); err != nil { + return fmt.Errorf("unmarshal object from database: %w", err) + } + if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { + return fmt.Errorf("flush object from database: %w", err) + } + } + last = batch[len(batch)-1].address + } + if err := db.Close(); err != nil { + return fmt.Errorf("close write-cache database: %w", err) + } + if err := os.Remove(filepath.Join(c.path, dbName)); err != nil { + return fmt.Errorf("remove write-cache database: %w", err) + } + return nil +} + +func (c *cache) readNextDBBatch(db *bbolt.DB, last string) ([]batchItem, error) { + const batchSize = 100 + var batch []batchItem + err := db.View(func(tx *bbolt.Tx) error { + var addr oid.Address + + b := tx.Bucket(defaultBucket) + cs := b.Cursor() + for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() { + sa := string(k) + if sa == last { + continue + } + if err := addr.DecodeString(sa); err != nil { + return fmt.Errorf("decode address from database: %w", err) + } + + batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) + if len(batch) == batchSize { + return errIterationCompleted + } + } + return nil + }) + if err == nil || errors.Is(err, errIterationCompleted) { + return batch, nil + } + return nil, err +} diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index b4cc8050f..74675a91e 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -16,6 +16,8 @@ import ( "go.uber.org/zap" ) +const dbName = "small.bolt" + func (c *cache) openStore(mod mode.ComponentMode) error { err := util.MkdirAllX(c.path, os.ModePerm) if err != nil { diff --git a/pkg/local_object_storage/writecache/upgrade.go b/pkg/local_object_storage/writecache/upgrade.go deleted file mode 100644 index 3a100f1a3..000000000 --- a/pkg/local_object_storage/writecache/upgrade.go +++ /dev/null @@ -1,110 +0,0 @@ -package writecache - -import ( - "bytes" - "context" - "errors" - "fmt" - "io/fs" - "os" - "path/filepath" - "time" - - objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "go.etcd.io/bbolt" -) - -const dbName = "small.bolt" - -var defaultBucket = []byte{0} - -func (c *cache) flushAndDropBBoltDB(ctx context.Context) error { - _, err := os.Stat(filepath.Join(c.path, dbName)) - if err != nil && os.IsNotExist(err) { - return nil - } - if err != nil { - return fmt.Errorf("could not check write-cache database existence: %w", err) - } - db, err := OpenDB(c.path, true, os.OpenFile) - if err != nil { - return fmt.Errorf("could not open write-cache database: %w", err) - } - defer func() { - _ = db.Close() - }() - - var last string - for { - batch, err := c.readNextDBBatch(db, last) - if err != nil { - return err - } - if len(batch) == 0 { - break - } - for _, item := range batch { - var obj objectSDK.Object - if err := obj.Unmarshal(item.data); err != nil { - return fmt.Errorf("unmarshal object from database: %w", err) - } - if err := c.flushObject(ctx, &obj, item.data, StorageTypeDB); err != nil { - return fmt.Errorf("flush object from database: %w", err) - } - } - last = batch[len(batch)-1].address - } - if err := db.Close(); err != nil { - return fmt.Errorf("close write-cache database: %w", err) - } - if err := os.Remove(filepath.Join(c.path, dbName)); err != nil { - return fmt.Errorf("remove write-cache database: %w", err) - } - return nil -} - -type batchItem struct { - data []byte - address string -} - -func (c *cache) readNextDBBatch(db *bbolt.DB, last string) ([]batchItem, error) { - const batchSize = 100 - var batch []batchItem - err := db.View(func(tx *bbolt.Tx) error { - var addr oid.Address - - b := tx.Bucket(defaultBucket) - cs := b.Cursor() - for k, data := cs.Seek([]byte(last)); k != nil; k, data = cs.Next() { - sa := string(k) - if sa == last { - continue - } - if err := addr.DecodeString(sa); err != nil { - return fmt.Errorf("decode address from database: %w", err) - } - - batch = append(batch, batchItem{data: bytes.Clone(data), address: sa}) - if len(batch) == batchSize { - return errIterationCompleted - } - } - return nil - }) - if err == nil || errors.Is(err, errIterationCompleted) { - return batch, nil - } - return nil, err -} - -// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true. -func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) { - return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{ - NoFreelistSync: true, - ReadOnly: ro, - Timeout: 100 * time.Millisecond, - OpenFile: openFile, - }) -} diff --git a/pkg/local_object_storage/writecache/util.go b/pkg/local_object_storage/writecache/util.go new file mode 100644 index 000000000..0ed4a954e --- /dev/null +++ b/pkg/local_object_storage/writecache/util.go @@ -0,0 +1,20 @@ +package writecache + +import ( + "io/fs" + "os" + "path/filepath" + "time" + + "go.etcd.io/bbolt" +) + +// OpenDB opens BoltDB instance for write-cache. Opens in read-only mode if ro is true. +func OpenDB(p string, ro bool, openFile func(string, int, fs.FileMode) (*os.File, error)) (*bbolt.DB, error) { + return bbolt.Open(filepath.Join(p, dbName), os.ModePerm, &bbolt.Options{ + NoFreelistSync: true, + ReadOnly: ro, + Timeout: 100 * time.Millisecond, + OpenFile: openFile, + }) +}