frostfs-node/pkg/services/policer/option.go

173 lines
4.4 KiB
Go
Raw Permalink Normal View History

package policer
import (
"context"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
netmapSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// KeySpaceIterator is the interface that allows iterating over the key space
// of local storage.
//
// Note that the underlying implementation might be circular, i.e. it can
// restart from the beginning when the end of the key space is reached.
type KeySpaceIterator interface {
// Next takes a context and a uint32 and returns a slice of addresses with
// their object types, or an error.
// NOTE(review): the uint32 is presumably the maximum number of items to
// return in one call — confirm against the implementations.
Next(context.Context, uint32) ([]objectcore.AddressWithType, error)
// Rewind takes no arguments and returns nothing.
// NOTE(review): presumably resets the iteration position to the start of
// the key space — confirm against the implementations.
Rewind()
}
// RedundantCopyCallback is a callback that receives the address of
// a redundant local copy of an object. It returns nothing.
type RedundantCopyCallback func(context.Context, oid.Address)
// BuryFunc is the function to bury (i.e. inhume) the object with the given
// address. It returns an error value to report failure.
type BuryFunc func(context.Context, oid.Address) error
// Replicator is the interface to a consumer of replication tasks.
type Replicator interface {
// HandleTask accepts a replication task together with a replicator.TaskResult
// and returns nothing.
// NOTE(review): presumably the outcome of the task is reported through res —
// confirm against the replicator package.
HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult)
}
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
// It takes the node description and the object address, and returns
// an *objectSDK.Object or an error.
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
// cfg collects the Policer settings that Option values populate.
// Initial values are produced by defaultCfg.
type cfg struct {
	// headTimeout is assigned by WithHeadTimeout.
	headTimeout time.Duration

	// log is assigned by WithLogger.
	log *logger.Logger

	// keySpaceIterator is assigned by WithKeySpaceIterator.
	keySpaceIterator KeySpaceIterator

	// buryFn is assigned by WithBuryFunc.
	buryFn BuryFunc

	// cnrSrc is assigned by WithContainerSource.
	cnrSrc container.Source

	// placementBuilder is assigned by WithPlacementBuilder.
	placementBuilder placement.Builder

	// remoteHeader is assigned by WithRemoteObjectHeaderFunc.
	remoteHeader RemoteObjectHeaderFunc

	// netmapKeys is assigned by WithNetmapKeys.
	netmapKeys netmap.AnnouncedKeys

	// replicator is assigned by WithReplicator.
	replicator Replicator

	// cbRedundantCopy is assigned by WithRedundantCopyCallback.
	cbRedundantCopy RedundantCopyCallback

	// taskPool is assigned by WithPool.
	taskPool *ants.Pool

	// maxCapacity is assigned by WithMaxCapacity.
	maxCapacity int

	// batchSize and cacheSize receive their initial values in defaultCfg.
	batchSize uint32
	cacheSize uint32

	// rebalanceFreq, evictDuration and sleepDuration receive their initial
	// values in defaultCfg.
	rebalanceFreq time.Duration
	evictDuration time.Duration
	sleepDuration time.Duration
}
// defaultCfg returns a new cfg holding the default settings: the global zap
// logger, a batch size of 10, a cache size of 1024 entries, one-second
// rebalance and sleep intervals, and a 30-second eviction duration.
// All remaining fields are left at their zero values.
func defaultCfg() *cfg {
	return &cfg{
		log:           &logger.Logger{Logger: zap.L()},
		batchSize:     10,
		cacheSize:     1024, // 1024 * address size = 1024 * 64 B = 64 KiB (assuming 64-byte addresses)
		rebalanceFreq: 1 * time.Second,
		sleepDuration: 1 * time.Second,
		evictDuration: 30 * time.Second,
	}
}
// Option is an option for Policer constructor.
// It is a function that modifies the given cfg in place.
type Option func(*cfg)
// WithHeadTimeout returns an Option that stores v as the Head timeout
// of Policer.
func WithHeadTimeout(v time.Duration) Option {
	apply := func(conf *cfg) { conf.headTimeout = v }
	return apply
}
// WithLogger returns an Option that makes Policer use v as its Logger,
// replacing the one installed by default.
func WithLogger(v *logger.Logger) Option {
	return func(conf *cfg) { conf.log = v }
}
// WithKeySpaceIterator returns option to set the key space iterator
// of Policer.
func WithKeySpaceIterator(it KeySpaceIterator) Option {
	return func(c *cfg) {
		c.keySpaceIterator = it
	}
}
// WithBuryFunc returns option to set the function Policer uses to bury
// (i.e. inhume) objects.
func WithBuryFunc(f BuryFunc) Option {
	return func(c *cfg) {
		c.buryFn = f
	}
}
// WithContainerSource returns an Option that stores v as the container
// source of Policer.
func WithContainerSource(v container.Source) Option {
	return Option(func(target *cfg) {
		target.cnrSrc = v
	})
}
// WithPlacementBuilder returns an Option that stores v as the object
// placement builder of Policer.
func WithPlacementBuilder(v placement.Builder) Option {
	setBuilder := func(conf *cfg) { conf.placementBuilder = v }
	return setBuilder
}
// WithRemoteObjectHeaderFunc returns option to set object header receiver of Policer,
// i.e. the function used to obtain HEAD info from a remote node.
func WithRemoteObjectHeaderFunc(v RemoteObjectHeaderFunc) Option {
	return func(c *cfg) {
		c.remoteHeader = v
	}
}
// WithNetmapKeys returns an Option that stores v as the tool Policer uses
// to work with announced public keys.
func WithNetmapKeys(v netmap.AnnouncedKeys) Option {
	return func(conf *cfg) { conf.netmapKeys = v }
}
// WithReplicator returns an Option that stores v as the object replicator
// of Policer.
func WithReplicator(v Replicator) Option {
	return Option(func(target *cfg) {
		target.replicator = v
	})
}
// WithRedundantCopyCallback returns an Option that stores cb as the callback
// which receives redundant local object copies detected by Policer.
func WithRedundantCopyCallback(cb RedundantCopyCallback) Option {
	setCallback := func(conf *cfg) { conf.cbRedundantCopy = cb }
	return setCallback
}
// WithMaxCapacity returns an Option that stores capacity as the maximum
// capacity that can be set to the pool.
func WithMaxCapacity(capacity int) Option {
	return func(conf *cfg) { conf.maxCapacity = capacity }
}
// WithPool returns an Option that stores p as the pool used for policy
// and replication operations.
func WithPool(p *ants.Pool) Option {
	return Option(func(target *cfg) {
		target.taskPool = p
	})
}