package policer import ( "bytes" "context" "errors" "sort" "testing" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/require" ) func TestBuryObjectWithoutContainer(t *testing.T) { // Key space addr := oidtest.Address() objs := []objectcore.AddressWithType{ { Address: addr, Type: objectSDK.TypeRegular, }, } // Container source and bury function buryCh := make(chan oid.Address) containerSrc := func(id cid.ID) (*container.Container, error) { return nil, apistatus.ContainerNotFound{} } buryFn := func(ctx context.Context, a oid.Address) error { buryCh <- a return nil } // Task pool pool, err := ants.NewPool(4) require.NoError(t, err) // Policer instance p := New( WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}), WithContainerSource(containerSrcFunc(containerSrc)), WithBuryFunc(buryFn), WithPool(pool), ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go p.Run(ctx) require.Equal(t, addr, <-buryCh) } func TestProcessObject(t *testing.T) { // Notes: // - nodes are referred to by their index throughout, which is embedded in the public key // - node with index 0 always refers to the local node, so there's no need to add it to objHolders // - policy is used only to match the number of replicas for each index in the placement tests := []struct { desc string objType objectSDK.Type nodeCount int policy string placement [][]int objHolders []int maintenanceNodes []int wantRemoveRedundant bool wantReplicateTo []int }{ { desc: "1 copy already held by local node", nodeCount: 1, policy: `REP 1`, placement: [][]int{{0}}, }, { desc: "1 copy already held by the remote node", nodeCount: 2, policy: `REP 1`, placement: [][]int{{1}}, objHolders: []int{1}, wantRemoveRedundant: true, }, { desc: "1 copy not yet held by the remote node", nodeCount: 2, policy: `REP 1`, placement: [][]int{{1}}, wantReplicateTo: []int{1}, }, { desc: "2 copies already held by local and remote node", nodeCount: 2, policy: `REP 2`, placement: [][]int{{0, 1}}, objHolders: []int{1}, }, { desc: "2 copies but not held by remote node", nodeCount: 2, policy: `REP 2`, placement: [][]int{{0, 1}}, wantReplicateTo: []int{1}, }, { desc: "multiple vectors already held by remote node", nodeCount: 2, policy: `REP 2 REP 2`, placement: [][]int{{0, 1}, {0, 1}}, objHolders: []int{1}, }, { desc: "multiple vectors not yet held by remote node", nodeCount: 2, policy: `REP 2 REP 2`, placement: [][]int{{0, 1}, {0, 1}}, wantReplicateTo: []int{1, 1}, // is this actually good? }, { desc: "lock object must be replicated to all nodes", objType: objectSDK.TypeLock, nodeCount: 3, policy: `REP 1`, placement: [][]int{{0, 1, 2}}, wantReplicateTo: []int{1, 2}, }, { desc: "preserve local copy when maintenance nodes exist", nodeCount: 3, policy: `REP 2`, placement: [][]int{{1, 2}}, objHolders: []int{1}, maintenanceNodes: []int{2}, }, } for i := range tests { ti := tests[i] t.Run(ti.desc, func(t *testing.T) { addr := oidtest.Address() // Netmap, placement policy and placement builder nodes := make([]netmap.NodeInfo, ti.nodeCount) for i := range nodes { nodes[i].SetPublicKey([]byte{byte(i)}) } for _, i := range ti.maintenanceNodes { nodes[i].SetMaintenance() } var policy netmap.PlacementPolicy require.NoError(t, policy.DecodeString(ti.policy)) placementVectors := make([][]netmap.NodeInfo, len(ti.placement)) for i, pv := range ti.placement { for _, nj := range pv { placementVectors[i] = append(placementVectors[i], nodes[nj]) } } placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) { return placementVectors, nil } t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj) return nil, errors.New("unexpected placement build") } // Object remote header headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address) (*objectSDK.Object, error) { index := int(ni.PublicKey()[0]) if a != addr || index < 1 || index >= ti.nodeCount { t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a) return nil, errors.New("unexpected object head") } for _, i := range ti.objHolders { if index == i { return nil, nil } } return nil, apistatus.ObjectNotFound{} } // Container source cnr := &container.Container{} cnr.Value.Init() cnr.Value.SetPlacementPolicy(policy) containerSrc := func(id cid.ID) (*container.Container, error) { if id.Equals(addr.Container()) { return cnr, nil } t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container()) return nil, apistatus.ContainerNotFound{} } buryFn := func(ctx context.Context, a oid.Address) error { t.Errorf("unexpected object buried: %v", a) return nil } // Policer instance var gotRemoveRedundant bool var gotReplicateTo []int p := New( WithContainerSource(containerSrcFunc(containerSrc)), WithPlacementBuilder(placementBuilderFunc(placementBuilder)), WithNetmapKeys(announcedKeysFunc(func(k []byte) bool { return bytes.Equal(k, nodes[0].PublicKey()) })), WithRemoteObjectHeaderFunc(headFn), WithBuryFunc(buryFn), WithRedundantCopyCallback(func(_ context.Context, a oid.Address) { require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a) gotRemoveRedundant = true }), WithReplicator(replicatorFunc(func(_ context.Context, task replicator.Task, res replicator.TaskResult) { require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task) for _, node := range task.Nodes { gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0])) } })), ) addrWithType := objectcore.AddressWithType{ Address: addr, Type: ti.objType, } p.processObject(context.Background(), addrWithType) sort.Ints(gotReplicateTo) require.Equal(t, ti.wantRemoveRedundant, gotRemoveRedundant) require.Equal(t, ti.wantReplicateTo, gotReplicateTo) }) } } func TestIteratorContract(t *testing.T) { addr := oidtest.Address() objs := []objectcore.AddressWithType{{ Address: addr, Type: objectSDK.TypeRegular, }} containerSrc := func(id cid.ID) (*container.Container, error) { return nil, apistatus.ContainerNotFound{} } buryFn := func(ctx context.Context, a oid.Address) error { return nil } pool, err := ants.NewPool(4) require.NoError(t, err) it := &predefinedIterator{ scenario: []nextResult{ {objs, nil}, {nil, errors.New("opaque")}, {nil, engine.ErrEndOfListing}, {nil, engine.ErrEndOfListing}, {nil, errors.New("opaque")}, {objs, engine.ErrEndOfListing}, }, finishCh: make(chan struct{}), } p := New( WithKeySpaceIterator(it), WithContainerSource(containerSrcFunc(containerSrc)), WithBuryFunc(buryFn), WithPool(pool), func(c *cfg) { c.sleepDuration = time.Millisecond }, ) ctx, cancel := context.WithCancel(context.Background()) go p.Run(ctx) <-it.finishCh cancel() require.Equal(t, []string{ "Next", "Next", "Next", "Rewind", "Next", "Rewind", "Next", "Next", "Rewind", }, it.calls) } type nextResult struct { objs []objectcore.AddressWithType err error } type predefinedIterator struct { scenario []nextResult finishCh chan struct{} pos int calls []string } func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.AddressWithType, error) { if it.pos == len(it.scenario) { close(it.finishCh) <-ctx.Done() return nil, nil } res := it.scenario[it.pos] it.pos += 1 it.calls = append(it.calls, "Next") return res.objs, res.err } func (it *predefinedIterator) Rewind() { it.calls = append(it.calls, "Rewind") } // sliceKeySpaceIterator is a KeySpaceIterator backed by a slice. type sliceKeySpaceIterator struct { objs []objectcore.AddressWithType cur int } func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.AddressWithType, error) { if it.cur >= len(it.objs) { return nil, engine.ErrEndOfListing } end := it.cur + int(size) if end > len(it.objs) { end = len(it.objs) } ret := it.objs[it.cur:end] it.cur = end return ret, nil } func (it *sliceKeySpaceIterator) Rewind() { it.cur = 0 } // containerSrcFunc is a container.Source backed by a function. type containerSrcFunc func(cid.ID) (*container.Container, error) func (f containerSrcFunc) Get(id cid.ID) (*container.Container, error) { return f(id) } // placementBuilderFunc is a placement.Builder backed by a function type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) func (f placementBuilderFunc) BuildPlacement(c cid.ID, o *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { return f(c, o, p) } // announcedKeysFunc is a netmap.AnnouncedKeys backed by a function. type announcedKeysFunc func([]byte) bool func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) } // replicatorFunc is a Replicator backed by a function. type replicatorFunc func(context.Context, replicator.Task, replicator.TaskResult) func (f replicatorFunc) HandleTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { f(ctx, task, res) }