From d2199435426b673f89fef39e7234061bd476caad Mon Sep 17 00:00:00 2001 From: Marina Biryukova Date: Thu, 31 Aug 2023 11:37:03 +0300 Subject: [PATCH] [#73] Uploader, downloader structures refactoring Signed-off-by: Marina Biryukova --- Dockerfile => .docker/Dockerfile | 0 Dockerfile.dirty => .docker/Dockerfile.dirty | 0 Makefile | 10 +- app.go => cmd/http-gw/app.go | 39 +- .../http-gw/integration_test.go | 2 +- main.go => cmd/http-gw/main.go | 0 misc.go => cmd/http-gw/misc.go | 0 server.go => cmd/http-gw/server.go | 0 settings.go => cmd/http-gw/settings.go | 0 downloader/download.go | 537 ------------------ go.mod | 2 +- {api => internal/api}/layer/tree_service.go | 2 +- {api => internal/api}/tree.go | 0 internal/handler/download.go | 210 +++++++ {uploader => internal/handler}/filter.go | 2 +- {uploader => internal/handler}/filter_test.go | 2 +- internal/handler/handler.go | 193 +++++++ {downloader => internal/handler}/head.go | 18 +- {uploader => internal/handler}/multipart.go | 4 +- .../handler}/multipart/multipart.go | 0 .../handler}/multipart_test.go | 2 +- internal/handler/reader.go | 141 +++++ .../handler}/reader_test.go | 2 +- {uploader => internal/handler}/upload.go | 93 +-- internal/handler/utils.go | 60 ++ tree/tree.go | 4 +- utils/util.go | 13 - 27 files changed, 672 insertions(+), 664 deletions(-) rename Dockerfile => .docker/Dockerfile (100%) rename Dockerfile.dirty => .docker/Dockerfile.dirty (100%) rename app.go => cmd/http-gw/app.go (91%) rename integration_test.go => cmd/http-gw/integration_test.go (99%) rename main.go => cmd/http-gw/main.go (100%) rename misc.go => cmd/http-gw/misc.go (100%) rename server.go => cmd/http-gw/server.go (100%) rename settings.go => cmd/http-gw/settings.go (100%) delete mode 100644 downloader/download.go rename {api => internal/api}/layer/tree_service.go (90%) rename {api => internal/api}/tree.go (100%) create mode 100644 internal/handler/download.go rename {uploader => internal/handler}/filter.go (98%) rename {uploader => internal/handler}/filter_test.go (98%) create mode 100644 internal/handler/handler.go rename {downloader => internal/handler}/head.go (86%) rename {uploader => internal/handler}/multipart.go (92%) rename {uploader => internal/handler}/multipart/multipart.go (100%) rename {uploader => internal/handler}/multipart_test.go (99%) create mode 100644 internal/handler/reader.go rename {downloader => internal/handler}/reader_test.go (98%) rename {uploader => internal/handler}/upload.go (73%) create mode 100644 internal/handler/utils.go diff --git a/Dockerfile b/.docker/Dockerfile similarity index 100% rename from Dockerfile rename to .docker/Dockerfile diff --git a/Dockerfile.dirty b/.docker/Dockerfile.dirty similarity index 100% rename from Dockerfile.dirty rename to .docker/Dockerfile.dirty diff --git a/Makefile b/Makefile index c5296e1..d02d41b 100755 --- a/Makefile +++ b/Makefile @@ -18,8 +18,8 @@ TMP_DIR := .cache # List of binaries to build. For now just one. BINDIR = bin -DIRS = $(BINDIR) -BINS = $(BINDIR)/frostfs-http-gw +CMDS = $(addprefix frostfs-, $(notdir $(wildcard cmd/*))) +BINS = $(addprefix $(BINDIR)/, $(CMDS)) .PHONY: all $(BINS) $(DIRS) dep docker/ test cover fmt image image-push dirty-image lint docker/lint pre-commit unpre-commit version clean @@ -37,7 +37,7 @@ $(BINS): $(DIRS) dep CGO_ENABLED=0 \ go build -v -trimpath \ -ldflags "-X main.Version=$(VERSION)" \ - -o $@ ./ + -o $@ ./cmd/$(subst frostfs-,,$(notdir $@)) $(DIRS): @echo "⇒ Ensure dir: $@" @@ -90,7 +90,7 @@ image: --build-arg REPO=$(REPO) \ --build-arg VERSION=$(VERSION) \ --rm \ - -f Dockerfile \ + -f .docker/Dockerfile \ -t $(HUB_IMAGE):$(HUB_TAG) . # Push Docker image to the hub @@ -105,7 +105,7 @@ dirty-image: --build-arg REPO=$(REPO) \ --build-arg VERSION=$(VERSION) \ --rm \ - -f Dockerfile.dirty \ + -f .docker/Dockerfile.dirty \ -t $(HUB_IMAGE)-dirty:$(HUB_TAG) . # Install linters diff --git a/app.go b/cmd/http-gw/app.go similarity index 91% rename from app.go rename to cmd/http-gw/app.go index a0d6e0e..40db433 100644 --- a/app.go +++ b/cmd/http-gw/app.go @@ -11,15 +11,14 @@ import ( "syscall" "time" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/downloader" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/uploader" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" @@ -51,15 +50,10 @@ type ( resolver *resolver.ContainerResolver metrics *gateMetrics services []*metrics.Service - settings *appSettings + settings *handler.Settings servers []Server } - appSettings struct { - Uploader *uploader.Settings - Downloader *downloader.Settings - } - // App is an interface for the main gateway function. App interface { Wait() @@ -140,10 +134,7 @@ func newApp(ctx context.Context, opt ...Option) App { } func (a *app) initAppSettings() { - a.settings = &appSettings{ - Uploader: &uploader.Settings{}, - Downloader: &downloader.Settings{}, - } + a.settings = &handler.Settings{} a.updateSettings() } @@ -334,11 +325,10 @@ func (a *app) setHealthStatus() { } func (a *app) Serve() { - uploadRoutes := uploader.New(a.AppParams(), a.settings.Uploader) - downloadRoutes := downloader.New(a.AppParams(), a.settings.Downloader, tree.NewTree(services.NewPoolWrapper(a.treePool))) + handler := handler.New(a.AppParams(), a.settings, tree.NewTree(services.NewPoolWrapper(a.treePool))) // Configure router. - a.configureRouter(uploadRoutes, downloadRoutes) + a.configureRouter(handler) a.startServices() a.initServers(a.ctx) @@ -425,8 +415,8 @@ func (a *app) configReload(ctx context.Context) { } func (a *app) updateSettings() { - a.settings.Uploader.SetDefaultTimestamp(a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp)) - a.settings.Downloader.SetZipCompression(a.cfg.GetBool(cfgZipCompression)) + a.settings.SetDefaultTimestamp(a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp)) + a.settings.SetZipCompression(a.cfg.GetBool(cfgZipCompression)) } func (a *app) startServices() { @@ -450,7 +440,7 @@ func (a *app) stopServices() { } } -func (a *app) configureRouter(uploadRoutes *uploader.Uploader, downloadRoutes *downloader.Downloader) { +func (a *app) configureRouter(handler *handler.Handler) { r := router.New() r.RedirectTrailingSlash = true r.NotFound = func(r *fasthttp.RequestCtx) { @@ -459,15 +449,16 @@ func (a *app) configureRouter(uploadRoutes *uploader.Uploader, downloadRoutes *d r.MethodNotAllowed = func(r *fasthttp.RequestCtx) { response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed) } - r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(uploadRoutes.Upload)))) + + r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(handler.Upload)))) a.log.Info(logs.AddedPathUploadCid) - r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadByAddressOrBucketName)))) - r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.HeadByAddressOrBucketName)))) + r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadByAddressOrBucketName)))) + r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(handler.HeadByAddressOrBucketName)))) a.log.Info(logs.AddedPathGetCidOid) - r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadByAttribute)))) - r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.HeadByAttribute)))) + r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadByAttribute)))) + r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(handler.HeadByAttribute)))) a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal) - r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(downloadRoutes.DownloadZipped)))) + r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadZipped)))) a.log.Info(logs.AddedPathZipCidPrefix) a.webServer.Handler = r.Handler diff --git a/integration_test.go b/cmd/http-gw/integration_test.go similarity index 99% rename from integration_test.go rename to cmd/http-gw/integration_test.go index 34506c4..76a8325 100644 --- a/integration_test.go +++ b/cmd/http-gw/integration_test.go @@ -83,7 +83,7 @@ func runServer() (App, context.CancelFunc) { v := getDefaultConfig() l, lvl := newLogger(v) application := newApp(cancelCtx, WithConfig(v), WithLogger(l, lvl)) - go application.Serve(cancelCtx) + go application.Serve() return application, cancel } diff --git a/main.go b/cmd/http-gw/main.go similarity index 100% rename from main.go rename to cmd/http-gw/main.go diff --git a/misc.go b/cmd/http-gw/misc.go similarity index 100% rename from misc.go rename to cmd/http-gw/misc.go diff --git a/server.go b/cmd/http-gw/server.go similarity index 100% rename from server.go rename to cmd/http-gw/server.go diff --git a/settings.go b/cmd/http-gw/settings.go similarity index 100% rename from settings.go rename to cmd/http-gw/settings.go diff --git a/downloader/download.go b/downloader/download.go deleted file mode 100644 index cd7f72f..0000000 --- a/downloader/download.go +++ /dev/null @@ -1,537 +0,0 @@ -package downloader - -import ( - "archive/zip" - "bufio" - "bytes" - "context" - "errors" - "fmt" - "io" - "net/http" - "net/url" - "path" - "strconv" - "strings" - "time" - - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" - oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" - "github.com/valyala/fasthttp" - "go.uber.org/atomic" - "go.uber.org/zap" -) - -type request struct { - *fasthttp.RequestCtx - log *zap.Logger -} - -func isValidToken(s string) bool { - for _, c := range s { - if c <= ' ' || c > 127 { - return false - } - if strings.ContainsRune("()<>@,;:\\\"/[]?={}", c) { - return false - } - } - return true -} - -func isValidValue(s string) bool { - for _, c := range s { - // HTTP specification allows for more technically, but we don't want to escape things. - if c < ' ' || c > 127 || c == '"' { - return false - } - } - return true -} - -type readCloser struct { - io.Reader - io.Closer -} - -// initializes io.Reader with the limited size and detects Content-Type from it. -// Returns r's error directly. Also returns the processed data. -func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (string, []byte, error) { - if maxSize > sizeToDetectType { - maxSize = sizeToDetectType - } - - buf := make([]byte, maxSize) // maybe sync-pool the slice? - - r, err := rInit(maxSize) - if err != nil { - return "", nil, err - } - - n, err := r.Read(buf) - if err != nil && err != io.EOF { - return "", nil, err - } - - buf = buf[:n] - - return http.DetectContentType(buf), buf, err // to not lose io.EOF -} - -func receiveFile(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) { - var ( - err error - dis = "inline" - start = time.Now() - filename string - ) - - var prm pool.PrmObjectGet - prm.SetAddress(objectAddress) - if btoken := bearerToken(ctx); btoken != nil { - prm.UseBearer(*btoken) - } - - rObj, err := clnt.GetObject(ctx, prm) - if err != nil { - req.handleFrostFSErr(err, start) - return - } - - // we can't close reader in this function, so how to do it? - - if req.Request.URI().QueryArgs().GetBool("download") { - dis = "attachment" - } - - payloadSize := rObj.Header.PayloadSize() - - req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10)) - var contentType string - for _, attr := range rObj.Header.Attributes() { - key := attr.Key() - val := attr.Value() - if !isValidToken(key) || !isValidValue(val) { - continue - } - - key = utils.BackwardTransformIfSystem(key) - - req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) - switch key { - case object.AttributeFileName: - filename = val - case object.AttributeTimestamp: - value, err := strconv.ParseInt(val, 10, 64) - if err != nil { - req.log.Info(logs.CouldntParseCreationDate, - zap.String("key", key), - zap.String("val", val), - zap.Error(err)) - continue - } - req.Response.Header.Set(fasthttp.HeaderLastModified, - time.Unix(value, 0).UTC().Format(http.TimeFormat)) - case object.AttributeContentType: - contentType = val - } - } - - idsToResponse(&req.Response, &rObj.Header) - - if len(contentType) == 0 { - // determine the Content-Type from the payload head - var payloadHead []byte - - contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) { - return rObj.Payload, nil - }) - if err != nil && err != io.EOF { - req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err)) - response.Error(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest) - return - } - - // reset payload reader since a part of the data has been read - var headReader io.Reader = bytes.NewReader(payloadHead) - - if err != io.EOF { // otherwise, we've already read full payload - headReader = io.MultiReader(headReader, rObj.Payload) - } - - // note: we could do with io.Reader, but SetBodyStream below closes body stream - // if it implements io.Closer and that's useful for us. - rObj.Payload = readCloser{headReader, rObj.Payload} - } - req.SetContentType(contentType) - - req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename)) - - req.Response.SetBodyStream(rObj.Payload, int(payloadSize)) -} - -func bearerToken(ctx context.Context) *bearer.Token { - if tkn, err := tokens.LoadBearerToken(ctx); err == nil { - return tkn - } - return nil -} - -func (r *request) handleFrostFSErr(err error, start time.Time) { - logFields := []zap.Field{ - zap.Stringer("elapsed", time.Since(start)), - zap.Error(err), - } - statusCode, msg, additionalFields := response.FormErrorResponse("could not receive object", err) - logFields = append(logFields, additionalFields...) - - r.log.Error(logs.CouldNotReceiveObject, logFields...) - response.Error(r.RequestCtx, msg, statusCode) -} - -// Downloader is a download request handler. -type Downloader struct { - log *zap.Logger - pool *pool.Pool - containerResolver *resolver.ContainerResolver - settings *Settings - tree *tree.Tree -} - -// Settings stores reloading parameters, so it has to provide atomic getters and setters. -type Settings struct { - zipCompression atomic.Bool -} - -func (s *Settings) ZipCompression() bool { - return s.zipCompression.Load() -} - -func (s *Settings) SetZipCompression(val bool) { - s.zipCompression.Store(val) -} - -// New creates an instance of Downloader using specified options. -func New(params *utils.AppParams, settings *Settings, tree *tree.Tree) *Downloader { - return &Downloader{ - log: params.Logger, - pool: params.Pool, - settings: settings, - containerResolver: params.Resolver, - tree: tree, - } -} - -func (d *Downloader) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { - return &request{ - RequestCtx: ctx, - log: log, - } -} - -// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format. -func (d *Downloader) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) { - test, _ := c.UserValue("oid").(string) - var id oid.ID - err := id.DecodeString(test) - if err != nil { - d.byBucketname(c, receiveFile) - } else { - d.byAddress(c, receiveFile) - } -} - -// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that -// prepares request and object address to it. -func (d *Downloader) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) { - var ( - idCnr, _ = c.UserValue("cid").(string) - idObj, _ = c.UserValue("oid").(string) - log = d.log.With(zap.String("cid", idCnr), zap.String("oid", idObj)) - ) - - ctx := utils.GetContextFromRequest(c) - - cnrID, err := utils.GetContainerID(ctx, idCnr, d.containerResolver) - if err != nil { - log.Error(logs.WrongContainerID, zap.Error(err)) - response.Error(c, "wrong container id", fasthttp.StatusBadRequest) - return - } - - objID := new(oid.ID) - if err = objID.DecodeString(idObj); err != nil { - log.Error(logs.WrongObjectID, zap.Error(err)) - response.Error(c, "wrong object id", fasthttp.StatusBadRequest) - return - } - - var addr oid.Address - addr.SetContainer(*cnrID) - addr.SetObject(*objID) - - f(ctx, *d.newRequest(c, log), d.pool, addr) -} - -// byBucketname is a wrapper for function (e.g. request.headObject, request.receiveFile) that -// prepares request and object address to it. -func (d *Downloader) byBucketname(req *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) { - var ( - bucketname = req.UserValue("cid").(string) - key = req.UserValue("oid").(string) - log = d.log.With(zap.String("bucketname", bucketname), zap.String("key", key)) - ) - - ctx := utils.GetContextFromRequest(req) - - cnrID, err := utils.GetContainerID(ctx, bucketname, d.containerResolver) - if err != nil { - log.Error(logs.WrongContainerID, zap.Error(err)) - response.Error(req, "wrong container id", fasthttp.StatusBadRequest) - return - } - - foundOid, err := d.tree.GetLatestVersion(ctx, cnrID, key) - if err != nil { - log.Error(logs.ObjectWasntFound, zap.Error(err)) - response.Error(req, "object wasn't found", fasthttp.StatusNotFound) - return - } - if foundOid.DeleteMarker { - log.Error(logs.ObjectWasDeleted) - response.Error(req, "object deleted", fasthttp.StatusNotFound) - return - } - - var addr oid.Address - addr.SetContainer(*cnrID) - addr.SetObject(foundOid.OID) - - f(ctx, *d.newRequest(req, log), d.pool, addr) -} - -// DownloadByAttribute handles attribute-based download requests. -func (d *Downloader) DownloadByAttribute(c *fasthttp.RequestCtx) { - d.byAttribute(c, receiveFile) -} - -// byAttribute is a wrapper similar to byAddress. -func (d *Downloader) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, *pool.Pool, oid.Address)) { - var ( - scid, _ = c.UserValue("cid").(string) - key, _ = url.QueryUnescape(c.UserValue("attr_key").(string)) - val, _ = url.QueryUnescape(c.UserValue("attr_val").(string)) - log = d.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val)) - ) - - ctx := utils.GetContextFromRequest(c) - - containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver) - if err != nil { - log.Error(logs.WrongContainerID, zap.Error(err)) - response.Error(c, "wrong container id", fasthttp.StatusBadRequest) - return - } - - res, err := d.search(ctx, containerID, key, val, object.MatchStringEqual) - if err != nil { - log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) - response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) - return - } - - defer res.Close() - - buf := make([]oid.ID, 1) - - n, err := res.Read(buf) - if n == 0 { - if errors.Is(err, io.EOF) { - log.Error(logs.ObjectNotFound, zap.Error(err)) - response.Error(c, "object not found", fasthttp.StatusNotFound) - return - } - - log.Error(logs.ReadObjectListFailed, zap.Error(err)) - response.Error(c, "read object list failed: "+err.Error(), fasthttp.StatusBadRequest) - return - } - - var addrObj oid.Address - addrObj.SetContainer(*containerID) - addrObj.SetObject(buf[0]) - - f(ctx, *d.newRequest(c, log), d.pool, addrObj) -} - -func (d *Downloader) search(ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) { - filters := object.NewSearchFilters() - filters.AddRootFilter() - filters.AddFilter(key, val, op) - - var prm pool.PrmObjectSearch - prm.SetContainerID(*cid) - prm.SetFilters(filters) - if btoken := bearerToken(ctx); btoken != nil { - prm.UseBearer(*btoken) - } - - return d.pool.SearchObjects(ctx, prm) -} - -func (d *Downloader) getContainer(ctx context.Context, cnrID cid.ID) (container.Container, error) { - var prm pool.PrmContainerGet - prm.SetContainerID(cnrID) - - return d.pool.GetContainer(ctx, prm) -} - -func (d *Downloader) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) { - method := zip.Store - if d.settings.ZipCompression() { - method = zip.Deflate - } - - filePath := getZipFilePath(obj) - if len(filePath) == 0 || filePath[len(filePath)-1] == '/' { - return nil, fmt.Errorf("invalid filepath '%s'", filePath) - } - - return zw.CreateHeader(&zip.FileHeader{ - Name: filePath, - Method: method, - Modified: time.Now(), - }) -} - -// DownloadZipped handles zip by prefix requests. -func (d *Downloader) DownloadZipped(c *fasthttp.RequestCtx) { - scid, _ := c.UserValue("cid").(string) - prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string)) - log := d.log.With(zap.String("cid", scid), zap.String("prefix", prefix)) - - ctx := utils.GetContextFromRequest(c) - - containerID, err := utils.GetContainerID(ctx, scid, d.containerResolver) - if err != nil { - log.Error(logs.WrongContainerID, zap.Error(err)) - response.Error(c, "wrong container id", fasthttp.StatusBadRequest) - return - } - - // check if container exists here to be able to return 404 error, - // otherwise we get this error only in object iteration step - // and client get 200 OK. - if _, err = d.getContainer(ctx, *containerID); err != nil { - log.Error(logs.CouldNotCheckContainerExistence, zap.Error(err)) - if client.IsErrContainerNotFound(err) { - response.Error(c, "Not Found", fasthttp.StatusNotFound) - return - } - response.Error(c, "could not check container existence: "+err.Error(), fasthttp.StatusBadRequest) - return - } - - resSearch, err := d.search(ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) - if err != nil { - log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) - response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) - return - } - - c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip") - c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"") - c.Response.SetStatusCode(http.StatusOK) - - c.SetBodyStreamWriter(func(w *bufio.Writer) { - defer resSearch.Close() - - zipWriter := zip.NewWriter(w) - - var bufZip []byte - var addr oid.Address - - empty := true - called := false - btoken := bearerToken(ctx) - addr.SetContainer(*containerID) - - errIter := resSearch.Iterate(func(id oid.ID) bool { - called = true - - if empty { - bufZip = make([]byte, 3<<20) // the same as for upload - } - empty = false - - addr.SetObject(id) - if err = d.zipObject(ctx, zipWriter, addr, btoken, bufZip); err != nil { - log.Error(logs.FailedToAddObjectToArchive, zap.String("oid", id.EncodeToString()), zap.Error(err)) - } - - return false - }) - if errIter != nil { - log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter)) - } else if !called { - log.Error(logs.ObjectsNotFound) - } - - if err = zipWriter.Close(); err != nil { - log.Error(logs.CloseZipWriter, zap.Error(err)) - } - }) -} - -func (d *Downloader) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error { - var prm pool.PrmObjectGet - prm.SetAddress(addr) - if btoken != nil { - prm.UseBearer(*btoken) - } - - resGet, err := d.pool.GetObject(ctx, prm) - if err != nil { - return fmt.Errorf("get FrostFS object: %v", err) - } - - objWriter, err := d.addObjectToZip(zipWriter, &resGet.Header) - if err != nil { - return fmt.Errorf("zip create header: %v", err) - } - - if _, err = io.CopyBuffer(objWriter, resGet.Payload, bufZip); err != nil { - return fmt.Errorf("copy object payload to zip file: %v", err) - } - - if err = resGet.Payload.Close(); err != nil { - return fmt.Errorf("object body close error: %w", err) - } - - if err = zipWriter.Flush(); err != nil { - return fmt.Errorf("flush zip writer: %v", err) - } - - return nil -} - -func getZipFilePath(obj *object.Object) string { - for _, attr := range obj.Attributes() { - if attr.Key() == object.AttributeFilePath { - return attr.Value() - } - } - - return "" -} diff --git a/go.mod b/go.mod index 6e6bc80..80d794c 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,6 @@ require ( github.com/valyala/fasthttp v1.34.0 go.opentelemetry.io/otel v1.16.0 go.opentelemetry.io/otel/trace v1.16.0 - go.uber.org/atomic v1.10.0 go.uber.org/zap v1.24.0 google.golang.org/grpc v1.55.0 ) @@ -97,6 +96,7 @@ require ( go.opentelemetry.io/otel/metric v1.16.0 // indirect go.opentelemetry.io/otel/sdk v1.16.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect + go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.9.0 // indirect golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect diff --git a/api/layer/tree_service.go b/internal/api/layer/tree_service.go similarity index 90% rename from api/layer/tree_service.go rename to internal/api/layer/tree_service.go index 9852257..beb1e7a 100644 --- a/api/layer/tree_service.go +++ b/internal/api/layer/tree_service.go @@ -4,7 +4,7 @@ import ( "context" "errors" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/api" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" ) diff --git a/api/tree.go b/internal/api/tree.go similarity index 100% rename from api/tree.go rename to internal/api/tree.go diff --git a/internal/handler/download.go b/internal/handler/download.go new file mode 100644 index 0000000..8ee76bf --- /dev/null +++ b/internal/handler/download.go @@ -0,0 +1,210 @@ +package handler + +import ( + "archive/zip" + "bufio" + "context" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" + "github.com/valyala/fasthttp" + "go.uber.org/zap" +) + +// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format. +func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) { + test, _ := c.UserValue("oid").(string) + var id oid.ID + err := id.DecodeString(test) + if err != nil { + h.byBucketname(c, h.receiveFile) + } else { + h.byAddress(c, h.receiveFile) + } +} + +func (h *Handler) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request { + return &request{ + RequestCtx: ctx, + log: log, + } +} + +// DownloadByAttribute handles attribute-based download requests. +func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) { + h.byAttribute(c, h.receiveFile) +} + +func (h *Handler) search(ctx context.Context, cid *cid.ID, key, val string, op object.SearchMatchType) (pool.ResObjectSearch, error) { + filters := object.NewSearchFilters() + filters.AddRootFilter() + filters.AddFilter(key, val, op) + + var prm pool.PrmObjectSearch + prm.SetContainerID(*cid) + prm.SetFilters(filters) + if btoken := bearerToken(ctx); btoken != nil { + prm.UseBearer(*btoken) + } + + return h.pool.SearchObjects(ctx, prm) +} + +func (h *Handler) getContainer(ctx context.Context, cnrID cid.ID) (container.Container, error) { + var prm pool.PrmContainerGet + prm.SetContainerID(cnrID) + + return h.pool.GetContainer(ctx, prm) +} + +func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) { + method := zip.Store + if h.settings.ZipCompression() { + method = zip.Deflate + } + + filePath := getZipFilePath(obj) + if len(filePath) == 0 || filePath[len(filePath)-1] == '/' { + return nil, fmt.Errorf("invalid filepath '%s'", filePath) + } + + return zw.CreateHeader(&zip.FileHeader{ + Name: filePath, + Method: method, + Modified: time.Now(), + }) +} + +// DownloadZipped handles zip by prefix requests. +func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) { + scid, _ := c.UserValue("cid").(string) + prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string)) + log := h.log.With(zap.String("cid", scid), zap.String("prefix", prefix)) + + ctx := utils.GetContextFromRequest(c) + + containerID, err := h.getContainerID(ctx, scid) + if err != nil { + log.Error(logs.WrongContainerID, zap.Error(err)) + response.Error(c, "wrong container id", fasthttp.StatusBadRequest) + return + } + + // check if container exists here to be able to return 404 error, + // otherwise we get this error only in object iteration step + // and client get 200 OK. + if _, err = h.getContainer(ctx, *containerID); err != nil { + log.Error(logs.CouldNotCheckContainerExistence, zap.Error(err)) + if client.IsErrContainerNotFound(err) { + response.Error(c, "Not Found", fasthttp.StatusNotFound) + return + } + response.Error(c, "could not check container existence: "+err.Error(), fasthttp.StatusBadRequest) + return + } + + resSearch, err := h.search(ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix) + if err != nil { + log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) + response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) + return + } + + c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip") + c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"") + c.Response.SetStatusCode(http.StatusOK) + + c.SetBodyStreamWriter(func(w *bufio.Writer) { + defer resSearch.Close() + + zipWriter := zip.NewWriter(w) + + var bufZip []byte + var addr oid.Address + + empty := true + called := false + btoken := bearerToken(ctx) + addr.SetContainer(*containerID) + + errIter := resSearch.Iterate(func(id oid.ID) bool { + called = true + + if empty { + bufZip = make([]byte, 3<<20) // the same as for upload + } + empty = false + + addr.SetObject(id) + if err = h.zipObject(ctx, zipWriter, addr, btoken, bufZip); err != nil { + log.Error(logs.FailedToAddObjectToArchive, zap.String("oid", id.EncodeToString()), zap.Error(err)) + } + + return false + }) + if errIter != nil { + log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter)) + } else if !called { + log.Error(logs.ObjectsNotFound) + } + + if err = zipWriter.Close(); err != nil { + log.Error(logs.CloseZipWriter, zap.Error(err)) + } + }) +} + +func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error { + var prm pool.PrmObjectGet + prm.SetAddress(addr) + if btoken != nil { + prm.UseBearer(*btoken) + } + + resGet, err := h.pool.GetObject(ctx, prm) + if err != nil { + return fmt.Errorf("get FrostFS object: %v", err) + } + + objWriter, err := h.addObjectToZip(zipWriter, &resGet.Header) + if err != nil { + return fmt.Errorf("zip create header: %v", err) + } + + if _, err = io.CopyBuffer(objWriter, resGet.Payload, bufZip); err != nil { + return fmt.Errorf("copy object payload to zip file: %v", err) + } + + if err = resGet.Payload.Close(); err != nil { + return fmt.Errorf("object body close error: %w", err) + } + + if err = zipWriter.Flush(); err != nil { + return fmt.Errorf("flush zip writer: %v", err) + } + + return nil +} + +func getZipFilePath(obj *object.Object) string { + for _, attr := range obj.Attributes() { + if attr.Key() == object.AttributeFilePath { + return attr.Value() + } + } + + return "" +} diff --git a/uploader/filter.go b/internal/handler/filter.go similarity index 98% rename from uploader/filter.go rename to internal/handler/filter.go index 70d6eef..745718a 100644 --- a/uploader/filter.go +++ b/internal/handler/filter.go @@ -1,4 +1,4 @@ -package uploader +package handler import ( "bytes" diff --git a/uploader/filter_test.go b/internal/handler/filter_test.go similarity index 98% rename from uploader/filter_test.go rename to internal/handler/filter_test.go index 9d32b84..0322952 100644 --- a/uploader/filter_test.go +++ b/internal/handler/filter_test.go @@ -1,6 +1,6 @@ //go:build !integration -package uploader +package handler import ( "testing" diff --git a/internal/handler/handler.go b/internal/handler/handler.go new file mode 100644 index 0000000..d462280 --- /dev/null +++ b/internal/handler/handler.go @@ -0,0 +1,193 @@ +package handler + +import ( + "context" + "errors" + "io" + "net/url" + "sync/atomic" + + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" + "github.com/valyala/fasthttp" + "go.uber.org/zap" +) + +type Handler struct { + log *zap.Logger + pool *pool.Pool + ownerID *user.ID + settings *Settings + containerResolver *resolver.ContainerResolver + tree *tree.Tree +} + +// Settings stores reloading parameters, so it has to provide atomic getters and setters. +type Settings struct { + defaultTimestamp atomic.Bool + zipCompression atomic.Bool +} + +func (s *Settings) DefaultTimestamp() bool { + return s.defaultTimestamp.Load() +} + +func (s *Settings) SetDefaultTimestamp(val bool) { + s.defaultTimestamp.Store(val) +} + +func (s *Settings) ZipCompression() bool { + return s.zipCompression.Load() +} + +func (s *Settings) SetZipCompression(val bool) { + s.zipCompression.Store(val) +} + +func New(params *utils.AppParams, settings *Settings, tree *tree.Tree) *Handler { + return &Handler{ + log: params.Logger, + pool: params.Pool, + ownerID: params.Owner, + settings: settings, + containerResolver: params.Resolver, + tree: tree, + } +} + +// getContainerID decode container id, if it's not a valid container id +// then trey to resolve name using provided resolver. +func (h *Handler) getContainerID(ctx context.Context, containerID string) (*cid.ID, error) { + cnrID := new(cid.ID) + err := cnrID.DecodeString(containerID) + if err != nil { + cnrID, err = h.containerResolver.Resolve(ctx, containerID) + } + return cnrID, err +} + +// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that +// prepares request and object address to it. +func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { + var ( + idCnr, _ = c.UserValue("cid").(string) + idObj, _ = c.UserValue("oid").(string) + log = h.log.With(zap.String("cid", idCnr), zap.String("oid", idObj)) + ) + + ctx := utils.GetContextFromRequest(c) + + cnrID, err := h.getContainerID(ctx, idCnr) + if err != nil { + log.Error(logs.WrongContainerID, zap.Error(err)) + response.Error(c, "wrong container id", fasthttp.StatusBadRequest) + return + } + + objID := new(oid.ID) + if err = objID.DecodeString(idObj); err != nil { + log.Error(logs.WrongObjectID, zap.Error(err)) + response.Error(c, "wrong object id", fasthttp.StatusBadRequest) + return + } + + var addr oid.Address + addr.SetContainer(*cnrID) + addr.SetObject(*objID) + + f(ctx, *h.newRequest(c, log), addr) +} + +// byBucketname is a wrapper for function (e.g. request.headObject, request.receiveFile) that +// prepares request and object address to it. +func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { + var ( + bucketname = req.UserValue("cid").(string) + key = req.UserValue("oid").(string) + log = h.log.With(zap.String("bucketname", bucketname), zap.String("key", key)) + ) + + ctx := utils.GetContextFromRequest(req) + + cnrID, err := h.getContainerID(ctx, bucketname) + if err != nil { + log.Error(logs.WrongContainerID, zap.Error(err)) + response.Error(req, "wrong container id", fasthttp.StatusBadRequest) + return + } + + foundOid, err := h.tree.GetLatestVersion(ctx, cnrID, key) + if err != nil { + log.Error(logs.ObjectWasntFound, zap.Error(err)) + response.Error(req, "object wasn't found", fasthttp.StatusNotFound) + return + } + if foundOid.DeleteMarker { + log.Error(logs.ObjectWasDeleted) + response.Error(req, "object deleted", fasthttp.StatusNotFound) + return + } + + var addr oid.Address + addr.SetContainer(*cnrID) + addr.SetObject(foundOid.OID) + + f(ctx, *h.newRequest(req, log), addr) +} + +// byAttribute is a wrapper similar to byAddress. +func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) { + var ( + scid, _ = c.UserValue("cid").(string) + key, _ = url.QueryUnescape(c.UserValue("attr_key").(string)) + val, _ = url.QueryUnescape(c.UserValue("attr_val").(string)) + log = h.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val)) + ) + + ctx := utils.GetContextFromRequest(c) + + containerID, err := h.getContainerID(ctx, scid) + if err != nil { + log.Error(logs.WrongContainerID, zap.Error(err)) + response.Error(c, "wrong container id", fasthttp.StatusBadRequest) + return + } + + res, err := h.search(ctx, containerID, key, val, object.MatchStringEqual) + if err != nil { + log.Error(logs.CouldNotSearchForObjects, zap.Error(err)) + response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest) + return + } + + defer res.Close() + + buf := make([]oid.ID, 1) + + n, err := res.Read(buf) + if n == 0 { + if errors.Is(err, io.EOF) { + log.Error(logs.ObjectNotFound, zap.Error(err)) + response.Error(c, "object not found", fasthttp.StatusNotFound) + return + } + + log.Error(logs.ReadObjectListFailed, zap.Error(err)) + response.Error(c, "read object list failed: "+err.Error(), fasthttp.StatusBadRequest) + return + } + + var addrObj oid.Address + addrObj.SetContainer(*containerID) + addrObj.SetObject(buf[0]) + + f(ctx, *h.newRequest(c, log), addrObj) +} diff --git a/downloader/head.go b/internal/handler/head.go similarity index 86% rename from downloader/head.go rename to internal/handler/head.go index 76dfd93..f7478f1 100644 --- a/downloader/head.go +++ b/internal/handler/head.go @@ -1,4 +1,4 @@ -package downloader +package handler import ( "context" @@ -25,7 +25,7 @@ const ( hdrContainerID = "X-Container-Id" ) -func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress oid.Address) { +func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) { var start = time.Now() btoken := bearerToken(ctx) @@ -36,7 +36,7 @@ func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress prm.UseBearer(*btoken) } - obj, err := clnt.HeadObject(ctx, prm) + obj, err := h.pool.HeadObject(ctx, prm) if err != nil { req.handleFrostFSErr(err, start) return @@ -81,7 +81,7 @@ func headObject(ctx context.Context, req request, clnt *pool.Pool, objectAddress prmRange.UseBearer(*btoken) } - resObj, err := clnt.ObjectRange(ctx, prmRange) + resObj, err := h.pool.ObjectRange(ctx, prmRange) if err != nil { return nil, err } @@ -104,19 +104,19 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) { } // HeadByAddressOrBucketName handles head requests using simple cid/oid or bucketname/key format. -func (d *Downloader) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) { +func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) { test, _ := c.UserValue("oid").(string) var id oid.ID err := id.DecodeString(test) if err != nil { - d.byBucketname(c, headObject) + h.byBucketname(c, h.headObject) } else { - d.byAddress(c, headObject) + h.byAddress(c, h.headObject) } } // HeadByAttribute handles attribute-based head requests. -func (d *Downloader) HeadByAttribute(c *fasthttp.RequestCtx) { - d.byAttribute(c, headObject) +func (h *Handler) HeadByAttribute(c *fasthttp.RequestCtx) { + h.byAttribute(c, h.headObject) } diff --git a/uploader/multipart.go b/internal/handler/multipart.go similarity index 92% rename from uploader/multipart.go rename to internal/handler/multipart.go index 135ee88..de9242f 100644 --- a/uploader/multipart.go +++ b/internal/handler/multipart.go @@ -1,10 +1,10 @@ -package uploader +package handler import ( "io" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/multipart" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/uploader/multipart" "go.uber.org/zap" ) diff --git a/uploader/multipart/multipart.go b/internal/handler/multipart/multipart.go similarity index 100% rename from uploader/multipart/multipart.go rename to internal/handler/multipart/multipart.go diff --git a/uploader/multipart_test.go b/internal/handler/multipart_test.go similarity index 99% rename from uploader/multipart_test.go rename to internal/handler/multipart_test.go index d19cd5e..2c50a87 100644 --- a/uploader/multipart_test.go +++ b/internal/handler/multipart_test.go @@ -1,6 +1,6 @@ //go:build !integration -package uploader +package handler import ( "crypto/rand" diff --git a/internal/handler/reader.go b/internal/handler/reader.go new file mode 100644 index 0000000..76801f7 --- /dev/null +++ b/internal/handler/reader.go @@ -0,0 +1,141 @@ +package handler + +import ( + "bytes" + "context" + "io" + "net/http" + "path" + "strconv" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" + "github.com/valyala/fasthttp" + "go.uber.org/zap" +) + +type readCloser struct { + io.Reader + io.Closer +} + +// initializes io.Reader with the limited size and detects Content-Type from it. +// Returns r's error directly. Also returns the processed data. +func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (string, []byte, error) { + if maxSize > sizeToDetectType { + maxSize = sizeToDetectType + } + + buf := make([]byte, maxSize) // maybe sync-pool the slice? + + r, err := rInit(maxSize) + if err != nil { + return "", nil, err + } + + n, err := r.Read(buf) + if err != nil && err != io.EOF { + return "", nil, err + } + + buf = buf[:n] + + return http.DetectContentType(buf), buf, err // to not lose io.EOF +} + +func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oid.Address) { + var ( + err error + dis = "inline" + start = time.Now() + filename string + ) + + var prm pool.PrmObjectGet + prm.SetAddress(objectAddress) + if btoken := bearerToken(ctx); btoken != nil { + prm.UseBearer(*btoken) + } + + rObj, err := h.pool.GetObject(ctx, prm) + if err != nil { + req.handleFrostFSErr(err, start) + return + } + + // we can't close reader in this function, so how to do it? + + if req.Request.URI().QueryArgs().GetBool("download") { + dis = "attachment" + } + + payloadSize := rObj.Header.PayloadSize() + + req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(payloadSize, 10)) + var contentType string + for _, attr := range rObj.Header.Attributes() { + key := attr.Key() + val := attr.Value() + if !isValidToken(key) || !isValidValue(val) { + continue + } + + key = utils.BackwardTransformIfSystem(key) + + req.Response.Header.Set(utils.UserAttributeHeaderPrefix+key, val) + switch key { + case object.AttributeFileName: + filename = val + case object.AttributeTimestamp: + value, err := strconv.ParseInt(val, 10, 64) + if err != nil { + req.log.Info(logs.CouldntParseCreationDate, + zap.String("key", key), + zap.String("val", val), + zap.Error(err)) + continue + } + req.Response.Header.Set(fasthttp.HeaderLastModified, + time.Unix(value, 0).UTC().Format(http.TimeFormat)) + case object.AttributeContentType: + contentType = val + } + } + + idsToResponse(&req.Response, &rObj.Header) + + if len(contentType) == 0 { + // determine the Content-Type from the payload head + var payloadHead []byte + + contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) { + return rObj.Payload, nil + }) + if err != nil && err != io.EOF { + req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err)) + response.Error(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest) + return + } + + // reset payload reader since a part of the data has been read + var headReader io.Reader = bytes.NewReader(payloadHead) + + if err != io.EOF { // otherwise, we've already read full payload + headReader = io.MultiReader(headReader, rObj.Payload) + } + + // note: we could do with io.Reader, but SetBodyStream below closes body stream + // if it implements io.Closer and that's useful for us. + rObj.Payload = readCloser{headReader, rObj.Payload} + } + req.SetContentType(contentType) + + req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename)) + + req.Response.SetBodyStream(rObj.Payload, int(payloadSize)) +} diff --git a/downloader/reader_test.go b/internal/handler/reader_test.go similarity index 98% rename from downloader/reader_test.go rename to internal/handler/reader_test.go index 09c990a..73899ca 100644 --- a/downloader/reader_test.go +++ b/internal/handler/reader_test.go @@ -1,6 +1,6 @@ //go:build !integration -package downloader +package handler import ( "io" diff --git a/uploader/upload.go b/internal/handler/upload.go similarity index 73% rename from uploader/upload.go rename to internal/handler/upload.go index 2832043..8d4e681 100644 --- a/uploader/upload.go +++ b/internal/handler/upload.go @@ -1,4 +1,4 @@ -package uploader +package handler import ( "context" @@ -9,7 +9,6 @@ import ( "time" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" @@ -17,9 +16,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" - "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "github.com/valyala/fasthttp" - "go.uber.org/atomic" "go.uber.org/zap" ) @@ -28,55 +25,39 @@ const ( drainBufSize = 4096 ) -// Uploader is an upload request handler. -type Uploader struct { - log *zap.Logger - pool *pool.Pool - ownerID *user.ID - settings *Settings - containerResolver *resolver.ContainerResolver +type putResponse struct { + ObjectID string `json:"object_id"` + ContainerID string `json:"container_id"` } -// Settings stores reloading parameters, so it has to provide atomic getters and setters. -type Settings struct { - defaultTimestamp atomic.Bool -} - -func (s *Settings) DefaultTimestamp() bool { - return s.defaultTimestamp.Load() -} - -func (s *Settings) SetDefaultTimestamp(val bool) { - s.defaultTimestamp.Store(val) -} - -// New creates a new Uploader using specified logger, connection pool and -// other options. -func New(params *utils.AppParams, settings *Settings) *Uploader { - return &Uploader{ - log: params.Logger, - pool: params.Pool, - ownerID: params.Owner, - settings: settings, - containerResolver: params.Resolver, +func newPutResponse(addr oid.Address) *putResponse { + return &putResponse{ + ObjectID: addr.Object().EncodeToString(), + ContainerID: addr.Container().EncodeToString(), } } +func (pr *putResponse) encode(w io.Writer) error { + enc := json.NewEncoder(w) + enc.SetIndent("", "\t") + return enc.Encode(pr) +} + // Upload handles multipart upload request. -func (u *Uploader) Upload(req *fasthttp.RequestCtx) { +func (h *Handler) Upload(req *fasthttp.RequestCtx) { var ( file MultipartFile idObj oid.ID addr oid.Address scid, _ = req.UserValue("cid").(string) - log = u.log.With(zap.String("cid", scid)) + log = h.log.With(zap.String("cid", scid)) bodyStream = req.RequestBodyStream() drainBuf = make([]byte, drainBufSize) ) ctx := utils.GetContextFromRequest(req) - idCnr, err := utils.GetContainerID(ctx, scid, u.containerResolver) + idCnr, err := h.getContainerID(ctx, scid) if err != nil { log.Error(logs.WrongContainerID, zap.Error(err)) response.Error(req, "wrong container id", fasthttp.StatusBadRequest) @@ -97,12 +78,12 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) { ) }() boundary := string(req.Request.Header.MultipartFormBoundary()) - if file, err = fetchMultipartFile(u.log, bodyStream, boundary); err != nil { + if file, err = fetchMultipartFile(h.log, bodyStream, boundary); err != nil { log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err)) response.Error(req, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest) return } - filtered, err := filterHeaders(u.log, &req.Request.Header) + filtered, err := filterHeaders(h.log, &req.Request.Header) if err != nil { log.Error(logs.CouldNotProcessHeaders, zap.Error(err)) response.Error(req, err.Error(), fasthttp.StatusBadRequest) @@ -118,7 +99,7 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) { } } - if err = utils.PrepareExpirationHeader(req, u.pool, filtered, now); err != nil { + if err = utils.PrepareExpirationHeader(req, h.pool, filtered, now); err != nil { log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err)) response.Error(req, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest) return @@ -140,7 +121,7 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) { attributes = append(attributes, *filename) } // sets Timestamp attribute if it wasn't set from header and enabled by settings - if _, ok := filtered[object.AttributeTimestamp]; !ok && u.settings.DefaultTimestamp() { + if _, ok := filtered[object.AttributeTimestamp]; !ok && h.settings.DefaultTimestamp() { timestamp := object.NewAttribute() timestamp.SetKey(object.AttributeTimestamp) timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10)) @@ -149,20 +130,20 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) { obj := object.New() obj.SetContainerID(*idCnr) - obj.SetOwnerID(u.ownerID) + obj.SetOwnerID(h.ownerID) obj.SetAttributes(attributes...) var prm pool.PrmObjectPut prm.SetHeader(*obj) prm.SetPayload(file) - bt := u.fetchBearerToken(ctx) + bt := h.fetchBearerToken(ctx) if bt != nil { prm.UseBearer(*bt) } - if idObj, err = u.pool.PutObject(ctx, prm); err != nil { - u.handlePutFrostFSErr(req, err) + if idObj, err = h.pool.PutObject(ctx, prm); err != nil { + h.handlePutFrostFSErr(req, err) return } @@ -193,35 +174,17 @@ func (u *Uploader) Upload(req *fasthttp.RequestCtx) { req.Response.Header.SetContentType(jsonHeader) } -func (u *Uploader) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error) { +func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error) { statusCode, msg, additionalFields := response.FormErrorResponse("could not store file in frostfs", err) logFields := append([]zap.Field{zap.Error(err)}, additionalFields...) - u.log.Error(logs.CouldNotStoreFileInFrostfs, logFields...) + h.log.Error(logs.CouldNotStoreFileInFrostfs, logFields...) response.Error(r, msg, statusCode) } -func (u *Uploader) fetchBearerToken(ctx context.Context) *bearer.Token { +func (h *Handler) fetchBearerToken(ctx context.Context) *bearer.Token { if tkn, err := tokens.LoadBearerToken(ctx); err == nil && tkn != nil { return tkn } return nil } - -type putResponse struct { - ObjectID string `json:"object_id"` - ContainerID string `json:"container_id"` -} - -func newPutResponse(addr oid.Address) *putResponse { - return &putResponse{ - ObjectID: addr.Object().EncodeToString(), - ContainerID: addr.Container().EncodeToString(), - } -} - -func (pr *putResponse) encode(w io.Writer) error { - enc := json.NewEncoder(w) - enc.SetIndent("", "\t") - return enc.Encode(pr) -} diff --git a/internal/handler/utils.go b/internal/handler/utils.go new file mode 100644 index 0000000..b51400c --- /dev/null +++ b/internal/handler/utils.go @@ -0,0 +1,60 @@ +package handler + +import ( + "context" + "strings" + "time" + + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer" + "github.com/valyala/fasthttp" + "go.uber.org/zap" +) + +type request struct { + *fasthttp.RequestCtx + log *zap.Logger +} + +func (r *request) handleFrostFSErr(err error, start time.Time) { + logFields := []zap.Field{ + zap.Stringer("elapsed", time.Since(start)), + zap.Error(err), + } + statusCode, msg, additionalFields := response.FormErrorResponse("could not receive object", err) + logFields = append(logFields, additionalFields...) + + r.log.Error(logs.CouldNotReceiveObject, logFields...) + response.Error(r.RequestCtx, msg, statusCode) +} + +func bearerToken(ctx context.Context) *bearer.Token { + if tkn, err := tokens.LoadBearerToken(ctx); err == nil { + return tkn + } + return nil +} + +func isValidToken(s string) bool { + for _, c := range s { + if c <= ' ' || c > 127 { + return false + } + if strings.ContainsRune("()<>@,;:\\\"/[]?={}", c) { + return false + } + } + return true +} + +func isValidValue(s string) bool { + for _, c := range s { + // HTTP specification allows for more technically, but we don't want to escape things. + if c < ' ' || c > 127 || c == '"' { + return false + } + } + return true +} diff --git a/tree/tree.go b/tree/tree.go index 84b6707..3a673b3 100644 --- a/tree/tree.go +++ b/tree/tree.go @@ -5,8 +5,8 @@ import ( "fmt" "strings" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/api" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/api/layer" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api" + "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api/layer" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" ) diff --git a/utils/util.go b/utils/util.go index e54c98d..a328769 100644 --- a/utils/util.go +++ b/utils/util.go @@ -4,23 +4,10 @@ import ( "context" "fmt" - "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver" - cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool" "github.com/valyala/fasthttp" ) -// GetContainerID decode container id, if it's not a valid container id -// then trey to resolve name using provided resolver. -func GetContainerID(ctx context.Context, containerID string, resolver *resolver.ContainerResolver) (*cid.ID, error) { - cnrID := new(cid.ID) - err := cnrID.DecodeString(containerID) - if err != nil { - cnrID, err = resolver.Resolve(ctx, containerID) - } - return cnrID, err -} - type EpochDurations struct { CurrentEpoch uint64 MsPerBlock int64