From 4225a21ea52320ab4d1130ca52c71c8459454425 Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Tue, 25 May 2021 11:48:01 +0300 Subject: [PATCH] [#4] *: Add connections, neofs, logger from httpgw Commit in http-gw: a4ad52e181e3653ee43742e2beb8c594c487c8cc Rename connections dir and pkg to pool Signed-off-by: Angira Kekteeva --- pkg/logger/grpc.go | 78 +++++++++++++++++ pkg/logger/option.go | 33 +++++++ pkg/logger/zap.go | 134 +++++++++++++++++++++++++++++ pkg/neofs/client-plant.go | 177 ++++++++++++++++++++++++++++++++++++++ pkg/neofs/credentials.go | 78 +++++++++++++++++ pkg/pool/pool.go | 160 ++++++++++++++++++++++++++++++++++ pkg/pool/sampler.go | 81 +++++++++++++++++ 7 files changed, 741 insertions(+) create mode 100644 pkg/logger/grpc.go create mode 100644 pkg/logger/option.go create mode 100644 pkg/logger/zap.go create mode 100644 pkg/neofs/client-plant.go create mode 100644 pkg/neofs/credentials.go create mode 100644 pkg/pool/pool.go create mode 100644 pkg/pool/sampler.go diff --git a/pkg/logger/grpc.go b/pkg/logger/grpc.go new file mode 100644 index 0000000..584bf25 --- /dev/null +++ b/pkg/logger/grpc.go @@ -0,0 +1,78 @@ +package logger + +import ( + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "google.golang.org/grpc/grpclog" +) + +type ( + zapLogger struct { + zapcore.Core + log *zap.SugaredLogger + } + + // Logger includes grpclog.LoggerV2 interface with an additional + // Println method. + Logger interface { + grpclog.LoggerV2 + Println(v ...interface{}) + } +) + +// GRPC wraps given zap.Logger into grpclog.LoggerV2+ interface. +func GRPC(l *zap.Logger) Logger { + log := l.WithOptions( + // skip gRPCLog + zapLogger in caller + zap.AddCallerSkip(2)) + + return &zapLogger{ + Core: log.Core(), + log: log.Sugar(), + } +} + +// Info implements grpclog.LoggerV2. +func (z *zapLogger) Info(args ...interface{}) { z.log.Info(args...) } + +// Infoln implements grpclog.LoggerV2. +func (z *zapLogger) Infoln(args ...interface{}) { z.log.Info(args...) } + +// Infof implements grpclog.LoggerV2. +func (z *zapLogger) Infof(format string, args ...interface{}) { z.log.Infof(format, args...) } + +// Println allows to print a line with info severity. +func (z *zapLogger) Println(args ...interface{}) { z.log.Info(args...) } + +// Printf implements grpclog.LoggerV2. +func (z *zapLogger) Printf(format string, args ...interface{}) { z.log.Infof(format, args...) } + +// Warning implements grpclog.LoggerV2. +func (z *zapLogger) Warning(args ...interface{}) { z.log.Warn(args...) } + +// Warningln implements grpclog.LoggerV2. +func (z *zapLogger) Warningln(args ...interface{}) { z.log.Warn(args...) } + +// Warningf implements grpclog.LoggerV2. +func (z *zapLogger) Warningf(format string, args ...interface{}) { z.log.Warnf(format, args...) } + +// Error implements grpclog.LoggerV2. +func (z *zapLogger) Error(args ...interface{}) { z.log.Error(args...) } + +// Errorln implements grpclog.LoggerV2. +func (z *zapLogger) Errorln(args ...interface{}) { z.log.Error(args...) } + +// Errorf implements grpclog.LoggerV2. +func (z *zapLogger) Errorf(format string, args ...interface{}) { z.log.Errorf(format, args...) } + +// Fatal implements grpclog.LoggerV2. +func (z *zapLogger) Fatal(args ...interface{}) { z.log.Fatal(args...) } + +// Fatalln implements grpclog.LoggerV2. +func (z *zapLogger) Fatalln(args ...interface{}) { z.log.Fatal(args...) } + +// Fatalf implements grpclog.LoggerV2. +func (z *zapLogger) Fatalf(format string, args ...interface{}) { z.log.Fatalf(format, args...) } + +// V implements grpclog.LoggerV2. +func (z *zapLogger) V(int) bool { return z.Enabled(zapcore.DebugLevel) } diff --git a/pkg/logger/option.go b/pkg/logger/option.go new file mode 100644 index 0000000..f617010 --- /dev/null +++ b/pkg/logger/option.go @@ -0,0 +1,33 @@ +package logger + +import "go.uber.org/zap" + +// WithSamplingInitial returns Option that sets sampling initial parameter. +func WithSamplingInitial(v int) Option { return func(o *options) { o.SamplingInitial = v } } + +// WithSamplingThereafter returns Option that sets sampling thereafter parameter. +func WithSamplingThereafter(v int) Option { return func(o *options) { o.SamplingThereafter = v } } + +// WithFormat returns Option that sets format parameter. +func WithFormat(v string) Option { return func(o *options) { o.Format = v } } + +// WithLevel returns Option that sets Level parameter. +func WithLevel(v string) Option { return func(o *options) { o.Level = v } } + +// WithTraceLevel returns Option that sets trace level parameter. +func WithTraceLevel(v string) Option { return func(o *options) { o.TraceLevel = v } } + +// WithoutDisclaimer returns Option that disables disclaimer. +func WithoutDisclaimer() Option { return func(o *options) { o.NoDisclaimer = true } } + +// WithoutCaller returns Option that disables caller printing. +func WithoutCaller() Option { return func(o *options) { o.NoCaller = true } } + +// WithAppName returns Option that sets application name. +func WithAppName(v string) Option { return func(o *options) { o.AppName = v } } + +// WithAppVersion returns Option that sets application version. +func WithAppVersion(v string) Option { return func(o *options) { o.AppVersion = v } } + +// WithZapOptions returns Option that sets zap logger options. +func WithZapOptions(opts ...zap.Option) Option { return func(o *options) { o.Options = opts } } diff --git a/pkg/logger/zap.go b/pkg/logger/zap.go new file mode 100644 index 0000000..5e9ee62 --- /dev/null +++ b/pkg/logger/zap.go @@ -0,0 +1,134 @@ +package logger + +import ( + "strings" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +type ( + // Option represents logger option setter. + Option func(o *options) + + options struct { + Options []zap.Option + + SamplingInitial int + SamplingThereafter int + + Format string + Level string + TraceLevel string + + NoCaller bool + NoDisclaimer bool + + AppName string + AppVersion string + } +) + +const ( + formatJSON = "json" + formatConsole = "console" + + defaultSamplingInitial = 100 + defaultSamplingThereafter = 100 + + lvlInfo = "info" + lvlWarn = "warn" + lvlDebug = "debug" + lvlError = "error" + lvlFatal = "fatal" + lvlPanic = "panic" +) + +func safeLevel(lvl string) zap.AtomicLevel { + switch strings.ToLower(lvl) { + case lvlDebug: + return zap.NewAtomicLevelAt(zap.DebugLevel) + case lvlWarn: + return zap.NewAtomicLevelAt(zap.WarnLevel) + case lvlError: + return zap.NewAtomicLevelAt(zap.ErrorLevel) + case lvlFatal: + return zap.NewAtomicLevelAt(zap.FatalLevel) + case lvlPanic: + return zap.NewAtomicLevelAt(zap.PanicLevel) + default: + return zap.NewAtomicLevelAt(zap.InfoLevel) + } +} + +func defaults() *options { + return &options{ + SamplingInitial: defaultSamplingInitial, + SamplingThereafter: defaultSamplingThereafter, + + Format: formatConsole, + Level: lvlDebug, + TraceLevel: lvlInfo, + + NoCaller: false, + NoDisclaimer: false, + + AppName: "", + AppVersion: "", + } +} + +// New returns new zap.Logger using all options specified and stdout used +// for output. +func New(opts ...Option) (*zap.Logger, error) { + o := defaults() + c := zap.NewProductionConfig() + + c.OutputPaths = []string{"stdout"} + c.ErrorOutputPaths = []string{"stdout"} + + for _, opt := range opts { + opt(o) + } + + // set sampling + c.Sampling = &zap.SamplingConfig{ + Initial: o.SamplingInitial, + Thereafter: o.SamplingThereafter, + } + + // logger level + c.Level = safeLevel(o.Level) + traceLvl := safeLevel(o.TraceLevel) + + // logger format + switch f := o.Format; strings.ToLower(f) { + case formatConsole: + c.Encoding = formatConsole + default: + c.Encoding = formatJSON + } + + // logger time + c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder + + if o.NoCaller { + c.EncoderConfig.EncodeCaller = nil + } + + // enable trace only for current log-level + o.Options = append(o.Options, zap.AddStacktrace(traceLvl)) + + l, err := c.Build(o.Options...) + if err != nil { + return nil, err + } + + if o.NoDisclaimer { + return l, nil + } + + return l.With( + zap.String("app_name", o.AppName), + zap.String("app_version", o.AppVersion)), nil +} diff --git a/pkg/neofs/client-plant.go b/pkg/neofs/client-plant.go new file mode 100644 index 0000000..e13f29d --- /dev/null +++ b/pkg/neofs/client-plant.go @@ -0,0 +1,177 @@ +package neofs + +import ( + "context" + "crypto/ecdsa" + "io" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/container" + "github.com/nspcc-dev/neofs-api-go/pkg/object" + "github.com/nspcc-dev/neofs-api-go/pkg/owner" + "github.com/nspcc-dev/neofs-api-go/pkg/token" + "github.com/nspcc-dev/neofs-http-gw/connections" +) + +// BaseOptions represents basic NeoFS request options. +type BaseOptions struct { + Client client.Client + SessionToken *token.SessionToken + BearerToken *token.BearerToken +} + +// PutOptions represents NeoFS Put request options. +type PutOptions struct { + BaseOptions + Attributes []*object.Attribute + ContainerID *container.ID + OwnerID *owner.ID + Reader io.Reader +} + +// GetOptions represents NeoFS Get request options. +type GetOptions struct { + BaseOptions + ObjectAddress *object.Address + Writer io.Writer +} + +// SearchOptions represents NeoFS Search request options. +type SearchOptions struct { + BaseOptions + ContainerID *container.ID + Attribute struct { + Key string + Value string + } +} + +// DeleteOptions represents NeoFS Delete request options. +type DeleteOptions struct { + BaseOptions + ObjectAddress *object.Address +} + +// ObjectClient wraps basic NeoFS requests. +type ObjectClient interface { + Put(context.Context, *PutOptions) (*object.Address, error) + Get(context.Context, *GetOptions) (*object.Object, error) + Search(context.Context, *SearchOptions) ([]*object.ID, error) + Delete(context.Context, *DeleteOptions) error +} + +// ClientPlant provides connections to NeoFS nodes from pool and allows to +// get local owner ID. +type ClientPlant interface { + ConnectionArtifacts() (client.Client, *token.SessionToken, error) + Object() ObjectClient + OwnerID() *owner.ID +} + +type neofsObjectClient struct { + key *ecdsa.PrivateKey + pool connections.Pool +} + +type neofsClientPlant struct { + key *ecdsa.PrivateKey + ownerID *owner.ID + pool connections.Pool +} + +// ConnectionArtifacts returns connection from pool. +func (cp *neofsClientPlant) ConnectionArtifacts() (client.Client, *token.SessionToken, error) { + return cp.pool.ConnectionArtifacts() +} + +// Object returns ObjectClient instance from plant. +func (cp *neofsClientPlant) Object() ObjectClient { + return &neofsObjectClient{ + key: cp.key, + pool: cp.pool, + } +} + +// OwnerID returns plant's owner ID. +func (cp *neofsClientPlant) OwnerID() *owner.ID { + return cp.ownerID +} + +// NewClientPlant creates new ClientPlant from given context, pool and credentials. +func NewClientPlant(ctx context.Context, pool connections.Pool, creds Credentials) (ClientPlant, error) { + return &neofsClientPlant{key: creds.PrivateKey(), ownerID: creds.Owner(), pool: pool}, nil +} + +// Put does NeoFS Put request, returning new object address if successful. +func (oc *neofsObjectClient) Put(ctx context.Context, options *PutOptions) (*object.Address, error) { + var ( + err error + objectID *object.ID + ) + address := object.NewAddress() + rawObject := object.NewRaw() + rawObject.SetContainerID(options.ContainerID) + rawObject.SetOwnerID(options.OwnerID) + rawObject.SetAttributes(options.Attributes...) + ops := new(client.PutObjectParams). + WithObject(rawObject.Object()). + WithPayloadReader(options.Reader) + objectID, err = options.Client.PutObject( + ctx, + ops, + client.WithSession(options.SessionToken), + client.WithBearer(options.BearerToken), + ) + if err != nil { + return nil, err + } + address.SetObjectID(objectID) + address.SetContainerID(options.ContainerID) + return address, nil +} + +// Get does NeoFS Get request, returning an object received if successful. +func (oc *neofsObjectClient) Get(ctx context.Context, options *GetOptions) (*object.Object, error) { + var ( + err error + obj *object.Object + ) + ops := new(client.GetObjectParams). + WithAddress(options.ObjectAddress). + WithPayloadWriter(options.Writer) + obj, err = options.Client.GetObject( + ctx, + ops, + client.WithSession(options.SessionToken), + client.WithBearer(options.BearerToken), + ) + return obj, err +} + +// Search does NeoFS Search request, returning object IDs if successful. +func (oc *neofsObjectClient) Search(ctx context.Context, options *SearchOptions) ([]*object.ID, error) { + sfs := object.NewSearchFilters() + sfs.AddRootFilter() + sfs.AddFilter(options.Attribute.Key, options.Attribute.Value, object.MatchStringEqual) + sops := new(client.SearchObjectParams) + sops.WithContainerID(options.ContainerID) + sops.WithSearchFilters(sfs) + return options.Client.SearchObject( + ctx, + sops, + client.WithSession(options.SessionToken), + client.WithBearer(options.BearerToken), + ) +} + +// Delete deletes NeoFS object. +func (oc *neofsObjectClient) Delete(ctx context.Context, options *DeleteOptions) error { + ops := new(client.DeleteObjectParams).WithAddress(options.ObjectAddress) + err := options.Client.DeleteObject( + ctx, + ops, + client.WithSession(options.SessionToken), + client.WithBearer(options.BearerToken), + ) + return err +} diff --git a/pkg/neofs/credentials.go b/pkg/neofs/credentials.go new file mode 100644 index 0000000..c229468 --- /dev/null +++ b/pkg/neofs/credentials.go @@ -0,0 +1,78 @@ +package neofs + +import ( + "crypto/ecdsa" + "crypto/elliptic" + "crypto/rand" + "math/big" + + "github.com/nspcc-dev/neofs-api-go/pkg/owner" + crypto "github.com/nspcc-dev/neofs-crypto" +) + +type ( + // Credentials contains methods that needed to work with NeoFS. + Credentials interface { + Owner() *owner.ID + PublicKey() *ecdsa.PublicKey + PrivateKey() *ecdsa.PrivateKey + } + + credentials struct { + key *ecdsa.PrivateKey + ownerID *owner.ID + } +) + +// NewCredentials creates an instance of Credentials through string +// representation of secret. It allows passing WIF, path, hex-encoded and others. +func NewCredentials(secret string) (Credentials, error) { + key, err := crypto.LoadPrivateKey(secret) + if err != nil { + return nil, err + } + return setFromPrivateKey(key) +} + +// NewEphemeralCredentials creates new private key and Credentials based on that +// key. +func NewEphemeralCredentials() (Credentials, error) { + c := elliptic.P256() + priv, x, y, err := elliptic.GenerateKey(c, rand.Reader) + if err != nil { + return nil, err + } + key := &ecdsa.PrivateKey{ + PublicKey: ecdsa.PublicKey{ + Curve: c, + X: x, + Y: y, + }, + D: new(big.Int).SetBytes(priv), + } + return setFromPrivateKey(key) +} + +// PrivateKey returns ecdsa.PrivateKey. +func (c *credentials) PrivateKey() *ecdsa.PrivateKey { + return c.key +} + +// PublicKey returns ecdsa.PublicKey. +func (c *credentials) PublicKey() *ecdsa.PublicKey { + return &c.key.PublicKey +} + +// Owner returns owner.ID. +func (c *credentials) Owner() *owner.ID { + return c.ownerID +} + +func setFromPrivateKey(key *ecdsa.PrivateKey) (*credentials, error) { + wallet, err := owner.NEO3WalletFromPublicKey(&key.PublicKey) + if err != nil { + return nil, err + } + ownerID := owner.NewIDFromNeo3Wallet(wallet) + return &credentials{key: key, ownerID: ownerID}, nil +} diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go new file mode 100644 index 0000000..3422f75 --- /dev/null +++ b/pkg/pool/pool.go @@ -0,0 +1,160 @@ +package pool + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/nspcc-dev/neofs-api-go/pkg/client" + "github.com/nspcc-dev/neofs-api-go/pkg/token" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" +) + +// PoolBuilderOptions contains options used to build connection pool. +type PoolBuilderOptions struct { + Key *ecdsa.PrivateKey + NodeConnectionTimeout time.Duration + NodeRequestTimeout time.Duration + ClientRebalanceInterval time.Duration + KeepaliveTime time.Duration + KeepaliveTimeout time.Duration + KeepalivePermitWoStream bool + SessionExpirationEpoch uint64 + weights []float64 + connections []*grpc.ClientConn +} + +// PoolBuilder is an interim structure used to collect node addresses/weights and +// build connection pool subsequently. +type PoolBuilder struct { + addresses []string + weights []float64 +} + +// AddNode adds address/weight pair to node PoolBuilder list. +func (pb *PoolBuilder) AddNode(address string, weight float64) *PoolBuilder { + pb.addresses = append(pb.addresses, address) + pb.weights = append(pb.weights, weight) + return pb +} + +// Build creates new pool based on current PoolBuilder state and options. +func (pb *PoolBuilder) Build(ctx context.Context, options *PoolBuilderOptions) (Pool, error) { + if len(pb.addresses) == 0 { + return nil, errors.New("no NeoFS peers configured") + } + totalWeight := 0.0 + for _, w := range pb.weights { + totalWeight += w + } + for i, w := range pb.weights { + pb.weights[i] = w / totalWeight + } + var cons = make([]*grpc.ClientConn, len(pb.addresses)) + for i, address := range pb.addresses { + con, err := func() (*grpc.ClientConn, error) { + toctx, c := context.WithTimeout(ctx, options.NodeConnectionTimeout) + defer c() + return grpc.DialContext(toctx, address, + grpc.WithInsecure(), + grpc.WithBlock(), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: options.KeepaliveTime, + Timeout: options.KeepaliveTimeout, + PermitWithoutStream: options.KeepalivePermitWoStream, + }), + ) + }() + if err != nil { + return nil, err + } + cons[i] = con + } + options.weights = pb.weights + options.connections = cons + return new(ctx, options) +} + +// Pool is an interface providing connection artifacts on request. +type Pool interface { + ConnectionArtifacts() (client.Client, *token.SessionToken, error) +} + +type clientPack struct { + client client.Client + sessionToken *token.SessionToken + healthy bool +} + +type pool struct { + lock sync.RWMutex + sampler *Sampler + clientPacks []*clientPack +} + +func new(ctx context.Context, options *PoolBuilderOptions) (Pool, error) { + clientPacks := make([]*clientPack, len(options.weights)) + for i, con := range options.connections { + c, err := client.New(client.WithDefaultPrivateKey(options.Key), client.WithGRPCConnection(con)) + if err != nil { + return nil, err + } + st, err := c.CreateSession(ctx, options.SessionExpirationEpoch) + if err != nil { + address := "unknown" + if epi, err := c.EndpointInfo(ctx); err == nil { + address = epi.NodeInfo().Address() + } + return nil, fmt.Errorf("failed to create neofs session token for client %s: %w", address, err) + } + clientPacks[i] = &clientPack{client: c, sessionToken: st, healthy: true} + } + source := rand.NewSource(time.Now().UnixNano()) + sampler := NewSampler(options.weights, source) + pool := &pool{sampler: sampler, clientPacks: clientPacks} + go func() { + ticker := time.NewTimer(options.ClientRebalanceInterval) + for range ticker.C { + ok := true + for i, clientPack := range pool.clientPacks { + func() { + tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout) + defer c() + if _, err := clientPack.client.EndpointInfo(tctx); err != nil { + ok = false + } + pool.lock.Lock() + pool.clientPacks[i].healthy = ok + pool.lock.Unlock() + }() + } + ticker.Reset(options.ClientRebalanceInterval) + } + }() + return pool, nil +} + +func (p *pool) ConnectionArtifacts() (client.Client, *token.SessionToken, error) { + p.lock.RLock() + defer p.lock.RUnlock() + if len(p.clientPacks) == 1 { + cp := p.clientPacks[0] + if cp.healthy { + return cp.client, cp.sessionToken, nil + } + return nil, nil, errors.New("no healthy client") + } + attempts := 3 * len(p.clientPacks) + for k := 0; k < attempts; k++ { + i := p.sampler.Next() + if cp := p.clientPacks[i]; cp.healthy { + return cp.client, cp.sessionToken, nil + } + } + return nil, nil, errors.New("no healthy client") +} diff --git a/pkg/pool/sampler.go b/pkg/pool/sampler.go new file mode 100644 index 0000000..33b473c --- /dev/null +++ b/pkg/pool/sampler.go @@ -0,0 +1,81 @@ +package pool + +import "math/rand" + +// Sampler implements weighted random number generation using Vose's Alias +// Method (https://www.keithschwarz.com/darts-dice-coins/). +type Sampler struct { + randomGenerator *rand.Rand + probabilities []float64 + alias []int +} + +// NewSampler creates new Sampler with a given set of probabilities using +// given source of randomness. Created Sampler will produce numbers from +// 0 to len(probabilities). +func NewSampler(probabilities []float64, source rand.Source) *Sampler { + sampler := &Sampler{} + var ( + small workList + large workList + ) + n := len(probabilities) + sampler.randomGenerator = rand.New(source) + sampler.probabilities = make([]float64, n) + sampler.alias = make([]int, n) + // Compute scaled probabilities. + p := make([]float64, n) + for i := 0; i < n; i++ { + p[i] = probabilities[i] * float64(n) + } + for i, pi := range p { + if pi < 1 { + small.add(i) + } else { + large.add(i) + } + } + for len(small) > 0 && len(large) > 0 { + l, g := small.remove(), large.remove() + sampler.probabilities[l] = p[l] + sampler.alias[l] = g + p[g] = p[g] + p[l] - 1 + if p[g] < 1 { + small.add(g) + } else { + large.add(g) + } + } + for len(large) > 0 { + g := large.remove() + sampler.probabilities[g] = 1 + } + for len(small) > 0 { + l := small.remove() + sampler.probabilities[l] = 1 + } + return sampler +} + +// Next returns the next (not so) random number from Sampler. +func (g *Sampler) Next() int { + n := len(g.alias) + i := g.randomGenerator.Intn(n) + if g.randomGenerator.Float64() < g.probabilities[i] { + return i + } + return g.alias[i] +} + +type workList []int + +func (wl *workList) add(e int) { + *wl = append(*wl, e) +} + +func (wl *workList) remove() int { + l := len(*wl) - 1 + n := (*wl)[l] + *wl = (*wl)[:l] + return n +}