[#4] *: Add connections, neofs, logger from httpgw

Commit in http-gw: a4ad52e181e3653ee43742e2beb8c594c487c8cc
Rename connections dir and pkg to pool

Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
remotes/fyrchik/master
Angira Kekteeva 2021-05-25 11:48:01 +03:00
parent 9dee13f0d5
commit 4225a21ea5
7 changed files with 741 additions and 0 deletions

78
pkg/logger/grpc.go 100644
View File

@ -0,0 +1,78 @@
package logger
import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc/grpclog"
)
type (
zapLogger struct {
zapcore.Core
log *zap.SugaredLogger
}
// Logger includes grpclog.LoggerV2 interface with an additional
// Println method.
Logger interface {
grpclog.LoggerV2
Println(v ...interface{})
}
)
// GRPC wraps given zap.Logger into grpclog.LoggerV2+ interface.
func GRPC(l *zap.Logger) Logger {
log := l.WithOptions(
// skip gRPCLog + zapLogger in caller
zap.AddCallerSkip(2))
return &zapLogger{
Core: log.Core(),
log: log.Sugar(),
}
}
// Info implements grpclog.LoggerV2.
func (z *zapLogger) Info(args ...interface{}) { z.log.Info(args...) }
// Infoln implements grpclog.LoggerV2.
func (z *zapLogger) Infoln(args ...interface{}) { z.log.Info(args...) }
// Infof implements grpclog.LoggerV2.
func (z *zapLogger) Infof(format string, args ...interface{}) { z.log.Infof(format, args...) }
// Println allows to print a line with info severity.
func (z *zapLogger) Println(args ...interface{}) { z.log.Info(args...) }
// Printf implements grpclog.LoggerV2.
func (z *zapLogger) Printf(format string, args ...interface{}) { z.log.Infof(format, args...) }
// Warning implements grpclog.LoggerV2.
func (z *zapLogger) Warning(args ...interface{}) { z.log.Warn(args...) }
// Warningln implements grpclog.LoggerV2.
func (z *zapLogger) Warningln(args ...interface{}) { z.log.Warn(args...) }
// Warningf implements grpclog.LoggerV2.
func (z *zapLogger) Warningf(format string, args ...interface{}) { z.log.Warnf(format, args...) }
// Error implements grpclog.LoggerV2.
func (z *zapLogger) Error(args ...interface{}) { z.log.Error(args...) }
// Errorln implements grpclog.LoggerV2.
func (z *zapLogger) Errorln(args ...interface{}) { z.log.Error(args...) }
// Errorf implements grpclog.LoggerV2.
func (z *zapLogger) Errorf(format string, args ...interface{}) { z.log.Errorf(format, args...) }
// Fatal implements grpclog.LoggerV2.
func (z *zapLogger) Fatal(args ...interface{}) { z.log.Fatal(args...) }
// Fatalln implements grpclog.LoggerV2.
func (z *zapLogger) Fatalln(args ...interface{}) { z.log.Fatal(args...) }
// Fatalf implements grpclog.LoggerV2.
func (z *zapLogger) Fatalf(format string, args ...interface{}) { z.log.Fatalf(format, args...) }
// V implements grpclog.LoggerV2.
func (z *zapLogger) V(int) bool { return z.Enabled(zapcore.DebugLevel) }

View File

@ -0,0 +1,33 @@
package logger
import "go.uber.org/zap"
// WithSamplingInitial returns Option that sets sampling initial parameter.
func WithSamplingInitial(v int) Option { return func(o *options) { o.SamplingInitial = v } }
// WithSamplingThereafter returns Option that sets sampling thereafter parameter.
func WithSamplingThereafter(v int) Option { return func(o *options) { o.SamplingThereafter = v } }
// WithFormat returns Option that sets format parameter.
func WithFormat(v string) Option { return func(o *options) { o.Format = v } }
// WithLevel returns Option that sets Level parameter.
func WithLevel(v string) Option { return func(o *options) { o.Level = v } }
// WithTraceLevel returns Option that sets trace level parameter.
func WithTraceLevel(v string) Option { return func(o *options) { o.TraceLevel = v } }
// WithoutDisclaimer returns Option that disables disclaimer.
func WithoutDisclaimer() Option { return func(o *options) { o.NoDisclaimer = true } }
// WithoutCaller returns Option that disables caller printing.
func WithoutCaller() Option { return func(o *options) { o.NoCaller = true } }
// WithAppName returns Option that sets application name.
func WithAppName(v string) Option { return func(o *options) { o.AppName = v } }
// WithAppVersion returns Option that sets application version.
func WithAppVersion(v string) Option { return func(o *options) { o.AppVersion = v } }
// WithZapOptions returns Option that sets zap logger options.
func WithZapOptions(opts ...zap.Option) Option { return func(o *options) { o.Options = opts } }

