[#146] Move getting chunk payload reader to separate function
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
parent
e58ea40463
commit
751a9be7cc
2 changed files with 38 additions and 46 deletions
|
@ -217,6 +217,12 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
body, err := h.getBodyReader(r)
|
||||||
|
if err != nil {
|
||||||
|
h.logAndSendError(w, "failed to get body reader", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var size uint64
|
var size uint64
|
||||||
if r.ContentLength > 0 {
|
if r.ContentLength > 0 {
|
||||||
size = uint64(r.ContentLength)
|
size = uint64(r.ContentLength)
|
||||||
|
@ -230,28 +236,7 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
},
|
},
|
||||||
PartNumber: partNumber,
|
PartNumber: partNumber,
|
||||||
Size: size,
|
Size: size,
|
||||||
Reader: r.Body,
|
Reader: body,
|
||||||
}
|
|
||||||
|
|
||||||
if api.IsSignedStreamingV4(r) {
|
|
||||||
if decodeContentSize := r.Header.Get(api.AmzDecodedContentLength); len(decodeContentSize) > 0 {
|
|
||||||
_, err := strconv.Atoi(decodeContentSize)
|
|
||||||
if err != nil {
|
|
||||||
h.logAndSendError(w, "cannot parse decode content length information", reqInfo,
|
|
||||||
errors.GetAPIError(errors.ErrMissingContentLength))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
h.logAndSendError(w, "expecting decode content length information", reqInfo,
|
|
||||||
errors.GetAPIError(errors.ErrMissingContentLength))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
chunkReader, err := newSignV4ChunkedReader(r)
|
|
||||||
if err != nil {
|
|
||||||
h.logAndSendError(w, "cannot initialize chunk reader", reqInfo, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
p.Reader = chunkReader
|
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Info.Encryption, err = formEncryptionParams(r)
|
p.Info.Encryption, err = formEncryptionParams(r)
|
||||||
|
|
|
@ -220,6 +220,12 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
body, err := h.getBodyReader(r)
|
||||||
|
if err != nil {
|
||||||
|
h.logAndSendError(w, "failed to get body reader", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
var size uint64
|
var size uint64
|
||||||
if r.ContentLength > 0 {
|
if r.ContentLength > 0 {
|
||||||
size = uint64(r.ContentLength)
|
size = uint64(r.ContentLength)
|
||||||
|
@ -228,33 +234,12 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
params := &layer.PutObjectParams{
|
params := &layer.PutObjectParams{
|
||||||
BktInfo: bktInfo,
|
BktInfo: bktInfo,
|
||||||
Object: reqInfo.ObjectName,
|
Object: reqInfo.ObjectName,
|
||||||
Reader: r.Body,
|
Reader: body,
|
||||||
Size: size,
|
Size: size,
|
||||||
Header: metadata,
|
Header: metadata,
|
||||||
Encryption: encryptionParams,
|
Encryption: encryptionParams,
|
||||||
}
|
}
|
||||||
|
|
||||||
if api.IsSignedStreamingV4(r) {
|
|
||||||
if decodeContentSize := r.Header.Get(api.AmzDecodedContentLength); len(decodeContentSize) > 0 {
|
|
||||||
_, err := strconv.Atoi(decodeContentSize)
|
|
||||||
if err != nil {
|
|
||||||
h.logAndSendError(w, "cannot parse decode content length information", reqInfo,
|
|
||||||
errors.GetAPIError(errors.ErrMissingContentLength))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
h.logAndSendError(w, "expecting decode content length information", reqInfo,
|
|
||||||
errors.GetAPIError(errors.ErrMissingContentLength))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
chunkReader, err := newSignV4ChunkedReader(r)
|
|
||||||
if err != nil {
|
|
||||||
h.logAndSendError(w, "cannot initialize chunk reader", reqInfo, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
params.Reader = chunkReader
|
|
||||||
}
|
|
||||||
|
|
||||||
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint)
|
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, bktInfo.LocationConstraint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||||
|
@ -275,8 +260,8 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
extendedObjInfo, err := h.obj.PutObject(ctx, params)
|
extendedObjInfo, err := h.obj.PutObject(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
_, err2 := io.Copy(io.Discard, r.Body)
|
_, err2 := io.Copy(io.Discard, body)
|
||||||
err3 := r.Body.Close()
|
err3 := body.Close()
|
||||||
h.logAndSendError(w, "could not upload object", reqInfo, err, zap.Errors("body close errors", []error{err2, err3}))
|
h.logAndSendError(w, "could not upload object", reqInfo, err, zap.Errors("body close errors", []error{err2, err3}))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -339,6 +324,28 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
middleware.WriteSuccessResponseHeadersOnly(w)
|
middleware.WriteSuccessResponseHeadersOnly(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *handler) getBodyReader(r *http.Request) (io.ReadCloser, error) {
|
||||||
|
if !api.IsSignedStreamingV4(r) {
|
||||||
|
return r.Body, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
decodeContentSize := r.Header.Get(api.AmzDecodedContentLength)
|
||||||
|
if len(decodeContentSize) == 0 {
|
||||||
|
return nil, errors.GetAPIError(errors.ErrMissingContentLength)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := strconv.Atoi(decodeContentSize); err != nil {
|
||||||
|
return nil, fmt.Errorf("%w: parse decoded content length: %s", errors.GetAPIError(errors.ErrMissingContentLength), err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
chunkReader, err := newSignV4ChunkedReader(r)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("initialize chunk reader: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return chunkReader, nil
|
||||||
|
}
|
||||||
|
|
||||||
func formEncryptionParams(r *http.Request) (enc encryption.Params, err error) {
|
func formEncryptionParams(r *http.Request) (enc encryption.Params, err error) {
|
||||||
sseCustomerAlgorithm := r.Header.Get(api.AmzServerSideEncryptionCustomerAlgorithm)
|
sseCustomerAlgorithm := r.Header.Get(api.AmzServerSideEncryptionCustomerAlgorithm)
|
||||||
sseCustomerKey := r.Header.Get(api.AmzServerSideEncryptionCustomerKey)
|
sseCustomerKey := r.Header.Get(api.AmzServerSideEncryptionCustomerKey)
|
||||||
|
|
Loading…
Reference in a new issue