forked from TrueCloudLab/frostfs-node
WIP: Morph: Add unit tests #2
6 changed files with 0 additions and 56 deletions
|
@ -997,13 +997,6 @@ func (c *cfg) needBootstrap() bool {
|
||||||
return c.cfgNetmap.needBootstrap
|
return c.cfgNetmap.needBootstrap
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObjectServiceLoad implements system loader interface for policer component.
|
|
||||||
// It is calculated as size/capacity ratio of "remote object put" worker.
|
|
||||||
// Returns float value between 0.0 and 1.0.
|
|
||||||
func (c *cfg) ObjectServiceLoad() float64 {
|
|
||||||
return float64(c.cfgObject.pool.putRemote.Running()) / float64(c.cfgObject.pool.putRemoteCapacity)
|
|
||||||
}
|
|
||||||
|
|
||||||
type dCmp struct {
|
type dCmp struct {
|
||||||
name string
|
name string
|
||||||
reloadFunc func() error
|
reloadFunc func() error
|
||||||
|
|
|
@ -259,7 +259,6 @@ func addPolicer(c *cfg, keyStorage *util.KeyStorage, clientConstructor *cache.Cl
|
||||||
}),
|
}),
|
||||||
policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize),
|
policer.WithMaxCapacity(c.cfgObject.pool.replicatorPoolSize),
|
||||||
policer.WithPool(c.cfgObject.pool.replication),
|
policer.WithPool(c.cfgObject.pool.replication),
|
||||||
policer.WithNodeLoader(c),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
c.workers = append(c.workers, worker{
|
c.workers = append(c.workers, worker{
|
||||||
|
|
|
@ -52,7 +52,6 @@ const (
|
||||||
PolicerRoutineStopped = "routine stopped" // Info in ../node/pkg/services/policer/process.go
|
PolicerRoutineStopped = "routine stopped" // Info in ../node/pkg/services/policer/process.go
|
||||||
PolicerFailureAtObjectSelectForReplication = "failure at object select for replication" // Warn in ../node/pkg/services/policer/process.go
|
PolicerFailureAtObjectSelectForReplication = "failure at object select for replication" // Warn in ../node/pkg/services/policer/process.go
|
||||||
PolicerPoolSubmission = "pool submission" // Warn in ../node/pkg/services/policer/process.go
|
PolicerPoolSubmission = "pool submission" // Warn in ../node/pkg/services/policer/process.go
|
||||||
PolicerTuneReplicationCapacity = "tune replication capacity" // Debug in ../node/pkg/services/policer/process.go
|
|
||||||
ReplicatorFinishWork = "finish work" // Debug in ../node/pkg/services/replicator/process.go
|
ReplicatorFinishWork = "finish work" // Debug in ../node/pkg/services/replicator/process.go
|
||||||
ReplicatorCouldNotGetObjectFromLocalStorage = "could not get object from local storage" // Error in ../node/pkg/services/replicator/process.go
|
ReplicatorCouldNotGetObjectFromLocalStorage = "could not get object from local storage" // Error in ../node/pkg/services/replicator/process.go
|
||||||
ReplicatorCouldNotReplicateObject = "could not replicate object" // Error in ../node/pkg/services/replicator/process.go
|
ReplicatorCouldNotReplicateObject = "could not replicate object" // Error in ../node/pkg/services/replicator/process.go
|
||||||
|
|
|
@ -41,12 +41,6 @@ type Replicator interface {
|
||||||
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
|
// RemoteObjectHeaderFunc is the function to obtain HEAD info from a specific remote node.
|
||||||
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
|
type RemoteObjectHeaderFunc func(context.Context, netmapSDK.NodeInfo, oid.Address) (*objectSDK.Object, error)
|
||||||
|
|
||||||
// NodeLoader provides application load statistics.
|
|
||||||
type nodeLoader interface {
|
|
||||||
// ObjectServiceLoad returns object service load value in [0:1] range.
|
|
||||||
ObjectServiceLoad() float64
|
|
||||||
}
|
|
||||||
|
|
||||||
type cfg struct {
|
type cfg struct {
|
||||||
headTimeout time.Duration
|
headTimeout time.Duration
|
||||||
|
|
||||||
|
@ -70,8 +64,6 @@ type cfg struct {
|
||||||
|
|
||||||
taskPool *ants.Pool
|
taskPool *ants.Pool
|
||||||
|
|
||||||
loader nodeLoader
|
|
||||||
|
|
||||||
maxCapacity int
|
maxCapacity int
|
||||||
|
|
||||||
batchSize, cacheSize uint32
|
batchSize, cacheSize uint32
|
||||||
|
@ -178,10 +170,3 @@ func WithPool(p *ants.Pool) Option {
|
||||||
c.taskPool = p
|
c.taskPool = p
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithNodeLoader returns option to set FrostFS node load source.
|
|
||||||
func WithNodeLoader(l nodeLoader) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.loader = l
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -52,7 +52,6 @@ func TestBuryObjectWithoutContainer(t *testing.T) {
|
||||||
WithContainerSource(containerSrcFunc(containerSrc)),
|
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||||
WithBuryFunc(buryFn),
|
WithBuryFunc(buryFn),
|
||||||
WithPool(pool),
|
WithPool(pool),
|
||||||
WithNodeLoader(constNodeLoader(0)),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
@ -279,7 +278,6 @@ func TestIteratorContract(t *testing.T) {
|
||||||
WithContainerSource(containerSrcFunc(containerSrc)),
|
WithContainerSource(containerSrcFunc(containerSrc)),
|
||||||
WithBuryFunc(buryFn),
|
WithBuryFunc(buryFn),
|
||||||
WithPool(pool),
|
WithPool(pool),
|
||||||
WithNodeLoader(constNodeLoader(0)),
|
|
||||||
func(c *cfg) {
|
func(c *cfg) {
|
||||||
c.sleepDuration = time.Millisecond
|
c.sleepDuration = time.Millisecond
|
||||||
},
|
},
|
||||||
|
@ -372,11 +370,6 @@ type announcedKeysFunc func([]byte) bool
|
||||||
|
|
||||||
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
|
func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }
|
||||||
|
|
||||||
// constNodeLoader is a nodeLoader that always returns a fixed value.
|
|
||||||
type constNodeLoader float64
|
|
||||||
|
|
||||||
func (f constNodeLoader) ObjectServiceLoad() float64 { return float64(f) }
|
|
||||||
|
|
||||||
// replicatorFunc is a Replicator backed by a function.
|
// replicatorFunc is a Replicator backed by a function.
|
||||||
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)
|
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func (p *Policer) Run(ctx context.Context) {
|
func (p *Policer) Run(ctx context.Context) {
|
||||||
go p.poolCapacityWorker(ctx)
|
|
||||||
p.shardPolicyWorker(ctx)
|
p.shardPolicyWorker(ctx)
|
||||||
p.log.Info(logs.PolicerRoutineStopped)
|
p.log.Info(logs.PolicerRoutineStopped)
|
||||||
}
|
}
|
||||||
|
@ -65,27 +64,3 @@ func (p *Policer) shardPolicyWorker(ctx context.Context) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Policer) poolCapacityWorker(ctx context.Context) {
|
|
||||||
ticker := time.NewTicker(p.rebalanceFreq)
|
|
||||||
defer ticker.Stop()
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
frostfsSysLoad := p.loader.ObjectServiceLoad()
|
|
||||||
newCapacity := int((1.0 - frostfsSysLoad) * float64(p.maxCapacity))
|
|
||||||
if newCapacity == 0 {
|
|
||||||
newCapacity++
|
|
||||||
}
|
|
||||||
|
|
||||||
if p.taskPool.Cap() != newCapacity {
|
|
||||||
p.taskPool.Tune(newCapacity)
|
|
||||||
p.log.Debug(logs.PolicerTuneReplicationCapacity,
|
|
||||||
zap.Float64("system_load", frostfsSysLoad),
|
|
||||||
zap.Int("new_capacity", newCapacity))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue