forked from TrueCloudLab/frostfs-http-gw
[#73] Uploader, downloader structures refactoring
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
This commit is contained in:
parent
add07a21ed
commit
d219943542
27 changed files with 672 additions and 664 deletions
10
Makefile
10
Makefile
|
@ -18,8 +18,8 @@ TMP_DIR := .cache
|
||||||
|
|
||||||
# List of binaries to build. For now just one.
|
# List of binaries to build. For now just one.
|
||||||
BINDIR = bin
|
BINDIR = bin
|
||||||
DIRS = $(BINDIR)
|
CMDS = $(addprefix frostfs-, $(notdir $(wildcard cmd/*)))
|
||||||
BINS = $(BINDIR)/frostfs-http-gw
|
BINS = $(addprefix $(BINDIR)/, $(CMDS))
|
||||||
|
|
||||||
.PHONY: all $(BINS) $(DIRS) dep docker/ test cover fmt image image-push dirty-image lint docker/lint pre-commit unpre-commit version clean
|
.PHONY: all $(BINS) $(DIRS) dep docker/ test cover fmt image image-push dirty-image lint docker/lint pre-commit unpre-commit version clean
|
||||||
|
|
||||||
|
@ -37,7 +37,7 @@ $(BINS): $(DIRS) dep
|
||||||
CGO_ENABLED=0 \
|
CGO_ENABLED=0 \
|
||||||
go build -v -trimpath \
|
go build -v -trimpath \
|
||||||
-ldflags "-X main.Version=$(VERSION)" \
|
-ldflags "-X main.Version=$(VERSION)" \
|
||||||
-o $@ ./
|
-o $@ ./cmd/$(subst frostfs-,,$(notdir $@))
|
||||||
|
|
||||||
$(DIRS):
|
$(DIRS):
|
||||||
@echo "⇒ Ensure dir: $@"
|
@echo "⇒ Ensure dir: $@"
|
||||||
|
@ -90,7 +90,7 @@ image:
|
||||||
--build-arg REPO=$(REPO) \
|
--build-arg REPO=$(REPO) \
|
||||||
--build-arg VERSION=$(VERSION) \
|
--build-arg VERSION=$(VERSION) \
|
||||||
--rm \
|
--rm \
|
||||||
-f Dockerfile \
|
-f .docker/Dockerfile \
|
||||||
-t $(HUB_IMAGE):$(HUB_TAG) .
|
-t $(HUB_IMAGE):$(HUB_TAG) .
|
||||||
|
|
||||||
# Push Docker image to the hub
|
# Push Docker image to the hub
|
||||||
|
@ -105,7 +105,7 @@ dirty-image:
|
||||||
--build-arg REPO=$(REPO) \
|
--build-arg REPO=$(REPO) \
|
||||||
--build-arg VERSION=$(VERSION) \
|
--build-arg VERSION=$(VERSION) \
|
||||||
--rm \
|
--rm \
|
||||||
-f Dockerfile.dirty \
|
-f .docker/Dockerfile.dirty \
|
||||||
-t $(HUB_IMAGE)-dirty:$(HUB_TAG) .
|
-t $(HUB_IMAGE)-dirty:$(HUB_TAG) .
|
||||||
|
|
||||||
# Install linters
|
# Install linters
|
||||||
|
|
|
@ -11,15 +11,14 @@ import (
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/uploader"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
@ -51,15 +50,10 @@ type (
|
||||||
resolver *resolver.ContainerResolver
|
resolver *resolver.ContainerResolver
|
||||||
metrics *gateMetrics
|
metrics *gateMetrics
|
||||||
services []*metrics.Service
|
services []*metrics.Service
|
||||||
settings *appSettings
|
settings *handler.Settings
|
||||||
servers []Server
|
servers []Server
|
||||||
}
|
}
|
||||||
|
|
||||||
appSettings struct {
|
|
||||||
Uploader *uploader.Settings
|
|
||||||
Downloader *downloader.Settings
|
|
||||||
}
|
|
||||||
|
|
||||||
// App is an interface for the main gateway function.
|
// App is an interface for the main gateway function.
|
||||||
App interface {
|
App interface {
|
||||||
Wait()
|
Wait()
|
||||||
|
@ -140,10 +134,7 @@ func newApp(ctx context.Context, opt ...Option) App {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) initAppSettings() {
|
func (a *app) initAppSettings() {
|
||||||
a.settings = &appSettings{
|
a.settings = &handler.Settings{}
|
||||||
Uploader: &uploader.Settings{},
|
|
||||||
Downloader: &downloader.Settings{},
|
|
||||||
}
|
|
||||||
|
|
||||||
a.updateSettings()
|
a.updateSettings()
|
||||||
}
|
}
|
||||||
|
@ -334,11 +325,10 @@ func (a *app) setHealthStatus() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) Serve() {
|
func (a *app) Serve() {
|
||||||
uploadRoutes := uploader.New(a.AppParams(), a.settings.Uploader)
|
handler := handler.New(a.AppParams(), a.settings, tree.NewTree(services.NewPoolWrapper(a.treePool)))
|
||||||
downloadRoutes := downloader.New(a.AppParams(), a.settings.Downloader, tree.NewTree(services.NewPoolWrapper(a.treePool)))
|
|
||||||
|
|
||||||
// Configure router.
|
// Configure router.
|
||||||
a.configureRouter(uploadRoutes, downloadRoutes)
|
a.configureRouter(handler)
|
||||||
|
|
||||||
a.startServices()
|
a.startServices()
|
||||||
a.initServers(a.ctx)
|
a.initServers(a.ctx)
|
||||||
|
@ -425,8 +415,8 @@ func (a *app) configReload(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) updateSettings() {
|
func (a *app) updateSettings() {
|
||||||
a.settings.Uploader.SetDefaultTimestamp(a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp))
|
a.settings.SetDefaultTimestamp(a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp))
|
||||||
a.settings.Downloader.SetZipCompression(a.cfg.GetBool(cfgZipCompression))
|
a.settings.SetZipCompression(a.cfg.GetBool(cfgZipCompression))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) startServices() {
|
func (a *app) startServices() {
|
||||||
|
@ -450,7 +440,7 @@ func (a *app) stopServices() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) configureRouter(uploadRoutes *uploader.Uploader, downloadRoutes *downloader.Downloader) {
|
func (a *app) configureRouter(handler *handler.Handler) {
|
||||||
r := router.New()
|
r := router.New()
|
||||||
r.RedirectTrailingSlash = true
|
r.RedirectTrailingSlash = true
|
||||||
r.NotFound = func(r *fasthttp.RequestCtx) {
|
r.NotFound = func(r *fasthttp.RequestCtx) {
|
||||||
|
@ -459,15 +449,16 @@ func (a *app) configureRouter(uploadRoutes *uploader.Uploader, downloadRoutes *d
|
||||||
r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
|
r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
|
||||||
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
|
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
|
||||||
}
|
}
|
||||||
r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(uploadRoutes.Upload))))
|
|
||||||
|
r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(handler.Upload))))
|
||||||
a.log.Info(logs.AddedPathUploadCid)
|
a.log.Info(logs.AddedPathUploadCid)
|
||||||
r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadByAddressOrBucketName))))
|
r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadByAddressOrBucketName))))
|
||||||
r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.HeadByAddressOrBucketName))))
|
r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(handler.HeadByAddressOrBucketName))))
|
||||||
a.log.Info(logs.AddedPathGetCidOid)
|
a.log.Info(logs.AddedPathGetCidOid)
|
||||||
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadByAttribute))))
|
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadByAttribute))))
|
||||||
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.HeadByAttribute))))
|
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(handler.HeadByAttribute))))
|
||||||
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
|
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
|
||||||
r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadZipped))))
|
r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadZipped))))
|
||||||
a.log.Info(logs.AddedPathZipCidPrefix)
|
a.log.Info(logs.AddedPathZipCidPrefix)
|
||||||
|
|
||||||
a.webServer.Handler = r.Handler
|
a.webServer.Handler = r.Handler
|
|
@ -83,7 +83,7 @@ func runServer() (App, context.CancelFunc) {
|
||||||
v := getDefaultConfig()
|
v := getDefaultConfig()
|
||||||
l, lvl := newLogger(v)
|
l, lvl := newLogger(v)
|
||||||
application := newApp(cancelCtx, WithConfig(v), WithLogger(l, lvl))
|
application := newApp(cancelCtx, WithConfig(v), WithLogger(l, lvl))
|
||||||
go application.Serve(cancelCtx)
|
go application.Serve()
|
||||||
|
|
||||||
return application, cancel
|
return application, cancel
|
||||||
}
|
}
|
|
@ -1,537 +0,0 @@
|
||||||
package downloader
|
|
||||||
|
|
||||||
import (
|
|
||||||
"archive/zip"
|
|
||||||
"bufio"
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"path"
|
|
||||||
"strconv"
|
|
||||||
"strings"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
|
||||||
"github.com/valyala/fasthttp"
|
|
||||||
"go.uber.org/atomic"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
type request struct {
|
|
||||||
*fasthttp.RequestCtx
|
|
||||||
log *zap.Logger
|
|
||||||
}
|
|
||||||
|
|
||||||
func isValidToken(s string) bool {
|
|
||||||
for _, c := range s {
|
|
||||||
if c <= ' ' || c > 127 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if strings.ContainsRune("()<>@,;:\\\"/[]?={}", c) {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func isValidValue(s string) bool {
|
|
||||||
for _, c := range s {
|
|
||||||
// HTTP specification allows for more technically, but we don't want to escape things.
|
|
||||||
if c < ' ' || c > 127 || c == '"' {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
type readCloser struct {
|
|
||||||
io.Reader
|
|
||||||
io.Closer
|
|
||||||
}
|
|
||||||
|
|
||||||
// initializes io.Reader with the limited size and detects Content-Type from it.
|
|
||||||
// Returns r's error directly. Also returns the processed data.
|
|
||||||
func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (string, []byte, error) {
|
|
||||||
if maxSize > sizeToDetectType {
|
|
||||||
maxSize = sizeToDetectType
|
|
||||||
}
|
|
||||||
|
|
||||||
buf := make([]byte, maxSize) // maybe sync-pool the slice?
|
|
||||||
|
|
||||||
r, err := rInit(maxSize)
|
|
||||||
if err != nil {
|
|
||||||
return "", nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
n, err := r.Read(buf)
|
|
||||||
if err != nil && err != io.EOF {
|
|
||||||
return "", nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
buf = buf[:n]
|
|
||||||
|
|
||||||
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
|
||||||
}
|
|
||||||
|
|
||||||
func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
|
|
||||||
var (
|
|
||||||
err error
|
|
||||||
dis = "inline"
|
|
||||||
start = time.Now()
|
|
||||||
filename string
|
|
||||||
)
|
|
||||||
|
|
||||||
var prm pool.PrmObjectGet
|
|
||||||
prm.SetAddress(objectAddress)
|
|
||||||
if btoken := bearerToken(ctx); btoken != nil {
|
|
||||||
prm.UseBearer(*btoken)
|
|
||||||
}
|
|
||||||
|
|
||||||
rObj, err := clnt.GetObject(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
req.handleFrostFSErr(err, start)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// we can't close reader in this function, so how to do it?
|
|
||||||
|
|
||||||
if req.Request.URI().QueryArgs().GetBool("download") {
|
|
||||||
dis = "attachment"
|
|
||||||
}
|
|
||||||
|
|
||||||
payloadSize := rObj.Header.PayloadSize()
|
|
||||||
|
|
||||||
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
|
||||||
var contentType string
|
|
||||||
for _, attr := range rObj.Header.Attributes() {
|
|
||||||
key := attr.Key()
|
|
||||||
val := attr.Value()
|
|
||||||
if !isValidToken(key) || !isValidValue(val) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
key = utils.BackwardTransformIfSystem(key)
|
|
||||||
|
|
||||||
req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
|
||||||
switch key {
|
|
||||||
case object.AttributeFileName:
|
|
||||||
filename = val
|
|
||||||
case object.AttributeTimestamp:
|
|
||||||
value, err := strconv.ParseInt(val, 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
req.log.Info(logs.CouldntParseCreationDate,
|
|
||||||
zap.String("key", key),
|
|
||||||
zap.String("val", val),
|
|
||||||
zap.Error(err))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
req.Response.Header.Set(fasthttp.HeaderLastModified,
|
|
||||||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
|
||||||
case object.AttributeContentType:
|
|
||||||
contentType = val
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
idsToResponse(&req.Response, &rObj.Header)
|
|
||||||
|
|
||||||
if len(contentType) == 0 {
|
|
||||||
// determine the Content-Type from the payload head
|
|
||||||
var payloadHead []byte
|
|
||||||
|
|
||||||
contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) {
|
|
||||||
return rObj.Payload, nil
|
|
||||||
})
|
|
||||||
if err != nil && err != io.EOF {
|
|
||||||
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err))
|
|
||||||
response.Error(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// reset payload reader since a part of the data has been read
|
|
||||||
var headReader io.Reader = bytes.NewReader(payloadHead)
|
|
||||||
|
|
||||||
if err != io.EOF { // otherwise, we've already read full payload
|
|
||||||
headReader = io.MultiReader(headReader, rObj.Payload)
|
|
||||||
}
|
|
||||||
|
|
||||||
// note: we could do with io.Reader, but SetBodyStream below closes body stream
|
|
||||||
// if it implements io.Closer and that's useful for us.
|
|
||||||
rObj.Payload = readCloser{headReader, rObj.Payload}
|
|
||||||
}
|
|
||||||
req.SetContentType(contentType)
|
|
||||||
|
|
||||||
req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
|
||||||
|
|
||||||
req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
|
|
||||||
}
|
|
||||||
|
|
||||||
func bearerToken(ctx context.Context) *bearer.Token {
|
|
||||||
if tkn, err := tokens.LoadBearerToken(ctx); err == nil {
|
|
||||||
return tkn
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *request) handleFrostFSErr(err error, start time.Time) {
|
|
||||||
logFields := []zap.Field{
|
|
||||||
zap.Stringer("elapsed", time.Since(start)),
|
|
||||||
zap.Error(err),
|
|
||||||
}
|
|
||||||
statusCode, msg, additionalFields := response.FormErrorResponse("could not receive object", err)
|
|
||||||
logFields = append(logFields, additionalFields...)
|
|
||||||
|
|
||||||
r.log.Error(logs.CouldNotReceiveObject, logFields...)
|
|
||||||
response.Error(r.RequestCtx, msg, statusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Downloader is a download request handler.
|
|
||||||
type Downloader struct {
|
|
||||||
log *zap.Logger
|
|
||||||
pool *pool.Pool
|
|
||||||
containerResolver *resolver.ContainerResolver
|
|
||||||
settings *Settings
|
|
||||||
tree *tree.Tree
|
|
||||||
}
|
|
||||||
|
|
||||||
// Settings stores reloading parameters, so it has to provide atomic getters and setters.
|
|
||||||
type Settings struct {
|
|
||||||
zipCompression atomic.Bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Settings) ZipCompression() bool {
|
|
||||||
return s.zipCompression.Load()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Settings) SetZipCompression(val bool) {
|
|
||||||
s.zipCompression.Store(val)
|
|
||||||
}
|
|
||||||
|
|
||||||
// New creates an instance of Downloader using specified options.
|
|
||||||
func New(params *utils.AppParams, settings *Settings, tree *tree.Tree) *Downloader {
|
|
||||||
return &Downloader{
|
|
||||||
log: params.Logger,
|
|
||||||
pool: params.Pool,
|
|
||||||
settings: settings,
|
|
||||||
containerResolver: params.Resolver,
|
|
||||||
tree: tree,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
|
|
||||||
return &request{
|
|
||||||
RequestCtx: ctx,
|
|
||||||
log: log,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
|
|
||||||
func (d *Downloader) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
|
||||||
test, _ := c.UserValue("oid").(string)
|
|
||||||
var id oid.ID
|
|
||||||
err := id.DecodeString(test)
|
|
||||||
if err != nil {
|
|
||||||
d.byBucketname(c, receiveFile)
|
|
||||||
} else {
|
|
||||||
d.byAddress(c, receiveFile)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
|
||||||
// prepares request and object address to it.
|
|
||||||
func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
|
|
||||||
var (
|
|
||||||
idCnr, _ = c.UserValue("cid").(string)
|
|
||||||
idObj, _ = c.UserValue("oid").(string)
|
|
||||||
log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
|
|
||||||
)
|
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(c)
|
|
||||||
|
|
||||||
cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
objID := new(oid.ID)
|
|
||||||
if err = objID.DecodeString(idObj); err != nil {
|
|
||||||
log.Error(logs.WrongObjectID, zap.Error(err))
|
|
||||||
response.Error(c, "wrong object id", fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var addr oid.Address
|
|
||||||
addr.SetContainer(*cnrID)
|
|
||||||
addr.SetObject(*objID)
|
|
||||||
|
|
||||||
f(ctx, *d.newRequest(c, log), d.pool, addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// byBucketname is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
|
||||||
// prepares request and object address to it.
|
|
||||||
func (d *Downloader) byBucketname(req *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
|
|
||||||
var (
|
|
||||||
bucketname = req.UserValue("cid").(string)
|
|
||||||
key = req.UserValue("oid").(string)
|
|
||||||
log = d.log.With(zap.String("bucketname", bucketname), zap.String("key", key))
|
|
||||||
)
|
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(req)
|
|
||||||
|
|
||||||
cnrID, err := utils.GetContainerID(ctx, bucketname, d.containerResolver)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
|
||||||
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
foundOid, err := d.tree.GetLatestVersion(ctx, cnrID, key)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(logs.ObjectWasntFound, zap.Error(err))
|
|
||||||
response.Error(req, "object wasn't found", fasthttp.StatusNotFound)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if foundOid.DeleteMarker {
|
|
||||||
log.Error(logs.ObjectWasDeleted)
|
|
||||||
response.Error(req, "object deleted", fasthttp.StatusNotFound)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var addr oid.Address
|
|
||||||
addr.SetContainer(*cnrID)
|
|
||||||
addr.SetObject(foundOid.OID)
|
|
||||||
|
|
||||||
f(ctx, *d.newRequest(req, log), d.pool, addr)
|
|
||||||
}
|
|
||||||
|
|
||||||
// DownloadByAttribute handles attribute-based download requests.
|
|
||||||
func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
|
||||||
d.byAttribute(c, receiveFile)
|
|
||||||
}
|
|
||||||
|
|
||||||
// byAttribute is a wrapper similar to byAddress.
|
|
||||||
func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) {
|
|
||||||
var (
|
|
||||||
scid, _ = c.UserValue("cid").(string)
|
|
||||||
key, _ = url.QueryUnescape(c.UserValue("attr_key").(string))
|
|
||||||
val, _ = url.QueryUnescape(c.UserValue("attr_val").(string))
|
|
||||||
log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
|
||||||
)
|
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(c)
|
|
||||||
|
|
||||||
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := d.search(ctx, containerID, key, val, object.MatchStringEqual)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
|
||||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
defer res.Close()
|
|
||||||
|
|
||||||
buf := make([]oid.ID, 1)
|
|
||||||
|
|
||||||
n, err := res.Read(buf)
|
|
||||||
if n == 0 {
|
|
||||||
if errors.Is(err, io.EOF) {
|
|
||||||
log.Error(logs.ObjectNotFound, zap.Error(err))
|
|
||||||
response.Error(c, "object not found", fasthttp.StatusNotFound)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Error(logs.ReadObjectListFailed, zap.Error(err))
|
|
||||||
response.Error(c, "read object list failed: "+err.Error(), fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
var addrObj oid.Address
|
|
||||||
addrObj.SetContainer(*containerID)
|
|
||||||
addrObj.SetObject(buf[0])
|
|
||||||
|
|
||||||
f(ctx, *d.newRequest(c, log), d.pool, addrObj)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) search(ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
|
||||||
filters := object.NewSearchFilters()
|
|
||||||
filters.AddRootFilter()
|
|
||||||
filters.AddFilter(key, val, op)
|
|
||||||
|
|
||||||
var prm pool.PrmObjectSearch
|
|
||||||
prm.SetContainerID(*cid)
|
|
||||||
prm.SetFilters(filters)
|
|
||||||
if btoken := bearerToken(ctx); btoken != nil {
|
|
||||||
prm.UseBearer(*btoken)
|
|
||||||
}
|
|
||||||
|
|
||||||
return d.pool.SearchObjects(ctx, prm)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) getContainer(ctx context.Context, cnrID cid.ID) (container.Container, error) {
|
|
||||||
var prm pool.PrmContainerGet
|
|
||||||
prm.SetContainerID(cnrID)
|
|
||||||
|
|
||||||
return d.pool.GetContainer(ctx, prm)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
|
|
||||||
method := zip.Store
|
|
||||||
if d.settings.ZipCompression() {
|
|
||||||
method = zip.Deflate
|
|
||||||
}
|
|
||||||
|
|
||||||
filePath := getZipFilePath(obj)
|
|
||||||
if len(filePath) == 0 || filePath[len(filePath)-1] == '/' {
|
|
||||||
return nil, fmt.Errorf("invalid filepath '%s'", filePath)
|
|
||||||
}
|
|
||||||
|
|
||||||
return zw.CreateHeader(&zip.FileHeader{
|
|
||||||
Name: filePath,
|
|
||||||
Method: method,
|
|
||||||
Modified: time.Now(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// DownloadZipped handles zip by prefix requests.
|
|
||||||
func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) {
|
|
||||||
scid, _ := c.UserValue("cid").(string)
|
|
||||||
prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string))
|
|
||||||
log := d.log.With(zap.String("cid", scid), zap.String("prefix", prefix))
|
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(c)
|
|
||||||
|
|
||||||
containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// check if container exists here to be able to return 404 error,
|
|
||||||
// otherwise we get this error only in object iteration step
|
|
||||||
// and client get 200 OK.
|
|
||||||
if _, err = d.getContainer(ctx, *containerID); err != nil {
|
|
||||||
log.Error(logs.CouldNotCheckContainerExistence, zap.Error(err))
|
|
||||||
if client.IsErrContainerNotFound(err) {
|
|
||||||
response.Error(c, "Not Found", fasthttp.StatusNotFound)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
response.Error(c, "could not check container existence: "+err.Error(), fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
resSearch, err := d.search(ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
|
||||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
|
|
||||||
c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
|
|
||||||
c.Response.SetStatusCode(http.StatusOK)
|
|
||||||
|
|
||||||
c.SetBodyStreamWriter(func(w *bufio.Writer) {
|
|
||||||
defer resSearch.Close()
|
|
||||||
|
|
||||||
zipWriter := zip.NewWriter(w)
|
|
||||||
|
|
||||||
var bufZip []byte
|
|
||||||
var addr oid.Address
|
|
||||||
|
|
||||||
empty := true
|
|
||||||
called := false
|
|
||||||
btoken := bearerToken(ctx)
|
|
||||||
addr.SetContainer(*containerID)
|
|
||||||
|
|
||||||
errIter := resSearch.Iterate(func(id oid.ID) bool {
|
|
||||||
called = true
|
|
||||||
|
|
||||||
if empty {
|
|
||||||
bufZip = make([]byte, 3<<20) // the same as for upload
|
|
||||||
}
|
|
||||||
empty = false
|
|
||||||
|
|
||||||
addr.SetObject(id)
|
|
||||||
if err = d.zipObject(ctx, zipWriter, addr, btoken, bufZip); err != nil {
|
|
||||||
log.Error(logs.FailedToAddObjectToArchive, zap.String("oid", id.EncodeToString()), zap.Error(err))
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
if errIter != nil {
|
|
||||||
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter))
|
|
||||||
} else if !called {
|
|
||||||
log.Error(logs.ObjectsNotFound)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = zipWriter.Close(); err != nil {
|
|
||||||
log.Error(logs.CloseZipWriter, zap.Error(err))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
func (d *Downloader) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
|
|
||||||
var prm pool.PrmObjectGet
|
|
||||||
prm.SetAddress(addr)
|
|
||||||
if btoken != nil {
|
|
||||||
prm.UseBearer(*btoken)
|
|
||||||
}
|
|
||||||
|
|
||||||
resGet, err := d.pool.GetObject(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("get FrostFS object: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
objWriter, err := d.addObjectToZip(zipWriter, &resGet.Header)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("zip create header: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if _, err = io.CopyBuffer(objWriter, resGet.Payload, bufZip); err != nil {
|
|
||||||
return fmt.Errorf("copy object payload to zip file: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = resGet.Payload.Close(); err != nil {
|
|
||||||
return fmt.Errorf("object body close error: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = zipWriter.Flush(); err != nil {
|
|
||||||
return fmt.Errorf("flush zip writer: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getZipFilePath(obj *object.Object) string {
|
|
||||||
for _, attr := range obj.Attributes() {
|
|
||||||
if attr.Key() == object.AttributeFilePath {
|
|
||||||
return attr.Value()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ""
|
|
||||||
}
|
|
2
go.mod
2
go.mod
|
@ -17,7 +17,6 @@ require (
|
||||||
github.com/valyala/fasthttp v1.34.0
|
github.com/valyala/fasthttp v1.34.0
|
||||||
go.opentelemetry.io/otel v1.16.0
|
go.opentelemetry.io/otel v1.16.0
|
||||||
go.opentelemetry.io/otel/trace v1.16.0
|
go.opentelemetry.io/otel/trace v1.16.0
|
||||||
go.uber.org/atomic v1.10.0
|
|
||||||
go.uber.org/zap v1.24.0
|
go.uber.org/zap v1.24.0
|
||||||
google.golang.org/grpc v1.55.0
|
google.golang.org/grpc v1.55.0
|
||||||
)
|
)
|
||||||
|
@ -97,6 +96,7 @@ require (
|
||||||
go.opentelemetry.io/otel/metric v1.16.0 // indirect
|
go.opentelemetry.io/otel/metric v1.16.0 // indirect
|
||||||
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
|
go.opentelemetry.io/otel/sdk v1.16.0 // indirect
|
||||||
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
|
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
|
||||||
|
go.uber.org/atomic v1.10.0 // indirect
|
||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
golang.org/x/crypto v0.9.0 // indirect
|
golang.org/x/crypto v0.9.0 // indirect
|
||||||
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
|
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
|
||||||
|
|
|
@ -4,7 +4,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
)
|
)
|
||||||
|
|
210
internal/handler/download.go
Normal file
210
internal/handler/download.go
Normal file
|
@ -0,0 +1,210 @@
|
||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"archive/zip"
|
||||||
|
"bufio"
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
"github.com/valyala/fasthttp"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
|
||||||
|
func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||||
|
test, _ := c.UserValue("oid").(string)
|
||||||
|
var id oid.ID
|
||||||
|
err := id.DecodeString(test)
|
||||||
|
if err != nil {
|
||||||
|
h.byBucketname(c, h.receiveFile)
|
||||||
|
} else {
|
||||||
|
h.byAddress(c, h.receiveFile)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
|
||||||
|
return &request{
|
||||||
|
RequestCtx: ctx,
|
||||||
|
log: log,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// DownloadByAttribute handles attribute-based download requests.
|
||||||
|
func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
||||||
|
h.byAttribute(c, h.receiveFile)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) search(ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) {
|
||||||
|
filters := object.NewSearchFilters()
|
||||||
|
filters.AddRootFilter()
|
||||||
|
filters.AddFilter(key, val, op)
|
||||||
|
|
||||||
|
var prm pool.PrmObjectSearch
|
||||||
|
prm.SetContainerID(*cid)
|
||||||
|
prm.SetFilters(filters)
|
||||||
|
if btoken := bearerToken(ctx); btoken != nil {
|
||||||
|
prm.UseBearer(*btoken)
|
||||||
|
}
|
||||||
|
|
||||||
|
return h.pool.SearchObjects(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) getContainer(ctx context.Context, cnrID cid.ID) (container.Container, error) {
|
||||||
|
var prm pool.PrmContainerGet
|
||||||
|
prm.SetContainerID(cnrID)
|
||||||
|
|
||||||
|
return h.pool.GetContainer(ctx, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
|
||||||
|
method := zip.Store
|
||||||
|
if h.settings.ZipCompression() {
|
||||||
|
method = zip.Deflate
|
||||||
|
}
|
||||||
|
|
||||||
|
filePath := getZipFilePath(obj)
|
||||||
|
if len(filePath) == 0 || filePath[len(filePath)-1] == '/' {
|
||||||
|
return nil, fmt.Errorf("invalid filepath '%s'", filePath)
|
||||||
|
}
|
||||||
|
|
||||||
|
return zw.CreateHeader(&zip.FileHeader{
|
||||||
|
Name: filePath,
|
||||||
|
Method: method,
|
||||||
|
Modified: time.Now(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// DownloadZipped handles zip by prefix requests.
|
||||||
|
func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
|
scid, _ := c.UserValue("cid").(string)
|
||||||
|
prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string))
|
||||||
|
log := h.log.With(zap.String("cid", scid), zap.String("prefix", prefix))
|
||||||
|
|
||||||
|
ctx := utils.GetContextFromRequest(c)
|
||||||
|
|
||||||
|
containerID, err := h.getContainerID(ctx, scid)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(logs.WrongContainerID, zap.Error(err))
|
||||||
|
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if container exists here to be able to return 404 error,
|
||||||
|
// otherwise we get this error only in object iteration step
|
||||||
|
// and client get 200 OK.
|
||||||
|
if _, err = h.getContainer(ctx, *containerID); err != nil {
|
||||||
|
log.Error(logs.CouldNotCheckContainerExistence, zap.Error(err))
|
||||||
|
if client.IsErrContainerNotFound(err) {
|
||||||
|
response.Error(c, "Not Found", fasthttp.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
response.Error(c, "could not check container existence: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resSearch, err := h.search(ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
||||||
|
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
|
||||||
|
c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
|
||||||
|
c.Response.SetStatusCode(http.StatusOK)
|
||||||
|
|
||||||
|
c.SetBodyStreamWriter(func(w *bufio.Writer) {
|
||||||
|
defer resSearch.Close()
|
||||||
|
|
||||||
|
zipWriter := zip.NewWriter(w)
|
||||||
|
|
||||||
|
var bufZip []byte
|
||||||
|
var addr oid.Address
|
||||||
|
|
||||||
|
empty := true
|
||||||
|
called := false
|
||||||
|
btoken := bearerToken(ctx)
|
||||||
|
addr.SetContainer(*containerID)
|
||||||
|
|
||||||
|
errIter := resSearch.Iterate(func(id oid.ID) bool {
|
||||||
|
called = true
|
||||||
|
|
||||||
|
if empty {
|
||||||
|
bufZip = make([]byte, 3<<20) // the same as for upload
|
||||||
|
}
|
||||||
|
empty = false
|
||||||
|
|
||||||
|
addr.SetObject(id)
|
||||||
|
if err = h.zipObject(ctx, zipWriter, addr, btoken, bufZip); err != nil {
|
||||||
|
log.Error(logs.FailedToAddObjectToArchive, zap.String("oid", id.EncodeToString()), zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
})
|
||||||
|
if errIter != nil {
|
||||||
|
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter))
|
||||||
|
} else if !called {
|
||||||
|
log.Error(logs.ObjectsNotFound)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = zipWriter.Close(); err != nil {
|
||||||
|
log.Error(logs.CloseZipWriter, zap.Error(err))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
|
||||||
|
var prm pool.PrmObjectGet
|
||||||
|
prm.SetAddress(addr)
|
||||||
|
if btoken != nil {
|
||||||
|
prm.UseBearer(*btoken)
|
||||||
|
}
|
||||||
|
|
||||||
|
resGet, err := h.pool.GetObject(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get FrostFS object: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
objWriter, err := h.addObjectToZip(zipWriter, &resGet.Header)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("zip create header: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err = io.CopyBuffer(objWriter, resGet.Payload, bufZip); err != nil {
|
||||||
|
return fmt.Errorf("copy object payload to zip file: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = resGet.Payload.Close(); err != nil {
|
||||||
|
return fmt.Errorf("object body close error: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = zipWriter.Flush(); err != nil {
|
||||||
|
return fmt.Errorf("flush zip writer: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getZipFilePath(obj *object.Object) string {
|
||||||
|
for _, attr := range obj.Attributes() {
|
||||||
|
if attr.Key() == object.AttributeFilePath {
|
||||||
|
return attr.Value()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ""
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package uploader
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
|
@ -1,6 +1,6 @@
|
||||||
//go:build !integration
|
//go:build !integration
|
||||||
|
|
||||||
package uploader
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"testing"
|
"testing"
|
193
internal/handler/handler.go
Normal file
193
internal/handler/handler.go
Normal file
|
@ -0,0 +1,193 @@
|
||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"io"
|
||||||
|
"net/url"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
"github.com/valyala/fasthttp"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Handler struct {
|
||||||
|
log *zap.Logger
|
||||||
|
pool *pool.Pool
|
||||||
|
ownerID *user.ID
|
||||||
|
settings *Settings
|
||||||
|
containerResolver *resolver.ContainerResolver
|
||||||
|
tree *tree.Tree
|
||||||
|
}
|
||||||
|
|
||||||
|
// Settings stores reloading parameters, so it has to provide atomic getters and setters.
|
||||||
|
type Settings struct {
|
||||||
|
defaultTimestamp atomic.Bool
|
||||||
|
zipCompression atomic.Bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Settings) DefaultTimestamp() bool {
|
||||||
|
return s.defaultTimestamp.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Settings) SetDefaultTimestamp(val bool) {
|
||||||
|
s.defaultTimestamp.Store(val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Settings) ZipCompression() bool {
|
||||||
|
return s.zipCompression.Load()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Settings) SetZipCompression(val bool) {
|
||||||
|
s.zipCompression.Store(val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(params *utils.AppParams, settings *Settings, tree *tree.Tree) *Handler {
|
||||||
|
return &Handler{
|
||||||
|
log: params.Logger,
|
||||||
|
pool: params.Pool,
|
||||||
|
ownerID: params.Owner,
|
||||||
|
settings: settings,
|
||||||
|
containerResolver: params.Resolver,
|
||||||
|
tree: tree,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// getContainerID decode container id, if it's not a valid container id
|
||||||
|
// then trey to resolve name using provided resolver.
|
||||||
|
func (h *Handler) getContainerID(ctx context.Context, containerID string) (*cid.ID, error) {
|
||||||
|
cnrID := new(cid.ID)
|
||||||
|
err := cnrID.DecodeString(containerID)
|
||||||
|
if err != nil {
|
||||||
|
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
|
||||||
|
}
|
||||||
|
return cnrID, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||||
|
// prepares request and object address to it.
|
||||||
|
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||||
|
var (
|
||||||
|
idCnr, _ = c.UserValue("cid").(string)
|
||||||
|
idObj, _ = c.UserValue("oid").(string)
|
||||||
|
log = h.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx := utils.GetContextFromRequest(c)
|
||||||
|
|
||||||
|
cnrID, err := h.getContainerID(ctx, idCnr)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(logs.WrongContainerID, zap.Error(err))
|
||||||
|
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
objID := new(oid.ID)
|
||||||
|
if err = objID.DecodeString(idObj); err != nil {
|
||||||
|
log.Error(logs.WrongObjectID, zap.Error(err))
|
||||||
|
response.Error(c, "wrong object id", fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(*cnrID)
|
||||||
|
addr.SetObject(*objID)
|
||||||
|
|
||||||
|
f(ctx, *h.newRequest(c, log), addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// byBucketname is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||||
|
// prepares request and object address to it.
|
||||||
|
func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||||
|
var (
|
||||||
|
bucketname = req.UserValue("cid").(string)
|
||||||
|
key = req.UserValue("oid").(string)
|
||||||
|
log = h.log.With(zap.String("bucketname", bucketname), zap.String("key", key))
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx := utils.GetContextFromRequest(req)
|
||||||
|
|
||||||
|
cnrID, err := h.getContainerID(ctx, bucketname)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(logs.WrongContainerID, zap.Error(err))
|
||||||
|
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
foundOid, err := h.tree.GetLatestVersion(ctx, cnrID, key)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(logs.ObjectWasntFound, zap.Error(err))
|
||||||
|
response.Error(req, "object wasn't found", fasthttp.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if foundOid.DeleteMarker {
|
||||||
|
log.Error(logs.ObjectWasDeleted)
|
||||||
|
response.Error(req, "object deleted", fasthttp.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var addr oid.Address
|
||||||
|
addr.SetContainer(*cnrID)
|
||||||
|
addr.SetObject(foundOid.OID)
|
||||||
|
|
||||||
|
f(ctx, *h.newRequest(req, log), addr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// byAttribute is a wrapper similar to byAddress.
|
||||||
|
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||||
|
var (
|
||||||
|
scid, _ = c.UserValue("cid").(string)
|
||||||
|
key, _ = url.QueryUnescape(c.UserValue("attr_key").(string))
|
||||||
|
val, _ = url.QueryUnescape(c.UserValue("attr_val").(string))
|
||||||
|
log = h.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx := utils.GetContextFromRequest(c)
|
||||||
|
|
||||||
|
containerID, err := h.getContainerID(ctx, scid)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(logs.WrongContainerID, zap.Error(err))
|
||||||
|
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := h.search(ctx, containerID, key, val, object.MatchStringEqual)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
||||||
|
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
defer res.Close()
|
||||||
|
|
||||||
|
buf := make([]oid.ID, 1)
|
||||||
|
|
||||||
|
n, err := res.Read(buf)
|
||||||
|
if n == 0 {
|
||||||
|
if errors.Is(err, io.EOF) {
|
||||||
|
log.Error(logs.ObjectNotFound, zap.Error(err))
|
||||||
|
response.Error(c, "object not found", fasthttp.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Error(logs.ReadObjectListFailed, zap.Error(err))
|
||||||
|
response.Error(c, "read object list failed: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var addrObj oid.Address
|
||||||
|
addrObj.SetContainer(*containerID)
|
||||||
|
addrObj.SetObject(buf[0])
|
||||||
|
|
||||||
|
f(ctx, *h.newRequest(c, log), addrObj)
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
package downloader
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -25,7 +25,7 @@ const (
|
||||||
hdrContainerID = "X-Container-Id"
|
hdrContainerID = "X-Container-Id"
|
||||||
)
|
)
|
||||||
|
|
||||||
func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) {
|
func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) {
|
||||||
var start = time.Now()
|
var start = time.Now()
|
||||||
|
|
||||||
btoken := bearerToken(ctx)
|
btoken := bearerToken(ctx)
|
||||||
|
@ -36,7 +36,7 @@ func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress
|
||||||
prm.UseBearer(*btoken)
|
prm.UseBearer(*btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := clnt.HeadObject(ctx, prm)
|
obj, err := h.pool.HeadObject(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
req.handleFrostFSErr(err, start)
|
req.handleFrostFSErr(err, start)
|
||||||
return
|
return
|
||||||
|
@ -81,7 +81,7 @@ func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress
|
||||||
prmRange.UseBearer(*btoken)
|
prmRange.UseBearer(*btoken)
|
||||||
}
|
}
|
||||||
|
|
||||||
resObj, err := clnt.ObjectRange(ctx, prmRange)
|
resObj, err := h.pool.ObjectRange(ctx, prmRange)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -104,19 +104,19 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// HeadByAddressOrBucketName handles head requests using simple cid/oid or bucketname/key format.
|
// HeadByAddressOrBucketName handles head requests using simple cid/oid or bucketname/key format.
|
||||||
func (d *Downloader) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||||
test, _ := c.UserValue("oid").(string)
|
test, _ := c.UserValue("oid").(string)
|
||||||
var id oid.ID
|
var id oid.ID
|
||||||
|
|
||||||
err := id.DecodeString(test)
|
err := id.DecodeString(test)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.byBucketname(c, headObject)
|
h.byBucketname(c, h.headObject)
|
||||||
} else {
|
} else {
|
||||||
d.byAddress(c, headObject)
|
h.byAddress(c, h.headObject)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// HeadByAttribute handles attribute-based head requests.
|
// HeadByAttribute handles attribute-based head requests.
|
||||||
func (d *Downloader) HeadByAttribute(c *fasthttp.RequestCtx) {
|
func (h *Handler) HeadByAttribute(c *fasthttp.RequestCtx) {
|
||||||
d.byAttribute(c, headObject)
|
h.byAttribute(c, h.headObject)
|
||||||
}
|
}
|
|
@ -1,10 +1,10 @@
|
||||||
package uploader
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/uploader/multipart"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
//go:build !integration
|
//go:build !integration
|
||||||
|
|
||||||
package uploader
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"crypto/rand"
|
"crypto/rand"
|
141
internal/handler/reader.go
Normal file
141
internal/handler/reader.go
Normal file
|
@ -0,0 +1,141 @@
|
||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"path"
|
||||||
|
"strconv"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
|
"github.com/valyala/fasthttp"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type readCloser struct {
|
||||||
|
io.Reader
|
||||||
|
io.Closer
|
||||||
|
}
|
||||||
|
|
||||||
|
// initializes io.Reader with the limited size and detects Content-Type from it.
|
||||||
|
// Returns r's error directly. Also returns the processed data.
|
||||||
|
func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (string, []byte, error) {
|
||||||
|
if maxSize > sizeToDetectType {
|
||||||
|
maxSize = sizeToDetectType
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := make([]byte, maxSize) // maybe sync-pool the slice?
|
||||||
|
|
||||||
|
r, err := rInit(maxSize)
|
||||||
|
if err != nil {
|
||||||
|
return "", nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
n, err := r.Read(buf)
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
return "", nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
buf = buf[:n]
|
||||||
|
|
||||||
|
return http.DetectContentType(buf), buf, err // to not lose io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oid.Address) {
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
dis = "inline"
|
||||||
|
start = time.Now()
|
||||||
|
filename string
|
||||||
|
)
|
||||||
|
|
||||||
|
var prm pool.PrmObjectGet
|
||||||
|
prm.SetAddress(objectAddress)
|
||||||
|
if btoken := bearerToken(ctx); btoken != nil {
|
||||||
|
prm.UseBearer(*btoken)
|
||||||
|
}
|
||||||
|
|
||||||
|
rObj, err := h.pool.GetObject(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
req.handleFrostFSErr(err, start)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// we can't close reader in this function, so how to do it?
|
||||||
|
|
||||||
|
if req.Request.URI().QueryArgs().GetBool("download") {
|
||||||
|
dis = "attachment"
|
||||||
|
}
|
||||||
|
|
||||||
|
payloadSize := rObj.Header.PayloadSize()
|
||||||
|
|
||||||
|
req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10))
|
||||||
|
var contentType string
|
||||||
|
for _, attr := range rObj.Header.Attributes() {
|
||||||
|
key := attr.Key()
|
||||||
|
val := attr.Value()
|
||||||
|
if !isValidToken(key) || !isValidValue(val) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
key = utils.BackwardTransformIfSystem(key)
|
||||||
|
|
||||||
|
req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val)
|
||||||
|
switch key {
|
||||||
|
case object.AttributeFileName:
|
||||||
|
filename = val
|
||||||
|
case object.AttributeTimestamp:
|
||||||
|
value, err := strconv.ParseInt(val, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
req.log.Info(logs.CouldntParseCreationDate,
|
||||||
|
zap.String("key", key),
|
||||||
|
zap.String("val", val),
|
||||||
|
zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
req.Response.Header.Set(fasthttp.HeaderLastModified,
|
||||||
|
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||||
|
case object.AttributeContentType:
|
||||||
|
contentType = val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
idsToResponse(&req.Response, &rObj.Header)
|
||||||
|
|
||||||
|
if len(contentType) == 0 {
|
||||||
|
// determine the Content-Type from the payload head
|
||||||
|
var payloadHead []byte
|
||||||
|
|
||||||
|
contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) {
|
||||||
|
return rObj.Payload, nil
|
||||||
|
})
|
||||||
|
if err != nil && err != io.EOF {
|
||||||
|
req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err))
|
||||||
|
response.Error(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// reset payload reader since a part of the data has been read
|
||||||
|
var headReader io.Reader = bytes.NewReader(payloadHead)
|
||||||
|
|
||||||
|
if err != io.EOF { // otherwise, we've already read full payload
|
||||||
|
headReader = io.MultiReader(headReader, rObj.Payload)
|
||||||
|
}
|
||||||
|
|
||||||
|
// note: we could do with io.Reader, but SetBodyStream below closes body stream
|
||||||
|
// if it implements io.Closer and that's useful for us.
|
||||||
|
rObj.Payload = readCloser{headReader, rObj.Payload}
|
||||||
|
}
|
||||||
|
req.SetContentType(contentType)
|
||||||
|
|
||||||
|
req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||||
|
|
||||||
|
req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
|
||||||
|
}
|
|
@ -1,6 +1,6 @@
|
||||||
//go:build !integration
|
//go:build !integration
|
||||||
|
|
||||||
package downloader
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"io"
|
"io"
|
|
@ -1,4 +1,4 @@
|
||||||
package uploader
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -9,7 +9,6 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
|
@ -17,9 +16,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/atomic"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -28,55 +25,39 @@ const (
|
||||||
drainBufSize = 4096
|
drainBufSize = 4096
|
||||||
)
|
)
|
||||||
|
|
||||||
// Uploader is an upload request handler.
|
type putResponse struct {
|
||||||
type Uploader struct {
|
ObjectID string `json:"object_id"`
|
||||||
log *zap.Logger
|
ContainerID string `json:"container_id"`
|
||||||
pool *pool.Pool
|
|
||||||
ownerID *user.ID
|
|
||||||
settings *Settings
|
|
||||||
containerResolver *resolver.ContainerResolver
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Settings stores reloading parameters, so it has to provide atomic getters and setters.
|
func newPutResponse(addr oid.Address) *putResponse {
|
||||||
type Settings struct {
|
return &putResponse{
|
||||||
defaultTimestamp atomic.Bool
|
ObjectID: addr.Object().EncodeToString(),
|
||||||
|
ContainerID: addr.Container().EncodeToString(),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Settings) DefaultTimestamp() bool {
|
func (pr *putResponse) encode(w io.Writer) error {
|
||||||
return s.defaultTimestamp.Load()
|
enc := json.NewEncoder(w)
|
||||||
}
|
enc.SetIndent("", "\t")
|
||||||
|
return enc.Encode(pr)
|
||||||
func (s *Settings) SetDefaultTimestamp(val bool) {
|
|
||||||
s.defaultTimestamp.Store(val)
|
|
||||||
}
|
|
||||||
|
|
||||||
// New creates a new Uploader using specified logger, connection pool and
|
|
||||||
// other options.
|
|
||||||
func New(params *utils.AppParams, settings *Settings) *Uploader {
|
|
||||||
return &Uploader{
|
|
||||||
log: params.Logger,
|
|
||||||
pool: params.Pool,
|
|
||||||
ownerID: params.Owner,
|
|
||||||
settings: settings,
|
|
||||||
containerResolver: params.Resolver,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Upload handles multipart upload request.
|
// Upload handles multipart upload request.
|
||||||
func (u *Uploader) Upload(req *fasthttp.RequestCtx) {
|
func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
||||||
var (
|
var (
|
||||||
file MultipartFile
|
file MultipartFile
|
||||||
idObj oid.ID
|
idObj oid.ID
|
||||||
addr oid.Address
|
addr oid.Address
|
||||||
scid, _ = req.UserValue("cid").(string)
|
scid, _ = req.UserValue("cid").(string)
|
||||||
log = u.log.With(zap.String("cid", scid))
|
log = h.log.With(zap.String("cid", scid))
|
||||||
bodyStream = req.RequestBodyStream()
|
bodyStream = req.RequestBodyStream()
|
||||||
drainBuf = make([]byte, drainBufSize)
|
drainBuf = make([]byte, drainBufSize)
|
||||||
)
|
)
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(req)
|
ctx := utils.GetContextFromRequest(req)
|
||||||
|
|
||||||
idCnr, err := utils.GetContainerID(ctx, scid, u.containerResolver)
|
idCnr, err := h.getContainerID(ctx, scid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
log.Error(logs.WrongContainerID, zap.Error(err))
|
||||||
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
||||||
|
@ -97,12 +78,12 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) {
|
||||||
)
|
)
|
||||||
}()
|
}()
|
||||||
boundary := string(req.Request.Header.MultipartFormBoundary())
|
boundary := string(req.Request.Header.MultipartFormBoundary())
|
||||||
if file, err = fetchMultipartFile(u.log, bodyStream, boundary); err != nil {
|
if file, err = fetchMultipartFile(h.log, bodyStream, boundary); err != nil {
|
||||||
log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err))
|
log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err))
|
||||||
response.Error(req, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(req, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
filtered, err := filterHeaders(u.log, &req.Request.Header)
|
filtered, err := filterHeaders(h.log, &req.Request.Header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.CouldNotProcessHeaders, zap.Error(err))
|
log.Error(logs.CouldNotProcessHeaders, zap.Error(err))
|
||||||
response.Error(req, err.Error(), fasthttp.StatusBadRequest)
|
response.Error(req, err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
@ -118,7 +99,7 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = utils.PrepareExpirationHeader(req, u.pool, filtered, now); err != nil {
|
if err = utils.PrepareExpirationHeader(req, h.pool, filtered, now); err != nil {
|
||||||
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err))
|
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err))
|
||||||
response.Error(req, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(req, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
|
@ -140,7 +121,7 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) {
|
||||||
attributes = append(attributes, *filename)
|
attributes = append(attributes, *filename)
|
||||||
}
|
}
|
||||||
// sets Timestamp attribute if it wasn't set from header and enabled by settings
|
// sets Timestamp attribute if it wasn't set from header and enabled by settings
|
||||||
if _, ok := filtered[object.AttributeTimestamp]; !ok && u.settings.DefaultTimestamp() {
|
if _, ok := filtered[object.AttributeTimestamp]; !ok && h.settings.DefaultTimestamp() {
|
||||||
timestamp := object.NewAttribute()
|
timestamp := object.NewAttribute()
|
||||||
timestamp.SetKey(object.AttributeTimestamp)
|
timestamp.SetKey(object.AttributeTimestamp)
|
||||||
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
||||||
|
@ -149,20 +130,20 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) {
|
||||||
|
|
||||||
obj := object.New()
|
obj := object.New()
|
||||||
obj.SetContainerID(*idCnr)
|
obj.SetContainerID(*idCnr)
|
||||||
obj.SetOwnerID(u.ownerID)
|
obj.SetOwnerID(h.ownerID)
|
||||||
obj.SetAttributes(attributes...)
|
obj.SetAttributes(attributes...)
|
||||||
|
|
||||||
var prm pool.PrmObjectPut
|
var prm pool.PrmObjectPut
|
||||||
prm.SetHeader(*obj)
|
prm.SetHeader(*obj)
|
||||||
prm.SetPayload(file)
|
prm.SetPayload(file)
|
||||||
|
|
||||||
bt := u.fetchBearerToken(ctx)
|
bt := h.fetchBearerToken(ctx)
|
||||||
if bt != nil {
|
if bt != nil {
|
||||||
prm.UseBearer(*bt)
|
prm.UseBearer(*bt)
|
||||||
}
|
}
|
||||||
|
|
||||||
if idObj, err = u.pool.PutObject(ctx, prm); err != nil {
|
if idObj, err = h.pool.PutObject(ctx, prm); err != nil {
|
||||||
u.handlePutFrostFSErr(req, err)
|
h.handlePutFrostFSErr(req, err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -193,35 +174,17 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) {
|
||||||
req.Response.Header.SetContentType(jsonHeader)
|
req.Response.Header.SetContentType(jsonHeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *Uploader) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error) {
|
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error) {
|
||||||
statusCode, msg, additionalFields := response.FormErrorResponse("could not store file in frostfs", err)
|
statusCode, msg, additionalFields := response.FormErrorResponse("could not store file in frostfs", err)
|
||||||
logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)
|
logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)
|
||||||
|
|
||||||
u.log.Error(logs.CouldNotStoreFileInFrostfs, logFields...)
|
h.log.Error(logs.CouldNotStoreFileInFrostfs, logFields...)
|
||||||
response.Error(r, msg, statusCode)
|
response.Error(r, msg, statusCode)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (u *Uploader) fetchBearerToken(ctx context.Context) *bearer.Token {
|
func (h *Handler) fetchBearerToken(ctx context.Context) *bearer.Token {
|
||||||
if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil {
|
if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil {
|
||||||
return tkn
|
return tkn
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type putResponse struct {
|
|
||||||
ObjectID string `json:"object_id"`
|
|
||||||
ContainerID string `json:"container_id"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func newPutResponse(addr oid.Address) *putResponse {
|
|
||||||
return &putResponse{
|
|
||||||
ObjectID: addr.Object().EncodeToString(),
|
|
||||||
ContainerID: addr.Container().EncodeToString(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (pr *putResponse) encode(w io.Writer) error {
|
|
||||||
enc := json.NewEncoder(w)
|
|
||||||
enc.SetIndent("", "\t")
|
|
||||||
return enc.Encode(pr)
|
|
||||||
}
|
|
60
internal/handler/utils.go
Normal file
60
internal/handler/utils.go
Normal file
|
@ -0,0 +1,60 @@
|
||||||
|
package handler
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
|
"github.com/valyala/fasthttp"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type request struct {
|
||||||
|
*fasthttp.RequestCtx
|
||||||
|
log *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) handleFrostFSErr(err error, start time.Time) {
|
||||||
|
logFields := []zap.Field{
|
||||||
|
zap.Stringer("elapsed", time.Since(start)),
|
||||||
|
zap.Error(err),
|
||||||
|
}
|
||||||
|
statusCode, msg, additionalFields := response.FormErrorResponse("could not receive object", err)
|
||||||
|
logFields = append(logFields, additionalFields...)
|
||||||
|
|
||||||
|
r.log.Error(logs.CouldNotReceiveObject, logFields...)
|
||||||
|
response.Error(r.RequestCtx, msg, statusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func bearerToken(ctx context.Context) *bearer.Token {
|
||||||
|
if tkn, err := tokens.LoadBearerToken(ctx); err == nil {
|
||||||
|
return tkn
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func isValidToken(s string) bool {
|
||||||
|
for _, c := range s {
|
||||||
|
if c <= ' ' || c > 127 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if strings.ContainsRune("()<>@,;:\\\"/[]?={}", c) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func isValidValue(s string) bool {
|
||||||
|
for _, c := range s {
|
||||||
|
// HTTP specification allows for more technically, but we don't want to escape things.
|
||||||
|
if c < ' ' || c > 127 || c == '"' {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
|
@ -5,8 +5,8 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/api/layer"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api/layer"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
|
|
@ -4,23 +4,10 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetContainerID decode container id, if it's not a valid container id
|
|
||||||
// then trey to resolve name using provided resolver.
|
|
||||||
func GetContainerID(ctx context.Context, containerID string, resolver *resolver.ContainerResolver) (*cid.ID, error) {
|
|
||||||
cnrID := new(cid.ID)
|
|
||||||
err := cnrID.DecodeString(containerID)
|
|
||||||
if err != nil {
|
|
||||||
cnrID, err = resolver.Resolve(ctx, containerID)
|
|
||||||
}
|
|
||||||
return cnrID, err
|
|
||||||
}
|
|
||||||
|
|
||||||
type EpochDurations struct {
|
type EpochDurations struct {
|
||||||
CurrentEpoch uint64
|
CurrentEpoch uint64
|
||||||
MsPerBlock int64
|
MsPerBlock int64
|
||||||
|
|
Loading…
Reference in a new issue