frostfs-s3-gw/api/handler/object_list.go
Denis Kirillov 07b60b15b3 [#644] Support keepalive during listing
Send whitespaces every time as new object in list is ready
to prevent client from context cancelling.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2025-02-27 09:37:04 +03:00

449 lines
14 KiB
Go

package handler
import (
"context"
"encoding/xml"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
const maxObjectList = 1000 // Limit number of objects in a listObjectsResponse/listObjectsVersionsResponse.
// ListObjectsV1Handler handles objects listing requests for API version 1.
func (h *handler) ListObjectsV1Handler(w http.ResponseWriter, r *http.Request) {
ctx, span := tracing.StartSpanFromContext(r.Context(), "handler.ListObjectsV1")
defer span.End()
reqInfo := middleware.GetReqInfo(ctx)
params, err := parseListObjectsArgsV1(reqInfo)
if err != nil {
h.logAndSendError(ctx, w, "failed to parse arguments", reqInfo, err)
return
}
if params.BktInfo, err = h.getBucketAndCheckOwner(r, reqInfo.BucketName); err != nil {
h.logAndSendError(ctx, w, "could not get bucket info", reqInfo, err)
return
}
ch := make(chan struct{})
defer close(ch)
params.Chan = ch
stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.ListingKeepaliveThrottle(), ch)
list, err := h.obj.ListObjectsV1(ctx, params)
if err != nil {
logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter())
logAndSendError(ctx, w, "could not list objects v1", reqInfo, err)
return
}
headerIsWritten := stopPeriodicResponseWriter()
if headerIsWritten {
if err = middleware.EncodeToResponseNoHeader(w, h.encodeV1(params, list)); err != nil {
h.logAndSendErrorNoHeader(ctx, w, "could not encode listing v1 response", reqInfo, err)
}
} else {
if err = middleware.EncodeToResponse(w, h.encodeV1(params, list)); err != nil {
h.logAndSendError(ctx, w, "could not encode listing v1 response", reqInfo, err)
}
}
}
func (h *handler) encodeV1(p *layer.ListObjectsParamsV1, list *layer.ListObjectsInfoV1) *ListObjectsV1Response {
res := &ListObjectsV1Response{
Name: p.BktInfo.Name,
EncodingType: p.Encode,
Marker: p.Marker,
Prefix: p.Prefix,
MaxKeys: p.MaxKeys,
Delimiter: p.Delimiter,
IsTruncated: list.IsTruncated,
NextMarker: list.NextMarker,
}
res.CommonPrefixes = fillPrefixes(list.Prefixes, p.Encode)
res.Contents = fillContentsWithOwner(list.Objects, p.Encode, h.cfg.MD5Enabled())
return res
}
// ListObjectsV2Handler handles objects listing requests for API version 2.
func (h *handler) ListObjectsV2Handler(w http.ResponseWriter, r *http.Request) {
ctx, span := tracing.StartSpanFromContext(r.Context(), "handler.ListObjectsV2")
defer span.End()
reqInfo := middleware.GetReqInfo(ctx)
params, err := parseListObjectsArgsV2(reqInfo)
if err != nil {
h.logAndSendError(ctx, w, "failed to parse arguments", reqInfo, err)
return
}
if params.BktInfo, err = h.getBucketAndCheckOwner(r, reqInfo.BucketName); err != nil {
h.logAndSendError(ctx, w, "could not get bucket info", reqInfo, err)
return
}
ch := make(chan struct{})
defer close(ch)
params.Chan = ch
stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.ListingKeepaliveThrottle(), ch)
list, err := h.obj.ListObjectsV2(ctx, params)
if err != nil {
logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter())
logAndSendError(ctx, w, "could not list objects v2", reqInfo, err)
return
}
headerIsWritten := stopPeriodicResponseWriter()
if headerIsWritten {
if err = middleware.EncodeToResponseNoHeader(w, h.encodeV2(params, list)); err != nil {
h.logAndSendErrorNoHeader(ctx, w, "could not encode listing v2 response", reqInfo, err)
}
} else {
if err = middleware.EncodeToResponse(w, h.encodeV2(params, list)); err != nil {
h.logAndSendError(ctx, w, "could not encode listing v2 response", reqInfo, err)
}
}
}
func (h *handler) encodeV2(p *layer.ListObjectsParamsV2, list *layer.ListObjectsInfoV2) *ListObjectsV2Response {
res := &ListObjectsV2Response{
Name: p.BktInfo.Name,
EncodingType: p.Encode,
Prefix: s3PathEncode(p.Prefix, p.Encode),
KeyCount: len(list.Objects) + len(list.Prefixes),
MaxKeys: p.MaxKeys,
Delimiter: s3PathEncode(p.Delimiter, p.Encode),
StartAfter: s3PathEncode(p.StartAfter, p.Encode),
IsTruncated: list.IsTruncated,
ContinuationToken: p.ContinuationToken,
NextContinuationToken: list.NextContinuationToken,
}
res.CommonPrefixes = fillPrefixes(list.Prefixes, p.Encode)
res.Contents = fillContents(list.Objects, p.Encode, p.FetchOwner, h.cfg.MD5Enabled())
return res
}
func parseListObjectsArgsV1(reqInfo *middleware.ReqInfo) (*layer.ListObjectsParamsV1, error) {
var (
res layer.ListObjectsParamsV1
queryValues = reqInfo.URL.Query()
)
common, err := parseListObjectArgs(reqInfo)
if err != nil {
return nil, err
}
res.ListObjectsParamsCommon = *common
res.Marker = queryValues.Get("marker")
return &res, nil
}
func parseListObjectsArgsV2(reqInfo *middleware.ReqInfo) (*layer.ListObjectsParamsV2, error) {
var (
res layer.ListObjectsParamsV2
queryValues = reqInfo.URL.Query()
)
common, err := parseListObjectArgs(reqInfo)
if err != nil {
return nil, err
}
res.ListObjectsParamsCommon = *common
res.ContinuationToken, err = parseContinuationToken(queryValues)
if err != nil {
return nil, err
}
res.StartAfter = queryValues.Get("start-after")
res.FetchOwner, _ = strconv.ParseBool(queryValues.Get("fetch-owner"))
return &res, nil
}
func parseListObjectArgs(reqInfo *middleware.ReqInfo) (*layer.ListObjectsParamsCommon, error) {
var (
err error
res layer.ListObjectsParamsCommon
queryValues = reqInfo.URL.Query()
)
res.Delimiter = queryValues.Get("delimiter")
res.Encode = queryValues.Get("encoding-type")
if res.Encode != "" && strings.ToLower(res.Encode) != urlEncodingType {
return nil, errors.GetAPIError(errors.ErrInvalidEncodingMethod)
}
if queryValues.Get("max-keys") == "" {
res.MaxKeys = maxObjectList
} else if res.MaxKeys, err = strconv.Atoi(queryValues.Get("max-keys")); err != nil || res.MaxKeys < 0 {
return nil, errors.GetAPIError(errors.ErrInvalidMaxKeys)
}
res.Prefix = queryValues.Get("prefix")
return &res, nil
}
func parseContinuationToken(queryValues url.Values) (string, error) {
if val, ok := queryValues["continuation-token"]; ok {
var objID oid.ID
if err := objID.DecodeString(val[0]); err != nil {
return "", errors.GetAPIError(errors.ErrIncorrectContinuationToken)
}
return val[0], nil
}
return "", nil
}
func fillPrefixes(src []string, encode string) []CommonPrefix {
var dst []CommonPrefix
for _, obj := range src {
dst = append(dst, CommonPrefix{
Prefix: s3PathEncode(obj, encode),
})
}
return dst
}
func fillContentsWithOwner(src []*data.ExtendedNodeVersion, encode string, md5Enabled bool) []Object {
return fillContents(src, encode, true, md5Enabled)
}
func fillContents(src []*data.ExtendedNodeVersion, encode string, fetchOwner, md5Enabled bool) []Object {
var dst []Object
for _, obj := range src {
res := Object{
Key: s3PathEncode(obj.NodeVersion.FilePath, encode),
Size: obj.NodeVersion.Size,
LastModified: obj.NodeVersion.Created.UTC().Format(time.RFC3339),
ETag: data.Quote(obj.NodeVersion.GetETag(md5Enabled)),
StorageClass: api.DefaultStorageClass,
}
if fetchOwner {
owner := obj.NodeVersion.Owner.String()
res.Owner = &Owner{
ID: owner,
DisplayName: owner,
}
}
dst = append(dst, res)
}
return dst
}
func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := tracing.StartSpanFromContext(r.Context(), "handler.ListBucketObjectVersions")
defer span.End()
reqInfo := middleware.GetReqInfo(ctx)
p, err := parseListObjectVersionsRequest(reqInfo)
if err != nil {
h.logAndSendError(ctx, w, "failed to parse request", reqInfo, err)
return
}
if p.BktInfo, err = h.getBucketAndCheckOwner(r, reqInfo.BucketName); err != nil {
h.logAndSendError(ctx, w, "could not get bucket info", reqInfo, err)
return
}
ch := make(chan struct{})
defer close(ch)
p.Chan = ch
stopPeriodicResponseWriter := periodicXMLWriter(w, h.cfg.ListingKeepaliveThrottle(), ch)
list, err := h.obj.ListObjectVersions(ctx, p)
if err != nil {
logAndSendError := h.periodicWriterErrorSender(stopPeriodicResponseWriter())
logAndSendError(ctx, w, "could not list objects versions", reqInfo, err)
return
}
response := encodeListObjectVersionsToResponse(p, list, p.BktInfo.Name, h.cfg.MD5Enabled())
headerIsWritten := stopPeriodicResponseWriter()
if headerIsWritten {
if err = middleware.EncodeToResponseNoHeader(w, response); err != nil {
h.logAndSendErrorNoHeader(ctx, w, "could not encode listing versions response", reqInfo, err)
}
} else {
if err = middleware.EncodeToResponse(w, response); err != nil {
h.logAndSendError(ctx, w, "could not encode listing versions response", reqInfo, err)
}
}
}
func parseListObjectVersionsRequest(reqInfo *middleware.ReqInfo) (*layer.ListObjectVersionsParams, error) {
var (
err error
res layer.ListObjectVersionsParams
queryValues = reqInfo.URL.Query()
)
if queryValues.Get("max-keys") == "" {
res.MaxKeys = maxObjectList
} else if res.MaxKeys, err = strconv.Atoi(queryValues.Get("max-keys")); err != nil || res.MaxKeys <= 0 {
return nil, errors.GetAPIError(errors.ErrInvalidMaxKeys)
}
res.Prefix = queryValues.Get("prefix")
res.KeyMarker = queryValues.Get("key-marker")
res.Delimiter = queryValues.Get("delimiter")
res.Encode = queryValues.Get("encoding-type")
res.VersionIDMarker = queryValues.Get("version-id-marker")
if res.Encode != "" && strings.ToLower(res.Encode) != urlEncodingType {
return nil, errors.GetAPIError(errors.ErrInvalidEncodingMethod)
}
if res.VersionIDMarker != "" && res.KeyMarker == "" {
return nil, errors.GetAPIError(errors.VersionIDMarkerWithoutKeyMarker)
}
return &res, nil
}
func encodeListObjectVersionsToResponse(p *layer.ListObjectVersionsParams, info *layer.ListObjectVersionsInfo, bucketName string, md5Enabled bool) *ListObjectsVersionsResponse {
res := ListObjectsVersionsResponse{
Name: bucketName,
IsTruncated: info.IsTruncated,
KeyMarker: s3PathEncode(info.KeyMarker, p.Encode),
NextKeyMarker: s3PathEncode(info.NextKeyMarker, p.Encode),
NextVersionIDMarker: info.NextVersionIDMarker,
VersionIDMarker: info.VersionIDMarker,
Prefix: s3PathEncode(p.Prefix, p.Encode),
Delimiter: s3PathEncode(p.Delimiter, p.Encode),
EncodingType: p.Encode,
MaxKeys: p.MaxKeys,
}
for _, prefix := range info.CommonPrefixes {
res.CommonPrefixes = append(res.CommonPrefixes, CommonPrefix{Prefix: s3PathEncode(prefix, p.Encode)})
}
for _, ver := range info.Version {
res.Version = append(res.Version, ObjectVersionResponse{
IsLatest: ver.IsLatest,
Key: s3PathEncode(ver.NodeVersion.FilePath, p.Encode),
LastModified: ver.NodeVersion.Created.UTC().Format(time.RFC3339),
Owner: Owner{
ID: ver.NodeVersion.Owner.String(),
DisplayName: ver.NodeVersion.Owner.String(),
},
Size: ver.NodeVersion.Size,
VersionID: ver.Version(),
ETag: data.Quote(ver.NodeVersion.GetETag(md5Enabled)),
StorageClass: api.DefaultStorageClass,
})
}
// this loop is not starting till versioning is not implemented
for _, del := range info.DeleteMarker {
res.DeleteMarker = append(res.DeleteMarker, DeleteMarkerEntry{
IsLatest: del.IsLatest,
Key: s3PathEncode(del.NodeVersion.FilePath, p.Encode),
LastModified: del.NodeVersion.Created.UTC().Format(time.RFC3339),
Owner: Owner{
ID: del.NodeVersion.Owner.String(),
DisplayName: del.NodeVersion.Owner.String(),
},
VersionID: del.Version(),
})
}
return &res
}
// periodicWriterErrorSender returns handler function to send error. If header is
// already written by periodic XML writer, do not send HTTP and XML headers.
func (h *handler) periodicWriterErrorSender(headerWritten bool) func(context.Context, http.ResponseWriter, string, *middleware.ReqInfo, error, ...zap.Field) {
if headerWritten {
return h.logAndSendErrorNoHeader
}
return h.logAndSendError
}
// periodicXMLWriter creates go routine to write xml header and whitespaces
// over time to avoid connection drop from the client. To work properly,
// pass `http.ResponseWriter` with implemented `http.Flusher` interface.
// Returns stop function which returns boolean if writer has been used
// during goroutine execution. Zero or negative duration disable writing.
func periodicXMLWriter(w io.Writer, dur time.Duration, ch <-chan struct{}) (stop func() bool) {
if dur <= 0 {
return func() bool { return false }
}
whitespaceChar := []byte(" ")
closer := make(chan struct{})
done := make(chan struct{})
headerWritten := false
go func() {
defer close(done)
var lastEvent time.Time
for {
select {
case _, ok := <-ch:
if !ok {
return
}
if time.Since(lastEvent) < dur {
continue
}
lastEvent = time.Now()
if !headerWritten {
_, err := w.Write([]byte(xml.Header))
headerWritten = err == nil
}
_, err := w.Write(whitespaceChar)
if err != nil {
return // is there anything we can do better than ignore error?
}
if buffered, ok := w.(http.Flusher); ok {
buffered.Flush()
}
case <-closer:
return
}
}
}()
stop = func() bool {
close(closer)
<-done // wait for goroutine to stop
return headerWritten
}
return stop
}