134
pkg/logger/zap.go 100644
View File

@ -0,0 +1,134 @@
package logger
import (
"strings"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
type (
// Option represents logger option setter.
Option func(o *options)
options struct {
Options []zap.Option
SamplingInitial int
SamplingThereafter int
Format string
Level string
TraceLevel string
NoCaller bool
NoDisclaimer bool
AppName string
AppVersion string
}
)
const (
formatJSON = "json"
formatConsole = "console"
defaultSamplingInitial = 100
defaultSamplingThereafter = 100
lvlInfo = "info"
lvlWarn = "warn"
lvlDebug = "debug"
lvlError = "error"
lvlFatal = "fatal"
lvlPanic = "panic"
)
func safeLevel(lvl string) zap.AtomicLevel {
switch strings.ToLower(lvl) {
case lvlDebug:
return zap.NewAtomicLevelAt(zap.DebugLevel)
case lvlWarn:
return zap.NewAtomicLevelAt(zap.WarnLevel)
case lvlError:
return zap.NewAtomicLevelAt(zap.ErrorLevel)
case lvlFatal:
return zap.NewAtomicLevelAt(zap.FatalLevel)
case lvlPanic:
return zap.NewAtomicLevelAt(zap.PanicLevel)
default:
return zap.NewAtomicLevelAt(zap.InfoLevel)
}
}
func defaults() *options {
return &options{
SamplingInitial: defaultSamplingInitial,
SamplingThereafter: defaultSamplingThereafter,
Format: formatConsole,
Level: lvlDebug,
TraceLevel: lvlInfo,
NoCaller: false,
NoDisclaimer: false,
AppName: "",
AppVersion: "",
}
}
// New returns new zap.Logger using all options specified and stdout used
// for output.
func New(opts ...Option) (*zap.Logger, error) {
o := defaults()
c := zap.NewProductionConfig()
c.OutputPaths = []string{"stdout"}
c.ErrorOutputPaths = []string{"stdout"}
for _, opt := range opts {
opt(o)
}
// set sampling
c.Sampling = &zap.SamplingConfig{
Initial: o.SamplingInitial,
Thereafter: o.SamplingThereafter,
}
// logger level
c.Level = safeLevel(o.Level)
traceLvl := safeLevel(o.TraceLevel)
// logger format
switch f := o.Format; strings.ToLower(f) {
case formatConsole:
c.Encoding = formatConsole
default:
c.Encoding = formatJSON
}
// logger time
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
if o.NoCaller {
c.EncoderConfig.EncodeCaller = nil
}
// enable trace only for current log-level
o.Options = append(o.Options, zap.AddStacktrace(traceLvl))
l, err := c.Build(o.Options...)
if err != nil {
return nil, err
}
if o.NoDisclaimer {
return l, nil
}
return l.With(
zap.String("app_name", o.AppName),
zap.String("app_version", o.AppVersion)), nil
}

View File

@ -0,0 +1,177 @@
package neofs
import (
"context"
"crypto/ecdsa"
"io"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"github.com/nspcc-dev/neofs-http-gw/connections"
)
// BaseOptions represents basic NeoFS request options.
type BaseOptions struct {
Client client.Client
SessionToken *token.SessionToken
BearerToken *token.BearerToken
}
// PutOptions represents NeoFS Put request options.
type PutOptions struct {
BaseOptions
Attributes []*object.Attribute
ContainerID *container.ID
OwnerID *owner.ID
Reader io.Reader
}
// GetOptions represents NeoFS Get request options.
type GetOptions struct {
BaseOptions
ObjectAddress *object.Address
Writer io.Writer
}
// SearchOptions represents NeoFS Search request options.
type SearchOptions struct {
BaseOptions
ContainerID *container.ID
Attribute struct {
Key string
Value string
}
}
// DeleteOptions represents NeoFS Delete request options.
type DeleteOptions struct {
BaseOptions
ObjectAddress *object.Address
}
// ObjectClient wraps basic NeoFS requests.
type ObjectClient interface {
Put(context.Context, *PutOptions) (*object.Address, error)
Get(context.Context, *GetOptions) (*object.Object, error)
Search(context.Context, *SearchOptions) ([]*object.ID, error)
Delete(context.Context, *DeleteOptions) error
}
// ClientPlant provides connections to NeoFS nodes from pool and allows to
// get local owner ID.
type ClientPlant interface {
ConnectionArtifacts() (client.Client, *token.SessionToken, error)
Object() ObjectClient
OwnerID() *owner.ID
}
type neofsObjectClient struct {
key *ecdsa.PrivateKey
pool connections.Pool
}
type neofsClientPlant struct {
key *ecdsa.PrivateKey
ownerID *owner.ID
pool connections.Pool
}
// ConnectionArtifacts returns connection from pool.
func (cp *neofsClientPlant) ConnectionArtifacts() (client.Client, *token.SessionToken, error) {
return cp.pool.ConnectionArtifacts()
}
// Object returns ObjectClient instance from plant.
func (cp *neofsClientPlant) Object() ObjectClient {
return &neofsObjectClient{
key: cp.key,
pool: cp.pool,
}
}
// OwnerID returns plant's owner ID.
func (cp *neofsClientPlant) OwnerID() *owner.ID {
return cp.ownerID
}
// NewClientPlant creates new ClientPlant from given context, pool and credentials.
func NewClientPlant(ctx context.Context, pool connections.Pool, creds Credentials) (ClientPlant, error) {
return &neofsClientPlant{key: creds.PrivateKey(), ownerID: creds.Owner(), pool: pool}, nil
}
// Put does NeoFS Put request, returning new object address if successful.
func (oc *neofsObjectClient) Put(ctx context.Context, options *PutOptions) (*object.Address, error) {
var (
err error
objectID *object.ID
)
address := object.NewAddress()
rawObject := object.NewRaw()
rawObject.SetContainerID(options.ContainerID)
rawObject.SetOwnerID(options.OwnerID)
rawObject.SetAttributes(options.Attributes...)
ops := new(client.PutObjectParams).
WithObject(rawObject.Object()).
WithPayloadReader(options.Reader)
objectID, err = options.Client.PutObject(
ctx,
ops,
client.WithSession(options.SessionToken),
client.WithBearer(options.BearerToken),
)
if err != nil {
return nil, err
}
address.SetObjectID(objectID)
address.SetContainerID(options.ContainerID)
return address, nil
}
// Get does NeoFS Get request, returning an object received if successful.
func (oc *neofsObjectClient) Get(ctx context.Context, options *GetOptions) (*object.Object, error) {
var (
err error
obj *object.Object
)
ops := new(client.GetObjectParams).
WithAddress(options.ObjectAddress).
WithPayloadWriter(options.Writer)
obj, err = options.Client.GetObject(
ctx,
ops,
client.WithSession(options.SessionToken),
client.WithBearer(options.BearerToken),
)
return obj, err
}
// Search does NeoFS Search request, returning object IDs if successful.
func (oc *neofsObjectClient) Search(ctx context.Context, options *SearchOptions) ([]*object.ID, error) {
sfs := object.NewSearchFilters()
sfs.AddRootFilter()
sfs.AddFilter(options.Attribute.Key, options.Attribute.Value, object.MatchStringEqual)
sops := new(client.SearchObjectParams)
sops.WithContainerID(options.ContainerID)
sops.WithSearchFilters(sfs)
return options.Client.SearchObject(
ctx,
sops,
client.WithSession(options.SessionToken),
client.WithBearer(options.BearerToken),
)
}
// Delete deletes NeoFS object.
func (oc *neofsObjectClient) Delete(ctx context.Context, options *DeleteOptions) error {
ops := new(client.DeleteObjectParams).WithAddress(options.ObjectAddress)
err := options.Client.DeleteObject(
ctx,
ops,
client.WithSession(options.SessionToken),
client.WithBearer(options.BearerToken),
)
return err
}

View File

@ -0,0 +1,78 @@
package neofs
import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"math/big"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
crypto "github.com/nspcc-dev/neofs-crypto"
)
type (
// Credentials contains methods that needed to work with NeoFS.
Credentials interface {
Owner() *owner.ID
PublicKey() *ecdsa.PublicKey
PrivateKey() *ecdsa.PrivateKey
}
credentials struct {
key *ecdsa.PrivateKey
ownerID *owner.ID
}
)
// NewCredentials creates an instance of Credentials through string
// representation of secret. It allows passing WIF, path, hex-encoded and others.
func NewCredentials(secret string) (Credentials, error) {
key, err := crypto.LoadPrivateKey(secret)
if err != nil {
return nil, err
}
return setFromPrivateKey(key)
}
// NewEphemeralCredentials creates new private key and Credentials based on that
// key.
func NewEphemeralCredentials() (Credentials, error) {
c := elliptic.P256()
priv, x, y, err := elliptic.GenerateKey(c, rand.Reader)
if err != nil {
return nil, err
}
key := &ecdsa.PrivateKey{
PublicKey: ecdsa.PublicKey{
Curve: c,
X: x,
Y: y,
},
D: new(big.Int).SetBytes(priv),
}
return setFromPrivateKey(key)
}
// PrivateKey returns ecdsa.PrivateKey.
func (c *credentials) PrivateKey() *ecdsa.PrivateKey {
return c.key
}
// PublicKey returns ecdsa.PublicKey.
func (c *credentials) PublicKey() *ecdsa.PublicKey {
return &c.key.PublicKey
}
// Owner returns owner.ID.
func (c *credentials) Owner() *owner.ID {
return c.ownerID
}
func setFromPrivateKey(key *ecdsa.PrivateKey) (*credentials, error) {
wallet, err := owner.NEO3WalletFromPublicKey(&key.PublicKey)
if err != nil {
return nil, err
}
ownerID := owner.NewIDFromNeo3Wallet(wallet)
return &credentials{key: key, ownerID: ownerID}, nil
}

