NFSSVC-27 Implement list-buckets
This commit is contained in:
parent
309179e9ba
commit
e776e9c0cb
8 changed files with 352 additions and 25 deletions
|
@ -195,7 +195,7 @@ func newSettings() *viper.Viper {
|
||||||
// logger:
|
// logger:
|
||||||
v.SetDefault(cfgLoggerLevel, "debug")
|
v.SetDefault(cfgLoggerLevel, "debug")
|
||||||
v.SetDefault(cfgLoggerFormat, "console")
|
v.SetDefault(cfgLoggerFormat, "console")
|
||||||
v.SetDefault(cfgLoggerTraceLevel, "fatal")
|
v.SetDefault(cfgLoggerTraceLevel, "panic")
|
||||||
v.SetDefault(cfgLoggerNoDisclaimer, true)
|
v.SetDefault(cfgLoggerNoDisclaimer, true)
|
||||||
v.SetDefault(cfgLoggerSamplingInitial, 1000)
|
v.SetDefault(cfgLoggerSamplingInitial, 1000)
|
||||||
v.SetDefault(cfgLoggerSamplingThereafter, 1000)
|
v.SetDefault(cfgLoggerSamplingThereafter, 1000)
|
||||||
|
|
|
@ -66,10 +66,6 @@ func newApp(l *zap.Logger, v *viper.Viper) *App {
|
||||||
l.Fatal("failed to initialize auth center", zap.Error(err))
|
l.Fatal("failed to initialize auth center", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
if caller, err = handler.New(); err != nil {
|
|
||||||
l.Fatal("could not initialize API handler", zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
if v.IsSet(cfgTLSKeyFile) && v.IsSet(cfgTLSCertFile) {
|
if v.IsSet(cfgTLSKeyFile) && v.IsSet(cfgTLSCertFile) {
|
||||||
tls = &tlsConfig{
|
tls = &tlsConfig{
|
||||||
KeyFile: v.GetString(cfgTLSKeyFile),
|
KeyFile: v.GetString(cfgTLSKeyFile),
|
||||||
|
@ -133,6 +129,21 @@ func newApp(l *zap.Logger, v *viper.Viper) *App {
|
||||||
l.Fatal("could not prepare ObjectLayer", zap.Error(err))
|
l.Fatal("could not prepare ObjectLayer", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
{ // should prepare api.Handler:
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), conTimeout)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
apiParams := handler.Params{
|
||||||
|
Log: l,
|
||||||
|
Cli: cli,
|
||||||
|
Key: center.GetNeoFSPrivateKey(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if caller, err = handler.New(ctx, apiParams); err != nil {
|
||||||
|
l.Fatal("could not initialize API handler", zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return &App{
|
return &App{
|
||||||
center: center,
|
center: center,
|
||||||
cli: cli,
|
cli: cli,
|
||||||
|
|
2
go.mod
2
go.mod
|
@ -9,6 +9,7 @@ require (
|
||||||
github.com/Shopify/sarama v1.24.1
|
github.com/Shopify/sarama v1.24.1
|
||||||
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
|
github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect
|
||||||
github.com/alecthomas/participle v0.2.1
|
github.com/alecthomas/participle v0.2.1
|
||||||
|
github.com/aws/aws-sdk-go v1.33.8
|
||||||
github.com/bcicen/jstream v0.0.0-20190220045926-16c1f8af81c2
|
github.com/bcicen/jstream v0.0.0-20190220045926-16c1f8af81c2
|
||||||
github.com/beevik/ntp v0.2.0
|
github.com/beevik/ntp v0.2.0
|
||||||
github.com/cespare/xxhash/v2 v2.1.1
|
github.com/cespare/xxhash/v2 v2.1.1
|
||||||
|
@ -110,5 +111,4 @@ require (
|
||||||
gopkg.in/olivere/elastic.v5 v5.0.80
|
gopkg.in/olivere/elastic.v5 v5.0.80
|
||||||
gopkg.in/yaml.v2 v2.2.8
|
gopkg.in/yaml.v2 v2.2.8
|
||||||
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
|
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
|
||||||
github.com/aws/aws-sdk-go v1.33.8
|
|
||||||
)
|
)
|
||||||
|
|
|
@ -1939,8 +1939,8 @@ func toAPIError(ctx context.Context, err error) Error {
|
||||||
return apiErr
|
return apiErr
|
||||||
}
|
}
|
||||||
|
|
||||||
// getAPIError provides API Error for input API error code.
|
// GetAPIError provides API Error for input API error code.
|
||||||
func getAPIError(code ErrorCode) Error {
|
func GetAPIError(code ErrorCode) Error {
|
||||||
if apiErr, ok := errorCodes[code]; ok {
|
if apiErr, ok := errorCodes[code]; ok {
|
||||||
return apiErr
|
return apiErr
|
||||||
}
|
}
|
||||||
|
|
131
neofs/api/handler/api.go
Normal file
131
neofs/api/handler/api.go
Normal file
|
@ -0,0 +1,131 @@
|
||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"math"
|
||||||
|
|
||||||
|
"github.com/minio/minio/neofs/api"
|
||||||
|
"github.com/minio/minio/neofs/pool"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/service"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/session"
|
||||||
|
crypto "github.com/nspcc-dev/neofs-crypto"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
handler struct {
|
||||||
|
log *zap.Logger
|
||||||
|
cli pool.Client
|
||||||
|
uid refs.OwnerID
|
||||||
|
tkn *service.Token
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
}
|
||||||
|
|
||||||
|
Params struct {
|
||||||
|
Cli pool.Client
|
||||||
|
Log *zap.Logger
|
||||||
|
Key *ecdsa.PrivateKey
|
||||||
|
}
|
||||||
|
|
||||||
|
queryParams struct {
|
||||||
|
key *ecdsa.PrivateKey
|
||||||
|
addr refs.Address
|
||||||
|
verb service.Token_Info_Verb
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
var _ api.Handler = (*handler)(nil)
|
||||||
|
|
||||||
|
func New(ctx context.Context, p Params) (api.Handler, error) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
uid refs.OwnerID
|
||||||
|
tkn *service.Token
|
||||||
|
)
|
||||||
|
|
||||||
|
switch {
|
||||||
|
case p.Key == nil:
|
||||||
|
return nil, errors.New("empty private key")
|
||||||
|
case p.Cli == nil:
|
||||||
|
return nil, errors.New("empty gRPC client")
|
||||||
|
case p.Log == nil:
|
||||||
|
return nil, errors.New("empty logger")
|
||||||
|
}
|
||||||
|
|
||||||
|
if uid, err = refs.NewOwnerID(&p.Key.PublicKey); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not fetch OwnerID")
|
||||||
|
} else if tkn, err = generateToken(ctx, p.Cli, p.Key); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not prepare session token")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &handler{
|
||||||
|
uid: uid,
|
||||||
|
tkn: tkn,
|
||||||
|
key: p.Key,
|
||||||
|
log: p.Log,
|
||||||
|
cli: p.Cli,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func generateToken(ctx context.Context, cli pool.Client, key *ecdsa.PrivateKey) (*service.Token, error) {
|
||||||
|
owner, err := refs.NewOwnerID(&key.PublicKey)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
token := new(service.Token)
|
||||||
|
token.SetOwnerID(owner)
|
||||||
|
token.SetExpirationEpoch(math.MaxUint64)
|
||||||
|
token.SetOwnerKey(crypto.MarshalPublicKey(&key.PublicKey))
|
||||||
|
|
||||||
|
conn, err := cli.GetConnection(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
creator, err := session.NewGRPCCreator(conn, key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := creator.Create(ctx, token)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
token.SetID(res.GetID())
|
||||||
|
token.SetSessionKey(res.GetSessionKey())
|
||||||
|
|
||||||
|
return token, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareToken(t *service.Token, p queryParams) (*service.Token, error) {
|
||||||
|
sig := make([]byte, len(t.Signature))
|
||||||
|
copy(sig, t.Signature)
|
||||||
|
|
||||||
|
token := &service.Token{
|
||||||
|
Token_Info: service.Token_Info{
|
||||||
|
ID: t.ID,
|
||||||
|
OwnerID: t.OwnerID,
|
||||||
|
Verb: t.Verb,
|
||||||
|
Address: t.Address,
|
||||||
|
TokenLifetime: t.TokenLifetime,
|
||||||
|
SessionKey: t.SessionKey,
|
||||||
|
OwnerKey: t.OwnerKey,
|
||||||
|
},
|
||||||
|
Signature: sig,
|
||||||
|
}
|
||||||
|
|
||||||
|
token.SetAddress(p.addr)
|
||||||
|
token.SetVerb(p.verb)
|
||||||
|
|
||||||
|
err := service.AddSignatureWithKey(p.key, service.NewSignedSessionToken(token))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return token, nil
|
||||||
|
}
|
193
neofs/api/handler/container.go
Normal file
193
neofs/api/handler/container.go
Normal file
|
@ -0,0 +1,193 @@
|
||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/xml"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/minio/minio/neofs/api"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/container"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||||
|
"github.com/nspcc-dev/neofs-api-go/service"
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
type (
|
||||||
|
// Owner - bucket owner/principal
|
||||||
|
Owner struct {
|
||||||
|
ID string
|
||||||
|
DisplayName string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Bucket container for bucket metadata
|
||||||
|
Bucket struct {
|
||||||
|
Name string
|
||||||
|
CreationDate string // time string of format "2006-01-02T15:04:05.000Z"
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListBucketsResponse - format for list buckets response
|
||||||
|
ListBucketsResponse struct {
|
||||||
|
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListAllMyBucketsResult" json:"-"`
|
||||||
|
|
||||||
|
Owner Owner
|
||||||
|
|
||||||
|
// Container for one or more buckets.
|
||||||
|
Buckets struct {
|
||||||
|
Buckets []*Bucket `xml:"Bucket"`
|
||||||
|
} // Buckets are nested
|
||||||
|
}
|
||||||
|
|
||||||
|
cnrInfoParams struct {
|
||||||
|
cid refs.CID
|
||||||
|
con *grpc.ClientConn
|
||||||
|
tkn *service.BearerTokenMsg
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO should be replaced with auth.GetBearerToken
|
||||||
|
func getBearerToken(ctx context.Context) (*service.BearerTokenMsg, error) {
|
||||||
|
if val := ctx.Value("ctxBearerToken"); val == nil {
|
||||||
|
return nil, errors.New("empty bearer token")
|
||||||
|
} else if tkn, ok := val.(*service.BearerTokenMsg); ok {
|
||||||
|
return tkn, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, errors.New("bad value for bearer token")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handler) getContainerInfo(ctx context.Context, p cnrInfoParams) (*Bucket, error) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
res *container.GetResponse
|
||||||
|
)
|
||||||
|
|
||||||
|
req := new(container.GetRequest)
|
||||||
|
req.SetCID(p.cid)
|
||||||
|
req.SetTTL(service.SingleForwardingTTL)
|
||||||
|
req.SetBearer(p.tkn)
|
||||||
|
|
||||||
|
if err = service.SignRequestData(h.key, req); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not sign container info request")
|
||||||
|
} else if res, err = container.NewServiceClient(p.con).Get(ctx, req); err != nil {
|
||||||
|
return nil, errors.Wrap(err, "could not fetch container info")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO should extract nice name
|
||||||
|
// and datetime from container info:
|
||||||
|
_ = res
|
||||||
|
|
||||||
|
return &Bucket{
|
||||||
|
Name: p.cid.String(),
|
||||||
|
CreationDate: new(time.Time).Format(time.RFC3339),
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handler) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
uid = h.uid
|
||||||
|
inf *Bucket
|
||||||
|
con *grpc.ClientConn
|
||||||
|
res *container.ListResponse
|
||||||
|
)
|
||||||
|
|
||||||
|
// TODO think about timeout
|
||||||
|
ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
// TODO should be replaced with auth.GetBearerToken,
|
||||||
|
// than if we not received token, should call
|
||||||
|
// api.WriteErrorResponse
|
||||||
|
bearer, _ := getBearerToken(ctx)
|
||||||
|
|
||||||
|
// should be taken from BearerToken, to display only users containers
|
||||||
|
// in future
|
||||||
|
if bearer != nil {
|
||||||
|
uid = bearer.OwnerID
|
||||||
|
}
|
||||||
|
|
||||||
|
req := new(container.ListRequest)
|
||||||
|
req.OwnerID = uid
|
||||||
|
req.SetTTL(service.SingleForwardingTTL)
|
||||||
|
req.SetBearer(bearer)
|
||||||
|
// req.SetVersion(APIVersion) ??
|
||||||
|
|
||||||
|
if con, err = h.cli.GetConnection(ctx); err != nil {
|
||||||
|
h.log.Error("could not get connection",
|
||||||
|
zap.Error(err))
|
||||||
|
|
||||||
|
e := api.GetAPIError(api.ErrInternalError)
|
||||||
|
|
||||||
|
api.WriteErrorResponse(ctx, w, api.Error{
|
||||||
|
Code: e.Code,
|
||||||
|
Description: err.Error(),
|
||||||
|
HTTPStatusCode: e.HTTPStatusCode,
|
||||||
|
}, r.URL)
|
||||||
|
|
||||||
|
return
|
||||||
|
} else if err = service.SignRequestData(h.key, req); err != nil {
|
||||||
|
h.log.Error("could not prepare request",
|
||||||
|
zap.Error(err))
|
||||||
|
|
||||||
|
e := api.GetAPIError(api.ErrInternalError)
|
||||||
|
|
||||||
|
api.WriteErrorResponse(ctx, w, api.Error{
|
||||||
|
Code: e.Code,
|
||||||
|
Description: err.Error(),
|
||||||
|
HTTPStatusCode: e.HTTPStatusCode,
|
||||||
|
}, r.URL)
|
||||||
|
|
||||||
|
return
|
||||||
|
} else if res, err = container.NewServiceClient(con).List(ctx, req); err != nil {
|
||||||
|
h.log.Error("could not list buckets",
|
||||||
|
zap.Error(err))
|
||||||
|
|
||||||
|
e := api.GetAPIError(api.ErrInternalError)
|
||||||
|
|
||||||
|
api.WriteErrorResponse(ctx, w, api.Error{
|
||||||
|
Code: e.Code,
|
||||||
|
Description: err.Error(),
|
||||||
|
HTTPStatusCode: e.HTTPStatusCode,
|
||||||
|
}, r.URL)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
result := &ListBucketsResponse{Owner: Owner{
|
||||||
|
ID: uid.String(),
|
||||||
|
DisplayName: uid.String(),
|
||||||
|
}}
|
||||||
|
|
||||||
|
params := cnrInfoParams{con: con, tkn: bearer}
|
||||||
|
|
||||||
|
for _, cid := range res.CID {
|
||||||
|
// should receive each container info (??):
|
||||||
|
params.cid = cid
|
||||||
|
|
||||||
|
if inf, err = h.getContainerInfo(ctx, params); err != nil {
|
||||||
|
h.log.Error("could not fetch bucket info",
|
||||||
|
zap.Error(err))
|
||||||
|
|
||||||
|
e := api.GetAPIError(api.ErrInternalError)
|
||||||
|
|
||||||
|
api.WriteErrorResponse(ctx, w, api.Error{
|
||||||
|
Code: e.Code,
|
||||||
|
Description: err.Error(),
|
||||||
|
HTTPStatusCode: e.HTTPStatusCode,
|
||||||
|
}, r.URL)
|
||||||
|
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
result.Buckets.Buckets = append(result.Buckets.Buckets, inf)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate response.
|
||||||
|
encodedSuccessResponse := api.EncodeResponse(result)
|
||||||
|
|
||||||
|
// Write response.
|
||||||
|
api.WriteSuccessResponseXML(w, encodedSuccessResponse)
|
||||||
|
}
|
|
@ -6,12 +6,6 @@ import (
|
||||||
"github.com/minio/minio/neofs/api"
|
"github.com/minio/minio/neofs/api"
|
||||||
)
|
)
|
||||||
|
|
||||||
type handler struct{}
|
|
||||||
|
|
||||||
var _ api.Handler = (*handler)(nil)
|
|
||||||
|
|
||||||
func New() (api.Handler, error) { return new(handler), nil }
|
|
||||||
|
|
||||||
func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
|
func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||||
Code: "XNeoFSUnimplemented",
|
Code: "XNeoFSUnimplemented",
|
||||||
|
@ -491,11 +485,3 @@ func (h *handler) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
HTTPStatusCode: http.StatusNotImplemented,
|
HTTPStatusCode: http.StatusNotImplemented,
|
||||||
}, r.URL)
|
}, r.URL)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
|
|
||||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
|
||||||
Code: "XNeoFSUnimplemented",
|
|
||||||
Description: "implement me",
|
|
||||||
HTTPStatusCode: http.StatusNotImplemented,
|
|
||||||
}, r.URL)
|
|
||||||
}
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ func WriteErrorResponse(ctx context.Context, w http.ResponseWriter, err Error, r
|
||||||
// Generate error response.
|
// Generate error response.
|
||||||
errorResponse := getAPIErrorResponse(ctx, err, reqURL.Path,
|
errorResponse := getAPIErrorResponse(ctx, err, reqURL.Path,
|
||||||
w.Header().Get(hdrAmzRequestID), deploymentID.String())
|
w.Header().Get(hdrAmzRequestID), deploymentID.String())
|
||||||
encodedErrorResponse := encodeResponse(errorResponse)
|
encodedErrorResponse := EncodeResponse(errorResponse)
|
||||||
writeResponse(w, err.HTTPStatusCode, encodedErrorResponse, mimeXML)
|
writeResponse(w, err.HTTPStatusCode, encodedErrorResponse, mimeXML)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -119,8 +119,8 @@ func writeResponse(w http.ResponseWriter, statusCode int, response []byte, mType
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Encodes the response headers into XML format.
|
// EncodeResponse encodes the response headers into XML format.
|
||||||
func encodeResponse(response interface{}) []byte {
|
func EncodeResponse(response interface{}) []byte {
|
||||||
var bytesBuffer bytes.Buffer
|
var bytesBuffer bytes.Buffer
|
||||||
bytesBuffer.WriteString(xml.Header)
|
bytesBuffer.WriteString(xml.Header)
|
||||||
_ = xml.
|
_ = xml.
|
||||||
|
@ -128,3 +128,9 @@ func encodeResponse(response interface{}) []byte {
|
||||||
Encode(response)
|
Encode(response)
|
||||||
return bytesBuffer.Bytes()
|
return bytesBuffer.Bytes()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WriteSuccessResponseXML writes success headers and response if any,
|
||||||
|
// with content-type set to `application/xml`.
|
||||||
|
func WriteSuccessResponseXML(w http.ResponseWriter, response []byte) {
|
||||||
|
writeResponse(w, http.StatusOK, response, mimeXML)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue