forked from TrueCloudLab/frostfs-node
174 lines
3.5 KiB
Go
174 lines
3.5 KiB
Go
|
package shard
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"sync"
|
||
|
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor/common"
|
||
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||
|
"go.uber.org/zap"
|
||
|
)
|
||
|
|
||
|
type RebuildWorkerLimiter interface {
|
||
|
AcquireWorkSlot(ctx context.Context) error
|
||
|
ReleaseWorkSlot()
|
||
|
}
|
||
|
|
||
|
type rebuildLimiter struct {
|
||
|
semaphore chan struct{}
|
||
|
}
|
||
|
|
||
|
func newRebuildLimiter(workersCount uint32) *rebuildLimiter {
|
||
|
return &rebuildLimiter{
|
||
|
semaphore: make(chan struct{}, workersCount),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (l *rebuildLimiter) AcquireWorkSlot(ctx context.Context) error {
|
||
|
select {
|
||
|
case l.semaphore <- struct{}{}:
|
||
|
return nil
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (l *rebuildLimiter) ReleaseWorkSlot() {
|
||
|
<-l.semaphore
|
||
|
}
|
||
|
|
||
|
type rebuildTask struct {
|
||
|
limiter RebuildWorkerLimiter
|
||
|
action common.RebuildAction
|
||
|
}
|
||
|
|
||
|
type rebuilder struct {
|
||
|
mtx *sync.Mutex
|
||
|
wg *sync.WaitGroup
|
||
|
cancel func()
|
||
|
limiter RebuildWorkerLimiter
|
||
|
done chan struct{}
|
||
|
tasks chan rebuildTask
|
||
|
}
|
||
|
|
||
|
func newRebuilder(l RebuildWorkerLimiter) *rebuilder {
|
||
|
return &rebuilder{
|
||
|
mtx: &sync.Mutex{},
|
||
|
wg: &sync.WaitGroup{},
|
||
|
limiter: l,
|
||
|
tasks: make(chan rebuildTask, 10),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
|
||
|
r.mtx.Lock()
|
||
|
defer r.mtx.Unlock()
|
||
|
|
||
|
if r.done != nil {
|
||
|
return // already started
|
||
|
}
|
||
|
ctx, cancel := context.WithCancel(ctx)
|
||
|
r.cancel = cancel
|
||
|
r.done = make(chan struct{})
|
||
|
r.wg.Add(1)
|
||
|
go func() {
|
||
|
defer r.wg.Done()
|
||
|
for {
|
||
|
select {
|
||
|
case <-r.done:
|
||
|
return
|
||
|
case t, ok := <-r.tasks:
|
||
|
if !ok {
|
||
|
continue
|
||
|
}
|
||
|
runRebuild(ctx, bs, mb, log, t.action, t.limiter)
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
case r.tasks <- rebuildTask{
|
||
|
limiter: r.limiter,
|
||
|
action: common.RebuildAction{
|
||
|
SchemaChange: true,
|
||
|
},
|
||
|
}:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func runRebuild(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger,
|
||
|
action common.RebuildAction, limiter RebuildWorkerLimiter,
|
||
|
) {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
default:
|
||
|
}
|
||
|
log.Info(logs.BlobstoreRebuildStarted)
|
||
|
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, limiter, action); err != nil {
|
||
|
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
|
||
|
} else {
|
||
|
log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (r *rebuilder) ScheduleRebuild(ctx context.Context, limiter RebuildWorkerLimiter, action common.RebuildAction,
|
||
|
) error {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
case r.tasks <- rebuildTask{
|
||
|
limiter: limiter,
|
||
|
action: action,
|
||
|
}:
|
||
|
return nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (r *rebuilder) Stop(log *logger.Logger) {
|
||
|
r.mtx.Lock()
|
||
|
defer r.mtx.Unlock()
|
||
|
|
||
|
if r.done != nil {
|
||
|
close(r.done)
|
||
|
}
|
||
|
if r.cancel != nil {
|
||
|
r.cancel()
|
||
|
}
|
||
|
r.wg.Wait()
|
||
|
r.cancel = nil
|
||
|
r.done = nil
|
||
|
log.Info(logs.BlobstoreRebuildStopped)
|
||
|
}
|
||
|
|
||
|
var errMBIsNotAvailable = errors.New("metabase is not available")
|
||
|
|
||
|
type mbStorageIDUpdate struct {
|
||
|
mb *meta.DB
|
||
|
}
|
||
|
|
||
|
func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
if u.mb == nil {
|
||
|
return errMBIsNotAvailable
|
||
|
}
|
||
|
|
||
|
var prm meta.UpdateStorageIDPrm
|
||
|
prm.SetAddress(addr)
|
||
|
prm.SetStorageID(storageID)
|
||
|
_, err := u.mb.UpdateStorageID(ctx, prm)
|
||
|
return err
|
||
|
}
|