package policer import ( "bytes" "context" "errors" "sort" "testing" "time" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" objectcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/replicator" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" "github.com/panjf2000/ants/v2" "github.com/stretchr/testify/require" ) func TestBuryObjectWithoutContainer(t *testing.T) { // Key space addr := oidtest.Address() objs := []objectcore.Info{ { Address: addr, Type: objectSDK.TypeRegular, }, } // Container source and bury function buryCh := make(chan oid.Address) containerSrc := containerSrc{ get: func(id cid.ID) (*container.Container, error) { return nil, new(apistatus.ContainerNotFound) }, deletionInfo: func(id cid.ID) (*container.DelInfo, error) { return &container.DelInfo{}, nil }, } buryFn := func(ctx context.Context, a oid.Address) error { buryCh <- a return nil } // Policer instance p := New( WithKeySpaceIterator(&sliceKeySpaceIterator{objs: objs}), WithContainerSource(containerSrc), WithBuryFunc(buryFn), WithPool(testPool(t)), ) ctx, cancel := context.WithCancel(context.Background()) defer cancel() go p.Run(ctx) require.Equal(t, addr, <-buryCh) } func TestProcessObject(t *testing.T) { // Notes: // - nodes are referred to by their index throughout, which is embedded in the public key // - node with index 0 always refers to the local node, so there's no need to add it to objHolders // - policy is used only to match the number of replicas for each index in the placement tests := []struct { desc string objType objectSDK.Type nodeCount int policy string placement [][]int objHolders []int maintenanceNodes []int wantRemoveRedundant bool wantReplicateTo []int ecInfo *objectcore.ECInfo }{ { desc: "1 copy already held by local node", nodeCount: 1, policy: `REP 1`, placement: [][]int{{0}}, }, { desc: "1 copy already held by the remote node", nodeCount: 2, policy: `REP 1`, placement: [][]int{{1}}, objHolders: []int{1}, wantRemoveRedundant: true, }, { desc: "1 copy not yet held by the remote node", nodeCount: 2, policy: `REP 1`, placement: [][]int{{1}}, wantReplicateTo: []int{1}, }, { desc: "2 copies already held by local and remote node", nodeCount: 2, policy: `REP 2`, placement: [][]int{{0, 1}}, objHolders: []int{1}, }, { desc: "2 copies but not held by remote node", nodeCount: 2, policy: `REP 2`, placement: [][]int{{0, 1}}, wantReplicateTo: []int{1}, }, { desc: "multiple vectors already held by remote node", nodeCount: 2, policy: `REP 2 REP 2`, placement: [][]int{{0, 1}, {0, 1}}, objHolders: []int{1}, }, { desc: "multiple vectors not yet held by remote node", nodeCount: 2, policy: `REP 2 REP 2`, placement: [][]int{{0, 1}, {0, 1}}, wantReplicateTo: []int{1, 1}, // is this actually good? }, { desc: "lock object must be replicated to all nodes", objType: objectSDK.TypeLock, nodeCount: 3, policy: `REP 1`, placement: [][]int{{0, 1, 2}}, wantReplicateTo: []int{1, 2}, }, { desc: "preserve local copy when maintenance nodes exist", nodeCount: 3, policy: `REP 2`, placement: [][]int{{1, 2}}, objHolders: []int{1}, maintenanceNodes: []int{2}, }, { desc: "lock object must be replicated to all EC nodes", objType: objectSDK.TypeLock, nodeCount: 3, policy: `EC 1.1`, placement: [][]int{{0, 1, 2}}, wantReplicateTo: []int{1, 2}, }, { desc: "tombstone object must be replicated to all EC nodes", objType: objectSDK.TypeTombstone, nodeCount: 3, policy: `EC 1.1`, placement: [][]int{{0, 1, 2}}, wantReplicateTo: []int{1, 2}, }, } for i := range tests { ti := tests[i] t.Run(ti.desc, func(t *testing.T) { addr := oidtest.Address() // Netmap, placement policy and placement builder nodes := make([]netmap.NodeInfo, ti.nodeCount) for i := range nodes { nodes[i].SetPublicKey([]byte{byte(i)}) } for _, i := range ti.maintenanceNodes { nodes[i].SetStatus(netmap.Maintenance) } var policy netmap.PlacementPolicy require.NoError(t, policy.DecodeString(ti.policy)) placementVectors := make([][]netmap.NodeInfo, len(ti.placement)) for i, pv := range ti.placement { for _, nj := range pv { placementVectors[i] = append(placementVectors[i], nodes[nj]) } } placementBuilder := func(cnr cid.ID, obj *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { if cnr.Equals(addr.Container()) && obj != nil && obj.Equals(addr.Object()) { return placementVectors, nil } if ti.ecInfo != nil && cnr.Equals(addr.Container()) && obj != nil && obj.Equals(ti.ecInfo.ParentID) { return placementVectors, nil } t.Errorf("unexpected placement build: cid=%v oid=%v", cnr, obj) return nil, errors.New("unexpected placement build") } // Object remote header headFn := func(_ context.Context, ni netmap.NodeInfo, a oid.Address, raw bool) (*objectSDK.Object, error) { index := int(ni.PublicKey()[0]) if a != addr || index < 1 || index >= ti.nodeCount { t.Errorf("unexpected remote object head: node=%+v addr=%v", ni, a) return nil, errors.New("unexpected object head") } for _, i := range ti.objHolders { if index == i { return nil, nil } } return nil, new(apistatus.ObjectNotFound) } // Container source cnr := &container.Container{} cnr.Value.Init() cnr.Value.SetPlacementPolicy(policy) containerSrc := containerSrc{ get: func(id cid.ID) (*container.Container, error) { if id.Equals(addr.Container()) { return cnr, nil } t.Errorf("unexpected container requested: got=%v, want=%v", id, addr.Container()) return nil, new(apistatus.ContainerNotFound) }, deletionInfo: func(id cid.ID) (*container.DelInfo, error) { return &container.DelInfo{}, nil }, } buryFn := func(ctx context.Context, a oid.Address) error { t.Errorf("unexpected object buried: %v", a) return nil } // Policer instance var gotRemoveRedundant bool var gotReplicateTo []int p := New( WithContainerSource(containerSrc), WithPlacementBuilder(placementBuilderFunc(placementBuilder)), WithNetmapKeys(announcedKeysFunc(func(k []byte) bool { return bytes.Equal(k, nodes[0].PublicKey()) })), WithRemoteObjectHeaderFunc(headFn), WithBuryFunc(buryFn), WithRedundantCopyCallback(func(_ context.Context, a oid.Address) { require.True(t, a.Equals(addr), "unexpected redundant copy callback: a=%v", a) gotRemoveRedundant = true }), WithReplicator(&testReplicator{ handleReplicationTask: func(_ context.Context, task replicator.Task, res replicator.TaskResult) { require.True(t, task.Addr.Equals(addr), "unexpected replicator task: %+v", task) for _, node := range task.Nodes { gotReplicateTo = append(gotReplicateTo, int(node.PublicKey()[0])) } }, }), WithPool(testPool(t)), ) addrWithType := objectcore.Info{ Address: addr, Type: ti.objType, ECInfo: ti.ecInfo, } err := p.processObject(context.Background(), addrWithType) require.NoError(t, err) sort.Ints(gotReplicateTo) require.Equal(t, ti.wantRemoveRedundant, gotRemoveRedundant) require.Equal(t, ti.wantReplicateTo, gotReplicateTo) }) } } func TestProcessObjectError(t *testing.T) { addr := oidtest.Address() // Container source cnr := &container.Container{} cnr.Value.Init() source := containerSrc{ get: func(id cid.ID) (*container.Container, error) { return nil, new(apistatus.ContainerNotFound) }, deletionInfo: func(id cid.ID) (*container.DelInfo, error) { return nil, new(apistatus.ContainerNotFound) }, } buryFn := func(ctx context.Context, a oid.Address) error { t.Errorf("unexpected object buried: %v", a) return nil } p := New( WithContainerSource(source), WithBuryFunc(buryFn), WithPool(testPool(t)), ) addrWithType := objectcore.Info{ Address: addr, } require.True(t, client.IsErrContainerNotFound(p.processObject(context.Background(), addrWithType))) } func TestIteratorContract(t *testing.T) { addr := oidtest.Address() objs := []objectcore.Info{{ Address: addr, Type: objectSDK.TypeRegular, }} buryFn := func(ctx context.Context, a oid.Address) error { return nil } it := &predefinedIterator{ scenario: []nextResult{ {objs, nil}, {nil, errors.New("opaque")}, {nil, engine.ErrEndOfListing}, {nil, engine.ErrEndOfListing}, {nil, errors.New("opaque")}, {objs, engine.ErrEndOfListing}, }, finishCh: make(chan struct{}), } containerSrc := containerSrc{ get: func(id cid.ID) (*container.Container, error) { return nil, new(apistatus.ContainerNotFound) }, deletionInfo: func(id cid.ID) (*container.DelInfo, error) { return &container.DelInfo{}, nil }, } p := New( WithKeySpaceIterator(it), WithContainerSource(containerSrc), WithBuryFunc(buryFn), WithPool(testPool(t)), func(c *cfg) { c.sleepDuration = time.Millisecond }, ) ctx, cancel := context.WithCancel(context.Background()) go p.Run(ctx) <-it.finishCh cancel() require.Equal(t, []string{ "Next", "Next", "Next", "Rewind", "Next", "Rewind", "Next", "Next", "Rewind", }, it.calls) } func testPool(t *testing.T) *ants.Pool { pool, err := ants.NewPool(4) require.NoError(t, err) return pool } type nextResult struct { objs []objectcore.Info err error } type predefinedIterator struct { scenario []nextResult finishCh chan struct{} pos int calls []string } func (it *predefinedIterator) Next(ctx context.Context, size uint32) ([]objectcore.Info, error) { if it.pos == len(it.scenario) { close(it.finishCh) <-ctx.Done() return nil, nil } res := it.scenario[it.pos] it.pos += 1 it.calls = append(it.calls, "Next") return res.objs, res.err } func (it *predefinedIterator) Rewind() { it.calls = append(it.calls, "Rewind") } // sliceKeySpaceIterator is a KeySpaceIterator backed by a slice. type sliceKeySpaceIterator struct { objs []objectcore.Info cur int } func (it *sliceKeySpaceIterator) Next(_ context.Context, size uint32) ([]objectcore.Info, error) { if it.cur >= len(it.objs) { return nil, engine.ErrEndOfListing } end := min(it.cur+int(size), len(it.objs)) ret := it.objs[it.cur:end] it.cur = end return ret, nil } func (it *sliceKeySpaceIterator) Rewind() { it.cur = 0 } type containerSrc struct { get func(id cid.ID) (*container.Container, error) deletionInfo func(id cid.ID) (*container.DelInfo, error) } func (f containerSrc) Get(id cid.ID) (*container.Container, error) { return f.get(id) } func (f containerSrc) DeletionInfo(id cid.ID) (*container.DelInfo, error) { return f.deletionInfo(id) } // placementBuilderFunc is a placement.Builder backed by a function type placementBuilderFunc func(cid.ID, *oid.ID, netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) func (f placementBuilderFunc) BuildPlacement(c cid.ID, o *oid.ID, p netmap.PlacementPolicy) ([][]netmap.NodeInfo, error) { return f(c, o, p) } // announcedKeysFunc is a netmap.AnnouncedKeys backed by a function. type announcedKeysFunc func([]byte) bool func (f announcedKeysFunc) IsLocalKey(k []byte) bool { return f(k) } type testReplicator struct { handleReplicationTask func(ctx context.Context, task replicator.Task, res replicator.TaskResult) handleLocalPutTask func(ctx context.Context, task replicator.Task) handlePullTask func(ctx context.Context, task replicator.Task) } func (r *testReplicator) HandleReplicationTask(ctx context.Context, task replicator.Task, res replicator.TaskResult) { r.handleReplicationTask(ctx, task, res) } func (r *testReplicator) HandleLocalPutTask(ctx context.Context, task replicator.Task) { r.handleLocalPutTask(ctx, task) } func (r *testReplicator) HandlePullTask(ctx context.Context, task replicator.Task) { r.handlePullTask(ctx, task) }