frostfs-node/pkg/services/policer/policer_test.go

386 lines
11 KiB
Go
Raw Normal View History

package policer
import (
"bytes"
"context"
"errors"
"sort"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
"github.com/panjf2000/ants/v2"
"github.com/stretchr/testify/require"
)
// TestBuryObjectWithoutContainer checks that an object whose container lookup
// fails with apistatus.ContainerNotFound is handed to the bury function.
func TestBuryObjectWithoutContainer(t *testing.T) {
	// The context is created first so the bury callback below can observe the
	// test's cancellation.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Key space
	addr := oidtest.Address()
	objs := []objectcore.AddressWithType{
		{
			Address: addr,
			Type:    objectSDK.TypeRegular,
		},
	}

	// Container source and bury function
	buryCh := make(chan oid.Address)
	containerSrc := func(id cid.ID) (*container.Container, error) {
		return nil, apistatus.ContainerNotFound{}
	}
	buryFn := func(_ context.Context, a oid.Address) error {
		// buryCh is unbuffered and read exactly once by the test. Without the
		// cancellation branch, any later bury call would block its goroutine
		// forever once the test has returned.
		select {
		case buryCh <- a:
		case <-ctx.Done():
		}
		return nil
	}

	// Task pool
	pool, err := ants.NewPool(4)
	require.NoError(t, err)

	// Policer instance
	p := New(
		WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
		WithContainerSource(containerSrcFunc(containerSrc)),
		WithBuryFunc(buryFn),
		WithPool(pool),
		WithNodeLoader(constNodeLoader(0)),
	)

	go p.Run(ctx)

	// The only object in the key space must be the one that gets buried.
	require.Equal(t, addr, <-buryCh)
}
// TestProcessObject runs processObject against a table of placement scenarios
// and verifies which nodes receive replication tasks and whether the
// redundant-copy callback fires.
func TestProcessObject(t *testing.T) {
	// Conventions used by the table below:
	//   - a node is identified by its index, which is also its one-byte public key;
	//   - index 0 is the local node, so it never has to be listed in objHolders;
	//   - the policy only has to agree with the placement on the replica count
	//     of each vector.
	type testCase struct {
		desc                string
		objType             objectSDK.Type
		nodeCount           int
		policy              string
		placement           [][]int
		objHolders          []int
		maintenanceNodes    []int
		wantRemoveRedundant bool
		wantReplicateTo     []int
	}

	cases := []testCase{
		{
			desc:      "1 copy already held by local node",
			nodeCount: 1,
			policy:    `REP 1`,
			placement: [][]int{{0}},
		},
		{
			desc:                "1 copy already held by the remote node",
			nodeCount:           2,
			policy:              `REP 1`,
			placement:           [][]int{{1}},
			objHolders:          []int{1},
			wantRemoveRedundant: true,
		},
		{
			desc:            "1 copy not yet held by the remote node",
			nodeCount:       2,
			policy:          `REP 1`,
			placement:       [][]int{{1}},
			wantReplicateTo: []int{1},
		},
		{
			desc:       "2 copies already held by local and remote node",
			nodeCount:  2,
			policy:     `REP 2`,
			placement:  [][]int{{0, 1}},
			objHolders: []int{1},
		},
		{
			desc:            "2 copies but not held by remote node",
			nodeCount:       2,
			policy:          `REP 2`,
			placement:       [][]int{{0, 1}},
			wantReplicateTo: []int{1},
		},
		{
			desc:       "multiple vectors already held by remote node",
			nodeCount:  2,
			policy:     `REP 2 REP 2`,
			placement:  [][]int{{0, 1}, {0, 1}},
			objHolders: []int{1},
		},
		{
			desc:            "multiple vectors not yet held by remote node",
			nodeCount:       2,
			policy:          `REP 2 REP 2`,
			placement:       [][]int{{0, 1}, {0, 1}},
			wantReplicateTo: []int{1, 1}, // open question: is the duplicate replication desirable?
		},
		{
			desc:            "lock object must be replicated to all nodes",
			objType:         objectSDK.TypeLock,
			nodeCount:       3,
			policy:          `REP 1`,
			placement:       [][]int{{0, 1, 2}},
			wantReplicateTo: []int{1, 2},
		},
		{
			desc:             "preserve local copy when maintenance nodes exist",
			nodeCount:        3,
			policy:           `REP 2`,
			placement:        [][]int{{1, 2}},
			objHolders:       []int{1},
			maintenanceNodes: []int{2},
		},
	}

	for _, tc := range cases {
		tc := tc
		t.Run(tc.desc, func(t *testing.T) {
			addr := oidtest.Address()

			// Netmap: one node per index, the index being its public key.
			nodes := make([]netmap.NodeInfo, tc.nodeCount)
			for nodeIdx := range nodes {
				nodes[nodeIdx].SetPublicKey([]byte{byte(nodeIdx)})
			}
			for _, nodeIdx := range tc.maintenanceNodes {
				nodes[nodeIdx].SetMaintenance()
			}

			// Placement policy.
			var policy netmap.PlacementPolicy
			require.NoError(t, policy.DecodeString(tc.policy))

			// Placement vectors translated from node indexes to node infos.
			placementVectors := make([][]netmap.NodeInfo, len(tc.placement))
			for vecIdx, indexes := range tc.placement {
				var vec []netmap.NodeInfo
				for _, nodeIdx := range indexes {
					vec = append(vec, nodes[nodeIdx])
				}
				placementVectors[vecIdx] = vec
			}

			// Placement builder: only the object under test may be asked for.
			placementBuilder := func(cnr cid.ID, obj *oid.ID, _ netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
				if !cnr.Equals(addr.Container()) || obj == nil || !obj.Equals(addr.Object()) {
					t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
					return nil, errors.New("unexpected placement build")
				}
				return placementVectors, nil
			}

			// Remote nodes that already store the object.
			holders := make(map[int]struct{}, len(tc.objHolders))
			for _, nodeIdx := range tc.objHolders {
				holders[nodeIdx] = struct{}{}
			}

			// Object remote header: succeeds for holders, "not found" otherwise.
			headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
				index := int(ni.PublicKey()[0])
				expected := a == addr && index >= 1 && index < tc.nodeCount
				if !expected {
					t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
					return nil, errors.New("unexpected object head")
				}
				if _, held := holders[index]; held {
					return nil, nil
				}
				return nil, apistatus.ObjectNotFound{}
			}

			// Container source: knows only the container of the object under test.
			cnr := &container.Container{}
			cnr.Value.Init()
			cnr.Value.SetPlacementPolicy(policy)
			containerSrc := func(id cid.ID) (*container.Container, error) {
				if !id.Equals(addr.Container()) {
					t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
					return nil, apistatus.ContainerNotFound{}
				}
				return cnr, nil
			}

			// Burying is never expected in these scenarios.
			buryFn := func(_ context.Context, a oid.Address) error {
				t.Errorf("unexpected object buried: %v", a)
				return nil
			}

			// Observed side effects of processObject.
			var (
				redundantSeen bool
				replicatedTo  []int
			)

			onRedundant := func(_ context.Context, a oid.Address) {
				require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a)
				redundantSeen = true
			}
			onReplicate := func(_ context.Context, task replicator.Task, _ replicator.TaskResult) {
				require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task)
				for _, target := range task.Nodes {
					replicatedTo = append(replicatedTo, int(target.PublicKey()[0]))
				}
			}
			isLocalKey := func(k []byte) bool {
				return bytes.Equal(k, nodes[0].PublicKey())
			}

			// Policer instance
			p := New(
				WithContainerSource(containerSrcFunc(containerSrc)),
				WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
				WithNetmapKeys(announcedKeysFunc(isLocalKey)),
				WithRemoteObjectHeaderFunc(headFn),
				WithBuryFunc(buryFn),
				WithRedundantCopyCallback(onRedundant),
				WithReplicator(replicatorFunc(onReplicate)),
			)

			p.processObject(context.Background(), objectcore.AddressWithType{
				Address: addr,
				Type:    tc.objType,
			})

			// Replication order is not part of the contract, so compare sorted.
			sort.Ints(replicatedTo)

			require.Equal(t, tc.wantRemoveRedundant, redundantSeen)
			require.Equal(t, tc.wantReplicateTo, replicatedTo)
		})
	}
}
func TestIteratorContract(t *testing.T) {
addr := oidtest.Address()
objs := []objectcore.AddressWithType{{
Address: addr,
Type: objectSDK.TypeRegular,
}}
containerSrc := func(id cid.ID) (*container.Container, error) {
return nil, apistatus.ContainerNotFound{}
}
buryFn := func(ctx context.Context, a oid.Address) error {
return nil
}
pool, err := ants.NewPool(4)
require.NoError(t, err)
it := &predefinedIterator{
scenario: []nextResult{
{objs, nil},
{nil, errors.New("opaque")},
{nil, engine.ErrEndOfListing},
{nil, engine.ErrEndOfListing},
{nil, errors.New("opaque")},
{objs, engine.ErrEndOfListing},
},
finishCh: make(chan struct{}),
}
p := New(
WithKeySpaceIterator(it),
WithContainerSource(containerSrcFunc(containerSrc)),
WithBuryFunc(buryFn),
WithPool(pool),
WithNodeLoader(constNodeLoader(0)),
func(c *cfg) {
c.sleepDuration = time.Millisecond
},
)
ctx, cancel := context.WithCancel(context.Background())
go p.Run(ctx)
<-it.finishCh
cancel()
require.Equal(t, []string{
"Next",
"Next",
"Next",
"Rewind",
"Next",
"Rewind",
"Next",
"Next",
"Rewind",
}, it.calls)
}
// nextResult is one scripted outcome of predefinedIterator.Next: the batch and
// the error returned by a single call.
type nextResult struct {
// objs is the batch of addresses returned by the call.
objs []objectcore.AddressWithType
// err is the error returned alongside objs.
err error
}
// predefinedIterator is a key space iterator for tests that replays a fixed
// scenario of Next results and records the calls made to it.
type predefinedIterator struct {
	// scenario holds the results returned by successive Next calls.
	scenario []nextResult
	// finishCh is closed once Next is called after the scenario is exhausted.
	finishCh chan struct{}
	// pos is the index of the next scenario entry to return.
	pos int
	// calls records "Next" and "Rewind" in the order they were invoked.
	calls []string
}