160
pkg/pool/pool.go 100644
View File

@ -0,0 +1,160 @@
package pool
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"math/rand"
"sync"
"time"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
)
// PoolBuilderOptions contains options used to build connection pool.
type PoolBuilderOptions struct {
Key *ecdsa.PrivateKey
NodeConnectionTimeout time.Duration
NodeRequestTimeout time.Duration
ClientRebalanceInterval time.Duration
KeepaliveTime time.Duration
KeepaliveTimeout time.Duration
KeepalivePermitWoStream bool
SessionExpirationEpoch uint64
weights []float64
connections []*grpc.ClientConn
}
// PoolBuilder is an interim structure used to collect node addresses/weights and
// build connection pool subsequently.
type PoolBuilder struct {
addresses []string
weights []float64
}
// AddNode adds address/weight pair to node PoolBuilder list.
func (pb *PoolBuilder) AddNode(address string, weight float64) *PoolBuilder {
pb.addresses = append(pb.addresses, address)
pb.weights = append(pb.weights, weight)
return pb
}
// Build creates new pool based on current PoolBuilder state and options.
func (pb *PoolBuilder) Build(ctx context.Context, options *PoolBuilderOptions) (Pool, error) {
if len(pb.addresses) == 0 {
return nil, errors.New("no NeoFS peers configured")
}
totalWeight := 0.0
for _, w := range pb.weights {
totalWeight += w
}
for i, w := range pb.weights {
pb.weights[i] = w / totalWeight
}
var cons = make([]*grpc.ClientConn, len(pb.addresses))
for i, address := range pb.addresses {
con, err := func() (*grpc.ClientConn, error) {
toctx, c := context.WithTimeout(ctx, options.NodeConnectionTimeout)
defer c()
return grpc.DialContext(toctx, address,
grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: options.KeepaliveTime,
Timeout: options.KeepaliveTimeout,
PermitWithoutStream: options.KeepalivePermitWoStream,
}),
)
}()
if err != nil {
return nil, err
}
cons[i] = con
}
options.weights = pb.weights
options.connections = cons
return new(ctx, options)
}
// Pool is an interface providing connection artifacts on request.
type Pool interface {
ConnectionArtifacts() (client.Client, *token.SessionToken, error)
}
type clientPack struct {
client client.Client
sessionToken *token.SessionToken
healthy bool
}
type pool struct {
lock sync.RWMutex
sampler *Sampler
clientPacks []*clientPack
}
func new(ctx context.Context, options *PoolBuilderOptions) (Pool, error) {
clientPacks := make([]*clientPack, len(options.weights))
for i, con := range options.connections {
c, err := client.New(client.WithDefaultPrivateKey(options.Key), client.WithGRPCConnection(con))
if err != nil {
return nil, err
}
st, err := c.CreateSession(ctx, options.SessionExpirationEpoch)
if err != nil {
address := "unknown"
if epi, err := c.EndpointInfo(ctx); err == nil {
address = epi.NodeInfo().Address()
}
return nil, fmt.Errorf("failed to create neofs session token for client %s: %w", address, err)
}
clientPacks[i] = &clientPack{client: c, sessionToken: st, healthy: true}
}
source := rand.NewSource(time.Now().UnixNano())
sampler := NewSampler(options.weights, source)
pool := &pool{sampler: sampler, clientPacks: clientPacks}
go func() {
ticker := time.NewTimer(options.ClientRebalanceInterval)
for range ticker.C {
ok := true
for i, clientPack := range pool.clientPacks {
func() {
tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout)
defer c()
if _, err := clientPack.client.EndpointInfo(tctx); err != nil {
ok = false
}
pool.lock.Lock()
pool.clientPacks[i].healthy = ok
pool.lock.Unlock()
}()
}
ticker.Reset(options.ClientRebalanceInterval)
}
}()
return pool, nil
}
func (p *pool) ConnectionArtifacts() (client.Client, *token.SessionToken, error) {
p.lock.RLock()
defer p.lock.RUnlock()
if len(p.clientPacks) == 1 {
cp := p.clientPacks[0]
if cp.healthy {
return cp.client, cp.sessionToken, nil
}
return nil, nil, errors.New("no healthy client")
}
attempts := 3 * len(p.clientPacks)
for k := 0; k < attempts; k++ {
i := p.sampler.Next()
if cp := p.clientPacks[i]; cp.healthy {
return cp.client, cp.sessionToken, nil
}
}
return nil, nil, errors.New("no healthy client")
}

