frostfs-node/pkg/local_object_storage/shard/rebuilder.go
Dmitrii Stepanov f1c7905263 [] blobovniczatree: Make Rebuild concurrent
Different DBs can be rebuild concurrently.

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
2023-12-07 15:37:33 +03:00

98 lines
2.1 KiB
Go

package shard
import (
"context"
"errors"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type rebuilder struct {
mtx *sync.Mutex
wg *sync.WaitGroup
cancel func()
limiter RebuildWorkerLimiter
}
func newRebuilder(l RebuildWorkerLimiter) *rebuilder {
return &rebuilder{
mtx: &sync.Mutex{},
wg: &sync.WaitGroup{},
cancel: nil,
limiter: l,
}
}
func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.start(ctx, bs, mb, log)
}
func (r *rebuilder) start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
if r.cancel != nil {
r.stop(log)
}
ctx, cancel := context.WithCancel(ctx)
r.cancel = cancel
r.wg.Add(1)
go func() {
defer r.wg.Done()
log.Info(logs.BlobstoreRebuildStarted)
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, r.limiter); err != nil {
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
} else {
log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
}
}()
}
func (r *rebuilder) Stop(log *logger.Logger) {
r.mtx.Lock()
defer r.mtx.Unlock()
r.stop(log)
}
func (r *rebuilder) stop(log *logger.Logger) {
if r.cancel == nil {
return
}
r.cancel()
r.wg.Wait()
r.cancel = nil
log.Info(logs.BlobstoreRebuildStopped)
}
var errMBIsNotAvailable = errors.New("metabase is not available")
type mbStorageIDUpdate struct {
mb *meta.DB
}
func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if u.mb == nil {
return errMBIsNotAvailable
}
var prm meta.UpdateStorageIDPrm
prm.SetAddress(addr)
prm.SetStorageID(storageID)
_, err := u.mb.UpdateStorageID(ctx, prm)
return err
}