forked from TrueCloudLab/frostfs-http-gw

[#170] Support tar.gz downloading

Split the DownloadZip handler into smaller methods and add a DownloadTar
handler for downloading tar.gz archives. Make the shared methods generic
enough to serve both archive formats.

Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>

parent a7617514d3 · commit 7901d00924
12 changed files with 214 additions and 94 deletions
@@ -97,7 +97,7 @@ type (
 		mu                 sync.RWMutex
 		defaultTimestamp   bool
-		zipCompression     bool
+		archiveCompression bool
 		clientCut          bool
 		returnIndexPage    bool
 		indexPageTemplate  string

@@ -178,7 +178,7 @@ func (a *app) initAppSettings() {
 func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
 	defaultTimestamp := v.GetBool(cfgUploaderHeaderEnableDefaultTimestamp)
-	zipCompression := v.GetBool(cfgZipCompression)
+	archiveCompression := fetchArchiveCompression(v)
 	returnIndexPage := v.GetBool(cfgIndexPageEnabled)
 	clientCut := v.GetBool(cfgClientCut)
 	bufferMaxSizeForPut := v.GetUint64(cfgBufferMaxSizeForPut)

@@ -197,7 +197,7 @@ func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
 	defer s.mu.Unlock()

 	s.defaultTimestamp = defaultTimestamp
-	s.zipCompression = zipCompression
+	s.archiveCompression = archiveCompression
 	s.returnIndexPage = returnIndexPage
 	s.clientCut = clientCut
 	s.bufferMaxSizeForPut = bufferMaxSizeForPut

@@ -236,10 +236,10 @@ func (s *appSettings) DefaultTimestamp() bool {
 	return s.defaultTimestamp
 }

-func (s *appSettings) ZipCompression() bool {
+func (s *appSettings) ArchiveCompression() bool {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
-	return s.zipCompression
+	return s.archiveCompression
 }

 func (s *appSettings) IndexPageEnabled() bool {
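The settings type above follows a copy-then-swap pattern: update reads every value from viper first, then takes the write lock only for the assignments, while getters take the read lock. A minimal self-contained sketch of that pattern (the type and field names here are illustrative, not the gateway's actual ones):

```go
package main

import (
	"fmt"
	"sync"
)

// settings holds hot-reloadable options behind a sync.RWMutex, mirroring
// the appSettings shape above: readers take RLock, SIGHUP-driven updates
// take the write lock only for the final assignment.
type settings struct {
	mu                 sync.RWMutex
	archiveCompression bool
}

func (s *settings) ArchiveCompression() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.archiveCompression
}

func (s *settings) update(compression bool) {
	// Values would be fetched from the config source before locking,
	// keeping the critical section short.
	s.mu.Lock()
	defer s.mu.Unlock()
	s.archiveCompression = compression
}

func main() {
	s := &settings{}
	s.update(true)
	fmt.Println(s.ArchiveCompression()) // true
}
```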
@@ -656,8 +656,10 @@ func (a *app) configureRouter(h *handler.Handler) {
 	r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.HeadByAttribute))
 	r.OPTIONS("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addPreflight())
 	a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
-	r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadZipped))
+	r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadZip))
 	r.OPTIONS("/zip/{cid}/{prefix:*}", a.addPreflight())
+	r.GET("/tar/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadTar))
+	r.OPTIONS("/tar/{cid}/{prefix:*}", a.addPreflight())
 	a.log.Info(logs.AddedPathZipCidPrefix)

 	a.webServer.Handler = r.Handler
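With the route registered, clients can fetch a gzipped tar of all objects under a common prefix from the new /tar endpoint. A hedged usage sketch — the gateway address and container ID below are placeholders, not values from this commit:

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
)

func main() {
	// Placeholder gateway address and container ID; substitute real values.
	url := "http://localhost:8080/tar/HXSaMJXk2g8C14ht8HSi7BBaiYZ1HeWh2xnWPGQCg4H6/photos/"

	resp, err := http.Get(url)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		panic(fmt.Sprintf("unexpected status: %s", resp.Status))
	}

	// The body streams application/gzip; save it as-is.
	out, err := os.Create("archive.tar.gz")
	if err != nil {
		panic(err)
	}
	defer out.Close()
	if _, err := io.Copy(out, resp.Body); err != nil {
		panic(err)
	}
}
```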
@@ -128,8 +128,13 @@ const (
 	cfgResolveOrder = "resolve_order"

 	// Zip compression.
+	//
+	// Deprecated: Use cfgArchiveCompression instead.
 	cfgZipCompression = "zip.compression"

+	// Archive compression.
+	cfgArchiveCompression = "archive.compression"
+
 	// Runtime.
 	cfgSoftMemoryLimit = "runtime.soft_memory_limit"

@@ -255,9 +260,6 @@ func settings() *viper.Viper {
 	// upload header
 	v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)

-	// zip:
-	v.SetDefault(cfgZipCompression, false)
-
 	// metrics
 	v.SetDefault(cfgPprofAddress, "localhost:8083")
 	v.SetDefault(cfgPrometheusAddress, "localhost:8084")

@@ -844,3 +846,10 @@ func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) {
 	return attributes, nil
 }
+
+func fetchArchiveCompression(v *viper.Viper) bool {
+	if v.IsSet(cfgZipCompression) {
+		return v.GetBool(cfgZipCompression)
+	}
+	return v.GetBool(cfgArchiveCompression)
+}
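fetchArchiveCompression gives the deprecated zip.compression key precedence whenever the user set it explicitly, so existing configs keep their behavior; only when it is absent does the new archive.compression key apply. A minimal standalone check of that precedence, using plain viper with string keys in place of the cfg constants:

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func fetchArchiveCompression(v *viper.Viper) bool {
	// The deprecated key wins if it was set explicitly.
	if v.IsSet("zip.compression") {
		return v.GetBool("zip.compression")
	}
	return v.GetBool("archive.compression")
}

func main() {
	v := viper.New()
	v.Set("archive.compression", true)
	fmt.Println(fetchArchiveCompression(v)) // true: only the new key is set

	v.Set("zip.compression", false)
	fmt.Println(fetchArchiveCompression(v)) // false: deprecated key takes precedence
}
```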
@@ -97,9 +97,13 @@ HTTP_GW_REBALANCE_TIMER=30s
 # The number of errors on connection after which node is considered as unhealthy
 HTTP_GW_POOL_ERROR_THRESHOLD=100

-# Enable zip compression to download files by common prefix.
+# Enable archive compression to download files by common prefix.
+# DEPRECATED: Use HTTP_GW_ARCHIVE_COMPRESSION instead.
 HTTP_GW_ZIP_COMPRESSION=false

+# Enable archive compression to download files by common prefix.
+HTTP_GW_ARCHIVE_COMPRESSION=false
+
 HTTP_GW_TRACING_ENABLED=true
 HTTP_GW_TRACING_ENDPOINT="localhost:4317"
 HTTP_GW_TRACING_EXPORTER="otlp_grpc"
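The HTTP_GW_* variables presumably map onto the dotted config keys through viper's environment binding, with underscores standing in for dots — an assumption about the gateway's setup, not something shown in this diff. A minimal sketch of that style of mapping:

```go
package main

import (
	"fmt"
	"os"
	"strings"

	"github.com/spf13/viper"
)

func main() {
	// Assumed mapping: HTTP_GW_ARCHIVE_COMPRESSION -> "archive.compression".
	os.Setenv("HTTP_GW_ARCHIVE_COMPRESSION", "true")

	v := viper.New()
	v.SetEnvPrefix("HTTP_GW")
	// Translate dotted keys into underscore-separated env var names.
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	v.AutomaticEnv()

	fmt.Println(v.GetBool("archive.compression")) // true
}
```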
@@ -116,13 +116,19 @@ pool_error_threshold: 100 # The number of errors on connection after which node
 # Number of workers in handler's worker pool
 worker_pool_size: 1000

-# Enable index page to see objects list for specified container and prefix
+# Enables index page to see objects list for specified container and prefix
 index_page:
   enabled: false
   template_path: internal/handler/templates/index.gotmpl

+# Deprecated: Use archive.compression instead
 zip:
-  compression: false # Enable zip compression to download files by common prefix.
+  # Enables zip compression to download files by common prefix.
+  compression: false
+
+archive:
+  # Enables archive compression to download files by common prefix.
+  compression: false

 runtime:
   soft_memory_limit: 1gb
@@ -218,9 +218,10 @@ upload_header:
 |-------------------------|--------|---------------|---------------|-------------------------------------------------------------|
 | `use_default_timestamp` | `bool` | yes           | `false`       | Create timestamp for object if it isn't provided by header. |

-
 # `zip` section

+> **_DEPRECATED:_** Use `archive` section instead
+
 ```yaml
 zip:
   compression: false

@@ -230,6 +231,17 @@ zip:
 |---------------|--------|---------------|---------------|--------------------------------------------------------------|
 | `compression` | `bool` | yes           | `false`       | Enable zip compression when download files by common prefix. |

+# `archive` section
+
+```yaml
+archive:
+  compression: false
+```
+
+| Parameter     | Type   | SIGHUP reload | Default value | Description                                                       |
+|---------------|--------|---------------|---------------|-------------------------------------------------------------------|
+| `compression` | `bool` | yes           | `false`       | Enable archive compression when download files by common prefix. |
+
 # `pprof` section
@@ -1,20 +1,21 @@
 package handler

 import (
+	"archive/tar"
 	"archive/zip"
 	"bufio"
+	"compress/gzip"
 	"context"
 	"errors"
 	"fmt"
 	"io"
-	"net/http"
 	"net/url"
 	"time"

+	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
 	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
 	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
 	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
-	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
 	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
 	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
 	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
@ -46,7 +47,7 @@ func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
req := h.newRequest(c, log)
|
req := newRequest(c, log)
|
||||||
|
|
||||||
var objID oid.ID
|
var objID oid.ID
|
||||||
if checkS3Err == nil && shouldDownload(oidParam, downloadParam) {
|
if checkS3Err == nil && shouldDownload(oidParam, downloadParam) {
|
||||||
|
@ -62,13 +63,6 @@ func shouldDownload(oidParam string, downloadParam bool) bool {
|
||||||
return !isDir(oidParam) || downloadParam
|
return !isDir(oidParam) || downloadParam
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) request {
|
|
||||||
return request{
|
|
||||||
RequestCtx: ctx,
|
|
||||||
log: log,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// DownloadByAttribute handles attribute-based download requests.
|
// DownloadByAttribute handles attribute-based download requests.
|
||||||
func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
func (h *Handler) DownloadByAttribute(c *fasthttp.RequestCtx) {
|
||||||
h.byAttribute(c, h.receiveFile)
|
h.byAttribute(c, h.receiveFile)
|
||||||
|
@@ -90,13 +84,61 @@ func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op
 	return h.frostfs.SearchObjects(ctx, prm)
 }

-func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
+// DownloadZip handles zip by prefix requests.
+func (h *Handler) DownloadZip(c *fasthttp.RequestCtx) {
+	scid, _ := c.UserValue("cid").(string)
+
+	ctx := utils.GetContextFromRequest(c)
+	log := utils.GetReqLogOrDefault(ctx, h.log)
+	bktInfo, err := h.getBucketInfo(ctx, scid, log)
+	if err != nil {
+		logAndSendBucketError(c, log, err)
+		return
+	}
+	resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
+	if err != nil {
+		return
+	}
+
+	c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
+	c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
+
+	c.SetBodyStreamWriter(h.getZipResponseWriter(ctx, log, resSearch, bktInfo))
+}
+
+func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
+	return func(w *bufio.Writer) {
+		defer resSearch.Close()
+
+		buf := make([]byte, 3<<20)
+		zipWriter := zip.NewWriter(w)
+		var objectsWritten int
+
+		errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf,
+			func(obj *object.Object) (io.Writer, error) {
+				objectsWritten++
+				return h.createZipFile(zipWriter, obj)
+			}),
+		)
+		if errIter != nil {
+			log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter))
+			return
+		} else if objectsWritten == 0 {
+			log.Warn(logs.ObjectsNotFound)
+		}
+		if err := zipWriter.Close(); err != nil {
+			log.Error(logs.CloseZipWriter, zap.Error(err))
+		}
+	}
+}
+
+func (h *Handler) createZipFile(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
 	method := zip.Store
-	if h.config.ZipCompression() {
+	if h.config.ArchiveCompression() {
 		method = zip.Deflate
 	}

-	filePath := getZipFilePath(obj)
+	filePath := getFilePath(obj)
 	if len(filePath) == 0 || filePath[len(filePath)-1] == '/' {
 		return nil, fmt.Errorf("invalid filepath '%s'", filePath)
 	}
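DownloadZip never buffers the archive: fasthttp's SetBodyStreamWriter hands the handler a *bufio.Writer, and the zip entries are produced while the response streams. A self-contained sketch of that pattern reduced to a single hard-coded entry (the route and file name here are illustrative):

```go
package main

import (
	"archive/zip"
	"bufio"
	"log"

	"github.com/valyala/fasthttp"
)

func main() {
	handler := func(c *fasthttp.RequestCtx) {
		c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
		c.Response.Header.Set(fasthttp.HeaderContentDisposition, `attachment; filename="archive.zip"`)

		// The archive is written while the response streams, so nothing
		// is held in memory in full — the same shape as getZipResponseWriter.
		c.SetBodyStreamWriter(func(w *bufio.Writer) {
			zw := zip.NewWriter(w)
			defer func() {
				if err := zw.Close(); err != nil {
					log.Printf("close zip writer: %v", err)
				}
			}()

			// zip.Store skips compression, like the gateway's default.
			f, err := zw.CreateHeader(&zip.FileHeader{Name: "hello.txt", Method: zip.Store})
			if err != nil {
				log.Printf("create header: %v", err)
				return
			}
			if _, err := f.Write([]byte("hello from a streamed archive\n")); err != nil {
				log.Printf("write entry: %v", err)
			}
		})
	}

	log.Fatal(fasthttp.ListenAndServe(":8080", handler))
}
```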
@@ -108,99 +150,139 @@ func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer,
 	})
 }

-// DownloadZipped handles zip by prefix requests.
-func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
+// DownloadTar forms tar.gz from objects by prefix.
+func (h *Handler) DownloadTar(c *fasthttp.RequestCtx) {
 	scid, _ := c.UserValue("cid").(string)
-	prefix, _ := c.UserValue("prefix").(string)
-
 	ctx := utils.GetContextFromRequest(c)
 	log := utils.GetReqLogOrDefault(ctx, h.log)
-
-	prefix, err := url.QueryUnescape(prefix)
-	if err != nil {
-		log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Error(err))
-		ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
-		return
-	}
-
-	log = log.With(zap.String("cid", scid), zap.String("prefix", prefix))
-
 	bktInfo, err := h.getBucketInfo(ctx, scid, log)
 	if err != nil {
 		logAndSendBucketError(c, log, err)
 		return
 	}
-	resSearch, err := h.search(ctx, bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
+	resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
 	if err != nil {
-		log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
-		ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
 		return
 	}

-	c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
-	c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
-	c.Response.SetStatusCode(http.StatusOK)
+	c.Response.Header.Set(fasthttp.HeaderContentType, "application/gzip")
+	c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.tar.gz\"")

-	c.SetBodyStreamWriter(func(w *bufio.Writer) {
+	c.SetBodyStreamWriter(h.getTarResponseWriter(ctx, log, resSearch, bktInfo))
+}
+
+func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
+	return func(w *bufio.Writer) {
 		defer resSearch.Close()

-		zipWriter := zip.NewWriter(w)
-
-		var bufZip []byte
-		var addr oid.Address
-
-		empty := true
-		called := false
-		btoken := bearerToken(ctx)
-		addr.SetContainer(bktInfo.CID)
-
-		errIter := resSearch.Iterate(func(id oid.ID) bool {
-			called = true
-
-			if empty {
-				bufZip = make([]byte, 3<<20) // the same as for upload
-			}
-			empty = false
-
-			addr.SetObject(id)
-			if err = h.zipObject(ctx, zipWriter, addr, btoken, bufZip); err != nil {
-				log.Error(logs.FailedToAddObjectToArchive, zap.String("oid", id.EncodeToString()), zap.Error(err))
-			}
-
-			return false
-		})
+		compressionLevel := gzip.NoCompression
+		if h.config.ArchiveCompression() {
+			compressionLevel = gzip.DefaultCompression
+		}
+
+		// ignore error because it's not nil only if compressionLevel argument is invalid
+		gzipWriter, _ := gzip.NewWriterLevel(w, compressionLevel)
+		tarWriter := tar.NewWriter(gzipWriter)
+
+		defer func() {
+			if err := tarWriter.Close(); err != nil {
+				log.Error(logs.CloseTarWriter, zap.Error(err))
+			}
+			if err := gzipWriter.Close(); err != nil {
+				log.Error(logs.CloseGzipWriter, zap.Error(err))
+			}
+		}()
+
+		var objectsWritten int
+		buf := make([]byte, 3<<20) // the same as for upload
+
+		errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf,
+			func(obj *object.Object) (io.Writer, error) {
+				objectsWritten++
+				return h.createTarFile(tarWriter, obj)
+			}),
+		)
 		if errIter != nil {
 			log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter))
-		} else if !called {
-			log.Error(logs.ObjectsNotFound)
+		} else if objectsWritten == 0 {
+			log.Warn(logs.ObjectsNotFound)
 		}
+	}
+}

-		if err = zipWriter.Close(); err != nil {
-			log.Error(logs.CloseZipWriter, zap.Error(err))
-		}
-	})
+func (h *Handler) createTarFile(tw *tar.Writer, obj *object.Object) (io.Writer, error) {
+	filePath := getFilePath(obj)
+	if len(filePath) == 0 || filePath[len(filePath)-1] == '/' {
+		return nil, fmt.Errorf("invalid filepath '%s'", filePath)
+	}
+
+	return tw, tw.WriteHeader(&tar.Header{
+		Name: filePath,
+		Mode: 0655,
+		Size: int64(obj.PayloadSize()),
+	})
 }

-func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
-	prm := PrmObjectGet{
-		PrmAuth: PrmAuth{
-			BearerToken: btoken,
-		},
-		Address: addr,
-	}
-
-	resGet, err := h.frostfs.GetObject(ctx, prm)
-	if err != nil {
-		return fmt.Errorf("get FrostFS object: %v", err)
-	}
-
-	objWriter, err := h.addObjectToZip(zipWriter, &resGet.Header)
-	if err != nil {
-		return fmt.Errorf("zip create header: %v", err)
-	}
-
-	if _, err = io.CopyBuffer(objWriter, resGet.Payload, bufZip); err != nil {
+func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID cid.ID, buf []byte, createArchiveHeader func(obj *object.Object) (io.Writer, error)) func(id oid.ID) bool {
+	return func(id oid.ID) bool {
+		log = log.With(zap.String("oid", id.EncodeToString()))
+
+		prm := PrmObjectGet{
+			PrmAuth: PrmAuth{
+				BearerToken: bearerToken(ctx),
+			},
+			Address: newAddress(cnrID, id),
+		}
+
+		resGet, err := h.frostfs.GetObject(ctx, prm)
+		if err != nil {
+			log.Error(logs.FailedToGetObject, zap.Error(err))
+			return false
+		}
+
+		fileWriter, err := createArchiveHeader(&resGet.Header)
+		if err != nil {
+			log.Error(logs.FailedToAddObjectToArchive, zap.Error(err))
+			return false
+		}
+
+		if err = writeToArchive(resGet, fileWriter, buf); err != nil {
+			log.Error(logs.FailedToAddObjectToArchive, zap.Error(err))
+			return false
+		}
+
+		return false
+	}
+}
+
+func (h *Handler) searchObjectsByPrefix(c *fasthttp.RequestCtx, log *zap.Logger, cnrID cid.ID) (ResObjectSearch, error) {
+	scid := cnrID.EncodeToString()
+	prefix, _ := c.UserValue("prefix").(string)
+
+	ctx := utils.GetContextFromRequest(c)
+
+	prefix, err := url.QueryUnescape(prefix)
+	if err != nil {
+		log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Error(err))
+		ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
+		return nil, err
+	}
+
+	log = log.With(zap.String("cid", scid), zap.String("prefix", prefix))
+
+	resSearch, err := h.search(ctx, cnrID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
+	if err != nil {
+		log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
+		ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
+		return nil, err
+	}
+	return resSearch, nil
+}
+
+func writeToArchive(resGet *Object, objWriter io.Writer, buf []byte) error {
+	var err error
+	if _, err = io.CopyBuffer(objWriter, resGet.Payload, buf); err != nil {
 		return fmt.Errorf("copy object payload to zip file: %v", err)
 	}

@@ -208,14 +290,10 @@ func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid
 		return fmt.Errorf("object body close error: %w", err)
 	}

-	if err = zipWriter.Flush(); err != nil {
-		return fmt.Errorf("flush zip writer: %v", err)
-	}
-
 	return nil
 }

-func getZipFilePath(obj *object.Object) string {
+func getFilePath(obj *object.Object) string {
 	for _, attr := range obj.Attributes() {
 		if attr.Key() == object.AttributeFilePath {
 			return attr.Value()
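One detail worth noting in getTarResponseWriter: the tar.Writer wraps the gzip.Writer, so on shutdown the tar writer must be closed first (flushing the tar trailer into the gzip stream) and the gzip writer second. A minimal standalone sketch of that ordering:

```go
package main

import (
	"archive/tar"
	"compress/gzip"
	"log"
	"os"
)

func main() {
	out, err := os.Create("archive.tar.gz")
	if err != nil {
		log.Fatal(err)
	}
	defer out.Close()

	// gzip wraps the file; tar wraps gzip. Close in the reverse order of
	// creation: tar first (writes the tar trailer), then gzip (flushes
	// the compressed data and the gzip footer).
	gzw, _ := gzip.NewWriterLevel(out, gzip.NoCompression) // error only for invalid levels
	tw := tar.NewWriter(gzw)
	defer func() {
		if err := tw.Close(); err != nil {
			log.Printf("close tar writer: %v", err)
		}
		if err := gzw.Close(); err != nil {
			log.Printf("close gzip writer: %v", err)
		}
	}()

	payload := []byte("hello\n")
	if err := tw.WriteHeader(&tar.Header{
		Name: "hello.txt",
		Mode: 0o644,
		Size: int64(len(payload)),
	}); err != nil {
		log.Fatal(err)
	}
	if _, err := tw.Write(payload); err != nil {
		log.Fatal(err)
	}
}
```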
@@ -216,7 +216,7 @@ func (h *Handler) byS3Path(ctx context.Context, req request, cnrID cid.ID, path
 	}

 	addr := newAddress(cnrID, foundOID.OID)
-	handler(ctx, h.newRequest(c, log), addr)
+	handler(ctx, newRequest(c, log), addr)
 }

 // byAttribute is a wrapper similar to byNativeAddress.

@@ -265,7 +265,7 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Conte
 	addr.SetContainer(bktInfo.CID)
 	addr.SetObject(objID)

-	handler(ctx, h.newRequest(c, log), addr)
+	handler(ctx, newRequest(c, log), addr)
 }

 func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
@@ -517,7 +517,7 @@ func DoFuzzDownloadZipped(input []byte) int {
 	r.SetUserValue("cid", cid)
 	r.SetUserValue("prefix", prefix)

-	hc.Handler().DownloadZipped(r)
+	hc.Handler().DownloadZip(r)

 	return fuzzSuccessExitCode
 }

@@ -250,7 +250,7 @@ func TestBasic(t *testing.T) {

 	t.Run("zip", func(t *testing.T) {
 		r = prepareGetZipped(ctx, bktName, "")
-		hc.Handler().DownloadZipped(r)
+		hc.Handler().DownloadZip(r)

 		readerAt := bytes.NewReader(r.Response.Body())
 		zipReader, err := zip.NewReader(readerAt, int64(len(r.Response.Body())))
@@ -135,7 +135,7 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
 		return
 	}

-	req := h.newRequest(c, log)
+	req := newRequest(c, log)

 	var objID oid.ID
 	if checkS3Err == nil {
@@ -24,6 +24,13 @@ type request struct {
 	log *zap.Logger
 }

+func newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) request {
+	return request{
+		RequestCtx: ctx,
+		log:        log,
+	}
+}
+
 func (r *request) handleFrostFSErr(err error, start time.Time) {
 	logFields := []zap.Field{
 		zap.Stringer("elapsed", time.Since(start)),
@@ -11,9 +11,12 @@ const (
 	ObjectNotFound                      = "object not found"                          // Error in ../../downloader/download.go
 	ReadObjectListFailed                = "read object list failed"                   // Error in ../../downloader/download.go
 	FailedToAddObjectToArchive          = "failed to add object to archive"           // Error in ../../downloader/download.go
+	FailedToGetObject                   = "failed to get object"                      // Error in ../../downloader/download.go
 	IteratingOverSelectedObjectsFailed  = "iterating over selected objects failed"    // Error in ../../downloader/download.go
 	ObjectsNotFound                     = "objects not found"                         // Error in ../../downloader/download.go
 	CloseZipWriter                      = "close zip writer"                          // Error in ../../downloader/download.go
+	CloseGzipWriter                     = "close gzip writer"                         // Error in ../../downloader/download.go
+	CloseTarWriter                      = "close tar writer"                          // Error in ../../downloader/download.go
 	ServiceIsRunning                    = "service is running"                        // Info in ../../metrics/service.go
 	ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port" // Warn in ../../metrics/service.go
 	ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../metrics/service.go

@@ -24,7 +27,6 @@ const (
 	IgnorePartEmptyFilename         = "ignore part, empty filename"         // Debug in ../../uploader/upload.go
 	CloseTemporaryMultipartFormFile = "close temporary multipart/form file" // Debug in ../../uploader/upload.go
 	CouldNotReceiveMultipartForm    = "could not receive multipart/form"    // Error in ../../uploader/upload.go
-	CouldNotProcessHeaders          = "could not process headers"           // Error in ../../uploader/upload.go
 	CouldNotParseClientTime         = "could not parse client time"         // Warn in ../../uploader/upload.go
 	CouldNotPrepareExpirationHeader = "could not prepare expiration header" // Error in ../../uploader/upload.go
 	CouldNotEncodeResponse          = "could not encode response"           // Error in ../../uploader/upload.go