diff --git a/pkg/services/container/announcement/load/storage/storage.go b/pkg/services/container/announcement/load/storage/storage.go new file mode 100644 index 00000000..f41af236 --- /dev/null +++ b/pkg/services/container/announcement/load/storage/storage.go @@ -0,0 +1,141 @@ +package loadstorage + +import ( + "sort" + "sync" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller" +) + +type usedSpaceEstimations struct { + announcement container.UsedSpaceAnnouncement + + sizes []uint64 +} + +// Storage represents in-memory storage of +// UsedSpaceAnnouncement values. +// +// The write operation has the usual behavior - to save +// the next number of used container space for a specific epoch. +// All values related to one key (epoch, container ID) are stored +// as a list. +// +// Storage also provides an iterator interface, into the handler +// of which the final score is passed, built on all values saved +// at the time of the call. Currently the only possible estimation +// formula is used - the average between 10th and 90th percentile. +// +// For correct operation, Storage must be created +// using the constructor (New) based on the required parameters +// and optional components. After successful creation, +// Storage is immediately ready to work through API. +type Storage struct { + mtx sync.RWMutex + + mItems map[uint64]*usedSpaceEstimations +} + +// Prm groups the required parameters of the Storage's constructor. +// +// The component is not parameterizable at the moment. +type Prm struct{} + +// New creates a new instance of the Storage. +// +// The created Storage does not require additional +// initialization and is completely ready for work. +func New(_ Prm) *Storage { + return &Storage{ + mItems: make(map[uint64]*usedSpaceEstimations), + } +} + +// Put appends the next value of the occupied container space for the epoch +// to the list of already saved values. +// +// Always returns nil error. +func (s *Storage) Put(a container.UsedSpaceAnnouncement) error { + s.mtx.Lock() + + { + key := a.Epoch() + + estimations, ok := s.mItems[key] + if !ok { + estimations = &usedSpaceEstimations{ + announcement: a, + sizes: make([]uint64, 0, 1), + } + + s.mItems[key] = estimations + } + + estimations.sizes = append(estimations.sizes, a.UsedSpace()) + } + + s.mtx.Unlock() + + return nil +} + +func (s *Storage) Close() error { + return nil +} + +// Iterate goes through all the lists with the key (container ID, epoch), +// calculates the final grade for all values, and passes it to the handler. +// +// Final grade is the average between 10th and 90th percentiles. +func (s *Storage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.UsedSpaceHandler) (err error) { + s.mtx.RLock() + + { + for _, v := range s.mItems { + if f(v.announcement) { + // calculate estimation based on 90th percentile + v.announcement.SetUsedSpace(finalEstimation(v.sizes)) + + if err = h(v.announcement); err != nil { + break + } + } + } + } + + s.mtx.RUnlock() + + return +} + +func finalEstimation(vals []uint64) uint64 { + sort.Slice(vals, func(i, j int) bool { + return vals[i] < vals[j] + }) + + const ( + lowerRank = 10 + upperRank = 90 + ) + + if len(vals) >= lowerRank { + lowerInd := percentile(lowerRank, vals) + upperInd := percentile(upperRank, vals) + + vals = vals[lowerInd:upperInd] + } + + sum := uint64(0) + + for i := range vals { + sum += vals[i] + } + + return sum / uint64(len(vals)) +} + +func percentile(rank int, vals []uint64) int { + p := len(vals) * rank / 100 + return p +} diff --git a/pkg/services/container/announcement/load/storage/storage_test.go b/pkg/services/container/announcement/load/storage/storage_test.go new file mode 100644 index 00000000..fa54cdcd --- /dev/null +++ b/pkg/services/container/announcement/load/storage/storage_test.go @@ -0,0 +1,61 @@ +package loadstorage + +import ( + "crypto/sha256" + "math/rand" + "testing" + + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/stretchr/testify/require" +) + +func randCID() *container.ID { + h := [sha256.Size]byte{} + + rand.Read(h[:]) + + id := container.NewID() + id.SetSHA256(h) + + return id +} + +func TestStorage(t *testing.T) { + const epoch uint64 = 13 + + a := container.NewAnnouncement() + a.SetContainerID(randCID()) + a.SetEpoch(epoch) + + const opinionsNum = 100 + + s := New(Prm{}) + + opinions := make([]uint64, opinionsNum) + for i := range opinions { + opinions[i] = rand.Uint64() + + a.SetUsedSpace(opinions[i]) + + require.NoError(t, s.Put(*a)) + } + + iterCounter := 0 + + err := s.Iterate( + func(ai container.UsedSpaceAnnouncement) bool { + return ai.Epoch() == epoch + }, + func(ai container.UsedSpaceAnnouncement) error { + iterCounter++ + + require.Equal(t, epoch, ai.Epoch()) + require.Equal(t, a.ContainerID(), ai.ContainerID()) + require.Equal(t, finalEstimation(opinions), ai.UsedSpace()) + + return nil + }, + ) + require.NoError(t, err) + require.Equal(t, 1, iterCounter) +}