View File

@ -0,0 +1,81 @@
package pool
import "math/rand"
// Sampler implements weighted random number generation using Vose's Alias
// Method (https://www.keithschwarz.com/darts-dice-coins/).
type Sampler struct {
randomGenerator *rand.Rand
probabilities []float64
alias []int
}
// NewSampler creates new Sampler with a given set of probabilities using
// given source of randomness. Created Sampler will produce numbers from
// 0 to len(probabilities).
func NewSampler(probabilities []float64, source rand.Source) *Sampler {
sampler := &Sampler{}
var (
small workList
large workList
)
n := len(probabilities)
sampler.randomGenerator = rand.New(source)
sampler.probabilities = make([]float64, n)
sampler.alias = make([]int, n)
// Compute scaled probabilities.
p := make([]float64, n)
for i := 0; i < n; i++ {
p[i] = probabilities[i] * float64(n)
}
for i, pi := range p {
if pi < 1 {
small.add(i)
} else {
large.add(i)
}
}
for len(small) > 0 && len(large) > 0 {
l, g := small.remove(), large.remove()
sampler.probabilities[l] = p[l]
sampler.alias[l] = g
p[g] = p[g] + p[l] - 1
if p[g] < 1 {
small.add(g)
} else {
large.add(g)
}
}
for len(large) > 0 {
g := large.remove()
sampler.probabilities[g] = 1
}
for len(small) > 0 {
l := small.remove()
sampler.probabilities[l] = 1
}
return sampler
}
// Next returns the next (not so) random number from Sampler.
func (g *Sampler) Next() int {
n := len(g.alias)
i := g.randomGenerator.Intn(n)
if g.randomGenerator.Float64() < g.probabilities[i] {
return i
}
return g.alias[i]
}
type workList []int
func (wl *workList) add(e int) {
*wl = append(*wl, e)
}
func (wl *workList) remove() int {
l := len(*wl) - 1
n := (*wl)[l]
*wl = (*wl)[:l]
return n
}