[#111] Use request scope logger
Some checks failed
/ Lint (pull_request) Failing after 32s
/ Tests (1.19) (pull_request) Failing after 32s
/ Tests (1.20) (pull_request) Failing after 32s
/ Builds (1.19) (pull_request) Failing after 32s
/ Builds (1.20) (pull_request) Failing after 32s
/ Vulncheck (pull_request) Failing after 57s
/ DCO (pull_request) Failing after 1m0s
Some checks failed
/ Lint (pull_request) Failing after 32s
/ Tests (1.19) (pull_request) Failing after 32s
/ Tests (1.20) (pull_request) Failing after 32s
/ Builds (1.19) (pull_request) Failing after 32s
/ Builds (1.20) (pull_request) Failing after 32s
/ Vulncheck (pull_request) Failing after 57s
/ DCO (pull_request) Failing after 1m0s
Store child zap logger with request scope fields into context. Request scoped fields: request_id, api/method, bucket, object Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
dfc4476afd
commit
23593eee3d
22 changed files with 269 additions and 197 deletions
|
@ -45,6 +45,7 @@ This document outlines major changes between releases.
|
|||
- Support multiple tree service endpoints (#74)
|
||||
- Timeout errors has code 504 now (#103)
|
||||
- Support multiple version credentials using GSet (#135)
|
||||
- Use request scope logger (#111)
|
||||
|
||||
### Removed
|
||||
- Drop `tree.service` param (now endpoints from `peers` section are used) (#133)
|
||||
|
|
|
@ -243,7 +243,8 @@ func (s *statement) UnmarshalJSON(data []byte) error {
|
|||
}
|
||||
|
||||
func (h *handler) GetBucketACLHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
ctx := r.Context()
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
|
||||
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
||||
if err != nil {
|
||||
|
@ -251,13 +252,13 @@ func (h *handler) GetBucketACLHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
bucketACL, err := h.obj.GetBucketACL(r.Context(), bktInfo)
|
||||
bucketACL, err := h.obj.GetBucketACL(ctx, bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not fetch bucket acl", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err = api.EncodeToResponse(w, h.encodeBucketACL(bktInfo.Name, bucketACL)); err != nil {
|
||||
if err = api.EncodeToResponse(w, h.encodeBucketACL(ctx, bktInfo.Name, bucketACL)); err != nil {
|
||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -365,7 +366,8 @@ func (h *handler) updateBucketACL(r *http.Request, astChild *ast, bktInfo *data.
|
|||
}
|
||||
|
||||
func (h *handler) GetObjectACLHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
ctx := r.Context()
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
|
||||
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
||||
if err != nil {
|
||||
|
@ -373,7 +375,7 @@ func (h *handler) GetObjectACLHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
bucketACL, err := h.obj.GetBucketACL(r.Context(), bktInfo)
|
||||
bucketACL, err := h.obj.GetBucketACL(ctx, bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not fetch bucket acl", reqInfo, err)
|
||||
return
|
||||
|
@ -385,27 +387,28 @@ func (h *handler) GetObjectACLHandler(w http.ResponseWriter, r *http.Request) {
|
|||
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
|
||||
}
|
||||
|
||||
objInfo, err := h.obj.GetObjectInfo(r.Context(), prm)
|
||||
objInfo, err := h.obj.GetObjectInfo(ctx, prm)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not object info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err = api.EncodeToResponse(w, h.encodeObjectACL(bucketACL, reqInfo.BucketName, objInfo.VersionID())); err != nil {
|
||||
if err = api.EncodeToResponse(w, h.encodeObjectACL(ctx, bucketACL, reqInfo.BucketName, objInfo.VersionID())); err != nil {
|
||||
h.logAndSendError(w, "failed to encode response", reqInfo, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
ctx := r.Context()
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
versionID := reqInfo.URL.Query().Get(api.QueryVersionID)
|
||||
key, err := h.bearerTokenIssuerKey(r.Context())
|
||||
key, err := h.bearerTokenIssuerKey(ctx)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "couldn't get gate key", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
token, err := getSessionTokenSetEACL(r.Context())
|
||||
token, err := getSessionTokenSetEACL(ctx)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "couldn't get eacl token", reqInfo, err)
|
||||
return
|
||||
|
@ -423,7 +426,7 @@ func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) {
|
|||
VersionID: versionID,
|
||||
}
|
||||
|
||||
objInfo, err := h.obj.GetObjectInfo(r.Context(), p)
|
||||
objInfo, err := h.obj.GetObjectInfo(ctx, p)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get object info", reqInfo, err)
|
||||
return
|
||||
|
@ -465,8 +468,8 @@ func (h *handler) PutObjectACLHandler(w http.ResponseWriter, r *http.Request) {
|
|||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(r.Context(), s); err != nil {
|
||||
h.log.Error("couldn't send notification: %w", zap.Error(err))
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error("couldn't send notification: %w", zap.Error(err))
|
||||
}
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
@ -1410,7 +1413,7 @@ func isWriteOperation(op eacl.Operation) bool {
|
|||
return op == eacl.OperationDelete || op == eacl.OperationPut
|
||||
}
|
||||
|
||||
func (h *handler) encodeObjectACL(bucketACL *layer.BucketACL, bucketName, objectVersion string) *AccessControlPolicy {
|
||||
func (h *handler) encodeObjectACL(ctx context.Context, bucketACL *layer.BucketACL, bucketName, objectVersion string) *AccessControlPolicy {
|
||||
res := &AccessControlPolicy{
|
||||
Owner: Owner{
|
||||
ID: bucketACL.Info.Owner.String(),
|
||||
|
@ -1456,7 +1459,7 @@ func (h *handler) encodeObjectACL(bucketACL *layer.BucketACL, bucketName, object
|
|||
if read {
|
||||
permission = aclFullControl
|
||||
} else {
|
||||
h.log.Warn("some acl not fully mapped")
|
||||
h.reqLogger(ctx).Warn("some acl not fully mapped")
|
||||
}
|
||||
|
||||
var grantee *Grantee
|
||||
|
@ -1478,8 +1481,8 @@ func (h *handler) encodeObjectACL(bucketACL *layer.BucketACL, bucketName, object
|
|||
return res
|
||||
}
|
||||
|
||||
func (h *handler) encodeBucketACL(bucketName string, bucketACL *layer.BucketACL) *AccessControlPolicy {
|
||||
return h.encodeObjectACL(bucketACL, bucketName, "")
|
||||
func (h *handler) encodeBucketACL(ctx context.Context, bucketName string, bucketACL *layer.BucketACL) *AccessControlPolicy {
|
||||
return h.encodeObjectACL(ctx, bucketACL, bucketName, "")
|
||||
}
|
||||
|
||||
func contains(list []eacl.Operation, op eacl.Operation) bool {
|
||||
|
|
|
@ -46,7 +46,8 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
tagSet map[string]string
|
||||
sessionTokenEACL *session.Container
|
||||
|
||||
reqInfo = api.GetReqInfo(r.Context())
|
||||
ctx = r.Context()
|
||||
reqInfo = api.GetReqInfo(ctx)
|
||||
|
||||
containsACL = containsACLHeaders(r)
|
||||
)
|
||||
|
@ -84,20 +85,20 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
settings, err := h.obj.GetBucketSettings(r.Context(), dstBktInfo)
|
||||
settings, err := h.obj.GetBucketSettings(ctx, dstBktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if containsACL {
|
||||
if sessionTokenEACL, err = getSessionTokenSetEACL(r.Context()); err != nil {
|
||||
if sessionTokenEACL, err = getSessionTokenSetEACL(ctx); err != nil {
|
||||
h.logAndSendError(w, "could not get eacl session token from a box", reqInfo, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
extendedSrcObjInfo, err := h.obj.GetExtendedObjectInfo(r.Context(), srcObjPrm)
|
||||
extendedSrcObjInfo, err := h.obj.GetExtendedObjectInfo(ctx, srcObjPrm)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not find object", reqInfo, err)
|
||||
return
|
||||
|
@ -135,7 +136,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
NodeVersion: extendedSrcObjInfo.NodeVersion,
|
||||
}
|
||||
|
||||
_, tagSet, err = h.obj.GetObjectTagging(r.Context(), tagPrm)
|
||||
_, tagSet, err = h.obj.GetObjectTagging(ctx, tagPrm)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get object tagging", reqInfo, err)
|
||||
return
|
||||
|
@ -183,14 +184,14 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
params.Lock, err = formObjectLock(r.Context(), dstBktInfo, settings.LockConfiguration, r.Header)
|
||||
params.Lock, err = formObjectLock(ctx, dstBktInfo, settings.LockConfiguration, r.Header)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not form object lock", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
additional := []zap.Field{zap.String("src_bucket_name", srcBucket), zap.String("src_object_name", srcObject)}
|
||||
extendedDstObjInfo, err := h.obj.CopyObject(r.Context(), params)
|
||||
extendedDstObjInfo, err := h.obj.CopyObject(ctx, params)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "couldn't copy object", reqInfo, err, additional...)
|
||||
return
|
||||
|
@ -215,7 +216,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
SessionToken: sessionTokenEACL,
|
||||
}
|
||||
|
||||
if err = h.obj.PutBucketACL(r.Context(), p); err != nil {
|
||||
if err = h.obj.PutBucketACL(ctx, p); err != nil {
|
||||
h.logAndSendError(w, "could not put bucket acl", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -231,16 +232,13 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
TagSet: tagSet,
|
||||
NodeVersion: extendedDstObjInfo.NodeVersion,
|
||||
}
|
||||
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
|
||||
if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
|
||||
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
h.log.Info("object is copied",
|
||||
zap.String("bucket", dstObjInfo.Bucket),
|
||||
zap.String("object", dstObjInfo.Name),
|
||||
zap.Stringer("object_id", dstObjInfo.ID))
|
||||
h.reqLogger(ctx).Info("object is copied", zap.Stringer("object_id", dstObjInfo.ID))
|
||||
|
||||
s := &SendNotificationParams{
|
||||
Event: EventObjectCreatedCopy,
|
||||
|
@ -248,8 +246,8 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
BktInfo: dstBktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(r.Context(), s); err != nil {
|
||||
h.log.Error("couldn't send notification: %w", zap.Error(err))
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error("couldn't send notification: %w", zap.Error(err))
|
||||
}
|
||||
|
||||
if encryptionParams.Enabled() {
|
||||
|
|
|
@ -90,19 +90,21 @@ func (h *handler) AppendCORSHeaders(w http.ResponseWriter, r *http.Request) {
|
|||
if origin == "" {
|
||||
return
|
||||
}
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
|
||||
ctx := r.Context()
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
if reqInfo.BucketName == "" {
|
||||
return
|
||||
}
|
||||
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
|
||||
bktInfo, err := h.obj.GetBucketInfo(ctx, reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.log.Warn("get bucket info", zap.Error(err))
|
||||
h.reqLogger(ctx).Warn("get bucket info", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
cors, err := h.obj.GetBucketCORS(r.Context(), bktInfo)
|
||||
cors, err := h.obj.GetBucketCORS(ctx, bktInfo)
|
||||
if err != nil {
|
||||
h.log.Warn("get bucket cors", zap.Error(err))
|
||||
h.reqLogger(ctx).Warn("get bucket cors", zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -61,7 +61,8 @@ type DeleteObjectsResponse struct {
|
|||
}
|
||||
|
||||
func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
ctx := r.Context()
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
versionID := reqInfo.URL.Query().Get(api.QueryVersionID)
|
||||
versionedObject := []*layer.VersionedObject{{
|
||||
Name: reqInfo.ObjectName,
|
||||
|
@ -74,7 +75,7 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||
bktSettings, err := h.obj.GetBucketSettings(ctx, bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
|
||||
return
|
||||
|
@ -85,7 +86,7 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
Objects: versionedObject,
|
||||
Settings: bktSettings,
|
||||
}
|
||||
deletedObjects := h.obj.DeleteObjects(r.Context(), p)
|
||||
deletedObjects := h.obj.DeleteObjects(ctx, p)
|
||||
deletedObject := deletedObjects[0]
|
||||
if deletedObject.Error != nil {
|
||||
if isErrObjectLocked(deletedObject.Error) {
|
||||
|
@ -112,7 +113,7 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
var objID oid.ID
|
||||
if len(versionID) != 0 {
|
||||
if err = objID.DecodeString(versionID); err != nil {
|
||||
h.log.Error("couldn't send notification: %w", zap.Error(err))
|
||||
h.reqLogger(ctx).Error("couldn't send notification: %w", zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -127,8 +128,8 @@ func (h *handler) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
if err = h.sendNotifications(r.Context(), m); err != nil {
|
||||
h.log.Error("couldn't send notification: %w", zap.Error(err))
|
||||
if err = h.sendNotifications(ctx, m); err != nil {
|
||||
h.reqLogger(ctx).Error("couldn't send notification: %w", zap.Error(err))
|
||||
}
|
||||
|
||||
if deletedObject.VersionID != "" {
|
||||
|
@ -156,7 +157,8 @@ func isErrObjectLocked(err error) bool {
|
|||
|
||||
// DeleteMultipleObjectsHandler handles multiple delete requests.
|
||||
func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
ctx := r.Context()
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
|
||||
// Content-Md5 is required and should be set
|
||||
// http://docs.aws.amazon.com/AmazonS3/latest/API/multiobjectdeleteapi.html
|
||||
|
@ -206,7 +208,7 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
|
|||
return
|
||||
}
|
||||
|
||||
bktSettings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||
bktSettings, err := h.obj.GetBucketSettings(ctx, bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
|
||||
return
|
||||
|
@ -224,7 +226,7 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
|
|||
Objects: toRemove,
|
||||
Settings: bktSettings,
|
||||
}
|
||||
deletedObjects := h.obj.DeleteObjects(r.Context(), p)
|
||||
deletedObjects := h.obj.DeleteObjects(ctx, p)
|
||||
|
||||
var errs []error
|
||||
for _, obj := range deletedObjects {
|
||||
|
@ -259,7 +261,7 @@ func (h *handler) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Re
|
|||
zap.Array("objects", marshaler),
|
||||
zap.Errors("errors", errs),
|
||||
}
|
||||
h.log.Error("couldn't delete objects", fields...)
|
||||
h.reqLogger(ctx).Error("couldn't delete objects", fields...)
|
||||
}
|
||||
|
||||
if err = api.EncodeToResponse(w, response); err != nil {
|
||||
|
|
|
@ -255,7 +255,8 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
|||
func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
versionID string
|
||||
reqInfo = api.GetReqInfo(r.Context())
|
||||
ctx = r.Context()
|
||||
reqInfo = api.GetReqInfo(ctx)
|
||||
queryValues = reqInfo.URL.Query()
|
||||
uploadID = queryValues.Get(uploadIDHeaderName)
|
||||
additional = []zap.Field{zap.String("uploadID", uploadID), zap.String("Key", reqInfo.ObjectName)}
|
||||
|
@ -297,7 +298,7 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
srcInfo, err := h.obj.GetObjectInfo(r.Context(), &layer.HeadObjectParams{
|
||||
srcInfo, err := h.obj.GetObjectInfo(ctx, &layer.HeadObjectParams{
|
||||
BktInfo: srcBktInfo,
|
||||
Object: srcObject,
|
||||
VersionID: versionID,
|
||||
|
@ -348,7 +349,7 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
info, err := h.obj.UploadPartCopy(r.Context(), p)
|
||||
info, err := h.obj.UploadPartCopy(ctx, p)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not upload part copy", reqInfo, err, additional...)
|
||||
return
|
||||
|
@ -445,7 +446,8 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
|
|||
}
|
||||
|
||||
func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo, reqInfo *api.ReqInfo) (*data.ObjectInfo, error) {
|
||||
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(r.Context(), c)
|
||||
ctx := r.Context()
|
||||
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not complete multipart upload: %w", err)
|
||||
}
|
||||
|
@ -461,17 +463,17 @@ func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMult
|
|||
TagSet: uploadData.TagSet,
|
||||
NodeVersion: extendedObjInfo.NodeVersion,
|
||||
}
|
||||
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
|
||||
if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
|
||||
return nil, fmt.Errorf("could not put tagging file of completed multipart upload: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(uploadData.ACLHeaders) != 0 {
|
||||
sessionTokenSetEACL, err := getSessionTokenSetEACL(r.Context())
|
||||
sessionTokenSetEACL, err := getSessionTokenSetEACL(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't get eacl token: %w", err)
|
||||
}
|
||||
key, err := h.bearerTokenIssuerKey(r.Context())
|
||||
key, err := h.bearerTokenIssuerKey(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't get gate key: %w", err)
|
||||
}
|
||||
|
@ -499,8 +501,8 @@ func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMult
|
|||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(r.Context(), s); err != nil {
|
||||
h.log.Error("couldn't send notification: %w", zap.Error(err))
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error("couldn't send notification: %w", zap.Error(err))
|
||||
}
|
||||
|
||||
return objInfo, nil
|
||||
|
|
|
@ -202,7 +202,7 @@ func (h *handler) checkBucketConfiguration(ctx context.Context, conf *data.Notif
|
|||
return
|
||||
}
|
||||
} else {
|
||||
h.log.Warn("failed to send test event because notifications is disabled")
|
||||
h.reqLogger(ctx).Warn("failed to send test event because notifications is disabled")
|
||||
}
|
||||
|
||||
if q.ID == "" {
|
||||
|
|
|
@ -179,7 +179,8 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
newEaclTable *eacl.Table
|
||||
sessionTokenEACL *session.Container
|
||||
containsACL = containsACLHeaders(r)
|
||||
reqInfo = api.GetReqInfo(r.Context())
|
||||
ctx = r.Context()
|
||||
reqInfo = api.GetReqInfo(ctx)
|
||||
)
|
||||
|
||||
if containsACL {
|
||||
|
@ -238,19 +239,19 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||
settings, err := h.obj.GetBucketSettings(ctx, bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
params.Lock, err = formObjectLock(r.Context(), bktInfo, settings.LockConfiguration, r.Header)
|
||||
params.Lock, err = formObjectLock(ctx, bktInfo, settings.LockConfiguration, r.Header)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not form object lock", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
extendedObjInfo, err := h.obj.PutObject(r.Context(), params)
|
||||
extendedObjInfo, err := h.obj.PutObject(ctx, params)
|
||||
if err != nil {
|
||||
_, err2 := io.Copy(io.Discard, r.Body)
|
||||
err3 := r.Body.Close()
|
||||
|
@ -265,8 +266,8 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(r.Context(), s); err != nil {
|
||||
h.log.Error("couldn't send notification: %w", zap.Error(err))
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error("couldn't send notification: %w", zap.Error(err))
|
||||
}
|
||||
|
||||
if containsACL {
|
||||
|
@ -365,7 +366,8 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
newEaclTable *eacl.Table
|
||||
tagSet map[string]string
|
||||
sessionTokenEACL *session.Container
|
||||
reqInfo = api.GetReqInfo(r.Context())
|
||||
ctx = r.Context()
|
||||
reqInfo = api.GetReqInfo(ctx)
|
||||
metadata = make(map[string]string)
|
||||
containsACL = containsACLHeaders(r)
|
||||
)
|
||||
|
@ -386,7 +388,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
if containsACL {
|
||||
if sessionTokenEACL, err = getSessionTokenSetEACL(r.Context()); err != nil {
|
||||
if sessionTokenEACL, err = getSessionTokenSetEACL(ctx); err != nil {
|
||||
h.logAndSendError(w, "could not get eacl session token from a box", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -412,7 +414,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
bktInfo, err := h.obj.GetBucketInfo(r.Context(), reqInfo.BucketName)
|
||||
bktInfo, err := h.obj.GetBucketInfo(ctx, reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
return
|
||||
|
@ -426,7 +428,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
Header: metadata,
|
||||
}
|
||||
|
||||
extendedObjInfo, err := h.obj.PutObject(r.Context(), params)
|
||||
extendedObjInfo, err := h.obj.PutObject(ctx, params)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not upload object", reqInfo, err)
|
||||
return
|
||||
|
@ -439,8 +441,8 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(r.Context(), s); err != nil {
|
||||
h.log.Error("couldn't send notification: %w", zap.Error(err))
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error("couldn't send notification: %w", zap.Error(err))
|
||||
}
|
||||
|
||||
if acl := auth.MultipartFormValue(r, "acl"); acl != "" {
|
||||
|
@ -465,7 +467,7 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
NodeVersion: extendedObjInfo.NodeVersion,
|
||||
}
|
||||
|
||||
if _, err = h.obj.PutObjectTagging(r.Context(), tagPrm); err != nil {
|
||||
if _, err = h.obj.PutObjectTagging(ctx, tagPrm); err != nil {
|
||||
h.logAndSendError(w, "could not upload object tagging", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
@ -478,14 +480,14 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
|||
SessionToken: sessionTokenEACL,
|
||||
}
|
||||
|
||||
if err = h.obj.PutBucketACL(r.Context(), p); err != nil {
|
||||
if err = h.obj.PutBucketACL(ctx, p); err != nil {
|
||||
h.logAndSendError(w, "could not put bucket acl", reqInfo, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo); err != nil {
|
||||
h.log.Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err))
|
||||
if settings, err := h.obj.GetBucketSettings(ctx, bktInfo); err != nil {
|
||||
h.reqLogger(ctx).Warn("couldn't get bucket versioning", zap.String("bucket name", reqInfo.BucketName), zap.Error(err))
|
||||
} else if settings.VersioningEnabled() {
|
||||
w.Header().Set(api.AmzVersionID, objInfo.VersionID())
|
||||
}
|
||||
|
@ -657,7 +659,8 @@ func parseMetadata(r *http.Request) map[string]string {
|
|||
}
|
||||
|
||||
func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
ctx := r.Context()
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
p := &layer.CreateBucketParams{
|
||||
Name: reqInfo.BucketName,
|
||||
}
|
||||
|
@ -667,7 +670,7 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
key, err := h.bearerTokenIssuerKey(r.Context())
|
||||
key, err := h.bearerTokenIssuerKey(ctx)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "couldn't get bearer token signature key", reqInfo, err)
|
||||
return
|
||||
|
@ -693,7 +696,7 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
var policies []*accessbox.ContainerPolicy
|
||||
boxData, err := layer.GetBoxData(r.Context())
|
||||
boxData, err := layer.GetBoxData(ctx)
|
||||
if err == nil {
|
||||
policies = boxData.Policies
|
||||
p.SessionContainerCreation = boxData.Gate.SessionTokenForPut()
|
||||
|
@ -717,21 +720,20 @@ func (h *handler) CreateBucketHandler(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
p.ObjectLockEnabled = isLockEnabled(r.Header)
|
||||
|
||||
bktInfo, err := h.obj.CreateBucket(r.Context(), p)
|
||||
bktInfo, err := h.obj.CreateBucket(ctx, p)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not create bucket", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
h.log.Info("bucket is created", zap.String("reqId", reqInfo.RequestID),
|
||||
zap.String("bucket", reqInfo.BucketName), zap.Stringer("container_id", bktInfo.CID))
|
||||
h.reqLogger(ctx).Info("bucket is created", zap.Stringer("container_id", bktInfo.CID))
|
||||
|
||||
if p.ObjectLockEnabled {
|
||||
sp := &layer.PutSettingsParams{
|
||||
BktInfo: bktInfo,
|
||||
Settings: &data.BucketSettings{Versioning: data.VersioningEnabled},
|
||||
}
|
||||
if err = h.obj.PutBucketSettings(r.Context(), sp); err != nil {
|
||||
if err = h.obj.PutBucketSettings(ctx, sp); err != nil {
|
||||
h.logAndSendError(w, "couldn't enable bucket versioning", reqInfo, err,
|
||||
zap.String("container_id", bktInfo.CID.EncodeToString()))
|
||||
return
|
||||
|
|
|
@ -24,7 +24,8 @@ const (
|
|||
)
|
||||
|
||||
func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
ctx := r.Context()
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
|
||||
tagSet, err := readTagSet(r.Body)
|
||||
if err != nil {
|
||||
|
@ -46,7 +47,7 @@ func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request
|
|||
},
|
||||
TagSet: tagSet,
|
||||
}
|
||||
nodeVersion, err := h.obj.PutObjectTagging(r.Context(), tagPrm)
|
||||
nodeVersion, err := h.obj.PutObjectTagging(ctx, tagPrm)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not put object tagging", reqInfo, err)
|
||||
return
|
||||
|
@ -63,8 +64,8 @@ func (h *handler) PutObjectTaggingHandler(w http.ResponseWriter, r *http.Request
|
|||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(r.Context(), s); err != nil {
|
||||
h.log.Error("couldn't send notification: %w", zap.Error(err))
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error("couldn't send notification: %w", zap.Error(err))
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
|
@ -108,7 +109,8 @@ func (h *handler) GetObjectTaggingHandler(w http.ResponseWriter, r *http.Request
|
|||
}
|
||||
|
||||
func (h *handler) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Request) {
|
||||
reqInfo := api.GetReqInfo(r.Context())
|
||||
ctx := r.Context()
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
|
||||
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
||||
if err != nil {
|
||||
|
@ -122,7 +124,7 @@ func (h *handler) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Requ
|
|||
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
|
||||
}
|
||||
|
||||
nodeVersion, err := h.obj.DeleteObjectTagging(r.Context(), p)
|
||||
nodeVersion, err := h.obj.DeleteObjectTagging(ctx, p)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not delete object tagging", reqInfo, err)
|
||||
return
|
||||
|
@ -139,8 +141,8 @@ func (h *handler) DeleteObjectTaggingHandler(w http.ResponseWriter, r *http.Requ
|
|||
BktInfo: bktInfo,
|
||||
ReqInfo: reqInfo,
|
||||
}
|
||||
if err = h.sendNotifications(r.Context(), s); err != nil {
|
||||
h.log.Error("couldn't send notification: %w", zap.Error(err))
|
||||
if err = h.sendNotifications(ctx, s); err != nil {
|
||||
h.reqLogger(ctx).Error("couldn't send notification: %w", zap.Error(err))
|
||||
}
|
||||
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
|
|
|
@ -15,6 +15,14 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (h *handler) reqLogger(ctx context.Context) *zap.Logger {
|
||||
reqLogger := api.GetReqLog(ctx)
|
||||
if reqLogger != nil {
|
||||
return reqLogger
|
||||
}
|
||||
return h.log
|
||||
}
|
||||
|
||||
func (h *handler) logAndSendError(w http.ResponseWriter, logText string, reqInfo *api.ReqInfo, err error, additional ...zap.Field) {
|
||||
code := api.WriteErrorResponse(w, reqInfo, transformToS3Error(err))
|
||||
fields := []zap.Field{
|
||||
|
@ -26,7 +34,7 @@ func (h *handler) logAndSendError(w http.ResponseWriter, logText string, reqInfo
|
|||
zap.String("description", logText),
|
||||
zap.Error(err)}
|
||||
fields = append(fields, additional...)
|
||||
h.log.Error("call method", fields...)
|
||||
h.log.Error("reqeust failed", fields...) // consider using h.reqLogger (it requires accept context.Context or http.Request)
|
||||
}
|
||||
|
||||
func (h *handler) logAndSendErrorNoHeader(w http.ResponseWriter, logText string, reqInfo *api.ReqInfo, err error, additional ...zap.Field) {
|
||||
|
@ -39,7 +47,7 @@ func (h *handler) logAndSendErrorNoHeader(w http.ResponseWriter, logText string,
|
|||
zap.String("description", logText),
|
||||
zap.Error(err)}
|
||||
fields = append(fields, additional...)
|
||||
h.log.Error("call method", fields...)
|
||||
h.log.Error("reqeust failed", fields...) // consider using h.reqLogger (it requires accept context.Context or http.Request)
|
||||
}
|
||||
|
||||
func transformToS3Error(err error) error {
|
||||
|
|
|
@ -34,8 +34,7 @@ func (n *layer) containerInfo(ctx context.Context, idCnr cid.ID) (*data.BucketIn
|
|||
var (
|
||||
err error
|
||||
res *container.Container
|
||||
rid = api.GetRequestID(ctx)
|
||||
log = n.log.With(zap.Stringer("cid", idCnr), zap.String("request_id", rid))
|
||||
log = n.reqLogger(ctx).With(zap.Stringer("cid", idCnr))
|
||||
|
||||
info = &data.BucketInfo{
|
||||
CID: idCnr,
|
||||
|
@ -83,13 +82,10 @@ func (n *layer) containerList(ctx context.Context) ([]*data.BucketInfo, error) {
|
|||
err error
|
||||
own = n.Owner(ctx)
|
||||
res []cid.ID
|
||||
rid = api.GetRequestID(ctx)
|
||||
)
|
||||
res, err = n.frostFS.UserContainers(ctx, own)
|
||||
if err != nil {
|
||||
n.log.Error("could not list user containers",
|
||||
zap.String("request_id", rid),
|
||||
zap.Error(err))
|
||||
n.reqLogger(ctx).Error("could not list user containers", zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -97,9 +93,7 @@ func (n *layer) containerList(ctx context.Context) ([]*data.BucketInfo, error) {
|
|||
for i := range res {
|
||||
info, err := n.containerInfo(ctx, res[i])
|
||||
if err != nil {
|
||||
n.log.Error("could not fetch container info",
|
||||
zap.String("request_id", rid),
|
||||
zap.Error(err))
|
||||
n.reqLogger(ctx).Error("could not fetch container info", zap.Error(err))
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -58,9 +58,8 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
|||
|
||||
if !objIDToDeleteNotFound {
|
||||
if err = n.objectDelete(ctx, p.BktInfo, objIDToDelete); err != nil {
|
||||
n.log.Error("couldn't delete cors object", zap.Error(err),
|
||||
n.reqLogger(ctx).Error("couldn't delete cors object", zap.Error(err),
|
||||
zap.String("cnrID", p.BktInfo.CID.EncodeToString()),
|
||||
zap.String("bucket name", p.BktInfo.Name),
|
||||
zap.String("objID", objIDToDelete.EncodeToString()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -328,6 +328,14 @@ func (n *layer) Owner(ctx context.Context) user.ID {
|
|||
return ownerID
|
||||
}
|
||||
|
||||
func (n *layer) reqLogger(ctx context.Context) *zap.Logger {
|
||||
reqLogger := api.GetReqLog(ctx)
|
||||
if reqLogger != nil {
|
||||
return reqLogger
|
||||
}
|
||||
return n.log
|
||||
}
|
||||
|
||||
func (n *layer) prepareAuthParameters(ctx context.Context, prm *PrmAuth, bktOwner user.ID) {
|
||||
if bd, ok := ctx.Value(api.BoxData).(*accessbox.Box); ok && bd != nil && bd.Gate != nil && bd.Gate.BearerToken != nil {
|
||||
if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
|
||||
|
@ -352,9 +360,11 @@ func (n *layer) GetBucketInfo(ctx context.Context, name string) (*data.BucketInf
|
|||
|
||||
containerID, err := n.ResolveBucket(ctx, name)
|
||||
if err != nil {
|
||||
n.log.Debug("bucket not found", zap.Error(err))
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
return nil, errors.GetAPIError(errors.ErrNoSuchBucket)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return n.containerInfo(ctx, containerID)
|
||||
}
|
||||
|
@ -495,12 +505,8 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
|
|||
return nil, err
|
||||
}
|
||||
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
n.log.Debug("get object",
|
||||
zap.String("reqId", reqInfo.RequestID),
|
||||
zap.String("bucket", p.BktInfo.Name),
|
||||
n.reqLogger(ctx).Debug("get object",
|
||||
zap.Stringer("cid", p.BktInfo.CID),
|
||||
zap.String("object", objInfo.ObjectInfo.Name),
|
||||
zap.Stringer("oid", objInfo.ObjectInfo.ID))
|
||||
|
||||
return objInfo, nil
|
||||
|
@ -520,7 +526,7 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Exte
|
|||
})
|
||||
|
||||
if err = pw.CloseWithError(err); err != nil {
|
||||
n.log.Error("could not get object", zap.Error(err))
|
||||
n.reqLogger(ctx).Error("could not get object", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -635,8 +641,8 @@ func (n *layer) handleNotFoundError(bkt *data.BucketInfo, obj *VersionedObject)
|
|||
|
||||
func (n *layer) handleObjectDeleteErrors(ctx context.Context, bkt *data.BucketInfo, obj *VersionedObject, nodeID uint64) *VersionedObject {
|
||||
if client.IsErrObjectAlreadyRemoved(obj.Error) {
|
||||
n.log.Debug("object already removed", zap.String("bucket", bkt.Name), zap.Stringer("cid", bkt.CID),
|
||||
zap.String("object", obj.Name), zap.String("oid", obj.VersionID))
|
||||
n.reqLogger(ctx).Debug("object already removed",
|
||||
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID))
|
||||
|
||||
obj.Error = n.treeService.RemoveVersion(ctx, bkt, nodeID)
|
||||
if obj.Error != nil {
|
||||
|
@ -647,8 +653,8 @@ func (n *layer) handleObjectDeleteErrors(ctx context.Context, bkt *data.BucketIn
|
|||
}
|
||||
|
||||
if client.IsErrObjectNotFound(obj.Error) {
|
||||
n.log.Debug("object not found", zap.String("bucket", bkt.Name), zap.Stringer("cid", bkt.CID),
|
||||
zap.String("object", obj.Name), zap.String("oid", obj.VersionID))
|
||||
n.reqLogger(ctx).Debug("object not found",
|
||||
zap.Stringer("cid", bkt.CID), zap.String("oid", obj.VersionID))
|
||||
|
||||
obj.Error = nil
|
||||
|
||||
|
@ -725,8 +731,7 @@ func (n *layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error)
|
|||
return cid.ID{}, err
|
||||
}
|
||||
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
n.log.Info("resolve bucket", zap.String("reqId", reqInfo.RequestID), zap.String("bucket", name), zap.Stringer("cid", cnrID))
|
||||
n.reqLogger(ctx).Info("resolve bucket", zap.Stringer("cid", cnrID))
|
||||
}
|
||||
|
||||
return cnrID, nil
|
||||
|
|
|
@ -11,7 +11,6 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
||||
|
@ -196,7 +195,7 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, er
|
|||
func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
|
||||
encInfo := FormEncryptionInfo(multipartInfo.Meta)
|
||||
if err := p.Info.Encryption.MatchObjectEncryption(encInfo); err != nil {
|
||||
n.log.Warn("mismatched obj encryptionInfo", zap.Error(err))
|
||||
n.reqLogger(ctx).Warn("mismatched obj encryptionInfo", zap.Error(err))
|
||||
return nil, errors.GetAPIError(errors.ErrInvalidEncryptionParameters)
|
||||
}
|
||||
|
||||
|
@ -232,12 +231,9 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
|||
size = decSize
|
||||
}
|
||||
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
n.log.Debug("upload part",
|
||||
zap.String("reqId", reqInfo.RequestID),
|
||||
zap.String("bucket", bktInfo.Name), zap.Stringer("cid", bktInfo.CID),
|
||||
zap.String("multipart upload", p.Info.UploadID),
|
||||
zap.Int("part number", p.PartNumber), zap.String("object", p.Info.Key), zap.Stringer("oid", id))
|
||||
n.reqLogger(ctx).Debug("upload part",
|
||||
zap.String("multipart upload", p.Info.UploadID), zap.Int("part number", p.PartNumber),
|
||||
zap.Stringer("cid", bktInfo.CID), zap.Stringer("oid", id))
|
||||
|
||||
partInfo := &data.PartInfo{
|
||||
Key: p.Info.Key,
|
||||
|
@ -256,10 +252,9 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
|||
}
|
||||
if !oldPartIDNotFound {
|
||||
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
|
||||
n.log.Error("couldn't delete old part object", zap.Error(err),
|
||||
zap.String("cnrID", bktInfo.CID.EncodeToString()),
|
||||
zap.String("bucket name", bktInfo.Name),
|
||||
zap.String("objID", oldPartID.EncodeToString()))
|
||||
n.reqLogger(ctx).Error("couldn't delete old part object", zap.Error(err),
|
||||
zap.String("cid", bktInfo.CID.EncodeToString()),
|
||||
zap.String("oid", oldPartID.EncodeToString()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -308,7 +303,7 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
|
|||
})
|
||||
|
||||
if err = pw.CloseWithError(err); err != nil {
|
||||
n.log.Error("could not get object", zap.Error(err))
|
||||
n.reqLogger(ctx).Error("could not get object", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -455,7 +450,7 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
|||
CopiesNumbers: multipartInfo.CopiesNumbers,
|
||||
})
|
||||
if err != nil {
|
||||
n.log.Error("could not put a completed object (multipart upload)",
|
||||
n.reqLogger(ctx).Error("could not put a completed object (multipart upload)",
|
||||
zap.String("uploadID", p.Info.UploadID),
|
||||
zap.String("uploadKey", p.Info.Key),
|
||||
zap.Error(err))
|
||||
|
@ -467,9 +462,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
|||
addr.SetContainer(p.Info.Bkt.CID)
|
||||
for _, partInfo := range partsInfo {
|
||||
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
|
||||
n.log.Warn("could not delete upload part",
|
||||
zap.Stringer("object id", &partInfo.OID),
|
||||
zap.Stringer("bucket id", p.Info.Bkt.CID),
|
||||
n.reqLogger(ctx).Warn("could not delete upload part",
|
||||
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
|
||||
zap.Error(err))
|
||||
}
|
||||
addr.SetObject(partInfo.OID)
|
||||
|
@ -547,7 +541,7 @@ func (n *layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
|
|||
|
||||
for _, info := range parts {
|
||||
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
|
||||
n.log.Warn("couldn't delete part", zap.String("cid", p.Bkt.CID.EncodeToString()),
|
||||
n.reqLogger(ctx).Warn("couldn't delete part", zap.String("cid", p.Bkt.CID.EncodeToString()),
|
||||
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
@ -564,7 +558,7 @@ func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
|
|||
|
||||
encInfo := FormEncryptionInfo(multipartInfo.Meta)
|
||||
if err = p.Info.Encryption.MatchObjectEncryption(encInfo); err != nil {
|
||||
n.log.Warn("mismatched obj encryptionInfo", zap.Error(err))
|
||||
n.reqLogger(ctx).Warn("mismatched obj encryptionInfo", zap.Error(err))
|
||||
return nil, errors.GetAPIError(errors.ErrInvalidEncryptionParameters)
|
||||
}
|
||||
|
||||
|
@ -628,12 +622,8 @@ func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
|
|||
oids[i] = part.OID.EncodeToString()
|
||||
}
|
||||
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
n.log.Debug("part details",
|
||||
zap.String("reqId", reqInfo.RequestID),
|
||||
zap.String("bucket", p.Bkt.Name),
|
||||
n.reqLogger(ctx).Debug("part details",
|
||||
zap.Stringer("cid", p.Bkt.CID),
|
||||
zap.String("object", p.Key),
|
||||
zap.String("upload id", p.UploadID),
|
||||
zap.Ints("part numbers", partsNumbers),
|
||||
zap.Strings("oids", oids))
|
||||
|
|
|
@ -47,10 +47,9 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
|
|||
|
||||
if !objIDToDeleteNotFound {
|
||||
if err = n.objectDelete(ctx, p.BktInfo, objIDToDelete); err != nil {
|
||||
n.log.Error("couldn't delete notification configuration object", zap.Error(err),
|
||||
zap.String("cnrID", p.BktInfo.CID.EncodeToString()),
|
||||
zap.String("bucket name", p.BktInfo.Name),
|
||||
zap.String("objID", objIDToDelete.EncodeToString()))
|
||||
n.reqLogger(ctx).Error("couldn't delete notification configuration object", zap.Error(err),
|
||||
zap.String("cid", p.BktInfo.CID.EncodeToString()),
|
||||
zap.String("oid", objIDToDelete.EncodeToString()))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -240,11 +240,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
|||
return nil, err
|
||||
}
|
||||
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
n.log.Debug("put object",
|
||||
zap.String("reqId", reqInfo.RequestID),
|
||||
zap.String("bucket", p.BktInfo.Name), zap.Stringer("cid", p.BktInfo.CID),
|
||||
zap.String("object", p.Object), zap.Stringer("oid", id))
|
||||
n.reqLogger(ctx).Debug("put object", zap.Stringer("cid", p.BktInfo.CID), zap.Stringer("oid", id))
|
||||
|
||||
newVersion := &data.NodeVersion{
|
||||
BaseNodeVersion: data.BaseNodeVersion{
|
||||
|
@ -562,7 +558,8 @@ func nodesGenerator(ctx context.Context, p allObjectParams, nodeVersions []*data
|
|||
}
|
||||
|
||||
func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams, input <-chan *data.NodeVersion) (<-chan *data.ObjectInfo, error) {
|
||||
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{n.log}))
|
||||
reqLog := n.reqLogger(ctx)
|
||||
pool, err := ants.NewPool(size, ants.WithLogger(&logWrapper{reqLog}))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("coudln't init go pool for listing: %w", err)
|
||||
}
|
||||
|
@ -600,7 +597,7 @@ func (n *layer) initWorkerPool(ctx context.Context, size int, p allObjectParams,
|
|||
})
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
n.log.Warn("failed to submit task to pool", zap.Error(err))
|
||||
reqLog.Warn("failed to submit task to pool", zap.Error(err))
|
||||
}
|
||||
}(node)
|
||||
}
|
||||
|
@ -742,7 +739,7 @@ func (n *layer) objectInfoFromObjectsCacheOrFrostFS(ctx context.Context, bktInfo
|
|||
|
||||
meta, err := n.objectHead(ctx, bktInfo, node.OID)
|
||||
if err != nil {
|
||||
n.log.Warn("could not fetch object meta", zap.Error(err))
|
||||
n.reqLogger(ctx).Warn("could not fetch object meta", zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
errorsStd "errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
|
@ -178,11 +177,8 @@ func (n *layer) getNodeVersion(ctx context.Context, objVersion *ObjectVersion) (
|
|||
}
|
||||
|
||||
if err == nil && version != nil && !version.IsDeleteMarker() {
|
||||
reqInfo := api.GetReqInfo(ctx)
|
||||
n.log.Debug("target details",
|
||||
zap.String("reqId", reqInfo.RequestID),
|
||||
zap.String("bucket", objVersion.BktInfo.Name), zap.Stringer("cid", objVersion.BktInfo.CID),
|
||||
zap.String("object", objVersion.ObjectName), zap.Stringer("oid", version.OID))
|
||||
n.reqLogger(ctx).Debug("get tree node",
|
||||
zap.Stringer("cid", objVersion.BktInfo.CID), zap.Stringer("oid", version.OID))
|
||||
}
|
||||
|
||||
return version, err
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"sync"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
|
@ -42,10 +43,13 @@ type (
|
|||
}
|
||||
)
|
||||
|
||||
// Key used for Get/SetReqInfo.
|
||||
// Key used for custom key/value in context.
|
||||
type contextKeyType string
|
||||
|
||||
const ctxRequestInfo = contextKeyType("FrostFS-S3-GW")
|
||||
const (
|
||||
ctxRequestInfo = contextKeyType("FrostFS-S3-GW")
|
||||
ctxRequestLogger = contextKeyType("FrostFS-S3-GW-Logger")
|
||||
)
|
||||
|
||||
var (
|
||||
// De-facto standard header keys.
|
||||
|
@ -104,7 +108,7 @@ func GetSourceIP(r *http.Request) string {
|
|||
return addr
|
||||
}
|
||||
|
||||
func prepareContext(w http.ResponseWriter, r *http.Request) context.Context {
|
||||
func prepareReqInfo(w http.ResponseWriter, r *http.Request) *ReqInfo {
|
||||
vars := mux.Vars(r)
|
||||
bucket := vars["bucket"]
|
||||
object, err := url.PathUnescape(vars["object"])
|
||||
|
@ -118,13 +122,11 @@ func prepareContext(w http.ResponseWriter, r *http.Request) context.Context {
|
|||
if prefix != "" {
|
||||
object = prefix
|
||||
}
|
||||
return SetReqInfo(r.Context(),
|
||||
// prepare request info
|
||||
NewReqInfo(w, r, ObjectRequest{
|
||||
return NewReqInfo(w, r, ObjectRequest{
|
||||
Bucket: bucket,
|
||||
Object: object,
|
||||
Method: mux.CurrentRoute(r).GetName(),
|
||||
}))
|
||||
})
|
||||
}
|
||||
|
||||
// NewReqInfo returns new ReqInfo based on parameters.
|
||||
|
@ -194,6 +196,7 @@ func SetReqInfo(ctx context.Context, req *ReqInfo) context.Context {
|
|||
}
|
||||
|
||||
// GetReqInfo returns ReqInfo if set.
|
||||
// If ReqInfo isn't set returns new empty ReqInfo.
|
||||
func GetReqInfo(ctx context.Context) *ReqInfo {
|
||||
if ctx == nil {
|
||||
return &ReqInfo{}
|
||||
|
@ -202,3 +205,22 @@ func GetReqInfo(ctx context.Context) *ReqInfo {
|
|||
}
|
||||
return &ReqInfo{}
|
||||
}
|
||||
|
||||
// SetReqLogger sets child zap.Logger in the context.
|
||||
func SetReqLogger(ctx context.Context, log *zap.Logger) context.Context {
|
||||
if ctx == nil {
|
||||
return nil
|
||||
}
|
||||
return context.WithValue(ctx, ctxRequestLogger, log)
|
||||
}
|
||||
|
||||
// GetReqLog returns log if set.
|
||||
// If zap.Logger isn't set returns nil.
|
||||
func GetReqLog(ctx context.Context) *zap.Logger {
|
||||
if ctx == nil {
|
||||
return nil
|
||||
} else if r, ok := ctx.Value(ctxRequestLogger).(*zap.Logger); ok {
|
||||
return r
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -124,26 +124,64 @@ func (lrw *logResponseWriter) Flush() {
|
|||
}
|
||||
}
|
||||
|
||||
func setRequestID(h http.Handler) http.Handler {
|
||||
func prepareRequest(log *zap.Logger) mux.MiddlewareFunc {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// generate random UUIDv4
|
||||
id, _ := uuid.NewRandom()
|
||||
|
||||
// set request id into response header
|
||||
// also we have to set request id here
|
||||
// to be able to get it in prepareReqInfo
|
||||
w.Header().Set(hdrAmzRequestID, id.String())
|
||||
|
||||
// set request info into context
|
||||
reqInfo := prepareReqInfo(w, r)
|
||||
r = r.WithContext(SetReqInfo(r.Context(), reqInfo))
|
||||
|
||||
// set request id into gRPC meta header
|
||||
r = r.WithContext(metadata.AppendToOutgoingContext(
|
||||
r.Context(), hdrAmzRequestID, id.String(),
|
||||
r.Context(), hdrAmzRequestID, reqInfo.RequestID,
|
||||
))
|
||||
|
||||
// set request info into context
|
||||
r = r.WithContext(prepareContext(w, r))
|
||||
// set request scoped child logger into context
|
||||
additionalFields := []zap.Field{zap.String("request_id", reqInfo.RequestID),
|
||||
zap.String("method", reqInfo.API), zap.String("bucket", reqInfo.BucketName)}
|
||||
|
||||
if isObjectRequest(reqInfo) {
|
||||
additionalFields = append(additionalFields, zap.String("object", reqInfo.ObjectName))
|
||||
}
|
||||
reqLogger := log.With(additionalFields...)
|
||||
|
||||
r = r.WithContext(SetReqLogger(r.Context(), reqLogger))
|
||||
|
||||
reqLogger.Info("request start", zap.String("host", r.Host),
|
||||
zap.String("remote_host", reqInfo.RemoteHost))
|
||||
|
||||
// continue execution
|
||||
h.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
var objectMethods = []string{
|
||||
"HeadObject", "GetObject", "DeleteObject", "PutObject", "PostObject", "CopyObject",
|
||||
"CreateMultipartUpload", "UploadPartCopy", "UploadPart", "ListObjectParts",
|
||||
"CompleteMultipartUpload", "AbortMultipartUpload",
|
||||
"PutObjectACL", "GetObjectACL",
|
||||
"PutObjectTagging", "GetObjectTagging", "DeleteObjectTagging",
|
||||
"PutObjectRetention", "GetObjectRetention", "PutObjectLegalHold", "getobjectlegalhold",
|
||||
"SelectObjectContent", "GetObjectAttributes",
|
||||
}
|
||||
|
||||
func isObjectRequest(info *ReqInfo) bool {
|
||||
for _, method := range objectMethods {
|
||||
if info.API == method {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func appendCORS(handler Handler) mux.MiddlewareFunc {
|
||||
return func(h http.Handler) http.Handler {
|
||||
|
@ -173,10 +211,7 @@ func resolveCID(log *zap.Logger, resolveBucket BucketResolveFunc) CIDResolveFunc
|
|||
|
||||
bktInfo, err := resolveBucket(ctx, reqInfo.BucketName)
|
||||
if err != nil {
|
||||
log.Debug("failed to resolve CID",
|
||||
zap.String("request_id", reqInfo.RequestID), zap.String("method", reqInfo.API),
|
||||
zap.String("bucket", reqInfo.BucketName), zap.String("object", reqInfo.ObjectName),
|
||||
zap.Error(err))
|
||||
reqLogOrDefault(ctx, log).Debug("failed to resolve CID", zap.Error(err))
|
||||
return ""
|
||||
}
|
||||
|
||||
|
@ -188,7 +223,8 @@ func logSuccessResponse(l *zap.Logger) mux.MiddlewareFunc {
|
|||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
lw := &logResponseWriter{ResponseWriter: w}
|
||||
reqInfo := GetReqInfo(r.Context())
|
||||
|
||||
reqLogger := reqLogOrDefault(r.Context(), l)
|
||||
|
||||
// pass execution:
|
||||
h.ServeHTTP(lw, r)
|
||||
|
@ -198,13 +234,8 @@ func logSuccessResponse(l *zap.Logger) mux.MiddlewareFunc {
|
|||
return
|
||||
}
|
||||
|
||||
l.Info("call method",
|
||||
reqLogger.Info("request end",
|
||||
zap.Int("status", lw.statusCode),
|
||||
zap.String("host", r.Host),
|
||||
zap.String("request_id", GetRequestID(r.Context())),
|
||||
zap.String("method", mux.CurrentRoute(r).GetName()),
|
||||
zap.String("bucket", reqInfo.BucketName),
|
||||
zap.String("object", reqInfo.ObjectName),
|
||||
zap.String("description", http.StatusText(lw.statusCode)))
|
||||
})
|
||||
}
|
||||
|
@ -253,7 +284,7 @@ func Attach(r *mux.Router, domains []string, m MaxClients, h Handler, center aut
|
|||
|
||||
api.Use(
|
||||
// -- prepare request
|
||||
setRequestID,
|
||||
prepareRequest(log),
|
||||
|
||||
// Attach user authentication for all S3 routes.
|
||||
AuthMiddleware(log, center),
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
)
|
||||
|
||||
// TracingMiddleware adds tracing support for requests.
|
||||
// Must be placed after prepareRequest middleware.
|
||||
func TracingMiddleware() mux.MiddlewareFunc {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
|
@ -111,6 +112,7 @@ func StartHTTPServerSpan(r *http.Request, operationName string, opts ...trace.Sp
|
|||
opts = append(opts, trace.WithAttributes(
|
||||
attribute.String("s3.client_address", r.RemoteAddr),
|
||||
attribute.String("s3.path", r.Host),
|
||||
attribute.String("s3.request_id", GetRequestID(r.Context())),
|
||||
semconv.HTTPMethod(r.Method),
|
||||
semconv.RPCService("frostfs-s3-gw"),
|
||||
attribute.String("s3.query", r.RequestURI),
|
||||
|
|
|
@ -27,10 +27,10 @@ func AuthMiddleware(log *zap.Logger, center auth.Center) mux.MiddlewareFunc {
|
|||
box, err := center.Authenticate(r)
|
||||
if err != nil {
|
||||
if err == auth.ErrNoAuthorizationHeader {
|
||||
log.Debug("couldn't receive access box for gate key, random key will be used")
|
||||
reqLogOrDefault(ctx, log).Debug("couldn't receive access box for gate key, random key will be used")
|
||||
ctx = r.Context()
|
||||
} else {
|
||||
log.Error("failed to pass authentication", zap.Error(err))
|
||||
reqLogOrDefault(ctx, log).Error("failed to pass authentication", zap.Error(err))
|
||||
if _, ok := err.(errors.Error); !ok {
|
||||
err = errors.GetAPIError(errors.ErrAccessDenied)
|
||||
}
|
||||
|
@ -48,3 +48,11 @@ func AuthMiddleware(log *zap.Logger, center auth.Center) mux.MiddlewareFunc {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func reqLogOrDefault(ctx context.Context, log *zap.Logger) *zap.Logger {
|
||||
reqLog := GetReqLog(ctx)
|
||||
if reqLog != nil {
|
||||
return reqLog
|
||||
}
|
||||
return log
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -825,7 +826,7 @@ func (c *Tree) getUnversioned(ctx context.Context, bktInfo *data.BucketInfo, tre
|
|||
}
|
||||
|
||||
if len(nodes) > 1 {
|
||||
c.log.Debug("found more than one unversioned node", zap.Stringer("cid", bktInfo.CID),
|
||||
c.reqLogger(ctx).Debug("found more than one unversioned node",
|
||||
zap.String("treeID", treeID), zap.String("filepath", filepath))
|
||||
}
|
||||
|
||||
|
@ -1190,6 +1191,14 @@ func (c *Tree) getNode(ctx context.Context, bktInfo *data.BucketInfo, treeID str
|
|||
return newTreeNode(nodes[0])
|
||||
}
|
||||
|
||||
func (c *Tree) reqLogger(ctx context.Context) *zap.Logger {
|
||||
reqLogger := api.GetReqLog(ctx)
|
||||
if reqLogger != nil {
|
||||
return reqLogger
|
||||
}
|
||||
return c.log
|
||||
}
|
||||
|
||||
func parseLockConfiguration(value string) (*data.ObjectLockConfiguration, error) {
|
||||
result := &data.ObjectLockConfiguration{}
|
||||
if len(value) == 0 {
|
||||
|
|
Loading…
Reference in a new issue