Fix S3 NeoFS integration

This commit is contained in:
Evgeniy Kulikov 2020-07-13 14:23:23 +03:00
parent 45d31752a9
commit b9c4156e5b
5 changed files with 80 additions and 14 deletions

View file

@ -122,7 +122,7 @@ func newApp(l *zap.Logger, v *viper.Viper) *App {
zap.String("SecretKey", wif)) zap.String("SecretKey", wif))
} }
if obj, err = layer.NewLayer(cli, auth.Credentials{AccessKey: uid.String(), SecretKey: wif}); err != nil { if obj, err = layer.NewLayer(cli, l, auth.Credentials{AccessKey: uid.String(), SecretKey: wif}); err != nil {
l.Fatal("could not prepare ObjectLayer", l.Fatal("could not prepare ObjectLayer",
zap.Error(err)) zap.Error(err))
} }
@ -188,8 +188,13 @@ func (a *App) Server(ctx context.Context) {
a.log.Info("starting server", a.log.Info("starting server",
zap.String("bind", addr)) zap.String("bind", addr))
// var (
// keyPath string
// certPath string
// )
if err = srv.Serve(lis); err != nil && err != http.ErrServerClosed { if err = srv.Serve(lis); err != nil && err != http.ErrServerClosed {
a.log.Warn("listen and serve", a.log.Fatal("listen and serve",
zap.Error(err)) zap.Error(err))
} }
}() }()

View file

