diff --git a/pkg/rpcclient/local.go b/pkg/rpcclient/local.go new file mode 100644 index 000000000..19e67ceaa --- /dev/null +++ b/pkg/rpcclient/local.go @@ -0,0 +1,82 @@ +package rpcclient + +import ( + "context" + + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "go.uber.org/atomic" +) + +// InternalHook is a function signature that is required to create a local client +// (see NewInternal). It performs registration of local client's event channel +// and returns a request handler function. +type InternalHook func(context.Context, chan<- neorpc.Notification) func(*neorpc.Request) (*neorpc.Response, error) + +// Internal is an experimental "local" client that does not connect to RPC via +// network. It's made for deeply integrated applications like NeoFS that have +// blockchain running in the same process, so use it only if you know what you're +// doing. It provides the same interface WSClient does. +type Internal struct { + WSClient + + events chan neorpc.Notification +} + +// NewInternal creates an instance of internal client. It accepts a method +// provided by RPC server. +func NewInternal(ctx context.Context, register InternalHook) (*Internal, error) { + c := &Internal{ + WSClient: WSClient{ + Client: Client{}, + Notifications: make(chan Notification), + + shutdown: make(chan struct{}), + done: make(chan struct{}), + closeCalled: *atomic.NewBool(false), + subscriptions: make(map[string]notificationReceiver), + receivers: make(map[interface{}][]string), + }, + events: make(chan neorpc.Notification), + } + + err := initClient(ctx, &c.WSClient.Client, "localhost:0", Options{}) + if err != nil { + return nil, err // Can't really happen for internal client. + } + c.cli = nil + go c.eventLoop() + // c.ctx is inherited from ctx in fact (see initClient). + c.requestF = register(c.ctx, c.events) //nolint:contextcheck // Non-inherited new context, use function like `context.WithXXX` instead + return c, nil +} + +func (c *Internal) eventLoop() { +eventloop: + for { + select { + case <-c.ctx.Done(): + break eventloop + case <-c.shutdown: + break eventloop + case ev := <-c.events: + ntf := Notification{Type: ev.Event} + if len(ev.Payload) > 0 { + ntf.Value = ev.Payload[0] + } + c.notifySubscribers(ntf) + } + } + close(c.done) + close(c.Notifications) + c.ctxCancel() + // ctx is cancelled, server is notified and will finish soon. +drainloop: + for { + select { + case <-c.events: + default: + break drainloop + } + } + close(c.events) +} diff --git a/pkg/rpcclient/local_test.go b/pkg/rpcclient/local_test.go new file mode 100644 index 000000000..0be93a646 --- /dev/null +++ b/pkg/rpcclient/local_test.go @@ -0,0 +1,18 @@ +package rpcclient + +import ( + "context" + "testing" + + "github.com/nspcc-dev/neo-go/pkg/neorpc" + "github.com/stretchr/testify/require" +) + +func TestInternalClientClose(t *testing.T) { + icl, err := NewInternal(context.TODO(), func(ctx context.Context, ch chan<- neorpc.Notification) func(*neorpc.Request) (*neorpc.Response, error) { + return nil + }) + require.NoError(t, err) + icl.Close() + require.NoError(t, icl.GetError()) +} diff --git a/pkg/rpcclient/wsclient.go b/pkg/rpcclient/wsclient.go index 08dc72310..8cb224ccf 100644 --- a/pkg/rpcclient/wsclient.go +++ b/pkg/rpcclient/wsclient.go @@ -456,7 +456,7 @@ readloop: connCloseErr = fmt.Errorf("bad event received: %s / %d", event, len(rr.RawParams)) break readloop } - var val interface{} + ntf := Notification{Type: event} switch event { case neorpc.BlockEventID: sr, err := c.StateRootInHeader() @@ -465,15 +465,15 @@ readloop: connCloseErr = fmt.Errorf("failed to fetch StateRootInHeader: %w", err) break readloop } - val = block.New(sr) + ntf.Value = block.New(sr) case neorpc.TransactionEventID: - val = &transaction.Transaction{} + ntf.Value = &transaction.Transaction{} case neorpc.NotificationEventID: - val = new(state.ContainedNotificationEvent) + ntf.Value = new(state.ContainedNotificationEvent) case neorpc.ExecutionEventID: - val = new(state.AppExecResult) + ntf.Value = new(state.AppExecResult) case neorpc.NotaryRequestEventID: - val = new(result.NotaryRequestEvent) + ntf.Value = new(result.NotaryRequestEvent) case neorpc.MissedEventID: // No value. default: @@ -482,32 +482,14 @@ readloop: break readloop } if event != neorpc.MissedEventID { - err = json.Unmarshal(rr.RawParams[0], val) + err = json.Unmarshal(rr.RawParams[0], ntf.Value) if err != nil { // Bad event received. connCloseErr = fmt.Errorf("failed to unmarshal event of type %s from JSON: %w", event, err) break readloop } } - if event == neorpc.MissedEventID { - c.subscriptionsLock.Lock() - for rcvr, ids := range c.receivers { - c.subscriptions[ids[0]].Close() - delete(c.receivers, rcvr) - } - c.subscriptionsLock.Unlock() - continue readloop - } - c.subscriptionsLock.RLock() - ntf := Notification{Type: event, Value: val} - for _, ids := range c.receivers { - for _, id := range ids { - if c.subscriptions[id].TrySend(ntf) { - break // strictly one notification per channel - } - } - } - c.subscriptionsLock.RUnlock() + c.notifySubscribers(ntf) } else if rr.ID != nil && (rr.Error != nil || rr.Result != nil) { id, err := strconv.ParseUint(string(rr.ID), 10, 64) if err != nil { @@ -580,6 +562,27 @@ writeloop: } } +func (c *WSClient) notifySubscribers(ntf Notification) { + if ntf.Type == neorpc.MissedEventID { + c.subscriptionsLock.Lock() + for rcvr, ids := range c.receivers { + c.subscriptions[ids[0]].Close() + delete(c.receivers, rcvr) + } + c.subscriptionsLock.Unlock() + return + } + c.subscriptionsLock.RLock() + for _, ids := range c.receivers { + for _, id := range ids { + if c.subscriptions[id].TrySend(ntf) { + break // strictly one notification per channel + } + } + } + c.subscriptionsLock.RUnlock() +} + func (c *WSClient) unregisterRespChannel(id uint64) { c.respLock.Lock() defer c.respLock.Unlock() diff --git a/pkg/services/rpcsrv/client_test.go b/pkg/services/rpcsrv/client_test.go index e0a075278..031dab5ee 100644 --- a/pkg/services/rpcsrv/client_test.go +++ b/pkg/services/rpcsrv/client_test.go @@ -2033,15 +2033,45 @@ func TestClient_Wait(t *testing.T) { check(t, util.Uint256{1, 2, 3}, chain.BlockHeight()-1, true) } -func TestWSClient_Wait(t *testing.T) { +func mkSubsClient(t *testing.T, rpcSrv *Server, httpSrv *httptest.Server, local bool) *rpcclient.WSClient { + var ( + c *rpcclient.WSClient + err error + icl *rpcclient.Internal + ) + if local { + icl, err = rpcclient.NewInternal(context.Background(), rpcSrv.RegisterLocal) + } else { + url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" + c, err = rpcclient.NewWS(context.Background(), url, rpcclient.Options{}) + } + require.NoError(t, err) + if local { + c = &icl.WSClient + } + require.NoError(t, c.Init()) + return c +} + +func runWSAndLocal(t *testing.T, test func(*testing.T, bool)) { + t.Run("ws", func(t *testing.T) { + test(t, false) + }) + t.Run("local", func(t *testing.T) { + test(t, true) + }) +} + +func TestSubClientWait(t *testing.T) { + runWSAndLocal(t, testSubClientWait) +} + +func testSubClientWait(t *testing.T, local bool) { chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true) defer chain.Close() defer rpcSrv.Shutdown() - url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" - c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{}) - require.NoError(t, err) - require.NoError(t, c.Init()) + c := mkSubsClient(t, rpcSrv, httpSrv, local) acc, err := wallet.NewAccount() require.NoError(t, err) act, err := actor.New(c, []actor.SignerAccount{ @@ -2135,15 +2165,16 @@ func TestWSClient_Wait(t *testing.T) { require.True(t, faultedChecked, "FAULTed transaction wasn't checked") } -func TestWSClient_WaitWithLateSubscription(t *testing.T) { +func TestSubClientWaitWithLateSubscription(t *testing.T) { + runWSAndLocal(t, testSubClientWaitWithLateSubscription) +} + +func testSubClientWaitWithLateSubscription(t *testing.T, local bool) { chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true) defer chain.Close() defer rpcSrv.Shutdown() - url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" - c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{}) - require.NoError(t, err) - require.NoError(t, c.Init()) + c := mkSubsClient(t, rpcSrv, httpSrv, local) acc, err := wallet.NewAccount() require.NoError(t, err) act, err := actor.New(c, []actor.SignerAccount{ @@ -2182,15 +2213,16 @@ func TestWSClientHandshakeError(t *testing.T) { require.ErrorContains(t, err, "websocket users limit reached") } -func TestWSClient_WaitWithMissedEvent(t *testing.T) { +func TestSubClientWaitWithMissedEvent(t *testing.T) { + runWSAndLocal(t, testSubClientWaitWithMissedEvent) +} + +func testSubClientWaitWithMissedEvent(t *testing.T, local bool) { chain, rpcSrv, httpSrv := initClearServerWithServices(t, false, false, true) defer chain.Close() defer rpcSrv.Shutdown() - url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" - c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{}) - require.NoError(t, err) - require.NoError(t, c.Init()) + c := mkSubsClient(t, rpcSrv, httpSrv, local) acc, err := wallet.NewAccount() require.NoError(t, err) act, err := actor.New(c, []actor.SignerAccount{ @@ -2233,18 +2265,19 @@ func TestWSClient_WaitWithMissedEvent(t *testing.T) { // Accept the next block, but subscriber will get no events because it's overflown. require.NoError(t, chain.AddBlock(b1)) - overEvent, err := json.Marshal(neorpc.Notification{ + overNotification := neorpc.Notification{ JSONRPC: neorpc.JSONRPCVersion, Event: neorpc.MissedEventID, Payload: make([]interface{}, 0), - }) + } + overEvent, err := json.Marshal(overNotification) require.NoError(t, err) overflowMsg, err := websocket.NewPreparedMessage(websocket.TextMessage, overEvent) require.NoError(t, err) rpcSrv.subsLock.Lock() // Deliver overflow message -> triggers subscriber to retry with polling waiter. for s := range rpcSrv.subscribers { - s.writer <- overflowMsg + s.writer <- intEvent{overflowMsg, &overNotification} } rpcSrv.subsLock.Unlock() @@ -2271,10 +2304,7 @@ func TestWSClient_SubscriptionsCompat(t *testing.T) { defer chain.Close() defer rpcSrv.Shutdown() - url := "ws" + strings.TrimPrefix(httpSrv.URL, "http") + "/ws" - c, err := rpcclient.NewWS(context.Background(), url, rpcclient.Options{}) - require.NoError(t, err) - require.NoError(t, c.Init()) + c := mkSubsClient(t, rpcSrv, httpSrv, false) blocks := getTestBlocks(t) bCount := uint32(0) @@ -2289,8 +2319,11 @@ func TestWSClient_SubscriptionsCompat(t *testing.T) { return b1, primary, sender, ntfName, st } checkDeprecated := func(t *testing.T, filtered bool) { + var ( + bID, txID, ntfID, aerID string + err error + ) b, primary, sender, ntfName, st := getData(t) - var bID, txID, ntfID, aerID string if filtered { bID, err = c.SubscribeForNewBlocks(&primary) //nolint:staticcheck // SA1019: c.SubscribeForNewBlocks is deprecated require.NoError(t, err) @@ -2381,6 +2414,7 @@ func TestWSClient_SubscriptionsCompat(t *testing.T) { txFlt *neorpc.TxFilter ntfFlt *neorpc.NotificationFilter aerFlt *neorpc.ExecutionFilter + err error ) if filtered { bFlt = &neorpc.BlockFilter{Primary: &primary} diff --git a/pkg/services/rpcsrv/local_test.go b/pkg/services/rpcsrv/local_test.go new file mode 100644 index 000000000..50e7660da --- /dev/null +++ b/pkg/services/rpcsrv/local_test.go @@ -0,0 +1,57 @@ +package rpcsrv + +import ( + "context" + "math/big" + "testing" + + "github.com/nspcc-dev/neo-go/internal/testchain" + "github.com/nspcc-dev/neo-go/pkg/config" + "github.com/nspcc-dev/neo-go/pkg/rpcclient" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/actor" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/gas" + "github.com/nspcc-dev/neo-go/pkg/rpcclient/invoker" + "github.com/nspcc-dev/neo-go/pkg/util" + "github.com/nspcc-dev/neo-go/pkg/wallet" + "github.com/stretchr/testify/require" +) + +func TestLocalClient(t *testing.T) { + _, rpcSrv, _ := initClearServerWithCustomConfig(t, func(cfg *config.Config) { + // No addresses configured -> RPC server listens nothing (but it + // has MaxGasInvoke, sessions and other stuff). + cfg.ApplicationConfiguration.RPC.BasicService.Enabled = true + cfg.ApplicationConfiguration.RPC.BasicService.Address = nil //nolint:staticcheck // SA1019: cfg.ApplicationConfiguration.RPC.BasicService.Address is deprecated + cfg.ApplicationConfiguration.RPC.BasicService.Port = nil //nolint:staticcheck // SA1019: cfg.ApplicationConfiguration.RPC.BasicService.Port is deprecated + cfg.ApplicationConfiguration.RPC.BasicService.Addresses = nil + cfg.ApplicationConfiguration.RPC.TLSConfig.Address = nil //nolint:staticcheck // SA1019: cfg.ApplicationConfiguration.RPC.TLSConfig.Address is deprecated + cfg.ApplicationConfiguration.RPC.TLSConfig.Port = nil //nolint:staticcheck // SA1019: cfg.ApplicationConfiguration.RPC.TLSConfig.Port is deprecated + cfg.ApplicationConfiguration.RPC.TLSConfig.Addresses = nil + }) + // RPC server listens nothing (not exposed in any way), but it works for internal clients. + c, err := rpcclient.NewInternal(context.TODO(), rpcSrv.RegisterLocal) + require.NoError(t, err) + require.NoError(t, c.Init()) + + // Invokers can use this client. + gasReader := gas.NewReader(invoker.New(c, nil)) + d, err := gasReader.Decimals() + require.NoError(t, err) + require.EqualValues(t, 8, d) + + // Actors can use it as well + priv := testchain.PrivateKeyByID(0) + acc := wallet.NewAccountFromPrivateKey(priv) + addr := priv.PublicKey().GetScriptHash() + + act, err := actor.NewSimple(c, acc) + require.NoError(t, err) + gasprom := gas.New(act) + txHash, _, err := gasprom.Transfer(addr, util.Uint160{}, big.NewInt(1000), nil) + require.NoError(t, err) + // No new blocks are produced here, but the tx is OK and is in the mempool. + txes, err := c.GetRawMemPool() + require.NoError(t, err) + require.Equal(t, []util.Uint256{txHash}, txes) + // Subscriptions are checked by other tests. +} diff --git a/pkg/services/rpcsrv/params/params.go b/pkg/services/rpcsrv/params/params.go index 92a197f8d..23e5fe189 100644 --- a/pkg/services/rpcsrv/params/params.go +++ b/pkg/services/rpcsrv/params/params.go @@ -1,12 +1,29 @@ package params -import "fmt" +import ( + "encoding/json" + "fmt" +) type ( // Params represents the JSON-RPC params. Params []Param ) +// FromAny allows to create Params for a slice of abstract values (by +// JSON-marshaling them). +func FromAny(arr []interface{}) (Params, error) { + var res Params + for i := range arr { + b, err := json.Marshal(arr[i]) + if err != nil { + return nil, fmt.Errorf("wrong parameter %d: %w", i, err) + } + res = append(res, Param{RawMessage: json.RawMessage(b)}) + } + return res, nil +} + // Value returns the param struct for the given // index if it exists. func (p Params) Value(index int) *Param { diff --git a/pkg/services/rpcsrv/params/params_test.go b/pkg/services/rpcsrv/params/params_test.go new file mode 100644 index 000000000..8fd066951 --- /dev/null +++ b/pkg/services/rpcsrv/params/params_test.go @@ -0,0 +1,31 @@ +package params + +import ( + "testing" + + "github.com/nspcc-dev/neo-go/pkg/smartcontract" + "github.com/stretchr/testify/require" +) + +func TestParamsFromAny(t *testing.T) { + str := "jajaja" + + ps, err := FromAny([]interface{}{str, smartcontract.Parameter{Type: smartcontract.StringType, Value: str}}) + require.NoError(t, err) + require.Equal(t, 2, len(ps)) + + resStr, err := ps[0].GetString() + require.NoError(t, err) + require.Equal(t, resStr, str) + + resFP, err := ps[1].GetFuncParam() + require.NoError(t, err) + require.Equal(t, resFP.Type, smartcontract.StringType) + resStr, err = resFP.Value.GetString() + require.NoError(t, err) + require.Equal(t, resStr, str) + + // Invalid item. + _, err = FromAny([]interface{}{smartcontract.Parameter{Type: smartcontract.IntegerType, Value: str}}) + require.Error(t, err) +} diff --git a/pkg/services/rpcsrv/server.go b/pkg/services/rpcsrv/server.go index e2a5d2a58..6531ccaf8 100644 --- a/pkg/services/rpcsrv/server.go +++ b/pkg/services/rpcsrv/server.go @@ -469,8 +469,8 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ return } resChan := make(chan abstractResult) // response.abstract or response.abstractBatch - subChan := make(chan *websocket.PreparedMessage, notificationBufSize) - subscr := &subscriber{writer: subChan, ws: ws} + subChan := make(chan intEvent, notificationBufSize) + subscr := &subscriber{writer: subChan} s.subsLock.Lock() s.subscribers[subscr] = true s.subsLock.Unlock() @@ -505,6 +505,19 @@ func (s *Server) handleHTTPRequest(w http.ResponseWriter, httpRequest *http.Requ s.writeHTTPServerResponse(req, w, resp) } +// RegisterLocal performs local client registration. +func (s *Server) RegisterLocal(ctx context.Context, events chan<- neorpc.Notification) func(*neorpc.Request) (*neorpc.Response, error) { + subChan := make(chan intEvent, notificationBufSize) + subscr := &subscriber{writer: subChan} + s.subsLock.Lock() + s.subscribers[subscr] = true + s.subsLock.Unlock() + go s.handleLocalNotifications(ctx, events, subChan, subscr) + return func(req *neorpc.Request) (*neorpc.Response, error) { + return s.handleInternal(req, subscr) + } +} + func (s *Server) handleRequest(req *params.Request, sub *subscriber) abstractResult { if req.In != nil { req.In.Method = escapeForLog(req.In.Method) // No valid method name will be changed by it. @@ -518,6 +531,51 @@ func (s *Server) handleRequest(req *params.Request, sub *subscriber) abstractRes return resp } +// handleInternal is an experimental interface to handle client requests directly. +func (s *Server) handleInternal(req *neorpc.Request, sub *subscriber) (*neorpc.Response, error) { + var ( + res interface{} + rpcRes = &neorpc.Response{ + HeaderAndError: neorpc.HeaderAndError{ + Header: neorpc.Header{ + JSONRPC: req.JSONRPC, + ID: json.RawMessage(strconv.FormatUint(req.ID, 10)), + }, + }, + } + ) + reqParams, err := params.FromAny(req.Params) + if err != nil { + return nil, err + } + + s.log.Debug("processing local rpc request", + zap.String("method", req.Method), + zap.Stringer("params", reqParams)) + + start := time.Now() + defer func() { addReqTimeMetric(req.Method, time.Since(start)) }() + + rpcRes.Error = neorpc.NewMethodNotFoundError(fmt.Sprintf("method %q not supported", req.Method)) + handler, ok := rpcHandlers[req.Method] + if ok { + res, rpcRes.Error = handler(s, reqParams) + } else if sub != nil { + handler, ok := rpcWsHandlers[req.Method] + if ok { + res, rpcRes.Error = handler(s, reqParams, sub) + } + } + if res != nil { + b, err := json.Marshal(res) + if err != nil { + return nil, fmt.Errorf("response can't be JSONized: %w", err) + } + rpcRes.Result = json.RawMessage(b) + } + return rpcRes, nil +} + func (s *Server) handleIn(req *params.In, sub *subscriber) abstract { var res interface{} var resErr *neorpc.Error @@ -547,7 +605,31 @@ func (s *Server) handleIn(req *params.In, sub *subscriber) abstract { return s.packResponse(req, res, resErr) } -func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan abstractResult, subChan <-chan *websocket.PreparedMessage) { +func (s *Server) handleLocalNotifications(ctx context.Context, events chan<- neorpc.Notification, subChan <-chan intEvent, subscr *subscriber) { +eventloop: + for { + select { + case <-s.shutdown: + break eventloop + case <-ctx.Done(): + break eventloop + case ev := <-subChan: + events <- *ev.ntf // Make a copy. + } + } + close(events) + s.dropSubscriber(subscr) +drainloop: + for { + select { + case <-subChan: + default: + break drainloop + } + } +} + +func (s *Server) handleWsWrites(ws *websocket.Conn, resChan <-chan abstractResult, subChan <-chan intEvent) { pingTicker := time.NewTicker(wsPingPeriod) eventloop: for { @@ -561,7 +643,7 @@ eventloop: if err := ws.SetWriteDeadline(time.Now().Add(wsWriteLimit)); err != nil { break eventloop } - if err := ws.WritePreparedMessage(event); err != nil { + if err := ws.WritePreparedMessage(event.msg); err != nil { break eventloop } case res, ok := <-resChan: @@ -621,7 +703,12 @@ requestloop: case resChan <- res: } } + s.dropSubscriber(subscr) + close(resChan) + ws.Close() +} +func (s *Server) dropSubscriber(subscr *subscriber) { s.subsLock.Lock() delete(s.subscribers, subscr) s.subsLock.Unlock() @@ -632,8 +719,6 @@ requestloop: } } s.subsCounterLock.Unlock() - close(resChan) - ws.Close() } func (s *Server) getBestBlockHash(_ params.Params) (interface{}, *neorpc.Error) { @@ -2581,11 +2666,12 @@ func (s *Server) unsubscribeFromChannel(event neorpc.EventID) { } func (s *Server) handleSubEvents() { - b, err := json.Marshal(neorpc.Notification{ + var overflowEvent = neorpc.Notification{ JSONRPC: neorpc.JSONRPCVersion, Event: neorpc.MissedEventID, Payload: make([]interface{}, 0), - }) + } + b, err := json.Marshal(overflowEvent) if err != nil { s.log.Error("fatal: failed to marshal overflow event", zap.Error(err)) return @@ -2649,12 +2735,12 @@ chloop: } } select { - case sub.writer <- msg: + case sub.writer <- intEvent{msg, &resp}: default: sub.overflown.Store(true) // MissedEvent is to be delivered eventually. go func(sub *subscriber) { - sub.writer <- overflowMsg + sub.writer <- intEvent{overflowMsg, &overflowEvent} sub.overflown.Store(false) }(sub) } diff --git a/pkg/services/rpcsrv/subscription.go b/pkg/services/rpcsrv/subscription.go index 85e9a7036..511aed2ef 100644 --- a/pkg/services/rpcsrv/subscription.go +++ b/pkg/services/rpcsrv/subscription.go @@ -7,10 +7,16 @@ import ( ) type ( + // intEvent is an internal event that has both a proper structure and + // a websocket-ready message. It's used to serve websocket-based clients + // as well as internal ones. + intEvent struct { + msg *websocket.PreparedMessage + ntf *neorpc.Notification + } // subscriber is an event subscriber. subscriber struct { - writer chan<- *websocket.PreparedMessage - ws *websocket.Conn + writer chan<- intEvent overflown atomic.Bool // These work like slots as there is not a lot of them (it's // cheaper doing it this way rather than creating a map),