forked from TrueCloudLab/frostfs-node
[#1556] Upgrade NeoFS SDK Go with changed container API
Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
f699e82ea7
commit
c165d1a9b5
36 changed files with 207 additions and 480 deletions
|
@ -29,7 +29,7 @@ type announceContext struct {
|
|||
commonContext
|
||||
}
|
||||
|
||||
// Start starts the processing of UsedSpaceAnnouncement values.
|
||||
// Start starts the processing of container.SizeEstimation values.
|
||||
//
|
||||
// Single Start operation overtakes all data from LocalMetrics to
|
||||
// LocalAnnouncementTarget (Controller's parameters).
|
||||
|
@ -82,10 +82,10 @@ func (c *announceContext) announce() {
|
|||
|
||||
// iterate over all collected metrics and write them to the target
|
||||
err = metricsIterator.Iterate(
|
||||
func(container.UsedSpaceAnnouncement) bool {
|
||||
func(container.SizeEstimation) bool {
|
||||
return true // local metrics don't know about epochs
|
||||
},
|
||||
func(a container.UsedSpaceAnnouncement) error {
|
||||
func(a container.SizeEstimation) error {
|
||||
a.SetEpoch(c.epoch) // set epoch explicitly
|
||||
return targetWriter.Put(a)
|
||||
},
|
||||
|
@ -179,7 +179,7 @@ type stopContext struct {
|
|||
commonContext
|
||||
}
|
||||
|
||||
// Stop interrupts the processing of UsedSpaceAnnouncement values.
|
||||
// Stop interrupts the processing of container.SizeEstimation values.
|
||||
//
|
||||
// Single Stop operation releases an announcement context and overtakes
|
||||
// all data from AnnouncementAccumulator to ResultReceiver (Controller's
|
||||
|
|
|
@ -19,12 +19,12 @@ type testAnnouncementStorage struct {
|
|||
|
||||
mtx sync.RWMutex
|
||||
|
||||
m map[uint64][]container.UsedSpaceAnnouncement
|
||||
m map[uint64][]container.SizeEstimation
|
||||
}
|
||||
|
||||
func newTestStorage() *testAnnouncementStorage {
|
||||
return &testAnnouncementStorage{
|
||||
m: make(map[uint64][]container.UsedSpaceAnnouncement),
|
||||
m: make(map[uint64][]container.SizeEstimation),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -61,7 +61,7 @@ func (s *testAnnouncementStorage) InitWriter(context.Context) (loadcontroller.Wr
|
|||
return s, nil
|
||||
}
|
||||
|
||||
func (s *testAnnouncementStorage) Put(v container.UsedSpaceAnnouncement) error {
|
||||
func (s *testAnnouncementStorage) Put(v container.SizeEstimation) error {
|
||||
s.mtx.Lock()
|
||||
s.m[v.Epoch()] = append(s.m[v.Epoch()], v)
|
||||
s.mtx.Unlock()
|
||||
|
@ -73,12 +73,11 @@ func (s *testAnnouncementStorage) Close() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func randAnnouncement() container.UsedSpaceAnnouncement {
|
||||
a := container.NewAnnouncement()
|
||||
a.SetContainerID(cidtest.ID())
|
||||
a.SetUsedSpace(rand.Uint64())
|
||||
func randAnnouncement() (a container.SizeEstimation) {
|
||||
a.SetContainer(cidtest.ID())
|
||||
a.SetValue(rand.Uint64())
|
||||
|
||||
return *a
|
||||
return
|
||||
}
|
||||
|
||||
func TestSimpleScenario(t *testing.T) {
|
||||
|
@ -116,7 +115,7 @@ func TestSimpleScenario(t *testing.T) {
|
|||
const goodNum = 4
|
||||
|
||||
// create 2 random values for processing epoch and 1 for some different
|
||||
announces := make([]container.UsedSpaceAnnouncement, 0, goodNum)
|
||||
announces := make([]container.SizeEstimation, 0, goodNum)
|
||||
|
||||
for i := 0; i < goodNum; i++ {
|
||||
a := randAnnouncement()
|
||||
|
@ -174,13 +173,13 @@ func TestSimpleScenario(t *testing.T) {
|
|||
wg.Wait()
|
||||
|
||||
// result target should contain all "good" announcements and shoult not container the "bad" one
|
||||
var res []container.UsedSpaceAnnouncement
|
||||
var res []container.SizeEstimation
|
||||
|
||||
err := resultStorage.Iterate(
|
||||
func(a container.UsedSpaceAnnouncement) bool {
|
||||
func(a container.SizeEstimation) bool {
|
||||
return true
|
||||
},
|
||||
func(a container.UsedSpaceAnnouncement) error {
|
||||
func(a container.SizeEstimation) error {
|
||||
res = append(res, a)
|
||||
return nil
|
||||
},
|
||||
|
|
|
@ -7,22 +7,22 @@ import (
|
|||
"github.com/nspcc-dev/neofs-sdk-go/container"
|
||||
)
|
||||
|
||||
// UsedSpaceHandler describes the signature of the UsedSpaceAnnouncement
|
||||
// UsedSpaceHandler describes the signature of the container.SizeEstimation
|
||||
// value handling function.
|
||||
//
|
||||
// Termination of processing without failures is usually signaled
|
||||
// with a zero error, while a specific value may describe the reason
|
||||
// for failure.
|
||||
type UsedSpaceHandler func(container.UsedSpaceAnnouncement) error
|
||||
type UsedSpaceHandler func(container.SizeEstimation) error
|
||||
|
||||
// UsedSpaceFilter describes the signature of the function for
|
||||
// checking whether a value meets a certain criterion.
|
||||
//
|
||||
// Return of true means conformity, false - vice versa.
|
||||
type UsedSpaceFilter func(container.UsedSpaceAnnouncement) bool
|
||||
type UsedSpaceFilter func(container.SizeEstimation) bool
|
||||
|
||||
// Iterator is a group of methods provided by entity
|
||||
// which can iterate over a group of UsedSpaceAnnouncement values.
|
||||
// which can iterate over a group of container.SizeEstimation values.
|
||||
type Iterator interface {
|
||||
// Iterate must start an iterator over values that
|
||||
// meet the filter criterion (returns true).
|
||||
|
@ -37,7 +37,7 @@ type Iterator interface {
|
|||
|
||||
// IteratorProvider is a group of methods provided
|
||||
// by entity which generates iterators over
|
||||
// UsedSpaceAnnouncement values.
|
||||
// container.SizeEstimation values.
|
||||
type IteratorProvider interface {
|
||||
// InitIterator should return an initialized Iterator.
|
||||
//
|
||||
|
@ -49,12 +49,12 @@ type IteratorProvider interface {
|
|||
InitIterator(context.Context) (Iterator, error)
|
||||
}
|
||||
|
||||
// Writer describes the interface for storing UsedSpaceAnnouncement values.
|
||||
// Writer describes the interface for storing container.SizeEstimation values.
|
||||
//
|
||||
// This interface is provided by both local storage
|
||||
// of values and remote (wrappers over the RPC).
|
||||
type Writer interface {
|
||||
// Put performs a write operation of UsedSpaceAnnouncement value
|
||||
// Put performs a write operation of container.SizeEstimation value
|
||||
// and returns any error encountered.
|
||||
//
|
||||
// All values after the Close call must be flushed to the
|
||||
|
@ -62,7 +62,7 @@ type Writer interface {
|
|||
// Close operation.
|
||||
//
|
||||
// Put must not be called after Close.
|
||||
Put(container.UsedSpaceAnnouncement) error
|
||||
Put(container.SizeEstimation) error
|
||||
|
||||
// Close exits with method-providing Writer.
|
||||
//
|
||||
|
@ -75,7 +75,7 @@ type Writer interface {
|
|||
|
||||
// WriterProvider is a group of methods provided
|
||||
// by entity which generates keepers of
|
||||
// UsedSpaceAnnouncement values.
|
||||
// container.SizeEstimation values.
|
||||
type WriterProvider interface {
|
||||
// InitWriter should return an initialized Writer.
|
||||
//
|
||||
|
|
|
@ -7,7 +7,7 @@ import (
|
|||
)
|
||||
|
||||
func usedSpaceFilterEpochEQ(epoch uint64) UsedSpaceFilter {
|
||||
return func(a container.UsedSpaceAnnouncement) bool {
|
||||
return func(a container.SizeEstimation) bool {
|
||||
return a.Epoch() == epoch
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,6 @@ package loadroute
|
|||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"sync"
|
||||
|
||||
loadcontroller "github.com/nspcc-dev/neofs-node/pkg/services/container/announcement/load/controller"
|
||||
|
@ -71,7 +70,7 @@ type routeKey struct {
|
|||
type valuesRoute struct {
|
||||
route []ServerInfo
|
||||
|
||||
values []container.UsedSpaceAnnouncement
|
||||
values []container.SizeEstimation
|
||||
}
|
||||
|
||||
type loadWriter struct {
|
||||
|
@ -85,18 +84,13 @@ type loadWriter struct {
|
|||
mServers map[string]loadcontroller.Writer
|
||||
}
|
||||
|
||||
func (w *loadWriter) Put(a container.UsedSpaceAnnouncement) error {
|
||||
cnr, ok := a.ContainerID()
|
||||
if !ok {
|
||||
return errors.New("missing container in load announcement")
|
||||
}
|
||||
|
||||
func (w *loadWriter) Put(a container.SizeEstimation) error {
|
||||
w.routeMtx.Lock()
|
||||
defer w.routeMtx.Unlock()
|
||||
|
||||
key := routeKey{
|
||||
epoch: a.Epoch(),
|
||||
cid: cnr.EncodeToString(),
|
||||
cid: a.Container().EncodeToString(),
|
||||
}
|
||||
|
||||
routeValues, ok := w.mRoute[key]
|
||||
|
@ -110,7 +104,7 @@ func (w *loadWriter) Put(a container.UsedSpaceAnnouncement) error {
|
|||
|
||||
routeValues = &valuesRoute{
|
||||
route: route,
|
||||
values: []container.UsedSpaceAnnouncement{a},
|
||||
values: []container.SizeEstimation{a},
|
||||
}
|
||||
|
||||
w.mRoute[key] = routeValues
|
||||
|
|
|
@ -33,7 +33,7 @@ type Builder interface {
|
|||
// in that list (means that point is the last point in one of the route groups),
|
||||
// returned route must contain nil point that should be interpreted as signal to,
|
||||
// among sending to other route points, save the announcement in that point.
|
||||
NextStage(a container.UsedSpaceAnnouncement, passed []ServerInfo) ([]ServerInfo, error)
|
||||
NextStage(a container.SizeEstimation, passed []ServerInfo) ([]ServerInfo, error)
|
||||
}
|
||||
|
||||
// RemoteWriterProvider describes the component
|
||||
|
|
|
@ -2,7 +2,6 @@ package placementrouter
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
netmapcore "github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
||||
|
@ -16,15 +15,12 @@ import (
|
|||
// If passed route has more than one point, then endpoint of the route is reached.
|
||||
//
|
||||
// The traversed route is not checked, it is assumed to be correct.
|
||||
func (b *Builder) NextStage(a container.UsedSpaceAnnouncement, passed []loadroute.ServerInfo) ([]loadroute.ServerInfo, error) {
|
||||
func (b *Builder) NextStage(a container.SizeEstimation, passed []loadroute.ServerInfo) ([]loadroute.ServerInfo, error) {
|
||||
if len(passed) > 1 {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
cnr, ok := a.ContainerID()
|
||||
if !ok {
|
||||
return nil, errors.New("missing container in load announcement")
|
||||
}
|
||||
cnr := a.Container()
|
||||
|
||||
placement, err := b.placementBuilder.BuildPlacement(a.Epoch(), cnr)
|
||||
if err != nil {
|
||||
|
|
|
@ -12,7 +12,7 @@ var errWrongRoute = errors.New("wrong route")
|
|||
// CheckRoute checks if the route is a route correctly constructed by the builder for value a.
|
||||
//
|
||||
// Returns nil if route is correct, otherwise an error clarifying the inconsistency.
|
||||
func CheckRoute(builder Builder, a container.UsedSpaceAnnouncement, route []ServerInfo) error {
|
||||
func CheckRoute(builder Builder, a container.SizeEstimation, route []ServerInfo) error {
|
||||
for i := 1; i < len(route); i++ {
|
||||
servers, err := builder.NextStage(a, route[:i])
|
||||
if err != nil {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package loadstorage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
|
@ -10,7 +9,7 @@ import (
|
|||
)
|
||||
|
||||
type usedSpaceEstimations struct {
|
||||
announcement container.UsedSpaceAnnouncement
|
||||
announcement container.SizeEstimation
|
||||
|
||||
sizes []uint64
|
||||
}
|
||||
|
@ -22,7 +21,7 @@ type storageKey struct {
|
|||
}
|
||||
|
||||
// Storage represents in-memory storage of
|
||||
// UsedSpaceAnnouncement values.
|
||||
// container.SizeEstimation values.
|
||||
//
|
||||
// The write operation has the usual behavior - to save
|
||||
// the next number of used container space for a specific epoch.
|
||||
|
@ -63,18 +62,13 @@ func New(_ Prm) *Storage {
|
|||
// to the list of already saved values.
|
||||
//
|
||||
// Always returns nil error.
|
||||
func (s *Storage) Put(a container.UsedSpaceAnnouncement) error {
|
||||
cnr, ok := a.ContainerID()
|
||||
if !ok {
|
||||
return errors.New("missing container in load announcement")
|
||||
}
|
||||
|
||||
func (s *Storage) Put(a container.SizeEstimation) error {
|
||||
s.mtx.Lock()
|
||||
|
||||
{
|
||||
key := storageKey{
|
||||
epoch: a.Epoch(),
|
||||
cid: cnr.EncodeToString(),
|
||||
cid: a.Container().EncodeToString(),
|
||||
}
|
||||
|
||||
estimations, ok := s.mItems[key]
|
||||
|
@ -87,7 +81,7 @@ func (s *Storage) Put(a container.UsedSpaceAnnouncement) error {
|
|||
s.mItems[key] = estimations
|
||||
}
|
||||
|
||||
estimations.sizes = append(estimations.sizes, a.UsedSpace())
|
||||
estimations.sizes = append(estimations.sizes, a.Value())
|
||||
}
|
||||
|
||||
s.mtx.Unlock()
|
||||
|
@ -110,7 +104,7 @@ func (s *Storage) Iterate(f loadcontroller.UsedSpaceFilter, h loadcontroller.Use
|
|||
for _, v := range s.mItems {
|
||||
if f(v.announcement) {
|
||||
// calculate estimation based on 90th percentile
|
||||
v.announcement.SetUsedSpace(finalEstimation(v.sizes))
|
||||
v.announcement.SetValue(finalEstimation(v.sizes))
|
||||
|
||||
if err = h(v.announcement); err != nil {
|
||||
break
|
||||
|
|
|
@ -12,8 +12,8 @@ import (
|
|||
func TestStorage(t *testing.T) {
|
||||
const epoch uint64 = 13
|
||||
|
||||
a := container.NewAnnouncement()
|
||||
a.SetContainerID(cidtest.ID())
|
||||
var a container.SizeEstimation
|
||||
a.SetContainer(cidtest.ID())
|
||||
a.SetEpoch(epoch)
|
||||
|
||||
const opinionsNum = 100
|
||||
|
@ -24,25 +24,23 @@ func TestStorage(t *testing.T) {
|
|||
for i := range opinions {
|
||||
opinions[i] = rand.Uint64()
|
||||
|
||||
a.SetUsedSpace(opinions[i])
|
||||
a.SetValue(opinions[i])
|
||||
|
||||
require.NoError(t, s.Put(*a))
|
||||
require.NoError(t, s.Put(a))
|
||||
}
|
||||
|
||||
iterCounter := 0
|
||||
|
||||
err := s.Iterate(
|
||||
func(ai container.UsedSpaceAnnouncement) bool {
|
||||
func(ai container.SizeEstimation) bool {
|
||||
return ai.Epoch() == epoch
|
||||
},
|
||||
func(ai container.UsedSpaceAnnouncement) error {
|
||||
func(ai container.SizeEstimation) error {
|
||||
iterCounter++
|
||||
|
||||
require.Equal(t, epoch, ai.Epoch())
|
||||
cnr1, _ := a.ContainerID()
|
||||
cnr2, _ := ai.ContainerID()
|
||||
require.Equal(t, cnr1, cnr2)
|
||||
require.Equal(t, finalEstimation(opinions), ai.UsedSpace())
|
||||
require.Equal(t, a.Container(), ai.Container())
|
||||
require.Equal(t, finalEstimation(opinions), ai.Value())
|
||||
|
||||
return nil
|
||||
},
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
containercore "github.com/nspcc-dev/neofs-node/pkg/core/container"
|
||||
containerSvc "github.com/nspcc-dev/neofs-node/pkg/services/container"
|
||||
"github.com/nspcc-dev/neofs-node/pkg/services/object/acl/eacl"
|
||||
containerSDK "github.com/nspcc-dev/neofs-sdk-go/container"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl"
|
||||
"github.com/nspcc-dev/neofs-sdk-go/session"
|
||||
|
@ -58,8 +57,16 @@ func (s *morphExecutor) Put(_ context.Context, tokV2 *sessionV2.Token, body *con
|
|||
return nil, errors.New("missing signature")
|
||||
}
|
||||
|
||||
cnr := containercore.Container{
|
||||
Value: containerSDK.NewContainerFromV2(body.GetContainer()),
|
||||
cnrV2 := body.GetContainer()
|
||||
if cnrV2 == nil {
|
||||
return nil, errors.New("missing container field")
|
||||
}
|
||||
|
||||
var cnr containercore.Container
|
||||
|
||||
err := cnr.Value.ReadFromV2(*cnrV2)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid container: %w", err)
|
||||
}
|
||||
|
||||
cnr.Signature.ReadFromV2(*sigV2)
|
||||
|
@ -158,8 +165,11 @@ func (s *morphExecutor) Get(ctx context.Context, body *container.GetRequestBody)
|
|||
cnr.Session.WriteToV2(tokV2)
|
||||
}
|
||||
|
||||
var cnrV2 container.Container
|
||||
cnr.Value.WriteToV2(&cnrV2)
|
||||
|
||||
res := new(container.GetResponseBody)
|
||||
res.SetContainer(cnr.Value.ToV2())
|
||||
res.SetContainer(&cnrV2)
|
||||
res.SetSignature(sigV2)
|
||||
res.SetSessionToken(tokV2)
|
||||
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
containerSvcMorph "github.com/nspcc-dev/neofs-node/pkg/services/container/morph"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||
containertest "github.com/nspcc-dev/neofs-sdk-go/container/test"
|
||||
sessiontest "github.com/nspcc-dev/neofs-sdk-go/session/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
@ -54,6 +55,13 @@ func TestInvalidToken(t *testing.T) {
|
|||
var reqBody container.PutRequestBody
|
||||
reqBody.SetSignature(new(refs.Signature))
|
||||
|
||||
cnr := containertest.Container()
|
||||
|
||||
var cnrV2 container.Container
|
||||
cnr.WriteToV2(&cnrV2)
|
||||
|
||||
reqBody.SetContainer(&cnrV2)
|
||||
|
||||
_, err = e.Put(context.TODO(), tokV2, &reqBody)
|
||||
return
|
||||
},
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue