diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go new file mode 100644 index 00000000..67074877 --- /dev/null +++ b/pkg/services/replicator/process.go @@ -0,0 +1,95 @@ +package replicator + +import ( + "context" + + "github.com/nspcc-dev/neofs-node/pkg/network" + putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" + "go.uber.org/zap" +) + +func (p *Replicator) Run(ctx context.Context) { + defer func() { + close(p.ch) + p.log.Info("routine stopped") + }() + + p.ch = make(chan *Task, p.taskCap) + + p.log.Info("process routine", + zap.Uint32("task queue capacity", p.taskCap), + zap.Duration("put timeout", p.putTimeout), + ) + + for { + select { + case <-ctx.Done(): + p.log.Warn("context is done", + zap.String("error", ctx.Err().Error()), + ) + + return + case task, ok := <-p.ch: + if !ok { + p.log.Warn("trigger channel is closed") + + return + } + + p.handleTask(ctx, task) + } + } +} + +func (p *Replicator) handleTask(ctx context.Context, task *Task) { + defer func() { + p.log.Info("finish work", + zap.Uint32("amount of unfinished replicas", task.quantity), + ) + }() + + obj, err := p.localStorage.Get(task.addr) + if err != nil { + p.log.Error("could not get object from local storage") + + return + } + + prm := new(putsvc.RemotePutPrm). + WithObject(obj) + + for i := 0; task.quantity > 0 && i < len(task.nodes); i++ { + select { + case <-ctx.Done(): + return + default: + } + + netAddr := task.nodes[i].NetworkAddress() + + log := p.log.With(zap.String("node", netAddr)) + + node, err := network.AddressFromString(netAddr) + if err != nil { + log.Error("could not parse network address") + + continue + } + + callCtx, cancel := context.WithTimeout(ctx, p.putTimeout) + + err = p.remoteSender.PutObject(callCtx, prm.WithNodeAddress(node)) + + cancel() + + if err != nil { + log.Error("could not replicate object", + zap.String("error", err.Error()), + ) + } else { + log.Info("object successfully replicated") + + task.quantity-- + } + } +} diff --git a/pkg/services/replicator/replicator.go b/pkg/services/replicator/replicator.go new file mode 100644 index 00000000..92000373 --- /dev/null +++ b/pkg/services/replicator/replicator.go @@ -0,0 +1,80 @@ +package replicator + +import ( + "time" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Replicator represents the utility that replicates +// local objects to remote nodes. +type Replicator struct { + *cfg + + ch chan *Task +} + +// Option is an option for Policer constructor. +type Option func(*cfg) + +type cfg struct { + taskCap uint32 + + putTimeout time.Duration + + log *logger.Logger + + remoteSender *putsvc.RemoteSender + + localStorage *localstore.Storage +} + +func defaultCfg() *cfg { + return &cfg{} +} + +// New creates, initializes and returns Replicator instance. +func New(opts ...Option) *Replicator { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + c.log = c.log.With(zap.String("component", "Object Replicator")) + + return &Replicator{ + cfg: c, + } +} + +// WithPutTimeout returns option to set Put timeout of Replicator. +func WithPutTimeout(v time.Duration) Option { + return func(c *cfg) { + c.putTimeout = v + } +} + +// WithLogger returns option to set Logger of Replicator. +func WithLogger(v *logger.Logger) Option { + return func(c *cfg) { + c.log = v + } +} + +// WithRemoteSender returns option to set remote object sender of Replicator. +func WithRemoteSender(v *putsvc.RemoteSender) Option { + return func(c *cfg) { + c.remoteSender = v + } +} + +// WithLocalStorage returns option to set local object storage of Replicator. +func WithLocalStorage(v *localstore.Storage) Option { + return func(c *cfg) { + c.localStorage = v + } +} diff --git a/pkg/services/replicator/task.go b/pkg/services/replicator/task.go new file mode 100644 index 00000000..1521a9c7 --- /dev/null +++ b/pkg/services/replicator/task.go @@ -0,0 +1,53 @@ +package replicator + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" +) + +// Task represents group of Replicator task parameters. +type Task struct { + quantity uint32 + + addr *object.Address + + nodes netmap.Nodes +} + +// AddTask pushes replication task to Replicator queue. +// +// If task queue is full, log message is written. +func (p *Replicator) AddTask(t *Task) { + select { + case p.ch <- t: + default: + p.log.Warn("task queue is full") + } +} + +// WithCopiesNumber sets number of copies to replicate. +func (t *Task) WithCopiesNumber(v uint32) *Task { + if t != nil { + t.quantity = v + } + + return t +} + +// WithObjectAddress sets address of local object. +func (t *Task) WithObjectAddress(v *object.Address) *Task { + if t != nil { + t.addr = v + } + + return t +} + +// WithNodes sets list of potential object holders. +func (t *Task) WithNodes(v netmap.Nodes) *Task { + if t != nil { + t.nodes = v + } + + return t +}