// Next returns the next scripted result and records the call.
//
// Once the scenario is exhausted, it closes finishCh, blocks until ctx is
// done and returns (nil, nil); such calls are not recorded in calls.
func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) {
	if it.pos >= len(it.scenario) {
		// Close finishCh at most once: a repeated call after exhaustion would
		// otherwise panic with "close of closed channel". The guard assumes
		// Next is not called concurrently.
		select {
		case <-it.finishCh:
		default:
			close(it.finishCh)
		}
		<-ctx.Done()
		return nil, nil
	}

	res := it.scenario[it.pos]
	it.pos++
	it.calls = append(it.calls, "Next")
	return res.objs, res.err
}

// Rewind only records the call; the scenario position is left unchanged.
func (it *predefinedIterator) Rewind() {
	it.calls = append(it.calls, "Rewind")
}
// sliceKeySpaceIterator implements KeySpaceIterator on top of an in-memory
// slice of addresses.
type sliceKeySpaceIterator struct {
	// objs is the full key space to iterate over.
	objs []objectcore.AddressWithType
	// cur is the index of the first element not yet returned.
	cur int
}

// Next returns up to size elements starting at the current position and
// advances past them. When nothing is left it returns engine.ErrEndOfListing.
func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
	total := len(it.objs)
	if it.cur >= total {
		return nil, engine.ErrEndOfListing
	}

	start, stop := it.cur, it.cur+int(size)
	if stop > total {
		// Clamp the last batch to the end of the slice.
		stop = total
	}

	it.cur = stop
	return it.objs[start:stop], nil
}

