[#227] Add versionID header after complete multipart

Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
Marina Biryukova 2023-10-26 11:33:51 +03:00 committed by Alexey Vanin
parent 0bed25816c
commit 890a8ed237
11 changed files with 25 additions and 163 deletions

View file

@ -3,7 +3,6 @@ package handler
import (
"encoding/xml"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
@ -399,6 +398,12 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
return
}
settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
return
}
var (
uploadID = r.URL.Query().Get(uploadIDHeaderName)
uploadInfo = &layer.UploadInfoParams{
@ -425,29 +430,12 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
Parts: reqBody.Parts,
}
// Next operations might take some time, so we want to keep client's
// connection alive. To do so, gateway sends periodic white spaces
// back to the client the same way as Amazon S3 service does.
stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.CompleteMultipartKeepalive())
// Start complete multipart upload which may take some time to fetch object
// and re-upload it part by part.
objInfo, err := h.completeMultipartUpload(r, c, bktInfo, reqInfo)
// Stop periodic writer as complete multipart upload is finished
// successfully or not.
headerIsWritten := stopPeriodicResponseWriter()
responseWriter := middleware.EncodeToResponse
errLogger := h.logAndSendError
// Do not send XML and HTTP headers if periodic writer was invoked at this point.
if headerIsWritten {
responseWriter = middleware.EncodeToResponseNoHeader
errLogger = h.logAndSendErrorNoHeader
}
if err != nil {
errLogger(w, "complete multipart error", reqInfo, err, additional...)
h.logAndSendError(w, "complete multipart error", reqInfo, err, additional...)
return
}
@ -457,12 +445,12 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
ETag: objInfo.ETag(h.cfg.MD5Enabled()),
}
// Here we previously set api.AmzVersionID header for versioned bucket.
// It is not possible after #60, because of periodic white
// space XML writer to keep connection with the client.
if settings.VersioningEnabled() {
w.Header().Set(api.AmzVersionID, objInfo.VersionID())
}
if err = responseWriter(w, response); err != nil {
errLogger(w, "something went wrong", reqInfo, err, additional...)
if err = middleware.EncodeToResponse(w, response); err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err, additional...)
}
}
@ -730,53 +718,3 @@ func encodeListPartsToResponse(info *layer.ListPartsInfo, params *layer.ListPart
StorageClass: api.DefaultStorageClass,
}
}
// periodicXMLWriter creates go routine to write xml header and whitespaces
// over time to avoid connection drop from the client. To work properly,
// pass `http.ResponseWriter` with implemented `http.Flusher` interface.
// Returns stop function which returns boolean if writer has been used
// during goroutine execution. To disable writer, pass 0 duration value.
func periodicXMLWriter(w io.Writer, dur time.Duration) (stop func() bool) {
if dur == 0 { // 0 duration disables periodic writer
return func() bool { return false }
}
whitespaceChar := []byte(" ")
closer := make(chan struct{})
done := make(chan struct{})
headerWritten := false
go func() {
defer close(done)
tick := time.NewTicker(dur)
defer tick.Stop()
for {
select {
case <-tick.C:
if !headerWritten {
_, err := w.Write([]byte(xml.Header))
headerWritten = err == nil
}
_, err := w.Write(whitespaceChar)
if err != nil {
return // is there anything we can do better than ignore error?
}
if buffered, ok := w.(http.Flusher); ok {
buffered.Flush()
}
case <-closer:
return
}
}
}()
stop = func() bool {
close(closer)
<-done // wait for goroutine to stop
return headerWritten
}
return stop
}