frostfs-http-gw/internal/handler/download.go
Roman Loginov 4b782cf124
All checks were successful
/ Vulncheck (push) Successful in 45s
/ Builds (push) Successful in 58s
/ OCI image (push) Successful in 1m19s
/ Lint (push) Successful in 2m3s
/ Tests (push) Successful in 54s
[#187] Add handling quota limit reached error
The Access Denied status may be received
from APE due to exceeding the quota. In
this situation, you need to return the
appropriate status code.

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
2025-01-21 06:59:47 +00:00

226 lines
6 KiB
Go

package handler
import (
"archive/zip"
"bufio"
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
cidParam := c.UserValue("cid").(string)
oidParam := c.UserValue("oid").(string)
downloadParam := c.QueryArgs().GetBool("download")
ctx := utils.GetContextFromRequest(c)
log := utils.GetReqLogOrDefault(ctx, h.log).With(
zap.String("cid", cidParam),
zap.String("oid", oidParam),
)
bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
logAndSendBucketError(c, log, checkS3Err)
return
}
req := h.newRequest(c, log)
var objID oid.ID
if checkS3Err == nil && shouldDownload(oidParam, downloadParam) {
h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.receiveFile)
} else if err = objID.DecodeString(oidParam); err == nil {
h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.receiveFile)
} else {
h.browseIndex(c, checkS3Err != nil)
}
}
func shouldDownload(oidParam string, downloadParam bool) bool {
return !isDir(oidParam) || downloadParam
}
func (h *Handler) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) request {
return request{
RequestCtx: ctx,
log: log,
}
}
// DownloadByAttribute handles attribute-based download requests.
func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
h.byAttribute(c, h.receiveFile)
}
func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op object.SearchMatchType) (ResObjectSearch, error) {
filters := object.NewSearchFilters()
filters.AddRootFilter()
filters.AddFilter(key, val, op)
prm := PrmObjectSearch{
PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx),
},
Container: cnrID,
Filters: filters,
}
return h.frostfs.SearchObjects(ctx, prm)
}
func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
method := zip.Store
if h.config.ZipCompression() {
method = zip.Deflate
}
filePath := getZipFilePath(obj)
if len(filePath) == 0 || filePath[len(filePath)-1] == '/' {
return nil, fmt.Errorf("invalid filepath '%s'", filePath)
}
return zw.CreateHeader(&zip.FileHeader{
Name: filePath,
Method: method,
Modified: time.Now(),
})
}
// DownloadZipped handles zip by prefix requests.
func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
scid, _ := c.UserValue("cid").(string)
prefix, _ := c.UserValue("prefix").(string)
ctx := utils.GetContextFromRequest(c)
log := utils.GetReqLogOrDefault(ctx, h.log)
prefix, err := url.QueryUnescape(prefix)
if err != nil {
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Error(err))
ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
return
}
log = log.With(zap.String("cid", scid), zap.String("prefix", prefix))
bktInfo, err := h.getBucketInfo(ctx, scid, log)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
resSearch, err := h.search(ctx, bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
return
}
c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
c.Response.SetStatusCode(http.StatusOK)
c.SetBodyStreamWriter(func(w *bufio.Writer) {
defer resSearch.Close()
zipWriter := zip.NewWriter(w)
var bufZip []byte
var addr oid.Address
empty := true
called := false
btoken := bearerToken(ctx)
addr.SetContainer(bktInfo.CID)
errIter := resSearch.Iterate(func(id oid.ID) bool {
called = true
if empty {
bufZip = make([]byte, 3<<20) // the same as for upload
}
empty = false
addr.SetObject(id)
if err = h.zipObject(ctx, zipWriter, addr, btoken, bufZip); err != nil {
log.Error(logs.FailedToAddObjectToArchive, zap.String("oid", id.EncodeToString()), zap.Error(err))
}
return false
})
if errIter != nil {
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter))
} else if !called {
log.Error(logs.ObjectsNotFound)
}
if err = zipWriter.Close(); err != nil {
log.Error(logs.CloseZipWriter, zap.Error(err))
}
})
}
func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
prm := PrmObjectGet{
PrmAuth: PrmAuth{
BearerToken: btoken,
},
Address: addr,
}
resGet, err := h.frostfs.GetObject(ctx, prm)
if err != nil {
return fmt.Errorf("get FrostFS object: %v", err)
}
objWriter, err := h.addObjectToZip(zipWriter, &resGet.Header)
if err != nil {
return fmt.Errorf("zip create header: %v", err)
}
if _, err = io.CopyBuffer(objWriter, resGet.Payload, bufZip); err != nil {
return fmt.Errorf("copy object payload to zip file: %v", err)
}
if err = resGet.Payload.Close(); err != nil {
return fmt.Errorf("object body close error: %w", err)
}
if err = zipWriter.Flush(); err != nil {
return fmt.Errorf("flush zip writer: %v", err)
}
return nil
}
func getZipFilePath(obj *object.Object) string {
for _, attr := range obj.Attributes() {
if attr.Key() == object.AttributeFilePath {
return attr.Value()
}
}
return ""
}