package shard import ( "context" "errors" "sync" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/blobstor" meta "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/metabase" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" ) type rebuilder struct { mtx *sync.Mutex wg *sync.WaitGroup cancel func() limiter RebuildWorkerLimiter } func newRebuilder(l RebuildWorkerLimiter) *rebuilder { return &rebuilder{ mtx: &sync.Mutex{}, wg: &sync.WaitGroup{}, cancel: nil, limiter: l, } } func (r *rebuilder) Start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) { r.mtx.Lock() defer r.mtx.Unlock() r.start(ctx, bs, mb, log) } func (r *rebuilder) start(ctx context.Context, bs *blobstor.BlobStor, mb *meta.DB, log *logger.Logger) { if r.cancel != nil { r.stop(log) } ctx, cancel := context.WithCancel(ctx) r.cancel = cancel r.wg.Add(1) go func() { defer r.wg.Done() log.Info(logs.BlobstoreRebuildStarted) if err := bs.Rebuild(ctx, &mbStorageIDUpdate{mb: mb}, r.limiter); err != nil { log.Warn(logs.FailedToRebuildBlobstore, zap.Error(err)) } else { log.Info(logs.BlobstoreRebuildCompletedSuccessfully) } }() } func (r *rebuilder) Stop(log *logger.Logger) { r.mtx.Lock() defer r.mtx.Unlock() r.stop(log) } func (r *rebuilder) stop(log *logger.Logger) { if r.cancel == nil { return } r.cancel() r.wg.Wait() r.cancel = nil log.Info(logs.BlobstoreRebuildStopped) } var errMBIsNotAvailable = errors.New("metabase is not available") type mbStorageIDUpdate struct { mb *meta.DB } func (u *mbStorageIDUpdate) UpdateStorageID(ctx context.Context, addr oid.Address, storageID []byte) error { select { case <-ctx.Done(): return ctx.Err() default: } if u.mb == nil { return errMBIsNotAvailable } var prm meta.UpdateStorageIDPrm prm.SetAddress(addr) prm.SetStorageID(storageID) _, err := u.mb.UpdateStorageID(prm) return err }