diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go new file mode 100644 index 000000000..85f2a6c41 --- /dev/null +++ b/pkg/services/policer/check.go @@ -0,0 +1,103 @@ +package policer + +import ( + "context" + "strings" + + "github.com/nspcc-dev/neofs-api-go/pkg/netmap" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/network" + headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" + "go.uber.org/zap" +) + +func (p *Policer) processObject(ctx context.Context, addr *object.Address) { + cnr, err := p.cnrSrc.Get(addr.GetContainerID()) + if err != nil { + p.log.Error("could not get container", + zap.String("error", err.Error()), + ) + + return + } + + policy := cnr.GetPlacementPolicy() + + nn, err := p.placementBuilder.BuildPlacement(addr, policy) + if err != nil { + p.log.Error("could not build placement vector for object", + zap.String("error", err.Error()), + ) + + return + } + + replicas := policy.GetReplicas() + + for i := range nn { + select { + case <-ctx.Done(): + return + default: + } + + p.processNodes(ctx, addr, nn[i], replicas[i].GetCount()) + } +} + +func (p *Policer) processNodes(ctx context.Context, addr *object.Address, nodes netmap.Nodes, shortage uint32) { + prm := new(headsvc.RemoteHeadPrm).WithObjectAddress(addr) + + for i := 0; shortage > 0 && i < len(nodes); i++ { + select { + case <-ctx.Done(): + return + default: + } + + netAddr := nodes[i].NetworkAddress() + + log := p.log.With(zap.String("node", netAddr)) + + node, err := network.AddressFromString(netAddr) + if err != nil { + log.Error("could not parse network address") + + continue + } + + if network.IsLocalAddress(p.localAddrSrc, node) { + shortage-- + } else { + callCtx, cancel := context.WithTimeout(ctx, p.headTimeout) + + _, err = p.remoteHeader.Head(callCtx, prm.WithNodeAddress(node)) + + cancel() + + if err != nil { + // FIXME: this is a temporary solution to resolve 404 response from remote node + // We need to distinguish problem nodes from nodes without an object. + if strings.Contains(err.Error(), headsvc.ErrNotFound.Error()) { + continue + } else { + log.Error("could not receive object header", + zap.String("error", err.Error()), + ) + } + } else { + shortage-- + } + } + + nodes = append(nodes[:i], nodes[i+1:]...) + i-- + } + + if shortage > 0 { + p.log.Info("shortage of object copies detected", + zap.Uint32("shortage", shortage), + ) + // TODO: send task to replicator + } +} diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go new file mode 100644 index 000000000..8a0d5e57d --- /dev/null +++ b/pkg/services/policer/policer.go @@ -0,0 +1,140 @@ +package policer + +import ( + "sync" + "time" + + "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" + "github.com/nspcc-dev/neofs-node/pkg/network" + headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" + "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" + "github.com/nspcc-dev/neofs-node/pkg/util/logger" + "go.uber.org/zap" +) + +// Policer represents the utility that verifies +// compliance with the object storage policy. +type Policer struct { + *cfg + + prevTask prevTask +} + +// Option is an option for Policer constructor. +type Option func(*cfg) + +type cfg struct { + headTimeout time.Duration + + workScope workScope + + log *logger.Logger + + trigger <-chan *Task + + jobQueue jobQueue + + cnrSrc container.Source + + placementBuilder placement.Builder + + remoteHeader *headsvc.RemoteHeader + + localAddrSrc network.LocalAddressSource +} + +func defaultCfg() *cfg { + return &cfg{ + log: zap.L(), + } +} + +// New creates, initializes and returns Policer instance. +func New(opts ...Option) *Policer { + c := defaultCfg() + + for i := range opts { + opts[i](c) + } + + c.log = c.log.With(zap.String("component", "Object Policer")) + + return &Policer{ + cfg: c, + prevTask: prevTask{ + cancel: func() {}, + wait: new(sync.WaitGroup), + }, + } +} + +// WithHeadTimeout returns option to set Head timeout of Policer. +func WithHeadTimeout(v time.Duration) Option { + return func(c *cfg) { + c.headTimeout = v + } +} + +// WithWorkScope returns option to set job work scope value of Policer. +func WithWorkScope(v int) Option { + return func(c *cfg) { + c.workScope.val = v + } +} + +// WithExpansionRate returns option to set expansion rate of Policer's works scope (in %). +func WithExpansionRate(v int) Option { + return func(c *cfg) { + c.workScope.expRate = v + } +} + +// WithTrigger returns option to set triggering channel of Policer. +func WithTrigger(v <-chan *Task) Option { + return func(c *cfg) { + c.trigger = v + } +} + +// WithLogger returns option to set Logger of Policer. +func WithLogger(v *logger.Logger) Option { + return func(c *cfg) { + c.log = v + } +} + +// WithLocalStorage returns option to set local object storage of Policer. +func WithLocalStorage(v *localstore.Storage) Option { + return func(c *cfg) { + c.jobQueue.localStorage = v + } +} + +// WithContainerSource returns option to set container source of Policer. +func WithContainerSource(v container.Source) Option { + return func(c *cfg) { + c.cnrSrc = v + } +} + +// WithPlacementBuilder returns option to set object placement builder of Policer. +func WithPlacementBuilder(v placement.Builder) Option { + return func(c *cfg) { + c.placementBuilder = v + } +} + +// WithRemoteHeader returns option to set object header receiver of Policer. +func WithRemoteHeader(v *headsvc.RemoteHeader) Option { + return func(c *cfg) { + c.remoteHeader = v + } +} + +// WithLocalAddressSource returns option to set local address source of Policer. +func WithLocalAddressSource(v network.LocalAddressSource) Option { + return func(c *cfg) { + c.localAddrSrc = v + } +} diff --git a/pkg/services/policer/process.go b/pkg/services/policer/process.go new file mode 100644 index 000000000..eb9fbd6b5 --- /dev/null +++ b/pkg/services/policer/process.go @@ -0,0 +1,114 @@ +package policer + +import ( + "context" + "sync" + + "go.uber.org/zap" +) + +// Task represents group of Policer tact parameters. +type Task struct{} + +type prevTask struct { + undone int + + cancel context.CancelFunc + + wait *sync.WaitGroup +} + +type workScope struct { + val int + + expRate int // in % +} + +func (p *Policer) Run(ctx context.Context) { + defer func() { + p.log.Info("routine stopped") + }() + + p.log.Info("process routine", + zap.Int("work scope value", p.workScope.val), + zap.Int("expansion rate (%)", p.workScope.val), + zap.Duration("head timeout", p.headTimeout), + ) + + for { + select { + case <-ctx.Done(): + p.prevTask.cancel() + + p.log.Warn("context is done", + zap.String("error", ctx.Err().Error()), + ) + + return + case task, ok := <-p.trigger: + if !ok { + p.log.Warn("trigger channel is closed") + + return + } + + p.prevTask.cancel() + p.prevTask.wait.Wait() + + var taskCtx context.Context + + taskCtx, p.prevTask.cancel = context.WithCancel(ctx) + + go p.handleTask(taskCtx, task) + } + } +} + +func (p *Policer) handleTask(ctx context.Context, task *Task) { + p.prevTask.wait.Add(1) + + defer func() { + p.prevTask.wait.Done() + p.log.Info("finish work", + zap.Int("amount of unfinished objects", p.prevTask.undone), + ) + }() + + var delta int + + // undone - amount of objects we couldn't process in last epoch + if p.prevTask.undone > 0 { + // if there are unprocessed objects, then lower your estimation + delta = -p.prevTask.undone + } else { + // otherwise try to expand + delta = p.workScope.val * p.workScope.expRate / 100 + } + + addrs, err := p.jobQueue.Select(p.workScope.val + delta) + if err != nil { + p.log.Warn("could not select objects", + zap.String("error", err.Error()), + ) + } + + // if there are NOT enough objects to fill the pool, do not change it + // otherwise expand or shrink it with the delta value + if len(addrs) >= p.workScope.val+delta { + p.workScope.val += delta + } + + p.prevTask.undone = len(addrs) + + for i := range addrs { + select { + case <-ctx.Done(): + return + default: + } + + p.processObject(ctx, addrs[i]) + + p.prevTask.undone-- + } +} diff --git a/pkg/services/policer/queue.go b/pkg/services/policer/queue.go new file mode 100644 index 000000000..63348a0a0 --- /dev/null +++ b/pkg/services/policer/queue.go @@ -0,0 +1,28 @@ +package policer + +import ( + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/localstore" +) + +type jobQueue struct { + localStorage *localstore.Storage +} + +func (q *jobQueue) Select(limit int) ([]*object.Address, error) { + // TODO: optimize the logic for selecting objects + // We can prioritize objects for migration, newly arrived objects, etc. + // It is recommended to make changes after updating the metabase + + res := make([]*object.Address, 0, limit) + + if err := q.localStorage.Iterate(nil, func(meta *localstore.ObjectMeta) bool { + res = append(res, meta.Head().Address()) + + return len(res) >= limit + }); err != nil { + return nil, err + } + + return res, nil +}