diff --git a/app.go b/app.go index 841b305..e086ed2 100644 --- a/app.go +++ b/app.go @@ -6,11 +6,11 @@ import ( "strconv" "github.com/fasthttp/router" - "github.com/nspcc-dev/neofs-http-gw/connections" "github.com/nspcc-dev/neofs-http-gw/downloader" - "github.com/nspcc-dev/neofs-http-gw/logger" - "github.com/nspcc-dev/neofs-http-gw/neofs" "github.com/nspcc-dev/neofs-http-gw/uploader" + "github.com/nspcc-dev/neofs-sdk-go/pkg/logger" + "github.com/nspcc-dev/neofs-sdk-go/pkg/neofs" + "github.com/nspcc-dev/neofs-sdk-go/pkg/pool" "github.com/spf13/viper" "github.com/valyala/fasthttp" "go.uber.org/zap" @@ -99,7 +99,7 @@ func newApp(ctx context.Context, opt ...Option) App { if err != nil { a.log.Fatal("failed to get neofs credentials", zap.Error(err)) } - pb := new(connections.PoolBuilder) + pb := new(pool.Builder) for i := 0; ; i++ { address := a.cfg.GetString(cfgPeers + "." + strconv.Itoa(i) + ".address") weight := a.cfg.GetFloat64(cfgPeers + "." + strconv.Itoa(i) + ".weight") @@ -112,7 +112,7 @@ func newApp(ctx context.Context, opt ...Option) App { pb.AddNode(address, weight) a.log.Info("add connection", zap.String("address", address), zap.Float64("weight", weight)) } - opts := &connections.PoolBuilderOptions{ + opts := &pool.BuilderOptions{ Key: creds.PrivateKey(), NodeConnectionTimeout: a.cfg.GetDuration(cfgConTimeout), NodeRequestTimeout: a.cfg.GetDuration(cfgReqTimeout), diff --git a/connections/pool.go b/connections/pool.go deleted file mode 100644 index 4aae633..0000000 --- a/connections/pool.go +++ /dev/null @@ -1,160 +0,0 @@ -package connections - -import ( - "context" - "crypto/ecdsa" - "errors" - "fmt" - "math/rand" - "sync" - "time" - - "github.com/nspcc-dev/neofs-api-go/pkg/client" - "github.com/nspcc-dev/neofs-api-go/pkg/token" - "google.golang.org/grpc" - "google.golang.org/grpc/keepalive" -) - -// PoolBuilderOptions contains options used to build connection pool. -type PoolBuilderOptions struct { - Key *ecdsa.PrivateKey - NodeConnectionTimeout time.Duration - NodeRequestTimeout time.Duration - ClientRebalanceInterval time.Duration - KeepaliveTime time.Duration - KeepaliveTimeout time.Duration - KeepalivePermitWoStream bool - SessionExpirationEpoch uint64 - weights []float64 - connections []*grpc.ClientConn -} - -// PoolBuilder is an interim structure used to collect node addresses/weights and -// build connection pool subsequently. -type PoolBuilder struct { - addresses []string - weights []float64 -} - -// AddNode adds address/weight pair to node PoolBuilder list. -func (pb *PoolBuilder) AddNode(address string, weight float64) *PoolBuilder { - pb.addresses = append(pb.addresses, address) - pb.weights = append(pb.weights, weight) - return pb -} - -// Build creates new pool based on current PoolBuilder state and options. -func (pb *PoolBuilder) Build(ctx context.Context, options *PoolBuilderOptions) (Pool, error) { - if len(pb.addresses) == 0 { - return nil, errors.New("no NeoFS peers configured") - } - totalWeight := 0.0 - for _, w := range pb.weights { - totalWeight += w - } - for i, w := range pb.weights { - pb.weights[i] = w / totalWeight - } - var cons = make([]*grpc.ClientConn, len(pb.addresses)) - for i, address := range pb.addresses { - con, err := func() (*grpc.ClientConn, error) { - toctx, c := context.WithTimeout(ctx, options.NodeConnectionTimeout) - defer c() - return grpc.DialContext(toctx, address, - grpc.WithInsecure(), - grpc.WithBlock(), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: options.KeepaliveTime, - Timeout: options.KeepaliveTimeout, - PermitWithoutStream: options.KeepalivePermitWoStream, - }), - ) - }() - if err != nil { - return nil, err - } - cons[i] = con - } - options.weights = pb.weights - options.connections = cons - return new(ctx, options) -} - -// Pool is an interface providing connection artifacts on request. -type Pool interface { - ConnectionArtifacts() (client.Client, *token.SessionToken, error) -} - -type clientPack struct { - client client.Client - sessionToken *token.SessionToken - healthy bool -} - -type pool struct { - lock sync.RWMutex - sampler *Sampler - clientPacks []*clientPack -} - -func new(ctx context.Context, options *PoolBuilderOptions) (Pool, error) { - clientPacks := make([]*clientPack, len(options.weights)) - for i, con := range options.connections { - c, err := client.New(client.WithDefaultPrivateKey(options.Key), client.WithGRPCConnection(con)) - if err != nil { - return nil, err - } - st, err := c.CreateSession(ctx, options.SessionExpirationEpoch) - if err != nil { - address := "unknown" - if epi, err := c.EndpointInfo(ctx); err == nil { - address = epi.NodeInfo().Address() - } - return nil, fmt.Errorf("failed to create neofs session token for client %s: %w", address, err) - } - clientPacks[i] = &clientPack{client: c, sessionToken: st, healthy: true} - } - source := rand.NewSource(time.Now().UnixNano()) - sampler := NewSampler(options.weights, source) - pool := &pool{sampler: sampler, clientPacks: clientPacks} - go func() { - ticker := time.NewTimer(options.ClientRebalanceInterval) - for range ticker.C { - ok := true - for i, clientPack := range pool.clientPacks { - func() { - tctx, c := context.WithTimeout(ctx, options.NodeRequestTimeout) - defer c() - if _, err := clientPack.client.EndpointInfo(tctx); err != nil { - ok = false - } - pool.lock.Lock() - pool.clientPacks[i].healthy = ok - pool.lock.Unlock() - }() - } - ticker.Reset(options.ClientRebalanceInterval) - } - }() - return pool, nil -} - -func (p *pool) ConnectionArtifacts() (client.Client, *token.SessionToken, error) { - p.lock.RLock() - defer p.lock.RUnlock() - if len(p.clientPacks) == 1 { - cp := p.clientPacks[0] - if cp.healthy { - return cp.client, cp.sessionToken, nil - } - return nil, nil, errors.New("no healthy client") - } - attempts := 3 * len(p.clientPacks) - for k := 0; k < attempts; k++ { - i := p.sampler.Next() - if cp := p.clientPacks[i]; cp.healthy { - return cp.client, cp.sessionToken, nil - } - } - return nil, nil, errors.New("no healthy client") -} diff --git a/connections/sampler.go b/connections/sampler.go deleted file mode 100644 index caff5d6..0000000 --- a/connections/sampler.go +++ /dev/null @@ -1,81 +0,0 @@ -package connections - -import "math/rand" - -// Sampler implements weighted random number generation using Vose's Alias -// Method (https://www.keithschwarz.com/darts-dice-coins/). -type Sampler struct { - randomGenerator *rand.Rand - probabilities []float64 - alias []int -} - -// NewSampler creates new Sampler with a given set of probabilities using -// given source of randomness. Created Sampler will produce numbers from -// 0 to len(probabilities). -func NewSampler(probabilities []float64, source rand.Source) *Sampler { - sampler := &Sampler{} - var ( - small workList - large workList - ) - n := len(probabilities) - sampler.randomGenerator = rand.New(source) - sampler.probabilities = make([]float64, n) - sampler.alias = make([]int, n) - // Compute scaled probabilities. - p := make([]float64, n) - for i := 0; i < n; i++ { - p[i] = probabilities[i] * float64(n) - } - for i, pi := range p { - if pi < 1 { - small.add(i) - } else { - large.add(i) - } - } - for len(small) > 0 && len(large) > 0 { - l, g := small.remove(), large.remove() - sampler.probabilities[l] = p[l] - sampler.alias[l] = g - p[g] = p[g] + p[l] - 1 - if p[g] < 1 { - small.add(g) - } else { - large.add(g) - } - } - for len(large) > 0 { - g := large.remove() - sampler.probabilities[g] = 1 - } - for len(small) > 0 { - l := small.remove() - sampler.probabilities[l] = 1 - } - return sampler -} - -// Next returns the next (not so) random number from Sampler. -func (g *Sampler) Next() int { - n := len(g.alias) - i := g.randomGenerator.Intn(n) - if g.randomGenerator.Float64() < g.probabilities[i] { - return i - } - return g.alias[i] -} - -type workList []int - -func (wl *workList) add(e int) { - *wl = append(*wl, e) -} - -func (wl *workList) remove() int { - l := len(*wl) - 1 - n := (*wl)[l] - *wl = (*wl)[:l] - return n -} diff --git a/downloader/download.go b/downloader/download.go index 39ff41b..51baea2 100644 --- a/downloader/download.go +++ b/downloader/download.go @@ -12,30 +12,18 @@ import ( "sync" "time" + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-http-gw/neofs" + "github.com/nspcc-dev/neofs-api-go/pkg/token" "github.com/nspcc-dev/neofs-http-gw/tokens" + "github.com/nspcc-dev/neofs-sdk-go/pkg/neofs" "github.com/valyala/fasthttp" "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -var ( - getOptionsPool = sync.Pool{ - New: func() interface{} { - return new(neofs.GetOptions) - }, - } - - searchOptionsPool = sync.Pool{ - New: func() interface{} { - return new(neofs.SearchOptions) - }, - } -) - type ( detector struct { io.Writer @@ -45,8 +33,7 @@ type ( request struct { *fasthttp.RequestCtx - log *zap.Logger - objectClient neofs.ObjectClient + log *zap.Logger } objectIDs []*object.ID @@ -85,12 +72,15 @@ func isValidValue(s string) bool { return true } -func (r *request) receiveFile(options *neofs.GetOptions) { +func (r *request) receiveFile(clnt client.Client, + sessionToken *token.SessionToken, + objectAddress *object.Address) { var ( err error dis = "inline" start = time.Now() filename string + obj *object.Object ) if err = tokens.StoreBearerToken(r.RequestCtx); err != nil { r.log.Error("could not fetch and store bearer token", zap.Error(err)) @@ -98,8 +88,15 @@ func (r *request) receiveFile(options *neofs.GetOptions) { return } writer := newDetector(r.Response.BodyWriter()) - options.Writer = writer - obj, err := r.objectClient.Get(r.RequestCtx, options) + options := new(client.GetObjectParams). + WithAddress(objectAddress). + WithPayloadWriter(writer) + + obj, err = clnt.GetObject( + r.RequestCtx, + options, + client.WithSession(sessionToken), + ) if err != nil { r.log.Error( "could not receive object", @@ -183,9 +180,8 @@ func New(ctx context.Context, log *zap.Logger, plant neofs.ClientPlant) (*Downlo func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { return &request{ - RequestCtx: ctx, - log: log, - objectClient: d.plant.Object(), + RequestCtx: ctx, + log: log, } } @@ -198,23 +194,22 @@ func (d *Downloader) DownloadByAddress(c *fasthttp.RequestCtx) { oid, _ = c.UserValue("oid").(string) val = strings.Join([]string{cid, oid}, "/") log = d.log.With(zap.String("cid", cid), zap.String("oid", oid)) + conn client.Client + tkn *token.SessionToken ) if err = address.Parse(val); err != nil { log.Error("wrong object address", zap.Error(err)) c.Error("wrong object address", fasthttp.StatusBadRequest) return } - getOpts := getOptionsPool.Get().(*neofs.GetOptions) - defer getOptionsPool.Put(getOpts) - getOpts.Client, getOpts.SessionToken, err = d.plant.ConnectionArtifacts() + + conn, tkn, err = d.plant.ConnectionArtifacts() if err != nil { log.Error("failed to get neofs connection artifacts", zap.Error(err)) c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError) return } - getOpts.ObjectAddress = address - getOpts.Writer = nil - d.newRequest(c, log).receiveFile(getOpts) + d.newRequest(c, log).receiveFile(conn, tkn, address) } // DownloadByAttribute handles attribute-based download requests. @@ -225,6 +220,9 @@ func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { key, _ = c.UserValue("attr_key").(string) val, _ = c.UserValue("attr_val").(string) log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val)) + ids []*object.ID + conn client.Client + tkn *token.SessionToken ) cid := container.NewID() if err = cid.Parse(scid); err != nil { @@ -232,20 +230,20 @@ func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { c.Error("wrong container id", fasthttp.StatusBadRequest) return } - searchOpts := searchOptionsPool.Get().(*neofs.SearchOptions) - defer searchOptionsPool.Put(searchOpts) - searchOpts.Client, searchOpts.SessionToken, err = d.plant.ConnectionArtifacts() + + conn, tkn, err = d.plant.ConnectionArtifacts() if err != nil { log.Error("failed to get neofs connection artifacts", zap.Error(err)) c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError) return } - searchOpts.BearerToken = nil - searchOpts.ContainerID = cid - searchOpts.Attribute.Key = key - searchOpts.Attribute.Value = val - var ids []*object.ID - if ids, err = d.plant.Object().Search(c, searchOpts); err != nil { + + options := object.NewSearchFilters() + options.AddRootFilter() + options.AddFilter(key, val, object.MatchStringEqual) + + sops := new(client.SearchObjectParams).WithContainerID(cid).WithSearchFilters(options) + if ids, err = conn.SearchObject(c, sops, client.WithSession(tkn)); err != nil { log.Error("something went wrong", zap.Error(err)) c.Error("something went wrong", fasthttp.StatusBadRequest) return @@ -262,15 +260,12 @@ func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { address := object.NewAddress() address.SetContainerID(cid) address.SetObjectID(ids[0]) - getOpts := getOptionsPool.Get().(*neofs.GetOptions) - defer getOptionsPool.Put(getOpts) - getOpts.Client, getOpts.SessionToken, err = d.plant.ConnectionArtifacts() + + conn, tkn, err = d.plant.ConnectionArtifacts() if err != nil { log.Error("failed to get neofs connection artifacts", zap.Error(err)) c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError) return } - getOpts.ObjectAddress = address - getOpts.Writer = nil - d.newRequest(c, log).receiveFile(getOpts) + d.newRequest(c, log).receiveFile(conn, tkn, address) } diff --git a/go.mod b/go.mod index 2919778..059c184 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/fasthttp/router v1.3.5 github.com/mr-tron/base58 v1.1.3 // indirect github.com/nspcc-dev/neofs-api-go v1.26.1 - github.com/nspcc-dev/neofs-crypto v0.3.0 + github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210527182636-cbfc17a1a9a2 github.com/prometheus/client_golang v1.9.0 github.com/prometheus/common v0.15.0 github.com/spf13/pflag v1.0.5 diff --git a/go.sum b/go.sum index 63228d1..6459a87 100644 --- a/go.sum +++ b/go.sum @@ -29,6 +29,8 @@ github.com/abiosoft/ishell v2.0.0+incompatible/go.mod h1:HQR9AqF2R3P4XXpMpI0NAzg github.com/abiosoft/readline v0.0.0-20180607040430-155bce2042db/go.mod h1:rB3B4rKii8V21ydCbIzH5hZiCQE7f5E9SzUb/ZZx530= github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII= github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5/go.mod h1:SkGFH1ia65gfNATL8TAiHDNxPzPdmEL5uirI2Uyuz6c= +github.com/alecthomas/participle v0.7.1/go.mod h1:HfdmEuwvr12HXQN44HPWXR0lHmVolVYe4dyL6lQ3duY= +github.com/alecthomas/repr v0.0.0-20181024024818-d37bc2a10ba1/go.mod h1:xTS7Pm1pD1mvyM075QCDSRqH6qRLXylzS24ZTpRiSzQ= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -317,6 +319,8 @@ github.com/nspcc-dev/neofs-crypto v0.2.0/go.mod h1:F/96fUzPM3wR+UGsPi3faVNmFlA9K github.com/nspcc-dev/neofs-crypto v0.2.3/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= github.com/nspcc-dev/neofs-crypto v0.3.0 h1:zlr3pgoxuzrmGCxc5W8dGVfA9Rro8diFvVnBg0L4ifM= github.com/nspcc-dev/neofs-crypto v0.3.0/go.mod h1:8w16GEJbH6791ktVqHN9YRNH3s9BEEKYxGhlFnp0cDw= +github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210527182636-cbfc17a1a9a2 h1:z8xtKILKi+Dolk3VAyCaFPMroFnT+x8qTqMT/zBRqIc= +github.com/nspcc-dev/neofs-sdk-go v0.0.0-20210527182636-cbfc17a1a9a2/go.mod h1:QZE7VaNQRyNFS+3gsrNEQEiLe+d6AR6EteX1M9geh6A= github.com/nspcc-dev/rfc6979 v0.1.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE= github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= diff --git a/logger/grpc.go b/logger/grpc.go deleted file mode 100644 index 584bf25..0000000 --- a/logger/grpc.go +++ /dev/null @@ -1,78 +0,0 @@ -package logger - -import ( - "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "google.golang.org/grpc/grpclog" -) - -type ( - zapLogger struct { - zapcore.Core - log *zap.SugaredLogger - } - - // Logger includes grpclog.LoggerV2 interface with an additional - // Println method. - Logger interface { - grpclog.LoggerV2 - Println(v ...interface{}) - } -) - -// GRPC wraps given zap.Logger into grpclog.LoggerV2+ interface. -func GRPC(l *zap.Logger) Logger { - log := l.WithOptions( - // skip gRPCLog + zapLogger in caller - zap.AddCallerSkip(2)) - - return &zapLogger{ - Core: log.Core(), - log: log.Sugar(), - } -} - -// Info implements grpclog.LoggerV2. -func (z *zapLogger) Info(args ...interface{}) { z.log.Info(args...) } - -// Infoln implements grpclog.LoggerV2. -func (z *zapLogger) Infoln(args ...interface{}) { z.log.Info(args...) } - -// Infof implements grpclog.LoggerV2. -func (z *zapLogger) Infof(format string, args ...interface{}) { z.log.Infof(format, args...) } - -// Println allows to print a line with info severity. -func (z *zapLogger) Println(args ...interface{}) { z.log.Info(args...) } - -// Printf implements grpclog.LoggerV2. -func (z *zapLogger) Printf(format string, args ...interface{}) { z.log.Infof(format, args...) } - -// Warning implements grpclog.LoggerV2. -func (z *zapLogger) Warning(args ...interface{}) { z.log.Warn(args...) } - -// Warningln implements grpclog.LoggerV2. -func (z *zapLogger) Warningln(args ...interface{}) { z.log.Warn(args...) } - -// Warningf implements grpclog.LoggerV2. -func (z *zapLogger) Warningf(format string, args ...interface{}) { z.log.Warnf(format, args...) } - -// Error implements grpclog.LoggerV2. -func (z *zapLogger) Error(args ...interface{}) { z.log.Error(args...) } - -// Errorln implements grpclog.LoggerV2. -func (z *zapLogger) Errorln(args ...interface{}) { z.log.Error(args...) } - -// Errorf implements grpclog.LoggerV2. -func (z *zapLogger) Errorf(format string, args ...interface{}) { z.log.Errorf(format, args...) } - -// Fatal implements grpclog.LoggerV2. -func (z *zapLogger) Fatal(args ...interface{}) { z.log.Fatal(args...) } - -// Fatalln implements grpclog.LoggerV2. -func (z *zapLogger) Fatalln(args ...interface{}) { z.log.Fatal(args...) } - -// Fatalf implements grpclog.LoggerV2. -func (z *zapLogger) Fatalf(format string, args ...interface{}) { z.log.Fatalf(format, args...) } - -// V implements grpclog.LoggerV2. -func (z *zapLogger) V(int) bool { return z.Enabled(zapcore.DebugLevel) } diff --git a/logger/option.go b/logger/option.go deleted file mode 100644 index f617010..0000000 --- a/logger/option.go +++ /dev/null @@ -1,33 +0,0 @@ -package logger - -import "go.uber.org/zap" - -// WithSamplingInitial returns Option that sets sampling initial parameter. -func WithSamplingInitial(v int) Option { return func(o *options) { o.SamplingInitial = v } } - -// WithSamplingThereafter returns Option that sets sampling thereafter parameter. -func WithSamplingThereafter(v int) Option { return func(o *options) { o.SamplingThereafter = v } } - -// WithFormat returns Option that sets format parameter. -func WithFormat(v string) Option { return func(o *options) { o.Format = v } } - -// WithLevel returns Option that sets Level parameter. -func WithLevel(v string) Option { return func(o *options) { o.Level = v } } - -// WithTraceLevel returns Option that sets trace level parameter. -func WithTraceLevel(v string) Option { return func(o *options) { o.TraceLevel = v } } - -// WithoutDisclaimer returns Option that disables disclaimer. -func WithoutDisclaimer() Option { return func(o *options) { o.NoDisclaimer = true } } - -// WithoutCaller returns Option that disables caller printing. -func WithoutCaller() Option { return func(o *options) { o.NoCaller = true } } - -// WithAppName returns Option that sets application name. -func WithAppName(v string) Option { return func(o *options) { o.AppName = v } } - -// WithAppVersion returns Option that sets application version. -func WithAppVersion(v string) Option { return func(o *options) { o.AppVersion = v } } - -// WithZapOptions returns Option that sets zap logger options. -func WithZapOptions(opts ...zap.Option) Option { return func(o *options) { o.Options = opts } } diff --git a/logger/zap.go b/logger/zap.go deleted file mode 100644 index 5e9ee62..0000000 --- a/logger/zap.go +++ /dev/null @@ -1,134 +0,0 @@ -package logger - -import ( - "strings" - - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -type ( - // Option represents logger option setter. - Option func(o *options) - - options struct { - Options []zap.Option - - SamplingInitial int - SamplingThereafter int - - Format string - Level string - TraceLevel string - - NoCaller bool - NoDisclaimer bool - - AppName string - AppVersion string - } -) - -const ( - formatJSON = "json" - formatConsole = "console" - - defaultSamplingInitial = 100 - defaultSamplingThereafter = 100 - - lvlInfo = "info" - lvlWarn = "warn" - lvlDebug = "debug" - lvlError = "error" - lvlFatal = "fatal" - lvlPanic = "panic" -) - -func safeLevel(lvl string) zap.AtomicLevel { - switch strings.ToLower(lvl) { - case lvlDebug: - return zap.NewAtomicLevelAt(zap.DebugLevel) - case lvlWarn: - return zap.NewAtomicLevelAt(zap.WarnLevel) - case lvlError: - return zap.NewAtomicLevelAt(zap.ErrorLevel) - case lvlFatal: - return zap.NewAtomicLevelAt(zap.FatalLevel) - case lvlPanic: - return zap.NewAtomicLevelAt(zap.PanicLevel) - default: - return zap.NewAtomicLevelAt(zap.InfoLevel) - } -} - -func defaults() *options { - return &options{ - SamplingInitial: defaultSamplingInitial, - SamplingThereafter: defaultSamplingThereafter, - - Format: formatConsole, - Level: lvlDebug, - TraceLevel: lvlInfo, - - NoCaller: false, - NoDisclaimer: false, - - AppName: "", - AppVersion: "", - } -} - -// New returns new zap.Logger using all options specified and stdout used -// for output. -func New(opts ...Option) (*zap.Logger, error) { - o := defaults() - c := zap.NewProductionConfig() - - c.OutputPaths = []string{"stdout"} - c.ErrorOutputPaths = []string{"stdout"} - - for _, opt := range opts { - opt(o) - } - - // set sampling - c.Sampling = &zap.SamplingConfig{ - Initial: o.SamplingInitial, - Thereafter: o.SamplingThereafter, - } - - // logger level - c.Level = safeLevel(o.Level) - traceLvl := safeLevel(o.TraceLevel) - - // logger format - switch f := o.Format; strings.ToLower(f) { - case formatConsole: - c.Encoding = formatConsole - default: - c.Encoding = formatJSON - } - - // logger time - c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder - - if o.NoCaller { - c.EncoderConfig.EncodeCaller = nil - } - - // enable trace only for current log-level - o.Options = append(o.Options, zap.AddStacktrace(traceLvl)) - - l, err := c.Build(o.Options...) - if err != nil { - return nil, err - } - - if o.NoDisclaimer { - return l, nil - } - - return l.With( - zap.String("app_name", o.AppName), - zap.String("app_version", o.AppVersion)), nil -} diff --git a/main.go b/main.go index e89489b..830a26f 100644 --- a/main.go +++ b/main.go @@ -5,7 +5,7 @@ import ( "os/signal" "syscall" - "github.com/nspcc-dev/neofs-http-gw/logger" + "github.com/nspcc-dev/neofs-sdk-go/pkg/logger" "github.com/spf13/viper" "go.uber.org/zap" ) diff --git a/neofs/client-plant.go b/neofs/client-plant.go deleted file mode 100644 index e13f29d..0000000 --- a/neofs/client-plant.go +++ /dev/null @@ -1,177 +0,0 @@ -package neofs - -import ( - "context" - "crypto/ecdsa" - "io" - - "github.com/nspcc-dev/neofs-api-go/pkg/client" - "github.com/nspcc-dev/neofs-api-go/pkg/container" - "github.com/nspcc-dev/neofs-api-go/pkg/object" - "github.com/nspcc-dev/neofs-api-go/pkg/owner" - "github.com/nspcc-dev/neofs-api-go/pkg/token" - "github.com/nspcc-dev/neofs-http-gw/connections" -) - -// BaseOptions represents basic NeoFS request options. -type BaseOptions struct { - Client client.Client - SessionToken *token.SessionToken - BearerToken *token.BearerToken -} - -// PutOptions represents NeoFS Put request options. -type PutOptions struct { - BaseOptions - Attributes []*object.Attribute - ContainerID *container.ID - OwnerID *owner.ID - Reader io.Reader -} - -// GetOptions represents NeoFS Get request options. -type GetOptions struct { - BaseOptions - ObjectAddress *object.Address - Writer io.Writer -} - -// SearchOptions represents NeoFS Search request options. -type SearchOptions struct { - BaseOptions - ContainerID *container.ID - Attribute struct { - Key string - Value string - } -} - -// DeleteOptions represents NeoFS Delete request options. -type DeleteOptions struct { - BaseOptions - ObjectAddress *object.Address -} - -// ObjectClient wraps basic NeoFS requests. -type ObjectClient interface { - Put(context.Context, *PutOptions) (*object.Address, error) - Get(context.Context, *GetOptions) (*object.Object, error) - Search(context.Context, *SearchOptions) ([]*object.ID, error) - Delete(context.Context, *DeleteOptions) error -} - -// ClientPlant provides connections to NeoFS nodes from pool and allows to -// get local owner ID. -type ClientPlant interface { - ConnectionArtifacts() (client.Client, *token.SessionToken, error) - Object() ObjectClient - OwnerID() *owner.ID -} - -type neofsObjectClient struct { - key *ecdsa.PrivateKey - pool connections.Pool -} - -type neofsClientPlant struct { - key *ecdsa.PrivateKey - ownerID *owner.ID - pool connections.Pool -} - -// ConnectionArtifacts returns connection from pool. -func (cp *neofsClientPlant) ConnectionArtifacts() (client.Client, *token.SessionToken, error) { - return cp.pool.ConnectionArtifacts() -} - -// Object returns ObjectClient instance from plant. -func (cp *neofsClientPlant) Object() ObjectClient { - return &neofsObjectClient{ - key: cp.key, - pool: cp.pool, - } -} - -// OwnerID returns plant's owner ID. -func (cp *neofsClientPlant) OwnerID() *owner.ID { - return cp.ownerID -} - -// NewClientPlant creates new ClientPlant from given context, pool and credentials. -func NewClientPlant(ctx context.Context, pool connections.Pool, creds Credentials) (ClientPlant, error) { - return &neofsClientPlant{key: creds.PrivateKey(), ownerID: creds.Owner(), pool: pool}, nil -} - -// Put does NeoFS Put request, returning new object address if successful. -func (oc *neofsObjectClient) Put(ctx context.Context, options *PutOptions) (*object.Address, error) { - var ( - err error - objectID *object.ID - ) - address := object.NewAddress() - rawObject := object.NewRaw() - rawObject.SetContainerID(options.ContainerID) - rawObject.SetOwnerID(options.OwnerID) - rawObject.SetAttributes(options.Attributes...) - ops := new(client.PutObjectParams). - WithObject(rawObject.Object()). - WithPayloadReader(options.Reader) - objectID, err = options.Client.PutObject( - ctx, - ops, - client.WithSession(options.SessionToken), - client.WithBearer(options.BearerToken), - ) - if err != nil { - return nil, err - } - address.SetObjectID(objectID) - address.SetContainerID(options.ContainerID) - return address, nil -} - -// Get does NeoFS Get request, returning an object received if successful. -func (oc *neofsObjectClient) Get(ctx context.Context, options *GetOptions) (*object.Object, error) { - var ( - err error - obj *object.Object - ) - ops := new(client.GetObjectParams). - WithAddress(options.ObjectAddress). - WithPayloadWriter(options.Writer) - obj, err = options.Client.GetObject( - ctx, - ops, - client.WithSession(options.SessionToken), - client.WithBearer(options.BearerToken), - ) - return obj, err -} - -// Search does NeoFS Search request, returning object IDs if successful. -func (oc *neofsObjectClient) Search(ctx context.Context, options *SearchOptions) ([]*object.ID, error) { - sfs := object.NewSearchFilters() - sfs.AddRootFilter() - sfs.AddFilter(options.Attribute.Key, options.Attribute.Value, object.MatchStringEqual) - sops := new(client.SearchObjectParams) - sops.WithContainerID(options.ContainerID) - sops.WithSearchFilters(sfs) - return options.Client.SearchObject( - ctx, - sops, - client.WithSession(options.SessionToken), - client.WithBearer(options.BearerToken), - ) -} - -// Delete deletes NeoFS object. -func (oc *neofsObjectClient) Delete(ctx context.Context, options *DeleteOptions) error { - ops := new(client.DeleteObjectParams).WithAddress(options.ObjectAddress) - err := options.Client.DeleteObject( - ctx, - ops, - client.WithSession(options.SessionToken), - client.WithBearer(options.BearerToken), - ) - return err -} diff --git a/neofs/credentials.go b/neofs/credentials.go deleted file mode 100644 index c229468..0000000 --- a/neofs/credentials.go +++ /dev/null @@ -1,78 +0,0 @@ -package neofs - -import ( - "crypto/ecdsa" - "crypto/elliptic" - "crypto/rand" - "math/big" - - "github.com/nspcc-dev/neofs-api-go/pkg/owner" - crypto "github.com/nspcc-dev/neofs-crypto" -) - -type ( - // Credentials contains methods that needed to work with NeoFS. - Credentials interface { - Owner() *owner.ID - PublicKey() *ecdsa.PublicKey - PrivateKey() *ecdsa.PrivateKey - } - - credentials struct { - key *ecdsa.PrivateKey - ownerID *owner.ID - } -) - -// NewCredentials creates an instance of Credentials through string -// representation of secret. It allows passing WIF, path, hex-encoded and others. -func NewCredentials(secret string) (Credentials, error) { - key, err := crypto.LoadPrivateKey(secret) - if err != nil { - return nil, err - } - return setFromPrivateKey(key) -} - -// NewEphemeralCredentials creates new private key and Credentials based on that -// key. -func NewEphemeralCredentials() (Credentials, error) { - c := elliptic.P256() - priv, x, y, err := elliptic.GenerateKey(c, rand.Reader) - if err != nil { - return nil, err - } - key := &ecdsa.PrivateKey{ - PublicKey: ecdsa.PublicKey{ - Curve: c, - X: x, - Y: y, - }, - D: new(big.Int).SetBytes(priv), - } - return setFromPrivateKey(key) -} - -// PrivateKey returns ecdsa.PrivateKey. -func (c *credentials) PrivateKey() *ecdsa.PrivateKey { - return c.key -} - -// PublicKey returns ecdsa.PublicKey. -func (c *credentials) PublicKey() *ecdsa.PublicKey { - return &c.key.PublicKey -} - -// Owner returns owner.ID. -func (c *credentials) Owner() *owner.ID { - return c.ownerID -} - -func setFromPrivateKey(key *ecdsa.PrivateKey) (*credentials, error) { - wallet, err := owner.NEO3WalletFromPublicKey(&key.PublicKey) - if err != nil { - return nil, err - } - ownerID := owner.NewIDFromNeo3Wallet(wallet) - return &credentials{key: key, ownerID: ownerID}, nil -} diff --git a/uploader/upload.go b/uploader/upload.go index a9c874f..db5b7e8 100644 --- a/uploader/upload.go +++ b/uploader/upload.go @@ -5,15 +5,15 @@ import ( "encoding/json" "io" "strconv" - "sync" "time" + "github.com/nspcc-dev/neofs-api-go/pkg/client" "github.com/nspcc-dev/neofs-api-go/pkg/container" "github.com/nspcc-dev/neofs-api-go/pkg/object" "github.com/nspcc-dev/neofs-api-go/pkg/owner" "github.com/nspcc-dev/neofs-api-go/pkg/token" - "github.com/nspcc-dev/neofs-http-gw/neofs" "github.com/nspcc-dev/neofs-http-gw/tokens" + "github.com/nspcc-dev/neofs-sdk-go/pkg/neofs" "github.com/valyala/fasthttp" "go.uber.org/zap" ) @@ -23,12 +23,6 @@ const ( drainBufSize = 4096 ) -var putOptionsPool = sync.Pool{ - New: func() interface{} { - return new(neofs.PutOptions) - }, -} - // Uploader is an upload request handler. type Uploader struct { log *zap.Logger @@ -47,7 +41,10 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { var ( err error file MultipartFile - addr *object.Address + obj *object.ID + conn client.Client + tkn *token.SessionToken + addr = object.NewAddress() cid = container.NewID() scid, _ = c.UserValue("cid").(string) log = u.log.With(zap.String("cid", scid)) @@ -107,25 +104,31 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { attributes = append(attributes, timestamp) } oid, bt := u.fetchOwnerAndBearerToken(c) - putOpts := putOptionsPool.Get().(*neofs.PutOptions) - defer putOptionsPool.Put(putOpts) + // Try to put file into NeoFS or throw an error. - putOpts.Client, putOpts.SessionToken, err = u.plant.ConnectionArtifacts() + conn, tkn, err = u.plant.ConnectionArtifacts() if err != nil { log.Error("failed to get neofs connection artifacts", zap.Error(err)) c.Error("failed to get neofs connection artifacts", fasthttp.StatusInternalServerError) return } - putOpts.Attributes = attributes - putOpts.BearerToken = bt - putOpts.ContainerID = cid - putOpts.OwnerID = oid - putOpts.Reader = file - if addr, err = u.plant.Object().Put(c, putOpts); err != nil { + + rawObject := object.NewRaw() + rawObject.SetContainerID(cid) + rawObject.SetOwnerID(oid) + rawObject.SetAttributes(attributes...) + + ops := new(client.PutObjectParams).WithObject(rawObject.Object()).WithPayloadReader(file) + + if obj, err = conn.PutObject(c, ops, client.WithSession(tkn), client.WithBearer(bt)); err != nil { log.Error("could not store file in neofs", zap.Error(err)) c.Error("could not store file in neofs", fasthttp.StatusBadRequest) return } + + addr.SetObjectID(obj) + addr.SetContainerID(cid) + // Try to return the response, otherwise, if something went wrong, throw an error. if err = newPutResponse(addr).encode(c); err != nil { log.Error("could not prepare response", zap.Error(err)) @@ -151,8 +154,8 @@ func (u *Uploader) Upload(c *fasthttp.RequestCtx) { } func (u *Uploader) fetchOwnerAndBearerToken(ctx context.Context) (*owner.ID, *token.BearerToken) { - if token, err := tokens.LoadBearerToken(ctx); err == nil && token != nil { - return token.Issuer(), token + if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil { + return tkn.Issuer(), tkn } return u.plant.OwnerID(), nil }