package policer

import (
	"bytes"
	"context"
	"errors"
	"sort"
	"testing"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
	objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
	"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
	"github.com/panjf2000/ants/v2"
	"github.com/stretchr/testify/require"
)
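
// TestBuryObjectWithoutContainer checks that an object whose container cannot
// be resolved is handed to the bury function rather than replicated.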
func TestBuryObjectWithoutContainer(t *testing.T) {
	// Key space
	addr := oidtest.Address()
	objs := []objectcore.AddressWithType{
		{
			Address: addr,
			Type:    objectSDK.TypeRegular,
		},
	}

	// Container source and bury function
	buryCh := make(chan oid.Address)
	containerSrc := func(id cid.ID) (*container.Container, error) {
		return nil, apistatus.ContainerNotFound{}
	}
	buryFn := func(ctx context.Context, a oid.Address) error {
		buryCh <- a
		return nil
	}

	// Task pool
	pool, err := ants.NewPool(4)
	require.NoError(t, err)

	// Policer instance
	p := New(
		WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}),
		WithContainerSource(containerSrcFunc(containerSrc)),
		WithBuryFunc(buryFn),
		WithPool(pool),
		WithNodeLoader(constNodeLoader(0)),
	)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go p.Run(ctx)

	require.Equal(t, addr, <-buryCh)
}
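
// TestProcessObject runs processObject against table-driven placements and
// verifies which nodes receive replicas and whether the redundant local copy
// callback fires.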
func TestProcessObject(t *testing.T) {
	// Notes:
	// - nodes are referred to by their index throughout, which is embedded in the public key
	// - node with index 0 always refers to the local node, so there's no need to add it to objHolders
	// - policy is used only to match the number of replicas for each index in the placement
	tests := []struct {
		desc                string
		objType             objectSDK.Type
		nodeCount           int
		policy              string
		placement           [][]int
		objHolders          []int
		maintenanceNodes    []int
		wantRemoveRedundant bool
		wantReplicateTo     []int
	}{
		{
			desc:      "1 copy already held by local node",
			nodeCount: 1,
			policy:    `REP 1`,
			placement: [][]int{{0}},
		},
		{
			desc:                "1 copy already held by the remote node",
			nodeCount:           2,
			policy:              `REP 1`,
			placement:           [][]int{{1}},
			objHolders:          []int{1},
			wantRemoveRedundant: true,
		},
		{
			desc:            "1 copy not yet held by the remote node",
			nodeCount:       2,
			policy:          `REP 1`,
			placement:       [][]int{{1}},
			wantReplicateTo: []int{1},
		},
		{
			desc:       "2 copies already held by local and remote node",
			nodeCount:  2,
			policy:     `REP 2`,
			placement:  [][]int{{0, 1}},
			objHolders: []int{1},
		},
		{
			desc:            "2 copies but not held by remote node",
			nodeCount:       2,
			policy:          `REP 2`,
			placement:       [][]int{{0, 1}},
			wantReplicateTo: []int{1},
		},
		{
			desc:       "multiple vectors already held by remote node",
			nodeCount:  2,
			policy:     `REP 2 REP 2`,
			placement:  [][]int{{0, 1}, {0, 1}},
			objHolders: []int{1},
		},
		{
			desc:            "multiple vectors not yet held by remote node",
			nodeCount:       2,
			policy:          `REP 2 REP 2`,
			placement:       [][]int{{0, 1}, {0, 1}},
			wantReplicateTo: []int{1, 1}, // is this actually good?
		},
		{
			desc:            "lock object must be replicated to all nodes",
			objType:         objectSDK.TypeLock,
			nodeCount:       3,
			policy:          `REP 1`,
			placement:       [][]int{{0, 1, 2}},
			wantReplicateTo: []int{1, 2},
		},
		{
			desc:             "preserve local copy when maintenance nodes exist",
			nodeCount:        3,
			policy:           `REP 2`,
			placement:        [][]int{{1, 2}},
			objHolders:       []int{1},
			maintenanceNodes: []int{2},
		},
	}

	for i := range tests {
		ti := tests[i]
		t.Run(ti.desc, func(t *testing.T) {
			addr := oidtest.Address()

			// Netmap, placement policy and placement builder
			nodes := make([]netmap.NodeInfo, ti.nodeCount)
			for i := range nodes {
				nodes[i].SetPublicKey([]byte{byte(i)})
			}
			for _, i := range ti.maintenanceNodes {
				nodes[i].SetMaintenance()
			}

			var policy netmap.PlacementPolicy
			require.NoError(t, policy.DecodeString(ti.policy))

			placementVectors := make([][]netmap.NodeInfo, len(ti.placement))
			for i, pv := range ti.placement {
				for _, nj := range pv {
					placementVectors[i] = append(placementVectors[i], nodes[nj])
				}
			}
			placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
				if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) {
					return placementVectors, nil
				}
				t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj)
				return nil, errors.New("unexpected placement build")
			}

			// Object remote header
			headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*objectSDK.Object, error) {
				index := int(ni.PublicKey()[0])
				if a != addr || index < 1 || index >= ti.nodeCount {
					t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a)
					return nil, errors.New("unexpected object head")
				}
				for _, i := range ti.objHolders {
					if index == i {
						return nil, nil
					}
				}
				return nil, apistatus.ObjectNotFound{}
			}

			// Container source
			cnr := &container.Container{}
			cnr.Value.Init()
			cnr.Value.SetPlacementPolicy(policy)
			containerSrc := func(id cid.ID) (*container.Container, error) {
				if id.Equals(addr.Container()) {
					return cnr, nil
				}
				t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container())
				return nil, apistatus.ContainerNotFound{}
			}
			buryFn := func(ctx context.Context, a oid.Address) error {
				t.Errorf("unexpected object buried: %v", a)
				return nil
			}

			// Policer instance
			var gotRemoveRedundant bool
			var gotReplicateTo []int

			p := New(
				WithContainerSource(containerSrcFunc(containerSrc)),
				WithPlacementBuilder(placementBuilderFunc(placementBuilder)),
				WithNetmapKeys(announcedKeysFunc(func(k []byte) bool {
					return bytes.Equal(k, nodes[0].PublicKey())
				})),
				WithRemoteObjectHeaderFunc(headFn),
				WithBuryFunc(buryFn),
				WithRedundantCopyCallback(func(_ context.Context, a oid.Address) {
					require.True(t, eqAddr(a, addr), "unexpected redundant copy callback: a=%v", a)
					gotRemoveRedundant = true
				}),
				WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) {
					require.True(t, eqAddr(task.Addr, addr), "unexpected replicator task: %+v", task)
					for _, node := range task.Nodes {
						gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0]))
					}
				})),
			)

			addrWithType := objectcore.AddressWithType{
				Address: addr,
				Type:    ti.objType,
			}

			p.processObject(context.Background(), addrWithType)
			sort.Ints(gotReplicateTo)

			require.Equal(t, ti.wantRemoveRedundant, gotRemoveRedundant)
			require.Equal(t, ti.wantReplicateTo, gotReplicateTo)
		})
	}
}
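
// TestIteratorContract verifies how the policer drives a KeySpaceIterator:
// Next is retried after an arbitrary error, while engine.ErrEndOfListing
// makes the policer sleep and call Rewind before starting the next cycle.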
func TestIteratorContract(t *testing.T) {
	addr := oidtest.Address()
	objs := []objectcore.AddressWithType{{
		Address: addr,
		Type:    objectSDK.TypeRegular,
	}}

	containerSrc := func(id cid.ID) (*container.Container, error) {
		return nil, apistatus.ContainerNotFound{}
	}
	buryFn := func(ctx context.Context, a oid.Address) error {
		return nil
	}

	pool, err := ants.NewPool(4)
	require.NoError(t, err)

	it := &predefinedIterator{
		scenario: []nextResult{
			{objs, nil},
			{nil, errors.New("opaque")},
			{nil, engine.ErrEndOfListing},
			{nil, engine.ErrEndOfListing},
			{nil, errors.New("opaque")},
			{objs, engine.ErrEndOfListing},
		},
		finishCh: make(chan struct{}),
	}

	p := New(
		WithKeySpaceIterator(it),
		WithContainerSource(containerSrcFunc(containerSrc)),
		WithBuryFunc(buryFn),
		WithPool(pool),
		WithNodeLoader(constNodeLoader(0)),
		func(c *cfg) {
			c.sleepDuration = time.Millisecond
		},
	)

	ctx, cancel := context.WithCancel(context.Background())
	go p.Run(ctx)

	<-it.finishCh
	cancel()
	require.Equal(t, []string{
		"Next",
		"Next",
		"Next",
		"Rewind",
		"Next",
		"Rewind",
		"Next",
		"Next",
		"Rewind",
	}, it.calls)
}

// eqAddr reports whether a and b refer to the same object.
// TODO(https://git.frostfs.info/TrueCloudLab/frostfs-sdk-go/issues/101)
func eqAddr(a, b oid.Address) bool {
	return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
}
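
// nextResult is a single scripted return value of predefinedIterator.Next.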
type nextResult struct {
	objs []objectcore.AddressWithType
	err  error
}
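
// predefinedIterator is a KeySpaceIterator that replays a predefined scenario
// of Next results and records the order of calls made to it.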
type predefinedIterator struct {
	scenario []nextResult
	finishCh chan struct{}
	pos      int
	calls    []string
}

func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) {
	if it.pos == len(it.scenario) {
		// The scenario is exhausted: signal the test and block until it cancels the context.
		close(it.finishCh)
		<-ctx.Done()
		return nil, nil
	}

	res := it.scenario[it.pos]
	it.pos++
	it.calls = append(it.calls, "Next")
	return res.objs, res.err
}

func (it *predefinedIterator) Rewind() {
	it.calls = append(it.calls, "Rewind")
}

// sliceKeySpaceIterator is a KeySpaceIterator backed by a slice.
type sliceKeySpaceIterator struct {
	objs []objectcore.AddressWithType
	cur  int
}

func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) {
	if it.cur >= len(it.objs) {
		return nil, engine.ErrEndOfListing
	}
	end := it.cur + int(size)
	if end > len(it.objs) {
		end = len(it.objs)
	}
	ret := it.objs[it.cur:end]
	it.cur = end
	return ret, nil
}

func (it *sliceKeySpaceIterator) Rewind() {
	it.cur = 0
}

// containerSrcFunc is a container.Source backed by a function.
type containerSrcFunc func(cid.ID) (*container.Container, error)

func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) }

// placementBuilderFunc is a placement.Builder backed by a function.
type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error)

func (f placementBuilderFunc) BuildPlacement(c cid.ID, o *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) {
	return f(c, o, p)
}

// announcedKeysFunc is a netmap.AnnouncedKeys backed by a function.
type announcedKeysFunc func([]byte) bool

func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) }

// constNodeLoader is a nodeLoader that always returns a fixed value.
type constNodeLoader float64

func (f constNodeLoader) ObjectServiceLoad() float64 { return float64(f) }

// replicatorFunc is a Replicator backed by a function.
type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult)

func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) {
	f(ctx, task, res)
}