From 4c3c4b6bee46932ba9b9f80a951719ae36850448 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Sat, 5 Mar 2022 09:58:54 +0300 Subject: [PATCH] [#195] Add Lock tick listening Signed-off-by: Denis Kirillov --- api/layer/layer.go | 61 ++++++++++++++++++++++++++----- api/notifications/controller.go | 64 ++++++++++++++++++++++++++++++++- cmd/s3-gw/app.go | 7 ++-- 3 files changed, 121 insertions(+), 11 deletions(-) diff --git a/api/layer/layer.go b/api/layer/layer.go index a257a58b..21abcf7d 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -9,18 +9,19 @@ import ( "net/url" "strings" + "github.com/nats-io/nats.go" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/api/layer/neofs" - "github.com/nspcc-dev/neofs-s3-gw/api/notifications" "github.com/nspcc-dev/neofs-s3-gw/api/resolver" "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object/address" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/nspcc-dev/neofs-sdk-go/pool" @@ -29,12 +30,23 @@ import ( ) type ( + Notificator interface { + Subscribe(context.Context, string, MsgHandler) error + Listen(context.Context) + } + + MsgHandler interface { + HandleMessage(context.Context, *nats.Msg) error + } + + MsgHandlerFunc func(context.Context, *nats.Msg) error + layer struct { neoFS neofs.NeoFS log *zap.Logger anonKey AnonymousKey resolver *resolver.BucketResolver - ncontroller *notifications.Controller + ncontroller Notificator listsCache *cache.ObjectsListCache objCache *cache.ObjectsCache namesCache *cache.ObjectsNameCache @@ -43,11 +55,10 @@ type ( } Config struct { - ChainAddress string - Caches *CachesConfig - AnonKey AnonymousKey - Resolver *resolver.BucketResolver - NotificationController *notifications.Controller + ChainAddress string + Caches *CachesConfig + AnonKey AnonymousKey + Resolver *resolver.BucketResolver } // AnonymousKey contains data for anonymous requests. @@ -174,6 +185,7 @@ type ( // Client provides S3 API client interface. Client interface { + Initialize(ctx context.Context, c Notificator) error EphemeralKey() *keys.PublicKey GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) @@ -234,6 +246,10 @@ func (t *VersionedObject) String() string { return t.Name + ":" + t.VersionID } +func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error { + return f(ctx, msg) +} + // DefaultCachesConfigs returns filled configs. func DefaultCachesConfigs() *CachesConfig { return &CachesConfig{ @@ -254,7 +270,6 @@ func NewLayer(log *zap.Logger, neoFS neofs.NeoFS, config *Config) Client { anonKey: config.AnonKey, resolver: config.Resolver, listsCache: cache.NewObjectsListCache(config.Caches.ObjectsList), - ncontroller: config.NotificationController, objCache: cache.New(config.Caches.Objects), namesCache: cache.NewObjectsNameCache(config.Caches.Names), bucketCache: cache.NewBucketCache(config.Caches.Buckets), @@ -266,10 +281,40 @@ func (n *layer) EphemeralKey() *keys.PublicKey { return n.anonKey.Key.PublicKey() } +func (n *layer) Initialize(ctx context.Context, c Notificator) error { + if c == nil { + return nil + } + + if n.IsNotificationEnabled() { + return fmt.Errorf("already initialized") + } + + if err := c.Subscribe(ctx, "lock", MsgHandlerFunc(n.handleLockTick)); err != nil { + return fmt.Errorf("couldn't initialize layer: %w", err) + } + + c.Listen(ctx) + + n.ncontroller = c + return nil +} + func (n *layer) IsNotificationEnabled() bool { return n.ncontroller != nil } +func (n *layer) handleLockTick(ctx context.Context, msg *nats.Msg) error { + addr := address.NewAddress() + if err := addr.Parse(string(msg.Data)); err != nil { + return fmt.Errorf("invalid msg, address expected: %w", err) + } + + // todo clear cache + // and make sure having right access + return n.objectDelete(ctx, addr.ContainerID(), addr.ObjectID()) +} + // IsAuthenticatedRequest check if access box exists in current request. func IsAuthenticatedRequest(ctx context.Context) bool { _, ok := ctx.Value(api.BoxData).(*accessbox.Box) diff --git a/api/notifications/controller.go b/api/notifications/controller.go index 06c34c8f..fb6da671 100644 --- a/api/notifications/controller.go +++ b/api/notifications/controller.go @@ -1,9 +1,13 @@ package notifications import ( + "context" + "fmt" + "sync" "time" "github.com/nats-io/nats.go" + "github.com/nspcc-dev/neofs-s3-gw/api/layer" ) const ( @@ -20,7 +24,14 @@ type Options struct { type Controller struct { taskQueueConnection *nats.Conn - jsClient nats.JetStream + jsClient nats.JetStreamContext + handlers map[string]Stream + mu sync.RWMutex +} + +type Stream struct { + h layer.MsgHandler + ch chan *nats.Msg } func NewController(p *Options) (*Controller, error) { @@ -52,5 +63,56 @@ func NewController(p *Options) (*Controller, error) { return &Controller{ taskQueueConnection: nc, jsClient: js, + handlers: make(map[string]Stream), }, nil } + +func (c *Controller) Subscribe(ctx context.Context, topic string, handler layer.MsgHandler) error { + ch := make(chan *nats.Msg, 1) + + c.mu.RLock() + if _, ok := c.handlers[topic]; ok { + return fmt.Errorf("already subscribed to topic '%s'", topic) + } + c.mu.RUnlock() + + if _, err := c.jsClient.AddStream(&nats.StreamConfig{Name: topic}); err != nil { + return err + } + + if _, err := c.jsClient.ChanSubscribe(topic, ch); err != nil { + return fmt.Errorf("could not subscribe: %w", err) + } + + c.mu.Lock() + c.handlers[topic] = Stream{ + h: handler, + ch: ch, + } + c.mu.Unlock() + + return nil +} + +func (c *Controller) Listen(ctx context.Context) { + c.mu.RLock() + defer c.mu.RUnlock() + + for _, stream := range c.handlers { + go func(stream Stream) { + for { + select { + case msg := <-stream.ch: + fmt.Printf("got message: %s\n", msg.Data) + if err := stream.h.HandleMessage(ctx, msg); err != nil { + fmt.Printf("could not handle message: %s", err) + } else if err = msg.Ack(); err != nil { + fmt.Printf("could not ACK message: %s", err) + } + case <-ctx.Done(): + return + } + } + }(stream) + } +} diff --git a/cmd/s3-gw/app.go b/cmd/s3-gw/app.go index 6b95b7bb..79a482b2 100644 --- a/cmd/s3-gw/app.go +++ b/cmd/s3-gw/app.go @@ -154,13 +154,16 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App { AnonKey: layer.AnonymousKey{ Key: randomKey, }, - Resolver: bucketResolver, - NotificationController: nc, + Resolver: bucketResolver, } // prepare object layer obj = layer.NewLayer(l, &layerNeoFS{&neoFS}, layerCfg) + if err = obj.Initialize(ctx, nc); err != nil { + l.Fatal("couldn't initialize layer", zap.Error(err)) + } + // prepare auth center ctr = auth.New(&neofs.AuthmateNeoFS{ NeoFS: &neoFS,