From 7c1babb7d6536a7459cfcd0dab12d615d4664cae Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 11 Aug 2022 17:24:26 +0400 Subject: [PATCH] [#1632] node: Subscribe on the successful container creations/removals There is a need to sync container-related caching mechanism with the actual Sidechain changes. To do this, node should be able to listen incoming notifications about container ops. Define `PutSuccess` / `DeleteSuccess` notification event's parsers. Subscribe to these events in node app. As initial implementation node will log event receipts. Later handling is going to be practically complicated. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/container.go | 40 +++++++++++++++++++ pkg/morph/event/container/delete.go | 40 +++++++++++++++++++ pkg/morph/event/container/delete_test.go | 45 ++++++++++++++++++++++ pkg/morph/event/container/put.go | 45 ++++++++++++++++++++++ pkg/morph/event/container/put_test.go | 49 ++++++++++++++++++++++++ 5 files changed, 219 insertions(+) diff --git a/cmd/neofs-node/container.go b/cmd/neofs-node/container.go index d00c2ee0f..454fb8ed8 100644 --- a/cmd/neofs-node/container.go +++ b/cmd/neofs-node/container.go @@ -63,6 +63,18 @@ func initContainerService(c *cfg) { neoClient: wrap, } + subscribeToContainerCreation(c, func(e event.Event) { + c.log.Debug("container creation event's receipt", + zap.Stringer("id", e.(containerEvent.PutSuccess).ID), + ) + }) + + subscribeToContainerRemoval(c, func(e event.Event) { + c.log.Debug("container removal event's receipt", + zap.Stringer("id", e.(containerEvent.DeleteSuccess).ID), + ) + }) + if c.cfgMorph.disableCache { c.cfgObject.eaclSource = eACLFetcher cnrRdr.eacl = eACLFetcher @@ -195,6 +207,34 @@ func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler) ) } +// stores already registered parsers of the notification events thrown by Container contract. +// MUST NOT be used concurrently. +var mRegisteredParsersContainer = make(map[string]struct{}) + +// registers event parser by name once. MUST NOT be called concurrently. +func registerEventParserOnceContainer(c *cfg, name string, p event.NotificationParser) { + if _, ok := mRegisteredParsersContainer[name]; !ok { + setContainerNotificationParser(c, name, p) + mRegisteredParsersContainer[name] = struct{}{} + } +} + +// subscribes to successful container creation. Provided handler is called asynchronously +// on corresponding routine pool. MUST NOT be called concurrently with itself and other +// similar functions. +func subscribeToContainerCreation(c *cfg, h event.Handler) { + const eventNameContainerCreated = "PutSuccess" + registerEventParserOnceContainer(c, eventNameContainerCreated, containerEvent.ParsePutSuccess) + addContainerAsyncNotificationHandler(c, eventNameContainerCreated, h) +} + +// like subscribeToContainerCreation but for removal. +func subscribeToContainerRemoval(c *cfg, h event.Handler) { + const eventNameContainerRemoved = "DeleteSuccess" + registerEventParserOnceContainer(c, eventNameContainerRemoved, containerEvent.ParseDeleteSuccess) + addContainerAsyncNotificationHandler(c, eventNameContainerRemoved, h) +} + func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) { typ := event.TypeFromString(sTyp) diff --git a/pkg/morph/event/container/delete.go b/pkg/morph/event/container/delete.go index a37cf333b..863a99ce7 100644 --- a/pkg/morph/event/container/delete.go +++ b/pkg/morph/event/container/delete.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/event" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" ) // Delete structure of container.Delete notification from morph chain. @@ -81,3 +82,42 @@ func ParseDelete(e *state.ContainedNotificationEvent) (event.Event, error) { return ev, nil } + +// DeleteSuccess structures notification event of successful container removal +// thrown by Container contract. +type DeleteSuccess struct { + // Identifier of the removed container. + ID cid.ID +} + +// MorphEvent implements Neo:Morph Event interface. +func (DeleteSuccess) MorphEvent() {} + +// ParseDeleteSuccess decodes notification event thrown by Container contract into +// DeleteSuccess and returns it as event.Event. +func ParseDeleteSuccess(e *state.ContainedNotificationEvent) (event.Event, error) { + items, err := event.ParseStackArray(e) + if err != nil { + return nil, fmt.Errorf("parse stack array from raw notification event: %w", err) + } + + const expectedItemNumDeleteSuccess = 1 + + if ln := len(items); ln != expectedItemNumDeleteSuccess { + return nil, event.WrongNumberOfParameters(expectedItemNumDeleteSuccess, ln) + } + + binID, err := client.BytesFromStackItem(items[0]) + if err != nil { + return nil, fmt.Errorf("parse container ID item: %w", err) + } + + var res DeleteSuccess + + err = res.ID.Decode(binID) + if err != nil { + return nil, fmt.Errorf("decode container ID: %w", err) + } + + return res, nil +} diff --git a/pkg/morph/event/container/delete_test.go b/pkg/morph/event/container/delete_test.go index ea808b3e7..7ca10331a 100644 --- a/pkg/morph/event/container/delete_test.go +++ b/pkg/morph/event/container/delete_test.go @@ -1,10 +1,12 @@ package container import ( + "crypto/sha256" "testing" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neofs-node/pkg/morph/event" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/stretchr/testify/require" ) @@ -67,3 +69,46 @@ func TestParseDelete(t *testing.T) { }, ev) }) } + +func TestParseDeleteSuccess(t *testing.T) { + t.Run("wrong number of parameters", func(t *testing.T) { + prms := []stackitem.Item{ + stackitem.NewMap(), + stackitem.NewMap(), + } + + _, err := ParseDeleteSuccess(createNotifyEventFromItems(prms)) + require.EqualError(t, err, event.WrongNumberOfParameters(1, len(prms)).Error()) + }) + + t.Run("wrong container parameter", func(t *testing.T) { + _, err := ParseDeleteSuccess(createNotifyEventFromItems([]stackitem.Item{ + stackitem.NewMap(), + })) + + require.Error(t, err) + + _, err = ParseDeleteSuccess(createNotifyEventFromItems([]stackitem.Item{ + stackitem.NewByteArray([]byte{1, 2, 3}), + })) + + require.Error(t, err) + }) + + t.Run("correct behavior", func(t *testing.T) { + id := cidtest.ID() + + binID := make([]byte, sha256.Size) + id.Encode(binID) + + ev, err := ParseDeleteSuccess(createNotifyEventFromItems([]stackitem.Item{ + stackitem.NewByteArray(binID), + })) + + require.NoError(t, err) + + require.Equal(t, DeleteSuccess{ + ID: id, + }, ev) + }) +} diff --git a/pkg/morph/event/container/put.go b/pkg/morph/event/container/put.go index abf177cd2..51f1f4aa2 100644 --- a/pkg/morph/event/container/put.go +++ b/pkg/morph/event/container/put.go @@ -7,6 +7,7 @@ import ( "github.com/nspcc-dev/neo-go/pkg/network/payload" "github.com/nspcc-dev/neofs-node/pkg/morph/client" "github.com/nspcc-dev/neofs-node/pkg/morph/event" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" ) // Put structure of container.Put notification from morph chain. @@ -106,3 +107,47 @@ func ParsePut(e *state.ContainedNotificationEvent) (event.Event, error) { return ev, nil } + +// PutSuccess structures notification event of successful container creation +// thrown by Container contract. +type PutSuccess struct { + // Identifier of the newly created container. + ID cid.ID +} + +// MorphEvent implements Neo:Morph Event interface. +func (PutSuccess) MorphEvent() {} + +// ParsePutSuccess decodes notification event thrown by Container contract into +// PutSuccess and returns it as event.Event. +func ParsePutSuccess(e *state.ContainedNotificationEvent) (event.Event, error) { + items, err := event.ParseStackArray(e) + if err != nil { + return nil, fmt.Errorf("parse stack array from raw notification event: %w", err) + } + + const expectedItemNumPutSuccess = 2 + + if ln := len(items); ln != expectedItemNumPutSuccess { + return nil, event.WrongNumberOfParameters(expectedItemNumPutSuccess, ln) + } + + binID, err := client.BytesFromStackItem(items[0]) + if err != nil { + return nil, fmt.Errorf("parse container ID item: %w", err) + } + + _, err = client.BytesFromStackItem(items[1]) + if err != nil { + return nil, fmt.Errorf("parse public key item: %w", err) + } + + var res PutSuccess + + err = res.ID.Decode(binID) + if err != nil { + return nil, fmt.Errorf("decode container ID: %w", err) + } + + return res, nil +} diff --git a/pkg/morph/event/container/put_test.go b/pkg/morph/event/container/put_test.go index e53bfd916..4f55e5ad8 100644 --- a/pkg/morph/event/container/put_test.go +++ b/pkg/morph/event/container/put_test.go @@ -1,10 +1,12 @@ package container import ( + "crypto/sha256" "testing" "github.com/nspcc-dev/neo-go/pkg/vm/stackitem" "github.com/nspcc-dev/neofs-node/pkg/morph/event" + cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" "github.com/stretchr/testify/require" ) @@ -81,3 +83,50 @@ func TestParsePut(t *testing.T) { }, ev) }) } + +func TestParsePutSuccess(t *testing.T) { + t.Run("wrong number of parameters", func(t *testing.T) { + prms := []stackitem.Item{ + stackitem.NewMap(), + } + + _, err := ParsePutSuccess(createNotifyEventFromItems(prms)) + require.EqualError(t, err, event.WrongNumberOfParameters(2, len(prms)).Error()) + }) + + t.Run("wrong container ID parameter", func(t *testing.T) { + _, err := ParsePutSuccess(createNotifyEventFromItems([]stackitem.Item{ + stackitem.NewMap(), + stackitem.NewMap(), + })) + + require.Error(t, err) + }) + + id := cidtest.ID() + + binID := make([]byte, sha256.Size) + id.Encode(binID) + + t.Run("wrong public key parameter", func(t *testing.T) { + _, err := ParsePutSuccess(createNotifyEventFromItems([]stackitem.Item{ + stackitem.NewByteArray(binID), + stackitem.NewMap(), + })) + + require.Error(t, err) + }) + + t.Run("correct behavior", func(t *testing.T) { + ev, err := ParsePutSuccess(createNotifyEventFromItems([]stackitem.Item{ + stackitem.NewByteArray(binID), + stackitem.NewByteArray([]byte("key")), + })) + + require.NoError(t, err) + + require.Equal(t, PutSuccess{ + ID: id, + }, ev) + }) +}