@ -6,6 +6,40 @@ import (
) )
func AttachS3API(r *mux.Router, obj ObjectLayer, l *zap.Logger) { func AttachS3API(r *mux.Router, obj ObjectLayer, l *zap.Logger) {
{ // should be removed in feature
// Initialize all help
initHelp()
globalGatewayName = "NeoFS GW"
// Set when gateway is enabled
globalIsGateway = true
// Handle gateway specific env
gatewayHandleEnvVars()
// Set system resources to maximum.
if err := setMaxResources(); err != nil {
l.Warn("could not set max resources",
zap.Error(err))
}
// TODO: We need to move this code with globalConfigSys.Init()
// for now keep it here such that "s3" gateway layer initializes
// itself properly when KMS is set.
// Initialize server config.
srvCfg := newServerConfig()
// Override any values from ENVs.
lookupConfigs(srvCfg)
// hold the mutex lock before a new config is assigned.
globalServerConfigMu.Lock()
globalServerConfig = srvCfg
globalServerConfigMu.Unlock()
}
// Add healthcheck router // Add healthcheck router
registerHealthCheckRouter(r) registerHealthCheckRouter(r)
@ -35,13 +69,4 @@ func AttachS3API(r *mux.Router, obj ObjectLayer, l *zap.Logger) {
globalObjLayerMutex.Lock() globalObjLayerMutex.Lock()
globalSafeMode = false globalSafeMode = false
globalObjLayerMutex.Unlock() globalObjLayerMutex.Unlock()
// Handle gateway specific env
gatewayHandleEnvVars()
// Set system resources to maximum.
if err := setMaxResources(); err != nil {
l.Warn("could not set max resources",
zap.Error(err))
}
} }

View file

@ -14,6 +14,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/service" "github.com/nspcc-dev/neofs-api-go/service"
crypto "github.com/nspcc-dev/neofs-crypto" crypto "github.com/nspcc-dev/neofs-crypto"
"github.com/pkg/errors" "github.com/pkg/errors"
"go.uber.org/zap"
) )
type ( type (
@ -23,6 +24,7 @@ type (
minio.GatewayUnsupported // placeholder for unimplemented functions minio.GatewayUnsupported // placeholder for unimplemented functions
cli pool.Client cli pool.Client
log *zap.Logger
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
owner refs.OwnerID owner refs.OwnerID
token *service.Token token *service.Token
@ -40,7 +42,7 @@ type (
// NewGatewayLayer creates instance of neofsObject. It checks credentials // NewGatewayLayer creates instance of neofsObject. It checks credentials
// and establishes gRPC connection with node. // and establishes gRPC connection with node.
func NewLayer(cli pool.Client, cred auth.Credentials) (minio.ObjectLayer, error) { func NewLayer(cli pool.Client, log *zap.Logger, cred auth.Credentials) (minio.ObjectLayer, error) {
// check if wif is correct // check if wif is correct
key, err := crypto.WIFDecode(cred.SecretKey) key, err := crypto.WIFDecode(cred.SecretKey)
if err != nil { if err != nil {
@ -73,6 +75,7 @@ func NewLayer(cli pool.Client, cred auth.Credentials) (minio.ObjectLayer, error)
return &neofsObject{ return &neofsObject{
cli: cli, cli: cli,
key: key, key: key,
log: log,
owner: owner, owner: owner,
token: token, token: token,
}, nil }, nil

View file

@ -12,6 +12,7 @@ import (
"github.com/nspcc-dev/neofs-api-go/refs" "github.com/nspcc-dev/neofs-api-go/refs"
"github.com/nspcc-dev/neofs-api-go/service" "github.com/nspcc-dev/neofs-api-go/service"
"github.com/nspcc-dev/neofs-api-go/storagegroup" "github.com/nspcc-dev/neofs-api-go/storagegroup"
"go.uber.org/zap"
) )
const ( const (
@ -350,11 +351,15 @@ func (n *neofsObject) objectPut(ctx context.Context, p putParams) (*object.Objec
verb: service.Token_Info_Put, verb: service.Token_Info_Put,
}) })
if err != nil { if err != nil {
n.log.Error("could not prepare token",
zap.Error(err))
return nil, err return nil, err
} }
conn, err := n.cli.GetConnection(ctx) conn, err := n.cli.GetConnection(ctx)
if err != nil { if err != nil {
n.log.Error("could not prepare connection",
zap.Error(err))
return nil, err return nil, err
} }
@ -362,6 +367,8 @@ func (n *neofsObject) objectPut(ctx context.Context, p putParams) (*object.Objec
// todo: think about timeout // todo: think about timeout
putClient, err := client.Put(ctx) putClient, err := client.Put(ctx)
if err != nil { if err != nil {
n.log.Error("could not prepare PutClient",
zap.Error(err))
return nil, err return nil, err
} }
@ -390,17 +397,23 @@ func (n *neofsObject) objectPut(ctx context.Context, p putParams) (*object.Objec
err = service.SignRequestData(n.key, req) err = service.SignRequestData(n.key, req)
if err != nil { if err != nil {
n.log.Error("could not prepare request",
zap.Error(err))
return nil, err return nil, err
} }
err = putClient.Send(req) err = putClient.Send(req)
if err != nil { if err != nil {
n.log.Error("could not send request",
zap.Error(err))
return nil, err return nil, err
} }
read, err := p.r.Read(readBuffer) read, err := p.r.Read(readBuffer)
for read > 0 { for read > 0 {
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
n.log.Error("something went wrong",
zap.Error(err))
return nil, err return nil, err
} }
@ -411,11 +424,15 @@ func (n *neofsObject) objectPut(ctx context.Context, p putParams) (*object.Objec
err = service.SignRequestData(n.key, req) err = service.SignRequestData(n.key, req)
if err != nil { if err != nil {
n.log.Error("could not sign chunk request",
zap.Error(err))
return nil, err return nil, err
} }
err = putClient.Send(req) err = putClient.Send(req)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
n.log.Error("could not send chunk",
zap.Error(err))
return nil, err return nil, err
} }
} }
@ -425,6 +442,8 @@ func (n *neofsObject) objectPut(ctx context.Context, p putParams) (*object.Objec
_, err = putClient.CloseAndRecv() _, err = putClient.CloseAndRecv()
if err != nil { if err != nil {
n.log.Error("could not finish request",
zap.Error(err))
return nil, err return nil, err
} }

View file

@ -6,7 +6,6 @@ import (
"strings" "strings"
"time" "time"
"github.com/gogo/protobuf/proto"
minio "github.com/minio/minio/legacy" minio "github.com/minio/minio/legacy"
"github.com/minio/minio/neofs/pool" "github.com/minio/minio/neofs/pool"
"github.com/nspcc-dev/neofs-api-go/object" "github.com/nspcc-dev/neofs-api-go/object"
@ -104,7 +103,22 @@ func generateToken(ctx context.Context, p tokenParams) (*service.Token, error) {
} }
func prepareToken(t *service.Token, p queryParams) (*service.Token, error) { func prepareToken(t *service.Token, p queryParams) (*service.Token, error) {
token := proto.Clone(t).(*service.Token) sig := make([]byte, len(t.Signature))
copy(sig, t.Signature)
token := &service.Token{
Token_Info: service.Token_Info{
ID: t.ID,
OwnerID: t.OwnerID,
Verb: t.Verb,
Address: t.Address,
TokenLifetime: t.TokenLifetime,
SessionKey: t.SessionKey,
OwnerKey: t.OwnerKey,
},
Signature: sig,
}
token.SetAddress(p.addr) token.SetAddress(p.addr)
token.SetVerb(p.verb) token.SetVerb(p.verb)