[#195] Add Lock tick listening

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-03-05 09:58:54 +03:00 committed by Angira Kekteeva
parent 32e83db064
commit 4c3c4b6bee
3 changed files with 121 additions and 11 deletions

View file

@ -9,18 +9,19 @@ import (
"net/url" "net/url"
"strings" "strings"
"github.com/nats-io/nats.go"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/cache" "github.com/nspcc-dev/neofs-s3-gw/api/cache"
"github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-s3-gw/api/errors" "github.com/nspcc-dev/neofs-s3-gw/api/errors"
"github.com/nspcc-dev/neofs-s3-gw/api/layer/neofs" "github.com/nspcc-dev/neofs-s3-gw/api/layer/neofs"
"github.com/nspcc-dev/neofs-s3-gw/api/notifications"
"github.com/nspcc-dev/neofs-s3-gw/api/resolver" "github.com/nspcc-dev/neofs-s3-gw/api/resolver"
"github.com/nspcc-dev/neofs-s3-gw/creds/accessbox" "github.com/nspcc-dev/neofs-s3-gw/creds/accessbox"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/eacl"
"github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object/address"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"github.com/nspcc-dev/neofs-sdk-go/owner" "github.com/nspcc-dev/neofs-sdk-go/owner"
"github.com/nspcc-dev/neofs-sdk-go/pool" "github.com/nspcc-dev/neofs-sdk-go/pool"
@ -29,12 +30,23 @@ import (
) )
type ( type (
Notificator interface {
Subscribe(context.Context, string, MsgHandler) error
Listen(context.Context)
}
MsgHandler interface {
HandleMessage(context.Context, *nats.Msg) error
}
MsgHandlerFunc func(context.Context, *nats.Msg) error
layer struct { layer struct {
neoFS neofs.NeoFS neoFS neofs.NeoFS
log *zap.Logger log *zap.Logger
anonKey AnonymousKey anonKey AnonymousKey
resolver *resolver.BucketResolver resolver *resolver.BucketResolver
ncontroller *notifications.Controller ncontroller Notificator
listsCache *cache.ObjectsListCache listsCache *cache.ObjectsListCache
objCache *cache.ObjectsCache objCache *cache.ObjectsCache
namesCache *cache.ObjectsNameCache namesCache *cache.ObjectsNameCache
@ -47,7 +59,6 @@ type (
Caches *CachesConfig Caches *CachesConfig
AnonKey AnonymousKey AnonKey AnonymousKey
Resolver *resolver.BucketResolver Resolver *resolver.BucketResolver
NotificationController *notifications.Controller
} }
// AnonymousKey contains data for anonymous requests. // AnonymousKey contains data for anonymous requests.
@ -174,6 +185,7 @@ type (
// Client provides S3 API client interface. // Client provides S3 API client interface.
Client interface { Client interface {
Initialize(ctx context.Context, c Notificator) error
EphemeralKey() *keys.PublicKey EphemeralKey() *keys.PublicKey
GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error)
@ -234,6 +246,10 @@ func (t *VersionedObject) String() string {
return t.Name + ":" + t.VersionID return t.Name + ":" + t.VersionID
} }
func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error {
return f(ctx, msg)
}
// DefaultCachesConfigs returns filled configs. // DefaultCachesConfigs returns filled configs.
func DefaultCachesConfigs() *CachesConfig { func DefaultCachesConfigs() *CachesConfig {
return &CachesConfig{ return &CachesConfig{
@ -254,7 +270,6 @@ func NewLayer(log *zap.Logger, neoFS neofs.NeoFS, config *Config) Client {
anonKey: config.AnonKey, anonKey: config.AnonKey,
resolver: config.Resolver, resolver: config.Resolver,
listsCache: cache.NewObjectsListCache(config.Caches.ObjectsList), listsCache: cache.NewObjectsListCache(config.Caches.ObjectsList),
ncontroller: config.NotificationController,
objCache: cache.New(config.Caches.Objects), objCache: cache.New(config.Caches.Objects),
namesCache: cache.NewObjectsNameCache(config.Caches.Names), namesCache: cache.NewObjectsNameCache(config.Caches.Names),
bucketCache: cache.NewBucketCache(config.Caches.Buckets), bucketCache: cache.NewBucketCache(config.Caches.Buckets),
@ -266,10 +281,40 @@ func (n *layer) EphemeralKey() *keys.PublicKey {
return n.anonKey.Key.PublicKey() return n.anonKey.Key.PublicKey()
} }
func (n *layer) Initialize(ctx context.Context, c Notificator) error {
if c == nil {
return nil
}
if n.IsNotificationEnabled() {
return fmt.Errorf("already initialized")
}
if err := c.Subscribe(ctx, "lock", MsgHandlerFunc(n.handleLockTick)); err != nil {
return fmt.Errorf("couldn't initialize layer: %w", err)
}
c.Listen(ctx)
n.ncontroller = c
return nil
}
func (n *layer) IsNotificationEnabled() bool { func (n *layer) IsNotificationEnabled() bool {
return n.ncontroller != nil return n.ncontroller != nil
} }
func (n *layer) handleLockTick(ctx context.Context, msg *nats.Msg) error {
addr := address.NewAddress()
if err := addr.Parse(string(msg.Data)); err != nil {
return fmt.Errorf("invalid msg, address expected: %w", err)
}
// todo clear cache
// and make sure having right access
return n.objectDelete(ctx, addr.ContainerID(), addr.ObjectID())
}
// IsAuthenticatedRequest check if access box exists in current request. // IsAuthenticatedRequest check if access box exists in current request.
func IsAuthenticatedRequest(ctx context.Context) bool { func IsAuthenticatedRequest(ctx context.Context) bool {
_, ok := ctx.Value(api.BoxData).(*accessbox.Box) _, ok := ctx.Value(api.BoxData).(*accessbox.Box)

View file

@ -1,9 +1,13 @@
package notifications package notifications
import ( import (
"context"
"fmt"
"sync"
"time" "time"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
) )
const ( const (
@ -20,7 +24,14 @@ type Options struct {
type Controller struct { type Controller struct {
taskQueueConnection *nats.Conn taskQueueConnection *nats.Conn
jsClient nats.JetStream jsClient nats.JetStreamContext
handlers map[string]Stream
mu sync.RWMutex
}
type Stream struct {
h layer.MsgHandler
ch chan *nats.Msg
} }
func NewController(p *Options) (*Controller, error) { func NewController(p *Options) (*Controller, error) {
@ -52,5 +63,56 @@ func NewController(p *Options) (*Controller, error) {
return &Controller{ return &Controller{
taskQueueConnection: nc, taskQueueConnection: nc,
jsClient: js, jsClient: js,
handlers: make(map[string]Stream),
}, nil }, nil
} }
func (c *Controller) Subscribe(ctx context.Context, topic string, handler layer.MsgHandler) error {
ch := make(chan *nats.Msg, 1)
c.mu.RLock()
if _, ok := c.handlers[topic]; ok {
return fmt.Errorf("already subscribed to topic '%s'", topic)
}
c.mu.RUnlock()
if _, err := c.jsClient.AddStream(&nats.StreamConfig{Name: topic}); err != nil {
return err
}
if _, err := c.jsClient.ChanSubscribe(topic, ch); err != nil {
return fmt.Errorf("could not subscribe: %w", err)
}
c.mu.Lock()
c.handlers[topic] = Stream{
h: handler,
ch: ch,
}
c.mu.Unlock()
return nil
}
func (c *Controller) Listen(ctx context.Context) {
c.mu.RLock()
defer c.mu.RUnlock()
for _, stream := range c.handlers {
go func(stream Stream) {
for {
select {
case msg := <-stream.ch:
fmt.Printf("got message: %s\n", msg.Data)
if err := stream.h.HandleMessage(ctx, msg); err != nil {
fmt.Printf("could not handle message: %s", err)
} else if err = msg.Ack(); err != nil {
fmt.Printf("could not ACK message: %s", err)
}
case <-ctx.Done():
return
}
}
}(stream)
}
}

View file

@ -155,12 +155,15 @@ func newApp(ctx context.Context, l *zap.Logger, v *viper.Viper) *App {
Key: randomKey, Key: randomKey,
}, },
Resolver: bucketResolver, Resolver: bucketResolver,
NotificationController: nc,
} }
// prepare object layer // prepare object layer
obj = layer.NewLayer(l, &layerNeoFS{&neoFS}, layerCfg) obj = layer.NewLayer(l, &layerNeoFS{&neoFS}, layerCfg)
if err = obj.Initialize(ctx, nc); err != nil {
l.Fatal("couldn't initialize layer", zap.Error(err))
}
// prepare auth center // prepare auth center
ctr = auth.New(&neofs.AuthmateNeoFS{ ctr = auth.New(&neofs.AuthmateNeoFS{
NeoFS: &neoFS, NeoFS: &neoFS,