182 lines
4.6 KiB
Go
182 lines
4.6 KiB
Go
package policer
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"github.com/panjf2000/ants/v2"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// KeySpaceIterator is the interface that allows iterating over the key space
|
|
// of local storage.
|
|
// Note that the underlying implementation might be circular: i.e. it can restart
|
|
// when the end of the key space is reached.
|
|
type KeySpaceIterator interface {
|
|
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
|
|
Rewind()
|
|
}
|
|
|
|
// RedundantCopyCallback is a callback to pass
|
|
// the redundant local copy of the object.
|
|
type RedundantCopyCallback func(context.Context, oid.Address)
|
|
|
|
// BuryFunc is the function to bury (i.e. inhume) an object.
|
|
type BuryFunc func(context.Context, oid.Address) error
|
|
|
|
// Replicator is the interface to a consumer of replication tasks.
|
|
type Replicator interface {
|
|
HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
|
|
}
|
|
|
|
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
|
|
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
|
|
|
|
type cfg struct {
|
|
headTimeout time.Duration
|
|
|
|
log *logger.Logger
|
|
|
|
keySpaceIterator KeySpaceIterator
|
|
|
|
buryFn BuryFunc
|
|
|
|
cnrSrc container.Source
|
|
|
|
placementBuilder placement.Builder
|
|
|
|
remoteHeader RemoteObjectHeaderFunc
|
|
|
|
netmapKeys netmap.AnnouncedKeys
|
|
|
|
replicator Replicator
|
|
|
|
cbRedundantCopy RedundantCopyCallback
|
|
|
|
taskPool *ants.Pool
|
|
|
|
maxCapacity int
|
|
|
|
batchSize, cacheSize uint32
|
|
|
|
rebalanceFreq, evictDuration, sleepDuration time.Duration
|
|
|
|
metrics MetricsRegister
|
|
}
|
|
|
|
func defaultCfg() *cfg {
|
|
return &cfg{
|
|
log: &logger.Logger{Logger: zap.L()},
|
|
batchSize: 10,
|
|
cacheSize: 1024, // 1024 * address size = 1024 * 64 = 64 MiB
|
|
rebalanceFreq: 1 * time.Second,
|
|
sleepDuration: 1 * time.Second,
|
|
evictDuration: 30 * time.Second,
|
|
metrics: noopMetrics{},
|
|
}
|
|
}
|
|
|
|
// Option is an option for Policer constructor.
|
|
type Option func(*cfg)
|
|
|
|
// WithHeadTimeout returns option to set Head timeout of Policer.
|
|
func WithHeadTimeout(v time.Duration) Option {
|
|
return func(c *cfg) {
|
|
c.headTimeout = v
|
|
}
|
|
}
|
|
|
|
// WithLogger returns option to set Logger of Policer.
|
|
func WithLogger(v *logger.Logger) Option {
|
|
return func(c *cfg) {
|
|
c.log = v
|
|
}
|
|
}
|
|
|
|
func WithKeySpaceIterator(it KeySpaceIterator) Option {
|
|
return func(c *cfg) {
|
|
c.keySpaceIterator = it
|
|
}
|
|
}
|
|
|
|
func WithBuryFunc(f BuryFunc) Option {
|
|
return func(c *cfg) {
|
|
c.buryFn = f
|
|
}
|
|
}
|
|
|
|
// WithContainerSource returns option to set container source of Policer.
|
|
func WithContainerSource(v container.Source) Option {
|
|
return func(c *cfg) {
|
|
c.cnrSrc = v
|
|
}
|
|
}
|
|
|
|
// WithPlacementBuilder returns option to set object placement builder of Policer.
|
|
func WithPlacementBuilder(v placement.Builder) Option {
|
|
return func(c *cfg) {
|
|
c.placementBuilder = v
|
|
}
|
|
}
|
|
|
|
// WithRemoteObjectHeader returns option to set object header receiver of Policer.
|
|
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
|
|
return func(c *cfg) {
|
|
c.remoteHeader = v
|
|
}
|
|
}
|
|
|
|
// WithNetmapKeys returns option to set tool to work with announced public keys.
|
|
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
|
|
return func(c *cfg) {
|
|
c.netmapKeys = v
|
|
}
|
|
}
|
|
|
|
// WithReplicator returns option to set object replicator of Policer.
|
|
func WithReplicator(v Replicator) Option {
|
|
return func(c *cfg) {
|
|
c.replicator = v
|
|
}
|
|
}
|
|
|
|
// WithRedundantCopyCallback returns option to set
|
|
// callback to pass redundant local object copies
|
|
// detected by Policer.
|
|
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
|
|
return func(c *cfg) {
|
|
c.cbRedundantCopy = cb
|
|
}
|
|
}
|
|
|
|
// WithMaxCapacity returns option to set max capacity
|
|
// that can be set to the pool.
|
|
func WithMaxCapacity(capacity int) Option {
|
|
return func(c *cfg) {
|
|
c.maxCapacity = capacity
|
|
}
|
|
}
|
|
|
|
// WithPool returns option to set pool for
|
|
// policy and replication operations.
|
|
func WithPool(p *ants.Pool) Option {
|
|
return func(c *cfg) {
|
|
c.taskPool = p
|
|
}
|
|
}
|
|
|
|
// WithMetrics returns option to set metrics.
|
|
func WithMetrics(m MetricsRegister) Option {
|
|
return func(c *cfg) {
|
|
c.metrics = m
|
|
}
|
|
}
|