[#170] Support tar.gz archives #177

Merged
alexvanin merged 4 commits from nzinkevich/frostfs-http-gw:tar_download into master 2025-01-23 14:42:42 +00:00
15 changed files with 492 additions and 276 deletions

View file

@ -97,7 +97,7 @@ type (
mu sync.RWMutex mu sync.RWMutex
defaultTimestamp bool defaultTimestamp bool
zipCompression bool archiveCompression bool
clientCut bool clientCut bool
returnIndexPage bool returnIndexPage bool
indexPageTemplate string indexPageTemplate string
@ -178,7 +178,7 @@ func (a *app) initAppSettings() {
func (s *appSettings) update(v *viper.Viper, l *zap.Logger) { func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
defaultTimestamp := v.GetBool(cfgUploaderHeaderEnableDefaultTimestamp) defaultTimestamp := v.GetBool(cfgUploaderHeaderEnableDefaultTimestamp)
zipCompression := v.GetBool(cfgZipCompression) archiveCompression := fetchArchiveCompression(v)
alexvanin marked this conversation as resolved Outdated

I had a chat with developers who configure gateways and received feedback that keeping a deprecated version of the old settings for one minor release would be nice. This also was mentioned by @dkirillov in the comments.

I suggest:

  1. Keep cfgZipCompression constant and declare it deprecated in the comment
  2. if v.IsSet(cfgZipCompression) then use it as value for archiveCompression variable
  3. Add comment to config.yaml, config.env that setting is deprecated, use new one
  4. Keep zip section in gate-configuration.md but mention this is deprecated
  5. Create an issue to remove deprecated zip section
I had a chat with developers who configure gateways and received a feedback that deprecated version of old settings will be nice for one minor release. This also was mentioned by @dkirillov in the comments. I suggest: 1) Keep `cfgZipCompression` constant and declare it deprecated in the comment 2) `if v.IsSet(cfgZipCompression)` then use it as value for `archiveCompression` variable 3) Add comment to config.yaml, config.env that setting is deprecated, use new one 4) Keep `zip` section in gate-configuration.md but mention this is deprecated 5) Create an issue to remove deprecated zip section
returnIndexPage := v.GetBool(cfgIndexPageEnabled) returnIndexPage := v.GetBool(cfgIndexPageEnabled)
clientCut := v.GetBool(cfgClientCut) clientCut := v.GetBool(cfgClientCut)
bufferMaxSizeForPut := v.GetUint64(cfgBufferMaxSizeForPut) bufferMaxSizeForPut := v.GetUint64(cfgBufferMaxSizeForPut)
@ -197,7 +197,7 @@ func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
defer s.mu.Unlock() defer s.mu.Unlock()
s.defaultTimestamp = defaultTimestamp s.defaultTimestamp = defaultTimestamp
s.zipCompression = zipCompression s.archiveCompression = archiveCompression
s.returnIndexPage = returnIndexPage s.returnIndexPage = returnIndexPage
s.clientCut = clientCut s.clientCut = clientCut
s.bufferMaxSizeForPut = bufferMaxSizeForPut s.bufferMaxSizeForPut = bufferMaxSizeForPut
@ -236,10 +236,10 @@ func (s *appSettings) DefaultTimestamp() bool {
return s.defaultTimestamp return s.defaultTimestamp
} }
func (s *appSettings) ZipCompression() bool { func (s *appSettings) ArchiveCompression() bool {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
return s.zipCompression return s.archiveCompression
} }
func (s *appSettings) IndexPageEnabled() bool { func (s *appSettings) IndexPageEnabled() bool {
@ -656,8 +656,10 @@ func (a *app) configureRouter(h *handler.Handler) {
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.HeadByAttribute)) r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.HeadByAttribute))
r.OPTIONS("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addPreflight()) r.OPTIONS("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addPreflight())
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal) a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadZipped)) r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadZip))
dkirillov marked this conversation as resolved Outdated

Please add r.OPTIONS also

Please add `r.OPTIONS` also
r.OPTIONS("/zip/{cid}/{prefix:*}", a.addPreflight()) r.OPTIONS("/zip/{cid}/{prefix:*}", a.addPreflight())
r.GET("/tar/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadTar))
r.OPTIONS("/tar/{cid}/{prefix:*}", a.addPreflight())
a.log.Info(logs.AddedPathZipCidPrefix) a.log.Info(logs.AddedPathZipCidPrefix)
a.webServer.Handler = r.Handler a.webServer.Handler = r.Handler

View file

@ -128,8 +128,13 @@ const (
cfgResolveOrder = "resolve_order" cfgResolveOrder = "resolve_order"
// Zip compression. // Zip compression.
//

I'm not sure if we can remove zip.compression parameter. Probably we should keep it for one release.
cc @alexvanin

I'm not sure if we can remove `zip.compression` parameter. Probably we should keep it for one release. cc @alexvanin
// Deprecated: Use cfgArchiveCompression instead.
cfgZipCompression = "zip.compression" cfgZipCompression = "zip.compression"
// Archive compression.
cfgArchiveCompression = "archive.compression"
// Runtime. // Runtime.
cfgSoftMemoryLimit = "runtime.soft_memory_limit" cfgSoftMemoryLimit = "runtime.soft_memory_limit"
@ -255,9 +260,6 @@ func settings() *viper.Viper {
// upload header // upload header
v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false) v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)
// zip:
v.SetDefault(cfgZipCompression, false)
// metrics // metrics
v.SetDefault(cfgPprofAddress, "localhost:8083") v.SetDefault(cfgPprofAddress, "localhost:8083")
v.SetDefault(cfgPrometheusAddress, "localhost:8084") v.SetDefault(cfgPrometheusAddress, "localhost:8084")
@ -844,3 +846,10 @@ func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) {
return attributes, nil return attributes, nil
} }
func fetchArchiveCompression(v *viper.Viper) bool {
if v.IsSet(cfgZipCompression) {
return v.GetBool(cfgZipCompression)
}
return v.GetBool(cfgArchiveCompression)
}

View file

@ -97,9 +97,13 @@ HTTP_GW_REBALANCE_TIMER=30s
# The number of errors on connection after which node is considered as unhealthy # The number of errors on connection after which node is considered as unhealthy
HTTP_GW_POOL_ERROR_THRESHOLD=100 HTTP_GW_POOL_ERROR_THRESHOLD=100
# Enable zip compression to download files by common prefix. # Enable archive compression to download files by common prefix.
# DEPRECATED: Use HTTP_GW_ARCHIVE_COMPRESSION instead.
HTTP_GW_ZIP_COMPRESSION=false HTTP_GW_ZIP_COMPRESSION=false
# Enable archive compression to download files by common prefix.
HTTP_GW_ARCHIVE_COMPRESSION=false
HTTP_GW_TRACING_ENABLED=true HTTP_GW_TRACING_ENABLED=true
HTTP_GW_TRACING_ENDPOINT="localhost:4317" HTTP_GW_TRACING_ENDPOINT="localhost:4317"
HTTP_GW_TRACING_EXPORTER="otlp_grpc" HTTP_GW_TRACING_EXPORTER="otlp_grpc"

View file

@ -116,13 +116,19 @@ pool_error_threshold: 100 # The number of errors on connection after which node
# Number of workers in handler's worker pool # Number of workers in handler's worker pool
worker_pool_size: 1000 worker_pool_size: 1000
# Enable index page to see objects list for specified container and prefix # Enables index page to see objects list for specified container and prefix
index_page: index_page:
enabled: false enabled: false
template_path: internal/handler/templates/index.gotmpl template_path: internal/handler/templates/index.gotmpl
# Deprecated: Use archive.compression instead
zip: zip:
compression: false # Enable zip compression to download files by common prefix. # Enables zip compression to download files by common prefix.
compression: false
archive:
# Enables archive compression to download files by common prefix.
compression: false
runtime: runtime:
soft_memory_limit: 1gb soft_memory_limit: 1gb

View file

@ -1,11 +1,11 @@
# HTTP Gateway Specification # HTTP Gateway Specification
| Route | Description | | Route | Description |
|-------------------------------------------------|----------------------------------------------| |-------------------------------------------------|--------------------------------------------------|
| `/upload/{cid}` | [Put object](#put-object) | | `/upload/{cid}` | [Put object](#put-object) |
| `/get/{cid}/{oid}` | [Get object](#get-object) | | `/get/{cid}/{oid}` | [Get object](#get-object) |
| `/get_by_attribute/{cid}/{attr_key}/{attr_val}` | [Search object](#search-object) | | `/get_by_attribute/{cid}/{attr_key}/{attr_val}` | [Search object](#search-object) |
| `/zip/{cid}/{prefix}` | [Download objects in archive](#download-zip) | | `/zip/{cid}/{prefix}`, `/tar/{cid}/{prefix}` | [Download objects in archive](#download-archive) |
**Note:** `cid` parameter can be base58 encoded container ID or container name **Note:** `cid` parameter can be base58 encoded container ID or container name
(the name must be registered in NNS, see appropriate section in [nns.md](./nns.md)). (the name must be registered in NNS, see appropriate section in [nns.md](./nns.md)).
@ -56,12 +56,14 @@ Upload file as object with attributes to FrostFS.
###### Headers ###### Headers
| Header | Description | | Header | Description |
|------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------| |------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Common headers | See [bearer token](#bearer-token). | | Common headers | See [bearer token](#bearer-token). |
| `X-Attribute-System-*` | Used to set system FrostFS object attributes <br/> (e.g. use "X-Attribute-System-Expiration-Epoch" to set `__SYSTEM__EXPIRATION_EPOCH` attribute). | | `X-Attribute-System-*` | Used to set system FrostFS object attributes <br/> (e.g. use "X-Attribute-System-Expiration-Epoch" to set `__SYSTEM__EXPIRATION_EPOCH` attribute). |
| `X-Attribute-*` | Used to set regular object attributes <br/> (e.g. use "X-Attribute-My-Tag" to set `My-Tag` attribute). | | `X-Attribute-*` | Used to set regular object attributes <br/> (e.g. use "X-Attribute-My-Tag" to set `My-Tag` attribute). |
| `Date` | This header is used to calculate the right `__SYSTEM__EXPIRATION` attribute for object. If the header is missing, the current server time is used. | | `X-Explode-Archive` | If set, gate tries to read files from uploading `tar` archive and creates an object for each file in it. Uploading `tar` could be compressed via Gzip by setting a `Content-Encoding` header. Sets a `FilePath` attribute as a relative path from archive root and a `FileName` as the last path element of the `FilePath`. |
dkirillov marked this conversation as resolved Outdated

This should be X-Explode-Archive

This should be `X-Explode-Archive`
dkirillov marked this conversation as resolved Outdated

We should mention behavior with Content-Encoding header

We should mention behavior with `Content-Encoding` header
| `Content-Encoding` | If set and value is `gzip`, gate will handle uploading file as a `Gzip` compressed `tar` file. |
| `Date` | This header is used to calculate the right `__SYSTEM__EXPIRATION` attribute for object. If the header is missing, the current server time is used. |
There are some reserved headers type of `X-Attribute-FROSTFS-*` (headers are arranged in descending order of priority): There are some reserved headers type of `X-Attribute-FROSTFS-*` (headers are arranged in descending order of priority):
@ -269,9 +271,9 @@ If more than one object is found, an arbitrary one will be used to get attribute
| 400 | Some error occurred during operation. | | 400 | Some error occurred during operation. |
| 404 | Container or object not found. | | 404 | Container or object not found. |
## Download zip ## Download archive
Route: `/zip/{cid}/{prefix}` Route: `/zip/{cid}/{prefix}`, `/tar/{cid}/{prefix}`
| Route parameter | Type | Description | | Route parameter | Type | Description |
|-----------------|-----------|---------------------------------------------------------| |-----------------|-----------|---------------------------------------------------------|
@ -282,12 +284,13 @@ Route: `/zip/{cid}/{prefix}`
#### GET #### GET
Find objects by prefix for `FilePath` attributes. Return found objects in zip archive. Find objects by prefix for `FilePath` attributes. Return found objects in zip or tar archive.
Name of files in archive sets to `FilePath` attribute of objects. Name of files in archive sets to `FilePath` attribute of objects.
Time of files sets to time when object has started downloading. Time of files sets to time when object has started downloading.
You can download all files in container that have `FilePath` attribute by `/zip/{cid}/` route. You can download all files in container that have `FilePath` attribute by `/zip/{cid}/` or
`/tar/{cid}/` route.
Archive can be compressed (see http-gw [configuration](gate-configuration.md#zip-section)). Archive can be compressed (see http-gw [configuration](gate-configuration.md#archive-section)).
##### Request ##### Request

View file

@ -218,9 +218,10 @@ upload_header:
|-------------------------|--------|---------------|---------------|-------------------------------------------------------------| |-------------------------|--------|---------------|---------------|-------------------------------------------------------------|
| `use_default_timestamp` | `bool` | yes | `false` | Create timestamp for object if it isn't provided by header. | | `use_default_timestamp` | `bool` | yes | `false` | Create timestamp for object if it isn't provided by header. |
# `zip` section # `zip` section
> **_DEPRECATED:_** Use archive section instead
```yaml ```yaml
zip: zip:
compression: false compression: false
@ -230,6 +231,17 @@ zip:
|---------------|--------|---------------|---------------|--------------------------------------------------------------| |---------------|--------|---------------|---------------|--------------------------------------------------------------|
| `compression` | `bool` | yes | `false` | Enable zip compression when download files by common prefix. | | `compression` | `bool` | yes | `false` | Enable zip compression when download files by common prefix. |
# `archive` section
```yaml
archive:
compression: false
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|---------------|--------|---------------|---------------|------------------------------------------------------------------|
| `compression` | `bool` | yes | `false` | Enable archive compression when download files by common prefix. |
# `pprof` section # `pprof` section

View file

@ -1,20 +1,21 @@
package handler package handler
import ( import (
"archive/tar"
"archive/zip" "archive/zip"
"bufio" "bufio"
"compress/gzip"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"io" "io"
"net/http"
"net/url" "net/url"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -46,7 +47,7 @@ func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
return return
} }
req := h.newRequest(c, log) req := newRequest(c, log)
var objID oid.ID var objID oid.ID
if checkS3Err == nil && shouldDownload(oidParam, downloadParam) { if checkS3Err == nil && shouldDownload(oidParam, downloadParam) {
@ -62,13 +63,6 @@ func shouldDownload(oidParam string, downloadParam bool) bool {
return !isDir(oidParam) || downloadParam return !isDir(oidParam) || downloadParam
} }
func (h *Handler) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) request {
return request{
RequestCtx: ctx,
log: log,
}
}
// DownloadByAttribute handles attribute-based download requests. // DownloadByAttribute handles attribute-based download requests.
func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) { func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
h.byAttribute(c, h.receiveFile) h.byAttribute(c, h.receiveFile)
@ -90,13 +84,61 @@ func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op
return h.frostfs.SearchObjects(ctx, prm) return h.frostfs.SearchObjects(ctx, prm)
} }
func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) { // DownloadZip handles zip by prefix requests.
func (h *Handler) DownloadZip(c *fasthttp.RequestCtx) {
scid, _ := c.UserValue("cid").(string)
ctx := utils.GetContextFromRequest(c)
log := utils.GetReqLogOrDefault(ctx, h.log)
bktInfo, err := h.getBucketInfo(ctx, scid, log)
if err != nil {
logAndSendBucketError(c, log, err)
return
}
resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
dkirillov marked this conversation as resolved Outdated

Why do we need to log this? I don't think this is an error, by the way.
And probably there is a more straightforward way to know whether no objects were found:

diff --git a/internal/handler/download.go b/internal/handler/download.go
index 0c83b0d..cbafaa1 100644
--- a/internal/handler/download.go
+++ b/internal/handler/download.go
@@ -87,8 +87,10 @@ func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, res
                zipWriter := zip.NewWriter(w)
                var bufZip []byte
 
+               var writtenObjects int
                errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, &bufZip,
                        func(obj *object.Object) (io.Writer, error) {
+                               writtenObjects++
                                return h.createZipFile(zipWriter, obj)
                        }),
                )

The same for tar

Why do we need log this? I don't think this is error by the way. And probably there is more straightforward way to know if no objects were found: ```diff diff --git a/internal/handler/download.go b/internal/handler/download.go index 0c83b0d..cbafaa1 100644 --- a/internal/handler/download.go +++ b/internal/handler/download.go @@ -87,8 +87,10 @@ func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, res zipWriter := zip.NewWriter(w) var bufZip []byte + var writtenObjects int errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, &bufZip, func(obj *object.Object) (io.Writer, error) { + writtenObjects++ return h.createZipFile(zipWriter, obj) }), ) ``` The same for `tar`

I can change the log level to Warn, but I think creating a new variable for counting is redundant, since I can simply check for bufZip == nil (or len(bufZip) == 0 for better readability)

UPD: I forgot that I can omit the usage of *[]byte when using a separate counter, so this suggestion seems valid

I can change the log level to Warn, but I think creating a new variable for counting is redundant, since I can simply check for `bufZip == nil` (or `len(bufZip) == 0` for better readability) UPD: I forgot that I can omit the usage of *[]byte when using a separate counter, so this suggestion seems valid
if err != nil {
return
dkirillov marked this conversation as resolved Outdated

We shouldn't invoke zipWriter.Close on a writing error. This method finishes writing the zip file by writing the central directory. I suppose this isn't what we want when writing a zipped object to the client has failed.

We shouldn't invoke `zipWriter.Close` on a writing error. This method `finishes writing the zip file by writing the central directory`. I suppose this isn't what we want when writing a zipped object to the client has failed.
}
c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
c.SetBodyStreamWriter(h.getZipResponseWriter(ctx, log, resSearch, bktInfo))
}
func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
return func(w *bufio.Writer) {
defer resSearch.Close()
buf := make([]byte, 3<<20)
zipWriter := zip.NewWriter(w)
var objectsWritten int
errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf,
func(obj *object.Object) (io.Writer, error) {
objectsWritten++
return h.createZipFile(zipWriter, obj)
}),
)
if errIter != nil {
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter))
return
} else if objectsWritten == 0 {
log.Warn(logs.ObjectsNotFound)
}
if err := zipWriter.Close(); err != nil {
log.Error(logs.CloseZipWriter, zap.Error(err))
}
}
}
func (h *Handler) createZipFile(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
method := zip.Store method := zip.Store
dkirillov marked this conversation as resolved Outdated

Probably we shouldn't return here. I would expect that an empty archive would be downloaded.

Probably we shouldn't `return` here. I would expect that an empty archive would be downloaded.
if h.config.ZipCompression() { if h.config.ArchiveCompression() {
method = zip.Deflate method = zip.Deflate
} }
dkirillov marked this conversation as resolved Outdated

Shouldn't it be application/x-gtar ?
https://en.wikipedia.org/wiki/List_of_archive_formats

Shouldn't it be `application/x-gtar` ? https://en.wikipedia.org/wiki/List_of_archive_formats

According to this, only application/gzip is listed. So I think it's more common.

According to [this](https://www.iana.org/assignments/media-types/media-types.xhtml), only `application/gzip` is listed. So I think it's more common.
filePath := getZipFilePath(obj) filePath := getFilePath(obj)
if len(filePath) == 0 || filePath[len(filePath)-1] == '/' { if len(filePath) == 0 || filePath[len(filePath)-1] == '/' {
return nil, fmt.Errorf("invalid filepath '%s'", filePath) return nil, fmt.Errorf("invalid filepath '%s'", filePath)
} }
@ -108,99 +150,139 @@ func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer,
}) })
} }
// DownloadZipped handles zip by prefix requests. // DownloadTar forms tar.gz from objects by prefix.
func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) { func (h *Handler) DownloadTar(c *fasthttp.RequestCtx) {
dkirillov marked this conversation as resolved Outdated

Matter of taste, but can we use something like this:

		compressionLevel := gzip.NoCompression
		if h.config.ArchiveCompression() {
			compressionLevel = gzip.DefaultCompression
		}

		gzipWriter, _ := gzip.NewWriterLevel(w, compressionLevel)
Matter of taste, but can we use something like this: ```golang compressionLevel := gzip.NoCompression if h.config.ArchiveCompression() { compressionLevel = gzip.DefaultCompression } gzipWriter, _ := gzip.NewWriterLevel(w, compressionLevel) ```
scid, _ := c.UserValue("cid").(string) scid, _ := c.UserValue("cid").(string)
prefix, _ := c.UserValue("prefix").(string)
ctx := utils.GetContextFromRequest(c) ctx := utils.GetContextFromRequest(c)
log := utils.GetReqLogOrDefault(ctx, h.log) log := utils.GetReqLogOrDefault(ctx, h.log)
prefix, err := url.QueryUnescape(prefix)
if err != nil {
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Error(err))
ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
return
}
log = log.With(zap.String("cid", scid), zap.String("prefix", prefix))
bktInfo, err := h.getBucketInfo(ctx, scid, log) bktInfo, err := h.getBucketInfo(ctx, scid, log)
if err != nil { if err != nil {
logAndSendBucketError(c, log, err) logAndSendBucketError(c, log, err)
return return
} }
resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
resSearch, err := h.search(ctx, bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
if err != nil { if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip") c.Response.Header.Set(fasthttp.HeaderContentType, "application/gzip")
c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"") c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.tar.gz\"")
c.Response.SetStatusCode(http.StatusOK)
c.SetBodyStreamWriter(func(w *bufio.Writer) { c.SetBodyStreamWriter(h.getTarResponseWriter(ctx, log, resSearch, bktInfo))
}
func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
return func(w *bufio.Writer) {
defer resSearch.Close() defer resSearch.Close()
zipWriter := zip.NewWriter(w) compressionLevel := gzip.NoCompression
if h.config.ArchiveCompression() {
compressionLevel = gzip.DefaultCompression
}
var bufZip []byte // ignore error because it's not nil only if compressionLevel argument is invalid
var addr oid.Address gzipWriter, _ := gzip.NewWriterLevel(w, compressionLevel)
tarWriter := tar.NewWriter(gzipWriter)
empty := true defer func() {
called := false if err := tarWriter.Close(); err != nil {
btoken := bearerToken(ctx) log.Error(logs.CloseTarWriter, zap.Error(err))
addr.SetContainer(bktInfo.CID)
errIter := resSearch.Iterate(func(id oid.ID) bool {
called = true
if empty {
bufZip = make([]byte, 3<<20) // the same as for upload
} }
empty = false if err := gzipWriter.Close(); err != nil {
nzinkevich marked this conversation as resolved Outdated

Can we add comment why do we ignore the error?

Can we add comment why do we ignore the error?
log.Error(logs.CloseGzipWriter, zap.Error(err))
addr.SetObject(id)
if err = h.zipObject(ctx, zipWriter, addr, btoken, bufZip); err != nil {
log.Error(logs.FailedToAddObjectToArchive, zap.String("oid", id.EncodeToString()), zap.Error(err))
} }
}()
return false var objectsWritten int
dkirillov marked this conversation as resolved Outdated

Do we really need bufZip *[]byte ?
It seem we can use just bufZip []byte and init this buffer in getZipResponseWriter/getTarResponseWriter

Do we really need `bufZip *[]byte` ? It seem we can use just `bufZip []byte` and init this buffer in `getZipResponseWriter/getTarResponseWriter`
}) buf := make([]byte, 3<<20) // the same as for upload
dkirillov marked this conversation as resolved Outdated

Should be

log = log.With(zap.String("oid", id.EncodeToString()))
Should be ```golang log = log.With(zap.String("oid", id.EncodeToString())) ```
errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf,
func(obj *object.Object) (io.Writer, error) {
objectsWritten++
return h.createTarFile(tarWriter, obj)
}),
)
if errIter != nil { if errIter != nil {
log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter)) log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter))
} else if !called { } else if objectsWritten == 0 {
log.Error(logs.ObjectsNotFound) log.Warn(logs.ObjectsNotFound)
} }
}
}
if err = zipWriter.Close(); err != nil { func (h *Handler) createTarFile(tw *tar.Writer, obj *object.Object) (io.Writer, error) {
log.Error(logs.CloseZipWriter, zap.Error(err)) filePath := getFilePath(obj)
} if len(filePath) == 0 || filePath[len(filePath)-1] == '/' {
return nil, fmt.Errorf("invalid filepath '%s'", filePath)
}
return tw, tw.WriteHeader(&tar.Header{
Name: filePath,
Mode: 0655,
Size: int64(obj.PayloadSize()),
}) })
} }
func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error { func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID cid.ID, buf []byte, createArchiveHeader func(obj *object.Object) (io.Writer, error)) func(id oid.ID) bool {
prm := PrmObjectGet{ return func(id oid.ID) bool {
PrmAuth: PrmAuth{ log = log.With(zap.String("oid", id.EncodeToString()))
BearerToken: btoken,
},
Address: addr,
}
resGet, err := h.frostfs.GetObject(ctx, prm) prm := PrmObjectGet{
PrmAuth: PrmAuth{
BearerToken: bearerToken(ctx),
},
alexvanin marked this conversation as resolved Outdated

bufZip may be confusing in the future. I think it's just a buf now for both tar and zip archives.

`bufZip` may be confusing in the future. I think it's just a `buf` now for both tar and zip archives.
Address: newAddress(cnrID, id),
}
resGet, err := h.frostfs.GetObject(ctx, prm)
if err != nil {
log.Error(logs.FailedToGetObject, zap.Error(err))
return false
}
fileWriter, err := createArchiveHeader(&resGet.Header)
if err != nil {
log.Error(logs.FailedToAddObjectToArchive, zap.Error(err))
return false
}
if err = writeToArchive(resGet, fileWriter, buf); err != nil {
log.Error(logs.FailedToAddObjectToArchive, zap.Error(err))
return false
}
return false
}
}
dkirillov marked this conversation as resolved Outdated

It seems we don't use h here. So the function should be

func writeToArchive(resGet *Object, objWriter io.Writer, bufZip []byte) error {
It seems we don't use `h` here. So the function should be ```golang func writeToArchive(resGet *Object, objWriter io.Writer, bufZip []byte) error { ```
func (h *Handler) searchObjectsByPrefix(c *fasthttp.RequestCtx, log *zap.Logger, cnrID cid.ID) (ResObjectSearch, error) {
scid, _ := c.UserValue("cid").(string)
prefix, _ := c.UserValue("prefix").(string)
ctx := utils.GetContextFromRequest(c)
prefix, err := url.QueryUnescape(prefix)
if err != nil { if err != nil {
return fmt.Errorf("get FrostFS object: %v", err) log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Error(err))
ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
return nil, err
} }
objWriter, err := h.addObjectToZip(zipWriter, &resGet.Header) log = log.With(zap.String("cid", scid), zap.String("prefix", prefix))
resSearch, err := h.search(ctx, cnrID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
if err != nil { if err != nil {
return fmt.Errorf("zip create header: %v", err) log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
return nil, err
} }
return resSearch, nil
}
if _, err = io.CopyBuffer(objWriter, resGet.Payload, bufZip); err != nil { func writeToArchive(resGet *Object, objWriter io.Writer, buf []byte) error {
var err error
if _, err = io.CopyBuffer(objWriter, resGet.Payload, buf); err != nil {
return fmt.Errorf("copy object payload to zip file: %v", err) return fmt.Errorf("copy object payload to zip file: %v", err)
} }
@ -208,14 +290,10 @@ func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid
return fmt.Errorf("object body close error: %w", err) return fmt.Errorf("object body close error: %w", err)
} }
if err = zipWriter.Flush(); err != nil {
return fmt.Errorf("flush zip writer: %v", err)
}
return nil return nil
} }
func getZipFilePath(obj *object.Object) string { func getFilePath(obj *object.Object) string {
for _, attr := range obj.Attributes() { for _, attr := range obj.Attributes() {
if attr.Key() == object.AttributeFilePath { if attr.Key() == object.AttributeFilePath {
return attr.Value() return attr.Value()

View file

@ -28,7 +28,7 @@ import (
type Config interface { type Config interface {
DefaultTimestamp() bool DefaultTimestamp() bool
ZipCompression() bool ArchiveCompression() bool
ClientCut() bool ClientCut() bool
IndexPageEnabled() bool IndexPageEnabled() bool
IndexPageTemplate() string IndexPageTemplate() string
@ -216,7 +216,7 @@ func (h *Handler) byS3Path(ctx context.Context, req request, cnrID cid.ID, path
} }
addr := newAddress(cnrID, foundOID.OID) addr := newAddress(cnrID, foundOID.OID)
handler(ctx, h.newRequest(c, log), addr) handler(ctx, newRequest(c, log), addr)
} }
// byAttribute is a wrapper similar to byNativeAddress. // byAttribute is a wrapper similar to byNativeAddress.
@ -265,7 +265,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Conte
addr.SetContainer(bktInfo.CID) addr.SetContainer(bktInfo.CID)
addr.SetObject(objID) addr.SetObject(objID)
handler(ctx, h.newRequest(c, log), addr) handler(ctx, newRequest(c, log), addr)
alexvanin marked this conversation as resolved Outdated

Can you describe this change? In my opinion we should either:

  • keep newRequest and remove (h *Handler) newRequest
  • do not add newRequest at all.

Both are okay for me.

Can you describe this change? In my opinion we should either: - keep `newRequest` and remove `(h *Handler) newRequest` - do not add `newRequest` at all. Both are okay for me.
} }
func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) { func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {

View file

@ -517,7 +517,7 @@ func DoFuzzDownloadZipped(input []byte) int {
r.SetUserValue("cid", cid) r.SetUserValue("cid", cid)
r.SetUserValue("prefix", prefix) r.SetUserValue("prefix", prefix)
hc.Handler().DownloadZipped(r) hc.Handler().DownloadZip(r)
return fuzzSuccessExitCode return fuzzSuccessExitCode
} }

View file

@ -66,7 +66,7 @@ func (c *configMock) DefaultTimestamp() bool {
return false return false
} }
func (c *configMock) ZipCompression() bool { func (c *configMock) ArchiveCompression() bool {
return false return false
} }
@ -250,7 +250,7 @@ func TestBasic(t *testing.T) {
t.Run("zip", func(t *testing.T) { t.Run("zip", func(t *testing.T) {
r = prepareGetZipped(ctx, bktName, "") r = prepareGetZipped(ctx, bktName, "")
hc.Handler().DownloadZipped(r) hc.Handler().DownloadZip(r)
readerAt := bytes.NewReader(r.Response.Body()) readerAt := bytes.NewReader(r.Response.Body())
zipReader, err := zip.NewReader(readerAt, int64(len(r.Response.Body()))) zipReader, err := zip.NewReader(readerAt, int64(len(r.Response.Body())))

View file

@ -135,7 +135,7 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
return return
} }
req := h.newRequest(c, log) req := newRequest(c, log)
var objID oid.ID var objID oid.ID
if checkS3Err == nil { if checkS3Err == nil {

View file

@ -42,7 +42,9 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartF
// ignore multipart/form-data values // ignore multipart/form-data values
if filename == "" { if filename == "" {
l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name)) l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name))
if err = part.Close(); err != nil {
l.Warn(logs.FailedToCloseReader, zap.Error(err))
}
continue continue
} }

View file

@ -1,13 +1,19 @@
package handler package handler
import ( import (
"archive/tar"
"bytes"
"compress/gzip"
"context" "context"
"encoding/json" "encoding/json"
"errors"
"io" "io"
"net/http" "net/http"
"path/filepath"
"strconv" "strconv"
"time" "time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils" "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
@ -19,8 +25,9 @@ import (
) )
const ( const (
jsonHeader = "application/json; charset=UTF-8" jsonHeader = "application/json; charset=UTF-8"
drainBufSize = 4096 drainBufSize = 4096
explodeArchiveHeader = "X-Explode-Archive"
) )
type putResponse struct { type putResponse struct {
@ -43,11 +50,7 @@ func (pr *putResponse) encode(w io.Writer) error {
// Upload handles multipart upload request. // Upload handles multipart upload request.
func (h *Handler) Upload(c *fasthttp.RequestCtx) { func (h *Handler) Upload(c *fasthttp.RequestCtx) {
var ( var file MultipartFile
file MultipartFile
idObj oid.ID
addr oid.Address
)
scid, _ := c.UserValue("cid").(string) scid, _ := c.UserValue("cid").(string)
bodyStream := c.RequestBodyStream() bodyStream := c.RequestBodyStream()
@ -63,20 +66,6 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
return return
} }
defer func() {
// If the temporary reader can be closed - let's close it.
if file == nil {
return
}
err := file.Close()
log.Debug(
logs.CloseTemporaryMultipartFormFile,
zap.Stringer("address", addr),
zap.String("filename", file.FileName()),
zap.Error(err),
)
}()
boundary := string(c.Request.Header.MultipartFormBoundary()) boundary := string(c.Request.Header.MultipartFormBoundary())
if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil { if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err)) log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err))
@ -86,53 +75,69 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
filtered, err := filterHeaders(log, &c.Request.Header) filtered, err := filterHeaders(log, &c.Request.Header)
if err != nil { if err != nil {
log.Error(logs.CouldNotProcessHeaders, zap.Error(err)) log.Error(logs.FailedToFilterHeaders, zap.Error(err))
ResponseError(c, err.Error(), fasthttp.StatusBadRequest) ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
return return
} }
now := time.Now() if c.Request.Header.Peek(explodeArchiveHeader) != nil {
if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil { h.explodeArchive(request{c, log}, bktInfo, file, filtered)
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil { } else {
log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err)) h.uploadSingleObject(request{c, log}, bktInfo, file, filtered)
} else {
now = parsed
}
} }
if err = utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil { // Multipart is multipart and thus can contain more than one part which
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err)) // we ignore at the moment. Also, when dealing with chunked encoding
ResponseError(c, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest) // the last zero-length chunk might be left unread (because multipart
dkirillov marked this conversation as resolved Outdated

Is this different from #176 ?
If no, then we should either:

  • keep only one PR with logically separated commits (upload/download etc)
  • make this PR draft and mention that it requires #176 (and again commits be different)

If yes, then why do we even need #176?

Is this different from https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/pulls/176 ? If no than we should either: * keep only one PR with logically separated commits (upload/download etc) * make this PR draft and mention that it requires #176 (and again commits be different) If yes than why do we ever need #176 ?

I accidentally included changes from #176 here. For now I think I should keep this PR with commits for download/upload

I accidently include changes from [#176](https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/issues/176) here. For now I think I should remain this PR with commits for download/upload
// reader only cares about its boundary and doesn't look further) and
// it will be (erroneously) interpreted as the start of the next
// pipelined header. Thus, we need to drain the body buffer.
for {
_, err = bodyStream.Read(drainBuf)
if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
break
}
}
}
func (h *Handler) uploadSingleObject(req request, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) {
c, log := req.RequestCtx, req.log
setIfNotExist(filtered, object.AttributeFileName, file.FileName())
attributes, err := h.extractAttributes(c, log, filtered)
if err != nil {
log.Error(logs.FailedToGetAttributes, zap.Error(err))
ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
return return
} }
attributes := make([]object.Attribute, 0, len(filtered)) idObj, err := h.uploadObject(c, bkt, attributes, file)
// prepares attributes from filtered headers if err != nil {
for key, val := range filtered { h.handlePutFrostFSErr(c, err, log)
attribute := object.NewAttribute() return
attribute.SetKey(key)
attribute.SetValue(val)
attributes = append(attributes, *attribute)
} }
// sets FileName attribute if it wasn't set from header log.Debug(logs.ObjectUploaded,
if _, ok := filtered[object.AttributeFileName]; !ok { zap.String("oid", idObj.EncodeToString()),
filename := object.NewAttribute() zap.String("FileName", file.FileName()),
filename.SetKey(object.AttributeFileName) )
filename.SetValue(file.FileName())
attributes = append(attributes, *filename) addr := newAddress(bkt.CID, idObj)
} c.Response.Header.SetContentType(jsonHeader)
// sets Timestamp attribute if it wasn't set from header and enabled by settings // Try to return the response, otherwise, if something went wrong, throw an error.
if _, ok := filtered[object.AttributeTimestamp]; !ok && h.config.DefaultTimestamp() { if err = newPutResponse(addr).encode(c); err != nil {
timestamp := object.NewAttribute() log.Error(logs.CouldNotEncodeResponse, zap.Error(err))
timestamp.SetKey(object.AttributeTimestamp) ResponseError(c, "could not encode response", fasthttp.StatusBadRequest)
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10)) return
attributes = append(attributes, *timestamp)
} }
}
func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) {
ctx := utils.GetContextFromRequest(c)
obj := object.New() obj := object.New()
obj.SetContainerID(bktInfo.CID) obj.SetContainerID(bkt.CID)
obj.SetOwnerID(*h.ownerID) obj.SetOwnerID(*h.ownerID)
obj.SetAttributes(attributes...) obj.SetAttributes(attrs...)
prm := PrmObjectCreate{ prm := PrmObjectCreate{
PrmAuth: PrmAuth{ PrmAuth: PrmAuth{
@ -141,40 +146,120 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {
Object: obj, Object: obj,
Payload: file, Payload: file,
ClientCut: h.config.ClientCut(), ClientCut: h.config.ClientCut(),
WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled, WithoutHomomorphicHash: bkt.HomomorphicHashDisabled,
BufferMaxSize: h.config.BufferMaxSizeForPut(), BufferMaxSize: h.config.BufferMaxSizeForPut(),
} }
if idObj, err = h.frostfs.CreateObject(ctx, prm); err != nil { idObj, err := h.frostfs.CreateObject(ctx, prm)
h.handlePutFrostFSErr(c, err, log) if err != nil {
return return oid.ID{}, err
} }
addr.SetObject(idObj) return idObj, nil
addr.SetContainer(bktInfo.CID) }
// Try to return the response, otherwise, if something went wrong, throw an error. func (h *Handler) extractAttributes(c *fasthttp.RequestCtx, log *zap.Logger, filtered map[string]string) ([]object.Attribute, error) {
if err = newPutResponse(addr).encode(c); err != nil { now := time.Now()
log.Error(logs.CouldNotEncodeResponse, zap.Error(err)) if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
ResponseError(c, "could not encode response", fasthttp.StatusBadRequest) if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err))
return } else {
} now = parsed
// Multipart is multipart and thus can contain more than one part which
// we ignore at the moment. Also, when dealing with chunked encoding
// the last zero-length chunk might be left unread (because multipart
// reader only cares about its boundary and doesn't look further) and
// it will be (erroneously) interpreted as the start of the next
// pipelined header. Thus we need to drain the body buffer.
for {
_, err = bodyStream.Read(drainBuf)
if err == io.EOF || err == io.ErrUnexpectedEOF {
break
} }
} }
// Report status code and content type. if err := utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil {
c.Response.SetStatusCode(fasthttp.StatusOK) log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err))
c.Response.Header.SetContentType(jsonHeader) return nil, err
}
attributes := make([]object.Attribute, 0, len(filtered))
// prepares attributes from filtered headers
for key, val := range filtered {
attribute := newAttribute(key, val)
attributes = append(attributes, attribute)
}
// sets Timestamp attribute if it wasn't set from header and enabled by settings
if _, ok := filtered[object.AttributeTimestamp]; !ok && h.config.DefaultTimestamp() {
timestamp := newAttribute(object.AttributeTimestamp, strconv.FormatInt(time.Now().Unix(), 10))
attributes = append(attributes, timestamp)
}
return attributes, nil
}
func newAttribute(key string, val string) object.Attribute {
attr := object.NewAttribute()
dkirillov marked this conversation as resolved Outdated

Let's use

func newAttribute(key string, val string) object.Attribute {
Let's use ```golang func newAttribute(key string, val string) object.Attribute { ```
attr.SetKey(key)
attr.SetValue(val)
return *attr
}
// explodeArchive read files from archive and creates objects for each of them.
// Sets FilePath attribute with name from tar.Header.
func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) {
c, log := req.RequestCtx, req.log
// remove user attributes which vary for each file in archive
// to guarantee that they won't appear twice
delete(filtered, object.AttributeFileName)
delete(filtered, object.AttributeFilePath)
commonAttributes, err := h.extractAttributes(c, log, filtered)
if err != nil {
log.Error(logs.FailedToGetAttributes, zap.Error(err))
ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
return
}
attributes := commonAttributes
reader := file
if bytes.EqualFold(c.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
dkirillov marked this conversation as resolved Outdated

Suggestion:

diff --git a/internal/handler/upload.go b/internal/handler/upload.go
index fb54e6c..2d1912e 100644
--- a/internal/handler/upload.go
+++ b/internal/handler/upload.go
@@ -204,18 +204,14 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
        delete(filtered, object.AttributeFileName)
        delete(filtered, object.AttributeFilePath)
 
-       attributes, err := h.extractAttributes(c, log, filtered)
+       commonAttributes, err := h.extractAttributes(c, log, filtered)
        if err != nil {
                log.Error(logs.FailedToGetAttributes, zap.Error(err))
                response.Error(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
                return
        }
-       attributes = append(attributes,
-               *newAttribute(object.AttributeFileName, ""),
-               *newAttribute(object.AttributeFilePath, ""),
-       )
-       attrlen := len(attributes)
-       filePathIndex, fileNameIndex := attrlen-1, attrlen-2
+
+       attributes := commonAttributes
 
        var reader = file
        if bytes.EqualFold(c.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
@@ -249,10 +245,10 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read
                        continue
                }
 
-               // set varying attributes
+               attributes = attributes[:len(commonAttributes)]
                fileName := filepath.Base(obj.Name)
-               attributes[filePathIndex].SetValue(obj.Name)
-               attributes[fileNameIndex].SetValue(fileName)
+               attributes = append(attributes, *newAttribute(object.AttributeFilePath, obj.Name))
+               attributes = append(attributes, *newAttribute(object.AttributeFileName, fileName))
 
                idObj, err := h.uploadObject(c, bkt, attributes, tarReader)
                if err != nil {

Suggestion: ```diff diff --git a/internal/handler/upload.go b/internal/handler/upload.go index fb54e6c..2d1912e 100644 --- a/internal/handler/upload.go +++ b/internal/handler/upload.go @@ -204,18 +204,14 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read delete(filtered, object.AttributeFileName) delete(filtered, object.AttributeFilePath) - attributes, err := h.extractAttributes(c, log, filtered) + commonAttributes, err := h.extractAttributes(c, log, filtered) if err != nil { log.Error(logs.FailedToGetAttributes, zap.Error(err)) response.Error(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest) return } - attributes = append(attributes, - *newAttribute(object.AttributeFileName, ""), - *newAttribute(object.AttributeFilePath, ""), - ) - attrlen := len(attributes) - filePathIndex, fileNameIndex := attrlen-1, attrlen-2 + + attributes := commonAttributes var reader = file if bytes.EqualFold(c.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) { @@ -249,10 +245,10 @@ func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.Read continue } - // set varying attributes + attributes = attributes[:len(commonAttributes)] fileName := filepath.Base(obj.Name) - attributes[filePathIndex].SetValue(obj.Name) - attributes[fileNameIndex].SetValue(fileName) + attributes = append(attributes, *newAttribute(object.AttributeFilePath, obj.Name)) + attributes = append(attributes, *newAttribute(object.AttributeFileName, fileName)) idObj, err := h.uploadObject(c, bkt, attributes, tarReader) if err != nil { ```
log.Debug(logs.GzipReaderSelected)
gzipReader, err := gzip.NewReader(file)
if err != nil {
log.Error(logs.FailedToCreateGzipReader, zap.Error(err))
ResponseError(c, "could read gzip file: "+err.Error(), fasthttp.StatusBadRequest)
dkirillov marked this conversation as resolved Outdated

Please, write:

reader := file
Please, write: ```golang reader := file ```
return
}
defer func() {
if err := gzipReader.Close(); err != nil {
log.Warn(logs.FailedToCloseReader, zap.Error(err))
}
}()
dkirillov marked this conversation as resolved Outdated

Maybe we can use Content-Encoding header for this purpose?
https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding

Maybe we can use `Content-Encoding` header for this purpose? https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
reader = gzipReader
}
tarReader := tar.NewReader(reader)
for {
obj, err := tarReader.Next()
if errors.Is(err, io.EOF) {
break
dkirillov marked this conversation as resolved Outdated

Probably we can extract attributes (at least common) once before the loop.

Probably we can extract attributes (at least common) once before the loop.
} else if err != nil {
log.Error(logs.FailedToReadFileFromTar, zap.Error(err))
ResponseError(c, "could not get next entry: "+err.Error(), fasthttp.StatusBadRequest)
return
}
if isDir(obj.Name) {
nzinkevich marked this conversation as resolved Outdated

The prefix "X-Attribute-" isn't needed for the FilePath attribute

The prefix "X-Attribute-" isn't needed for the FilePath attribute

We also should check (and probably drop it if it exists) whether the FilePath attribute is already present in attributes. Otherwise (as I remember) we will get a duplicate attributes error from storage.

We also should check (and probably drop if it exists) if the `FilePath` attribute already presence in `attributes`. Otherwise (as I remember) we will get error `duplicate attributes` from storage.
continue
}
// set varying attributes
attributes = attributes[:len(commonAttributes)]
fileName := filepath.Base(obj.Name)
attributes = append(attributes, newAttribute(object.AttributeFilePath, obj.Name))
attributes = append(attributes, newAttribute(object.AttributeFileName, fileName))
idObj, err := h.uploadObject(c, bkt, attributes, tarReader)
if err != nil {
h.handlePutFrostFSErr(c, err, log)
return
}
log.Debug(logs.ObjectUploaded,
zap.String("oid", idObj.EncodeToString()),
zap.String("FileName", fileName),
)
}
} }
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) { func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) {

View file

@ -24,6 +24,13 @@ type request struct {
log *zap.Logger log *zap.Logger
} }
// newRequest bundles a fasthttp request context together with its
// per-request logger into a request value.
func newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) request {
	r := request{}
	r.RequestCtx = ctx
	r.log = log
	return r
}
func (r *request) handleFrostFSErr(err error, start time.Time) { func (r *request) handleFrostFSErr(err error, start time.Time) {
logFields := []zap.Field{ logFields := []zap.Field{
zap.Stringer("elapsed", time.Since(start)), zap.Stringer("elapsed", time.Since(start)),
@ -94,6 +101,13 @@ func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
return addr return addr
} }

Thoughts: I'm not sure if we need this function, but this is a matter of taste. I'm OK to keep it.

thoughts: I'm not sure if we need this function. But this matter of taste. I'm ok to keep it
// setIfNotExist stores value under key, but only when the map does not
// already contain an entry for that key; existing entries are left intact.
func setIfNotExist(m map[string]string, key, value string) {
	_, present := m[key]
	if !present {
		m[key] = value
	}
}
func ResponseError(r *fasthttp.RequestCtx, msg string, code int) { func ResponseError(r *fasthttp.RequestCtx, msg string, code int) {
r.Error(msg+"\n", code) r.Error(msg+"\n", code)
} }

View file

@ -1,89 +1,81 @@
package logs package logs
const ( const (
CouldntParseCreationDate = "couldn't parse creation date" // Info in ../../downloader/* CouldntParseCreationDate = "couldn't parse creation date"
CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload" // Error in ../../downloader/download.go CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload"
CouldNotReceiveObject = "could not receive object" // Error in ../../downloader/download.go CouldNotReceiveObject = "could not receive object"
WrongObjectID = "wrong object id" // Error in ../../downloader/download.go ObjectWasDeleted = "object was deleted"
GetLatestObjectVersion = "get latest object version" // Error in ../../downloader/download.go CouldNotSearchForObjects = "could not search for objects"
ObjectWasDeleted = "object was deleted" // Error in ../../downloader/download.go ObjectNotFound = "object not found"
CouldNotSearchForObjects = "could not search for objects" // Error in ../../downloader/download.go ReadObjectListFailed = "read object list failed"
ObjectNotFound = "object not found" // Error in ../../downloader/download.go FailedToAddObjectToArchive = "failed to add object to archive"
ReadObjectListFailed = "read object list failed" // Error in ../../downloader/download.go FailedToGetObject = "failed to get object"
FailedToAddObjectToArchive = "failed to add object to archive" // Error in ../../downloader/download.go IteratingOverSelectedObjectsFailed = "iterating over selected objects failed"
IteratingOverSelectedObjectsFailed = "iterating over selected objects failed" // Error in ../../downloader/download.go ObjectsNotFound = "objects not found"
ObjectsNotFound = "objects not found" // Error in ../../downloader/download.go CloseZipWriter = "close zip writer"
CloseZipWriter = "close zip writer" // Error in ../../downloader/download.go ServiceIsRunning = "service is running"
ServiceIsRunning = "service is running" // Info in ../../metrics/service.go ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port"
ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port" // Warn in ../../metrics/service.go ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled"
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../metrics/service.go ShuttingDownService = "shutting down service"
nzinkevich marked this conversation as resolved Outdated

Please don't add comments at the line end

Please don't add comments at the line end

Shall we remove the rest of the comments in file?

Shall we remove the rest of the comments in file?

Yes, we should remove this either:

  • In separate commit
  • In separate issue/PR
Yes, we should remove this either: * In separate commit * In separate issue/PR
ShuttingDownService = "shutting down service" // Info in ../../metrics/service.go CantShutDownService = "can't shut down service"
CantShutDownService = "can't shut down service" // Panic in ../../metrics/service.go CantGracefullyShutDownService = "can't gracefully shut down service, force stop"
CantGracefullyShutDownService = "can't gracefully shut down service, force stop" // Error in ../../metrics/service.go IgnorePartEmptyFormName = "ignore part, empty form name"
IgnorePartEmptyFormName = "ignore part, empty form name" // Debug in ../../uploader/upload.go IgnorePartEmptyFilename = "ignore part, empty filename"
IgnorePartEmptyFilename = "ignore part, empty filename" // Debug in ../../uploader/upload.go CouldNotReceiveMultipartForm = "could not receive multipart/form"
CloseTemporaryMultipartFormFile = "close temporary multipart/form file" // Debug in ../../uploader/upload.go CouldNotParseClientTime = "could not parse client time"
CouldNotReceiveMultipartForm = "could not receive multipart/form" // Error in ../../uploader/upload.go CouldNotPrepareExpirationHeader = "could not prepare expiration header"
CouldNotProcessHeaders = "could not process headers" // Error in ../../uploader/upload.go CouldNotEncodeResponse = "could not encode response"
CouldNotParseClientTime = "could not parse client time" // Warn in ../../uploader/upload.go CouldNotStoreFileInFrostfs = "could not store file in frostfs"
CouldNotPrepareExpirationHeader = "could not prepare expiration header" // Error in ../../uploader/upload.go AddAttributeToResultObject = "add attribute to result object"
CouldNotEncodeResponse = "could not encode response" // Error in ../../uploader/upload.go FailedToCreateResolver = "failed to create resolver"
CouldNotStoreFileInFrostfs = "could not store file in frostfs" // Error in ../../uploader/upload.go FailedToCreateWorkerPool = "failed to create worker pool"
AddAttributeToResultObject = "add attribute to result object" // Debug in ../../uploader/filter.go FailedToReadIndexPageTemplate = "failed to read index page template"
FailedToCreateResolver = "failed to create resolver" // Fatal in ../../app.go SetCustomIndexPageTemplate = "set custom index page template"
FailedToCreateWorkerPool = "failed to create worker pool" // Fatal in ../../app.go ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty"
FailedToReadIndexPageTemplate = "failed to read index page template" // Error in ../../app.go MetricsAreDisabled = "metrics are disabled"
SetCustomIndexPageTemplate = "set custom index page template" // Info in ../../app.go NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun = "no wallet path specified, creating ephemeral key automatically for this run"
ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty" // Info in ../../app.go StartingApplication = "starting application"
MetricsAreDisabled = "metrics are disabled" // Warn in ../../app.go StartingServer = "starting server"
NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun = "no wallet path specified, creating ephemeral key automatically for this run" // Info in ../../app.go ListenAndServe = "listen and serve"
StartingApplication = "starting application" // Info in ../../app.go ShuttingDownWebServer = "shutting down web server"
StartingServer = "starting server" // Info in ../../app.go FailedToShutdownTracing = "failed to shutdown tracing"
ListenAndServe = "listen and serve" // Fatal in ../../app.go SIGHUPConfigReloadStarted = "SIGHUP config reload started"
ShuttingDownWebServer = "shutting down web server" // Info in ../../app.go FailedToReloadConfigBecauseItsMissed = "failed to reload config because it's missed"
FailedToShutdownTracing = "failed to shutdown tracing" // Warn in ../../app.go FailedToReloadConfig = "failed to reload config"
SIGHUPConfigReloadStarted = "SIGHUP config reload started" // Info in ../../app.go LogLevelWontBeUpdated = "log level won't be updated"
FailedToReloadConfigBecauseItsMissed = "failed to reload config because it's missed" // Warn in ../../app.go FailedToUpdateResolvers = "failed to update resolvers"
FailedToReloadConfig = "failed to reload config" // Warn in ../../app.go FailedToReloadServerParameters = "failed to reload server parameters"
LogLevelWontBeUpdated = "log level won't be updated" // Warn in ../../app.go SIGHUPConfigReloadCompleted = "SIGHUP config reload completed"
FailedToUpdateResolvers = "failed to update resolvers" // Warn in ../../app.go AddedPathUploadCid = "added path /upload/{cid}"
FailedToReloadServerParameters = "failed to reload server parameters" // Warn in ../../app.go AddedPathGetCidOid = "added path /get/{cid}/{oid}"
SIGHUPConfigReloadCompleted = "SIGHUP config reload completed" // Info in ../../app.go AddedPathGetByAttributeCidAttrKeyAttrVal = "added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}"
AddedPathUploadCid = "added path /upload/{cid}" // Info in ../../app.go AddedPathZipCidPrefix = "added path /zip/{cid}/{prefix}"
AddedPathGetCidOid = "added path /get/{cid}/{oid}" // Info in ../../app.go Request = "request"
AddedPathGetByAttributeCidAttrKeyAttrVal = "added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}" // Info in ../../app.go CouldNotFetchAndStoreBearerToken = "could not fetch and store bearer token"
AddedPathZipCidPrefix = "added path /zip/{cid}/{prefix}" // Info in ../../app.go FailedToAddServer = "failed to add server"
Request = "request" // Info in ../../app.go AddServer = "add server"
CouldNotFetchAndStoreBearerToken = "could not fetch and store bearer token" // Error in ../../app.go NoHealthyServers = "no healthy servers"
FailedToAddServer = "failed to add server" // Warn in ../../app.go FailedToInitializeTracing = "failed to initialize tracing"
AddServer = "add server" // Info in ../../app.go TracingConfigUpdated = "tracing config updated"
NoHealthyServers = "no healthy servers" // Fatal in ../../app.go ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided = "resolver nns won't be used since rpc_endpoint isn't provided"
FailedToInitializeTracing = "failed to initialize tracing" // Warn in ../../app.go RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
TracingConfigUpdated = "tracing config updated" // Info in ../../app.go RuntimeSoftMemoryLimitUpdated = "soft runtime memory limit value updated"
ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided = "resolver nns won't be used since rpc_endpoint isn't provided" // Warn in ../../app.go CouldNotLoadFrostFSPrivateKey = "could not load FrostFS private key"
RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped" // Warn in ../../app.go UsingCredentials = "using credentials"
RuntimeSoftMemoryLimitUpdated = "soft runtime memory limit value updated" // Info in ../../app.go FailedToCreateConnectionPool = "failed to create connection pool"
CouldNotLoadFrostFSPrivateKey = "could not load FrostFS private key" // Fatal in ../../settings.go FailedToDialConnectionPool = "failed to dial connection pool"
UsingCredentials = "using credentials" // Info in ../../settings.go FailedToCreateTreePool = "failed to create tree pool"
FailedToCreateConnectionPool = "failed to create connection pool" // Fatal in ../../settings.go FailedToDialTreePool = "failed to dial tree pool"
FailedToDialConnectionPool = "failed to dial connection pool" // Fatal in ../../settings.go AddedStoragePeer = "added storage peer"
FailedToCreateTreePool = "failed to create tree pool" // Fatal in ../../settings.go CouldntGetBucket = "could not get bucket"
FailedToDialTreePool = "failed to dial tree pool" // Fatal in ../../settings.go CouldntPutBucketIntoCache = "couldn't put bucket info into cache"
AddedStoragePeer = "added storage peer" // Info in ../../settings.go FailedToSumbitTaskToPool = "failed to submit task to pool"
CouldntGetBucket = "could not get bucket" // Error in ../handler/utils.go FailedToHeadObject = "failed to head object"
CouldntPutBucketIntoCache = "couldn't put bucket info into cache" // Warn in ../handler/handler.go FailedToIterateOverResponse = "failed to iterate over search response"
FailedToSumbitTaskToPool = "failed to submit task to pool" // Error in ../handler/browse.go InvalidCacheEntryType = "invalid cache entry type"
FailedToHeadObject = "failed to head object" // Error in ../handler/browse.go InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)"
FailedToIterateOverResponse = "failed to iterate over search response" // Error in ../handler/browse.go InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value"
InvalidCacheEntryType = "invalid cache entry type" // Warn in ../cache/buckets.go
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
FailedToUnescapeQuery = "failed to unescape query" FailedToUnescapeQuery = "failed to unescape query"
FailedToParseAddressInTreeNode = "failed to parse object addr in tree node"
SettingsNodeInvalidOwnerKey = "settings node: invalid owner key"
SystemNodeHasMultipleIDs = "system node has multiple ids"
FailedToRemoveOldSystemNode = "failed to remove old system node"
BucketSettingsNodeHasMultipleIDs = "bucket settings node has multiple ids"
ServerReconnecting = "reconnecting server..." ServerReconnecting = "reconnecting server..."
ServerReconnectedSuccessfully = "server reconnected successfully" ServerReconnectedSuccessfully = "server reconnected successfully"
ServerReconnectFailed = "failed to reconnect server" ServerReconnectFailed = "failed to reconnect server"
@ -94,4 +86,13 @@ const (
MultinetConfigWontBeUpdated = "multinet config won't be updated" MultinetConfigWontBeUpdated = "multinet config won't be updated"
ObjectNotFoundByFilePathTrySearchByFileName = "object not found by filePath attribute, try search by fileName" ObjectNotFoundByFilePathTrySearchByFileName = "object not found by filePath attribute, try search by fileName"
CouldntCacheNetmap = "couldn't cache netmap" CouldntCacheNetmap = "couldn't cache netmap"
FailedToFilterHeaders = "failed to filter headers"
FailedToReadFileFromTar = "failed to read file from tar"
FailedToGetAttributes = "failed to get attributes"
ObjectUploaded = "object uploaded"
CloseGzipWriter = "close gzip writer"
CloseTarWriter = "close tar writer"
FailedToCloseReader = "failed to close reader"
FailedToCreateGzipReader = "failed to create gzip reader"
GzipReaderSelected = "gzip reader selected"
) )