forked from TrueCloudLab/frostfs-s3-gw
[#149] Move middlewares to separate package
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
37f2f468fe
commit
83cdfbee78
7 changed files with 615 additions and 221 deletions
51
api/middleware/auth.go
Normal file
51
api/middleware/auth.go
Normal file
|
@ -0,0 +1,51 @@
|
|||
package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// KeyWrapper is wrapper for context keys.
|
||||
type KeyWrapper string
|
||||
|
||||
// AuthHeaders is a wrapper for authentication headers of a request.
|
||||
var AuthHeaders = KeyWrapper("__context_auth_headers_key")
|
||||
|
||||
// BoxData is an ID used to store accessbox.Box in a context.
|
||||
var BoxData = KeyWrapper("__context_box_key")
|
||||
|
||||
// ClientTime is an ID used to store client time.Time in a context.
|
||||
var ClientTime = KeyWrapper("__context_client_time")
|
||||
|
||||
func Auth(center auth.Center, log *zap.Logger) Func {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
box, err := center.Authenticate(r)
|
||||
if err != nil {
|
||||
if err == auth.ErrNoAuthorizationHeader {
|
||||
reqLogOrDefault(ctx, log).Debug("couldn't receive access box for gate key, random key will be used")
|
||||
} else {
|
||||
reqLogOrDefault(ctx, log).Error("failed to pass authentication", zap.Error(err))
|
||||
if _, ok := err.(errors.Error); !ok {
|
||||
err = errors.GetAPIError(errors.ErrAccessDenied)
|
||||
}
|
||||
WriteErrorResponse(w, GetReqInfo(r.Context()), err)
|
||||
return
|
||||
}
|
||||
} else {
|
||||
ctx = context.WithValue(ctx, BoxData, box.AccessBox)
|
||||
if !box.ClientTime.IsZero() {
|
||||
ctx = context.WithValue(ctx, ClientTime, box.ClientTime)
|
||||
}
|
||||
ctx = context.WithValue(ctx, AuthHeaders, box.AuthHeaders)
|
||||
}
|
||||
|
||||
h.ServeHTTP(w, r.WithContext(ctx))
|
||||
})
|
||||
}
|
||||
}
|
192
api/middleware/metrics.go
Normal file
192
api/middleware/metrics.go
Normal file
|
@ -0,0 +1,192 @@
|
|||
package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/accessbox"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
UsersStat interface {
|
||||
Update(user, bucket, cnrID string, reqType int, in, out uint64)
|
||||
}
|
||||
|
||||
readCounter struct {
|
||||
io.ReadCloser
|
||||
countBytes uint64
|
||||
}
|
||||
|
||||
writeCounter struct {
|
||||
http.ResponseWriter
|
||||
countBytes uint64
|
||||
}
|
||||
|
||||
responseWrapper struct {
|
||||
sync.Once
|
||||
http.ResponseWriter
|
||||
|
||||
statusCode int
|
||||
startTime time.Time
|
||||
}
|
||||
|
||||
// BucketResolveFunc is a func to resolve bucket info by name.
|
||||
BucketResolveFunc func(ctx context.Context, bucket string) (*data.BucketInfo, error)
|
||||
|
||||
// cidResolveFunc is a func to resolve CID in Stats handler.
|
||||
cidResolveFunc func(ctx context.Context, reqInfo *ReqInfo) (cnrID string)
|
||||
)
|
||||
|
||||
const systemPath = "/system"
|
||||
|
||||
// Metrics wraps http handler for api with basic statistics collection.
|
||||
func Metrics(log *zap.Logger, resolveBucket BucketResolveFunc, appMetrics *metrics.AppMetrics) Func {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return stats(h.ServeHTTP, resolveCID(log, resolveBucket), appMetrics)
|
||||
}
|
||||
}
|
||||
|
||||
// Stats is a handler that update metrics.
|
||||
func stats(f http.HandlerFunc, resolveCID cidResolveFunc, appMetrics *metrics.AppMetrics) http.HandlerFunc {
|
||||
return func(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := GetReqInfo(r.Context())
|
||||
|
||||
appMetrics.Statistic().CurrentS3RequestsInc(reqInfo.API)
|
||||
defer appMetrics.Statistic().CurrentS3RequestsDec(reqInfo.API)
|
||||
|
||||
in := &readCounter{ReadCloser: r.Body}
|
||||
out := &writeCounter{ResponseWriter: w}
|
||||
|
||||
r.Body = in
|
||||
|
||||
statsWriter := &responseWrapper{
|
||||
ResponseWriter: out,
|
||||
startTime: time.Now(),
|
||||
}
|
||||
|
||||
f(statsWriter, r)
|
||||
|
||||
// Time duration in secs since the call started.
|
||||
// We don't need to do nanosecond precision here
|
||||
// simply for the fact that it is not human-readable.
|
||||
durationSecs := time.Since(statsWriter.startTime).Seconds()
|
||||
|
||||
user := resolveUser(r.Context())
|
||||
cnrID := resolveCID(r.Context(), reqInfo)
|
||||
appMetrics.Update(user, reqInfo.BucketName, cnrID, requestTypeFromAPI(reqInfo.API), in.countBytes, out.countBytes)
|
||||
|
||||
code := statsWriter.statusCode
|
||||
// A successful request has a 2xx response code
|
||||
successReq := code >= http.StatusOK && code < http.StatusMultipleChoices
|
||||
if !strings.HasSuffix(r.URL.Path, systemPath) {
|
||||
appMetrics.Statistic().TotalS3RequestsInc(reqInfo.API)
|
||||
if !successReq && code != 0 {
|
||||
appMetrics.Statistic().TotalS3ErrorsInc(reqInfo.API)
|
||||
}
|
||||
}
|
||||
|
||||
if r.Method == http.MethodGet {
|
||||
// Increment the prometheus http request response histogram with appropriate label
|
||||
appMetrics.Statistic().RequestDurationsUpdate(reqInfo.API, durationSecs)
|
||||
}
|
||||
|
||||
appMetrics.Statistic().TotalInputBytesAdd(in.countBytes)
|
||||
appMetrics.Statistic().TotalOutputBytesAdd(out.countBytes)
|
||||
}
|
||||
}
|
||||
|
||||
func requestTypeFromAPI(api string) metrics.RequestType {
|
||||
switch api {
|
||||
case "Options", "HeadObject", "HeadBucket":
|
||||
return metrics.HEADRequest
|
||||
case "CreateMultipartUpload", "UploadPartCopy", "UploadPart", "CompleteMultipartUpload",
|
||||
"PutObjectACL", "PutObjectTagging", "CopyObject", "PutObjectRetention", "PutObjectLegalHold",
|
||||
"PutObject", "PutBucketCors", "PutBucketACL", "PutBucketLifecycle", "PutBucketEncryption",
|
||||
"PutBucketPolicy", "PutBucketObjectLockConfig", "PutBucketTagging", "PutBucketVersioning",
|
||||
"PutBucketNotification", "CreateBucket", "PostObject":
|
||||
return metrics.PUTRequest
|
||||
case "ListObjectParts", "ListMultipartUploads", "ListObjectsV2M", "ListObjectsV2", "ListBucketVersions",
|
||||
"ListObjectsV1", "ListBuckets":
|
||||
return metrics.LISTRequest
|
||||
case "GetObjectACL", "GetObjectTagging", "SelectObjectContent", "GetObjectRetention", "getobjectlegalhold",
|
||||
"GetObjectAttributes", "GetObject", "GetBucketLocation", "GetBucketPolicy",
|
||||
"GetBucketLifecycle", "GetBucketEncryption", "GetBucketCors", "GetBucketACL",
|
||||
"GetBucketWebsite", "GetBucketAccelerate", "GetBucketRequestPayment", "GetBucketLogging",
|
||||
"GetBucketReplication", "GetBucketTagging", "GetBucketObjectLockConfig",
|
||||
"GetBucketVersioning", "GetBucketNotification", "ListenBucketNotification":
|
||||
return metrics.GETRequest
|
||||
case "AbortMultipartUpload", "DeleteObjectTagging", "DeleteObject", "DeleteBucketCors",
|
||||
"DeleteBucketWebsite", "DeleteBucketTagging", "DeleteMultipleObjects", "DeleteBucketPolicy",
|
||||
"DeleteBucketLifecycle", "DeleteBucketEncryption", "DeleteBucket":
|
||||
return metrics.DELETERequest
|
||||
default:
|
||||
return metrics.UNKNOWNRequest
|
||||
}
|
||||
}
|
||||
|
||||
// resolveCID forms CIDResolveFunc using BucketResolveFunc.
|
||||
func resolveCID(log *zap.Logger, resolveBucket BucketResolveFunc) cidResolveFunc {
|
||||
return func(ctx context.Context, reqInfo *ReqInfo) (cnrID string) {
|
||||
if reqInfo.BucketName == "" || reqInfo.API == "CreateBucket" || reqInfo.API == "" {
|
||||
return ""
|
||||
}
|
||||
|
||||
bktInfo, err := resolveBucket(ctx, reqInfo.BucketName)
|
||||
if err != nil {
|
||||
reqLogOrDefault(ctx, log).Debug("failed to resolve CID", zap.Error(err))
|
||||
return ""
|
||||
}
|
||||
|
||||
return bktInfo.CID.EncodeToString()
|
||||
}
|
||||
}
|
||||
|
||||
func resolveUser(ctx context.Context) string {
|
||||
user := "anon"
|
||||
if bd, ok := ctx.Value(BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil && bd.Gate.BearerToken != nil {
|
||||
user = bearer.ResolveIssuer(*bd.Gate.BearerToken).String()
|
||||
}
|
||||
return user
|
||||
}
|
||||
|
||||
// WriteHeader -- writes http status code.
|
||||
func (w *responseWrapper) WriteHeader(code int) {
|
||||
w.Do(func() {
|
||||
w.statusCode = code
|
||||
w.ResponseWriter.WriteHeader(code)
|
||||
})
|
||||
}
|
||||
|
||||
// Flush -- calls the underlying Flush.
|
||||
func (w *responseWrapper) Flush() {
|
||||
if f, ok := w.ResponseWriter.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writeCounter) Flush() {
|
||||
if f, ok := w.ResponseWriter.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (w *writeCounter) Write(p []byte) (int, error) {
|
||||
n, err := w.ResponseWriter.Write(p)
|
||||
atomic.AddUint64(&w.countBytes, uint64(n))
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (r *readCounter) Read(p []byte) (int, error) {
|
||||
n, err := r.ReadCloser.Read(p)
|
||||
atomic.AddUint64(&r.countBytes, uint64(n))
|
||||
return n, err
|
||||
}
|
27
api/middleware/middleware.go
Normal file
27
api/middleware/middleware.go
Normal file
|
@ -0,0 +1,27 @@
|
|||
package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type Func func(h http.Handler) http.Handler
|
||||
|
||||
func WrapHandler(handler http.HandlerFunc) Func {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
handler(w, r)
|
||||
h.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func reqLogOrDefault(ctx context.Context, log *zap.Logger) *zap.Logger {
|
||||
reqLog := GetReqLog(ctx)
|
||||
if reqLog != nil {
|
||||
return reqLog
|
||||
}
|
||||
return log
|
||||
}
|
306
api/middleware/reqinfo.go
Normal file
306
api/middleware/reqinfo.go
Normal file
|
@ -0,0 +1,306 @@
|
|||
package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
"go.uber.org/zap"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
type (
|
||||
// KeyVal -- appended to ReqInfo.Tags.
|
||||
KeyVal struct {
|
||||
Key string
|
||||
Val string
|
||||
}
|
||||
|
||||
// ReqInfo stores the request info.
|
||||
ReqInfo struct {
|
||||
sync.RWMutex
|
||||
RemoteHost string // Client Host/IP
|
||||
Host string // Node Host/IP
|
||||
UserAgent string // User Agent
|
||||
DeploymentID string // random generated s3-deployment-id
|
||||
RequestID string // x-amz-request-id
|
||||
API string // API name -- GetObject PutObject NewMultipartUpload etc.
|
||||
BucketName string // Bucket name
|
||||
ObjectName string // Object name
|
||||
URL *url.URL // Request url
|
||||
tags []KeyVal // Any additional info not accommodated by above fields
|
||||
}
|
||||
|
||||
// ObjectRequest represents object request data.
|
||||
ObjectRequest struct {
|
||||
Bucket string
|
||||
Object string
|
||||
Method string
|
||||
}
|
||||
|
||||
// Key used for custom key/value in context.
|
||||
contextKeyType string
|
||||
)
|
||||
|
||||
const (
|
||||
ctxRequestInfo = contextKeyType("FrostFS-S3-GW")
|
||||
ctxRequestLogger = contextKeyType("FrostFS-S3-GW-Logger")
|
||||
)
|
||||
|
||||
const HdrAmzRequestID = "x-amz-request-id"
|
||||
|
||||
const (
|
||||
BucketURLPrm = "bucket"
|
||||
ObjectURLPrm = "object"
|
||||
)
|
||||
|
||||
var deploymentID = uuid.Must(uuid.NewRandom())
|
||||
|
||||
var (
|
||||
// De-facto standard header keys.
|
||||
xForwardedFor = http.CanonicalHeaderKey("X-Forwarded-For")
|
||||
xRealIP = http.CanonicalHeaderKey("X-Real-IP")
|
||||
|
||||
// RFC7239 defines a new "Forwarded: " header designed to replace the
|
||||
// existing use of X-Forwarded-* headers.
|
||||
// e.g. Forwarded: for=192.0.2.60;proto=https;by=203.0.113.43.
|
||||
forwarded = http.CanonicalHeaderKey("Forwarded")
|
||||
// Allows for a sub-match of the first value after 'for=' to the next
|
||||
// comma, semi-colon or space. The match is case-insensitive.
|
||||
forRegex = regexp.MustCompile(`(?i)(?:for=)([^(;|, )]+)(.*)`)
|
||||
)
|
||||
|
||||
// NewReqInfo returns new ReqInfo based on parameters.
|
||||
func NewReqInfo(w http.ResponseWriter, r *http.Request, req ObjectRequest) *ReqInfo {
|
||||
return &ReqInfo{
|
||||
API: req.Method,
|
||||
BucketName: req.Bucket,
|
||||
ObjectName: req.Object,
|
||||
UserAgent: r.UserAgent(),
|
||||
RemoteHost: getSourceIP(r),
|
||||
RequestID: GetRequestID(w),
|
||||
DeploymentID: deploymentID.String(),
|
||||
URL: r.URL,
|
||||
}
|
||||
}
|
||||
|
||||
// AppendTags -- appends key/val to ReqInfo.tags.
|
||||
func (r *ReqInfo) AppendTags(key string, val string) *ReqInfo {
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
r.tags = append(r.tags, KeyVal{key, val})
|
||||
return r
|
||||
}
|
||||
|
||||
// SetTags -- sets key/val to ReqInfo.tags.
|
||||
func (r *ReqInfo) SetTags(key string, val string) *ReqInfo {
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
r.Lock()
|
||||
defer r.Unlock()
|
||||
// Search for a tag key already existing in tags
|
||||
var updated bool
|
||||
for _, tag := range r.tags {
|
||||
if tag.Key == key {
|
||||
tag.Val = val
|
||||
updated = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !updated {
|
||||
// Append to the end of tags list
|
||||
r.tags = append(r.tags, KeyVal{key, val})
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
// GetTags -- returns the user defined tags.
|
||||
func (r *ReqInfo) GetTags() []KeyVal {
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
r.RLock()
|
||||
defer r.RUnlock()
|
||||
return append([]KeyVal(nil), r.tags...)
|
||||
}
|
||||
|
||||
// GetRequestID returns the request ID from the response writer or the context.
|
||||
func GetRequestID(v interface{}) string {
|
||||
switch t := v.(type) {
|
||||
case context.Context:
|
||||
return GetReqInfo(t).RequestID
|
||||
case http.ResponseWriter:
|
||||
return t.Header().Get(HdrAmzRequestID)
|
||||
default:
|
||||
panic("unknown type")
|
||||
}
|
||||
}
|
||||
|
||||
// SetReqInfo sets ReqInfo in the context.
|
||||
func SetReqInfo(ctx context.Context, req *ReqInfo) context.Context {
|
||||
if ctx == nil {
|
||||
return nil
|
||||
}
|
||||
return context.WithValue(ctx, ctxRequestInfo, req)
|
||||
}
|
||||
|
||||
// GetReqInfo returns ReqInfo if set.
|
||||
// If ReqInfo isn't set returns new empty ReqInfo.
|
||||
func GetReqInfo(ctx context.Context) *ReqInfo {
|
||||
if ctx == nil {
|
||||
return &ReqInfo{}
|
||||
} else if r, ok := ctx.Value(ctxRequestInfo).(*ReqInfo); ok {
|
||||
return r
|
||||
}
|
||||
return &ReqInfo{}
|
||||
}
|
||||
|
||||
// SetReqLogger sets child zap.Logger in the context.
|
||||
func SetReqLogger(ctx context.Context, log *zap.Logger) context.Context {
|
||||
if ctx == nil {
|
||||
return nil
|
||||
}
|
||||
return context.WithValue(ctx, ctxRequestLogger, log)
|
||||
}
|
||||
|
||||
// GetReqLog returns log if set.
|
||||
// If zap.Logger isn't set returns nil.
|
||||
func GetReqLog(ctx context.Context) *zap.Logger {
|
||||
if ctx == nil {
|
||||
return nil
|
||||
} else if r, ok := ctx.Value(ctxRequestLogger).(*zap.Logger); ok {
|
||||
return r
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func Request(log *zap.Logger) Func {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// generate random UUIDv4
|
||||
id, _ := uuid.NewRandom()
|
||||
|
||||
// set request id into response header
|
||||
// also we have to set request id here
|
||||
// to be able to get it in NewReqInfo
|
||||
w.Header().Set(HdrAmzRequestID, id.String())
|
||||
|
||||
// set request info into context
|
||||
// bucket name and object will be set in reqInfo later (limitation of go-chi)
|
||||
reqInfo := NewReqInfo(w, r, ObjectRequest{})
|
||||
r = r.WithContext(SetReqInfo(r.Context(), reqInfo))
|
||||
|
||||
// set request id into gRPC meta header
|
||||
r = r.WithContext(metadata.AppendToOutgoingContext(
|
||||
r.Context(), HdrAmzRequestID, reqInfo.RequestID,
|
||||
))
|
||||
|
||||
reqLogger := log.With(zap.String("request_id", reqInfo.RequestID))
|
||||
r = r.WithContext(SetReqLogger(r.Context(), reqLogger))
|
||||
|
||||
reqLogger.Info("request start", zap.String("host", r.Host),
|
||||
zap.String("remote_host", reqInfo.RemoteHost))
|
||||
|
||||
// continue execution
|
||||
h.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// AddBucketName adds bucket name to ReqInfo from context.
|
||||
func AddBucketName(l *zap.Logger) Func {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
reqInfo := GetReqInfo(ctx)
|
||||
reqInfo.BucketName = chi.URLParam(r, BucketURLPrm)
|
||||
|
||||
reqLogger := reqLogOrDefault(ctx, l)
|
||||
r = r.WithContext(SetReqLogger(ctx, reqLogger.With(zap.String("bucket", reqInfo.BucketName))))
|
||||
|
||||
h.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// AddObjectName adds objects name to ReqInfo from context.
|
||||
func AddObjectName(l *zap.Logger) Func {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
obj := chi.URLParam(r, ObjectURLPrm)
|
||||
object, err := url.PathUnescape(obj)
|
||||
if err != nil {
|
||||
object = obj
|
||||
}
|
||||
prefix, err := url.QueryUnescape(chi.URLParam(r, "prefix"))
|
||||
if err != nil {
|
||||
prefix = chi.URLParam(r, "prefix")
|
||||
}
|
||||
if prefix != "" {
|
||||
object = prefix
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
|
||||
reqInfo := GetReqInfo(ctx)
|
||||
reqInfo.ObjectName = object
|
||||
|
||||
reqLogger := reqLogOrDefault(ctx, l)
|
||||
r = r.WithContext(SetReqLogger(ctx, reqLogger.With(zap.String("object", reqInfo.ObjectName))))
|
||||
|
||||
h.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// getSourceIP retrieves the IP from the X-Forwarded-For, X-Real-IP and RFC7239
|
||||
// Forwarded headers (in that order), falls back to r.RemoteAddr when everything
|
||||
// else fails.
|
||||
func getSourceIP(r *http.Request) string {
|
||||
var addr string
|
||||
|
||||
if fwd := r.Header.Get(xForwardedFor); fwd != "" {
|
||||
// Only grabs the first (client) address. Note that '192.168.0.1,
|
||||
// 10.1.1.1' is a valid key for X-Forwarded-For where addresses after
|
||||
// the first one may represent forwarding proxies earlier in the chain.
|
||||
s := strings.Index(fwd, ", ")
|
||||
if s == -1 {
|
||||
s = len(fwd)
|
||||
}
|
||||
addr = fwd[:s]
|
||||
} else if fwd := r.Header.Get(xRealIP); fwd != "" {
|
||||
// X-Real-IP should only contain one IP address (the client making the
|
||||
// request).
|
||||
addr = fwd
|
||||
} else if fwd := r.Header.Get(forwarded); fwd != "" {
|
||||
// match should contain at least two elements if the protocol was
|
||||
// specified in the Forwarded header. The first element will always be
|
||||
// the 'for=' capture, which we ignore. In the case of multiple IP
|
||||
// addresses (for=8.8.8.8, 8.8.4.4, 172.16.1.20 is valid) we only
|
||||
// extract the first, which should be the client IP.
|
||||
if match := forRegex.FindStringSubmatch(fwd); len(match) > 1 {
|
||||
// IPv6 addresses in Forwarded headers are quoted-strings. We strip
|
||||
// these quotes.
|
||||
addr = strings.Trim(match[1], `"`)
|
||||
}
|
||||
}
|
||||
|
||||
if addr != "" {
|
||||
return addr
|
||||
}
|
||||
|
||||
// Default to remote address if headers not set.
|
||||
addr, _, _ = net.SplitHostPort(r.RemoteAddr)
|
||||
return addr
|
||||
}
|
331
api/middleware/response.go
Normal file
331
api/middleware/response.go
Normal file
|
@ -0,0 +1,331 @@
|
|||
package middleware
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
// ErrorResponse -- error response format.
|
||||
ErrorResponse struct {
|
||||
XMLName xml.Name `xml:"Error" json:"-"`
|
||||
Code string
|
||||
Message string
|
||||
Key string `xml:"Key,omitempty" json:"Key,omitempty"`
|
||||
BucketName string `xml:"BucketName,omitempty" json:"BucketName,omitempty"`
|
||||
Resource string
|
||||
RequestID string `xml:"RequestId" json:"RequestId"`
|
||||
HostID string `xml:"HostId" json:"HostId"`
|
||||
|
||||
// The region where the bucket is located. This header is returned
|
||||
// only in HEAD bucket and ListObjects response.
|
||||
Region string `xml:"Region,omitempty" json:"Region,omitempty"`
|
||||
|
||||
// Captures the server string returned in response header.
|
||||
Server string `xml:"-" json:"-"`
|
||||
|
||||
// Underlying HTTP status code for the returned error.
|
||||
StatusCode int `xml:"-" json:"-"`
|
||||
}
|
||||
|
||||
// mimeType represents various MIME types used in API responses.
|
||||
mimeType string
|
||||
)
|
||||
|
||||
const (
|
||||
|
||||
// MimeNone means no response type.
|
||||
MimeNone mimeType = ""
|
||||
|
||||
// MimeXML means response type is XML.
|
||||
MimeXML mimeType = "application/xml"
|
||||
|
||||
hdrServerInfo = "Server"
|
||||
hdrAcceptRanges = "Accept-Ranges"
|
||||
hdrContentType = "Content-Type"
|
||||
hdrContentLength = "Content-Length"
|
||||
hdrRetryAfter = "Retry-After"
|
||||
|
||||
// Response request id.
|
||||
|
||||
// hdrSSE is the general AWS SSE HTTP header key.
|
||||
hdrSSE = "X-Amz-Server-Side-Encryption"
|
||||
|
||||
// hdrSSECustomerKey is the HTTP header key referencing the
|
||||
// SSE-C client-provided key..
|
||||
hdrSSECustomerKey = hdrSSE + "-Customer-Key"
|
||||
|
||||
// hdrSSECopyKey is the HTTP header key referencing the SSE-C
|
||||
// client-provided key for SSE-C copy requests.
|
||||
hdrSSECopyKey = "X-Amz-Copy-Source-Server-Side-Encryption-Customer-Key"
|
||||
)
|
||||
|
||||
var (
|
||||
xmlHeader = []byte(xml.Header)
|
||||
)
|
||||
|
||||
// Non exhaustive list of AWS S3 standard error responses -
|
||||
// http://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
|
||||
var s3ErrorResponseMap = map[string]string{
|
||||
"AccessDenied": "Access Denied.",
|
||||
"BadDigest": "The Content-Md5 you specified did not match what we received.",
|
||||
"EntityTooSmall": "Your proposed upload is smaller than the minimum allowed object size.",
|
||||
"EntityTooLarge": "Your proposed upload exceeds the maximum allowed object size.",
|
||||
"IncompleteBody": "You did not provide the number of bytes specified by the Content-Length HTTP header.",
|
||||
"InternalError": "We encountered an internal error, please try again.",
|
||||
"InvalidAccessKeyId": "The access key ID you provided does not exist in our records.",
|
||||
"InvalidBucketName": "The specified bucket is not valid.",
|
||||
"InvalidDigest": "The Content-Md5 you specified is not valid.",
|
||||
"InvalidRange": "The requested range is not satisfiable",
|
||||
"MalformedXML": "The XML you provided was not well-formed or did not validate against our published schema.",
|
||||
"MissingContentLength": "You must provide the Content-Length HTTP header.",
|
||||
"MissingContentMD5": "Missing required header for this request: Content-Md5.",
|
||||
"MissingRequestBodyError": "Request body is empty.",
|
||||
"NoSuchBucket": "The specified bucket does not exist.",
|
||||
"NoSuchBucketPolicy": "The bucket policy does not exist",
|
||||
"NoSuchKey": "The specified key does not exist.",
|
||||
"NoSuchUpload": "The specified multipart upload does not exist. The upload ID may be invalid, or the upload may have been aborted or completed.",
|
||||
"NotImplemented": "A header you provided implies functionality that is not implemented",
|
||||
"PreconditionFailed": "At least one of the pre-conditions you specified did not hold",
|
||||
"RequestTimeTooSkewed": "The difference between the request time and the server's time is too large.",
|
||||
"SignatureDoesNotMatch": "The request signature we calculated does not match the signature you provided. Check your key and signing method.",
|
||||
"MethodNotAllowed": "The specified method is not allowed against this resource.",
|
||||
"InvalidPart": "One or more of the specified parts could not be found.",
|
||||
"InvalidPartOrder": "The list of parts was not in ascending order. The parts list must be specified in order by part number.",
|
||||
"InvalidObjectState": "The operation is not valid for the current state of the object.",
|
||||
"AuthorizationHeaderMalformed": "The authorization header is malformed; the region is wrong.",
|
||||
"MalformedPOSTRequest": "The body of your POST request is not well-formed multipart/form-data.",
|
||||
"BucketNotEmpty": "The bucket you tried to delete is not empty",
|
||||
"AllAccessDisabled": "All access to this bucket has been disabled.",
|
||||
"MalformedPolicy": "Policy has invalid resource.",
|
||||
"MissingFields": "Missing fields in request.",
|
||||
"AuthorizationQueryParametersError": "Error parsing the X-Amz-Credential parameter; the Credential is mal-formed; expecting \"<YOUR-AKID>/YYYYMMDD/REGION/SERVICE/aws4_request\".",
|
||||
"MalformedDate": "Invalid date format header, expected to be in ISO8601, RFC1123 or RFC1123Z time format.",
|
||||
"BucketAlreadyOwnedByYou": "Your previous request to create the named bucket succeeded and you already own it.",
|
||||
"InvalidDuration": "Duration provided in the request is invalid.",
|
||||
"XAmzContentSHA256Mismatch": "The provided 'x-amz-content-sha256' header does not match what was computed.",
|
||||
// Add new API errors here.
|
||||
}
|
||||
|
||||
// WriteErrorResponse writes error headers.
|
||||
func WriteErrorResponse(w http.ResponseWriter, reqInfo *ReqInfo, err error) int {
|
||||
code := http.StatusInternalServerError
|
||||
|
||||
if e, ok := err.(errors.Error); ok {
|
||||
code = e.HTTPStatusCode
|
||||
|
||||
switch e.Code {
|
||||
case "SlowDown", "XFrostFSServerNotInitialized", "XFrostFSReadQuorum", "XFrostFSWriteQuorum":
|
||||
// Set retry-after header to indicate user-agents to retry request after 120secs.
|
||||
// https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Retry-After
|
||||
w.Header().Set(hdrRetryAfter, "120")
|
||||
case "AccessDenied":
|
||||
// TODO process when the request is from browser and also if browser
|
||||
}
|
||||
}
|
||||
|
||||
// Generates error response.
|
||||
errorResponse := getAPIErrorResponse(reqInfo, err)
|
||||
encodedErrorResponse := EncodeResponse(errorResponse)
|
||||
WriteResponse(w, code, encodedErrorResponse, MimeXML)
|
||||
return code
|
||||
}
|
||||
|
||||
// WriteErrorResponseNoHeader writes XML encoded error to the response body.
|
||||
func WriteErrorResponseNoHeader(w http.ResponseWriter, reqInfo *ReqInfo, err error) {
|
||||
errorResponse := getAPIErrorResponse(reqInfo, err)
|
||||
encodedErrorResponse := EncodeResponse(errorResponse)
|
||||
WriteResponseBody(w, encodedErrorResponse)
|
||||
}
|
||||
|
||||
// Write http common headers.
|
||||
func setCommonHeaders(w http.ResponseWriter) {
|
||||
w.Header().Set(hdrServerInfo, version.Server)
|
||||
w.Header().Set(hdrAcceptRanges, "bytes")
|
||||
|
||||
// Remove sensitive information
|
||||
removeSensitiveHeaders(w.Header())
|
||||
}
|
||||
|
||||
// removeSensitiveHeaders removes confidential encryption
|
||||
// information -- e.g. the SSE-C key -- from the HTTP headers.
|
||||
// It has the same semantics as RemoveSensitiveEntries.
|
||||
func removeSensitiveHeaders(h http.Header) {
|
||||
h.Del(hdrSSECustomerKey)
|
||||
h.Del(hdrSSECopyKey)
|
||||
}
|
||||
|
||||
// WriteResponse writes given statusCode and response into w (with mType header if set).
|
||||
func WriteResponse(w http.ResponseWriter, statusCode int, response []byte, mType mimeType) {
|
||||
setCommonHeaders(w)
|
||||
if mType != MimeNone {
|
||||
w.Header().Set(hdrContentType, string(mType))
|
||||
}
|
||||
w.Header().Set(hdrContentLength, strconv.Itoa(len(response)))
|
||||
w.WriteHeader(statusCode)
|
||||
if response == nil {
|
||||
return
|
||||
}
|
||||
|
||||
WriteResponseBody(w, response)
|
||||
}
|
||||
|
||||
// WriteResponseBody writes response into w.
|
||||
func WriteResponseBody(w http.ResponseWriter, response []byte) {
|
||||
_, _ = w.Write(response)
|
||||
if flusher, ok := w.(http.Flusher); ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
// EncodeResponse encodes the response headers into XML format.
|
||||
func EncodeResponse(response interface{}) []byte {
|
||||
var bytesBuffer bytes.Buffer
|
||||
bytesBuffer.WriteString(xml.Header)
|
||||
_ = xml.
|
||||
NewEncoder(&bytesBuffer).
|
||||
Encode(response)
|
||||
return bytesBuffer.Bytes()
|
||||
}
|
||||
|
||||
// EncodeResponseNoHeader encodes response without setting xml.Header.
|
||||
// Should be used with periodicXMLWriter which sends xml.Header to the client
|
||||
// with whitespaces to keep connection alive.
|
||||
func EncodeResponseNoHeader(response interface{}) []byte {
|
||||
var bytesBuffer bytes.Buffer
|
||||
_ = xml.NewEncoder(&bytesBuffer).Encode(response)
|
||||
return bytesBuffer.Bytes()
|
||||
}
|
||||
|
||||
// EncodeToResponse encodes the response into ResponseWriter.
|
||||
func EncodeToResponse(w http.ResponseWriter, response interface{}) error {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
||||
if _, err := w.Write(xmlHeader); err != nil {
|
||||
return fmt.Errorf("write headers: %w", err)
|
||||
}
|
||||
|
||||
return EncodeToResponseNoHeader(w, response)
|
||||
}
|
||||
|
||||
// EncodeToResponseNoHeader encodes the response into ResponseWriter without
|
||||
// header status.
|
||||
func EncodeToResponseNoHeader(w http.ResponseWriter, response interface{}) error {
|
||||
if err := xml.NewEncoder(w).Encode(response); err != nil {
|
||||
return fmt.Errorf("encode xml response: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// // WriteSuccessResponseXML writes success headers and response if any,
|
||||
// // with content-type set to `application/xml`.
|
||||
// func WriteSuccessResponseXML(w http.ResponseWriter, response []byte) {
|
||||
// WriteResponse(w, http.StatusOK, response, MimeXML)
|
||||
// }
|
||||
|
||||
// WriteSuccessResponseHeadersOnly writes HTTP (200) OK response with no data
|
||||
// to the client.
|
||||
func WriteSuccessResponseHeadersOnly(w http.ResponseWriter) {
|
||||
WriteResponse(w, http.StatusOK, nil, MimeNone)
|
||||
}
|
||||
|
||||
// Error -- Returns S3 error string.
|
||||
func (e ErrorResponse) Error() string {
|
||||
if e.Message == "" {
|
||||
msg, ok := s3ErrorResponseMap[e.Code]
|
||||
if !ok {
|
||||
msg = fmt.Sprintf("Error response code %s.", e.Code)
|
||||
}
|
||||
return msg
|
||||
}
|
||||
return e.Message
|
||||
}
|
||||
|
||||
// getErrorResponse gets in standard error and resource value and
|
||||
// provides an encodable populated response values.
|
||||
func getAPIErrorResponse(info *ReqInfo, err error) ErrorResponse {
|
||||
code := "InternalError"
|
||||
desc := err.Error()
|
||||
|
||||
if e, ok := err.(errors.Error); ok {
|
||||
code = e.Code
|
||||
desc = e.Description
|
||||
}
|
||||
|
||||
var resource string
|
||||
if info.URL != nil {
|
||||
resource = info.URL.Path
|
||||
}
|
||||
|
||||
return ErrorResponse{
|
||||
Code: code,
|
||||
Message: desc,
|
||||
BucketName: info.BucketName,
|
||||
Key: info.ObjectName,
|
||||
Resource: resource,
|
||||
RequestID: info.RequestID,
|
||||
HostID: info.DeploymentID,
|
||||
}
|
||||
}
|
||||
|
||||
type logResponseWriter struct {
|
||||
sync.Once
|
||||
http.ResponseWriter
|
||||
|
||||
statusCode int
|
||||
}
|
||||
|
||||
func (lrw *logResponseWriter) WriteHeader(code int) {
|
||||
lrw.Do(func() {
|
||||
lrw.statusCode = code
|
||||
lrw.ResponseWriter.WriteHeader(code)
|
||||
})
|
||||
}
|
||||
|
||||
func (lrw *logResponseWriter) Flush() {
|
||||
if f, ok := lrw.ResponseWriter.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func LogSuccessResponse(l *zap.Logger) Func {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
lw := &logResponseWriter{ResponseWriter: w}
|
||||
|
||||
// here reqInfo doesn't contain bucket name and object name
|
||||
|
||||
// pass execution:
|
||||
h.ServeHTTP(lw, r)
|
||||
|
||||
// here reqInfo contains bucket name and object name because of
|
||||
// addBucketName and addObjectName middlewares
|
||||
|
||||
// Ignore >400 status codes
|
||||
if lw.statusCode >= http.StatusBadRequest {
|
||||
return
|
||||
}
|
||||
|
||||
ctx := r.Context()
|
||||
reqLogger := reqLogOrDefault(ctx, l)
|
||||
reqInfo := GetReqInfo(ctx)
|
||||
|
||||
reqLogger.Info("request end",
|
||||
zap.String("method", reqInfo.API),
|
||||
zap.String("bucket", reqInfo.BucketName),
|
||||
zap.String("object", reqInfo.ObjectName),
|
||||
zap.Int("status", lw.statusCode),
|
||||
zap.String("description", http.StatusText(lw.statusCode)),
|
||||
)
|
||||
})
|
||||
}
|
||||
}
|
120
api/middleware/tracing.go
Normal file
120
api/middleware/tracing.go
Normal file
|
@ -0,0 +1,120 @@
|
|||
package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// Tracing adds tracing support for requests.
|
||||
// Must be placed after prepareRequest middleware.
|
||||
func Tracing() Func {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
appCtx, span := StartHTTPServerSpan(r, "REQUEST S3")
|
||||
lw := &traceResponseWriter{ResponseWriter: w, ctx: appCtx, span: span}
|
||||
h.ServeHTTP(lw, r.WithContext(appCtx))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type traceResponseWriter struct {
|
||||
sync.Once
|
||||
http.ResponseWriter
|
||||
|
||||
ctx context.Context
|
||||
span trace.Span
|
||||
}
|
||||
|
||||
func (lrw *traceResponseWriter) WriteHeader(code int) {
|
||||
lrw.Do(func() {
|
||||
lrw.span.SetAttributes(
|
||||
semconv.HTTPStatusCode(code),
|
||||
)
|
||||
|
||||
carrier := &httpResponseCarrier{resp: lrw.ResponseWriter}
|
||||
tracing.Propagator.Inject(lrw.ctx, carrier)
|
||||
|
||||
lrw.ResponseWriter.WriteHeader(code)
|
||||
lrw.span.End()
|
||||
})
|
||||
}
|
||||
|
||||
func (lrw *traceResponseWriter) Flush() {
|
||||
if f, ok := lrw.ResponseWriter.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
type httpResponseCarrier struct {
|
||||
resp http.ResponseWriter
|
||||
}
|
||||
|
||||
func (h httpResponseCarrier) Get(key string) string {
|
||||
return h.resp.Header().Get(key)
|
||||
}
|
||||
|
||||
func (h httpResponseCarrier) Set(key string, value string) {
|
||||
h.resp.Header().Set(key, value)
|
||||
}
|
||||
|
||||
func (h httpResponseCarrier) Keys() []string {
|
||||
result := make([]string, 0, len(h.resp.Header()))
|
||||
for key := range h.resp.Header() {
|
||||
result = append(result, key)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
type httpRequestCarrier struct {
|
||||
req *http.Request
|
||||
}
|
||||
|
||||
func (c *httpRequestCarrier) Get(key string) string {
|
||||
bytes := c.req.Header.Get(key)
|
||||
if len(bytes) == 0 {
|
||||
return ""
|
||||
}
|
||||
return bytes
|
||||
}
|
||||
|
||||
func (c *httpRequestCarrier) Set(key string, value string) {
|
||||
c.req.Response.Header.Set(key, value)
|
||||
}
|
||||
|
||||
func (c *httpRequestCarrier) Keys() []string {
|
||||
result := make([]string, 0, len(c.req.Header))
|
||||
for key := range c.req.Header {
|
||||
result = append(result, key)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func extractHTTPTraceInfo(ctx context.Context, req *http.Request) context.Context {
|
||||
if req == nil {
|
||||
return ctx
|
||||
}
|
||||
carrier := &httpRequestCarrier{req: req}
|
||||
return tracing.Propagator.Extract(ctx, carrier)
|
||||
}
|
||||
|
||||
// StartHTTPServerSpan starts root HTTP server span.
|
||||
func StartHTTPServerSpan(r *http.Request, operationName string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
|
||||
ctx := extractHTTPTraceInfo(r.Context(), r)
|
||||
opts = append(opts, trace.WithAttributes(
|
||||
attribute.String("s3.client_address", r.RemoteAddr),
|
||||
attribute.String("s3.path", r.Host),
|
||||
attribute.String("s3.request_id", GetRequestID(r.Context())),
|
||||
semconv.HTTPMethod(r.Method),
|
||||
semconv.RPCService("frostfs-s3-gw"),
|
||||
attribute.String("s3.query", r.RequestURI),
|
||||
), trace.WithSpanKind(trace.SpanKindServer))
|
||||
return tracing.StartSpanFromContext(ctx, operationName, opts...)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue