forked from TrueCloudLab/frostfs-http-gw
Compare commits
3 commits
d2a37a17c4
...
78601fef8a
Author | SHA1 | Date | |
---|---|---|---|
78601fef8a | |||
8fe8f2dcc2 | |||
77eb474581 |
23 changed files with 892 additions and 138 deletions
|
@ -3,7 +3,10 @@
|
|||
This document outlines major changes between releases.
|
||||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
- Support percent-encoding for GET queries (#134)
|
||||
- Add `trace_id` to logs (#152)
|
||||
|
||||
### Changed
|
||||
- Update go version to 1.22 (#132)
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
|
@ -23,6 +24,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
|
@ -41,6 +43,7 @@ import (
|
|||
"github.com/nspcc-dev/neo-go/pkg/wallet"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
@ -91,6 +94,8 @@ type (
|
|||
defaultTimestamp bool
|
||||
zipCompression bool
|
||||
clientCut bool
|
||||
returnIndexPage bool
|
||||
indexPageTemplate string
|
||||
bufferMaxSizeForPut uint64
|
||||
namespaceHeader string
|
||||
defaultNamespaces []string
|
||||
|
@ -155,6 +160,7 @@ func newApp(ctx context.Context, opt ...Option) App {
|
|||
a.initResolver()
|
||||
a.initMetrics()
|
||||
a.initTracing(ctx)
|
||||
a.loadIndexPageTemplate()
|
||||
|
||||
return a
|
||||
}
|
||||
|
@ -177,12 +183,59 @@ func (s *appSettings) ZipCompression() bool {
|
|||
return s.zipCompression
|
||||
}
|
||||
|
||||
func (s *appSettings) IndexPageEnabled() bool {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.returnIndexPage
|
||||
}
|
||||
|
||||
func (s *appSettings) IndexPageTemplate() string {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
if s.indexPageTemplate == "" {
|
||||
return templates.DefaultIndexTemplate
|
||||
}
|
||||
return s.indexPageTemplate
|
||||
}
|
||||
|
||||
func (s *appSettings) setZipCompression(val bool) {
|
||||
s.mu.Lock()
|
||||
s.zipCompression = val
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) setReturnIndexPage(val bool) {
|
||||
s.mu.Lock()
|
||||
s.returnIndexPage = val
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) setIndexTemplate(val string) {
|
||||
s.mu.Lock()
|
||||
s.indexPageTemplate = val
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (a *app) loadIndexPageTemplate() {
|
||||
if !a.settings.IndexPageEnabled() {
|
||||
return
|
||||
}
|
||||
reader, err := os.Open(a.cfg.GetString(cfgIndexPageTemplatePath))
|
||||
if err != nil {
|
||||
a.settings.setIndexTemplate("")
|
||||
a.log.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err))
|
||||
return
|
||||
}
|
||||
tmpl, err := io.ReadAll(reader)
|
||||
if err != nil {
|
||||
a.settings.setIndexTemplate("")
|
||||
a.log.Warn(logs.FailedToReadIndexPageTemplate, zap.Error(err))
|
||||
return
|
||||
}
|
||||
a.settings.setIndexTemplate(string(tmpl))
|
||||
a.log.Info(logs.SetCustomIndexPageTemplate)
|
||||
}
|
||||
|
||||
func (s *appSettings) ClientCut() bool {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
@ -491,6 +544,7 @@ func (a *app) configReload(ctx context.Context) {
|
|||
|
||||
a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
|
||||
a.initTracing(ctx)
|
||||
a.loadIndexPageTemplate()
|
||||
a.setHealthStatus()
|
||||
|
||||
a.log.Info(logs.SIGHUPConfigReloadCompleted)
|
||||
|
@ -499,6 +553,7 @@ func (a *app) configReload(ctx context.Context) {
|
|||
func (a *app) updateSettings() {
|
||||
a.settings.setDefaultTimestamp(a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp))
|
||||
a.settings.setZipCompression(a.cfg.GetBool(cfgZipCompression))
|
||||
a.settings.setReturnIndexPage(a.cfg.GetBool(cfgIndexPageEnabled))
|
||||
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
|
||||
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
|
||||
a.settings.setNamespaceHeader(a.cfg.GetString(cfgResolveNamespaceHeader))
|
||||
|
@ -536,15 +591,15 @@ func (a *app) configureRouter(handler *handler.Handler) {
|
|||
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
|
||||
}
|
||||
|
||||
r.POST("/upload/{cid}", a.logger(a.canonicalizer(a.tokenizer(a.tracer(a.reqNamespace(handler.Upload))))))
|
||||
r.POST("/upload/{cid}", a.tracer(a.logger(a.canonicalizer(a.tokenizer(a.reqNamespace(handler.Upload))))))
|
||||
a.log.Info(logs.AddedPathUploadCid)
|
||||
r.GET("/get/{cid}/{oid:*}", a.logger(a.canonicalizer(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadByAddressOrBucketName))))))
|
||||
r.HEAD("/get/{cid}/{oid:*}", a.logger(a.canonicalizer(a.tokenizer(a.tracer(a.reqNamespace(handler.HeadByAddressOrBucketName))))))
|
||||
r.GET("/get/{cid}/{oid:*}", a.tracer(a.logger(a.canonicalizer(a.tokenizer(a.reqNamespace(handler.DownloadByAddressOrBucketName))))))
|
||||
r.HEAD("/get/{cid}/{oid:*}", a.tracer(a.logger(a.canonicalizer(a.tokenizer(a.reqNamespace(handler.HeadByAddressOrBucketName))))))
|
||||
a.log.Info(logs.AddedPathGetCidOid)
|
||||
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.canonicalizer(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadByAttribute))))))
|
||||
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.canonicalizer(a.tokenizer(a.tracer(a.reqNamespace(handler.HeadByAttribute))))))
|
||||
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.tracer(a.logger(a.canonicalizer(a.tokenizer(a.reqNamespace(handler.DownloadByAttribute))))))
|
||||
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.tracer(a.logger(a.canonicalizer(a.tokenizer(a.reqNamespace(handler.HeadByAttribute))))))
|
||||
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
|
||||
r.GET("/zip/{cid}/{prefix:*}", a.logger(a.canonicalizer(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadZipped))))))
|
||||
r.GET("/zip/{cid}/{prefix:*}", a.tracer(a.logger(a.canonicalizer(a.tokenizer(a.reqNamespace(handler.DownloadZipped))))))
|
||||
a.log.Info(logs.AddedPathZipCidPrefix)
|
||||
|
||||
a.webServer.Handler = r.Handler
|
||||
|
@ -552,11 +607,20 @@ func (a *app) configureRouter(handler *handler.Handler) {
|
|||
|
||||
func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||
return func(req *fasthttp.RequestCtx) {
|
||||
a.log.Info(logs.Request, zap.String("remote", req.RemoteAddr().String()),
|
||||
fields := []zap.Field{
|
||||
zap.String("remote", req.RemoteAddr().String()),
|
||||
zap.ByteString("method", req.Method()),
|
||||
zap.ByteString("path", req.Path()),
|
||||
zap.ByteString("query", req.QueryArgs().QueryString()),
|
||||
zap.Uint64("id", req.ID()))
|
||||
zap.Uint64("id", req.ID()),
|
||||
}
|
||||
|
||||
appCtx := utils.GetContextFromRequest(req)
|
||||
if traceID := trace.SpanFromContext(appCtx).SpanContext().TraceID(); traceID.IsValid() {
|
||||
fields = append(fields, zap.String("trace_id", traceID.String()))
|
||||
}
|
||||
|
||||
a.log.Info(logs.Request, fields...)
|
||||
h(req)
|
||||
}
|
||||
}
|
||||
|
@ -595,9 +659,19 @@ func (a *app) canonicalizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
|||
|
||||
func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||
return func(req *fasthttp.RequestCtx) {
|
||||
appCtx, err := tokens.StoreBearerTokenAppCtx(a.ctx, req)
|
||||
appCtx, err := tokens.StoreBearerTokenAppCtx(utils.GetContextFromRequest(req), req)
|
||||
if err != nil {
|
||||
a.log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Uint64("id", req.ID()), zap.Error(err))
|
||||
fields := []zap.Field{
|
||||
zap.Uint64("id", req.ID()),
|
||||
zap.Error(err),
|
||||
}
|
||||
|
||||
reqCtx := utils.GetContextFromRequest(req)
|
||||
if traceID := trace.SpanFromContext(reqCtx).SpanContext().TraceID(); traceID.IsValid() {
|
||||
fields = append(fields, zap.String("trace_id", traceID.String()))
|
||||
}
|
||||
|
||||
a.log.Error(logs.CouldNotFetchAndStoreBearerToken, fields...)
|
||||
response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
@ -608,9 +682,7 @@ func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
|||
|
||||
func (a *app) tracer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||
return func(req *fasthttp.RequestCtx) {
|
||||
appCtx := utils.GetContextFromRequest(req)
|
||||
|
||||
appCtx, span := utils.StartHTTPServerSpan(appCtx, req, "REQUEST")
|
||||
appCtx, span := utils.StartHTTPServerSpan(a.ctx, req, "REQUEST")
|
||||
defer func() {
|
||||
utils.SetHTTPTraceInfo(appCtx, span, req)
|
||||
span.End()
|
||||
|
|
|
@ -41,6 +41,8 @@ const (
|
|||
defaultConnectTimeout = 10 * time.Second
|
||||
defaultStreamTimeout = 10 * time.Second
|
||||
|
||||
defaultLoggerSamplerInterval = 1 * time.Second
|
||||
|
||||
defaultShutdownTimeout = 15 * time.Second
|
||||
|
||||
defaultPoolErrorThreshold uint32 = 100
|
||||
|
@ -60,6 +62,9 @@ const (
|
|||
|
||||
cfgReconnectInterval = "reconnect_interval"
|
||||
|
||||
cfgIndexPageEnabled = "index_page.enabled"
|
||||
cfgIndexPageTemplatePath = "index_page.template_path"
|
||||
|
||||
// Web.
|
||||
cfgWebReadBufferSize = "web.read_buffer_size"
|
||||
cfgWebWriteBufferSize = "web.write_buffer_size"
|
||||
|
@ -91,6 +96,11 @@ const (
|
|||
cfgLoggerLevel = "logger.level"
|
||||
cfgLoggerDestination = "logger.destination"
|
||||
|
||||
cfgLoggerSamplingEnabled = "logger.sampling.enabled"
|
||||
cfgLoggerSamplingInitial = "logger.sampling.initial"
|
||||
cfgLoggerSamplingThereafter = "logger.sampling.thereafter"
|
||||
cfgLoggerSamplingInterval = "logger.sampling.interval"
|
||||
|
||||
// Wallet.
|
||||
cfgWalletPassphrase = "wallet.passphrase"
|
||||
cfgWalletPath = "wallet.path"
|
||||
|
@ -188,10 +198,17 @@ func settings() *viper.Viper {
|
|||
// logger:
|
||||
v.SetDefault(cfgLoggerLevel, "debug")
|
||||
v.SetDefault(cfgLoggerDestination, "stdout")
|
||||
v.SetDefault(cfgLoggerSamplingEnabled, false)
|
||||
v.SetDefault(cfgLoggerSamplingThereafter, 100)
|
||||
v.SetDefault(cfgLoggerSamplingInitial, 100)
|
||||
v.SetDefault(cfgLoggerSamplingInterval, defaultLoggerSamplerInterval)
|
||||
|
||||
// pool:
|
||||
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
|
||||
|
||||
v.SetDefault(cfgIndexPageEnabled, false)
|
||||
v.SetDefault(cfgIndexPageTemplatePath, "")
|
||||
|
||||
// frostfs:
|
||||
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
|
||||
|
||||
|
@ -386,9 +403,9 @@ func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
|
|||
|
||||
switch dest {
|
||||
case destinationStdout:
|
||||
return newStdoutLogger(lvl)
|
||||
return newStdoutLogger(v, lvl)
|
||||
case destinationJournald:
|
||||
return newJournaldLogger(lvl)
|
||||
return newJournaldLogger(v, lvl)
|
||||
default:
|
||||
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
|
||||
}
|
||||
|
@ -405,39 +422,59 @@ func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
|
|||
// Logger records a stack trace for all messages at or above fatal level.
|
||||
//
|
||||
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
|
||||
func newStdoutLogger(lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
|
||||
c := zap.NewProductionConfig()
|
||||
c.Level = zap.NewAtomicLevelAt(lvl)
|
||||
c.Encoding = "console"
|
||||
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
|
||||
func newStdoutLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
|
||||
stdout := zapcore.AddSync(os.Stderr)
|
||||
level := zap.NewAtomicLevelAt(lvl)
|
||||
|
||||
l, err := c.Build(
|
||||
zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)),
|
||||
)
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("build zap logger instance: %v", err))
|
||||
}
|
||||
consoleOutCore := zapcore.NewCore(newLogEncoder(), stdout, level)
|
||||
consoleOutCore = samplingEnabling(v, consoleOutCore)
|
||||
|
||||
return l, c.Level
|
||||
l := zap.New(consoleOutCore, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
|
||||
return l, level
|
||||
}
|
||||
|
||||
func newJournaldLogger(lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
|
||||
c := zap.NewProductionConfig()
|
||||
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
|
||||
c.Level = zap.NewAtomicLevelAt(lvl)
|
||||
func newJournaldLogger(v *viper.Viper, lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
|
||||
level := zap.NewAtomicLevelAt(lvl)
|
||||
|
||||
encoder := zapjournald.NewPartialEncoder(zapcore.NewConsoleEncoder(c.EncoderConfig), zapjournald.SyslogFields)
|
||||
encoder := zapjournald.NewPartialEncoder(newLogEncoder(), zapjournald.SyslogFields)
|
||||
|
||||
core := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
|
||||
core := zapjournald.NewCore(level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
|
||||
coreWithContext := core.With([]zapcore.Field{
|
||||
zapjournald.SyslogFacility(zapjournald.LogDaemon),
|
||||
zapjournald.SyslogIdentifier(),
|
||||
zapjournald.SyslogPid(),
|
||||
})
|
||||
|
||||
coreWithContext = samplingEnabling(v, coreWithContext)
|
||||
|
||||
l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
|
||||
|
||||
return l, c.Level
|
||||
return l, level
|
||||
}
|
||||
|
||||
func newLogEncoder() zapcore.Encoder {
|
||||
c := zap.NewProductionEncoderConfig()
|
||||
c.EncodeTime = zapcore.ISO8601TimeEncoder
|
||||
|
||||
return zapcore.NewConsoleEncoder(c)
|
||||
}
|
||||
|
||||
func samplingEnabling(v *viper.Viper, core zapcore.Core) zapcore.Core {
|
||||
// Zap samples by logging the first cgfLoggerSamplingInitial entries with a given level
|
||||
// and message within the specified time interval.
|
||||
// In the above config, only the first cgfLoggerSamplingInitial log entries with the same level and message
|
||||
// are recorded in cfgLoggerSamplingInterval interval. Every other log entry will be dropped within the interval since
|
||||
// cfgLoggerSamplingThereafter is specified here.
|
||||
if v.GetBool(cfgLoggerSamplingEnabled) {
|
||||
core = zapcore.NewSamplerWithOptions(
|
||||
core,
|
||||
v.GetDuration(cfgLoggerSamplingInterval),
|
||||
v.GetInt(cfgLoggerSamplingInitial),
|
||||
v.GetInt(cfgLoggerSamplingThereafter),
|
||||
)
|
||||
}
|
||||
|
||||
return core
|
||||
}
|
||||
|
||||
func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
|
||||
|
|
|
@ -14,8 +14,12 @@ HTTP_GW_PPROF_ADDRESS=localhost:8083
|
|||
HTTP_GW_PROMETHEUS_ENABLED=true
|
||||
HTTP_GW_PROMETHEUS_ADDRESS=localhost:8084
|
||||
|
||||
# Log level.
|
||||
# Logger.
|
||||
HTTP_GW_LOGGER_LEVEL=debug
|
||||
HTTP_GW_LOGGER_SAMPLING_ENABLED=false
|
||||
HTTP_GW_LOGGER_SAMPLING_INITIAL=100
|
||||
HTTP_GW_LOGGER_SAMPLING_THEREAFTER=100
|
||||
HTTP_GW_LOGGER_SAMPLING_INTERVAL=1s
|
||||
|
||||
HTTP_GW_SERVER_0_ADDRESS=0.0.0.0:443
|
||||
HTTP_GW_SERVER_0_TLS_ENABLED=false
|
||||
|
|
|
@ -18,6 +18,11 @@ tracing:
|
|||
logger:
|
||||
level: debug # Log level.
|
||||
destination: stdout
|
||||
sampling:
|
||||
enabled: false
|
||||
initial: 100
|
||||
thereafter: 100
|
||||
interval: 1s
|
||||
|
||||
server:
|
||||
- address: 0.0.0.0:8080
|
||||
|
@ -102,6 +107,11 @@ request_timeout: 5s # Timeout to check node health during rebalance.
|
|||
rebalance_timer: 30s # Interval to check nodes health.
|
||||
pool_error_threshold: 100 # The number of errors on connection after which node is considered as unhealthy.
|
||||
|
||||
# Enable index page to see objects list for specified container and prefix
|
||||
index_page:
|
||||
enabled: false
|
||||
template_path: internal/handler/templates/index.gotmpl
|
||||
|
||||
zip:
|
||||
compression: false # Enable zip compression to download files by common prefix.
|
||||
|
||||
|
@ -127,4 +137,4 @@ cache:
|
|||
|
||||
resolve_bucket:
|
||||
namespace_header: X-Frostfs-Namespace
|
||||
default_namespaces: [ "", "root" ]
|
||||
default_namespaces: [ "", "root" ]
|
||||
|
|
13
docs/api.md
13
docs/api.md
|
@ -95,12 +95,12 @@ The `filename` field from the multipart form will be set as `FileName` attribute
|
|||
|
||||
## Get object
|
||||
|
||||
Route: `/get/{cid}/{oid}?[download=true]`
|
||||
Route: `/get/{cid}/{oid}?[download=false]`
|
||||
|
||||
| Route parameter | Type | Description |
|
||||
|-----------------|--------|------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `cid` | Single | Base58 encoded container ID or container name from NNS. |
|
||||
| `oid` | Single | Base58 encoded object ID. |
|
||||
| `cid` | Single | Base58 encoded `container ID` or `container name` from NNS or `bucket name`. |
|
||||
| `oid` | Single | Base58 encoded `object ID`. Also could be `S3 object name` if `cid` is specified as bucket name. |
|
||||
| `download` | Query | Set the `Content-Disposition` header as `attachment` in response.<br/> This make the browser to download object as file instead of showing it on the page. |
|
||||
|
||||
### Methods
|
||||
|
@ -141,6 +141,13 @@ Get an object (payload and attributes) by an address.
|
|||
| 400 | Some error occurred during object downloading. |
|
||||
| 404 | Container or object not found. |
|
||||
|
||||
###### Body
|
||||
|
||||
Returns object data. If request performed from browser, either displays raw data or downloads it as
|
||||
attachment if `download` query parameter is set to `true`.
|
||||
If `index_page.enabled` is set to `true`, returns HTML with index-page if no object with specified
|
||||
S3-name was found.
|
||||
|
||||
#### HEAD
|
||||
|
||||
Get an object attributes by an address.
|
||||
|
|
|
@ -57,6 +57,7 @@ $ cat http.log
|
|||
| `frostfs` | [Frostfs configuration](#frostfs-section) |
|
||||
| `cache` | [Cache configuration](#cache-section) |
|
||||
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
|
||||
| `index_page` | [Index page configuration](#index_page-section) |
|
||||
|
||||
|
||||
# General section
|
||||
|
@ -75,16 +76,16 @@ pool_error_threshold: 100
|
|||
reconnect_interval: 1m
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|------------------------|------------|---------------|----------------|------------------------------------------------------------------------------------|
|
||||
| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. |
|
||||
| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. |
|
||||
| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. |
|
||||
| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. |
|
||||
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
|
||||
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
|
||||
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
|
||||
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|------------------------|------------|---------------|---------------|-------------------------------------------------------------------------------------------------|
|
||||
| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. |
|
||||
| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. |
|
||||
| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. |
|
||||
| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. |
|
||||
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
|
||||
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
|
||||
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
|
||||
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
|
||||
|
||||
# `wallet` section
|
||||
|
||||
|
@ -164,12 +165,21 @@ server:
|
|||
logger:
|
||||
level: debug
|
||||
destination: stdout
|
||||
sampling:
|
||||
enabled: false
|
||||
initial: 100
|
||||
thereafter: 100
|
||||
interval: 1s
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|---------------|----------|---------------|---------------|----------------------------------------------------------------------------------------------------|
|
||||
| `level` | `string` | yes | `debug` | Logging level.<br/>Possible values: `debug`, `info`, `warn`, `error`, `dpanic`, `panic`, `fatal`. |
|
||||
| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` |
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|-----------------------|------------|---------------|---------------|----------------------------------------------------------------------------------------------------|
|
||||
| `level` | `string` | yes | `debug` | Logging level.<br/>Possible values: `debug`, `info`, `warn`, `error`, `dpanic`, `panic`, `fatal`. |
|
||||
| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` |
|
||||
| `sampling.enabled` | `bool` | no | false | Sampling enabling flag. |
|
||||
| `sampling.initial` | `int` | no | '100' | Sampling count of first log entries. |
|
||||
| `sampling.thereafter` | `int` | no | '100' | Sampling count of entries after an `interval`. |
|
||||
| `sampling.interval` | `duration` | no | '1s' | Sampling interval of messaging similar entries. |
|
||||
|
||||
# `web` section
|
||||
|
||||
|
@ -337,4 +347,19 @@ resolve_bucket:
|
|||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|----------------------|------------|---------------|-----------------------|--------------------------------------------------------------------------------------------------------------------------|
|
||||
| `namespace_header` | `string` | yes | `X-Frostfs-Namespace` | Header to determine zone to resolve bucket name. |
|
||||
| `default_namespaces` | `[]string` | yes | ["","root"] | Namespaces that should be handled as default. |
|
||||
| `default_namespaces` | `[]string` | yes | ["","root"] | Namespaces that should be handled as default. |
|
||||
|
||||
# `index_page` section
|
||||
|
||||
Parameters for index HTML-page output with S3-bucket or S3-subdir content for `Get object` request
|
||||
|
||||
```yaml
|
||||
index_page:
|
||||
enabled: false
|
||||
template_path: ""
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|-----------------|----------|---------------|---------------|---------------------------------------------------------------------------------|
|
||||
| `enabled` | `bool` | yes | `false` | Flag to enable index_page return if no object with specified S3-name was found. |
|
||||
| `template_path` | `string` | yes | `""` | Path to .gotmpl file with html template for index_page. |
|
||||
|
|
2
go.mod
2
go.mod
|
@ -8,6 +8,7 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240918095938-e580ee991d98
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||
github.com/bluele/gcache v0.0.2
|
||||
github.com/docker/go-units v0.4.0
|
||||
github.com/fasthttp/router v1.4.1
|
||||
github.com/nspcc-dev/neo-go v0.106.2
|
||||
github.com/prometheus/client_golang v1.19.0
|
||||
|
@ -50,7 +51,6 @@ require (
|
|||
github.com/docker/distribution v2.8.1+incompatible // indirect
|
||||
github.com/docker/docker v20.10.14+incompatible // indirect
|
||||
github.com/docker/go-connections v0.4.0 // indirect
|
||||
github.com/docker/go-units v0.4.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.6.0 // indirect
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
github.com/go-logr/stdr v1.2.2 // indirect
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
type NodeVersion struct {
|
||||
BaseNodeVersion
|
||||
DeleteMarker bool
|
||||
IsPrefixNode bool
|
||||
}
|
||||
|
||||
// BaseNodeVersion is minimal node info from tree service.
|
||||
|
|
|
@ -4,7 +4,9 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
||||
|
@ -15,16 +17,16 @@ type GetNodeByPathResponseInfoWrapper struct {
|
|||
response *grpcService.GetNodeByPathResponse_Info
|
||||
}
|
||||
|
||||
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() uint64 {
|
||||
return n.response.GetNodeId()
|
||||
func (n GetNodeByPathResponseInfoWrapper) GetNodeID() []uint64 {
|
||||
return []uint64{n.response.GetNodeId()}
|
||||
}
|
||||
|
||||
func (n GetNodeByPathResponseInfoWrapper) GetParentID() uint64 {
|
||||
return n.response.GetParentId()
|
||||
func (n GetNodeByPathResponseInfoWrapper) GetParentID() []uint64 {
|
||||
return []uint64{n.response.GetParentId()}
|
||||
}
|
||||
|
||||
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() uint64 {
|
||||
return n.response.GetTimestamp()
|
||||
func (n GetNodeByPathResponseInfoWrapper) GetTimestamp() []uint64 {
|
||||
return []uint64{n.response.GetTimestamp()}
|
||||
}
|
||||
|
||||
func (n GetNodeByPathResponseInfoWrapper) GetMeta() []tree.Meta {
|
||||
|
@ -89,3 +91,73 @@ func handleError(err error) error {
|
|||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]tree.NodeResponse, error) {
|
||||
order := treepool.NoneOrder
|
||||
if sort {
|
||||
order = treepool.AscendingOrder
|
||||
}
|
||||
poolPrm := treepool.GetSubTreeParams{
|
||||
CID: bktInfo.CID,
|
||||
TreeID: treeID,
|
||||
RootID: rootID,
|
||||
Depth: depth,
|
||||
BearerToken: getBearer(ctx),
|
||||
Order: order,
|
||||
}
|
||||
if len(rootID) == 1 && rootID[0] == 0 {
|
||||
// storage node interprets 'nil' value as []uint64{0}
|
||||
// gate wants to send 'nil' value instead of []uint64{0}, because
|
||||
// it provides compatibility with previous tree service api where
|
||||
// single uint64(0) value is dropped from signature
|
||||
poolPrm.RootID = nil
|
||||
}
|
||||
|
||||
subTreeReader, err := w.p.GetSubTree(ctx, poolPrm)
|
||||
if err != nil {
|
||||
return nil, handleError(err)
|
||||
}
|
||||
|
||||
var subtree []tree.NodeResponse
|
||||
|
||||
node, err := subTreeReader.Next()
|
||||
for err == nil {
|
||||
subtree = append(subtree, GetSubTreeResponseBodyWrapper{node})
|
||||
node, err = subTreeReader.Next()
|
||||
}
|
||||
if err != io.EOF {
|
||||
return nil, handleError(err)
|
||||
}
|
||||
|
||||
return subtree, nil
|
||||
}
|
||||
|
||||
type GetSubTreeResponseBodyWrapper struct {
|
||||
response *grpcService.GetSubTreeResponse_Body
|
||||
}
|
||||
|
||||
func (n GetSubTreeResponseBodyWrapper) GetNodeID() []uint64 {
|
||||
return n.response.GetNodeId()
|
||||
}
|
||||
|
||||
func (n GetSubTreeResponseBodyWrapper) GetParentID() []uint64 {
|
||||
resp := n.response.GetParentId()
|
||||
if resp == nil {
|
||||
// storage sends nil that should be interpreted as []uint64{0}
|
||||
// due to protobuf compatibility, see 'GetSubTree' function
|
||||
return []uint64{0}
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
func (n GetSubTreeResponseBodyWrapper) GetTimestamp() []uint64 {
|
||||
return n.response.GetTimestamp()
|
||||
}
|
||||
|
||||
func (n GetSubTreeResponseBodyWrapper) GetMeta() []tree.Meta {
|
||||
res := make([]tree.Meta, len(n.response.Meta))
|
||||
for i, value := range n.response.Meta {
|
||||
res[i] = value
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
|
157
internal/handler/browse.go
Normal file
157
internal/handler/browse.go
Normal file
|
@ -0,0 +1,157 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"html/template"
|
||||
"net/url"
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"github.com/docker/go-units"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
dateFormat = "02-01-2006 15:04"
|
||||
attrOID = "OID"
|
||||
attrCreated = "Created"
|
||||
attrFileName = "FileName"
|
||||
attrSize = "Size"
|
||||
)
|
||||
|
||||
type (
|
||||
BrowsePageData struct {
|
||||
BucketName,
|
||||
Prefix string
|
||||
Objects []ResponseObject
|
||||
}
|
||||
ResponseObject struct {
|
||||
OID string
|
||||
Created string
|
||||
FileName string
|
||||
Size string
|
||||
IsDir bool
|
||||
}
|
||||
)
|
||||
|
||||
func parseTimestamp(tstamp string) (time.Time, error) {
|
||||
millis, err := strconv.ParseInt(tstamp, 10, 64)
|
||||
if err != nil {
|
||||
return time.Time{}, err
|
||||
}
|
||||
|
||||
return time.UnixMilli(millis), nil
|
||||
}
|
||||
|
||||
func NewResponseObject(nodes map[string]string) ResponseObject {
|
||||
return ResponseObject{
|
||||
OID: nodes[attrOID],
|
||||
Created: nodes[attrCreated],
|
||||
FileName: nodes[attrFileName],
|
||||
Size: nodes[attrSize],
|
||||
IsDir: nodes[attrOID] == "",
|
||||
}
|
||||
}
|
||||
|
||||
func formatTimestamp(strdate string) string {
|
||||
date, err := parseTimestamp(strdate)
|
||||
if err != nil || date.IsZero() {
|
||||
return ""
|
||||
}
|
||||
|
||||
return date.Format(dateFormat)
|
||||
}
|
||||
|
||||
func formatSize(strsize string) string {
|
||||
size, err := strconv.ParseFloat(strsize, 64)
|
||||
if err != nil {
|
||||
return "0B"
|
||||
}
|
||||
return units.HumanSize(size)
|
||||
}
|
||||
|
||||
func parentDir(prefix string) string {
|
||||
index := strings.LastIndex(prefix, "/")
|
||||
if index == -1 {
|
||||
return prefix
|
||||
}
|
||||
return prefix[index:]
|
||||
}
|
||||
|
||||
func trimPrefix(encPrefix string) string {
|
||||
prefix, err := url.PathUnescape(encPrefix)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
slashIndex := strings.LastIndex(prefix, "/")
|
||||
if slashIndex == -1 {
|
||||
return ""
|
||||
}
|
||||
return prefix[:slashIndex]
|
||||
}
|
||||
|
||||
func urlencode(prefix, filename string) string {
|
||||
var res strings.Builder
|
||||
path := filename
|
||||
if prefix != "" {
|
||||
path = strings.Join([]string{prefix, filename}, "/")
|
||||
}
|
||||
prefixParts := strings.Split(path, "/")
|
||||
for _, prefixPart := range prefixParts {
|
||||
prefixPart = "/" + url.PathEscape(prefixPart)
|
||||
if prefixPart == "/." || prefixPart == "/.." {
|
||||
prefixPart = url.PathEscape(prefixPart)
|
||||
}
|
||||
res.WriteString(prefixPart)
|
||||
}
|
||||
|
||||
return res.String()
|
||||
}
|
||||
|
||||
func (h *Handler) browseObjects(c *fasthttp.RequestCtx, bucketInfo *data.BucketInfo, prefix string) {
|
||||
log := h.log.With(zap.String("bucket", bucketInfo.Name))
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
nodes, err := h.listObjects(ctx, bucketInfo, prefix)
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
respObjects := make([]ResponseObject, len(nodes))
|
||||
|
||||
for i, node := range nodes {
|
||||
respObjects[i] = NewResponseObject(node)
|
||||
}
|
||||
|
||||
sort.Slice(respObjects, func(i, j int) bool {
|
||||
if respObjects[i].IsDir == respObjects[j].IsDir {
|
||||
return respObjects[i].FileName < respObjects[j].FileName
|
||||
}
|
||||
return respObjects[i].IsDir
|
||||
})
|
||||
indexTemplate := h.config.IndexPageTemplate()
|
||||
|
||||
tmpl, err := template.New("index").Funcs(template.FuncMap{
|
||||
"formatTimestamp": formatTimestamp,
|
||||
"formatSize": formatSize,
|
||||
"trimPrefix": trimPrefix,
|
||||
"urlencode": urlencode,
|
||||
"parentDir": parentDir,
|
||||
}).Parse(indexTemplate)
|
||||
if err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
if err = tmpl.Execute(c, &BrowsePageData{
|
||||
BucketName: bucketInfo.Name,
|
||||
Prefix: prefix,
|
||||
Objects: respObjects,
|
||||
}); err != nil {
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
}
|
|
@ -18,6 +18,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -81,19 +82,25 @@ func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer,
|
|||
|
||||
// DownloadZipped handles zip by prefix requests.
|
||||
func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||
scid, _ := c.UserValue("cid").(string)
|
||||
prefix, _ := c.UserValue("prefix").(string)
|
||||
var (
|
||||
scid, _ = c.UserValue("cid").(string)
|
||||
prefix, _ = c.UserValue("prefix").(string)
|
||||
log = h.log
|
||||
ctx = utils.GetContextFromRequest(c)
|
||||
)
|
||||
|
||||
if traceID := trace.SpanFromContext(ctx).SpanContext().TraceID(); traceID.IsValid() {
|
||||
log = log.With(zap.String("trace_id", traceID.String()))
|
||||
}
|
||||
|
||||
prefix, err := url.QueryUnescape(prefix)
|
||||
if err != nil {
|
||||
h.log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Uint64("id", c.ID()), zap.Error(err))
|
||||
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Uint64("id", c.ID()), zap.Error(err))
|
||||
response.Error(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
log := h.log.With(zap.String("cid", scid), zap.String("prefix", prefix), zap.Uint64("id", c.ID()))
|
||||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
log = log.With(zap.String("cid", scid), zap.String("prefix", prefix), zap.Uint64("id", c.ID()))
|
||||
|
||||
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||
if err != nil {
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -30,6 +31,8 @@ type Config interface {
|
|||
DefaultTimestamp() bool
|
||||
ZipCompression() bool
|
||||
ClientCut() bool
|
||||
IndexPageEnabled() bool
|
||||
IndexPageTemplate() string
|
||||
BufferMaxSizeForPut() uint64
|
||||
NamespaceHeader() string
|
||||
}
|
||||
|
@ -182,9 +185,12 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
|||
idCnr, _ = c.UserValue("cid").(string)
|
||||
idObj, _ = c.UserValue("oid").(string)
|
||||
log = h.log.With(zap.String("cid", idCnr), zap.String("oid", idObj))
|
||||
ctx = utils.GetContextFromRequest(c)
|
||||
)
|
||||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
if traceID := trace.SpanFromContext(ctx).SpanContext().TraceID(); traceID.IsValid() {
|
||||
log = log.With(zap.String("trace_id", traceID.String()))
|
||||
}
|
||||
|
||||
bktInfo, err := h.getBucketInfo(ctx, idCnr, log)
|
||||
if err != nil {
|
||||
|
@ -208,41 +214,53 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
|||
|
||||
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// prepares request and object address to it.
|
||||
func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
func (h *Handler) byObjectName(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
var (
|
||||
bucketname = req.UserValue("cid").(string)
|
||||
key = req.UserValue("oid").(string)
|
||||
bucketname = c.UserValue("cid").(string)
|
||||
key = c.UserValue("oid").(string)
|
||||
log = h.log.With(zap.String("bucketname", bucketname), zap.String("key", key))
|
||||
download = c.QueryArgs().GetBool("download")
|
||||
ctx = utils.GetContextFromRequest(c)
|
||||
)
|
||||
|
||||
if traceID := trace.SpanFromContext(ctx).SpanContext().TraceID(); traceID.IsValid() {
|
||||
log = log.With(zap.String("trace_id", traceID.String()))
|
||||
}
|
||||
|
||||
unescapedKey, err := url.QueryUnescape(key)
|
||||
if err != nil {
|
||||
logAndSendBucketError(req, log, err)
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
ctx := utils.GetContextFromRequest(req)
|
||||
|
||||
bktInfo, err := h.getBucketInfo(ctx, bucketname, log)
|
||||
if err != nil {
|
||||
logAndSendBucketError(req, log, err)
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, unescapedKey)
|
||||
if h.config.IndexPageEnabled() && !download && string(c.Method()) != fasthttp.MethodHead {
|
||||
if isDir(unescapedKey) || isContainerRoot(unescapedKey) {
|
||||
if code := checkErrorType(err); code == fasthttp.StatusNotFound || code == fasthttp.StatusOK {
|
||||
c.SetStatusCode(code)
|
||||
h.browseObjects(c, bktInfo, unescapedKey)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
if errors.Is(err, tree.ErrNodeAccessDenied) {
|
||||
response.Error(req, "Access Denied", fasthttp.StatusForbidden)
|
||||
return
|
||||
response.Error(c, "Access Denied", fasthttp.StatusForbidden)
|
||||
} else {
|
||||
response.Error(c, "object wasn't found", fasthttp.StatusNotFound)
|
||||
log.Error(logs.GetLatestObjectVersion, zap.Error(err))
|
||||
}
|
||||
log.Error(logs.GetLatestObjectVersion, zap.Error(err))
|
||||
|
||||
response.Error(req, "object wasn't found", fasthttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
if foundOid.DeleteMarker {
|
||||
log.Error(logs.ObjectWasDeleted)
|
||||
response.Error(req, "object deleted", fasthttp.StatusNotFound)
|
||||
response.Error(c, "object deleted", fasthttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -250,32 +268,38 @@ func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context,
|
|||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(foundOid.OID)
|
||||
|
||||
f(ctx, *h.newRequest(req, log), addr)
|
||||
f(ctx, *h.newRequest(c, log), addr)
|
||||
}
|
||||
|
||||
// byAttribute is a wrapper similar to byAddress.
|
||||
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
scid, _ := c.UserValue("cid").(string)
|
||||
key, _ := c.UserValue("attr_key").(string)
|
||||
val, _ := c.UserValue("attr_val").(string)
|
||||
var (
|
||||
scid, _ = c.UserValue("cid").(string)
|
||||
key, _ = c.UserValue("attr_key").(string)
|
||||
val, _ = c.UserValue("attr_val").(string)
|
||||
log = h.log
|
||||
ctx = utils.GetContextFromRequest(c)
|
||||
)
|
||||
|
||||
if traceID := trace.SpanFromContext(ctx).SpanContext().TraceID(); traceID.IsValid() {
|
||||
log = log.With(zap.String("trace_id", traceID.String()))
|
||||
}
|
||||
|
||||
key, err := url.QueryUnescape(key)
|
||||
if err != nil {
|
||||
h.log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_key", key), zap.Uint64("id", c.ID()), zap.Error(err))
|
||||
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_key", key), zap.Uint64("id", c.ID()), zap.Error(err))
|
||||
response.Error(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
val, err = url.QueryUnescape(val)
|
||||
if err != nil {
|
||||
h.log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_val", val), zap.Uint64("id", c.ID()), zap.Error(err))
|
||||
log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_val", val), zap.Uint64("id", c.ID()), zap.Error(err))
|
||||
response.Error(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
log := h.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
||||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
log = log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
||||
|
||||
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||
if err != nil {
|
||||
|
@ -379,3 +403,25 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.Bucket
|
|||
|
||||
return bktInfo, err
|
||||
}
|
||||
|
||||
func (h *Handler) listObjects(ctx context.Context, bucketInfo *data.BucketInfo, prefix string) ([]map[string]string, error) {
|
||||
nodes, _, err := h.tree.GetSubTreeByPrefix(ctx, bucketInfo, prefix, true)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var objects = make([]map[string]string, 0, len(nodes))
|
||||
for _, node := range nodes {
|
||||
meta := node.GetMeta()
|
||||
if meta == nil {
|
||||
continue
|
||||
}
|
||||
var obj = make(map[string]string, len(meta))
|
||||
for _, m := range meta {
|
||||
obj[m.GetKey()] = string(m.GetValue())
|
||||
}
|
||||
objects = append(objects, obj)
|
||||
}
|
||||
|
||||
return objects, nil
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||
|
@ -37,6 +38,10 @@ func (t *treeClientMock) GetNodes(context.Context, *tree.GetNodesParams) ([]tree
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func (t *treeClientMock) GetSubTree(context.Context, *data.BucketInfo, string, []uint64, uint32, bool) ([]tree.NodeResponse, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
type configMock struct {
|
||||
}
|
||||
|
||||
|
@ -48,6 +53,17 @@ func (c *configMock) ZipCompression() bool {
|
|||
return false
|
||||
}
|
||||
|
||||
func (c *configMock) IndexPageEnabled() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (c *configMock) IndexPageTemplatePath() string {
|
||||
return ""
|
||||
}
|
||||
func (c *configMock) IndexPageTemplate() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (c *configMock) ClientCut() bool {
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -53,6 +53,7 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
dis = "inline"
|
||||
start = time.Now()
|
||||
filename string
|
||||
filepath string
|
||||
)
|
||||
|
||||
prm := PrmObjectGet{
|
||||
|
@ -104,6 +105,8 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
time.Unix(value, 0).UTC().Format(http.TimeFormat))
|
||||
case object.AttributeContentType:
|
||||
contentType = val
|
||||
case object.AttributeFilePath:
|
||||
filepath = val
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -135,6 +138,10 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objectAddress oi
|
|||
}
|
||||
req.SetContentType(contentType)
|
||||
|
||||
if filename == "" {
|
||||
filename = filepath
|
||||
}
|
||||
|
||||
req.Response.Header.Set(fasthttp.HeaderContentDisposition, dis+"; filename="+path.Base(filename))
|
||||
|
||||
req.Response.SetBodyStream(rObj.Payload, int(payloadSize))
|
||||
|
|
|
@ -16,6 +16,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -43,22 +44,25 @@ func (pr *putResponse) encode(w io.Writer) error {
|
|||
}
|
||||
|
||||
// Upload handles multipart upload request.
|
||||
func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
||||
func (h *Handler) Upload(c *fasthttp.RequestCtx) {
|
||||
var (
|
||||
file MultipartFile
|
||||
idObj oid.ID
|
||||
addr oid.Address
|
||||
scid, _ = req.UserValue("cid").(string)
|
||||
scid, _ = c.UserValue("cid").(string)
|
||||
log = h.log.With(zap.String("cid", scid))
|
||||
bodyStream = req.RequestBodyStream()
|
||||
bodyStream = c.RequestBodyStream()
|
||||
drainBuf = make([]byte, drainBufSize)
|
||||
ctx = utils.GetContextFromRequest(c)
|
||||
)
|
||||
|
||||
ctx := utils.GetContextFromRequest(req)
|
||||
if traceID := trace.SpanFromContext(ctx).SpanContext().TraceID(); traceID.IsValid() {
|
||||
log = log.With(zap.String("trace_id", traceID.String()))
|
||||
}
|
||||
|
||||
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||
if err != nil {
|
||||
logAndSendBucketError(req, log, err)
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -75,21 +79,23 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
|||
zap.Error(err),
|
||||
)
|
||||
}()
|
||||
boundary := string(req.Request.Header.MultipartFormBoundary())
|
||||
if file, err = fetchMultipartFile(h.log, bodyStream, boundary); err != nil {
|
||||
|
||||
boundary := string(c.Request.Header.MultipartFormBoundary())
|
||||
if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
|
||||
log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err))
|
||||
response.Error(req, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
response.Error(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
filtered, err := filterHeaders(h.log, &req.Request.Header)
|
||||
|
||||
filtered, err := filterHeaders(log, &c.Request.Header)
|
||||
if err != nil {
|
||||
log.Error(logs.CouldNotProcessHeaders, zap.Error(err))
|
||||
response.Error(req, err.Error(), fasthttp.StatusBadRequest)
|
||||
response.Error(c, err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
if rawHeader := req.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
|
||||
if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
|
||||
if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
|
||||
log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err))
|
||||
} else {
|
||||
|
@ -97,9 +103,9 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
if err = utils.PrepareExpirationHeader(req, h.frostfs, filtered, now); err != nil {
|
||||
if err = utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil {
|
||||
log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err))
|
||||
response.Error(req, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
response.Error(c, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -143,7 +149,7 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
|||
}
|
||||
|
||||
if idObj, err = h.frostfs.CreateObject(ctx, prm); err != nil {
|
||||
h.handlePutFrostFSErr(req, err)
|
||||
h.handlePutFrostFSErr(c, err, log)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -151,9 +157,9 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
|||
addr.SetContainer(bktInfo.CID)
|
||||
|
||||
// Try to return the response, otherwise, if something went wrong, throw an error.
|
||||
if err = newPutResponse(addr).encode(req); err != nil {
|
||||
if err = newPutResponse(addr).encode(c); err != nil {
|
||||
log.Error(logs.CouldNotEncodeResponse, zap.Error(err))
|
||||
response.Error(req, "could not encode response", fasthttp.StatusBadRequest)
|
||||
response.Error(c, "could not encode response", fasthttp.StatusBadRequest)
|
||||
|
||||
return
|
||||
}
|
||||
|
@ -170,15 +176,15 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
|||
}
|
||||
}
|
||||
// Report status code and content type.
|
||||
req.Response.SetStatusCode(fasthttp.StatusOK)
|
||||
req.Response.Header.SetContentType(jsonHeader)
|
||||
c.Response.SetStatusCode(fasthttp.StatusOK)
|
||||
c.Response.Header.SetContentType(jsonHeader)
|
||||
}
|
||||
|
||||
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error) {
|
||||
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) {
|
||||
statusCode, msg, additionalFields := response.FormErrorResponse("could not store file in frostfs", err)
|
||||
logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)
|
||||
|
||||
h.log.Error(logs.CouldNotStoreFileInFrostfs, logFields...)
|
||||
log.Error(logs.CouldNotStoreFileInFrostfs, logFields...)
|
||||
response.Error(r, msg, statusCode)
|
||||
}
|
||||
|
||||
|
|
|
@ -2,12 +2,14 @@ package handler
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"github.com/valyala/fasthttp"
|
||||
|
@ -38,6 +40,25 @@ func bearerToken(ctx context.Context) *bearer.Token {
|
|||
return nil
|
||||
}
|
||||
|
||||
func isDir(name string) bool {
|
||||
return strings.HasSuffix(name, "/")
|
||||
}
|
||||
|
||||
func isContainerRoot(key string) bool {
|
||||
return key == ""
|
||||
}
|
||||
|
||||
func checkErrorType(err error) int {
|
||||
switch {
|
||||
case err == nil:
|
||||
return fasthttp.StatusOK
|
||||
case errors.Is(err, tree.ErrNodeAccessDenied):
|
||||
return fasthttp.StatusForbidden
|
||||
default:
|
||||
return fasthttp.StatusNotFound
|
||||
}
|
||||
}
|
||||
|
||||
func isValidToken(s string) bool {
|
||||
for _, c := range s {
|
||||
if c <= ' ' || c > 127 {
|
||||
|
|
|
@ -31,6 +31,8 @@ const (
|
|||
CouldNotStoreFileInFrostfs = "could not store file in frostfs" // Error in ../../uploader/upload.go
|
||||
AddAttributeToResultObject = "add attribute to result object" // Debug in ../../uploader/filter.go
|
||||
FailedToCreateResolver = "failed to create resolver" // Fatal in ../../app.go
|
||||
FailedToReadIndexPageTemplate = "failed to read index page template, set default" // Warn in ../../app.go
|
||||
SetCustomIndexPageTemplate = "set custom index page template" // Info in ../../app.go
|
||||
ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty" // Info in ../../app.go
|
||||
MetricsAreDisabled = "metrics are disabled" // Warn in ../../app.go
|
||||
NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun = "no wallet path specified, creating ephemeral key automatically for this run" // Info in ../../app.go
|
||||
|
|
90
internal/templates/index.gotmpl
Normal file
90
internal/templates/index.gotmpl
Normal file
|
@ -0,0 +1,90 @@
|
|||
{{$bucketName := .BucketName}}
|
||||
{{ $prefix := trimPrefix .Prefix }}
|
||||
<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8"/>
|
||||
<title>Index of s3://{{$bucketName}}/{{if $prefix}}/{{$prefix}}/{{end}}</title>
|
||||
<style>
|
||||
table {
|
||||
width: 80%;
|
||||
border-collapse: collapse;
|
||||
}
|
||||
body {
|
||||
background: #f2f2f2;
|
||||
}
|
||||
table, th, td {
|
||||
border: 0 solid transparent;
|
||||
}
|
||||
th, td {
|
||||
padding: 10px;
|
||||
text-align: left;
|
||||
}
|
||||
th {
|
||||
background-color: #c3bcbc;
|
||||
}
|
||||
tr:nth-child(even) {background-color: #ebe7e7;}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<h1>Index of s3://{{$bucketName}}/{{if $prefix}}{{$prefix}}/{{end}}</h1>
|
||||
<table>
|
||||
<thead>
|
||||
<tr>
|
||||
<th>Filename</th>
|
||||
<th>Size</th>
|
||||
<th>Created</th>
|
||||
<th>Download</th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody>
|
||||
{{ $trimmedPrefix := trimPrefix $prefix }}
|
||||
{{if $trimmedPrefix }}
|
||||
<tr>
|
||||
<td>
|
||||
⮐<a href="/get/{{$bucketName}}{{ urlencode $trimmedPrefix "" }}">..</a>
|
||||
</td>
|
||||
<td></td>
|
||||
<td></td>
|
||||
<td></td>
|
||||
</tr>
|
||||
{{else}}
|
||||
<tr>
|
||||
<td>
|
||||
⮐<a href="/get/{{ $bucketName }}/">..</a>
|
||||
</td>
|
||||
<td></td>
|
||||
<td></td>
|
||||
<td></td>
|
||||
</tr>
|
||||
{{end}}
|
||||
{{range .Objects}}
|
||||
<tr>
|
||||
<td>
|
||||
{{if .IsDir}}
|
||||
🗀
|
||||
<a href="/get/{{ $bucketName }}{{ urlencode $prefix .FileName }}/">
|
||||
{{.FileName}}/
|
||||
</a>
|
||||
{{else}}
|
||||
🗎
|
||||
<a href="/get/{{ $bucketName }}{{ urlencode $prefix .FileName }}">
|
||||
{{.FileName}}
|
||||
</a>
|
||||
{{end}}
|
||||
</td>
|
||||
<td>{{if not .IsDir}}{{ formatSize .Size }}{{end}}</td>
|
||||
<td>{{if not .IsDir}}{{ formatTimestamp .Created }}{{end}}</td>
|
||||
<td>
|
||||
{{ if not .IsDir }}
|
||||
<a href="/get/{{ $bucketName}}{{ urlencode $prefix .FileName }}?download=true">
|
||||
Link
|
||||
</a>
|
||||
{{ end }}
|
||||
</td>
|
||||
</tr>
|
||||
{{end}}
|
||||
</tbody>
|
||||
</table>
|
||||
</body>
|
||||
</html>
|
6
internal/templates/template.go
Normal file
6
internal/templates/template.go
Normal file
|
@ -0,0 +1,6 @@
|
|||
package templates
|
||||
|
||||
import _ "embed"
|
||||
|
||||
//go:embed index.gotmpl
|
||||
var DefaultIndexTemplate string
|
|
@ -52,8 +52,8 @@ func BearerTokenFromCookie(h *fasthttp.RequestHeader) []byte {
|
|||
|
||||
// StoreBearerTokenAppCtx extracts a bearer token from the header or cookie and stores
|
||||
// it in the application context.
|
||||
func StoreBearerTokenAppCtx(ctx context.Context, req *fasthttp.RequestCtx) (context.Context, error) {
|
||||
tkn, err := fetchBearerToken(req)
|
||||
func StoreBearerTokenAppCtx(ctx context.Context, c *fasthttp.RequestCtx) (context.Context, error) {
|
||||
tkn, err := fetchBearerToken(c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
204
tree/tree.go
204
tree/tree.go
|
@ -2,11 +2,13 @@ package tree
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
@ -20,6 +22,7 @@ type (
|
|||
// Each method must return ErrNodeNotFound or ErrNodeAccessDenied if relevant.
|
||||
ServiceClient interface {
|
||||
GetNodes(ctx context.Context, p *GetNodesParams) ([]NodeResponse, error)
|
||||
GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32, sort bool) ([]NodeResponse, error)
|
||||
}
|
||||
|
||||
treeNode struct {
|
||||
|
@ -29,6 +32,7 @@ type (
|
|||
|
||||
GetNodesParams struct {
|
||||
CnrID cid.ID
|
||||
BktInfo *data.BucketInfo
|
||||
TreeID string
|
||||
Path []string
|
||||
Meta []string
|
||||
|
@ -54,6 +58,7 @@ const (
|
|||
|
||||
// keys for delete marker nodes.
|
||||
isDeleteMarkerKV = "IsDeleteMarker"
|
||||
sizeKV = "Size"
|
||||
|
||||
// versionTree -- ID of a tree with object versions.
|
||||
versionTree = "version"
|
||||
|
@ -73,26 +78,28 @@ type Meta interface {
|
|||
|
||||
type NodeResponse interface {
|
||||
GetMeta() []Meta
|
||||
GetTimestamp() uint64
|
||||
GetTimestamp() []uint64
|
||||
GetNodeID() []uint64
|
||||
GetParentID() []uint64
|
||||
}
|
||||
|
||||
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
||||
treeNode := &treeNode{
|
||||
tNode := &treeNode{
|
||||
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
|
||||
}
|
||||
|
||||
for _, kv := range nodeInfo.GetMeta() {
|
||||
switch kv.GetKey() {
|
||||
case oidKV:
|
||||
if err := treeNode.ObjID.DecodeString(string(kv.GetValue())); err != nil {
|
||||
if err := tNode.ObjID.DecodeString(string(kv.GetValue())); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
default:
|
||||
treeNode.Meta[kv.GetKey()] = string(kv.GetValue())
|
||||
tNode.Meta[kv.GetKey()] = string(kv.GetValue())
|
||||
}
|
||||
}
|
||||
|
||||
return treeNode, nil
|
||||
return tNode, nil
|
||||
}
|
||||
|
||||
func (n *treeNode) Get(key string) (string, bool) {
|
||||
|
@ -106,29 +113,44 @@ func (n *treeNode) FileName() (string, bool) {
|
|||
}
|
||||
|
||||
func newNodeVersion(node NodeResponse) (*api.NodeVersion, error) {
|
||||
treeNode, err := newTreeNode(node)
|
||||
tNode, err := newTreeNode(node)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid tree node: %w", err)
|
||||
}
|
||||
|
||||
return newNodeVersionFromTreeNode(treeNode), nil
|
||||
return newNodeVersionFromTreeNode(tNode), nil
|
||||
}
|
||||
|
||||
func newNodeVersionFromTreeNode(treeNode *treeNode) *api.NodeVersion {
|
||||
_, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
|
||||
|
||||
size, _ := treeNode.Get(sizeKV)
|
||||
version := &api.NodeVersion{
|
||||
BaseNodeVersion: api.BaseNodeVersion{
|
||||
OID: treeNode.ObjID,
|
||||
},
|
||||
DeleteMarker: isDeleteMarker,
|
||||
IsPrefixNode: size == "",
|
||||
}
|
||||
|
||||
return version
|
||||
}
|
||||
|
||||
func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*api.NodeVersion, error) {
|
||||
meta := []string{oidKV, isDeleteMarkerKV}
|
||||
nodes, err := c.GetVersions(ctx, cnrID, objectName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
latestNode, err := getLatestVersionNode(nodes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newNodeVersion(latestNode)
|
||||
}
|
||||
|
||||
func (c *Tree) GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]NodeResponse, error) {
|
||||
meta := []string{oidKV, isDeleteMarkerKV, sizeKV}
|
||||
path := pathFromName(objectName)
|
||||
|
||||
p := &GetNodesParams{
|
||||
|
@ -139,30 +161,24 @@ func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName s
|
|||
LatestOnly: false,
|
||||
AllAttrs: false,
|
||||
}
|
||||
nodes, err := c.service.GetNodes(ctx, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
latestNode, err := getLatestNode(nodes)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return newNodeVersion(latestNode)
|
||||
return c.service.GetNodes(ctx, p)
|
||||
}
|
||||
|
||||
func getLatestNode(nodes []NodeResponse) (NodeResponse, error) {
|
||||
func getLatestVersionNode(nodes []NodeResponse) (NodeResponse, error) {
|
||||
var (
|
||||
maxCreationTime uint64
|
||||
targetIndexNode = -1
|
||||
)
|
||||
|
||||
for i, node := range nodes {
|
||||
currentCreationTime := node.GetTimestamp()
|
||||
if checkExistOID(node.GetMeta()) && currentCreationTime > maxCreationTime {
|
||||
maxCreationTime = currentCreationTime
|
||||
if !checkExistOID(node.GetMeta()) {
|
||||
continue
|
||||
}
|
||||
|
||||
if currentCreationTime := getMaxTimestamp(node); currentCreationTime > maxCreationTime {
|
||||
targetIndexNode = i
|
||||
maxCreationTime = currentCreationTime
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -187,3 +203,145 @@ func checkExistOID(meta []Meta) bool {
|
|||
func pathFromName(objectName string) []string {
|
||||
return strings.Split(objectName, separator)
|
||||
}
|
||||
|
||||
func (c *Tree) GetSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
|
||||
rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, versionTree, prefix)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
subTree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, rootID, 2, false)
|
||||
if err != nil {
|
||||
if errors.Is(err, layer.ErrNodeNotFound) {
|
||||
return nil, "", nil
|
||||
}
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
nodesMap := make(map[string][]NodeResponse, len(subTree))
|
||||
for _, node := range subTree {
|
||||
if MultiID(rootID).Equal(node.GetNodeID()) {
|
||||
continue
|
||||
}
|
||||
|
||||
fileName := GetFilename(node)
|
||||
if !strings.HasPrefix(fileName, tailPrefix) {
|
||||
continue
|
||||
}
|
||||
|
||||
nodes := nodesMap[fileName]
|
||||
|
||||
// Add all nodes if flag latestOnly is false.
|
||||
// Add all intermediate nodes
|
||||
// and only latest leaf (object) nodes. To do this store and replace last leaf (object) node in nodes[0]
|
||||
if len(nodes) == 0 {
|
||||
nodes = []NodeResponse{node}
|
||||
} else if !latestOnly || isIntermediate(node) {
|
||||
nodes = append(nodes, node)
|
||||
} else if isIntermediate(nodes[0]) {
|
||||
nodes = append([]NodeResponse{node}, nodes...)
|
||||
} else if getMaxTimestamp(node) > getMaxTimestamp(nodes[0]) {
|
||||
nodes[0] = node
|
||||
}
|
||||
|
||||
nodesMap[fileName] = nodes
|
||||
}
|
||||
|
||||
result := make([]NodeResponse, 0, len(subTree))
|
||||
for _, nodes := range nodesMap {
|
||||
result = append(result, nodes...)
|
||||
}
|
||||
|
||||
return result, strings.TrimSuffix(prefix, tailPrefix), nil
|
||||
}
|
||||
|
||||
func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) ([]uint64, string, error) {
|
||||
rootID := []uint64{0}
|
||||
path := strings.Split(prefix, separator)
|
||||
tailPrefix := path[len(path)-1]
|
||||
|
||||
if len(path) > 1 {
|
||||
var err error
|
||||
rootID, err = c.getPrefixNodeID(ctx, bktInfo, treeID, path[:len(path)-1])
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
}
|
||||
|
||||
return rootID, tailPrefix, nil
|
||||
}
|
||||
|
||||
func (c *Tree) getPrefixNodeID(ctx context.Context, bktInfo *data.BucketInfo, treeID string, prefixPath []string) ([]uint64, error) {
|
||||
p := &GetNodesParams{
|
||||
CnrID: bktInfo.CID,
|
||||
BktInfo: bktInfo,
|
||||
TreeID: treeID,
|
||||
Path: prefixPath,
|
||||
LatestOnly: false,
|
||||
AllAttrs: true,
|
||||
}
|
||||
nodes, err := c.service.GetNodes(ctx, p)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var intermediateNodes []uint64
|
||||
for _, node := range nodes {
|
||||
if isIntermediate(node) {
|
||||
intermediateNodes = append(intermediateNodes, node.GetNodeID()...)
|
||||
}
|
||||
}
|
||||
|
||||
if len(intermediateNodes) == 0 {
|
||||
return nil, layer.ErrNodeNotFound
|
||||
}
|
||||
|
||||
return intermediateNodes, nil
|
||||
}
|
||||
|
||||
func GetFilename(node NodeResponse) string {
|
||||
for _, kv := range node.GetMeta() {
|
||||
if kv.GetKey() == FileNameKey {
|
||||
return string(kv.GetValue())
|
||||
}
|
||||
}
|
||||
|
||||
return ""
|
||||
}
|
||||
|
||||
func isIntermediate(node NodeResponse) bool {
|
||||
if len(node.GetMeta()) != 1 {
|
||||
return false
|
||||
}
|
||||
|
||||
return node.GetMeta()[0].GetKey() == FileNameKey
|
||||
}
|
||||
|
||||
func getMaxTimestamp(node NodeResponse) uint64 {
|
||||
var maxTimestamp uint64
|
||||
|
||||
for _, timestamp := range node.GetTimestamp() {
|
||||
if timestamp > maxTimestamp {
|
||||
maxTimestamp = timestamp
|
||||
}
|
||||
}
|
||||
|
||||
return maxTimestamp
|
||||
}
|
||||
|
||||
type MultiID []uint64
|
||||
|
||||
func (m MultiID) Equal(id MultiID) bool {
|
||||
seen := make(map[uint64]struct{}, len(m))
|
||||
|
||||
for i := range m {
|
||||
seen[m[i]] = struct{}{}
|
||||
}
|
||||
|
||||
for i := range id {
|
||||
if _, ok := seen[id[i]]; !ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
|
|
@ -24,8 +24,8 @@ type nodeResponse struct {
|
|||
timestamp uint64
|
||||
}
|
||||
|
||||
func (n nodeResponse) GetTimestamp() uint64 {
|
||||
return n.timestamp
|
||||
func (n nodeResponse) GetTimestamp() []uint64 {
|
||||
return []uint64{n.timestamp}
|
||||
}
|
||||
|
||||
func (n nodeResponse) GetMeta() []Meta {
|
||||
|
@ -36,6 +36,13 @@ func (n nodeResponse) GetMeta() []Meta {
|
|||
return res
|
||||
}
|
||||
|
||||
func (n nodeResponse) GetNodeID() []uint64 {
|
||||
return nil
|
||||
}
|
||||
func (n nodeResponse) GetParentID() []uint64 {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestGetLatestNode(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
|
@ -130,7 +137,7 @@ func TestGetLatestNode(t *testing.T) {
|
|||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
actualNode, err := getLatestNode(tc.nodes)
|
||||
actualNode, err := getLatestVersionNode(tc.nodes)
|
||||
if tc.error {
|
||||
require.Error(t, err)
|
||||
return
|
||||
|
|
Loading…
Reference in a new issue