frostfs-node/pkg/local_object_storage/blobstor/blobovniczatree/control.go
Dmitrii Stepanov db49ad16cc
All checks were successful
DCO action / DCO (pull_request) Successful in 4m10s
Build / Build Components (1.21) (pull_request) Successful in 7m30s
Vulncheck / Vulncheck (pull_request) Successful in 7m5s
Tests and linters / Staticcheck (pull_request) Successful in 10m6s
Tests and linters / Lint (pull_request) Successful in 10m26s
Build / Build Components (1.20) (pull_request) Successful in 12m39s
Tests and linters / Tests (1.20) (pull_request) Successful in 16m55s
Tests and linters / Tests (1.21) (pull_request) Successful in 17m13s
Tests and linters / Tests with -race (pull_request) Successful in 17m14s
[#826] blobovniczatree: Do not create DB's on init
Blobovniczas will be created on write requests.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-12-07 15:37:33 +03:00

171 lines
4.3 KiB
Go

package blobovniczatree
import (
"context"
"errors"
"os"
"path/filepath"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
var errFailedToChangeExtensionReadOnly = errors.New("failed to change blobovnicza extension: read only mode")
// Open opens blobovnicza tree.
func (b *Blobovniczas) Open(readOnly bool) error {
b.readOnly = readOnly
b.metrics.SetMode(readOnly)
b.metrics.SetRebuildStatus(rebuildStatusNotStarted)
b.openManagers()
return nil
}
// Init initializes blobovnicza tree.
//
// Should be called exactly once.
func (b *Blobovniczas) Init() error {
b.log.Debug(logs.BlobovniczatreeInitializingBlobovniczas)
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensions)
if err := b.addDBExtensionToDBs(b.rootPath, 0); err != nil {
b.log.Error(logs.BlobovniczaTreeFixingFileExtensionsFailed, zap.Error(err))
return err
}
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensionsCompletedSuccessfully)
if b.readOnly {
b.log.Debug(logs.BlobovniczatreeReadonlyModeSkipBlobovniczasInitialization)
return nil
}
return b.initializeDBs(context.TODO())
}
func (b *Blobovniczas) initializeDBs(ctx context.Context) error {
err := util.MkdirAllX(b.rootPath, b.perm)
if err != nil {
return err
}
eg, egCtx := errgroup.WithContext(ctx)
eg.SetLimit(b.blzInitWorkerCount)
visited := make(map[string]struct{})
err = b.iterateExistingDBPaths(egCtx, func(p string) (bool, error) {
visited[p] = struct{}{}
eg.Go(func() error {
shBlz := b.getBlobovniczaWithoutCaching(p)
blz, err := shBlz.Open()
if err != nil {
return err
}
defer shBlz.Close()
moveInfo, err := blz.ListMoveInfo(egCtx)
if err != nil {
return err
}
for _, move := range moveInfo {
b.deleteProtectedObjects.Add(move.Address)
}
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
return nil
})
return false, nil
})
if err != nil {
_ = eg.Wait()
return err
}
if b.createDBInAdvance {
err = b.iterateSortedLeaves(egCtx, nil, func(p string) (bool, error) {
if _, found := visited[p]; found {
return false, nil
}
eg.Go(func() error {
shBlz := b.getBlobovniczaWithoutCaching(p)
_, err := shBlz.Open()
if err != nil {
return err
}
defer shBlz.Close()
b.log.Debug(logs.BlobovniczatreeBlobovniczaSuccessfullyInitializedClosing, zap.String("id", p))
return nil
})
return false, nil
})
if err != nil {
_ = eg.Wait()
return err
}
}
return eg.Wait()
}
func (b *Blobovniczas) openManagers() {
b.commondbManager.Open() // order important
b.activeDBManager.Open()
b.dbCache.Open()
}
// Close implements common.Storage.
func (b *Blobovniczas) Close() error {
b.dbCache.Close() // order important
b.activeDBManager.Close()
b.commondbManager.Close()
return nil
}
// returns blobovnicza with path p
//
// If blobovnicza is already cached, instance from cache is returned w/o changes.
func (b *Blobovniczas) getBlobovnicza(p string) *sharedDB {
return b.dbCache.GetOrCreate(p)
}
func (b *Blobovniczas) getBlobovniczaWithoutCaching(p string) *sharedDB {
return b.commondbManager.GetByPath(p)
}
func (b *Blobovniczas) addDBExtensionToDBs(path string, depth uint64) error {
entries, err := os.ReadDir(path)
if os.IsNotExist(err) && depth == 0 {
return nil
}
for _, entry := range entries {
if entry.IsDir() {
if err := b.addDBExtensionToDBs(filepath.Join(path, entry.Name()), depth+1); err != nil {
return err
}
continue
}
if strings.HasSuffix(entry.Name(), dbExtension) {
continue
}
if b.readOnly {
return errFailedToChangeExtensionReadOnly
}
sourcePath := filepath.Join(path, entry.Name())
targetPath := filepath.Join(path, entry.Name()+dbExtension)
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensionForFile, zap.String("source", sourcePath), zap.String("target", targetPath))
if err := os.Rename(sourcePath, targetPath); err != nil {
b.log.Error(logs.BlobovniczaTreeFixingFileExtensionFailed, zap.String("source", sourcePath), zap.String("target", targetPath), zap.Error(err))
return err
}
b.log.Debug(logs.BlobovniczaTreeFixingFileExtensionCompletedSuccessfully, zap.String("source", sourcePath), zap.String("target", targetPath))
}
return nil
}