[#191] Refactor error handling and logging
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
parent 0f73da258b
commit 458bf933fc

17 changed files with 327 additions and 398 deletions
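The shape of the refactor, condensed from the hunks below: every failure path used to pair an explicit log call with a hand-rolled HTTP error response, and now delegates both to a single helper. An illustrative excerpt (not a literal part of the diff):

	// Before: logging and the HTTP error are composed separately at each call site.
	log.Error(logs.FailedToFilterHeaders, zap.Error(err), logs.TagField(logs.TagDatapath))
	ResponseError(c, err.Error(), fasthttp.StatusBadRequest)

	// After: one helper owns both concerns, keyed by a log-message constant.
	h.logAndSendError(ctx, req, logs.FailedToFilterHeaders, err)

Handlers also stop passing the request wrapper struct around: context.Context and *fasthttp.RequestCtx travel as explicit parameters, and the request-scoped logger rides in the context.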
@@ -50,44 +50,41 @@ func (pr *putResponse) encode(w io.Writer) error {
 }
 
 // Upload handles multipart upload request.
-func (h *Handler) Upload(c *fasthttp.RequestCtx) {
-	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.Upload")
+func (h *Handler) Upload(req *fasthttp.RequestCtx) {
+	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(req), "handler.Upload")
 	defer span.End()
-	utils.SetContextToRequest(ctx, c)
 
 	var file MultipartFile
 
-	scid, _ := c.UserValue("cid").(string)
-	bodyStream := c.RequestBodyStream()
+	scid, _ := req.UserValue("cid").(string)
+	bodyStream := req.RequestBodyStream()
 	drainBuf := make([]byte, drainBufSize)
 
-	reqLog := utils.GetReqLogOrDefault(ctx, h.log)
-	log := reqLog.With(zap.String("cid", scid))
+	log := h.reqLogger(ctx)
+	ctx = utils.SetReqLog(ctx, log.With(zap.String("cid", scid)))
 
-	bktInfo, err := h.getBucketInfo(ctx, scid, log)
+	bktInfo, err := h.getBucketInfo(ctx, scid)
 	if err != nil {
-		logAndSendBucketError(c, log, err)
+		h.logAndSendError(ctx, req, logs.FailedToGetBucketInfo, err)
 		return
 	}
 
-	boundary := string(c.Request.Header.MultipartFormBoundary())
+	boundary := string(req.Request.Header.MultipartFormBoundary())
 	if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
-		log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
+		h.logAndSendError(ctx, req, logs.CouldNotReceiveMultipartForm, err)
 		return
 	}
 
-	filtered, err := filterHeaders(log, &c.Request.Header)
+	filtered, err := filterHeaders(log, &req.Request.Header)
 	if err != nil {
-		log.Error(logs.FailedToFilterHeaders, zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
+		h.logAndSendError(ctx, req, logs.FailedToFilterHeaders, err)
 		return
 	}
 
-	if c.Request.Header.Peek(explodeArchiveHeader) != nil {
-		h.explodeArchive(request{c, log}, bktInfo, file, filtered)
+	if req.Request.Header.Peek(explodeArchiveHeader) != nil {
+		h.explodeArchive(ctx, req, bktInfo, file, filtered)
 	} else {
-		h.uploadSingleObject(request{c, log}, bktInfo, file, filtered)
+		h.uploadSingleObject(ctx, req, bktInfo, file, filtered)
 	}
 
 	// Multipart is multipart and thus can contain more than one part which
@@ -104,46 +101,39 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
 	}
 }
 
-func (h *Handler) uploadSingleObject(req request, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) {
-	c, log := req.RequestCtx, req.log
-
-	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.uploadSingleObject")
+func (h *Handler) uploadSingleObject(ctx context.Context, req *fasthttp.RequestCtx, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "handler.uploadSingleObject")
 	defer span.End()
-	utils.SetContextToRequest(ctx, c)
 
 	setIfNotExist(filtered, object.AttributeFileName, file.FileName())
 
-	attributes, err := h.extractAttributes(c, log, filtered)
+	attributes, err := h.extractAttributes(ctx, req, filtered)
 	if err != nil {
-		log.Error(logs.FailedToGetAttributes, zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
+		h.logAndSendError(ctx, req, logs.FailedToGetAttributes, err)
 		return
 	}
 
-	idObj, err := h.uploadObject(c, bkt, attributes, file)
+	idObj, err := h.uploadObject(ctx, bkt, attributes, file)
 	if err != nil {
-		h.handlePutFrostFSErr(c, err, log)
+		h.logAndSendError(ctx, req, logs.FailedToUploadObject, err)
 		return
 	}
-	log.Debug(logs.ObjectUploaded,
+	h.reqLogger(ctx).Debug(logs.ObjectUploaded,
 		zap.String("oid", idObj.EncodeToString()),
 		zap.String("FileName", file.FileName()),
 		logs.TagField(logs.TagExternalStorage),
 	)
 
 	addr := newAddress(bkt.CID, idObj)
-	c.Response.Header.SetContentType(jsonHeader)
+	req.Response.Header.SetContentType(jsonHeader)
 	// Try to return the response, otherwise, if something went wrong, throw an error.
-	if err = newPutResponse(addr).encode(c); err != nil {
-		log.Error(logs.CouldNotEncodeResponse, zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, "could not encode response", fasthttp.StatusBadRequest)
+	if err = newPutResponse(addr).encode(req); err != nil {
+		h.logAndSendError(ctx, req, logs.CouldNotEncodeResponse, err)
 		return
 	}
 }
 
-func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) {
-	ctx := utils.GetContextFromRequest(c)
-
+func (h *Handler) uploadObject(ctx context.Context, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) {
 	obj := object.New()
 	obj.SetContainerID(bkt.CID)
 	obj.SetOwnerID(*h.ownerID)
@@ -168,19 +158,18 @@ func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, att
 	return idObj, nil
 }
 
-func (h *Handler) extractAttributes(c *fasthttp.RequestCtx, log *zap.Logger, filtered map[string]string) ([]object.Attribute, error) {
-	ctx := utils.GetContextFromRequest(c)
+func (h *Handler) extractAttributes(ctx context.Context, req *fasthttp.RequestCtx, filtered map[string]string) ([]object.Attribute, error) {
 	now := time.Now()
-	if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
+	if rawHeader := req.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
 		if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
-			log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err),
+			h.reqLogger(ctx).Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err),
 				logs.TagField(logs.TagDatapath))
 		} else {
 			now = parsed
 		}
 	}
 	if err := utils.PrepareExpirationHeader(ctx, h.frostfs, filtered, now); err != nil {
-		log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath))
+		h.reqLogger(ctx).Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err), logs.TagField(logs.TagDatapath))
 		return nil, err
 	}
 	attributes := make([]object.Attribute, 0, len(filtered))
@@ -207,38 +196,33 @@ func newAttribute(key string, val string) object.Attribute {
 
 // explodeArchive read files from archive and creates objects for each of them.
 // Sets FilePath attribute with name from tar.Header.
-func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) {
-	c, log := req.RequestCtx, req.log
-
-	ctx, span := tracing.StartSpanFromContext(utils.GetContextFromRequest(c), "handler.explodeArchive")
+func (h *Handler) explodeArchive(ctx context.Context, req *fasthttp.RequestCtx, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) {
+	ctx, span := tracing.StartSpanFromContext(ctx, "handler.explodeArchive")
 	defer span.End()
-	utils.SetContextToRequest(ctx, c)
 
 	// remove user attributes which vary for each file in archive
 	// to guarantee that they won't appear twice
 	delete(filtered, object.AttributeFileName)
 	delete(filtered, object.AttributeFilePath)
 
-	commonAttributes, err := h.extractAttributes(c, log, filtered)
+	commonAttributes, err := h.extractAttributes(ctx, req, filtered)
 	if err != nil {
-		log.Error(logs.FailedToGetAttributes, zap.Error(err), logs.TagField(logs.TagDatapath))
-		ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
+		h.logAndSendError(ctx, req, logs.FailedToGetAttributes, err)
 		return
 	}
 	attributes := commonAttributes
 
 	reader := file
-	if bytes.EqualFold(c.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
-		log.Debug(logs.GzipReaderSelected, logs.TagField(logs.TagDatapath))
+	if bytes.EqualFold(req.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
+		h.reqLogger(ctx).Debug(logs.GzipReaderSelected, logs.TagField(logs.TagDatapath))
 		gzipReader, err := gzip.NewReader(file)
 		if err != nil {
-			log.Error(logs.FailedToCreateGzipReader, zap.Error(err), logs.TagField(logs.TagDatapath))
-			ResponseError(c, "could read gzip file: "+err.Error(), fasthttp.StatusBadRequest)
+			h.logAndSendError(ctx, req, logs.FailedToCreateGzipReader, err)
 			return
 		}
 		defer func() {
 			if err := gzipReader.Close(); err != nil {
-				log.Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath))
+				h.reqLogger(ctx).Warn(logs.FailedToCloseReader, zap.Error(err), logs.TagField(logs.TagDatapath))
 			}
 		}()
 		reader = gzipReader
@@ -250,8 +234,7 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
 		if errors.Is(err, io.EOF) {
 			break
 		} else if err != nil {
-			log.Error(logs.FailedToReadFileFromTar, zap.Error(err), logs.TagField(logs.TagDatapath))
-			ResponseError(c, "could not get next entry: "+err.Error(), fasthttp.StatusBadRequest)
+			h.logAndSendError(ctx, req, logs.FailedToReadFileFromTar, err)
 			return
 		}
 
@@ -265,13 +248,13 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
 		attributes = append(attributes, newAttribute(object.AttributeFilePath, obj.Name))
 		attributes = append(attributes, newAttribute(object.AttributeFileName, fileName))
 
-		idObj, err := h.uploadObject(c, bkt, attributes, tarReader)
+		idObj, err := h.uploadObject(ctx, bkt, attributes, tarReader)
 		if err != nil {
-			h.handlePutFrostFSErr(c, err, log)
+			h.logAndSendError(ctx, req, logs.FailedToUploadObject, err)
 			return
 		}
 
-		log.Debug(logs.ObjectUploaded,
+		h.reqLogger(ctx).Debug(logs.ObjectUploaded,
 			zap.String("oid", idObj.EncodeToString()),
 			zap.String("FileName", fileName),
 			logs.TagField(logs.TagExternalStorage),
@@ -279,14 +262,6 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
 	}
 }
 
-func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) {
-	statusCode, msg, additionalFields := formErrorResponse("could not store file in frostfs", err)
-	logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)
-
-	log.Error(logs.CouldNotStoreFileInFrostfs, append(logFields, logs.TagField(logs.TagExternalStorage))...)
-	ResponseError(r, msg, statusCode)
-}
-
 func (h *Handler) fetchBearerToken(ctx context.Context) *bearer.Token {
 	if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil {
 		return tkn
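The removed handlePutFrostFSErr above is the template for the consolidated helper: logAndSendError is defined elsewhere in this commit, but judging from its call sites and from the formErrorResponse usage here, a plausible shape is the following (a hedged sketch, not the commit's actual implementation):

	// Sketch: log the error under the given message constant, then translate it
	// into an HTTP status and body, as handlePutFrostFSErr used to do.
	// The log tag is a guess; the real helper may pick tags per error class.
	func (h *Handler) logAndSendError(ctx context.Context, req *fasthttp.RequestCtx, msg string, err error) {
		statusCode, respMsg, additionalFields := formErrorResponse(msg, err)
		logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)

		h.reqLogger(ctx).Error(msg, append(logFields, logs.TagField(logs.TagExternalStorage))...)
		ResponseError(req, respMsg, statusCode)
	}

Centralizing this removes the drift the old code allowed, where the log message and the HTTP response text were written twice and could disagree.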