forked from TrueCloudLab/frostfs-node
[#1632] node: Subscribe on the successful container creations/removals
There is a need to sync container-related caching mechanism with the actual Sidechain changes. To do this, node should be able to listen incoming notifications about container ops. Define `PutSuccess` / `DeleteSuccess` notification event's parsers. Subscribe to these events in node app. As initial implementation node will log event receipts. Later handling is going to be practically complicated. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
This commit is contained in:
parent
d8a00c365a
commit
7c1babb7d6
5 changed files with 219 additions and 0 deletions
|
@ -63,6 +63,18 @@ func initContainerService(c *cfg) {
|
||||||
neoClient: wrap,
|
neoClient: wrap,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
subscribeToContainerCreation(c, func(e event.Event) {
|
||||||
|
c.log.Debug("container creation event's receipt",
|
||||||
|
zap.Stringer("id", e.(containerEvent.PutSuccess).ID),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
|
subscribeToContainerRemoval(c, func(e event.Event) {
|
||||||
|
c.log.Debug("container removal event's receipt",
|
||||||
|
zap.Stringer("id", e.(containerEvent.DeleteSuccess).ID),
|
||||||
|
)
|
||||||
|
})
|
||||||
|
|
||||||
if c.cfgMorph.disableCache {
|
if c.cfgMorph.disableCache {
|
||||||
c.cfgObject.eaclSource = eACLFetcher
|
c.cfgObject.eaclSource = eACLFetcher
|
||||||
cnrRdr.eacl = eACLFetcher
|
cnrRdr.eacl = eACLFetcher
|
||||||
|
@ -195,6 +207,34 @@ func addContainerAsyncNotificationHandler(c *cfg, sTyp string, h event.Handler)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// stores already registered parsers of the notification events thrown by Container contract.
|
||||||
|
// MUST NOT be used concurrently.
|
||||||
|
var mRegisteredParsersContainer = make(map[string]struct{})
|
||||||
|
|
||||||
|
// registers event parser by name once. MUST NOT be called concurrently.
|
||||||
|
func registerEventParserOnceContainer(c *cfg, name string, p event.NotificationParser) {
|
||||||
|
if _, ok := mRegisteredParsersContainer[name]; !ok {
|
||||||
|
setContainerNotificationParser(c, name, p)
|
||||||
|
mRegisteredParsersContainer[name] = struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// subscribes to successful container creation. Provided handler is called asynchronously
|
||||||
|
// on corresponding routine pool. MUST NOT be called concurrently with itself and other
|
||||||
|
// similar functions.
|
||||||
|
func subscribeToContainerCreation(c *cfg, h event.Handler) {
|
||||||
|
const eventNameContainerCreated = "PutSuccess"
|
||||||
|
registerEventParserOnceContainer(c, eventNameContainerCreated, containerEvent.ParsePutSuccess)
|
||||||
|
addContainerAsyncNotificationHandler(c, eventNameContainerCreated, h)
|
||||||
|
}
|
||||||
|
|
||||||
|
// like subscribeToContainerCreation but for removal.
|
||||||
|
func subscribeToContainerRemoval(c *cfg, h event.Handler) {
|
||||||
|
const eventNameContainerRemoved = "DeleteSuccess"
|
||||||
|
registerEventParserOnceContainer(c, eventNameContainerRemoved, containerEvent.ParseDeleteSuccess)
|
||||||
|
addContainerAsyncNotificationHandler(c, eventNameContainerRemoved, h)
|
||||||
|
}
|
||||||
|
|
||||||
func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
|
func setContainerNotificationParser(c *cfg, sTyp string, p event.NotificationParser) {
|
||||||
typ := event.TypeFromString(sTyp)
|
typ := event.TypeFromString(sTyp)
|
||||||
|
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||||
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Delete structure of container.Delete notification from morph chain.
|
// Delete structure of container.Delete notification from morph chain.
|
||||||
|
@ -81,3 +82,42 @@ func ParseDelete(e *state.ContainedNotificationEvent) (event.Event, error) {
|
||||||
|
|
||||||
return ev, nil
|
return ev, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteSuccess structures notification event of successful container removal
|
||||||
|
// thrown by Container contract.
|
||||||
|
type DeleteSuccess struct {
|
||||||
|
// Identifier of the removed container.
|
||||||
|
ID cid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// MorphEvent implements Neo:Morph Event interface.
|
||||||
|
func (DeleteSuccess) MorphEvent() {}
|
||||||
|
|
||||||
|
// ParseDeleteSuccess decodes notification event thrown by Container contract into
|
||||||
|
// DeleteSuccess and returns it as event.Event.
|
||||||
|
func ParseDeleteSuccess(e *state.ContainedNotificationEvent) (event.Event, error) {
|
||||||
|
items, err := event.ParseStackArray(e)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse stack array from raw notification event: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
const expectedItemNumDeleteSuccess = 1
|
||||||
|
|
||||||
|
if ln := len(items); ln != expectedItemNumDeleteSuccess {
|
||||||
|
return nil, event.WrongNumberOfParameters(expectedItemNumDeleteSuccess, ln)
|
||||||
|
}
|
||||||
|
|
||||||
|
binID, err := client.BytesFromStackItem(items[0])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse container ID item: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var res DeleteSuccess
|
||||||
|
|
||||||
|
err = res.ID.Decode(binID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("decode container ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
|
@ -1,10 +1,12 @@
|
||||||
package container
|
package container
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||||
|
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -67,3 +69,46 @@ func TestParseDelete(t *testing.T) {
|
||||||
}, ev)
|
}, ev)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestParseDeleteSuccess(t *testing.T) {
|
||||||
|
t.Run("wrong number of parameters", func(t *testing.T) {
|
||||||
|
prms := []stackitem.Item{
|
||||||
|
stackitem.NewMap(),
|
||||||
|
stackitem.NewMap(),
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := ParseDeleteSuccess(createNotifyEventFromItems(prms))
|
||||||
|
require.EqualError(t, err, event.WrongNumberOfParameters(1, len(prms)).Error())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("wrong container parameter", func(t *testing.T) {
|
||||||
|
_, err := ParseDeleteSuccess(createNotifyEventFromItems([]stackitem.Item{
|
||||||
|
stackitem.NewMap(),
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.Error(t, err)
|
||||||
|
|
||||||
|
_, err = ParseDeleteSuccess(createNotifyEventFromItems([]stackitem.Item{
|
||||||
|
stackitem.NewByteArray([]byte{1, 2, 3}),
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("correct behavior", func(t *testing.T) {
|
||||||
|
id := cidtest.ID()
|
||||||
|
|
||||||
|
binID := make([]byte, sha256.Size)
|
||||||
|
id.Encode(binID)
|
||||||
|
|
||||||
|
ev, err := ParseDeleteSuccess(createNotifyEventFromItems([]stackitem.Item{
|
||||||
|
stackitem.NewByteArray(binID),
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, DeleteSuccess{
|
||||||
|
ID: id,
|
||||||
|
}, ev)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
|
@ -7,6 +7,7 @@ import (
|
||||||
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
"github.com/nspcc-dev/neo-go/pkg/network/payload"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||||
|
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Put structure of container.Put notification from morph chain.
|
// Put structure of container.Put notification from morph chain.
|
||||||
|
@ -106,3 +107,47 @@ func ParsePut(e *state.ContainedNotificationEvent) (event.Event, error) {
|
||||||
|
|
||||||
return ev, nil
|
return ev, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PutSuccess structures notification event of successful container creation
|
||||||
|
// thrown by Container contract.
|
||||||
|
type PutSuccess struct {
|
||||||
|
// Identifier of the newly created container.
|
||||||
|
ID cid.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
// MorphEvent implements Neo:Morph Event interface.
|
||||||
|
func (PutSuccess) MorphEvent() {}
|
||||||
|
|
||||||
|
// ParsePutSuccess decodes notification event thrown by Container contract into
|
||||||
|
// PutSuccess and returns it as event.Event.
|
||||||
|
func ParsePutSuccess(e *state.ContainedNotificationEvent) (event.Event, error) {
|
||||||
|
items, err := event.ParseStackArray(e)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse stack array from raw notification event: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
const expectedItemNumPutSuccess = 2
|
||||||
|
|
||||||
|
if ln := len(items); ln != expectedItemNumPutSuccess {
|
||||||
|
return nil, event.WrongNumberOfParameters(expectedItemNumPutSuccess, ln)
|
||||||
|
}
|
||||||
|
|
||||||
|
binID, err := client.BytesFromStackItem(items[0])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse container ID item: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = client.BytesFromStackItem(items[1])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse public key item: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var res PutSuccess
|
||||||
|
|
||||||
|
err = res.ID.Decode(binID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("decode container ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, nil
|
||||||
|
}
|
||||||
|
|
|
@ -1,10 +1,12 @@
|
||||||
package container
|
package container
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"crypto/sha256"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
"github.com/nspcc-dev/neo-go/pkg/vm/stackitem"
|
||||||
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
|
||||||
|
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -81,3 +83,50 @@ func TestParsePut(t *testing.T) {
|
||||||
}, ev)
|
}, ev)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestParsePutSuccess(t *testing.T) {
|
||||||
|
t.Run("wrong number of parameters", func(t *testing.T) {
|
||||||
|
prms := []stackitem.Item{
|
||||||
|
stackitem.NewMap(),
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := ParsePutSuccess(createNotifyEventFromItems(prms))
|
||||||
|
require.EqualError(t, err, event.WrongNumberOfParameters(2, len(prms)).Error())
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("wrong container ID parameter", func(t *testing.T) {
|
||||||
|
_, err := ParsePutSuccess(createNotifyEventFromItems([]stackitem.Item{
|
||||||
|
stackitem.NewMap(),
|
||||||
|
stackitem.NewMap(),
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
id := cidtest.ID()
|
||||||
|
|
||||||
|
binID := make([]byte, sha256.Size)
|
||||||
|
id.Encode(binID)
|
||||||
|
|
||||||
|
t.Run("wrong public key parameter", func(t *testing.T) {
|
||||||
|
_, err := ParsePutSuccess(createNotifyEventFromItems([]stackitem.Item{
|
||||||
|
stackitem.NewByteArray(binID),
|
||||||
|
stackitem.NewMap(),
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.Error(t, err)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("correct behavior", func(t *testing.T) {
|
||||||
|
ev, err := ParsePutSuccess(createNotifyEventFromItems([]stackitem.Item{
|
||||||
|
stackitem.NewByteArray(binID),
|
||||||
|
stackitem.NewByteArray([]byte("key")),
|
||||||
|
}))
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, PutSuccess{
|
||||||
|
ID: id,
|
||||||
|
}, ev)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue