forked from TrueCloudLab/frostfs-node
[#1508] node: Remove unused replicator code
The node does not support asynchronous object replication anymore, so it does not need to have replicator worker, channel and `AddTask` function. Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
This commit is contained in:
parent
2e4a1cb6df
commit
df8a3807fe
4 changed files with 0 additions and 50 deletions
|
@ -217,8 +217,6 @@ func initObjectService(c *cfg) {
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
c.workers = append(c.workers, repl)
|
|
||||||
|
|
||||||
pol := policer.New(
|
pol := policer.New(
|
||||||
policer.WithLogger(c.log),
|
policer.WithLogger(c.log),
|
||||||
policer.WithLocalStorage(ls),
|
policer.WithLocalStorage(ls),
|
||||||
|
|
|
@ -9,39 +9,6 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Replicator) Run(ctx context.Context) {
|
|
||||||
defer func() {
|
|
||||||
close(p.ch)
|
|
||||||
p.log.Info("routine stopped")
|
|
||||||
}()
|
|
||||||
|
|
||||||
p.ch = make(chan *Task, p.taskCap)
|
|
||||||
|
|
||||||
p.log.Info("process routine",
|
|
||||||
zap.Uint32("task queue capacity", p.taskCap),
|
|
||||||
zap.Duration("put timeout", p.putTimeout),
|
|
||||||
)
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
p.log.Warn("context is done",
|
|
||||||
zap.String("error", ctx.Err().Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
return
|
|
||||||
case task, ok := <-p.ch:
|
|
||||||
if !ok {
|
|
||||||
p.log.Warn("trigger channel is closed")
|
|
||||||
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
p.HandleTask(ctx, task)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// HandleTask executes replication task inside invoking goroutine.
|
// HandleTask executes replication task inside invoking goroutine.
|
||||||
func (p *Replicator) HandleTask(ctx context.Context, task *Task) {
|
func (p *Replicator) HandleTask(ctx context.Context, task *Task) {
|
||||||
defer func() {
|
defer func() {
|
||||||
|
|
|
@ -13,16 +13,12 @@ import (
|
||||||
// local objects to remote nodes.
|
// local objects to remote nodes.
|
||||||
type Replicator struct {
|
type Replicator struct {
|
||||||
*cfg
|
*cfg
|
||||||
|
|
||||||
ch chan *Task
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Option is an option for Policer constructor.
|
// Option is an option for Policer constructor.
|
||||||
type Option func(*cfg)
|
type Option func(*cfg)
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
taskCap uint32
|
|
||||||
|
|
||||||
putTimeout time.Duration
|
putTimeout time.Duration
|
||||||
|
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
|
|
@ -14,17 +14,6 @@ type Task struct {
|
||||||
nodes netmap.Nodes
|
nodes netmap.Nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
// AddTask pushes replication task to Replicator queue.
|
|
||||||
//
|
|
||||||
// If task queue is full, log message is written.
|
|
||||||
func (p *Replicator) AddTask(t *Task) {
|
|
||||||
select {
|
|
||||||
case p.ch <- t:
|
|
||||||
default:
|
|
||||||
p.log.Warn("task queue is full")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithCopiesNumber sets number of copies to replicate.
|
// WithCopiesNumber sets number of copies to replicate.
|
||||||
func (t *Task) WithCopiesNumber(v uint32) *Task {
|
func (t *Task) WithCopiesNumber(v uint32) *Task {
|
||||||
if t != nil {
|
if t != nil {
|
||||||
|
|
Loading…
Reference in a new issue