forked from TrueCloudLab/frostfs-node
[#661] shard: Add blobstor rebuilder
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
1f07e8b375
commit
4e05ce1c3a
5 changed files with 121 additions and 0 deletions
|
@ -512,4 +512,8 @@ const (
|
||||||
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
|
||||||
FailedToCountWritecacheItems = "failed to count writecache items"
|
FailedToCountWritecacheItems = "failed to count writecache items"
|
||||||
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
|
AttemtToCloseAlreadyClosedBlobovnicza = "attempt to close an already closed blobovnicza"
|
||||||
|
FailedToRebuildBlobstore = "failed to rebuild blobstore"
|
||||||
|
BlobstoreRebuildStarted = "blobstore rebuild started"
|
||||||
|
BlobstoreRebuildCompletedSuccessfully = "blobstore rebuild completed successfully"
|
||||||
|
BlobstoreRebuildStopped = "blobstore rebuild stopped"
|
||||||
)
|
)
|
||||||
|
|
15
pkg/local_object_storage/blobstor/rebuild.go
Normal file
15
pkg/local_object_storage/blobstor/rebuild.go
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
package blobstor
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
)
|
||||||
|
|
||||||
|
type StorageIDUpdate interface {
|
||||||
|
UpdateStorageID(ctx context.Context, obj *objectSDK.Object, storageID []byte) error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *BlobStor) Rebuild(_ context.Context, _ StorageIDUpdate) error {
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -162,6 +162,9 @@ func (s *Shard) Init(ctx context.Context) error {
|
||||||
|
|
||||||
s.gc.init(ctx)
|
s.gc.init(ctx)
|
||||||
|
|
||||||
|
s.rb = newRebuilder()
|
||||||
|
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,6 +269,9 @@ func (s *Shard) refillTombstoneObject(ctx context.Context, obj *objectSDK.Object
|
||||||
|
|
||||||
// Close releases all Shard's components.
|
// Close releases all Shard's components.
|
||||||
func (s *Shard) Close() error {
|
func (s *Shard) Close() error {
|
||||||
|
if s.rb != nil {
|
||||||
|
s.rb.Stop(s.log)
|
||||||
|
}
|
||||||
components := []interface{ Close() error }{}
|
components := []interface{ Close() error }{}
|
||||||
|
|
||||||
if s.pilorama != nil {
|
if s.pilorama != nil {
|
||||||
|
@ -310,6 +316,11 @@ func (s *Shard) Reload(ctx context.Context, opts ...Option) error {
|
||||||
unlock := s.lockExclusive()
|
unlock := s.lockExclusive()
|
||||||
defer unlock()
|
defer unlock()
|
||||||
|
|
||||||
|
s.rb.Stop(s.log)
|
||||||
|
defer func() {
|
||||||
|
s.rb.Start(ctx, s.blobStor, s.metaBase, s.log)
|
||||||
|
}()
|
||||||
|
|
||||||
ok, err := s.metaBase.Reload(c.metaOpts...)
|
ok, err := s.metaBase.Reload(c.metaOpts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, meta.ErrDegradedMode) {
|
if errors.Is(err, meta.ErrDegradedMode) {
|
||||||
|
|
89
pkg/local_object_storage/shard/rebuilder.go
Normal file
89
pkg/local_object_storage/shard/rebuilder.go
Normal file
|
@ -0,0 +1,89 @@
|
||||||
|
package shard
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor"
|
||||||
|
meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type rebuilder struct {
|
||||||
|
mtx *sync.Mutex
|
||||||
|
wg *sync.WaitGroup
|
||||||
|
cancel func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func newRebuilder() *rebuilder {
|
||||||
|
return &rebuilder{
|
||||||
|
mtx: &sync.Mutex{},
|
||||||
|
wg: &sync.WaitGroup{},
|
||||||
|
cancel: nil,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
|
||||||
|
r.mtx.Lock()
|
||||||
|
defer r.mtx.Unlock()
|
||||||
|
|
||||||
|
r.start(ctx, bs, mb, log)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rebuilder) start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) {
|
||||||
|
if r.cancel != nil {
|
||||||
|
r.stop(log)
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
r.cancel = cancel
|
||||||
|
r.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer r.wg.Done()
|
||||||
|
|
||||||
|
log.Info(logs.BlobstoreRebuildStarted)
|
||||||
|
if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}); err != nil {
|
||||||
|
log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err))
|
||||||
|
} else {
|
||||||
|
log.Info(logs.BlobstoreRebuildCompletedSuccessfully)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rebuilder) Stop(log *logger.Logger) {
|
||||||
|
r.mtx.Lock()
|
||||||
|
defer r.mtx.Unlock()
|
||||||
|
|
||||||
|
r.stop(log)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *rebuilder) stop(log *logger.Logger) {
|
||||||
|
if r.cancel == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r.cancel()
|
||||||
|
r.wg.Wait()
|
||||||
|
r.cancel = nil
|
||||||
|
log.Info(logs.BlobstoreRebuildStopped)
|
||||||
|
}
|
||||||
|
|
||||||
|
var errMBIsNotAvailable = errors.New("metabase is not available")
|
||||||
|
|
||||||
|
type mbStorageIDUpdate struct {
|
||||||
|
mb *meta.DB
|
||||||
|
}
|
||||||
|
|
||||||
|
func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, obj *objectSDK.Object, storageID []byte) error {
|
||||||
|
if u.mb == nil {
|
||||||
|
return errMBIsNotAvailable
|
||||||
|
}
|
||||||
|
var prm meta.PutPrm
|
||||||
|
prm.SetObject(obj)
|
||||||
|
prm.SetStorageID(storageID)
|
||||||
|
_, err := u.mb.Put(ctx, prm)
|
||||||
|
return err
|
||||||
|
}
|
|
@ -38,6 +38,8 @@ type Shard struct {
|
||||||
|
|
||||||
tsSource TombstoneSource
|
tsSource TombstoneSource
|
||||||
|
|
||||||
|
rb *rebuilder
|
||||||
|
|
||||||
gcCancel atomic.Value
|
gcCancel atomic.Value
|
||||||
setModeRequested atomic.Bool
|
setModeRequested atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue