forked from TrueCloudLab/frostfs-s3-gw
[#13] Rename go module name according to NSPCC standards
- refactoring s3 gate structure - cleanup unused code - rename go module to `github.com/nspcc-dev/neofs-s3-gate` closes #13 Signed-off-by: Evgeniy Kulikov <kim@nspcc.ru>
This commit is contained in:
parent
e7f72fc670
commit
0161d2fbd3
25 changed files with 396 additions and 1112 deletions
1995
api/errors.go
Normal file
1995
api/errors.go
Normal file
File diff suppressed because it is too large
Load diff
37
api/handler/api.go
Normal file
37
api/handler/api.go
Normal file
|
@ -0,0 +1,37 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/nspcc-dev/neofs-s3-gate/api"
|
||||
"github.com/nspcc-dev/neofs-s3-gate/api/layer"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
handler struct {
|
||||
log *zap.Logger
|
||||
obj layer.Client
|
||||
}
|
||||
|
||||
Params struct {
|
||||
Log *zap.Logger
|
||||
Obj layer.Client
|
||||
}
|
||||
)
|
||||
|
||||
var _ api.Handler = (*handler)(nil)
|
||||
|
||||
func New(log *zap.Logger, obj layer.Client) (api.Handler, error) {
|
||||
switch {
|
||||
case obj == nil:
|
||||
return nil, errors.New("empty NeoFS Object Layer")
|
||||
case log == nil:
|
||||
return nil, errors.New("empty logger")
|
||||
}
|
||||
|
||||
return &handler{
|
||||
log: log,
|
||||
obj: obj,
|
||||
}, nil
|
||||
}
|
495
api/handler/unimplemented.go
Normal file
495
api/handler/unimplemented.go
Normal file
|
@ -0,0 +1,495 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/nspcc-dev/neofs-s3-gate/api"
|
||||
)
|
||||
|
||||
func (h *handler) ListBucketsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me ListBucketsHandler",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) CopyObjectPartHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutObjectPartHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) ListObjectPartsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) NewMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetObjectACLHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) SelectObjectContentHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetObjectRetentionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutObjectRetentionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutObjectLegalHoldHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketLocationHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketPolicyHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketEncryptionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketACLHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketACLHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketWebsiteHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketAccelerateHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketRequestPaymentHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketLoggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketReplicationHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) DeleteBucketWebsiteHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) DeleteBucketTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketObjectLockConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) GetBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) ListenBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) ListMultipartUploadsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) ListObjectsV2MHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketEncryptionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketPolicyHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketObjectLockConfigHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketVersioningHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketNotificationHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PutBucketHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) PostPolicyBucketHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) DeleteBucketPolicyHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) DeleteBucketLifecycleHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) DeleteBucketEncryptionHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
func (h *handler) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
|
||||
api.WriteErrorResponse(r.Context(), w, api.Error{
|
||||
Code: "XNeoFSUnimplemented",
|
||||
Description: "implement me",
|
||||
HTTPStatusCode: http.StatusNotImplemented,
|
||||
}, r.URL)
|
||||
}
|
130
api/layer/container.go
Normal file
130
api/layer/container.go
Normal file
|
@ -0,0 +1,130 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/container"
|
||||
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||
"github.com/nspcc-dev/neofs-api-go/service"
|
||||
"github.com/nspcc-dev/neofs-s3-gate/auth"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
BucketInfo struct {
|
||||
Name string
|
||||
CID refs.CID
|
||||
Created time.Time
|
||||
}
|
||||
|
||||
ListObjectsParams struct {
|
||||
Bucket string
|
||||
Prefix string
|
||||
Token string
|
||||
Delimiter string
|
||||
MaxKeys int
|
||||
}
|
||||
)
|
||||
|
||||
func (n *layer) containerInfo(ctx context.Context, cid refs.CID) (*BucketInfo, error) {
|
||||
bearer, err := auth.GetBearerToken(ctx)
|
||||
if err != nil {
|
||||
n.log.Error("could not receive bearer token",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := new(container.GetRequest)
|
||||
req.SetCID(cid)
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
// req.SetBearer(bearer)
|
||||
|
||||
_ = bearer
|
||||
|
||||
if err = service.SignRequestData(n.key, req); err != nil {
|
||||
n.log.Error("could not prepare request",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := n.cli.GetConnection(ctx)
|
||||
if err != nil {
|
||||
n.log.Error("could not prepare client",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// todo: think about timeout
|
||||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
res, err := container.NewServiceClient(conn).Get(ctx, req)
|
||||
if err != nil {
|
||||
n.log.Error("could not list buckets",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_ = res
|
||||
|
||||
return &BucketInfo{
|
||||
CID: cid,
|
||||
Name: cid.String(), // should be fetched from container.GetResponse
|
||||
Created: time.Time{}, // should be fetched from container.GetResponse
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (n *layer) containerList(ctx context.Context) ([]BucketInfo, error) {
|
||||
bearer, err := auth.GetBearerToken(ctx)
|
||||
if err != nil {
|
||||
n.log.Error("could not receive bearer token",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := new(container.ListRequest)
|
||||
req.OwnerID = n.uid
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
// req.SetBearer(bearer)
|
||||
|
||||
_ = bearer
|
||||
|
||||
if err := service.SignRequestData(n.key, req); err != nil {
|
||||
n.log.Error("could not prepare request",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conn, err := n.cli.GetConnection(ctx)
|
||||
if err != nil {
|
||||
n.log.Error("could not prepare client",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// todo: think about timeout
|
||||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
res, err := container.NewServiceClient(conn).List(ctx, req)
|
||||
if err != nil {
|
||||
n.log.Error("could not list buckets",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
list := make([]BucketInfo, 0, len(res.CID))
|
||||
for _, cid := range res.CID {
|
||||
info, err := n.containerInfo(ctx, cid)
|
||||
if err != nil {
|
||||
n.log.Error("could not fetch container info",
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
list = append(list, *info)
|
||||
}
|
||||
|
||||
return list, nil
|
||||
}
|
388
api/layer/layer.go
Normal file
388
api/layer/layer.go
Normal file
|
@ -0,0 +1,388 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"io"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||
"github.com/nspcc-dev/neofs-api-go/service"
|
||||
"github.com/nspcc-dev/neofs-s3-gate/api/pool"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
layer struct {
|
||||
log *zap.Logger
|
||||
cli pool.Client
|
||||
uid refs.OwnerID
|
||||
key *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
GetObjectParams struct {
|
||||
Bucket string
|
||||
Object string
|
||||
Offset int64
|
||||
Length int64
|
||||
Writer io.Writer
|
||||
}
|
||||
|
||||
PutObjectParams struct {
|
||||
Bucket string
|
||||
Object string
|
||||
Size int64
|
||||
Reader io.Reader
|
||||
Header map[string]string
|
||||
}
|
||||
|
||||
CopyObjectParams struct {
|
||||
SrcBucket string
|
||||
DstBucket string
|
||||
SrcObject string
|
||||
DstObject string
|
||||
}
|
||||
|
||||
NeoFS interface {
|
||||
Get(ctx context.Context, address refs.Address) (*object.Object, error)
|
||||
}
|
||||
|
||||
Client interface {
|
||||
NeoFS
|
||||
|
||||
ListBuckets(ctx context.Context) ([]BucketInfo, error)
|
||||
GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error)
|
||||
|
||||
GetObject(ctx context.Context, p *GetObjectParams) error
|
||||
GetObjectInfo(ctx context.Context, bucketName, objectName string) (*ObjectInfo, error)
|
||||
|
||||
PutObject(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error)
|
||||
|
||||
CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInfo, error)
|
||||
|
||||
ListObjects(ctx context.Context, p *ListObjectsParams) (*ListObjectsInfo, error)
|
||||
|
||||
DeleteObject(ctx context.Context, bucket, object string) error
|
||||
DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error)
|
||||
}
|
||||
)
|
||||
|
||||
// AWS3NameHeader key in the object neofs.
|
||||
const AWS3NameHeader = "filename"
|
||||
|
||||
// NewGatewayLayer creates instance of layer. It checks credentials
|
||||
// and establishes gRPC connection with node.
|
||||
func NewLayer(log *zap.Logger, cli pool.Client, key *ecdsa.PrivateKey) (Client, error) {
|
||||
uid, err := refs.NewOwnerID(&key.PublicKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &layer{
|
||||
cli: cli,
|
||||
key: key,
|
||||
log: log,
|
||||
uid: uid,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Get NeoFS Object by refs.Address (should be used by auth.Center)
|
||||
func (n *layer) Get(ctx context.Context, address refs.Address) (*object.Object, error) {
|
||||
conn, err := n.cli.GetConnection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
|
||||
Conn: conn,
|
||||
Addr: address,
|
||||
Verb: service.Token_Info_Get,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := new(object.GetRequest)
|
||||
req.Address = address
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
req.SetToken(token)
|
||||
|
||||
err = service.SignRequestData(n.key, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// todo: think about timeout
|
||||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
cli, err := object.NewServiceClient(conn).Get(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return receiveObject(cli)
|
||||
}
|
||||
|
||||
// GetBucketInfo returns bucket name.
|
||||
func (n *layer) GetBucketInfo(ctx context.Context, name string) (*BucketInfo, error) {
|
||||
list, err := n.containerList(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, bkt := range list {
|
||||
if bkt.Name == name {
|
||||
return &bkt, nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil, errors.New("bucket not found")
|
||||
}
|
||||
|
||||
// ListBuckets returns all user containers. Name of the bucket is a container
|
||||
// id. Timestamp is omitted since it is not saved in neofs container.
|
||||
func (n *layer) ListBuckets(ctx context.Context) ([]BucketInfo, error) {
|
||||
return n.containerList(ctx)
|
||||
}
|
||||
|
||||
// ListObjects returns objects from the container. It ignores tombstones and
|
||||
// storage groups.
|
||||
// ctx, bucket, prefix, continuationToken, delimiter, maxKeys
|
||||
func (n *layer) ListObjects(ctx context.Context, p *ListObjectsParams) (*ListObjectsInfo, error) {
|
||||
// todo: make pagination when search response will be gRPC stream,
|
||||
// pagination must be implemented with cache, because search results
|
||||
// may be different between search calls
|
||||
var (
|
||||
result ListObjectsInfo
|
||||
uniqNames = make(map[string]struct{})
|
||||
)
|
||||
|
||||
bkt, err := n.GetBucketInfo(ctx, p.Bucket)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objectIDs, err := n.objectSearchContainer(ctx, bkt.CID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ln := len(objectIDs)
|
||||
// todo: check what happens if there is more than maxKeys objects
|
||||
if ln > p.MaxKeys {
|
||||
result.IsTruncated = true
|
||||
ln = p.MaxKeys
|
||||
}
|
||||
|
||||
result.Objects = make([]ObjectInfo, 0, ln)
|
||||
|
||||
for i := 0; i < ln; i++ {
|
||||
addr := refs.Address{ObjectID: objectIDs[i], CID: bkt.CID}
|
||||
|
||||
meta, err := n.objectHead(ctx, addr)
|
||||
if err != nil {
|
||||
n.log.Warn("could not fetch object meta", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
// ignore tombstone objects
|
||||
_, hdr := meta.LastHeader(object.HeaderType(object.TombstoneHdr))
|
||||
if hdr != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// ignore storage group objects
|
||||
_, hdr = meta.LastHeader(object.HeaderType(object.StorageGroupHdr))
|
||||
if hdr != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// dirs don't exist in neofs, gateway stores full path to the file
|
||||
// in object header, e.g. `filename`:`/this/is/path/file.txt`
|
||||
|
||||
// prefix argument contains full dir path from the root, e.g. `/this/is/`
|
||||
|
||||
// to emulate dirs we take dirs in path, compare it with prefix
|
||||
// and look for entities after prefix. If entity does not have any
|
||||
// sub-entities, then it is a file, else directory.
|
||||
|
||||
_, dirname := nameFromObject(meta)
|
||||
if strings.HasPrefix(dirname, p.Prefix) {
|
||||
var (
|
||||
oi *ObjectInfo
|
||||
tail = strings.TrimLeft(dirname, p.Prefix)
|
||||
ind = strings.Index(tail, pathSeparator)
|
||||
)
|
||||
|
||||
if ind < 0 { // if there are not sub-entities in tail - file
|
||||
oi = objectInfoFromMeta(meta)
|
||||
} else { // if there are sub-entities in tail - dir
|
||||
oi = &ObjectInfo{
|
||||
Bucket: meta.SystemHeader.CID.String(),
|
||||
Name: tail[:ind+1], // dir MUST have slash symbol in the end
|
||||
// IsDir: true,
|
||||
}
|
||||
}
|
||||
|
||||
// use only unique dir names
|
||||
if _, ok := uniqNames[oi.Name]; !ok {
|
||||
uniqNames[oi.Name] = struct{}{}
|
||||
|
||||
result.Objects = append(result.Objects, *oi)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
// GetObject from storage.
|
||||
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
|
||||
cid, err := refs.CIDFromString(p.Bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
oid, err := n.objectFindID(ctx, cid, p.Object, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
addr := refs.Address{
|
||||
ObjectID: oid,
|
||||
CID: cid,
|
||||
}
|
||||
_, err = n.objectGet(ctx, getParams{
|
||||
addr: addr,
|
||||
start: p.Offset,
|
||||
length: p.Length,
|
||||
writer: p.Writer,
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// GetObjectInfo returns meta information about the object.
|
||||
func (n *layer) GetObjectInfo(ctx context.Context, bucketName, objectName string) (*ObjectInfo, error) {
|
||||
var meta *object.Object
|
||||
if cid, err := refs.CIDFromString(bucketName); err != nil {
|
||||
return nil, err
|
||||
} else if oid, err := n.objectFindID(ctx, cid, objectName, false); err != nil {
|
||||
return nil, err
|
||||
} else if meta, err = n.objectHead(ctx, refs.Address{CID: cid, ObjectID: oid}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return objectInfoFromMeta(meta), nil
|
||||
}
|
||||
|
||||
// PutObject into storage.
|
||||
func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*ObjectInfo, error) {
|
||||
cid, err := refs.CIDFromString(p.Bucket)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = n.objectFindID(ctx, cid, p.Object, true)
|
||||
if err == nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
oid, err := refs.NewObjectID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sgid, err := refs.NewSGID()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
addr := refs.Address{
|
||||
ObjectID: oid,
|
||||
CID: cid,
|
||||
}
|
||||
|
||||
meta, err := n.objectPut(ctx, putParams{
|
||||
addr: addr,
|
||||
size: p.Size,
|
||||
name: p.Object,
|
||||
r: p.Reader,
|
||||
userHeaders: p.Header,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
oi := objectInfoFromMeta(meta)
|
||||
|
||||
// for every object create storage group, otherwise object will be deleted
|
||||
addr.ObjectID = sgid
|
||||
|
||||
_, err = n.storageGroupPut(ctx, sgParams{
|
||||
addr: addr,
|
||||
objects: []refs.ObjectID{oid},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return oi, nil
|
||||
}
|
||||
|
||||
// CopyObject from one bucket into another bucket.
|
||||
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*ObjectInfo, error) {
|
||||
info, err := n.GetObjectInfo(ctx, p.SrcBucket, p.SrcObject)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
err := n.GetObject(ctx, &GetObjectParams{
|
||||
Bucket: p.SrcBucket,
|
||||
Object: p.SrcObject,
|
||||
Writer: pw,
|
||||
})
|
||||
|
||||
_ = pw.CloseWithError(err)
|
||||
}()
|
||||
|
||||
return n.PutObject(ctx, &PutObjectParams{
|
||||
Bucket: p.DstBucket,
|
||||
Object: p.DstObject,
|
||||
Size: info.Size,
|
||||
Reader: pr,
|
||||
Header: info.Headers,
|
||||
})
|
||||
}
|
||||
|
||||
// DeleteObject from the storage.
|
||||
func (n *layer) DeleteObject(ctx context.Context, bucket, object string) error {
|
||||
cid, err := refs.CIDFromString(bucket)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
oid, err := n.objectFindID(ctx, cid, object, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return n.objectDelete(ctx, delParams{addr: refs.Address{CID: cid, ObjectID: oid}})
|
||||
}
|
||||
|
||||
// DeleteObjects from the storage.
|
||||
func (n *layer) DeleteObjects(ctx context.Context, bucket string, objects []string) ([]error, error) {
|
||||
var errs = make([]error, 0, len(objects))
|
||||
|
||||
for i := range objects {
|
||||
errs = append(errs, n.DeleteObject(ctx, bucket, objects[i]))
|
||||
}
|
||||
|
||||
return errs, nil
|
||||
}
|
566
api/layer/object.go
Normal file
566
api/layer/object.go
Normal file
|
@ -0,0 +1,566 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/object"
|
||||
"github.com/nspcc-dev/neofs-api-go/query"
|
||||
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||
"github.com/nspcc-dev/neofs-api-go/service"
|
||||
"github.com/nspcc-dev/neofs-api-go/storagegroup"
|
||||
"github.com/nspcc-dev/neofs-s3-gate/api/pool"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
dataChunkSize = 3 * object.UnitsMB
|
||||
objectVersion = 1
|
||||
)
|
||||
|
||||
type (
|
||||
putParams struct {
|
||||
addr refs.Address
|
||||
name string
|
||||
size int64
|
||||
r io.Reader
|
||||
userHeaders map[string]string
|
||||
}
|
||||
|
||||
sgParams struct {
|
||||
addr refs.Address
|
||||
objects []refs.ObjectID
|
||||
}
|
||||
|
||||
delParams struct {
|
||||
addr refs.Address
|
||||
}
|
||||
|
||||
getParams struct {
|
||||
addr refs.Address
|
||||
start int64
|
||||
length int64
|
||||
writer io.Writer
|
||||
}
|
||||
)
|
||||
|
||||
// objectSearchContainer returns all available objects in the container.
|
||||
func (n *layer) objectSearchContainer(ctx context.Context, cid refs.CID) ([]refs.ObjectID, error) {
|
||||
var q query.Query
|
||||
q.Filters = append(q.Filters, query.Filter{
|
||||
Type: query.Filter_Exact,
|
||||
Name: object.KeyRootObject,
|
||||
})
|
||||
|
||||
conn, err := n.cli.GetConnection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queryBinary, err := q.Marshal()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
|
||||
Conn: conn,
|
||||
Addr: refs.Address{CID: cid},
|
||||
Verb: service.Token_Info_Search,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := new(object.SearchRequest)
|
||||
req.Query = queryBinary
|
||||
req.QueryVersion = 1
|
||||
req.ContainerID = cid
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
req.SetToken(token)
|
||||
// req.SetBearer(bearerToken)
|
||||
|
||||
err = service.SignRequestData(n.key, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// todo: think about timeout
|
||||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
searchClient, err := object.NewServiceClient(conn).Search(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
response []refs.Address
|
||||
result []refs.ObjectID
|
||||
)
|
||||
|
||||
for {
|
||||
resp, err := searchClient.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
return nil, errors.New("search command received error")
|
||||
}
|
||||
|
||||
response = append(response, resp.Addresses...)
|
||||
}
|
||||
|
||||
for i := range response {
|
||||
result = append(result, response[i].ObjectID)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// objectFindID returns object id (uuid) based on it's nice name in s3. If
|
||||
// nice name is uuid compatible, then function returns it.
|
||||
func (n *layer) objectFindID(ctx context.Context, cid refs.CID, name string, put bool) (refs.ObjectID, error) {
|
||||
var (
|
||||
id refs.ObjectID
|
||||
q query.Query
|
||||
)
|
||||
|
||||
q.Filters = append(q.Filters, query.Filter{
|
||||
Type: query.Filter_Exact,
|
||||
Name: object.KeyRootObject,
|
||||
})
|
||||
q.Filters = append(q.Filters, query.Filter{
|
||||
Type: query.Filter_Exact,
|
||||
Name: AWS3NameHeader,
|
||||
Value: name,
|
||||
})
|
||||
|
||||
queryBinary, err := q.Marshal()
|
||||
if err != nil {
|
||||
return id, err
|
||||
}
|
||||
|
||||
conn, err := n.cli.GetConnection(ctx)
|
||||
if err != nil {
|
||||
return id, err
|
||||
}
|
||||
|
||||
token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
|
||||
Conn: conn,
|
||||
Addr: refs.Address{CID: cid},
|
||||
Verb: service.Token_Info_Search,
|
||||
})
|
||||
if err != nil {
|
||||
return id, err
|
||||
}
|
||||
|
||||
req := new(object.SearchRequest)
|
||||
req.Query = queryBinary
|
||||
req.QueryVersion = 1
|
||||
req.ContainerID = cid
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
req.SetToken(token)
|
||||
// req.SetBearer(bearerToken)
|
||||
|
||||
err = service.SignRequestData(n.key, req)
|
||||
if err != nil {
|
||||
return id, err
|
||||
}
|
||||
|
||||
// todo: think about timeout
|
||||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
searchClient, err := object.NewServiceClient(conn).Search(ctx, req)
|
||||
if err != nil {
|
||||
return id, err
|
||||
}
|
||||
|
||||
var response []refs.Address
|
||||
|
||||
for {
|
||||
resp, err := searchClient.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
return id, errors.New("search command received error")
|
||||
}
|
||||
|
||||
response = append(response, resp.Addresses...)
|
||||
}
|
||||
|
||||
switch ln := len(response); {
|
||||
case ln > 1:
|
||||
return id, errors.New("several objects with the same name found")
|
||||
case ln == 1:
|
||||
return response[0].ObjectID, nil
|
||||
default:
|
||||
// Minio lists all objects with and without nice names. All objects
|
||||
// without nice name still have "name" in terms of minio - uuid encoded
|
||||
// into string. There is a tricky case when user upload object
|
||||
// with nice name that is encoded uuid.
|
||||
// There is an optimisation to parse name and return uuid if it name is uuid
|
||||
// compatible. It _should not_ work in case of put operation, because object
|
||||
// with uuid compatible nice name may not exist. Therefore this optimization
|
||||
// breaks object put logic and must be turned off.
|
||||
if !put {
|
||||
err := id.Parse(name)
|
||||
if err == nil {
|
||||
return id, nil
|
||||
}
|
||||
}
|
||||
return id, errors.New("object not found")
|
||||
}
|
||||
}
|
||||
|
||||
// objectHead returns all object's headers.
|
||||
func (n *layer) objectHead(ctx context.Context, addr refs.Address) (*object.Object, error) {
|
||||
|
||||
conn, err := n.cli.GetConnection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
|
||||
Conn: conn,
|
||||
Addr: addr,
|
||||
Verb: service.Token_Info_Head,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req := new(object.HeadRequest)
|
||||
req.Address = addr
|
||||
req.FullHeaders = true
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
req.SetToken(token)
|
||||
// req.SetBearer(bearerToken)
|
||||
|
||||
err = service.SignRequestData(n.key, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// todo: think about timeout
|
||||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
res, err := object.NewServiceClient(conn).Head(ctx, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Object, nil
|
||||
}
|
||||
|
||||
func receiveObject(cli object.Service_GetClient) (*object.Object, error) {
|
||||
var (
|
||||
off int
|
||||
buf []byte
|
||||
obj *object.Object
|
||||
)
|
||||
|
||||
for {
|
||||
resp, err := cli.Recv()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch o := resp.R.(type) {
|
||||
case *object.GetResponse_Object:
|
||||
|
||||
if _, hdr := o.Object.LastHeader(object.HeaderType(object.TombstoneHdr)); hdr != nil {
|
||||
return nil, errors.New("object already removed")
|
||||
}
|
||||
|
||||
obj = o.Object
|
||||
buf = make([]byte, obj.SystemHeader.PayloadLength)
|
||||
|
||||
if len(obj.Payload) > 0 {
|
||||
off += copy(buf, obj.Payload)
|
||||
}
|
||||
case *object.GetResponse_Chunk:
|
||||
if obj == nil {
|
||||
return nil, errors.New("object headers not received")
|
||||
}
|
||||
off += copy(buf[off:], o.Chunk)
|
||||
default:
|
||||
return nil, errors.Errorf("unknown response %T", o)
|
||||
}
|
||||
}
|
||||
|
||||
if obj == nil {
|
||||
return nil, errors.New("object headers not received")
|
||||
}
|
||||
obj.Payload = buf
|
||||
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
// objectGet and write it into provided io.Reader.
|
||||
func (n *layer) objectGet(ctx context.Context, p getParams) (*object.Object, error) {
|
||||
conn, err := n.cli.GetConnection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
|
||||
Conn: conn,
|
||||
Addr: p.addr,
|
||||
Verb: service.Token_Info_Get,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// todo: replace object.Get() call by object.GetRange() for
|
||||
// true sequential reading support; it will be possible when
|
||||
// object.GetRange() response message become gRPC stream.
|
||||
req := new(object.GetRequest)
|
||||
req.Address = p.addr
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
req.SetToken(token)
|
||||
// req.SetBearer(bearerToken)
|
||||
|
||||
err = service.SignRequestData(n.key, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// todo: think about timeout
|
||||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
var obj *object.Object
|
||||
|
||||
if cli, err := object.NewServiceClient(conn).Get(ctx, req); err != nil {
|
||||
return nil, err
|
||||
} else if obj, err = receiveObject(cli); err != nil {
|
||||
return nil, err
|
||||
} else if ln := int64(obj.SystemHeader.PayloadLength); p.start+p.length > ln {
|
||||
return nil, errors.Errorf("slice bounds out of range: len = %d, start = %d, offset = %d",
|
||||
ln, p.start, p.length)
|
||||
} else if _, err = p.writer.Write(obj.Payload[p.start : p.start+p.length]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// remove payload:
|
||||
obj.Payload = nil
|
||||
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
// objectPut into neofs, took payload from io.Reader.
|
||||
func (n *layer) objectPut(ctx context.Context, p putParams) (*object.Object, error) {
|
||||
conn, err := n.cli.GetConnection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
|
||||
Conn: conn,
|
||||
Addr: p.addr,
|
||||
Verb: service.Token_Info_Put,
|
||||
})
|
||||
if err != nil {
|
||||
n.log.Error("could not prepare token",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
putClient, err := object.NewServiceClient(conn).Put(ctx)
|
||||
if err != nil {
|
||||
n.log.Error("could not prepare PutClient",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if p.userHeaders == nil {
|
||||
p.userHeaders = make(map[string]string)
|
||||
}
|
||||
|
||||
p.userHeaders[AWS3NameHeader] = p.name
|
||||
|
||||
readBuffer := make([]byte, dataChunkSize)
|
||||
obj := &object.Object{
|
||||
SystemHeader: object.SystemHeader{
|
||||
Version: objectVersion,
|
||||
ID: p.addr.ObjectID,
|
||||
OwnerID: n.uid,
|
||||
CID: p.addr.CID,
|
||||
PayloadLength: uint64(p.size),
|
||||
},
|
||||
Headers: parseUserHeaders(p.userHeaders),
|
||||
}
|
||||
|
||||
req := object.MakePutRequestHeader(obj)
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
req.SetToken(token)
|
||||
// req.SetBearer(bearerToken)
|
||||
|
||||
err = service.SignRequestData(n.key, req)
|
||||
if err != nil {
|
||||
n.log.Error("could not prepare request",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = putClient.Send(req)
|
||||
if err != nil {
|
||||
n.log.Error("could not send request",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
read, err := p.r.Read(readBuffer)
|
||||
for read > 0 {
|
||||
if err != nil && err != io.EOF {
|
||||
n.log.Error("something went wrong",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if read > 0 {
|
||||
req := object.MakePutRequestChunk(readBuffer[:read])
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
// req.SetBearer(bearerToken)
|
||||
|
||||
err = service.SignRequestData(n.key, req)
|
||||
if err != nil {
|
||||
n.log.Error("could not sign chunk request",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = putClient.Send(req)
|
||||
if err != nil && err != io.EOF {
|
||||
n.log.Error("could not send chunk",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
read, err = p.r.Read(readBuffer)
|
||||
}
|
||||
|
||||
_, err = putClient.CloseAndRecv()
|
||||
if err != nil {
|
||||
n.log.Error("could not finish request",
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// maybe make a head?
|
||||
return obj, nil
|
||||
}
|
||||
|
||||
// storageGroupPut prepares storage group object and put it into neofs.
|
||||
func (n *layer) storageGroupPut(ctx context.Context, p sgParams) (*object.Object, error) {
|
||||
conn, err := n.cli.GetConnection(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
|
||||
Conn: conn,
|
||||
Addr: p.addr,
|
||||
Verb: service.Token_Info_Put,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
client := object.NewServiceClient(conn)
|
||||
// todo: think about timeout
|
||||
putClient, err := client.Put(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
sg := &object.Object{
|
||||
SystemHeader: object.SystemHeader{
|
||||
Version: objectVersion,
|
||||
ID: p.addr.ObjectID,
|
||||
OwnerID: n.uid,
|
||||
CID: p.addr.CID,
|
||||
},
|
||||
Headers: make([]object.Header, 0, len(p.objects)),
|
||||
}
|
||||
|
||||
for i := range p.objects {
|
||||
sg.AddHeader(&object.Header{Value: &object.Header_Link{
|
||||
Link: &object.Link{Type: object.Link_StorageGroup, ID: p.objects[i]},
|
||||
}})
|
||||
}
|
||||
|
||||
sg.SetStorageGroup(new(storagegroup.StorageGroup))
|
||||
|
||||
req := object.MakePutRequestHeader(sg)
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
req.SetToken(token)
|
||||
// req.SetBearer(bearerToken)
|
||||
|
||||
err = service.SignRequestData(n.key, req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = putClient.Send(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err = putClient.CloseAndRecv()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return sg, nil
|
||||
}
|
||||
|
||||
// objectDelete puts tombstone object into neofs.
|
||||
func (n *layer) objectDelete(ctx context.Context, p delParams) error {
|
||||
conn, err := n.cli.GetConnection(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
token, err := n.cli.SessionToken(ctx, &pool.SessionParams{
|
||||
Conn: conn,
|
||||
Addr: p.addr,
|
||||
Verb: service.Token_Info_Delete,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
req := new(object.DeleteRequest)
|
||||
req.Address = p.addr
|
||||
req.OwnerID = n.uid
|
||||
req.SetTTL(service.SingleForwardingTTL)
|
||||
req.SetToken(token)
|
||||
// req.SetBearer(bearerToken)
|
||||
|
||||
err = service.SignRequestData(n.key, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// todo: think about timeout
|
||||
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
_, err = object.NewServiceClient(conn).Delete(ctx, req)
|
||||
|
||||
return err
|
||||
}
|
113
api/layer/util.go
Normal file
113
api/layer/util.go
Normal file
|
@ -0,0 +1,113 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/object"
|
||||
)
|
||||
|
||||
type (
|
||||
ObjectInfo struct {
|
||||
Bucket string
|
||||
Name string
|
||||
Size int64
|
||||
ContentType string
|
||||
Created time.Time
|
||||
Headers map[string]string
|
||||
}
|
||||
|
||||
// ListObjectsInfo - container for list objects.
|
||||
ListObjectsInfo struct {
|
||||
// Indicates whether the returned list objects response is truncated. A
|
||||
// value of true indicates that the list was truncated. The list can be truncated
|
||||
// if the number of objects exceeds the limit allowed or specified
|
||||
// by max keys.
|
||||
IsTruncated bool
|
||||
|
||||
// When response is truncated (the IsTruncated element value in the response
|
||||
// is true), you can use the key name in this field as marker in the subsequent
|
||||
// request to get next set of objects.
|
||||
//
|
||||
// NOTE: This element is returned only if you have delimiter request parameter
|
||||
// specified.
|
||||
ContinuationToken string
|
||||
NextContinuationToken string
|
||||
|
||||
// List of objects info for this request.
|
||||
Objects []ObjectInfo
|
||||
|
||||
// List of prefixes for this request.
|
||||
Prefixes []string
|
||||
}
|
||||
)
|
||||
|
||||
const pathSeparator = string(os.PathSeparator)
|
||||
|
||||
func userHeaders(h []object.Header) map[string]string {
|
||||
result := make(map[string]string, len(h))
|
||||
|
||||
for i := range h {
|
||||
switch v := h[i].Value.(type) {
|
||||
case *object.Header_UserHeader:
|
||||
result[v.UserHeader.Key] = v.UserHeader.Value
|
||||
default:
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func objectInfoFromMeta(meta *object.Object) *ObjectInfo {
|
||||
aws3name := meta.SystemHeader.ID.String()
|
||||
|
||||
userHeaders := userHeaders(meta.Headers)
|
||||
if name, ok := userHeaders[AWS3NameHeader]; ok {
|
||||
aws3name = name
|
||||
delete(userHeaders, name)
|
||||
}
|
||||
|
||||
mimeType := http.DetectContentType(meta.Payload)
|
||||
|
||||
return &ObjectInfo{
|
||||
Bucket: meta.SystemHeader.CID.String(),
|
||||
Name: aws3name,
|
||||
ContentType: mimeType,
|
||||
Headers: userHeaders,
|
||||
Size: int64(meta.SystemHeader.PayloadLength),
|
||||
Created: time.Unix(meta.SystemHeader.CreatedAt.UnixTime, 0),
|
||||
}
|
||||
}
|
||||
|
||||
func parseUserHeaders(h map[string]string) []object.Header {
|
||||
headers := make([]object.Header, 0, len(h))
|
||||
|
||||
for k, v := range h {
|
||||
uh := &object.UserHeader{Key: k, Value: v}
|
||||
headers = append(headers, object.Header{
|
||||
Value: &object.Header_UserHeader{UserHeader: uh},
|
||||
})
|
||||
}
|
||||
|
||||
return headers
|
||||
}
|
||||
|
||||
func nameFromObject(o *object.Object) (string, string) {
|
||||
var (
|
||||
name string
|
||||
uh = userHeaders(o.Headers)
|
||||
)
|
||||
|
||||
if _, ok := uh[AWS3NameHeader]; !ok {
|
||||
name = o.SystemHeader.ID.String()
|
||||
} else {
|
||||
name = uh[AWS3NameHeader]
|
||||
}
|
||||
|
||||
ind := strings.LastIndex(name, pathSeparator)
|
||||
|
||||
return name[ind+1:], name[:ind+1]
|
||||
}
|
56
api/max-clients.go
Normal file
56
api/max-clients.go
Normal file
|
@ -0,0 +1,56 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
type (
|
||||
MaxClients interface {
|
||||
Handle(http.HandlerFunc) http.HandlerFunc
|
||||
}
|
||||
|
||||
maxClients struct {
|
||||
pool chan struct{}
|
||||
timeout time.Duration
|
||||
}
|
||||
)
|
||||
|
||||
const defaultRequestDeadline = time.Second * 30
|
||||
|
||||
func NewMaxClientsMiddleware(count int, timeout time.Duration) MaxClients {
|
||||
if timeout <= 0 {
|
||||
timeout = defaultRequestDeadline
|
||||
}
|
||||
|
||||
return &maxClients{
|
||||
pool: make(chan struct{}, count),
|
||||
timeout: timeout,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *maxClients) Handle(f http.HandlerFunc) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
if m.pool == nil {
|
||||
f.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
deadline := time.NewTimer(m.timeout)
|
||||
defer deadline.Stop()
|
||||
|
||||
select {
|
||||
case m.pool <- struct{}{}:
|
||||
defer func() { <-m.pool }()
|
||||
f.ServeHTTP(w, r)
|
||||
case <-deadline.C:
|
||||
// Send a http timeout message
|
||||
WriteErrorResponse(r.Context(), w,
|
||||
errorCodes.ToAPIErr(ErrOperationMaxedOut),
|
||||
r.URL)
|
||||
return
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
230
api/metrics/api.go
Normal file
230
api/metrics/api.go
Normal file
|
@ -0,0 +1,230 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type (
|
||||
// HTTPAPIStats holds statistics information about
|
||||
// a given API in the requests.
|
||||
HTTPAPIStats struct {
|
||||
apiStats map[string]int
|
||||
sync.RWMutex
|
||||
}
|
||||
|
||||
// HTTPStats holds statistics information about
|
||||
// HTTP requests made by all clients
|
||||
HTTPStats struct {
|
||||
currentS3Requests HTTPAPIStats
|
||||
totalS3Requests HTTPAPIStats
|
||||
totalS3Errors HTTPAPIStats
|
||||
|
||||
totalInputBytes uint64
|
||||
totalOutputBytes uint64
|
||||
}
|
||||
|
||||
readCounter struct {
|
||||
io.ReadCloser
|
||||
countBytes uint64
|
||||
}
|
||||
|
||||
writeCounter struct {
|
||||
http.ResponseWriter
|
||||
countBytes uint64
|
||||
}
|
||||
|
||||
responseWrapper struct {
|
||||
http.ResponseWriter
|
||||
|
||||
statusCode int
|
||||
headWritten bool
|
||||
startTime time.Time
|
||||
}
|
||||
)
|
||||
|
||||
const systemPath = "/system"
|
||||
|
||||
var (
|
||||
httpStatsMetric = new(HTTPStats)
|
||||
httpRequestsDuration = prometheus.NewHistogramVec(
|
||||
prometheus.HistogramOpts{
|
||||
Name: "neofs_s3_request_seconds",
|
||||
Help: "Time taken by requests served by current NeoFS S3 Gate instance",
|
||||
Buckets: []float64{.05, .1, .25, .5, 1, 2.5, 5, 10},
|
||||
},
|
||||
[]string{"api"},
|
||||
)
|
||||
)
|
||||
|
||||
// collects http metrics for NeoFS S3 Gate in Prometheus specific format
|
||||
// and sends to given channel
|
||||
func collectHTTPMetrics(ch chan<- prometheus.Metric) {
|
||||
for api, value := range httpStatsMetric.currentS3Requests.Load() {
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
prometheus.NewDesc(
|
||||
prometheus.BuildFQName("neofs_s3", "requests", "current"),
|
||||
"Total number of running s3 requests in current MinIO server instance",
|
||||
[]string{"api"}, nil),
|
||||
prometheus.CounterValue,
|
||||
float64(value),
|
||||
api,
|
||||
)
|
||||
}
|
||||
|
||||
for api, value := range httpStatsMetric.totalS3Requests.Load() {
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
prometheus.NewDesc(
|
||||
prometheus.BuildFQName("neofs_s3", "requests", "total"),
|
||||
"Total number of s3 requests in current MinIO server instance",
|
||||
[]string{"api"}, nil),
|
||||
prometheus.CounterValue,
|
||||
float64(value),
|
||||
api,
|
||||
)
|
||||
}
|
||||
|
||||
for api, value := range httpStatsMetric.totalS3Errors.Load() {
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
prometheus.NewDesc(
|
||||
prometheus.BuildFQName("neofs_s3", "errors", "total"),
|
||||
"Total number of s3 errors in current MinIO server instance",
|
||||
[]string{"api"}, nil),
|
||||
prometheus.CounterValue,
|
||||
float64(value),
|
||||
api,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func APIStats(api string, f http.HandlerFunc) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
httpStatsMetric.currentS3Requests.Inc(api)
|
||||
defer httpStatsMetric.currentS3Requests.Dec(api)
|
||||
|
||||
in := &readCounter{ReadCloser: r.Body}
|
||||
out := &writeCounter{ResponseWriter: w}
|
||||
|
||||
r.Body = in
|
||||
|
||||
statsWriter := &responseWrapper{
|
||||
ResponseWriter: out,
|
||||
startTime: time.Now(),
|
||||
}
|
||||
|
||||
f.ServeHTTP(statsWriter, r)
|
||||
|
||||
// Time duration in secs since the call started.
|
||||
// We don't need to do nanosecond precision in this
|
||||
// simply for the fact that it is not human readable.
|
||||
durationSecs := time.Since(statsWriter.startTime).Seconds()
|
||||
|
||||
httpStatsMetric.updateStats(api, statsWriter, r, durationSecs)
|
||||
|
||||
atomic.AddUint64(&httpStatsMetric.totalInputBytes, in.countBytes)
|
||||
atomic.AddUint64(&httpStatsMetric.totalOutputBytes, out.countBytes)
|
||||
}
|
||||
}
|
||||
|
||||
// Inc increments the api stats counter.
|
||||
func (stats *HTTPAPIStats) Inc(api string) {
|
||||
if stats == nil {
|
||||
return
|
||||
}
|
||||
stats.Lock()
|
||||
defer stats.Unlock()
|
||||
if stats.apiStats == nil {
|
||||
stats.apiStats = make(map[string]int)
|
||||
}
|
||||
stats.apiStats[api]++
|
||||
}
|
||||
|
||||
// Dec increments the api stats counter.
|
||||
func (stats *HTTPAPIStats) Dec(api string) {
|
||||
if stats == nil {
|
||||
return
|
||||
}
|
||||
stats.Lock()
|
||||
defer stats.Unlock()
|
||||
if val, ok := stats.apiStats[api]; ok && val > 0 {
|
||||
stats.apiStats[api]--
|
||||
}
|
||||
}
|
||||
|
||||
// Load returns the recorded stats.
|
||||
func (stats *HTTPAPIStats) Load() map[string]int {
|
||||
stats.Lock()
|
||||
defer stats.Unlock()
|
||||
var apiStats = make(map[string]int, len(stats.apiStats))
|
||||
for k, v := range stats.apiStats {
|
||||
apiStats[k] = v
|
||||
}
|
||||
return apiStats
|
||||
}
|
||||
|
||||
func (st *HTTPStats) getInputBytes() uint64 {
|
||||
return atomic.LoadUint64(&st.totalInputBytes)
|
||||
}
|
||||
|
||||
func (st *HTTPStats) getOutputBytes() uint64 {
|
||||
return atomic.LoadUint64(&st.totalOutputBytes)
|
||||
}
|
||||
|
||||
// Update statistics from http request and response data
|
||||
func (st *HTTPStats) updateStats(api string, w http.ResponseWriter, r *http.Request, durationSecs float64) {
|
||||
var code int
|
||||
|
||||
if res, ok := w.(*responseWrapper); ok {
|
||||
code = res.statusCode
|
||||
}
|
||||
|
||||
// A successful request has a 2xx response code
|
||||
successReq := code >= http.StatusOK && code < http.StatusMultipleChoices
|
||||
|
||||
if !strings.HasSuffix(r.URL.Path, systemPath) {
|
||||
st.totalS3Requests.Inc(api)
|
||||
if !successReq && code != 0 {
|
||||
st.totalS3Errors.Inc(api)
|
||||
}
|
||||
}
|
||||
|
||||
if r.Method == http.MethodGet {
|
||||
// Increment the prometheus http request response histogram with appropriate label
|
||||
httpRequestsDuration.With(prometheus.Labels{"api": api}).Observe(durationSecs)
|
||||
}
|
||||
}
|
||||
|
||||
// WriteHeader - writes http status code
|
||||
func (w *responseWrapper) WriteHeader(code int) {
|
||||
if !w.headWritten {
|
||||
w.statusCode = code
|
||||
w.headWritten = true
|
||||
|
||||
w.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
}
|
||||
|
||||
// Flush - Calls the underlying Flush.
|
||||
func (w *responseWrapper) Flush() {
|
||||
if f, ok := w.ResponseWriter.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writeCounter) Write(p []byte) (int, error) {
|
||||
n, err := w.ResponseWriter.Write(p)
|
||||
atomic.AddUint64(&w.countBytes, uint64(n))
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (r *readCounter) Read(p []byte) (int, error) {
|
||||
n, err := r.ReadCloser.Read(p)
|
||||
atomic.AddUint64(&r.countBytes, uint64(n))
|
||||
return n, err
|
||||
}
|
70
api/metrics/collector.go
Normal file
70
api/metrics/collector.go
Normal file
|
@ -0,0 +1,70 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"github.com/nspcc-dev/neofs-s3-gate/misc"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
type stats struct {
|
||||
desc *prometheus.Desc
|
||||
}
|
||||
|
||||
var (
|
||||
versionInfo = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Namespace: "neofs_s3",
|
||||
Name: "version_info",
|
||||
Help: "Version of current NeoFS S3 Gate instance",
|
||||
},
|
||||
[]string{
|
||||
// current version
|
||||
"version",
|
||||
// build time of the current version
|
||||
"build_time",
|
||||
},
|
||||
)
|
||||
|
||||
statsMetrics = &stats{
|
||||
desc: prometheus.NewDesc("neofs_s3_stats", "Statistics exposed by MinIO server", nil, nil),
|
||||
}
|
||||
)
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(versionInfo)
|
||||
prometheus.MustRegister(statsMetrics)
|
||||
prometheus.MustRegister(httpRequestsDuration)
|
||||
}
|
||||
|
||||
func collectNetworkMetrics(ch chan<- prometheus.Metric) {
|
||||
// Network Sent/Received Bytes (Outbound)
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
prometheus.NewDesc(
|
||||
prometheus.BuildFQName("neofs_s3", "tx", "bytes_total"),
|
||||
"Total number of bytes sent by current NeoFS S3 Gate instance",
|
||||
nil, nil),
|
||||
prometheus.CounterValue,
|
||||
float64(httpStatsMetric.getInputBytes()),
|
||||
)
|
||||
|
||||
ch <- prometheus.MustNewConstMetric(
|
||||
prometheus.NewDesc(
|
||||
prometheus.BuildFQName("neofs_s3", "rx", "bytes_total"),
|
||||
"Total number of bytes received by current NeoFS S3 Gate instance",
|
||||
nil, nil),
|
||||
prometheus.CounterValue,
|
||||
float64(httpStatsMetric.getOutputBytes()),
|
||||
)
|
||||
}
|
||||
|
||||
func (s *stats) Describe(ch chan<- *prometheus.Desc) {
|
||||
ch <- s.desc
|
||||
}
|
||||
|
||||
func (s *stats) Collect(ch chan<- prometheus.Metric) {
|
||||
// Expose current version information
|
||||
versionInfo.WithLabelValues(misc.Version, misc.Build).Set(1.0)
|
||||
|
||||
// connect collectors
|
||||
collectHTTPMetrics(ch)
|
||||
collectNetworkMetrics(ch)
|
||||
}
|
377
api/pool/pool.go
Normal file
377
api/pool/pool.go
Normal file
|
@ -0,0 +1,377 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"math/rand"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/service"
|
||||
"github.com/nspcc-dev/neofs-api-go/state"
|
||||
"github.com/pkg/errors"
|
||||
"go.uber.org/atomic"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
)
|
||||
|
||||
type (
|
||||
node struct {
|
||||
index int32
|
||||
address string
|
||||
weight uint32
|
||||
usedAt time.Time
|
||||
conn *grpc.ClientConn
|
||||
}
|
||||
|
||||
Client interface {
|
||||
Status() error
|
||||
GetConnection(context.Context) (*grpc.ClientConn, error)
|
||||
SessionToken(ctx context.Context, params *SessionParams) (*service.Token, error)
|
||||
}
|
||||
|
||||
Pool interface {
|
||||
Client
|
||||
|
||||
Close()
|
||||
ReBalance(ctx context.Context)
|
||||
}
|
||||
|
||||
Peer struct {
|
||||
Address string
|
||||
Weight float64
|
||||
}
|
||||
|
||||
Config struct {
|
||||
keepalive.ClientParameters
|
||||
|
||||
ConnectionTTL time.Duration
|
||||
ConnectTimeout time.Duration
|
||||
RequestTimeout time.Duration
|
||||
|
||||
Peers []Peer
|
||||
|
||||
GRPCVerbose bool
|
||||
GRPCLogger grpclog.LoggerV2
|
||||
|
||||
Logger *zap.Logger
|
||||
PrivateKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
pool struct {
|
||||
log *zap.Logger
|
||||
|
||||
ttl time.Duration
|
||||
|
||||
conTimeout time.Duration
|
||||
reqTimeout time.Duration
|
||||
opts keepalive.ClientParameters
|
||||
|
||||
currentIdx *atomic.Int32
|
||||
currentConn *grpc.ClientConn
|
||||
|
||||
reqHealth *state.HealthRequest
|
||||
|
||||
*sync.Mutex
|
||||
nodes []*node
|
||||
keys []uint32
|
||||
conns map[uint32][]*node
|
||||
key *ecdsa.PrivateKey
|
||||
tokens map[string]*service.Token
|
||||
|
||||
unhealthy *atomic.Error
|
||||
}
|
||||
)
|
||||
|
||||
var (
|
||||
errBootstrapping = errors.New("bootstrapping")
|
||||
errEmptyConnection = errors.New("empty connection")
|
||||
errNoHealthyConnections = errors.New("no active connections")
|
||||
)
|
||||
|
||||
func New(cfg *Config) (Pool, error) {
|
||||
p := &pool{
|
||||
log: cfg.Logger,
|
||||
key: cfg.PrivateKey,
|
||||
Mutex: new(sync.Mutex),
|
||||
keys: make([]uint32, 0),
|
||||
nodes: make([]*node, 0),
|
||||
conns: make(map[uint32][]*node),
|
||||
tokens: make(map[string]*service.Token),
|
||||
|
||||
currentIdx: atomic.NewInt32(-1),
|
||||
|
||||
ttl: cfg.ConnectionTTL,
|
||||
|
||||
conTimeout: cfg.ConnectTimeout,
|
||||
reqTimeout: cfg.RequestTimeout,
|
||||
opts: cfg.ClientParameters,
|
||||
|
||||
unhealthy: atomic.NewError(errBootstrapping),
|
||||
}
|
||||
|
||||
if cfg.GRPCVerbose {
|
||||
grpclog.SetLoggerV2(cfg.GRPCLogger)
|
||||
}
|
||||
|
||||
seed := time.Now().UnixNano()
|
||||
|
||||
rand.Seed(seed)
|
||||
cfg.Logger.Info("used random seed", zap.Int64("seed", seed))
|
||||
|
||||
p.reqHealth = new(state.HealthRequest)
|
||||
p.reqHealth.SetTTL(service.NonForwardingTTL)
|
||||
|
||||
if err := service.SignRequestData(cfg.PrivateKey, p.reqHealth); err != nil {
|
||||
return nil, errors.Wrap(err, "could not sign `HealthRequest`")
|
||||
}
|
||||
|
||||
for i := range cfg.Peers {
|
||||
if cfg.Peers[i].Address == "" {
|
||||
cfg.Logger.Warn("skip, empty address")
|
||||
break
|
||||
}
|
||||
|
||||
p.nodes = append(p.nodes, &node{
|
||||
index: int32(i),
|
||||
address: cfg.Peers[i].Address,
|
||||
weight: uint32(cfg.Peers[i].Weight * 100),
|
||||
})
|
||||
|
||||
cfg.Logger.Info("add new peer",
|
||||
zap.String("address", p.nodes[i].address),
|
||||
zap.Uint32("weight", p.nodes[i].weight))
|
||||
}
|
||||
|
||||
return p, nil
|
||||
}
|
||||
|
||||
func (p *pool) Status() error {
|
||||
return p.unhealthy.Load()
|
||||
}
|
||||
|
||||
func (p *pool) Close() {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
for i := range p.nodes {
|
||||
if p.nodes[i] == nil || p.nodes[i].conn == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
p.log.Warn("close connection",
|
||||
zap.String("address", p.nodes[i].address),
|
||||
zap.Error(p.nodes[i].conn.Close()))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *pool) ReBalance(ctx context.Context) {
|
||||
p.Lock()
|
||||
defer func() {
|
||||
p.Unlock()
|
||||
|
||||
_, err := p.GetConnection(ctx)
|
||||
p.unhealthy.Store(err)
|
||||
}()
|
||||
|
||||
keys := make(map[uint32]struct{})
|
||||
tokens := make(map[string]*service.Token)
|
||||
|
||||
for i := range p.nodes {
|
||||
var (
|
||||
idx = -1
|
||||
exists bool
|
||||
err error
|
||||
start = time.Now()
|
||||
tkn *service.Token
|
||||
conn = p.nodes[i].conn
|
||||
weight = p.nodes[i].weight
|
||||
)
|
||||
|
||||
if err = ctx.Err(); err != nil {
|
||||
p.log.Warn("something went wrong", zap.Error(err))
|
||||
p.unhealthy.Store(err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if conn == nil {
|
||||
p.log.Debug("empty connection, try to connect",
|
||||
zap.String("address", p.nodes[i].address))
|
||||
|
||||
{ // try to connect
|
||||
ctx, cancel := context.WithTimeout(ctx, p.conTimeout)
|
||||
conn, err = grpc.DialContext(ctx, p.nodes[i].address,
|
||||
grpc.WithBlock(),
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithKeepaliveParams(p.opts))
|
||||
cancel()
|
||||
}
|
||||
|
||||
if err != nil || conn == nil {
|
||||
p.log.Warn("skip, could not connect to node",
|
||||
zap.String("address", p.nodes[i].address),
|
||||
zap.Stringer("elapsed", time.Since(start)),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
{ // try to prepare token
|
||||
ctx, cancel := context.WithTimeout(ctx, p.reqTimeout)
|
||||
tkn, err = generateToken(ctx, conn, p.key)
|
||||
cancel()
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
p.log.Debug("could not prepare session token",
|
||||
zap.String("address", p.nodes[i].address),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
tokens[conn.Target()] = tkn
|
||||
|
||||
p.nodes[i].conn = conn
|
||||
p.nodes[i].usedAt = time.Now()
|
||||
p.log.Debug("connected to node", zap.String("address", p.nodes[i].address))
|
||||
} else if tkn, exists = p.tokens[conn.Target()]; exists {
|
||||
// token exists, ignore
|
||||
} else if tkn, err = generateToken(ctx, conn, p.key); err != nil {
|
||||
p.log.Error("could not prepare session token",
|
||||
zap.String("address", p.nodes[i].address),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
for j := range p.conns[weight] {
|
||||
if p.conns[weight][j] != nil && p.conns[weight][j].conn == conn {
|
||||
idx = j
|
||||
exists = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
usedAt := time.Since(p.nodes[i].usedAt)
|
||||
|
||||
// if something wrong with connection (bad state, unhealthy or not used a long time), try to close it and remove
|
||||
if err = p.isAlive(ctx, conn); err != nil || usedAt > p.ttl {
|
||||
p.log.Warn("connection not alive",
|
||||
zap.String("address", p.nodes[i].address),
|
||||
zap.Stringer("since", usedAt),
|
||||
zap.Error(err))
|
||||
|
||||
if exists {
|
||||
// remove from connections
|
||||
p.conns[weight] = append(p.conns[weight][:idx], p.conns[weight][idx+1:]...)
|
||||
}
|
||||
|
||||
// remove token
|
||||
delete(tokens, conn.Target())
|
||||
|
||||
if err = conn.Close(); err != nil {
|
||||
p.log.Warn("could not close bad connection",
|
||||
zap.String("address", p.nodes[i].address),
|
||||
zap.Stringer("since", usedAt),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
if p.nodes[i].conn != nil {
|
||||
p.nodes[i].conn = nil
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
keys[weight] = struct{}{}
|
||||
|
||||
p.log.Debug("connection alive",
|
||||
zap.String("address", p.nodes[i].address),
|
||||
zap.Stringer("since", usedAt))
|
||||
|
||||
if !exists {
|
||||
p.conns[weight] = append(p.conns[weight], p.nodes[i])
|
||||
}
|
||||
|
||||
if tkn != nil {
|
||||
tokens[conn.Target()] = tkn
|
||||
}
|
||||
}
|
||||
|
||||
p.tokens = tokens
|
||||
p.keys = p.keys[:0]
|
||||
for w := range keys {
|
||||
p.keys = append(p.keys, w)
|
||||
}
|
||||
|
||||
sort.Slice(p.keys, func(i, j int) bool {
|
||||
return p.keys[i] > p.keys[j]
|
||||
})
|
||||
}
|
||||
|
||||
func (p *pool) GetConnection(ctx context.Context) (*grpc.ClientConn, error) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
if err := p.isAlive(ctx, p.currentConn); err == nil {
|
||||
if id := p.currentIdx.Load(); id != -1 && p.nodes[id] != nil {
|
||||
p.nodes[id].usedAt = time.Now()
|
||||
}
|
||||
|
||||
return p.currentConn, nil
|
||||
}
|
||||
|
||||
for _, w := range p.keys {
|
||||
switch ln := len(p.conns[w]); ln {
|
||||
case 0:
|
||||
continue
|
||||
case 1:
|
||||
p.currentConn = p.conns[w][0].conn
|
||||
p.conns[w][0].usedAt = time.Now()
|
||||
p.currentIdx.Store(p.conns[w][0].index)
|
||||
return p.currentConn, nil
|
||||
default: // > 1
|
||||
i := rand.Intn(ln)
|
||||
p.currentConn = p.conns[w][i].conn
|
||||
p.conns[w][i].usedAt = time.Now()
|
||||
p.currentIdx.Store(p.conns[w][i].index)
|
||||
return p.currentConn, nil
|
||||
}
|
||||
}
|
||||
|
||||
p.currentConn = nil
|
||||
p.currentIdx.Store(-1)
|
||||
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
return nil, errNoHealthyConnections
|
||||
}
|
||||
|
||||
func (p *pool) isAlive(ctx context.Context, cur *grpc.ClientConn) error {
|
||||
if cur == nil {
|
||||
return errEmptyConnection
|
||||
}
|
||||
|
||||
switch st := cur.GetState(); st {
|
||||
case connectivity.Idle, connectivity.Ready, connectivity.Connecting:
|
||||
ctx, cancel := context.WithTimeout(ctx, p.reqTimeout)
|
||||
defer cancel()
|
||||
|
||||
res, err := state.NewStatusClient(cur).HealthCheck(ctx, p.reqHealth)
|
||||
if err != nil {
|
||||
p.log.Warn("could not fetch health-check", zap.Error(err))
|
||||
|
||||
return err
|
||||
} else if !res.Healthy {
|
||||
return errors.New(res.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
default:
|
||||
return errors.New(st.String())
|
||||
}
|
||||
}
|
123
api/pool/session.go
Normal file
123
api/pool/session.go
Normal file
|
@ -0,0 +1,123 @@
|
|||
package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"math"
|
||||
|
||||
"github.com/nspcc-dev/neofs-api-go/refs"
|
||||
"github.com/nspcc-dev/neofs-api-go/service"
|
||||
"github.com/nspcc-dev/neofs-api-go/session"
|
||||
crypto "github.com/nspcc-dev/neofs-crypto"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type (
|
||||
queryParams struct {
|
||||
key *ecdsa.PrivateKey
|
||||
addr refs.Address
|
||||
verb service.Token_Info_Verb
|
||||
}
|
||||
|
||||
SessionParams struct {
|
||||
Addr refs.Address
|
||||
Conn *grpc.ClientConn
|
||||
Verb service.Token_Info_Verb
|
||||
}
|
||||
)
|
||||
|
||||
func (p *pool) fetchToken(ctx context.Context, con *grpc.ClientConn) (*session.Token, error) {
|
||||
p.Lock()
|
||||
defer p.Unlock()
|
||||
|
||||
// if we had token for current connection - return it
|
||||
if tkn, ok := p.tokens[con.Target()]; ok {
|
||||
return tkn, nil
|
||||
}
|
||||
|
||||
// try to generate token for connection
|
||||
tkn, err := generateToken(ctx, con, p.key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
p.tokens[con.Target()] = tkn
|
||||
return tkn, nil
|
||||
}
|
||||
|
||||
// SessionToken returns session token for connection
|
||||
func (p *pool) SessionToken(ctx context.Context, params *SessionParams) (*service.Token, error) {
|
||||
var (
|
||||
err error
|
||||
tkn *session.Token
|
||||
)
|
||||
|
||||
if params.Conn == nil {
|
||||
return nil, errors.New("empty connection")
|
||||
} else if tkn, err = p.fetchToken(ctx, params.Conn); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return prepareToken(tkn, queryParams{
|
||||
key: p.key,
|
||||
addr: params.Addr,
|
||||
verb: params.Verb,
|
||||
})
|
||||
}
|
||||
|
||||
// creates token using
|
||||
func generateToken(ctx context.Context, con *grpc.ClientConn, key *ecdsa.PrivateKey) (*service.Token, error) {
|
||||
owner, err := refs.NewOwnerID(&key.PublicKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
token := new(service.Token)
|
||||
token.SetOwnerID(owner)
|
||||
token.SetExpirationEpoch(math.MaxUint64)
|
||||
token.SetOwnerKey(crypto.MarshalPublicKey(&key.PublicKey))
|
||||
|
||||
creator, err := session.NewGRPCCreator(con, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res, err := creator.Create(ctx, token)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
token.SetID(res.GetID())
|
||||
token.SetSessionKey(res.GetSessionKey())
|
||||
|
||||
return token, nil
|
||||
}
|
||||
|
||||
func prepareToken(t *service.Token, p queryParams) (*service.Token, error) {
|
||||
sig := make([]byte, len(t.Signature))
|
||||
copy(sig, t.Signature)
|
||||
|
||||
token := &service.Token{
|
||||
Token_Info: service.Token_Info{
|
||||
ID: t.ID,
|
||||
OwnerID: t.OwnerID,
|
||||
Verb: t.Verb,
|
||||
Address: t.Address,
|
||||
TokenLifetime: t.TokenLifetime,
|
||||
SessionKey: t.SessionKey,
|
||||
OwnerKey: t.OwnerKey,
|
||||
},
|
||||
Signature: sig,
|
||||
}
|
||||
|
||||
token.SetAddress(p.addr)
|
||||
token.SetVerb(p.verb)
|
||||
|
||||
err := service.AddSignatureWithKey(p.key, service.NewSignedSessionToken(token))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return token, nil
|
||||
}
|
112
api/reqinfo.go
Normal file
112
api/reqinfo.go
Normal file
|
@ -0,0 +1,112 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type (
|
||||
// KeyVal - appended to ReqInfo.Tags
|
||||
KeyVal struct {
|
||||
Key string
|
||||
Val string
|
||||
}
|
||||
|
||||
// ReqInfo stores the request info.
|
||||
ReqInfo struct {
|
||||
sync.RWMutex
|
||||
RemoteHost string // Client Host/IP
|
||||
Host string // Node Host/IP
|
||||
UserAgent string // User Agent
|
||||
DeploymentID string // x-minio-deployment-id
|
||||
RequestID string // x-amz-request-id
|
||||
API string // API name - GetObject PutObject NewMultipartUpload etc.
|
||||
BucketName string // Bucket name
|
||||
ObjectName string // Object name
|
||||
tags []KeyVal // Any additional info not accommodated by above fields
|
||||
}
|
||||
)
|
||||
|
||||
// Key used for Get/SetReqInfo
|
||||
type contextKeyType string
|
||||
|
||||
const ctxRequestInfo = contextKeyType("NeoFS-S3-Gate")
|
||||
|
||||
// NewReqInfo :
|
||||
func NewReqInfo(remoteHost, userAgent, deploymentID, requestID, api, bucket, object string) *ReqInfo {
|
||||
req := ReqInfo{}
|
||||
req.RemoteHost = remoteHost
|
||||
req.UserAgent = userAgent
|
||||
req.API = api
|
||||
req.DeploymentID = deploymentID
|
||||
req.RequestID = requestID
|
||||
req.BucketName = bucket
|
||||
req.ObjectName = object
|
||||
return &req
|
||||
}
|
||||
|
||||
// AppendTags - appends key/val to ReqInfo.tags
|
||||
func (r *ReqInfo) AppendTags(key string, val string) *ReqInfo {
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
r.tags = append(r.tags, KeyVal{key, val})
|
||||
return r
|
||||
}
|
||||
|
||||
// SetTags - sets key/val to ReqInfo.tags
|
||||
func (r *ReqInfo) SetTags(key string, val string) *ReqInfo {
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
// Search of tag key already exists in tags
|
||||
var updated bool
|
||||
for _, tag := range r.tags {
|
||||
if tag.Key == key {
|
||||
tag.Val = val
|
||||
updated = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !updated {
|
||||
// Append to the end of tags list
|
||||
r.tags = append(r.tags, KeyVal{key, val})
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// GetTags - returns the user defined tags
|
||||
func (r *ReqInfo) GetTags() []KeyVal {
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
return append([]KeyVal(nil), r.tags...)
|
||||
}
|
||||
|
||||
// SetReqInfo sets ReqInfo in the context.
|
||||
func SetReqInfo(ctx context.Context, req *ReqInfo) context.Context {
|
||||
if ctx == nil {
|
||||
return nil
|
||||
}
|
||||
return context.WithValue(ctx, ctxRequestInfo, req)
|
||||
}
|
||||
|
||||
// GetReqInfo returns ReqInfo if set.
|
||||
func GetReqInfo(ctx context.Context) *ReqInfo {
|
||||
if ctx != nil {
|
||||
r, ok := ctx.Value(ctxRequestInfo).(*ReqInfo)
|
||||
if ok {
|
||||
return r
|
||||
}
|
||||
r = &ReqInfo{}
|
||||
SetReqInfo(ctx, r)
|
||||
return r
|
||||
}
|
||||
return nil
|
||||
}
|
200
api/response.go
Normal file
200
api/response.go
Normal file
|
@ -0,0 +1,200 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/nspcc-dev/neofs-s3-gate/misc"
|
||||
)
|
||||
|
||||
type (
|
||||
// ErrorResponse - error response format
|
||||
ErrorResponse struct {
|
||||
XMLName xml.Name `xml:"Error" json:"-"`
|
||||
Code string
|
||||
Message string
|
||||
Key string `xml:"Key,omitempty" json:"Key,omitempty"`
|
||||
BucketName string `xml:"BucketName,omitempty" json:"BucketName,omitempty"`
|
||||
Resource string
|
||||
RequestID string `xml:"RequestId" json:"RequestId"`
|
||||
HostID string `xml:"HostId" json:"HostId"`
|
||||
|
||||
// Region where the bucket is located. This header is returned
|
||||
// only in HEAD bucket and ListObjects response.
|
||||
Region string `xml:"Region,omitempty" json:"Region,omitempty"`
|
||||
|
||||
// Captures the server string returned in response header.
|
||||
Server string `xml:"-" json:"-"`
|
||||
|
||||
// Underlying HTTP status code for the returned error
|
||||
StatusCode int `xml:"-" json:"-"`
|
||||
}
|
||||
|
||||
// APIError structure
|
||||
Error struct {
|
||||
Code string
|
||||
Description string
|
||||
HTTPStatusCode int
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
hdrServerInfo = "Server"
|
||||
hdrAcceptRanges = "Accept-Ranges"
|
||||
hdrContentType = "Content-Type"
|
||||
hdrContentLength = "Content-Length"
|
||||
hdrRetryAfter = "Retry-After"
|
||||
|
||||
hdrAmzCopySource = "X-Amz-Copy-Source"
|
||||
|
||||
// Response request id.
|
||||
hdrAmzRequestID = "x-amz-request-id"
|
||||
|
||||
// hdrSSE is the general AWS SSE HTTP header key.
|
||||
hdrSSE = "X-Amz-Server-Side-Encryption"
|
||||
|
||||
// hdrSSECustomerKey is the HTTP header key referencing the
|
||||
// SSE-C client-provided key..
|
||||
hdrSSECustomerKey = hdrSSE + "-Customer-Key"
|
||||
|
||||
// hdrSSECopyKey is the HTTP header key referencing the SSE-C
|
||||
// client-provided key for SSE-C copy requests.
|
||||
hdrSSECopyKey = "X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key"
|
||||
)
|
||||
|
||||
var deploymentID, _ = uuid.NewRandom()
|
||||
|
||||
// Non exhaustive list of AWS S3 standard error responses -
|
||||
// http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
|
||||
var s3ErrorResponseMap = map[string]string{
|
||||
"AccessDenied": "Access Denied.",
|
||||
"BadDigest": "The Content-Md5 you specified did not match what we received.",
|
||||
"EntityTooSmall": "Your proposed upload is smaller than the minimum allowed object size.",
|
||||
"EntityTooLarge": "Your proposed upload exceeds the maximum allowed object size.",
|
||||
"IncompleteBody": "You did not provide the number of bytes specified by the Content-Length HTTP header.",
|
||||
"InternalError": "We encountered an internal error, please try again.",
|
||||
"InvalidAccessKeyId": "The access key ID you provided does not exist in our records.",
|
||||
"InvalidBucketName": "The specified bucket is not valid.",
|
||||
"InvalidDigest": "The Content-Md5 you specified is not valid.",
|
||||
"InvalidRange": "The requested range is not satisfiable",
|
||||
"MalformedXML": "The XML you provided was not well-formed or did not validate against our published schema.",
|
||||
"MissingContentLength": "You must provide the Content-Length HTTP header.",
|
||||
"MissingContentMD5": "Missing required header for this request: Content-Md5.",
|
||||
"MissingRequestBodyError": "Request body is empty.",
|
||||
"NoSuchBucket": "The specified bucket does not exist.",
|
||||
"NoSuchBucketPolicy": "The bucket policy does not exist",
|
||||
"NoSuchKey": "The specified key does not exist.",
|
||||
"NoSuchUpload": "The specified multipart upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.",
|
||||
"NotImplemented": "A header you provided implies functionality that is not implemented",
|
||||
"PreconditionFailed": "At least one of the pre-conditions you specified did not hold",
|
||||
"RequestTimeTooSkewed": "The difference between the request time and the server's time is too large.",
|
||||
"SignatureDoesNotMatch": "The request signature we calculated does not match the signature you provided. Check your key and signing method.",
|
||||
"MethodNotAllowed": "The specified method is not allowed against this resource.",
|
||||
"InvalidPart": "One or more of the specified parts could not be found.",
|
||||
"InvalidPartOrder": "The list of parts was not in ascending order. The parts list must be specified in order by part number.",
|
||||
"InvalidObjectState": "The operation is not valid for the current state of the object.",
|
||||
"AuthorizationHeaderMalformed": "The authorization header is malformed; the region is wrong.",
|
||||
"MalformedPOSTRequest": "The body of your POST request is not well-formed multipart/form-data.",
|
||||
"BucketNotEmpty": "The bucket you tried to delete is not empty",
|
||||
"AllAccessDisabled": "All access to this bucket has been disabled.",
|
||||
"MalformedPolicy": "Policy has invalid resource.",
|
||||
"MissingFields": "Missing fields in request.",
|
||||
"AuthorizationQueryParametersError": "Error parsing the X-Amz-Credential parameter; the Credential is mal-formed; expecting \"<YOUR-AKID>/YYYYMMDD/REGION/SERVICE/aws4_request\".",
|
||||
"MalformedDate": "Invalid date format header, expected to be in ISO8601, RFC1123 or RFC1123Z time format.",
|
||||
"BucketAlreadyOwnedByYou": "Your previous request to create the named bucket succeeded and you already own it.",
|
||||
"InvalidDuration": "Duration provided in the request is invalid.",
|
||||
"XAmzContentSHA256Mismatch": "The provided 'x-amz-content-sha256' header does not match what was computed.",
|
||||
// Add new API errors here.
|
||||
}
|
||||
|
||||
// WriteErrorResponse writes error headers
|
||||
func WriteErrorResponse(ctx context.Context, w http.ResponseWriter, err Error, reqURL *url.URL) {
|
||||
switch err.Code {
|
||||
case "SlowDown", "XNeoFSServerNotInitialized", "XNeoFSReadQuorum", "XNeoFSWriteQuorum":
|
||||
// Set retry-after header to indicate user-agents to retry request after 120secs.
|
||||
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
|
||||
w.Header().Set(hdrRetryAfter, "120")
|
||||
case "AccessDenied":
|
||||
// TODO process when the request is from browser and also if browser
|
||||
}
|
||||
|
||||
// Generate error response.
|
||||
errorResponse := getAPIErrorResponse(ctx, err, reqURL.Path,
|
||||
w.Header().Get(hdrAmzRequestID), deploymentID.String())
|
||||
encodedErrorResponse := EncodeResponse(errorResponse)
|
||||
writeResponse(w, err.HTTPStatusCode, encodedErrorResponse, mimeXML)
|
||||
}
|
||||
|
||||
// If none of the http routes match respond with appropriate errors
|
||||
func errorResponseHandler(w http.ResponseWriter, r *http.Request) {
|
||||
desc := fmt.Sprintf("Unknown API request at %s", r.URL.Path)
|
||||
WriteErrorResponse(r.Context(), w, Error{
|
||||
Code: "XMinioUnknownAPIRequest",
|
||||
Description: desc,
|
||||
HTTPStatusCode: http.StatusBadRequest,
|
||||
}, r.URL)
|
||||
}
|
||||
|
||||
// Write http common headers
|
||||
func setCommonHeaders(w http.ResponseWriter) {
|
||||
w.Header().Set(hdrServerInfo, "NeoFS-S3-Gate/"+misc.Version)
|
||||
w.Header().Set(hdrAcceptRanges, "bytes")
|
||||
|
||||
// Remove sensitive information
|
||||
removeSensitiveHeaders(w.Header())
|
||||
}
|
||||
|
||||
// removeSensitiveHeaders removes confidential encryption
|
||||
// information - e.g. the SSE-C key - from the HTTP headers.
|
||||
// It has the same semantics as RemoveSensitiveEntries.
|
||||
func removeSensitiveHeaders(h http.Header) {
|
||||
h.Del(hdrSSECustomerKey)
|
||||
h.Del(hdrSSECopyKey)
|
||||
}
|
||||
|
||||
func writeResponse(w http.ResponseWriter, statusCode int, response []byte, mType mimeType) {
|
||||
setCommonHeaders(w)
|
||||
if mType != mimeNone {
|
||||
w.Header().Set(hdrContentType, string(mType))
|
||||
}
|
||||
w.Header().Set(hdrContentLength, strconv.Itoa(len(response)))
|
||||
w.WriteHeader(statusCode)
|
||||
if response != nil {
|
||||
_, _ = w.Write(response)
|
||||
w.(http.Flusher).Flush()
|
||||
}
|
||||
}
|
||||
|
||||
// EncodeResponse encodes the response headers into XML format.
|
||||
func EncodeResponse(response interface{}) []byte {
|
||||
var bytesBuffer bytes.Buffer
|
||||
bytesBuffer.WriteString(xml.Header)
|
||||
_ = xml.
|
||||
NewEncoder(&bytesBuffer).
|
||||
Encode(response)
|
||||
return bytesBuffer.Bytes()
|
||||
}
|
||||
|
||||
// WriteSuccessResponseXML writes success headers and response if any,
|
||||
// with content-type set to `application/xml`.
|
||||
func WriteSuccessResponseXML(w http.ResponseWriter, response []byte) {
|
||||
writeResponse(w, http.StatusOK, response, mimeXML)
|
||||
}
|
||||
|
||||
// Error - Returns S3 error string.
|
||||
func (e ErrorResponse) Error() string {
|
||||
if e.Message == "" {
|
||||
msg, ok := s3ErrorResponseMap[e.Code]
|
||||
if !ok {
|
||||
msg = fmt.Sprintf("Error response code %s.", e.Code)
|
||||
}
|
||||
return msg
|
||||
}
|
||||
return e.Message
|
||||
}
|
304
api/router.go
Normal file
304
api/router.go
Normal file
|
@ -0,0 +1,304 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/nspcc-dev/neofs-s3-gate/api/metrics"
|
||||
"github.com/nspcc-dev/neofs-s3-gate/auth"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
Handler interface {
|
||||
HeadObjectHandler(http.ResponseWriter, *http.Request)
|
||||
CopyObjectPartHandler(http.ResponseWriter, *http.Request)
|
||||
PutObjectPartHandler(http.ResponseWriter, *http.Request)
|
||||
ListObjectPartsHandler(http.ResponseWriter, *http.Request)
|
||||
CompleteMultipartUploadHandler(http.ResponseWriter, *http.Request)
|
||||
NewMultipartUploadHandler(http.ResponseWriter, *http.Request)
|
||||
AbortMultipartUploadHandler(http.ResponseWriter, *http.Request)
|
||||
GetObjectACLHandler(http.ResponseWriter, *http.Request)
|
||||
PutObjectACLHandler(http.ResponseWriter, *http.Request)
|
||||
GetObjectTaggingHandler(http.ResponseWriter, *http.Request)
|
||||
PutObjectTaggingHandler(http.ResponseWriter, *http.Request)
|
||||
DeleteObjectTaggingHandler(http.ResponseWriter, *http.Request)
|
||||
SelectObjectContentHandler(http.ResponseWriter, *http.Request)
|
||||
GetObjectRetentionHandler(http.ResponseWriter, *http.Request)
|
||||
GetObjectLegalHoldHandler(http.ResponseWriter, *http.Request)
|
||||
GetObjectHandler(http.ResponseWriter, *http.Request)
|
||||
CopyObjectHandler(http.ResponseWriter, *http.Request)
|
||||
PutObjectRetentionHandler(http.ResponseWriter, *http.Request)
|
||||
PutObjectLegalHoldHandler(http.ResponseWriter, *http.Request)
|
||||
PutObjectHandler(http.ResponseWriter, *http.Request)
|
||||
DeleteObjectHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketLocationHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketPolicyHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketLifecycleHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketEncryptionHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketACLHandler(http.ResponseWriter, *http.Request)
|
||||
PutBucketACLHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketCorsHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketWebsiteHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketAccelerateHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketRequestPaymentHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketLoggingHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketReplicationHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketTaggingHandler(http.ResponseWriter, *http.Request)
|
||||
DeleteBucketWebsiteHandler(http.ResponseWriter, *http.Request)
|
||||
DeleteBucketTaggingHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketObjectLockConfigHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketVersioningHandler(http.ResponseWriter, *http.Request)
|
||||
GetBucketNotificationHandler(http.ResponseWriter, *http.Request)
|
||||
ListenBucketNotificationHandler(http.ResponseWriter, *http.Request)
|
||||
ListMultipartUploadsHandler(http.ResponseWriter, *http.Request)
|
||||
ListObjectsV2MHandler(http.ResponseWriter, *http.Request)
|
||||
ListObjectsV2Handler(http.ResponseWriter, *http.Request)
|
||||
ListBucketObjectVersionsHandler(http.ResponseWriter, *http.Request)
|
||||
ListObjectsV1Handler(http.ResponseWriter, *http.Request)
|
||||
PutBucketLifecycleHandler(http.ResponseWriter, *http.Request)
|
||||
PutBucketEncryptionHandler(http.ResponseWriter, *http.Request)
|
||||
PutBucketPolicyHandler(http.ResponseWriter, *http.Request)
|
||||
PutBucketObjectLockConfigHandler(http.ResponseWriter, *http.Request)
|
||||
PutBucketTaggingHandler(http.ResponseWriter, *http.Request)
|
||||
PutBucketVersioningHandler(http.ResponseWriter, *http.Request)
|
||||
PutBucketNotificationHandler(http.ResponseWriter, *http.Request)
|
||||
PutBucketHandler(http.ResponseWriter, *http.Request)
|
||||
HeadBucketHandler(http.ResponseWriter, *http.Request)
|
||||
PostPolicyBucketHandler(http.ResponseWriter, *http.Request)
|
||||
DeleteMultipleObjectsHandler(http.ResponseWriter, *http.Request)
|
||||
DeleteBucketPolicyHandler(http.ResponseWriter, *http.Request)
|
||||
DeleteBucketLifecycleHandler(http.ResponseWriter, *http.Request)
|
||||
DeleteBucketEncryptionHandler(http.ResponseWriter, *http.Request)
|
||||
DeleteBucketHandler(http.ResponseWriter, *http.Request)
|
||||
ListBucketsHandler(http.ResponseWriter, *http.Request)
|
||||
}
|
||||
|
||||
// mimeType represents various MIME type used API responses.
|
||||
mimeType string
|
||||
)
|
||||
|
||||
const (
|
||||
// SlashSeparator - slash separator.
|
||||
SlashSeparator = "/"
|
||||
|
||||
// Means no response type.
|
||||
mimeNone mimeType = ""
|
||||
// Means response type is JSON.
|
||||
// mimeJSON mimeType = "application/json"
|
||||
// Means response type is XML.
|
||||
mimeXML mimeType = "application/xml"
|
||||
)
|
||||
|
||||
func Attach(r *mux.Router, m MaxClients, h Handler, center *auth.Center, log *zap.Logger) {
|
||||
api := r.PathPrefix(SlashSeparator).Subrouter()
|
||||
// Attach user authentication for all S3 routes.
|
||||
AttachUserAuth(api, center, log)
|
||||
|
||||
bucket := api.PathPrefix("/{bucket}").Subrouter()
|
||||
|
||||
// Object operations
|
||||
// HeadObject
|
||||
bucket.Methods(http.MethodHead).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("headobject", h.HeadObjectHandler)))
|
||||
// CopyObjectPart
|
||||
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(hdrAmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(m.Handle(metrics.APIStats("copyobjectpart", h.CopyObjectPartHandler))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
|
||||
// PutObjectPart
|
||||
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putobjectpart", h.PutObjectPartHandler))).Queries("partNumber", "{partNumber:[0-9]+}", "uploadId", "{uploadId:.*}")
|
||||
// ListObjectParts
|
||||
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("listobjectparts", h.ListObjectPartsHandler))).Queries("uploadId", "{uploadId:.*}")
|
||||
// CompleteMultipartUpload
|
||||
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("completemutipartupload", h.CompleteMultipartUploadHandler))).Queries("uploadId", "{uploadId:.*}")
|
||||
// NewMultipartUpload
|
||||
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("newmultipartupload", h.NewMultipartUploadHandler))).Queries("uploads", "")
|
||||
// AbortMultipartUpload
|
||||
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("abortmultipartupload", h.AbortMultipartUploadHandler))).Queries("uploadId", "{uploadId:.*}")
|
||||
// GetObjectACL - this is a dummy call.
|
||||
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getobjectacl", h.GetObjectACLHandler))).Queries("acl", "")
|
||||
// PutObjectACL - this is a dummy call.
|
||||
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putobjectacl", h.PutObjectACLHandler))).Queries("acl", "")
|
||||
// GetObjectTagging
|
||||
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getobjecttagging", h.GetObjectTaggingHandler))).Queries("tagging", "")
|
||||
// PutObjectTagging
|
||||
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putobjecttagging", h.PutObjectTaggingHandler))).Queries("tagging", "")
|
||||
// DeleteObjectTagging
|
||||
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("deleteobjecttagging", h.DeleteObjectTaggingHandler))).Queries("tagging", "")
|
||||
// SelectObjectContent
|
||||
bucket.Methods(http.MethodPost).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("selectobjectcontent", h.SelectObjectContentHandler))).Queries("select", "").Queries("select-type", "2")
|
||||
// GetObjectRetention
|
||||
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getobjectretention", h.GetObjectRetentionHandler))).Queries("retention", "")
|
||||
// GetObjectLegalHold
|
||||
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getobjectlegalhold", h.GetObjectLegalHoldHandler))).Queries("legal-hold", "")
|
||||
// GetObject
|
||||
bucket.Methods(http.MethodGet).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getobject", h.GetObjectHandler)))
|
||||
// CopyObject
|
||||
bucket.Methods(http.MethodPut).Path("/{object:.+}").HeadersRegexp(hdrAmzCopySource, ".*?(\\/|%2F).*?").HandlerFunc(m.Handle(metrics.APIStats("copyobject", h.CopyObjectHandler)))
|
||||
// PutObjectRetention
|
||||
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putobjectretention", h.PutObjectRetentionHandler))).Queries("retention", "")
|
||||
// PutObjectLegalHold
|
||||
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putobjectlegalhold", h.PutObjectLegalHoldHandler))).Queries("legal-hold", "")
|
||||
|
||||
// PutObject
|
||||
bucket.Methods(http.MethodPut).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putobject", h.PutObjectHandler)))
|
||||
// DeleteObject
|
||||
bucket.Methods(http.MethodDelete).Path("/{object:.+}").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("deleteobject", h.DeleteObjectHandler)))
|
||||
|
||||
// Bucket operations
|
||||
// GetBucketLocation
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketlocation", h.GetBucketLocationHandler))).Queries("location", "")
|
||||
// GetBucketPolicy
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketpolicy", h.GetBucketPolicyHandler))).Queries("policy", "")
|
||||
// GetBucketLifecycle
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketlifecycle", h.GetBucketLifecycleHandler))).Queries("lifecycle", "")
|
||||
// GetBucketEncryption
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketencryption", h.GetBucketEncryptionHandler))).Queries("encryption", "")
|
||||
|
||||
// Dummy Bucket Calls
|
||||
// GetBucketACL -- this is a dummy call.
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketacl", h.GetBucketACLHandler))).Queries("acl", "")
|
||||
// PutBucketACL -- this is a dummy call.
|
||||
bucket.Methods(http.MethodPut).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putbucketacl", h.PutBucketACLHandler))).Queries("acl", "")
|
||||
// GetBucketCors - this is a dummy call.
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketcors", h.GetBucketCorsHandler))).Queries("cors", "")
|
||||
// GetBucketWebsiteHandler - this is a dummy call.
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketwebsite", h.GetBucketWebsiteHandler))).Queries("website", "")
|
||||
// GetBucketAccelerateHandler - this is a dummy call.
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketaccelerate", h.GetBucketAccelerateHandler))).Queries("accelerate", "")
|
||||
// GetBucketRequestPaymentHandler - this is a dummy call.
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketrequestpayment", h.GetBucketRequestPaymentHandler))).Queries("requestPayment", "")
|
||||
// GetBucketLoggingHandler - this is a dummy call.
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketlogging", h.GetBucketLoggingHandler))).Queries("logging", "")
|
||||
// GetBucketLifecycleHandler - this is a dummy call.
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketlifecycle", h.GetBucketLifecycleHandler))).Queries("lifecycle", "")
|
||||
// GetBucketReplicationHandler - this is a dummy call.
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketreplication", h.GetBucketReplicationHandler))).Queries("replication", "")
|
||||
// GetBucketTaggingHandler
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbuckettagging", h.GetBucketTaggingHandler))).Queries("tagging", "")
|
||||
// DeleteBucketWebsiteHandler
|
||||
bucket.Methods(http.MethodDelete).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("deletebucketwebsite", h.DeleteBucketWebsiteHandler))).Queries("website", "")
|
||||
// DeleteBucketTaggingHandler
|
||||
bucket.Methods(http.MethodDelete).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("deletebuckettagging", h.DeleteBucketTaggingHandler))).Queries("tagging", "")
|
||||
|
||||
// GetBucketObjectLockConfig
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketobjectlockconfiguration", h.GetBucketObjectLockConfigHandler))).Queries("object-lock", "")
|
||||
// GetBucketVersioning
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketversioning", h.GetBucketVersioningHandler))).Queries("versioning", "")
|
||||
// GetBucketNotification
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("getbucketnotification", h.GetBucketNotificationHandler))).Queries("notification", "")
|
||||
// ListenBucketNotification
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(metrics.APIStats("listenbucketnotification", h.ListenBucketNotificationHandler)).Queries("events", "{events:.*}")
|
||||
// ListMultipartUploads
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("listmultipartuploads", h.ListMultipartUploadsHandler))).Queries("uploads", "")
|
||||
// ListObjectsV2M
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("listobjectsv2M", h.ListObjectsV2MHandler))).Queries("list-type", "2", "metadata", "true")
|
||||
// ListObjectsV2
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("listobjectsv2", h.ListObjectsV2Handler))).Queries("list-type", "2")
|
||||
// ListBucketVersions
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("listbucketversions", h.ListBucketObjectVersionsHandler))).Queries("versions", "")
|
||||
// ListObjectsV1 (Legacy)
|
||||
bucket.Methods(http.MethodGet).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("listobjectsv1", h.ListObjectsV1Handler)))
|
||||
// PutBucketLifecycle
|
||||
bucket.Methods(http.MethodPut).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putbucketlifecycle", h.PutBucketLifecycleHandler))).Queries("lifecycle", "")
|
||||
// PutBucketEncryption
|
||||
bucket.Methods(http.MethodPut).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putbucketencryption", h.PutBucketEncryptionHandler))).Queries("encryption", "")
|
||||
|
||||
// PutBucketPolicy
|
||||
bucket.Methods(http.MethodPut).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putbucketpolicy", h.PutBucketPolicyHandler))).Queries("policy", "")
|
||||
|
||||
// PutBucketObjectLockConfig
|
||||
bucket.Methods(http.MethodPut).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putbucketobjectlockconfig", h.PutBucketObjectLockConfigHandler))).Queries("object-lock", "")
|
||||
// PutBucketTaggingHandler
|
||||
bucket.Methods(http.MethodPut).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putbuckettagging", h.PutBucketTaggingHandler))).Queries("tagging", "")
|
||||
// PutBucketVersioning
|
||||
bucket.Methods(http.MethodPut).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putbucketversioning", h.PutBucketVersioningHandler))).Queries("versioning", "")
|
||||
// PutBucketNotification
|
||||
bucket.Methods(http.MethodPut).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putbucketnotification", h.PutBucketNotificationHandler))).Queries("notification", "")
|
||||
// PutBucket
|
||||
bucket.Methods(http.MethodPut).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("putbucket", h.PutBucketHandler)))
|
||||
// HeadBucket
|
||||
bucket.Methods(http.MethodHead).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("headbucket", h.HeadBucketHandler)))
|
||||
// PostPolicy
|
||||
bucket.Methods(http.MethodPost).HeadersRegexp(hdrContentType, "multipart/form-data*").HandlerFunc(
|
||||
m.Handle(metrics.APIStats("postpolicybucket", h.PostPolicyBucketHandler)))
|
||||
// DeleteMultipleObjects
|
||||
bucket.Methods(http.MethodPost).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("deletemultipleobjects", h.DeleteMultipleObjectsHandler))).Queries("delete", "")
|
||||
// DeleteBucketPolicy
|
||||
bucket.Methods(http.MethodDelete).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("deletebucketpolicy", h.DeleteBucketPolicyHandler))).Queries("policy", "")
|
||||
// DeleteBucketLifecycle
|
||||
bucket.Methods(http.MethodDelete).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("deletebucketlifecycle", h.DeleteBucketLifecycleHandler))).Queries("lifecycle", "")
|
||||
// DeleteBucketEncryption
|
||||
bucket.Methods(http.MethodDelete).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("deletebucketencryption", h.DeleteBucketEncryptionHandler))).Queries("encryption", "")
|
||||
// DeleteBucket
|
||||
bucket.Methods(http.MethodDelete).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("deletebucket", h.DeleteBucketHandler)))
|
||||
|
||||
// Root operation
|
||||
|
||||
// ListBuckets
|
||||
api.Methods(http.MethodGet).Path(SlashSeparator).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("listbuckets", h.ListBucketsHandler)))
|
||||
|
||||
// S3 browser with signature v4 adds '//' for ListBuckets request, so rather
|
||||
// than failing with UnknownAPIRequest we simply handle it for now.
|
||||
api.Methods(http.MethodGet).Path(SlashSeparator + SlashSeparator).HandlerFunc(
|
||||
m.Handle(metrics.APIStats("listbuckets", h.ListBucketsHandler)))
|
||||
|
||||
// If none of the routes match add default error handler routes
|
||||
api.NotFoundHandler = metrics.APIStats("notfound", errorResponseHandler)
|
||||
api.MethodNotAllowedHandler = metrics.APIStats("methodnotallowed", errorResponseHandler)
|
||||
}
|
25
api/user-auth.go
Normal file
25
api/user-auth.go
Normal file
|
@ -0,0 +1,25 @@
|
|||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/nspcc-dev/neofs-s3-gate/auth"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func AttachUserAuth(router *mux.Router, center *auth.Center, log *zap.Logger) {
|
||||
uamw := func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
bearerToken, err := center.AuthenticationPassed(r)
|
||||
if err != nil {
|
||||
log.Error("failed to pass authentication", zap.Error(err))
|
||||
WriteErrorResponse(r.Context(), w, GetAPIError(ErrAccessDenied), r.URL)
|
||||
return
|
||||
}
|
||||
h.ServeHTTP(w, r.WithContext(auth.SetBearerToken(r.Context(), bearerToken)))
|
||||
|
||||
})
|
||||
}
|
||||
router.Use(uamw)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue