feature/328-add_log_ignored_errors-support #331
11 changed files with 154 additions and 63 deletions
|
@ -66,7 +66,10 @@ func (h *handler) PutBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
middleware.WriteSuccessResponseHeadersOnly(w)
|
if err = middleware.WriteSuccessResponseHeadersOnly(w); err != nil {
|
||||||
|
h.logAndSendError(w, "write response", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) DeleteBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
|
func (h *handler) DeleteBucketCorsHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -200,7 +203,10 @@ func (h *handler) Preflight(w http.ResponseWriter, r *http.Request) {
|
||||||
if o != wildcard {
|
if o != wildcard {
|
||||||
w.Header().Set(api.AccessControlAllowCredentials, "true")
|
w.Header().Set(api.AccessControlAllowCredentials, "true")
|
||||||
}
|
}
|
||||||
middleware.WriteSuccessResponseHeadersOnly(w)
|
if err = middleware.WriteSuccessResponseHeadersOnly(w); err != nil {
|
||||||
|
h.logAndSendError(w, "write response", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,7 +140,10 @@ func (h *handler) HeadBucketHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
w.Header().Set(api.ContainerZone, bktInfo.Zone)
|
w.Header().Set(api.ContainerZone, bktInfo.Zone)
|
||||||
}
|
}
|
||||||
|
|
||||||
middleware.WriteResponse(w, http.StatusOK, nil, middleware.MimeNone)
|
if err = middleware.WriteResponse(w, http.StatusOK, nil, middleware.MimeNone); err != nil {
|
||||||
|
h.logAndSendError(w, "write response", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) setLockingHeaders(bktInfo *data.BucketInfo, lockInfo data.LockInfo, header http.Header) error {
|
func (h *handler) setLockingHeaders(bktInfo *data.BucketInfo, lockInfo data.LockInfo, header http.Header) error {
|
||||||
|
|
|
@ -266,7 +266,10 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Header().Set(api.ETag, data.Quote(hash))
|
w.Header().Set(api.ETag, data.Quote(hash))
|
||||||
middleware.WriteSuccessResponseHeadersOnly(w)
|
if err = middleware.WriteSuccessResponseHeadersOnly(w); err != nil {
|
||||||
|
h.logAndSendError(w, "write response", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
|
@ -337,7 +337,10 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
w.Header().Set(api.ETag, data.Quote(objInfo.ETag(h.cfg.MD5Enabled())))
|
w.Header().Set(api.ETag, data.Quote(objInfo.ETag(h.cfg.MD5Enabled())))
|
||||||
|
|
||||||
middleware.WriteSuccessResponseHeadersOnly(w)
|
if err = middleware.WriteSuccessResponseHeadersOnly(w); err != nil {
|
||||||
|
h.logAndSendError(w, "write response", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) getBodyReader(r *http.Request) (io.ReadCloser, error) {
|
func (h *handler) getBodyReader(r *http.Request) (io.ReadCloser, error) {
|
||||||
|
@ -602,7 +605,11 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
|
||||||
ETag: data.Quote(objInfo.ETag(h.cfg.MD5Enabled())),
|
ETag: data.Quote(objInfo.ETag(h.cfg.MD5Enabled())),
|
||||||
}
|
}
|
||||||
w.WriteHeader(status)
|
w.WriteHeader(status)
|
||||||
if _, err = w.Write(middleware.EncodeResponse(resp)); err != nil {
|
respData, err := middleware.EncodeResponse(resp)
|
||||||
|
if err != nil {
|
||||||
|
h.logAndSendError(w, "encode response", reqInfo, err)
|
||||||
|
}
|
||||||
|
if _, err = w.Write(respData); err != nil {
|
||||||
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
h.logAndSendError(w, "something went wrong", reqInfo, err)
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
@ -813,7 +820,7 @@ func (h *handler) parseCommonCreateBucketParams(reqInfo *middleware.ReqInfo, box
|
||||||
return nil, nil, fmt.Errorf("couldn't set placement policy: %w", err)
|
return nil, nil, fmt.Errorf("couldn't set placement policy: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.ObjectLockEnabled = isLockEnabled(r.Header)
|
p.ObjectLockEnabled = isLockEnabled(h.reqLogger(r.Context()), r.Header)
|
||||||
|
|
||||||
return key, p, nil
|
return key, p, nil
|
||||||
}
|
}
|
||||||
|
@ -873,7 +880,10 @@ func (h *handler) createBucketHandlerPolicy(w http.ResponseWriter, r *http.Reque
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
middleware.WriteSuccessResponseHeadersOnly(w)
|
if err = middleware.WriteSuccessResponseHeadersOnly(w); err != nil {
|
||||||
|
h.logAndSendError(w, "write response", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handler) createBucketHandlerACL(w http.ResponseWriter, r *http.Request) {
|
func (h *handler) createBucketHandlerACL(w http.ResponseWriter, r *http.Request) {
|
||||||
|
@ -942,7 +952,10 @@ func (h *handler) createBucketHandlerACL(w http.ResponseWriter, r *http.Request)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
middleware.WriteSuccessResponseHeadersOnly(w)
|
if err = middleware.WriteSuccessResponseHeadersOnly(w); err != nil {
|
||||||
|
h.logAndSendError(w, "write response", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const s3ActionPrefix = "s3:"
|
const s3ActionPrefix = "s3:"
|
||||||
|
@ -1096,9 +1109,17 @@ func (h handler) setPlacementPolicy(prm *layer.CreateBucketParams, namespace, lo
|
||||||
return errors.GetAPIError(errors.ErrInvalidLocationConstraint)
|
return errors.GetAPIError(errors.ErrInvalidLocationConstraint)
|
||||||
}
|
}
|
||||||
|
|
||||||
func isLockEnabled(header http.Header) bool {
|
func isLockEnabled(log *zap.Logger, header http.Header) bool {
|
||||||
lockEnabledStr := header.Get(api.AmzBucketObjectLockEnabled)
|
lockEnabledStr := header.Get(api.AmzBucketObjectLockEnabled)
|
||||||
lockEnabled, _ := strconv.ParseBool(lockEnabledStr)
|
if len(lockEnabledStr) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
lockEnabled, err := strconv.ParseBool(lockEnabledStr)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn(logs.InvalidBucketObjectLockEnabledHeader, zap.String("header", lockEnabledStr), zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
return lockEnabled
|
return lockEnabled
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,9 +31,12 @@ func (h *handler) reqLogger(ctx context.Context) *zap.Logger {
|
||||||
|
|
||||||
func (h *handler) logAndSendError(w http.ResponseWriter, logText string, reqInfo *middleware.ReqInfo, err error, additional ...zap.Field) {
|
func (h *handler) logAndSendError(w http.ResponseWriter, logText string, reqInfo *middleware.ReqInfo, err error, additional ...zap.Field) {
|
||||||
err = handleDeleteMarker(w, err)
|
err = handleDeleteMarker(w, err)
|
||||||
code := middleware.WriteErrorResponse(w, reqInfo, transformToS3Error(err))
|
if code, wrErr := middleware.WriteErrorResponse(w, reqInfo, transformToS3Error(err)); wrErr != nil {
|
||||||
|
additional = append(additional, zap.NamedError("write_response_error", wrErr))
|
||||||
|
} else {
|
||||||
|
additional = append(additional, zap.Int("status", code))
|
||||||
|
}
|
||||||
fields := []zap.Field{
|
fields := []zap.Field{
|
||||||
zap.Int("status", code),
|
|
||||||
zap.String("request_id", reqInfo.RequestID),
|
zap.String("request_id", reqInfo.RequestID),
|
||||||
zap.String("method", reqInfo.API),
|
zap.String("method", reqInfo.API),
|
||||||
zap.String("bucket", reqInfo.BucketName),
|
zap.String("bucket", reqInfo.BucketName),
|
||||||
|
|
|
@ -59,7 +59,9 @@ func Auth(center Center, log *zap.Logger) Func {
|
||||||
if _, ok := err.(apiErrors.Error); !ok {
|
if _, ok := err.(apiErrors.Error); !ok {
|
||||||
err = apiErrors.GetAPIError(apiErrors.ErrAccessDenied)
|
err = apiErrors.GetAPIError(apiErrors.ErrAccessDenied)
|
||||||
}
|
}
|
||||||
WriteErrorResponse(w, GetReqInfo(r.Context()), err)
|
if _, wrErr := WriteErrorResponse(w, GetReqInfo(r.Context()), err); wrErr != nil {
|
||||||
|
reqLogOrDefault(ctx, log).Error(logs.FailedToWriteResponse, zap.Error(wrErr))
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -97,7 +99,9 @@ func FrostfsIDValidation(frostfsID FrostFSIDValidator, log *zap.Logger) Func {
|
||||||
|
|
||||||
if err = validateBearerToken(frostfsID, bd.Gate.BearerToken); err != nil {
|
if err = validateBearerToken(frostfsID, bd.Gate.BearerToken); err != nil {
|
||||||
reqLogOrDefault(ctx, log).Error(logs.FrostfsIDValidationFailed, zap.Error(err))
|
reqLogOrDefault(ctx, log).Error(logs.FrostfsIDValidationFailed, zap.Error(err))
|
||||||
WriteErrorResponse(w, GetReqInfo(r.Context()), err)
|
if _, wrErr := WriteErrorResponse(w, GetReqInfo(r.Context()), err); wrErr != nil {
|
||||||
|
reqLogOrDefault(ctx, log).Error(logs.FailedToWriteResponse, zap.Error(wrErr))
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -47,7 +47,9 @@ func PolicyCheck(cfg PolicyConfig) Func {
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
if err := policyCheck(r, cfg); err != nil {
|
if err := policyCheck(r, cfg); err != nil {
|
||||||
reqLogOrDefault(ctx, cfg.Log).Error(logs.PolicyValidationFailed, zap.Error(err))
|
reqLogOrDefault(ctx, cfg.Log).Error(logs.PolicyValidationFailed, zap.Error(err))
|
||||||
WriteErrorResponse(w, GetReqInfo(ctx), err)
|
if _, wrErr := WriteErrorResponse(w, GetReqInfo(ctx), err); wrErr != nil {
|
||||||
|
reqLogOrDefault(ctx, cfg.Log).Error(logs.FailedToWriteResponse, zap.Error(wrErr))
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -118,7 +118,8 @@ var s3ErrorResponseMap = map[string]string{
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteErrorResponse writes error headers.
|
// WriteErrorResponse writes error headers.
|
||||||
func WriteErrorResponse(w http.ResponseWriter, reqInfo *ReqInfo, err error) int {
|
// returns http error code and error in case of failure of response writing.
|
||||||
|
func WriteErrorResponse(w http.ResponseWriter, reqInfo *ReqInfo, err error) (int, error) {
|
||||||
code := http.StatusInternalServerError
|
code := http.StatusInternalServerError
|
||||||
|
|
||||||
if e, ok := err.(errors.Error); ok {
|
if e, ok := err.(errors.Error); ok {
|
||||||
|
@ -134,9 +135,14 @@ func WriteErrorResponse(w http.ResponseWriter, reqInfo *ReqInfo, err error) int
|
||||||
|
|
||||||
// Generates error response.
|
// Generates error response.
|
||||||
errorResponse := getAPIErrorResponse(reqInfo, err)
|
errorResponse := getAPIErrorResponse(reqInfo, err)
|
||||||
encodedErrorResponse := EncodeResponse(errorResponse)
|
encodedErrorResponse, err := EncodeResponse(errorResponse)
|
||||||
WriteResponse(w, code, encodedErrorResponse, MimeXML)
|
if err != nil {
|
||||||
return code
|
return 0, fmt.Errorf("encode response: %w", err)
|
||||||
|
}
|
||||||
|
if err = WriteResponse(w, code, encodedErrorResponse, MimeXML); err != nil {
|
||||||
|
return 0, fmt.Errorf("write response: %w", err)
|
||||||
|
}
|
||||||
|
return code, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write http common headers.
|
// Write http common headers.
|
||||||
|
@ -157,7 +163,7 @@ func removeSensitiveHeaders(h http.Header) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteResponse writes given statusCode and response into w (with mType header if set).
|
// WriteResponse writes given statusCode and response into w (with mType header if set).
|
||||||
func WriteResponse(w http.ResponseWriter, statusCode int, response []byte, mType mimeType) {
|
func WriteResponse(w http.ResponseWriter, statusCode int, response []byte, mType mimeType) error {
|
||||||
setCommonHeaders(w)
|
setCommonHeaders(w)
|
||||||
if mType != MimeNone {
|
if mType != MimeNone {
|
||||||
w.Header().Set(hdrContentType, string(mType))
|
w.Header().Set(hdrContentType, string(mType))
|
||||||
|
@ -165,37 +171,46 @@ func WriteResponse(w http.ResponseWriter, statusCode int, response []byte, mType
|
||||||
w.Header().Set(hdrContentLength, strconv.Itoa(len(response)))
|
w.Header().Set(hdrContentLength, strconv.Itoa(len(response)))
|
||||||
w.WriteHeader(statusCode)
|
w.WriteHeader(statusCode)
|
||||||
if response == nil {
|
if response == nil {
|
||||||
return
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
WriteResponseBody(w, response)
|
return WriteResponseBody(w, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteResponseBody writes response into w.
|
// WriteResponseBody writes response into w.
|
||||||
func WriteResponseBody(w http.ResponseWriter, response []byte) {
|
func WriteResponseBody(w http.ResponseWriter, response []byte) error {
|
||||||
_, _ = w.Write(response)
|
if _, err := w.Write(response); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
if flusher, ok := w.(http.Flusher); ok {
|
if flusher, ok := w.(http.Flusher); ok {
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// EncodeResponse encodes the response headers into XML format.
|
// EncodeResponse encodes the response headers into XML format.
|
||||||
func EncodeResponse(response interface{}) []byte {
|
func EncodeResponse(response interface{}) ([]byte, error) {
|
||||||
var bytesBuffer bytes.Buffer
|
var bytesBuffer bytes.Buffer
|
||||||
bytesBuffer.WriteString(xml.Header)
|
bytesBuffer.WriteString(xml.Header)
|
||||||
_ = xml.
|
if err := xml.NewEncoder(&bytesBuffer).Encode(response); err != nil {
|
||||||
NewEncoder(&bytesBuffer).
|
return nil, err
|
||||||
Encode(response)
|
}
|
||||||
return bytesBuffer.Bytes()
|
|
||||||
|
return bytesBuffer.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// EncodeResponseNoHeader encodes response without setting xml.Header.
|
// EncodeResponseNoHeader encodes response without setting xml.Header.
|
||||||
// Should be used with periodicXMLWriter which sends xml.Header to the client
|
// Should be used with periodicXMLWriter which sends xml.Header to the client
|
||||||
// with whitespaces to keep connection alive.
|
// with whitespaces to keep connection alive.
|
||||||
func EncodeResponseNoHeader(response interface{}) []byte {
|
func EncodeResponseNoHeader(response interface{}) ([]byte, error) {
|
||||||
var bytesBuffer bytes.Buffer
|
var bytesBuffer bytes.Buffer
|
||||||
_ = xml.NewEncoder(&bytesBuffer).Encode(response)
|
if err := xml.NewEncoder(&bytesBuffer).Encode(response); err != nil {
|
||||||
return bytesBuffer.Bytes()
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return bytesBuffer.Bytes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// EncodeToResponse encodes the response into ResponseWriter.
|
// EncodeToResponse encodes the response into ResponseWriter.
|
||||||
|
@ -227,8 +242,8 @@ func EncodeToResponseNoHeader(w http.ResponseWriter, response interface{}) error
|
||||||
|
|
||||||
// WriteSuccessResponseHeadersOnly writes HTTP (200) OK response with no data
|
// WriteSuccessResponseHeadersOnly writes HTTP (200) OK response with no data
|
||||||
// to the client.
|
// to the client.
|
||||||
func WriteSuccessResponseHeadersOnly(w http.ResponseWriter) {
|
func WriteSuccessResponseHeadersOnly(w http.ResponseWriter) error {
|
||||||
WriteResponse(w, http.StatusOK, nil, MimeNone)
|
return WriteResponse(w, http.StatusOK, nil, MimeNone)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Error -- Returns S3 error string.
|
// Error -- Returns S3 error string.
|
||||||
|
|
|
@ -178,14 +178,24 @@ func errorResponseHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
reqInfo := s3middleware.GetReqInfo(ctx)
|
reqInfo := s3middleware.GetReqInfo(ctx)
|
||||||
|
|
||||||
desc := fmt.Sprintf("Unknown API request at %s", r.URL.Path)
|
desc := fmt.Sprintf("Unknown API request at %s", r.URL.Path)
|
||||||
s3middleware.WriteErrorResponse(w, reqInfo, errors.Error{
|
_, wrErr := s3middleware.WriteErrorResponse(w, reqInfo, errors.Error{
|
||||||
Code: "UnknownAPIRequest",
|
Code: "UnknownAPIRequest",
|
||||||
Description: desc,
|
Description: desc,
|
||||||
HTTPStatusCode: http.StatusBadRequest,
|
HTTPStatusCode: http.StatusBadRequest,
|
||||||
})
|
})
|
||||||
|
|
||||||
if log := s3middleware.GetReqLog(ctx); log != nil {
|
if log := s3middleware.GetReqLog(ctx); log != nil {
|
||||||
log.Error(logs.RequestUnmatched, zap.String("method", reqInfo.API), zap.String("http method", r.Method), zap.String("url", r.RequestURI))
|
fields := []zap.Field{
|
||||||
|
zap.String("method", reqInfo.API),
|
||||||
|
zap.String("http method", r.Method),
|
||||||
|
zap.String("url", r.RequestURI),
|
||||||
|
}
|
||||||
|
|
||||||
|
if wrErr != nil {
|
||||||
|
fields = append(fields, zap.NamedError("write_response_error", wrErr))
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Error(logs.RequestUnmatched, fields...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -147,4 +147,7 @@ const (
|
||||||
SuccessfulAuth = "successful auth"
|
SuccessfulAuth = "successful auth"
|
||||||
PolicyRequest = "policy request"
|
PolicyRequest = "policy request"
|
||||||
FailedToGenerateRequestID = "failed to generate request id"
|
FailedToGenerateRequestID = "failed to generate request id"
|
||||||
|
InvalidBucketObjectLockEnabledHeader = "invalid X-Amz-Bucket-Object-Lock-Enabled header"
|
||||||
|
InvalidTreeKV = "invalid tree service meta KV"
|
||||||
|
FailedToWriteResponse = "failed to write response"
|
||||||
)
|
)
|
||||||
|
|
|
@ -184,16 +184,16 @@ func (n *treeNode) FileName() (string, bool) {
|
||||||
return value, ok
|
return value, ok
|
||||||
}
|
}
|
||||||
|
|
||||||
func newNodeVersion(filePath string, node NodeResponse) (*data.NodeVersion, error) {
|
func newNodeVersion(log *zap.Logger, filePath string, node NodeResponse) (*data.NodeVersion, error) {
|
||||||
treeNode, err := newTreeNode(node)
|
treeNode, err := newTreeNode(node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("invalid tree node: %w", err)
|
return nil, fmt.Errorf("invalid tree node: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return newNodeVersionFromTreeNode(filePath, treeNode), nil
|
return newNodeVersionFromTreeNode(log, filePath, treeNode), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeVersion {
|
func newNodeVersionFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) *data.NodeVersion {
|
||||||
_, isUnversioned := treeNode.Get(isUnversionedKV)
|
_, isUnversioned := treeNode.Get(isUnversionedKV)
|
||||||
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
|
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
|
||||||
_, isCombined := treeNode.Get(isCombinedKV)
|
_, isCombined := treeNode.Get(isCombinedKV)
|
||||||
|
@ -217,7 +217,9 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
|
||||||
}
|
}
|
||||||
|
|
||||||
if createdStr, ok := treeNode.Get(createdKV); ok {
|
if createdStr, ok := treeNode.Get(createdKV); ok {
|
||||||
if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err == nil {
|
if utcMilli, err := strconv.ParseInt(createdStr, 10, 64); err != nil {
|
||||||
|
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, createdStr), zap.Error(err))
|
||||||
|
} else {
|
||||||
created := time.UnixMilli(utcMilli)
|
created := time.UnixMilli(utcMilli)
|
||||||
version.Created = &created
|
version.Created = &created
|
||||||
}
|
}
|
||||||
|
@ -225,7 +227,9 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
|
||||||
|
|
||||||
if ownerStr, ok := treeNode.Get(ownerKV); ok {
|
if ownerStr, ok := treeNode.Get(ownerKV); ok {
|
||||||
var owner user.ID
|
var owner user.ID
|
||||||
if err := owner.DecodeString(ownerStr); err == nil {
|
if err := owner.DecodeString(ownerStr); err != nil {
|
||||||
|
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerStr), zap.Error(err))
|
||||||
|
} else {
|
||||||
version.Owner = &owner
|
version.Owner = &owner
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -233,10 +237,10 @@ func newNodeVersionFromTreeNode(filePath string, treeNode *treeNode) *data.NodeV
|
||||||
return version
|
return version
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
|
func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *treeNode) (*data.MultipartInfo, error) {
|
||||||
uploadID, _ := treeNode.Get(uploadIDKV)
|
uploadID, _ := treeNode.Get(uploadIDKV)
|
||||||
if uploadID == "" {
|
if uploadID == "" {
|
||||||
return nil, fmt.Errorf("it's not a multipart node")
|
return nil, fmt.Errorf("it's not a multipart node: missing UploadId")
|
||||||
}
|
}
|
||||||
|
|
||||||
multipartInfo := &data.MultipartInfo{
|
multipartInfo := &data.MultipartInfo{
|
||||||
|
@ -246,23 +250,32 @@ func newMultipartInfoFromTreeNode(filePath string, treeNode *treeNode) (*data.Mu
|
||||||
Meta: treeNode.Meta,
|
Meta: treeNode.Meta,
|
||||||
}
|
}
|
||||||
|
|
||||||
ownerID, _ := treeNode.Get(ownerKV)
|
if ownerID, ok := treeNode.Get(ownerKV); ok {
|
||||||
_ = multipartInfo.Owner.DecodeString(ownerID)
|
if err := multipartInfo.Owner.DecodeString(ownerID); err != nil {
|
||||||
|
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, ownerID), zap.Error(err))
|
||||||
created, _ := treeNode.Get(createdKV)
|
}
|
||||||
if utcMilli, err := strconv.ParseInt(created, 10, 64); err == nil {
|
|
||||||
multipartInfo.Created = time.UnixMilli(utcMilli)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
finished, _ := treeNode.Get(finishedKV)
|
if created, ok := treeNode.Get(createdKV); ok {
|
||||||
if flag, err := strconv.ParseBool(finished); err == nil {
|
if utcMilli, err := strconv.ParseInt(created, 10, 64); err != nil {
|
||||||
multipartInfo.Finished = flag
|
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, created), zap.Error(err))
|
||||||
|
} else {
|
||||||
|
multipartInfo.Created = time.UnixMilli(utcMilli)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if finished, ok := treeNode.Get(finishedKV); ok {
|
||||||
|
if flag, err := strconv.ParseBool(finished); err != nil {
|
||||||
|
log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, finished), zap.Error(err))
|
||||||
|
} else {
|
||||||
|
multipartInfo.Finished = flag
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return multipartInfo, nil
|
return multipartInfo, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
|
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
|
||||||
multipartInfo := &data.MultipartInfo{
|
multipartInfo := &data.MultipartInfo{
|
||||||
ID: node.GetNodeID(),
|
ID: node.GetNodeID(),
|
||||||
Meta: make(map[string]string, len(node.GetMeta())),
|
Meta: make(map[string]string, len(node.GetMeta())),
|
||||||
|
@ -275,13 +288,19 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
|
||||||
case FileNameKey:
|
case FileNameKey:
|
||||||
multipartInfo.Key = string(kv.GetValue())
|
multipartInfo.Key = string(kv.GetValue())
|
||||||
case createdKV:
|
case createdKV:
|
||||||
if utcMilli, err := strconv.ParseInt(string(kv.GetValue()), 10, 64); err == nil {
|
if utcMilli, err := strconv.ParseInt(string(kv.GetValue()), 10, 64); err != nil {
|
||||||
|
log.Warn(logs.InvalidTreeKV, zap.String(createdKV, string(kv.GetValue())), zap.Error(err))
|
||||||
|
} else {
|
||||||
multipartInfo.Created = time.UnixMilli(utcMilli)
|
multipartInfo.Created = time.UnixMilli(utcMilli)
|
||||||
}
|
}
|
||||||
case ownerKV:
|
case ownerKV:
|
||||||
_ = multipartInfo.Owner.DecodeString(string(kv.GetValue()))
|
if err := multipartInfo.Owner.DecodeString(string(kv.GetValue())); err != nil {
|
||||||
|
log.Warn(logs.InvalidTreeKV, zap.String(ownerKV, string(kv.GetValue())), zap.Error(err))
|
||||||
|
}
|
||||||
case finishedKV:
|
case finishedKV:
|
||||||
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err == nil {
|
if isFinished, err := strconv.ParseBool(string(kv.GetValue())); err != nil {
|
||||||
|
log.Warn(logs.InvalidTreeKV, zap.String(finishedKV, string(kv.GetValue())), zap.Error(err))
|
||||||
|
} else {
|
||||||
multipartInfo.Finished = isFinished
|
multipartInfo.Finished = isFinished
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
@ -612,7 +631,7 @@ func (c *Tree) GetLatestVersion(ctx context.Context, bktInfo *data.BucketInfo, o
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return newNodeVersion(objectName, latestNode)
|
return newNodeVersion(c.reqLogger(ctx), objectName, latestNode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getLatestNode(nodes []NodeResponse) (NodeResponse, error) {
|
func getLatestNode(nodes []NodeResponse) (NodeResponse, error) {
|
||||||
|
@ -818,7 +837,7 @@ func (s *VersionsByPrefixStreamImpl) parseNodeResponse(node NodeResponse) (res *
|
||||||
return nil, true, nil
|
return nil, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
return newNodeVersionFromTreeNode(filepath, trNode), false, nil
|
return newNodeVersionFromTreeNode(s.log, filepath, trNode), false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
|
func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) (data.VersionsStream, error) {
|
||||||
|
@ -840,7 +859,7 @@ func (c *Tree) InitVersionsByPrefixStream(ctx context.Context, bktInfo *data.Buc
|
||||||
headPrefix: strings.TrimSuffix(prefix, tailPrefix),
|
headPrefix: strings.TrimSuffix(prefix, tailPrefix),
|
||||||
tailPrefix: tailPrefix,
|
tailPrefix: tailPrefix,
|
||||||
latestOnly: latestOnly,
|
latestOnly: latestOnly,
|
||||||
log: c.log,
|
log: c.reqLogger(ctx),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1102,7 +1121,7 @@ func (c *Tree) getSubTreeMultipartUploads(ctx context.Context, bktInfo *data.Buc
|
||||||
namesMap[treeNode.ID] = filepath
|
namesMap[treeNode.ID] = filepath
|
||||||
}
|
}
|
||||||
|
|
||||||
multipartInfo, err := newMultipartInfoFromTreeNode(filepath, treeNode)
|
multipartInfo, err := newMultipartInfoFromTreeNode(c.reqLogger(ctx), filepath, treeNode)
|
||||||
if err != nil || multipartInfo.Finished {
|
if err != nil || multipartInfo.Finished {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -1140,8 +1159,9 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log := c.reqLogger(ctx)
|
||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
info, err := newMultipartInfo(node)
|
info, err := newMultipartInfo(log, node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
@ -1373,9 +1393,10 @@ func (c *Tree) getVersions(ctx context.Context, bktInfo *data.BucketInfo, treeID
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log := c.reqLogger(ctx)
|
||||||
result := make([]*data.NodeVersion, 0, len(nodes))
|
result := make([]*data.NodeVersion, 0, len(nodes))
|
||||||
for _, node := range nodes {
|
for _, node := range nodes {
|
||||||
nodeVersion, err := newNodeVersion(filepath, node)
|
nodeVersion, err := newNodeVersion(log, filepath, node)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue