forked from TrueCloudLab/frostfs-node
[#328] container/load: Implement local storage of used space announcements
Implement a component that stores the values of the used space of containers. The storage allows you to write several values for a fixed container and epoch number, and read the averaged estimates of all accumulated values. All values are stored in memory. This component is planned to be used as an accumulator of opinions from various network participants about the fullness of the container. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
cc88320d6b
commit
2622e11ae3
2 changed files with 202 additions and 0 deletions
141
pkg/services/container/announcement/load/storage/storage.go
Normal file
141
pkg/services/container/announcement/load/storage/storage.go
Normal file
|
@ -0,0 +1,141 @@
|
|||
package loadstorage
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller"
|
||||
)
|
||||
|
||||
type usedSpaceEstimations struct {
|
||||
announcement container.UsedSpaceAnnouncement
|
||||
|
||||
sizes []uint64
|
||||
}
|
||||
|
||||
// Storage represents in-memory storage of
|
||||
// UsedSpaceAnnouncement values.
|
||||
//
|
||||
// The write operation has the usual behavior - to save
|
||||
// the next number of used container space for a specific epoch.
|
||||
// All values related to one key (epoch, container ID) are stored
|
||||
// as a list.
|
||||
//
|
||||
// Storage also provides an iterator interface, into the handler
|
||||
// of which the final score is passed, built on all values saved
|
||||
// at the time of the call. Currently the only possible estimation
|
||||
// formula is used - the average between 10th and 90th percentile.
|
||||
//
|
||||
// For correct operation, Storage must be created
|
||||
// using the constructor (New) based on the required parameters
|
||||
// and optional components. After successful creation,
|
||||
// Storage is immediately ready to work through API.
|
||||
type Storage struct {
|
||||
mtx sync.RWMutex
|
||||
|
||||
mItems map[uint64]*usedSpaceEstimations
|
||||
}
|
||||
|
||||
// Prm groups the required parameters of the Storage's constructor.
|
||||
//
|
||||
// The component is not parameterizable at the moment.
|
||||
type Prm struct{}
|
||||
|
||||
// New creates a new instance of the Storage.
|
||||
//
|
||||
// The created Storage does not require additional
|
||||
// initialization and is completely ready for work.
|
||||
func New(_ Prm) *Storage {
|
||||
return &Storage{
|
||||
mItems: make(map[uint64]*usedSpaceEstimations),
|
||||
}
|
||||
}
|
||||
|
||||
// Put appends the next value of the occupied container space for the epoch
|
||||
// to the list of already saved values.
|
||||
//
|
||||
// Always returns nil error.
|
||||
func (s *Storage) Put(a container.UsedSpaceAnnouncement) error {
|
||||
s.mtx.Lock()
|
||||
|
||||
{
|
||||
key := a.Epoch()
|
||||
|
||||
estimations, ok := s.mItems[key]
|
||||
if !ok {
|
||||
estimations = &usedSpaceEstimations{
|
||||
announcement: a,
|
||||
sizes: make([]uint64, 0, 1),
|
||||
}
|
||||
|
||||
s.mItems[key] = estimations
|
||||
}
|
||||
|
||||
estimations.sizes = append(estimations.sizes, a.UsedSpace())
|
||||
}
|
||||
|
||||
s.mtx.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Storage) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Iterate goes through all the lists with the key (container ID, epoch),
|
||||
// calculates the final grade for all values, and passes it to the handler.
|
||||
//
|
||||
// Final grade is the average between 10th and 90th percentiles.
|
||||
func (s *Storage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) (err error) {
|
||||
s.mtx.RLock()
|
||||
|
||||
{
|
||||
for _, v := range s.mItems {
|
||||
if f(v.announcement) {
|
||||
// calculate estimation based on 90th percentile
|
||||
v.announcement.SetUsedSpace(finalEstimation(v.sizes))
|
||||
|
||||
if err = h(v.announcement); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
s.mtx.RUnlock()
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
func finalEstimation(vals []uint64) uint64 {
|
||||
sort.Slice(vals, func(i, j int) bool {
|
||||
return vals[i] < vals[j]
|
||||
})
|
||||
|
||||
const (
|
||||
lowerRank = 10
|
||||
upperRank = 90
|
||||
)
|
||||
|
||||
if len(vals) >= lowerRank {
|
||||
lowerInd := percentile(lowerRank, vals)
|
||||
upperInd := percentile(upperRank, vals)
|
||||
|
||||
vals = vals[lowerInd:upperInd]
|
||||
}
|
||||
|
||||
sum := uint64(0)
|
||||
|
||||
for i := range vals {
|
||||
sum += vals[i]
|
||||
}
|
||||
|
||||
return sum / uint64(len(vals))
|
||||
}
|
||||
|
||||
func percentile(rank int, vals []uint64) int {
|
||||
p := len(vals) * rank / 100
|
||||
return p
|
||||
}
|
|
@ -0,0 +1,61 @@
|
|||
package loadstorage
|
||||
|
||||
import (
|
||||
"crypto/sha256"
|
||||
"math/rand"
|
||||
"testing"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/pkg/container"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func randCID() *container.ID {
|
||||
h := [sha256.Size]byte{}
|
||||
|
||||
rand.Read(h[:])
|
||||
|
||||
id := container.NewID()
|
||||
id.SetSHA256(h)
|
||||
|
||||
return id
|
||||
}
|
||||
|
||||
func TestStorage(t *testing.T) {
|
||||
const epoch uint64 = 13
|
||||
|
||||
a := container.NewAnnouncement()
|
||||
a.SetContainerID(randCID())
|
||||
a.SetEpoch(epoch)
|
||||
|
||||
const opinionsNum = 100
|
||||
|
||||
s := New(Prm{})
|
||||
|
||||
opinions := make([]uint64, opinionsNum)
|
||||
for i := range opinions {
|
||||
opinions[i] = rand.Uint64()
|
||||
|
||||
a.SetUsedSpace(opinions[i])
|
||||
|
||||
require.NoError(t, s.Put(*a))
|
||||
}
|
||||
|
||||
iterCounter := 0
|
||||
|
||||
err := s.Iterate(
|
||||
func(ai container.UsedSpaceAnnouncement) bool {
|
||||
return ai.Epoch() == epoch
|
||||
},
|
||||
func(ai container.UsedSpaceAnnouncement) error {
|
||||
iterCounter++
|
||||
|
||||
require.Equal(t, epoch, ai.Epoch())
|
||||
require.Equal(t, a.ContainerID(), ai.ContainerID())
|
||||
require.Equal(t, finalEstimation(opinions), ai.UsedSpace())
|
||||
|
||||
return nil
|
||||
},
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, iterCounter)
|
||||
}
|
Loading…
Reference in a new issue