From 82b2126bfdcf4551d563d4fc34eed02c55cc0de2 Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Fri, 28 May 2021 11:57:28 +0300 Subject: [PATCH] [#46] *: Remove moved to sdk packages, refactoring Removed connections, logger, neofs because they were moved to sdk repo. Made changes in downloader, uploader, main.go and app.go via refactoring of neofs. Replaced dependencies to removed packages by sdk packages. Signed-off-by: Angira Kekteeva --- app.go | 10 +-- connections/pool.go | 160 ------------------------------------- connections/sampler.go | 81 ------------------- downloader/download.go | 83 +++++++++---------- go.mod | 2 +- go.sum | 4 + logger/grpc.go | 78 ------------------ logger/option.go | 33 -------- logger/zap.go | 134 ------------------------------- main.go | 2 +- neofs/client-plant.go | 177 ----------------------------------------- neofs/credentials.go | 78 ------------------ uploader/upload.go | 43 +++++----- 13 files changed, 73 insertions(+), 812 deletions(-) delete mode 100644 connections/pool.go delete mode 100644 connections/sampler.go delete mode 100644 logger/grpc.go delete mode 100644 logger/option.go delete mode 100644 logger/zap.go delete mode 100644 neofs/client-plant.go delete mode 100644 neofs/credentials.go diff --git a/app.go b/app.go index 841b305..e086ed2 100644 --- a/app.go +++ b/app.go @@ -6,11 +6,11 @@ import ( "strconv" "github.com/fasthttp/router" - "github.com/nspcc-dev/neofs-http-gw/connections" "github.com/nspcc-dev/neofs-http-gw/downloader" - "github.com/nspcc-dev/neofs-http-gw/logger" - "github.com/nspcc-dev/neofs-http-gw/neofs" "github.com/nspcc-dev/neofs-http-gw/uploader" + "github.com/nspcc-dev/neofs-sdk-go/pkg/logger" + "github.com/nspcc-dev/neofs-sdk-go/pkg/neofs" + "github.com/nspcc-dev/neofs-sdk-go/pkg/pool" "github.com/spf13/viper" "github.com/valyala/fasthttp" "go.uber.org/zap" @@ -99,7 +99,7 @@ func newApp(ctx context.Context, opt ...Option) App { if err != nil { a.log.Fatal("failed to get neofs credentials", zap.Error(err)) } - pb := new(connections.PoolBuilder) + pb := new(pool.Builder) for i := 0; ; i++ { address := a.cfg.GetString(cfgPeers + "." + strconv.Itoa(i) + ".address") weight := a.cfg.GetFloat64(cfgPeers + "." + strconv.Itoa(i) + ".weight") @@ -112,7 +112,7 @@ func newApp(ctx context.Context, opt ...Option) App { pb.AddNode(address, weight) a.log.Info("add connection", zap.String("address", address), zap.Float64("weight", weight)) } - opts := &connections.PoolBuilderOptions{ + opts := &pool.BuilderOptions{ Key: creds.PrivateKey(), NodeConnectionTimeout: a.cfg.GetDuration(cfgConTimeout), NodeRequestTimeout: a.cfg.GetDuration(cfgReqTimeout), diff --git a/connections/pool.go b/connections/pool.go deleted file mode 100644 index 4aae633..0000000 --- a/connections/pool.go +++ /dev/null @@ -1,160 +0,0 @@ -package connections - -import ( - "context" - "crypto/ecdsa" - "errors" - "fmt" - "math/rand" - "sync" - "time" - - "github.com/nspcc-dev/neofs-api-go/pkg/client" - "github.com/nspcc-dev/neofs-api-go/pkg/token" - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" -) - -// PoolBuilderOptions contains options used to build connection pool. -type PoolBuilderOptions struct { - Key *ecdsa.PrivateKey - NodeConnectionTimeout time.Duration - NodeRequestTimeout time.Duration - ClientRebalanceInterval time.Duration - KeepaliveTime time.Duration - KeepaliveTimeout time.Duration - KeepalivePermitWoStream bool - SessionExpirationEpoch uint64 - weights []float64 - connections []*grpc.ClientConn -} - -// PoolBuilder is an interim structure used to collect node addresses/weights and -// build connection pool subsequently. -type PoolBuilder struct { - addresses []string - weights []float64 -} - -// AddNode adds address/weight pair to node PoolBuilder list. -func (pb *PoolBuilder) AddNode(address string, weight float64) *PoolBuilder { - pb.addresses = append(pb.addresses, address) - pb.weights = append(pb.weights, weight) - return pb -} - -// Build creates new pool based on current PoolBuilder state and options. -func (pb *PoolBuilder) Build(ctx context.Context, options *PoolBuilderOptions) (Pool, error) { - if len(pb.addresses) == 0 { - return nil, errors.New("no NeoFS peers configured") - } - totalWeight := 0.0 - for _, w := range pb.weights { - totalWeight += w - } - for i, w := range pb.weights { - pb.weights[i] = w / totalWeight - } - var cons = make([]*grpc.ClientConn, len(pb.addresses)) - for i, address := range pb.addresses { - con, err := func() (*grpc.ClientConn, error) { - toctx, c := context.WithTimeout(ctx, options.NodeConnectionTimeout) - defer c() - return grpc.DialContext(toctx, address, - grpc.WithInsecure(), - grpc.WithBlock(), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: options.KeepaliveTime, - Timeout: options.KeepaliveTimeout, - PermitWithoutStream: options.KeepalivePermitWoStream, - }), - ) - }() - if err != nil { - return nil, err - } - cons[i] = con - } - options.weights = pb.weights - options.connections = cons - return new(ctx, options) -} - -// Pool is an interface providing connection artifacts on request. -type Pool interface { - ConnectionArtifacts() (client.Client, *token.SessionToken, error) -} - -type clientPack struct { - client client.Client - sessionToken *token.SessionToken - healthy bool -} - -type pool struct { - lock sync.RWMutex - sampler *Sampler - clientPacks []*clientPack -} - -func new(ctx context.Context, options *PoolBuilderOptions) (Pool, error) { - clientPacks := make([]*clientPack, len(options.weights)) - for i, con := range options.connections { - c, err := client.New(client.WithDefaultPrivateKey(options.Key), client.WithGRPCConnection(con)) - if err != nil { - return nil, err - } - st, err := c.CreateSession(ctx, options.SessionExpirationEpoch) - if err != nil { - address := "unknown" - if epi, err := c.EndpointInfo(ctx); err == nil { - address = epi.NodeInfo().Address() - } - return nil, fmt.Errorf("failed to create neofs session token for client %s: %w", address, err) - } - clientPacks[i] = &clientPack{client: c, sessionToken: st, healthy: true} - } - source := rand.NewSource(time.Now().UnixNano()) - sampler := NewSampler(options.weights, source) - pool := &pool{sampler: sampler, clientPacks: clientPacks} - go func() { - ticker := time.NewTimer(options.ClientRebalanceInterval) - for range ticker.C { - ok := true - for i, clientPack := range pool.clientPacks { - func() { - tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout) - defer c() - if _, err := clientPack.client.EndpointInfo(tctx); err != nil { - ok = false - } - pool.lock.Lock() - pool.clientPacks[i].healthy = ok - pool.lock.Unlock() - }() - } - ticker.Reset(options.ClientRebalanceInterval) - } - }() - return pool, nil -} - -func (p *pool) ConnectionArtifacts() (client.Client, *token.SessionToken, error) { - p.lock.RLock() - defer p.lock.RUnlock() - if len(p.clientPacks) == 1 { - cp := p.clientPacks[0] - if cp.healthy { - return cp.client, cp.sessionToken, nil - } - return nil, nil, errors.New("no healthy client") - } - attempts := 3 * len(p.clientPacks) - for k := 0; k < attempts; k++ { - i := p.sampler.Next() - if cp := p.clientPacks[i]; cp.healthy { - return cp.client, cp.sessionToken, nil - } - } - return nil, nil, errors.New("no healthy client") -} diff --git a/connections/sampler.go b/connections/sampler.go deleted file mode 100644 index caff5d6..0000000 --- a/connections/sampler.go +++ /dev/null @@ -1,81 +0,0 @@ -package connections - -import "math/rand" - -// Sampler implements weighted random number generation using Vose's Alias -// Method (https://www.keithschwarz.com/darts-dice-coins/). -type Sampler struct { - randomGenerator *rand.Rand - probabilities []float64 - alias []int -} - -// NewSampler creates new Sampler with a given set of probabilities using -// given source of randomness. Created Sampler will produce numbers from -// 0 to len(probabilities). -func NewSampler(probabilities []float64, source rand.Source) *Sampler { - sampler := &Sampler{} - var ( - small workList - large workList - ) - n := len(probabilities) - sampler.randomGenerator = rand.New(source) - sampler.probabilities = make([]float64, n) - sampler.alias = make([]int, n) - // Compute scaled probabilities. - p := make([]float64, n) - for i := 0; i < n; i++ { - p[i] = probabilities[i] * float64(n) - } - for i, pi := range p { - if pi < 1 { - small.add(i) - } else { - large.add(i) - } - } - for len(small) > 0 && len(large) > 0 { - l, g := small.remove(), large.remove() - sampler.probabilities[l] = p[l] - sampler.alias[l] = g - p[g] = p[g] + p[l] - 1 - if p[g] < 1 { - small.add(g) - } else { - large.add(g) - } - } - for len(large) > 0 { - g := large.remove() - sampler.probabilities[g] = 1 - } - for len(small) > 0 { - l := small.remove() - sampler.probabilities[l] = 1 - } - return sampler -} - -// Next returns the next (not so) random number from Sampler. -func (g *Sampler) Next() int { - n := len(g.alias) - i := g.randomGenerator.Intn(n) - if g.randomGenerator.Float64() < g.probabilities[i] { - return i - } - return g.alias[i] -} - -type workList []int - -func (wl *workList) add(e int) { - *wl = append(*wl, e) -} - -func (wl *workList) remove() int { - l := len(*wl) - 1 - n := (*wl)[l] - *wl = (*wl)[:l] - return n -} diff --git a/downloader/download.go b/downloader/download.go index 39ff41b..51baea2 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -12,30 +12,18 @@ import ( "sync" "time" + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-http-gw/neofs" + "github.com/nspcc-dev/neofs-api-go/pkg/token" "github.com/nspcc-dev/neofs-http-gw/tokens" + "github.com/nspcc-dev/neofs-sdk-go/pkg/neofs" "github.com/valyala/fasthttp" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -var ( - getOptionsPool = sync.Pool{ - New: func() interface{} { - return new(neofs.GetOptions) - }, - } - - searchOptionsPool = sync.Pool{ - New: func() interface{} { - return new(neofs.SearchOptions) - }, - } -) - type ( detector struct { io.Writer @@ -45,8 +33,7 @@ type ( request struct { *fasthttp.RequestCtx - log *zap.Logger - objectClient neofs.ObjectClient + log *zap.Logger } objectIDs []*object.ID @@ -85,12 +72,15 @@ func isValidValue(s string) bool { return true } -func (r *request) receiveFile(options *neofs.GetOptions) { +func (r *request) receiveFile(clnt client.Client, + sessionToken *token.SessionToken, + objectAddress *object.Address) { var ( err error dis = "inline" start = time.Now() filename string + obj *object.Object ) if err = tokens.StoreBearerToken(r.RequestCtx); err != nil { r.log.Error("could not fetch and store bearer token", zap.Error(err)) @@ -98,8 +88,15 @@ func (r *request) receiveFile(options *neofs.GetOptions) { return } writer := newDetector(r.Response.BodyWriter()) - options.Writer = writer - obj, err := r.objectClient.Get(r.RequestCtx, options) + options := new(client.GetObjectParams). + WithAddress(objectAddress). + WithPayloadWriter(writer) + + obj, err = clnt.GetObject( + r.RequestCtx, + options, + client.WithSession(sessionToken), + ) if err != nil { r.log.Error( "could not receive object", @@ -183,9 +180,8 @@ func New(ctx context.Context, log *zap.Logger, plant neofs.ClientPlant) (*Downlo func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { return &request{ - RequestCtx: ctx, - log: log, - objectClient: d.plant.Object(), + RequestCtx: ctx, + log: log, } } @@ -198,23 +194,22 @@ func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) { oid, _ = c.UserValue("oid").(string) val = strings.Join([]string{cid, oid}, "/") log = d.log.With(zap.String("cid", cid), zap.String("oid", oid)) + conn client.Client + tkn *token.SessionToken ) if err = address.Parse(val); err != nil { log.Error("wrong object address", zap.Error(err)) c.Error("wrong object address", fasthttp.StatusBadRequest) return } - getOpts := getOptionsPool.Get().(*neofs.GetOptions) - defer getOptionsPool.Put(getOpts) - getOpts.Client, getOpts.SessionToken, err = d.plant.ConnectionArtifacts() + + conn, tkn, err = d.plant.ConnectionArtifacts() if err != nil { log.Error("failed to get neofs connection artifacts", zap.Error(err)) c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError) return } - getOpts.ObjectAddress = address - getOpts.Writer = nil - d.newRequest(c, log).receiveFile(getOpts) + d.newRequest(c, log).receiveFile(conn, tkn, address) } // DownloadByAttribute handles attribute-based download requests. @@ -225,6 +220,9 @@ func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { key, _ = c.UserValue("attr_key").(string) val, _ = c.UserValue("attr_val").(string) log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val)) + ids []*object.ID + conn client.Client + tkn *token.SessionToken ) cid := container.NewID() if err = cid.Parse(scid); err != nil { @@ -232,20 +230,20 @@ func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { c.Error("wrong container id", fasthttp.StatusBadRequest) return } - searchOpts := searchOptionsPool.Get().(*neofs.SearchOptions) - defer searchOptionsPool.Put(searchOpts) - searchOpts.Client, searchOpts.SessionToken, err = d.plant.ConnectionArtifacts() + + conn, tkn, err = d.plant.ConnectionArtifacts() if err != nil { log.Error("failed to get neofs connection artifacts", zap.Error(err)) c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError) return } - searchOpts.BearerToken = nil - searchOpts.ContainerID = cid - searchOpts.Attribute.Key = key - searchOpts.Attribute.Value = val - var ids []*object.ID - if ids, err = d.plant.Object().Search(c, searchOpts); err != nil { + + options := object.NewSearchFilters() + options.AddRootFilter() + options.AddFilter(key, val, object.MatchStringEqual) + + sops := new(client.SearchObjectParams).WithContainerID(cid).WithSearchFilters(options) + if ids, err = conn.SearchObject(c, sops, client.WithSession(tkn)); err != nil { log.Error("something went wrong", zap.Error(err)) c.Error("something went wrong", fasthttp.StatusBadRequest) return @@ -262,15 +260,12 @@ func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { address := object.NewAddress() address.SetContainerID(cid) address.SetObjectID(ids[0]) - getOpts := getOptionsPool.Get().(*neofs.GetOptions) - defer getOptionsPool.Put(getOpts) - getOpts.Client, getOpts.SessionToken, err = d.plant.ConnectionArtifacts() + + conn, tkn, err = d.plant.ConnectionArtifacts() if err != nil { log.Error("failed to get neofs connection artifacts", zap.Error(err)) c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError) return } - getOpts.ObjectAddress = address - getOpts.Writer = nil - d.newRequest(c, log).receiveFile(getOpts) + d.newRequest(c, log).receiveFile(conn, tkn, address) } diff --git a/go.mod b/go.mod index 2919778..059c184 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/fasthttp/router v1.3.5 github.com/mr-tron/base58 v1.1.3 // indirect github.com/nspcc-dev/neofs-api-go v1.26.1 - github.com/nspcc-dev/neofs-crypto v0.3.0 + github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210527182636-cbfc17a1a9a2 github.com/prometheus/client_golang v1.9.0 github.com/prometheus/common v0.15.0 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index 63228d1..6459a87 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,8 @@ github.com/abiosoft/ishell v2.0.0+incompatible/go.mod h1:HQR9AqF2R3P4XXpMpI0NAzg github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db/go.mod h1:rB3B4rKii8V21ydCbIzH5hZiCQE7f5E9SzUb/ZZx530= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= +github.com/alecthomas/participle v0.7.1/go.mod h1:HfdmEuwvr12HXQN44HPWXR0lHmVolVYe4dyL6lQ3duY= +github.com/alecthomas/repr v0.0.0-20181024024818-d37bc2a10ba1/go.mod h1:xTS7Pm1pD1mvyM075QCDSRqH6qRLXylzS24ZTpRiSzQ= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -317,6 +319,8 @@ github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9K github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM= github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= +github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210527182636-cbfc17a1a9a2 h1:z8xtKILKi+Dolk3VAyCaFPMroFnT+x8qTqMT/zBRqIc= +github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210527182636-cbfc17a1a9a2/go.mod h1:QZE7VaNQRyNFS+3gsrNEQEiLe+d6AR6EteX1M9geh6A= github.com/nspcc-dev/rfc6979 v0.1.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE= github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= diff --git a/logger/grpc.go b/logger/grpc.go deleted file mode 100644 index 584bf25..0000000 --- a/logger/grpc.go +++ /dev/null @@ -1,78 +0,0 @@ -package logger - -import ( - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "google.golang.org/grpc/grpclog" -) - -type ( - zapLogger struct { - zapcore.Core - log *zap.SugaredLogger - } - - // Logger includes grpclog.LoggerV2 interface with an additional - // Println method. - Logger interface { - grpclog.LoggerV2 - Println(v ...interface{}) - } -) - -// GRPC wraps given zap.Logger into grpclog.LoggerV2+ interface. -func GRPC(l *zap.Logger) Logger { - log := l.WithOptions( - // skip gRPCLog + zapLogger in caller - zap.AddCallerSkip(2)) - - return &zapLogger{ - Core: log.Core(), - log: log.Sugar(), - } -} - -// Info implements grpclog.LoggerV2. -func (z *zapLogger) Info(args ...interface{}) { z.log.Info(args...) } - -// Infoln implements grpclog.LoggerV2. -func (z *zapLogger) Infoln(args ...interface{}) { z.log.Info(args...) } - -// Infof implements grpclog.LoggerV2. -func (z *zapLogger) Infof(format string, args ...interface{}) { z.log.Infof(format, args...) } - -// Println allows to print a line with info severity. -func (z *zapLogger) Println(args ...interface{}) { z.log.Info(args...) } - -// Printf implements grpclog.LoggerV2. -func (z *zapLogger) Printf(format string, args ...interface{}) { z.log.Infof(format, args...) } - -// Warning implements grpclog.LoggerV2. -func (z *zapLogger) Warning(args ...interface{}) { z.log.Warn(args...) } - -// Warningln implements grpclog.LoggerV2. -func (z *zapLogger) Warningln(args ...interface{}) { z.log.Warn(args...) } - -// Warningf implements grpclog.LoggerV2. -func (z *zapLogger) Warningf(format string, args ...interface{}) { z.log.Warnf(format, args...) } - -// Error implements grpclog.LoggerV2. -func (z *zapLogger) Error(args ...interface{}) { z.log.Error(args...) } - -// Errorln implements grpclog.LoggerV2. -func (z *zapLogger) Errorln(args ...interface{}) { z.log.Error(args...) } - -// Errorf implements grpclog.LoggerV2. -func (z *zapLogger) Errorf(format string, args ...interface{}) { z.log.Errorf(format, args...) } - -// Fatal implements grpclog.LoggerV2. -func (z *zapLogger) Fatal(args ...interface{}) { z.log.Fatal(args...) } - -// Fatalln implements grpclog.LoggerV2. -func (z *zapLogger) Fatalln(args ...interface{}) { z.log.Fatal(args...) } - -// Fatalf implements grpclog.LoggerV2. -func (z *zapLogger) Fatalf(format string, args ...interface{}) { z.log.Fatalf(format, args...) } - -// V implements grpclog.LoggerV2. -func (z *zapLogger) V(int) bool { return z.Enabled(zapcore.DebugLevel) } diff --git a/logger/option.go b/logger/option.go deleted file mode 100644 index f617010..0000000 --- a/logger/option.go +++ /dev/null @@ -1,33 +0,0 @@ -package logger - -import "go.uber.org/zap" - -// WithSamplingInitial returns Option that sets sampling initial parameter. -func WithSamplingInitial(v int) Option { return func(o *options) { o.SamplingInitial = v } } - -// WithSamplingThereafter returns Option that sets sampling thereafter parameter. -func WithSamplingThereafter(v int) Option { return func(o *options) { o.SamplingThereafter = v } } - -// WithFormat returns Option that sets format parameter. -func WithFormat(v string) Option { return func(o *options) { o.Format = v } } - -// WithLevel returns Option that sets Level parameter. -func WithLevel(v string) Option { return func(o *options) { o.Level = v } } - -// WithTraceLevel returns Option that sets trace level parameter. -func WithTraceLevel(v string) Option { return func(o *options) { o.TraceLevel = v } } - -// WithoutDisclaimer returns Option that disables disclaimer. -func WithoutDisclaimer() Option { return func(o *options) { o.NoDisclaimer = true } } - -// WithoutCaller returns Option that disables caller printing. -func WithoutCaller() Option { return func(o *options) { o.NoCaller = true } } - -// WithAppName returns Option that sets application name. -func WithAppName(v string) Option { return func(o *options) { o.AppName = v } } - -// WithAppVersion returns Option that sets application version. -func WithAppVersion(v string) Option { return func(o *options) { o.AppVersion = v } } - -// WithZapOptions returns Option that sets zap logger options. -func WithZapOptions(opts ...zap.Option) Option { return func(o *options) { o.Options = opts } } diff --git a/logger/zap.go b/logger/zap.go deleted file mode 100644 index 5e9ee62..0000000 --- a/logger/zap.go +++ /dev/null @@ -1,134 +0,0 @@ -package logger - -import ( - "strings" - - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -type ( - // Option represents logger option setter. - Option func(o *options) - - options struct { - Options []zap.Option - - SamplingInitial int - SamplingThereafter int - - Format string - Level string - TraceLevel string - - NoCaller bool - NoDisclaimer bool - - AppName string - AppVersion string - } -) - -const ( - formatJSON = "json" - formatConsole = "console" - - defaultSamplingInitial = 100 - defaultSamplingThereafter = 100 - - lvlInfo = "info" - lvlWarn = "warn" - lvlDebug = "debug" - lvlError = "error" - lvlFatal = "fatal" - lvlPanic = "panic" -) - -func safeLevel(lvl string) zap.AtomicLevel { - switch strings.ToLower(lvl) { - case lvlDebug: - return zap.NewAtomicLevelAt(zap.DebugLevel) - case lvlWarn: - return zap.NewAtomicLevelAt(zap.WarnLevel) - case lvlError: - return zap.NewAtomicLevelAt(zap.ErrorLevel) - case lvlFatal: - return zap.NewAtomicLevelAt(zap.FatalLevel) - case lvlPanic: - return zap.NewAtomicLevelAt(zap.PanicLevel) - default: - return zap.NewAtomicLevelAt(zap.InfoLevel) - } -} - -func defaults() *options { - return &options{ - SamplingInitial: defaultSamplingInitial, - SamplingThereafter: defaultSamplingThereafter, - - Format: formatConsole, - Level: lvlDebug, - TraceLevel: lvlInfo, - - NoCaller: false, - NoDisclaimer: false, - - AppName: "", - AppVersion: "", - } -} - -// New returns new zap.Logger using all options specified and stdout used -// for output. -func New(opts ...Option) (*zap.Logger, error) { - o := defaults() - c := zap.NewProductionConfig() - - c.OutputPaths = []string{"stdout"} - c.ErrorOutputPaths = []string{"stdout"} - - for _, opt := range opts { - opt(o) - } - - // set sampling - c.Sampling = &zap.SamplingConfig{ - Initial: o.SamplingInitial, - Thereafter: o.SamplingThereafter, - } - - // logger level - c.Level = safeLevel(o.Level) - traceLvl := safeLevel(o.TraceLevel) - - // logger format - switch f := o.Format; strings.ToLower(f) { - case formatConsole: - c.Encoding = formatConsole - default: - c.Encoding = formatJSON - } - - // logger time - c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - - if o.NoCaller { - c.EncoderConfig.EncodeCaller = nil - } - - // enable trace only for current log-level - o.Options = append(o.Options, zap.AddStacktrace(traceLvl)) - - l, err := c.Build(o.Options...) - if err != nil { - return nil, err - } - - if o.NoDisclaimer { - return l, nil - } - - return l.With( - zap.String("app_name", o.AppName), - zap.String("app_version", o.AppVersion)), nil -} diff --git a/main.go b/main.go index e89489b..830a26f 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,7 @@ import ( "os/signal" "syscall" - "github.com/nspcc-dev/neofs-http-gw/logger" + "github.com/nspcc-dev/neofs-sdk-go/pkg/logger" "github.com/spf13/viper" "go.uber.org/zap" ) diff --git a/neofs/client-plant.go b/neofs/client-plant.go deleted file mode 100644 index e13f29d..0000000 --- a/neofs/client-plant.go +++ /dev/null @@ -1,177 +0,0 @@ -package neofs - -import ( - "context" - "crypto/ecdsa" - "io" - - "github.com/nspcc-dev/neofs-api-go/pkg/client" - "github.com/nspcc-dev/neofs-api-go/pkg/container" - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-api-go/pkg/owner" - "github.com/nspcc-dev/neofs-api-go/pkg/token" - "github.com/nspcc-dev/neofs-http-gw/connections" -) - -// BaseOptions represents basic NeoFS request options. -type BaseOptions struct { - Client client.Client - SessionToken *token.SessionToken - BearerToken *token.BearerToken -} - -// PutOptions represents NeoFS Put request options. -type PutOptions struct { - BaseOptions - Attributes []*object.Attribute - ContainerID *container.ID - OwnerID *owner.ID - Reader io.Reader -} - -// GetOptions represents NeoFS Get request options. -type GetOptions struct { - BaseOptions - ObjectAddress *object.Address - Writer io.Writer -} - -// SearchOptions represents NeoFS Search request options. -type SearchOptions struct { - BaseOptions - ContainerID *container.ID - Attribute struct { - Key string - Value string - } -} - -// DeleteOptions represents NeoFS Delete request options. -type DeleteOptions struct { - BaseOptions - ObjectAddress *object.Address -} - -// ObjectClient wraps basic NeoFS requests. -type ObjectClient interface { - Put(context.Context, *PutOptions) (*object.Address, error) - Get(context.Context, *GetOptions) (*object.Object, error) - Search(context.Context, *SearchOptions) ([]*object.ID, error) - Delete(context.Context, *DeleteOptions) error -} - -// ClientPlant provides connections to NeoFS nodes from pool and allows to -// get local owner ID. -type ClientPlant interface { - ConnectionArtifacts() (client.Client, *token.SessionToken, error) - Object() ObjectClient - OwnerID() *owner.ID -} - -type neofsObjectClient struct { - key *ecdsa.PrivateKey - pool connections.Pool -} - -type neofsClientPlant struct { - key *ecdsa.PrivateKey - ownerID *owner.ID - pool connections.Pool -} - -// ConnectionArtifacts returns connection from pool. -func (cp *neofsClientPlant) ConnectionArtifacts() (client.Client, *token.SessionToken, error) { - return cp.pool.ConnectionArtifacts() -} - -// Object returns ObjectClient instance from plant. -func (cp *neofsClientPlant) Object() ObjectClient { - return &neofsObjectClient{ - key: cp.key, - pool: cp.pool, - } -} - -// OwnerID returns plant's owner ID. -func (cp *neofsClientPlant) OwnerID() *owner.ID { - return cp.ownerID -} - -// NewClientPlant creates new ClientPlant from given context, pool and credentials. -func NewClientPlant(ctx context.Context, pool connections.Pool, creds Credentials) (ClientPlant, error) { - return &neofsClientPlant{key: creds.PrivateKey(), ownerID: creds.Owner(), pool: pool}, nil -} - -// Put does NeoFS Put request, returning new object address if successful. -func (oc *neofsObjectClient) Put(ctx context.Context, options *PutOptions) (*object.Address, error) { - var ( - err error - objectID *object.ID - ) - address := object.NewAddress() - rawObject := object.NewRaw() - rawObject.SetContainerID(options.ContainerID) - rawObject.SetOwnerID(options.OwnerID) - rawObject.SetAttributes(options.Attributes...) - ops := new(client.PutObjectParams). - WithObject(rawObject.Object()). - WithPayloadReader(options.Reader) - objectID, err = options.Client.PutObject( - ctx, - ops, - client.WithSession(options.SessionToken), - client.WithBearer(options.BearerToken), - ) - if err != nil { - return nil, err - } - address.SetObjectID(objectID) - address.SetContainerID(options.ContainerID) - return address, nil -} - -// Get does NeoFS Get request, returning an object received if successful. -func (oc *neofsObjectClient) Get(ctx context.Context, options *GetOptions) (*object.Object, error) { - var ( - err error - obj *object.Object - ) - ops := new(client.GetObjectParams). - WithAddress(options.ObjectAddress). - WithPayloadWriter(options.Writer) - obj, err = options.Client.GetObject( - ctx, - ops, - client.WithSession(options.SessionToken), - client.WithBearer(options.BearerToken), - ) - return obj, err -} - -// Search does NeoFS Search request, returning object IDs if successful. -func (oc *neofsObjectClient) Search(ctx context.Context, options *SearchOptions) ([]*object.ID, error) { - sfs := object.NewSearchFilters() - sfs.AddRootFilter() - sfs.AddFilter(options.Attribute.Key, options.Attribute.Value, object.MatchStringEqual) - sops := new(client.SearchObjectParams) - sops.WithContainerID(options.ContainerID) - sops.WithSearchFilters(sfs) - return options.Client.SearchObject( - ctx, - sops, - client.WithSession(options.SessionToken), - client.WithBearer(options.BearerToken), - ) -} - -// Delete deletes NeoFS object. -func (oc *neofsObjectClient) Delete(ctx context.Context, options *DeleteOptions) error { - ops := new(client.DeleteObjectParams).WithAddress(options.ObjectAddress) - err := options.Client.DeleteObject( - ctx, - ops, - client.WithSession(options.SessionToken), - client.WithBearer(options.BearerToken), - ) - return err -} diff --git a/neofs/credentials.go b/neofs/credentials.go deleted file mode 100644 index c229468..0000000 --- a/neofs/credentials.go +++ /dev/null @@ -1,78 +0,0 @@ -package neofs - -import ( - "crypto/ecdsa" - "crypto/elliptic" - "crypto/rand" - "math/big" - - "github.com/nspcc-dev/neofs-api-go/pkg/owner" - crypto "github.com/nspcc-dev/neofs-crypto" -) - -type ( - // Credentials contains methods that needed to work with NeoFS. - Credentials interface { - Owner() *owner.ID - PublicKey() *ecdsa.PublicKey - PrivateKey() *ecdsa.PrivateKey - } - - credentials struct { - key *ecdsa.PrivateKey - ownerID *owner.ID - } -) - -// NewCredentials creates an instance of Credentials through string -// representation of secret. It allows passing WIF, path, hex-encoded and others. -func NewCredentials(secret string) (Credentials, error) { - key, err := crypto.LoadPrivateKey(secret) - if err != nil { - return nil, err - } - return setFromPrivateKey(key) -} - -// NewEphemeralCredentials creates new private key and Credentials based on that -// key. -func NewEphemeralCredentials() (Credentials, error) { - c := elliptic.P256() - priv, x, y, err := elliptic.GenerateKey(c, rand.Reader) - if err != nil { - return nil, err - } - key := &ecdsa.PrivateKey{ - PublicKey: ecdsa.PublicKey{ - Curve: c, - X: x, - Y: y, - }, - D: new(big.Int).SetBytes(priv), - } - return setFromPrivateKey(key) -} - -// PrivateKey returns ecdsa.PrivateKey. -func (c *credentials) PrivateKey() *ecdsa.PrivateKey { - return c.key -} - -// PublicKey returns ecdsa.PublicKey. -func (c *credentials) PublicKey() *ecdsa.PublicKey { - return &c.key.PublicKey -} - -// Owner returns owner.ID. -func (c *credentials) Owner() *owner.ID { - return c.ownerID -} - -func setFromPrivateKey(key *ecdsa.PrivateKey) (*credentials, error) { - wallet, err := owner.NEO3WalletFromPublicKey(&key.PublicKey) - if err != nil { - return nil, err - } - ownerID := owner.NewIDFromNeo3Wallet(wallet) - return &credentials{key: key, ownerID: ownerID}, nil -} diff --git a/uploader/upload.go b/uploader/upload.go index a9c874f..db5b7e8 100644 --- a/uploader/upload.go +++ b/uploader/upload.go @@ -5,15 +5,15 @@ import ( "encoding/json" "io" "strconv" - "sync" "time" + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/pkg/token" - "github.com/nspcc-dev/neofs-http-gw/neofs" "github.com/nspcc-dev/neofs-http-gw/tokens" + "github.com/nspcc-dev/neofs-sdk-go/pkg/neofs" "github.com/valyala/fasthttp" "go.uber.org/zap" ) @@ -23,12 +23,6 @@ const ( drainBufSize = 4096 ) -var putOptionsPool = sync.Pool{ - New: func() interface{} { - return new(neofs.PutOptions) - }, -} - // Uploader is an upload request handler. type Uploader struct { log *zap.Logger @@ -47,7 +41,10 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { var ( err error file MultipartFile - addr *object.Address + obj *object.ID + conn client.Client + tkn *token.SessionToken + addr = object.NewAddress() cid = container.NewID() scid, _ = c.UserValue("cid").(string) log = u.log.With(zap.String("cid", scid)) @@ -107,25 +104,31 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { attributes = append(attributes, timestamp) } oid, bt := u.fetchOwnerAndBearerToken(c) - putOpts := putOptionsPool.Get().(*neofs.PutOptions) - defer putOptionsPool.Put(putOpts) + // Try to put file into NeoFS or throw an error. - putOpts.Client, putOpts.SessionToken, err = u.plant.ConnectionArtifacts() + conn, tkn, err = u.plant.ConnectionArtifacts() if err != nil { log.Error("failed to get neofs connection artifacts", zap.Error(err)) c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError) return } - putOpts.Attributes = attributes - putOpts.BearerToken = bt - putOpts.ContainerID = cid - putOpts.OwnerID = oid - putOpts.Reader = file - if addr, err = u.plant.Object().Put(c, putOpts); err != nil { + + rawObject := object.NewRaw() + rawObject.SetContainerID(cid) + rawObject.SetOwnerID(oid) + rawObject.SetAttributes(attributes...) + + ops := new(client.PutObjectParams).WithObject(rawObject.Object()).WithPayloadReader(file) + + if obj, err = conn.PutObject(c, ops, client.WithSession(tkn), client.WithBearer(bt)); err != nil { log.Error("could not store file in neofs", zap.Error(err)) c.Error("could not store file in neofs", fasthttp.StatusBadRequest) return } + + addr.SetObjectID(obj) + addr.SetContainerID(cid) + // Try to return the response, otherwise, if something went wrong, throw an error. if err = newPutResponse(addr).encode(c); err != nil { log.Error("could not prepare response", zap.Error(err)) @@ -151,8 +154,8 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { } func (u *Uploader) fetchOwnerAndBearerToken(ctx context.Context) (*owner.ID, *token.BearerToken) { - if token, err := tokens.LoadBearerToken(ctx); err == nil && token != nil { - return token.Issuer(), token + if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil { + return tkn.Issuer(), tkn } return u.plant.OwnerID(), nil }