// Rewind moves the iterator back to the beginning of the slice.
func (it *sliceKeySpaceIterator) Rewind() {
	it.cur = 0
}
// containerSrcFunc adapts a plain function so it can be used as a
// container.Source.
type containerSrcFunc func(cid.ID) (*container.Container, error)

// Get forwards the lookup to the wrapped function.
func (fn containerSrcFunc) Get(cnrID cid.ID) (*container.Container, error) {
	return fn(cnrID)
}
// placementBuilderFunc adapts a plain function so it can be used as a
// placement.Builder.
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)

// BuildPlacement forwards all arguments to the wrapped function.
func (fn placementBuilderFunc) BuildPlacement(cnrID cid.ID, objID *oid.ID, policy netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
	vectors, err := fn(cnrID, objID, policy)
	return vectors, err
}
// announcedKeysFunc adapts a plain predicate so it can be used as a
// netmap.AnnouncedKeys.
type announcedKeysFunc func([]byte) bool

// IsLocalKey reports whatever the wrapped predicate returns for key.
func (fn announcedKeysFunc) IsLocalKey(key []byte) bool {
	return fn(key)
}
// constNodeLoader is a nodeLoader reporting the same load on every call.
type constNodeLoader float64

// ObjectServiceLoad returns the constant the loader was created from.
func (l constNodeLoader) ObjectServiceLoad() float64 {
	load := float64(l)
	return load
}
// replicatorFunc adapts a plain function so it can be used as a Replicator.
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)

// HandleTask forwards the task and its result sink to the wrapped function.
func (fn replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
	fn(ctx, task, res)
}