Forked from TrueCloudLab/frostfs-http-gw

Comparing 24 commits: feature/17 ... master

Commits: 8362cd696e, 8de06e23a0, a6fdaf9456, 526da379ad, 87ace4f8f7, 36bd3e2d43, 1e897aa3c3, 1e7309684b, 7901d00924, a7617514d3, 856e0ecf40, 1e82f64dfd, 4b782cf124, f0c999d9a2, 1db62f9d95, e1b670a727, 9551f34f00, a4e3767d4b, d32ac4b537, a658f3adc0, a945a947ac, 1be92fa4be, dc100f03a6, bbc7c7367d

46 changed files with 2176 additions and 1637 deletions
.forgejo/workflows/oci-image.yml (new file, 27 lines)

@@ -0,0 +1,27 @@
on:
  pull_request:
  push:
  workflow_dispatch:

jobs:
  image:
    name: OCI image
    runs-on: docker
    container: git.frostfs.info/truecloudlab/env:oci-image-builder-bookworm
    steps:
      - name: Clone git repo
        uses: actions/checkout@v3

      - name: Build OCI image
        run: make image

      - name: Push image to OCI registry
        run: |
          echo "$REGISTRY_PASSWORD" \
            | docker login --username truecloudlab --password-stdin git.frostfs.info
          make image-push
        if: >-
          startsWith(github.ref, 'refs/tags/v') &&
          (github.event_name == 'workflow_dispatch' || github.event_name == 'push')
        env:
          REGISTRY_PASSWORD: ${{secrets.FORGEJO_OCI_REGISTRY_PUSH_TOKEN}}
@@ -43,3 +43,19 @@ jobs:

      - name: Run tests
        run: make test

  integration:
    name: Integration tests
    runs-on: oci-runner
    steps:
      - uses: actions/checkout@v3

      - name: Set up Go
        uses: actions/setup-go@v3
        with:
          go-version: '1.23'

      - name: Run integration tests
        run: |-
          podman-service.sh
          make integration-test
@@ -16,7 +16,7 @@ jobs:
      - name: Setup Go
        uses: actions/setup-go@v3
        with:
          go-version: '1.22'
          go-version: '1.22.11'

      - name: Install govulncheck
        run: go install golang.org/x/vuln/cmd/govulncheck@latest
CHANGELOG.md (25 changed lines)

@@ -4,6 +4,27 @@ This document outlines major changes between releases.

## [Unreleased]

### Added
- Add handling quota limit reached error (#187)

## [0.32.1] - 2025-01-27

### Fixed
- SIGHUP panic (#198)

## [0.32.0] - Khumbu - 2024-12-20

### Fixed
- Getting S3 object with FrostFS Object ID-like key (#166)
- Ignore delete marked objects in versioned bucket in index page (#181)

### Added
- Metric of dropped logs by log sampler (#150)
- Fallback FileName attribute search during FilePath attribute search (#174)

### Changed
- Updated tree service pool without api-go dependency (#178)

## [0.31.0] - Rongbuk - 2024-11-20

### Fixed

@@ -170,4 +191,6 @@ To see CHANGELOG for older versions, refer to https://github.com/nspcc-dev/neofs
[0.30.2]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.30.1...v0.30.2
[0.30.3]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.30.2...v0.30.3
[0.31.0]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.30.3...v0.31.0
[Unreleased]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.31.0...master
[0.32.0]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.31.0...v0.32.0
[0.32.1]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.32.0...v0.32.1
[Unreleased]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.32.1...master
@@ -1 +1,3 @@
.* @alexvanin @dkirillov
.* @TrueCloudLab/storage-services-developers @TrueCloudLab/storage-services-committers
.forgejo/.* @potyarkin
Makefile @potyarkin
Makefile (2 changed lines)

@@ -7,7 +7,7 @@ LINT_VERSION ?= 1.60.3
TRUECLOUDLAB_LINT_VERSION ?= 0.0.6
BUILD ?= $(shell date -u --iso=seconds)

HUB_IMAGE ?= truecloudlab/frostfs-http-gw
HUB_IMAGE ?= git.frostfs.info/truecloudlab/frostfs-http-gw
HUB_TAG ?= "$(shell echo ${VERSION} | sed 's/^v//')"

METRICS_DUMP_OUT ?= ./metrics-dump.json
README.md (141 changed lines)

@@ -38,7 +38,7 @@ version Show current version
```

Or you can also use a [Docker
image](https://hub.docker.com/r/truecloudlab/frostfs-http-gw) provided for the released
image](https://git.frostfs.info/TrueCloudLab/-/packages/container/frostfs-http-gw) provided for the released
(and occasionally unreleased) versions of the gateway (`:latest` points to the
latest stable release).
@@ -217,41 +217,8 @@ Also, in case of downloading, you need to have a file inside a container.
### NNS

In all download/upload routes you can use container name instead of its id (`$CID`).
Read more about it in [docs/nns.md](./docs/nns.md).

Steps to start using name resolving:

1. Enable NNS resolving in config (`rpc_endpoint` must be a valid neo rpc node, see [configs](./config) for other examples):

```yaml
rpc_endpoint: http://morph-chain.frostfs.devenv:30333
resolve_order:
  - nns
```

2. Make sure your container is registered in NNS contract. If you use [frostfs-dev-env](https://git.frostfs.info/TrueCloudLab/frostfs-dev-env)
you can check if your container (e.g. with `container-name` name) is registered in NNS:

```shell
$ curl -s --data '{"id":1,"jsonrpc":"2.0","method":"getcontractstate","params":[1]}' \
  http://morph-chain.frostfs.devenv:30333 | jq -r '.result.hash'

0x8e6c3cd4b976b28e84a3788f6ea9e2676c15d667

$ docker exec -it morph_chain neo-go \
  contract testinvokefunction \
  -r http://morph-chain.frostfs.devenv:30333 0x8e6c3cd4b976b28e84a3788f6ea9e2676c15d667 \
  resolve string:container-name.container int:16 \
  | jq -r '.stack[0].value | if type=="array" then .[0].value else . end' \
  | base64 -d && echo

7f3vvkw4iTiS5ZZbu5BQXEmJtETWbi3uUjLNaSs29xrL
```

3. Use container name instead of its `$CID`. For example:

```shell
$ curl http://localhost:8082/get_by_attribute/container-name/FileName/object-name
```

#### Create a container
@@ -462,109 +429,7 @@ object ID, like this:

#### Authentication

You can always upload files to public containers (open for anyone to put
objects into), but for restricted containers you need to explicitly allow PUT
operations for a request signed with your HTTP Gateway keys.

If you don't want to manage the gateway's secret keys and adjust policies when
the gateway configuration changes (new gate, key rotation, etc.), or you plan to use
public services, there is an option to let your application backend (or you)
issue Bearer Tokens and pass them from the client via the gateway down to the FrostFS level
to grant access.

A FrostFS Bearer Token is basically a container owner-signed policy (refer to the FrostFS
documentation for more details). There are two options to pass them to the gateway:
* "Authorization" header with "Bearer" type and base64-encoded token in
  the credentials field
* "Bearer" cookie with base64-encoded token contents

For example, you have a mobile application frontend with a backend part storing
data in FrostFS. When a user authorizes in the mobile app, the backend issues a FrostFS
Bearer token and provides it to the frontend. Then, the mobile app may generate
some data and upload it via any available FrostFS HTTP Gateway by adding
the corresponding header to the upload request. Accessing policy-protected data
works the same way.

##### Example
In order to generate a bearer token, you need to have a wallet (which will be used to sign the token).

1. Suppose you have a container with a private policy for a wallet key

```
$ frostfs-cli container create -r <endpoint> --wallet <wallet> --policy <policy> --basic-acl 0 --await
CID: 9dfzyvq82JnFqp5svxcREf2iy6XNuifYcJPusEDnGK9Z

$ frostfs-cli ape-manager add -r <endpoint> --wallet <wallet> \
  --target-type container --target-name 9dfzyvq82JnFqp5svxcREf2iy6XNuifYcJPusEDnGK9Z \
  --rule "allow Object.* RequestCondition:"\$Actor:publicKey"=03b09baabff3f6107c7e9acb8721a6fc5618d45b50247a314d82e548702cce8cd5 *" \
  --chain-id <chainID>
```

2. Form a Bearer token (10000 is the lifetime expiration in epochs) to impersonate
   the HTTP Gateway request as a wallet-signed request and save it to **bearer.json**:
```
{
  "body": {
    "allowImpersonate": true,
    "lifetime": {
      "exp": "10000",
      "nbf": "0",
      "iat": "0"
    }
  },
  "signature": null
}
```

3. Sign it with the wallet:
```
$ frostfs-cli util sign bearer-token --from bearer.json --to signed.json -w <wallet>
```

4. Encode it to base64 for use in the header:
```
$ base64 -w 0 signed.json
# output: Ck4KKgoECAIQBhIiCiCZGdlbN7DPGPMg9rsWqV+p2XdMzUqknRiexewSFp8kmBIbChk17MUri6OJ0X5ftsHzy7NERDNFB4C92PcaGgMIkE4SZgohAxpsb7vfAso1F0X6hrm6WpRS14WsT3/Ct1SMoqRsT89KEkEEGxKi8GjKSf52YqhppgaOTQHbUsL3jn7SHLqS3ndAQ7NtAATnmRHleZw2V2xRRSRBQdjDC05KK83LhdSax72Fsw==
```

After that, the Bearer token can be used:

```
$ curl -F 'file=@cat.jpeg;filename=cat.jpeg' -H "Authorization: Bearer Ck4KKgoECAIQBhIiCiCZGdlbN7DPGPMg9rsWqV+p2XdMzUqknRiexewSFp8kmBIbChk17MUri6OJ0X5ftsHzy7NERDNFB4C92PcaGgMIkE4SZgohAxpsb7vfAso1F0X6hrm6WpRS14WsT3/Ct1SMoqRsT89KEkEEGxKi8GjKSf52YqhppgaOTQHbUsL3jn7SHLqS3ndAQ7NtAATnmRHleZw2V2xRRSRBQdjDC05KK83LhdSax72Fsw==" \
  http://localhost:8082/upload/BJeErH9MWmf52VsR1mLWKkgF3pRm3FkubYxM7TZkBP4K
# output:
# {
#   "object_id": "DhfES9nVrFksxGDD2jQLunGADfrXExxNwqXbDafyBn9X",
#   "container_id": "BJeErH9MWmf52VsR1mLWKkgF3pRm3FkubYxM7TZkBP4K"
# }
```

##### Note: Bearer Token owner

You can specify the exact key that can use the Bearer Token (the gateway wallet address).
To do this, encode the wallet address in base64 format:

```
$ echo 'NhVtreTTCoqsMQV5Wp55fqnriiUCpEaKm3' | base58 --decode | base64
# output: NezFK4ujidF+X7bB88uzREQzRQeAvdj3Gg==
```

Then specify this value in the Bearer Token JSON:
```
{
  "body": {
    "ownerID": {
      "value": "NezFK4ujidF+X7bB88uzREQzRQeAvdj3Gg=="
    },
    ...
```

##### Note: Policy override

Instead of impersonation, you can define the set of policies that will be applied
to the request sender. This allows restricting access to specific operations and
specific objects without giving full impersonation control to the token user.
Read more about request authentication in [docs/authentication.md](./docs/authentication.md)
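For backends that issue these tokens programmatically rather than with `frostfs-cli`, the same flow can be sketched in Go. This is a minimal sketch, assuming the FrostFS SDK `bearer` package API exercised by this repository's integration tests (`ForUser`, `SetExp`, `Sign`, `Marshal`); the freshly generated key is a placeholder for a real container owner wallet key:

```go
package main

import (
	"encoding/base64"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
)

func main() {
	// Placeholder owner key; in practice, load the container owner's wallet key.
	key, err := keys.NewPrivateKey()
	if err != nil {
		panic(err)
	}

	var ownerID user.ID
	user.IDFromKey(&ownerID, key.PrivateKey.PublicKey)

	tkn := new(bearer.Token)
	tkn.ForUser(ownerID) // pin the token to one user, cf. "Note: Bearer Token owner"
	tkn.SetExp(10000)    // expiration epoch, as in the JSON example above

	if err := tkn.Sign(key.PrivateKey); err != nil {
		panic(err)
	}

	// The gateway accepts base64 of either the binary or the JSON encoding;
	// the integration tests in this change set exercise both.
	fmt.Println(base64.StdEncoding.EncodeToString(tkn.Marshal()))
}
```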

### Metrics and Pprof
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
|||
v0.31.0
|
||||
v0.32.1
|
||||
|
|
|
@@ -25,7 +25,6 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/templates"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
@@ -57,7 +56,7 @@ type (
		treePool  *treepool.Pool
		key       *keys.PrivateKey
		owner     *user.ID
		cfg       *viper.Viper
		cfg       *appCfg
		webServer *fasthttp.Server
		webDone   chan struct{}
		resolver  *resolver.ContainerResolver
@@ -65,6 +64,7 @@ type (
		services       []*metrics.Service
		settings       *appSettings
		loggerSettings *loggerSettings
		bucketCache    *cache.BucketCache

		servers       []Server
		unbindServers []ServerInfo
@@ -95,21 +95,22 @@ type (
		dialerSource   *internalnet.DialerSource
		workerPoolSize int

		mu                   sync.RWMutex
		defaultTimestamp     bool
		zipCompression       bool
		clientCut            bool
		returnIndexPage      bool
		indexPageTemplate    string
		bufferMaxSizeForPut  uint64
		namespaceHeader      string
		defaultNamespaces    []string
		corsAllowOrigin      string
		corsAllowMethods     []string
		corsAllowHeaders     []string
		corsExposeHeaders    []string
		corsAllowCredentials bool
		corsMaxAge           int
		mu                     sync.RWMutex
		defaultTimestamp       bool
		archiveCompression     bool
		clientCut              bool
		returnIndexPage        bool
		indexPageTemplate      string
		bufferMaxSizeForPut    uint64
		namespaceHeader        string
		defaultNamespaces      []string
		corsAllowOrigin        string
		corsAllowMethods       []string
		corsAllowHeaders       []string
		corsExposeHeaders      []string
		corsAllowCredentials   bool
		corsMaxAge             int
		enableFilepathFallback bool
	}

	CORS struct {
@@ -122,35 +123,37 @@ type (
	}
)

func newApp(ctx context.Context, v *viper.Viper) App {
func newApp(ctx context.Context, cfg *appCfg) App {
	logSettings := &loggerSettings{}
	log := pickLogger(v, logSettings)
	log := pickLogger(cfg.config(), logSettings)

	a := &app{
		ctx:            ctx,
		log:            log.logger,
		cfg:            v,
		logLevel:       log.lvl,
		cfg:            cfg,
		loggerSettings: logSettings,
		webServer:      new(fasthttp.Server),
		webDone:        make(chan struct{}),
		bucketCache:    cache.NewBucketCache(getBucketCacheOptions(cfg.config(), log.logger), cfg.config().GetBool(cfgFeaturesTreePoolNetmapSupport)),
	}

	a.initAppSettings()

	// -- setup FastHTTP server --
	a.webServer.Name = "frost-http-gw"
	a.webServer.ReadBufferSize = a.cfg.GetInt(cfgWebReadBufferSize)
	a.webServer.WriteBufferSize = a.cfg.GetInt(cfgWebWriteBufferSize)
	a.webServer.ReadTimeout = a.cfg.GetDuration(cfgWebReadTimeout)
	a.webServer.WriteTimeout = a.cfg.GetDuration(cfgWebWriteTimeout)
	a.webServer.ReadBufferSize = a.config().GetInt(cfgWebReadBufferSize)
	a.webServer.WriteBufferSize = a.config().GetInt(cfgWebWriteBufferSize)
	a.webServer.ReadTimeout = a.config().GetDuration(cfgWebReadTimeout)
	a.webServer.WriteTimeout = a.config().GetDuration(cfgWebWriteTimeout)
	a.webServer.DisableHeaderNamesNormalizing = true
	a.webServer.NoDefaultServerHeader = true
	a.webServer.NoDefaultContentType = true
	a.webServer.MaxRequestBodySize = a.cfg.GetInt(cfgWebMaxRequestBodySize)
	a.webServer.MaxRequestBodySize = a.config().GetInt(cfgWebMaxRequestBodySize)
	a.webServer.DisablePreParseMultipartForm = true
	a.webServer.StreamRequestBody = a.cfg.GetBool(cfgWebStreamRequestBody)
	a.webServer.StreamRequestBody = a.config().GetBool(cfgWebStreamRequestBody)
	// -- -- -- -- -- -- -- -- -- -- -- -- -- --
	a.pool, a.treePool, a.key = getPools(ctx, a.log, a.cfg, a.settings.dialerSource)
	a.initPools(ctx)

	var owner user.ID
	user.IDFromKey(&owner, a.key.PrivateKey.PublicKey)
@@ -165,18 +168,22 @@ func newApp(ctx context.Context, v *viper.Viper) App {
	return a
}

func (a *app) config() *viper.Viper {
	return a.cfg.config()
}

func (a *app) initAppSettings() {
	a.settings = &appSettings{
		reconnectInterval: fetchReconnectInterval(a.cfg),
		dialerSource:      getDialerSource(a.log, a.cfg),
		workerPoolSize:    a.cfg.GetInt(cfgWorkerPoolSize),
		reconnectInterval: fetchReconnectInterval(a.config()),
		dialerSource:      getDialerSource(a.log, a.config()),
		workerPoolSize:    a.config().GetInt(cfgWorkerPoolSize),
	}
	a.settings.update(a.cfg, a.log)
	a.settings.update(a.config(), a.log)
}

func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
	defaultTimestamp := v.GetBool(cfgUploaderHeaderEnableDefaultTimestamp)
	zipCompression := v.GetBool(cfgZipCompression)
	archiveCompression := fetchArchiveCompression(v)
	returnIndexPage := v.GetBool(cfgIndexPageEnabled)
	clientCut := v.GetBool(cfgClientCut)
	bufferMaxSizeForPut := v.GetUint64(cfgBufferMaxSizeForPut)
@@ -189,12 +196,13 @@ func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
	corsExposeHeaders := v.GetStringSlice(cfgCORSExposeHeaders)
	corsAllowCredentials := v.GetBool(cfgCORSAllowCredentials)
	corsMaxAge := fetchCORSMaxAge(v)
	enableFilepathFallback := v.GetBool(cfgFeaturesEnableFilepathFallback)

	s.mu.Lock()
	defer s.mu.Unlock()

	s.defaultTimestamp = defaultTimestamp
	s.zipCompression = zipCompression
	s.archiveCompression = archiveCompression
	s.returnIndexPage = returnIndexPage
	s.clientCut = clientCut
	s.bufferMaxSizeForPut = bufferMaxSizeForPut
@@ -208,6 +216,7 @@ func (s *appSettings) update(v *viper.Viper, l *zap.Logger) {
	s.corsExposeHeaders = corsExposeHeaders
	s.corsAllowCredentials = corsAllowCredentials
	s.corsMaxAge = corsMaxAge
	s.enableFilepathFallback = enableFilepathFallback
}

func (s *loggerSettings) DroppedLogsInc() {
@@ -232,10 +241,10 @@ func (s *appSettings) DefaultTimestamp() bool {
	return s.defaultTimestamp
}

func (s *appSettings) ZipCompression() bool {
func (s *appSettings) ArchiveCompression() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.zipCompression
	return s.archiveCompression
}

func (s *appSettings) IndexPageEnabled() bool {
@@ -305,6 +314,12 @@ func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool)
	return ns + ".ns", false
}

func (s *appSettings) EnableFilepathFallback() bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	return s.enableFilepathFallback
}

func (a *app) initResolver() {
	var err error
	a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig())
@@ -316,11 +331,11 @@ func (a *app) initResolver() {
func (a *app) getResolverConfig() ([]string, *resolver.Config) {
	resolveCfg := &resolver.Config{
		FrostFS:    frostfs.NewResolverFrostFS(a.pool),
		RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
		RPCAddress: a.config().GetString(cfgRPCEndpoint),
		Settings:   a.settings,
	}

	order := a.cfg.GetStringSlice(cfgResolveOrder)
	order := a.config().GetStringSlice(cfgResolveOrder)
	if resolveCfg.RPCAddress == "" {
		order = remove(order, resolver.NNSResolver)
		a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
@@ -335,7 +350,7 @@ func (a *app) getResolverConfig() ([]string, *resolver.Config) {

func (a *app) initMetrics() {
	gateMetricsProvider := metrics.NewGateMetrics(a.pool)
	a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.cfg.GetBool(cfgPrometheusEnabled))
	a.metrics = newGateMetrics(a.log, gateMetricsProvider, a.config().GetBool(cfgPrometheusEnabled))
	a.metrics.SetHealth(metrics.HealthStatusStarting)
	a.loggerSettings.setMetrics(a.metrics.provider)
}
@@ -499,10 +514,10 @@ func (a *app) Serve() {
		close(a.webDone)
	}()

	handler := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool)
	handle := handler.New(a.AppParams(), a.settings, tree.NewTree(frostfs.NewPoolWrapper(a.treePool)), workerPool)

	// Configure router.
	a.configureRouter(handler)
	a.configureRouter(handle)

	a.startServices()
	a.initServers(a.ctx)
@@ -563,22 +578,22 @@ func (a *app) shutdownTracing() {

func (a *app) configReload(ctx context.Context) {
	a.log.Info(logs.SIGHUPConfigReloadStarted)
	if !a.cfg.IsSet(cmdConfig) && !a.cfg.IsSet(cmdConfigDir) {
	if !a.config().IsSet(cmdConfig) && !a.config().IsSet(cmdConfigDir) {
		a.log.Warn(logs.FailedToReloadConfigBecauseItsMissed)
		return
	}
	if err := readInConfig(a.cfg); err != nil {
	if err := a.cfg.reload(); err != nil {
		a.log.Warn(logs.FailedToReloadConfig, zap.Error(err))
		return
	}

	if lvl, err := getLogLevel(a.cfg); err != nil {
	if lvl, err := getLogLevel(a.config()); err != nil {
		a.log.Warn(logs.LogLevelWontBeUpdated, zap.Error(err))
	} else {
		a.logLevel.SetLevel(lvl)
	}

	if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.cfg, a.log)); err != nil {
	if err := a.settings.dialerSource.Update(fetchMultinetConfig(a.config(), a.log)); err != nil {
		a.log.Warn(logs.MultinetConfigWontBeUpdated, zap.Error(err))
	}
@@ -595,9 +610,9 @@ func (a *app) configReload(ctx context.Context) {
	a.stopServices()
	a.startServices()

	a.settings.update(a.cfg, a.log)
	a.settings.update(a.config(), a.log)

	a.metrics.SetEnabled(a.cfg.GetBool(cfgPrometheusEnabled))
	a.metrics.SetEnabled(a.config().GetBool(cfgPrometheusEnabled))
	a.initTracing(ctx)
	a.setHealthStatus()
@@ -605,12 +620,14 @@ func (a *app) configReload(ctx context.Context) {
}

func (a *app) startServices() {
	pprofConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPprofEnabled), Address: a.cfg.GetString(cfgPprofAddress)}
	a.services = a.services[:0]

	pprofConfig := metrics.Config{Enabled: a.config().GetBool(cfgPprofEnabled), Address: a.config().GetString(cfgPprofAddress)}
	pprofService := metrics.NewPprofService(a.log, pprofConfig)
	a.services = append(a.services, pprofService)
	go pprofService.Start()

	prometheusConfig := metrics.Config{Enabled: a.cfg.GetBool(cfgPrometheusEnabled), Address: a.cfg.GetString(cfgPrometheusAddress)}
	prometheusConfig := metrics.Config{Enabled: a.config().GetBool(cfgPrometheusEnabled), Address: a.config().GetString(cfgPrometheusAddress)}
	prometheusService := metrics.NewPrometheusService(a.log, prometheusConfig)
	a.services = append(a.services, prometheusService)
	go prometheusService.Start()
@@ -625,29 +642,31 @@ func (a *app) stopServices() {
	}
}

func (a *app) configureRouter(handler *handler.Handler) {
func (a *app) configureRouter(h *handler.Handler) {
	r := router.New()
	r.RedirectTrailingSlash = true
	r.NotFound = func(r *fasthttp.RequestCtx) {
		response.Error(r, "Not found", fasthttp.StatusNotFound)
		handler.ResponseError(r, "Not found", fasthttp.StatusNotFound)
	}
	r.MethodNotAllowed = func(r *fasthttp.RequestCtx) {
		response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
		handler.ResponseError(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
	}

	r.POST("/upload/{cid}", a.addMiddlewares(handler.Upload))
	r.POST("/upload/{cid}", a.addMiddlewares(h.Upload))
	r.OPTIONS("/upload/{cid}", a.addPreflight())
	a.log.Info(logs.AddedPathUploadCid)
	r.GET("/get/{cid}/{oid:*}", a.addMiddlewares(handler.DownloadByAddressOrBucketName))
	r.HEAD("/get/{cid}/{oid:*}", a.addMiddlewares(handler.HeadByAddressOrBucketName))
	r.GET("/get/{cid}/{oid:*}", a.addMiddlewares(h.DownloadByAddressOrBucketName))
	r.HEAD("/get/{cid}/{oid:*}", a.addMiddlewares(h.HeadByAddressOrBucketName))
	r.OPTIONS("/get/{cid}/{oid:*}", a.addPreflight())
	a.log.Info(logs.AddedPathGetCidOid)
	r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(handler.DownloadByAttribute))
	r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(handler.HeadByAttribute))
	r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.DownloadByAttribute))
	r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(h.HeadByAttribute))
	r.OPTIONS("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addPreflight())
	a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
	r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(handler.DownloadZipped))
	r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadZip))
	r.OPTIONS("/zip/{cid}/{prefix:*}", a.addPreflight())
	r.GET("/tar/{cid}/{prefix:*}", a.addMiddlewares(h.DownloadTar))
	r.OPTIONS("/tar/{cid}/{prefix:*}", a.addPreflight())
	a.log.Info(logs.AddedPathZipCidPrefix)

	a.webServer.Handler = r.Handler
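The router above now exposes `/tar/{cid}/{prefix}` alongside the existing `/zip/{cid}/{prefix}` route. As an illustration only, a minimal Go client for the new route; the host, port, and container ID are placeholder values, not part of this change:

```go
package main

import (
	"io"
	"net/http"
	"os"
)

func main() {
	// Download every object under the "backup/" FilePath prefix as one tar stream.
	resp, err := http.Get("http://localhost:8082/tar/9dfzyvq82JnFqp5svxcREf2iy6XNuifYcJPusEDnGK9Z/backup/")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		panic(resp.Status)
	}

	out, err := os.Create("backup.tar")
	if err != nil {
		panic(err)
	}
	defer out.Close()

	// Stream the archive to disk without buffering it in memory.
	if _, err := io.Copy(out, resp.Body); err != nil {
		panic(err)
	}
}
```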
@@ -789,7 +808,7 @@ func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
		log := utils.GetReqLogOrDefault(reqCtx, a.log)

		log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err))
		response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
		handler.ResponseError(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
		return
	}
	utils.SetContextToRequest(appCtx, req)
@@ -830,12 +849,12 @@ func (a *app) AppParams() *handler.AppParams {
		FrostFS:  frostfs.NewFrostFS(a.pool),
		Owner:    a.owner,
		Resolver: a.resolver,
		Cache:    cache.NewBucketCache(getCacheOptions(a.cfg, a.log)),
		Cache:    a.bucketCache,
	}
}

func (a *app) initServers(ctx context.Context) {
	serversInfo := fetchServers(a.cfg, a.log)
	serversInfo := fetchServers(a.config(), a.log)

	a.servers = make([]Server, 0, len(serversInfo))
	for _, serverInfo := range serversInfo {
@@ -862,7 +881,7 @@ func (a *app) initServers(ctx context.Context) {
}

func (a *app) updateServers() error {
	serversInfo := fetchServers(a.cfg, a.log)
	serversInfo := fetchServers(a.config(), a.log)

	a.mu.Lock()
	defer a.mu.Unlock()
@@ -920,15 +939,15 @@ func (a *app) initTracing(ctx context.Context) {
		instanceID = a.servers[0].Address()
	}
	cfg := tracing.Config{
		Enabled:  a.cfg.GetBool(cfgTracingEnabled),
		Exporter: tracing.Exporter(a.cfg.GetString(cfgTracingExporter)),
		Endpoint: a.cfg.GetString(cfgTracingEndpoint),
		Enabled:  a.config().GetBool(cfgTracingEnabled),
		Exporter: tracing.Exporter(a.config().GetString(cfgTracingExporter)),
		Endpoint: a.config().GetString(cfgTracingEndpoint),
		Service:  "frostfs-http-gw",
		InstanceID: instanceID,
		Version:    Version,
	}

	if trustedCa := a.cfg.GetString(cfgTracingTrustedCa); trustedCa != "" {
	if trustedCa := a.config().GetString(cfgTracingTrustedCa); trustedCa != "" {
		caBytes, err := os.ReadFile(trustedCa)
		if err != nil {
			a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
@@ -943,7 +962,7 @@ func (a *app) initTracing(ctx context.Context) {
		cfg.ServerCaCertPool = certPool
	}

	attributes, err := fetchTracingAttributes(a.cfg)
	attributes, err := fetchTracingAttributes(a.config())
	if err != nil {
		a.log.Warn(logs.FailedToInitializeTracing, zap.Error(err))
		return
@@ -966,7 +985,7 @@ func (a *app) setRuntimeParameters() {
		return
	}

	softMemoryLimit := fetchSoftMemoryLimit(a.cfg)
	softMemoryLimit := fetchSoftMemoryLimit(a.config())
	previous := debug.SetMemoryLimit(softMemoryLimit)
	if softMemoryLimit != previous {
		a.log.Info(logs.RuntimeSoftMemoryLimitUpdated,
@@ -14,6 +14,7 @@ import (
	"net/http"
	"os"
	"sort"
	"strings"
	"testing"
	"time"
@@ -28,9 +29,9 @@ import (
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	docker "github.com/docker/docker/api/types/container"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/nspcc-dev/neo-go/pkg/wallet"
	"github.com/spf13/viper"
	"github.com/stretchr/testify/require"
	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"
@@ -49,11 +50,12 @@ const (

func TestIntegration(t *testing.T) {
	rootCtx := context.Background()
	aioImage := "truecloudlab/frostfs-aio:"
	aioImage := "git.frostfs.info/truecloudlab/frostfs-aio:"
	versions := []string{
		"1.2.7",
		"1.3.0",
		"1.5.0",
		"1.6.5",
	}
	key, err := keys.NewPrivateKeyFromHex("1dd37fba80fec4e6a6f13fd708d8dcb3b29def768017052f6c930fa1c5d90bbb")
	require.NoError(t, err)
@@ -70,21 +72,28 @@ func TestIntegration(t *testing.T) {
			ctx, cancel2 := context.WithCancel(rootCtx)

			aioContainer := createDockerContainer(ctx, t, aioImage+version)
			if strings.HasPrefix(version, "1.6") {
				registerUser(t, ctx, aioContainer, file.Name())
			}

			// See the logs from the command execution.
			server, cancel := runServer(file.Name())
			clientPool := getPool(ctx, t, key)
			CID, err := createContainer(ctx, t, clientPool, ownerID, version)
			CID, err := createContainer(ctx, t, clientPool, ownerID)
			require.NoError(t, err, version)

			token := makeBearerToken(t, key, ownerID, version)
			jsonToken, binaryToken := makeBearerTokens(t, key, ownerID, version)

			t.Run("simple put "+version, func(t *testing.T) { simplePut(ctx, t, clientPool, CID, version) })
			t.Run("put with bearer token in header"+version, func(t *testing.T) { putWithBearerTokenInHeader(ctx, t, clientPool, CID, token) })
			t.Run("put with bearer token in cookie"+version, func(t *testing.T) { putWithBearerTokenInCookie(ctx, t, clientPool, CID, token) })
			t.Run("simple put "+version, func(t *testing.T) { simplePut(ctx, t, clientPool, CID) })
			t.Run("put with json bearer token in header"+version, func(t *testing.T) { putWithBearerTokenInHeader(ctx, t, clientPool, CID, jsonToken) })
			t.Run("put with json bearer token in cookie"+version, func(t *testing.T) { putWithBearerTokenInCookie(ctx, t, clientPool, CID, jsonToken) })
			t.Run("put with binary bearer token in header"+version, func(t *testing.T) { putWithBearerTokenInHeader(ctx, t, clientPool, CID, binaryToken) })
			t.Run("put with binary bearer token in cookie"+version, func(t *testing.T) { putWithBearerTokenInCookie(ctx, t, clientPool, CID, binaryToken) })
			t.Run("put with duplicate keys "+version, func(t *testing.T) { putWithDuplicateKeys(t, CID) })
			t.Run("simple get "+version, func(t *testing.T) { simpleGet(ctx, t, clientPool, ownerID, CID, version) })
			t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID, version) })
			t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID, version) })
			t.Run("test namespaces "+version, func(t *testing.T) { checkNamespaces(ctx, t, clientPool, ownerID, CID, version) })
			t.Run("simple get "+version, func(t *testing.T) { simpleGet(ctx, t, clientPool, ownerID, CID) })
			t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID) })
			t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID) })
			t.Run("test namespaces "+version, func(t *testing.T) { checkNamespaces(ctx, t, clientPool, ownerID, CID) })

			cancel()
			server.Wait()
@@ -98,8 +107,8 @@ func runServer(pathToWallet string) (App, context.CancelFunc) {
	cancelCtx, cancel := context.WithCancel(context.Background())

	v := getDefaultConfig()
	v.Set(cfgWalletPath, pathToWallet)
	v.Set(cfgWalletPassphrase, "")
	v.config().Set(cfgWalletPath, pathToWallet)
	v.config().Set(cfgWalletPassphrase, "")

	application := newApp(cancelCtx, v)
	go application.Serve()
@@ -107,7 +116,7 @@ func runServer(pathToWallet string) (App, context.CancelFunc) {
	return application, cancel
}

func simplePut(ctx context.Context, t *testing.T, p *pool.Pool, CID cid.ID, version string) {
func simplePut(ctx context.Context, t *testing.T, p *pool.Pool, CID cid.ID) {
	url := testHost + "/upload/" + CID.String()
	makePutRequestAndCheck(ctx, t, p, CID, url)
@@ -255,7 +264,7 @@ func putWithDuplicateKeys(t *testing.T, CID cid.ID) {
	require.Equal(t, http.StatusBadRequest, resp.StatusCode)
}

func simpleGet(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID, version string) {
func simpleGet(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID) {
	content := "content of file"
	attributes := map[string]string{
		"some-attr": "some-get-value",
@@ -302,7 +311,7 @@ func checkGetByAttrResponse(t *testing.T, resp *http.Response, content string, a
	}
}

func getByAttr(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID, version string) {
func getByAttr(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID) {
	keyAttr, valAttr := "some-attr", "some-get-by-attr-value"
	content := "content of file"
	attributes := map[string]string{keyAttr: valAttr}
@@ -324,7 +333,7 @@ func getByAttr(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID
	checkGetByAttrResponse(t, resp, content, expectedAttr)
}

func getZip(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID, version string) {
func getZip(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID) {
	names := []string{"zipfolder/dir/name1.txt", "zipfolder/name2.txt"}
	contents := []string{"content of file1", "content of file2"}
	attributes1 := map[string]string{object.AttributeFilePath: names[0]}
@@ -389,7 +398,7 @@ func checkZip(t *testing.T, data []byte, length int64, names, contents []string)
	}
}

func checkNamespaces(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID, version string) {
func checkNamespaces(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID) {
	content := "content of file"
	attributes := map[string]string{
		"some-attr": "some-get-value",
@@ -425,11 +434,13 @@ func checkNamespaces(ctx context.Context, t *testing.T, clientPool *pool.Pool, o

func createDockerContainer(ctx context.Context, t *testing.T, image string) testcontainers.Container {
	req := testcontainers.ContainerRequest{
		Image:       image,
		WaitingFor:  wait.NewLogStrategy("aio container started").WithStartupTimeout(30 * time.Second),
		Name:        "aio",
		Hostname:    "aio",
		NetworkMode: "host",
		Image:      image,
		WaitingFor: wait.NewLogStrategy("aio container started").WithStartupTimeout(2 * time.Minute),
		Name:       "aio",
		Hostname:   "aio",
		HostConfigModifier: func(hc *docker.HostConfig) {
			hc.NetworkMode = "host"
		},
	}
	aioC, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
		ContainerRequest: req,
@@ -440,14 +451,14 @@ func createDockerContainer(ctx context.Context, t *testing.T, image string) test
	return aioC
}

func getDefaultConfig() *viper.Viper {
func getDefaultConfig() *appCfg {
	v := settings()
	v.SetDefault(cfgPeers+".0.address", "localhost:8080")
	v.SetDefault(cfgPeers+".0.weight", 1)
	v.SetDefault(cfgPeers+".0.priority", 1)
	v.config().SetDefault(cfgPeers+".0.address", "localhost:8080")
	v.config().SetDefault(cfgPeers+".0.weight", 1)
	v.config().SetDefault(cfgPeers+".0.priority", 1)

	v.SetDefault(cfgRPCEndpoint, "http://localhost:30333")
	v.SetDefault("server.0.address", testListenAddress)
	v.config().SetDefault(cfgRPCEndpoint, "http://localhost:30333")
	v.config().SetDefault("server.0.address", testListenAddress)

	return v
}
@@ -466,7 +477,7 @@ func getPool(ctx context.Context, t *testing.T, key *keys.PrivateKey) *pool.Pool
	return clientPool
}

func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, version string) (cid.ID, error) {
func createContainer(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID) (cid.ID, error) {
	var policy netmap.PlacementPolicy
	err := policy.DecodeString("REP 1")
	require.NoError(t, err)
@@ -526,7 +537,19 @@ func putObject(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID
	return id.ObjectID
}

func makeBearerToken(t *testing.T, key *keys.PrivateKey, ownerID user.ID, version string) string {
func registerUser(t *testing.T, ctx context.Context, aioContainer testcontainers.Container, pathToWallet string) {
	err := aioContainer.CopyFileToContainer(ctx, pathToWallet, "/usr/wallet.json", 644)
	require.NoError(t, err)

	_, _, err = aioContainer.Exec(ctx, []string{
		"/usr/bin/frostfs-s3-authmate", "register-user",
		"--wallet", "/usr/wallet.json",
		"--rpc-endpoint", "http://localhost:30333",
		"--contract-wallet", "/config/s3-gw-wallet.json"})
	require.NoError(t, err)
}

func makeBearerTokens(t *testing.T, key *keys.PrivateKey, ownerID user.ID, version string) (jsonTokenBase64, binaryTokenBase64 string) {
	tkn := new(bearer.Token)
	tkn.ForUser(ownerID)
	tkn.SetExp(10000)
@@ -540,10 +563,16 @@
	err := tkn.Sign(key.PrivateKey)
	require.NoError(t, err)

	t64 := base64.StdEncoding.EncodeToString(tkn.Marshal())
	require.NotEmpty(t, t64)
	jsonToken, err := tkn.MarshalJSON()
	require.NoError(t, err)

	return t64
	jsonTokenBase64 = base64.StdEncoding.EncodeToString(jsonToken)
	binaryTokenBase64 = base64.StdEncoding.EncodeToString(tkn.Marshal())

	require.NotEmpty(t, jsonTokenBase64)
	require.NotEmpty(t, binaryTokenBase64)

	return
}

func makeTempWallet(t *testing.T, key *keys.PrivateKey, path string) {
@@ -8,9 +8,9 @@ import (

func main() {
	globalContext, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	v := settings()
	cfg := settings()

	application := newApp(globalContext, v)
	application := newApp(globalContext, cfg)
	go application.Serve()
	application.Wait()
}
@@ -12,17 +12,18 @@ import (
	"sort"
	"strconv"
	"strings"
	"sync"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
	internalnet "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/net"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/service/frostfs"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
	grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
	treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
	"git.frostfs.info/TrueCloudLab/zapjournald"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
	"github.com/spf13/pflag"
	"github.com/spf13/viper"
	"github.com/ssgreg/journald"
@@ -128,8 +129,13 @@ const (
	cfgResolveOrder = "resolve_order"

	// Zip compression.
	//
	// Deprecated: Use cfgArchiveCompression instead.
	cfgZipCompression = "zip.compression"

	// Archive compression.
	cfgArchiveCompression = "archive.compression"

	// Runtime.
	cfgSoftMemoryLimit = "runtime.soft_memory_limit"
@@ -144,6 +150,7 @@ const (
	// Caching.
	cfgBucketsCacheLifetime = "cache.buckets.lifetime"
	cfgBucketsCacheSize     = "cache.buckets.size"
	cfgNetmapCacheLifetime  = "cache.netmap.lifetime"

	// Bucket resolving options.
	cfgResolveNamespaceHeader = "resolve_bucket.namespace_header"
@@ -164,6 +171,10 @@ const (
	cfgMultinetFallbackDelay = "multinet.fallback_delay"
	cfgMultinetSubnets       = "multinet.subnets"

	// Feature.
	cfgFeaturesEnableFilepathFallback = "features.enable_filepath_fallback"
	cfgFeaturesTreePoolNetmapSupport  = "features.tree_pool_netmap_support"

	// Command line args.
	cmdHelp    = "help"
	cmdVersion = "version"
@@ -187,14 +198,72 @@ type Logger struct {
	lvl zap.AtomicLevel
}

func settings() *viper.Viper {
type appCfg struct {
	flags *pflag.FlagSet

	mu       sync.RWMutex
	settings *viper.Viper
}

func (a *appCfg) reload() error {
	old := a.config()

	v, err := newViper(a.flags)
	if err != nil {
		return err
	}

	if old.IsSet(cmdConfig) {
		v.Set(cmdConfig, old.Get(cmdConfig))
	}
	if old.IsSet(cmdConfigDir) {
		v.Set(cmdConfigDir, old.Get(cmdConfigDir))
	}

	if err = readInConfig(v); err != nil {
		return err
	}

	a.setConfig(v)
	return nil
}

func (a *appCfg) config() *viper.Viper {
	a.mu.RLock()
	defer a.mu.RUnlock()

	return a.settings
}

func (a *appCfg) setConfig(v *viper.Viper) {
	a.mu.Lock()
	a.settings = v
	a.mu.Unlock()
}
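Taken together, `reload()`, `config()`, and `setConfig()` give the SIGHUP handling in `configReload` (app.go above) race-free snapshot semantics: readers lock only long enough to copy the current `*viper.Viper` pointer, while a reload builds a complete new instance before swapping it in. A minimal sketch of the consuming pattern, as a hypothetical standalone driver rather than code from this change:

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	cfg := settings() // flags plus initial viper, as defined in this file

	sighup := make(chan os.Signal, 1)
	signal.Notify(sighup, syscall.SIGHUP)

	go func() {
		for range sighup {
			// reload() re-parses flags and config files into a fresh
			// *viper.Viper and swaps it in under the write lock.
			if err := cfg.reload(); err != nil {
				fmt.Fprintln(os.Stderr, "config reload failed:", err)
			}
		}
	}()

	// Readers always fetch the current snapshot under a read lock,
	// so a concurrent reload can never tear a read.
	fmt.Println(cfg.config().GetDuration(cfgWebReadTimeout))
}
```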

func newViper(flags *pflag.FlagSet) (*viper.Viper, error) {
	v := viper.New()

	v.AutomaticEnv()
	v.SetEnvPrefix(Prefix)
	v.AllowEmptyEnv(true)
	v.SetConfigType("yaml")
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))

	if err := bindFlags(v, flags); err != nil {
		return nil, err
	}

	setDefaults(v, flags)

	if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) {
		v.Set(cfgServer+".0."+cfgTLSEnabled, true)
	}

	return v, nil
}

func settings() *appCfg {
	// flags setup:
	flags := pflag.NewFlagSet("commandline", pflag.ExitOnError)
	flags.SetOutput(os.Stdout)
@@ -218,92 +287,17 @@ func settings() *viper.Viper {
	flags.String(cmdListenAddress, "0.0.0.0:8080", "addresses to listen")
	flags.String(cfgTLSCertFile, "", "TLS certificate path")
	flags.String(cfgTLSKeyFile, "", "TLS key path")
	peers := flags.StringArrayP(cfgPeers, "p", nil, "FrostFS nodes")
	flags.StringArrayP(cfgPeers, "p", nil, "FrostFS nodes")

	resolveMethods := flags.StringSlice(cfgResolveOrder, []string{resolver.NNSResolver, resolver.DNSResolver}, "set container name resolve order")

	// set defaults:

	// logger:
	v.SetDefault(cfgLoggerLevel, "debug")
	v.SetDefault(cfgLoggerDestination, "stdout")
	v.SetDefault(cfgLoggerSamplingEnabled, false)
	v.SetDefault(cfgLoggerSamplingThereafter, 100)
	v.SetDefault(cfgLoggerSamplingInitial, 100)
	v.SetDefault(cfgLoggerSamplingInterval, defaultLoggerSamplerInterval)

	// pool:
	v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)

	// frostfs:
	v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)

	// web-server:
	v.SetDefault(cfgWebReadBufferSize, 4096)
	v.SetDefault(cfgWebWriteBufferSize, 4096)
	v.SetDefault(cfgWebReadTimeout, time.Minute*10)
	v.SetDefault(cfgWebWriteTimeout, time.Minute*5)
	v.SetDefault(cfgWebStreamRequestBody, true)
	v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)

	v.SetDefault(cfgWorkerPoolSize, 1000)
	// upload header
	v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)

	// zip:
	v.SetDefault(cfgZipCompression, false)

	// metrics
	v.SetDefault(cfgPprofAddress, "localhost:8083")
	v.SetDefault(cfgPrometheusAddress, "localhost:8084")

	// resolve bucket
	v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
	v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})

	// multinet
	v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)

	// Binding flags
	if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
		panic(err)
	}
	if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil {
		panic(err)
	}

	if err := v.BindPFlag(cfgWalletPath, flags.Lookup(cmdWallet)); err != nil {
		panic(err)
	}

	if err := v.BindPFlag(cfgWalletAddress, flags.Lookup(cmdAddress)); err != nil {
		panic(err)
	}

	if err := v.BindPFlags(flags); err != nil {
		panic(err)
	}

	if err := v.BindPFlag(cfgServer+".0.address", flags.Lookup(cmdListenAddress)); err != nil {
		panic(err)
	}
	if err := v.BindPFlag(cfgServer+".0."+cfgTLSKeyFile, flags.Lookup(cfgTLSKeyFile)); err != nil {
		panic(err)
	}
	if err := v.BindPFlag(cfgServer+".0."+cfgTLSCertFile, flags.Lookup(cfgTLSCertFile)); err != nil {
		panic(err)
	}
	flags.StringSlice(cfgResolveOrder, []string{resolver.NNSResolver, resolver.DNSResolver}, "set container name resolve order")

	if err := flags.Parse(os.Args); err != nil {
		panic(err)
	}

	if v.IsSet(cfgServer+".0."+cfgTLSKeyFile) && v.IsSet(cfgServer+".0."+cfgTLSCertFile) {
		v.Set(cfgServer+".0."+cfgTLSEnabled, true)
	}

	if resolveMethods != nil {
		v.SetDefault(cfgResolveOrder, *resolveMethods)
	v, err := newViper(flags)
	if err != nil {
		panic(fmt.Errorf("bind flags: %w", err))
	}

	switch {
@@ -348,15 +342,97 @@ func settings() *viper.Viper {
		panic(err)
	}

	if peers != nil && len(*peers) > 0 {
		for i := range *peers {
			v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", (*peers)[i])
	return &appCfg{
		flags:    flags,
		settings: v,
	}
}

func setDefaults(v *viper.Viper, flags *pflag.FlagSet) {
	// set defaults:

	// logger:
	v.SetDefault(cfgLoggerLevel, "debug")
	v.SetDefault(cfgLoggerDestination, "stdout")
	v.SetDefault(cfgLoggerSamplingEnabled, false)
	v.SetDefault(cfgLoggerSamplingThereafter, 100)
	v.SetDefault(cfgLoggerSamplingInitial, 100)
	v.SetDefault(cfgLoggerSamplingInterval, defaultLoggerSamplerInterval)

	// pool:
	v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)

	// frostfs:
	v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)

	// web-server:
	v.SetDefault(cfgWebReadBufferSize, 4096)
	v.SetDefault(cfgWebWriteBufferSize, 4096)
	v.SetDefault(cfgWebReadTimeout, time.Minute*10)
	v.SetDefault(cfgWebWriteTimeout, time.Minute*5)
	v.SetDefault(cfgWebStreamRequestBody, true)
	v.SetDefault(cfgWebMaxRequestBodySize, fasthttp.DefaultMaxRequestBodySize)

	v.SetDefault(cfgWorkerPoolSize, 1000)
	// upload header
	v.SetDefault(cfgUploaderHeaderEnableDefaultTimestamp, false)

	// metrics
	v.SetDefault(cfgPprofAddress, "localhost:8083")
	v.SetDefault(cfgPrometheusAddress, "localhost:8084")

	// resolve bucket
	v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
	v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})

	// multinet
	v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)

	if resolveMethods, err := flags.GetStringSlice(cfgResolveOrder); err == nil {
		v.SetDefault(cfgResolveOrder, resolveMethods)
	}

	if peers, err := flags.GetStringArray(cfgPeers); err == nil {
		for i := range peers {
			v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".address", peers[i])
			v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".weight", 1)
			v.SetDefault(cfgPeers+"."+strconv.Itoa(i)+".priority", 1)
		}
	}
}

	return v
func bindFlags(v *viper.Viper, flags *pflag.FlagSet) error {
	// Binding flags
	if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
		return err
	}
	if err := v.BindPFlag(cfgPrometheusEnabled, flags.Lookup(cmdMetrics)); err != nil {
		return err
	}

	if err := v.BindPFlag(cfgWalletPath, flags.Lookup(cmdWallet)); err != nil {
		return err
	}

	if err := v.BindPFlag(cfgWalletAddress, flags.Lookup(cmdAddress)); err != nil {
		return err
	}

	if err := v.BindPFlags(flags); err != nil {
		return err
	}

	if err := v.BindPFlag(cfgServer+".0.address", flags.Lookup(cmdListenAddress)); err != nil {
		return err
	}
	if err := v.BindPFlag(cfgServer+".0."+cfgTLSKeyFile, flags.Lookup(cfgTLSKeyFile)); err != nil {
		return err
	}
	if err := v.BindPFlag(cfgServer+".0."+cfgTLSCertFile, flags.Lookup(cfgTLSCertFile)); err != nil {
		return err
	}

	return nil
}

func readInConfig(v *viper.Viper) error {
@@ -608,10 +684,10 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
	return servers
}

func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper, dialSource *internalnet.DialerSource) (*pool.Pool, *treepool.Pool, *keys.PrivateKey) {
	key, err := getFrostFSKey(cfg, logger)
func (a *app) initPools(ctx context.Context) {
	key, err := getFrostFSKey(a.config(), a.log)
	if err != nil {
		logger.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
		a.log.Fatal(logs.CouldNotLoadFrostFSPrivateKey, zap.Error(err))
	}

	var prm pool.InitParameters
@@ -619,77 +695,83 @@

	prm.SetKey(&key.PrivateKey)
	prmTree.SetKey(key)
	logger.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))
	a.log.Info(logs.UsingCredentials, zap.String("FrostFS", hex.EncodeToString(key.PublicKey().Bytes())))

	for _, peer := range fetchPeers(logger, cfg) {
	for _, peer := range fetchPeers(a.log, a.config()) {
		prm.AddNode(peer)
		prmTree.AddNode(peer)
	}

	connTimeout := cfg.GetDuration(cfgConTimeout)
	connTimeout := a.config().GetDuration(cfgConTimeout)
	if connTimeout <= 0 {
		connTimeout = defaultConnectTimeout
	}
	prm.SetNodeDialTimeout(connTimeout)
	prmTree.SetNodeDialTimeout(connTimeout)

	streamTimeout := cfg.GetDuration(cfgStreamTimeout)
	streamTimeout := a.config().GetDuration(cfgStreamTimeout)
	if streamTimeout <= 0 {
		streamTimeout = defaultStreamTimeout
	}
	prm.SetNodeStreamTimeout(streamTimeout)
	prmTree.SetNodeStreamTimeout(streamTimeout)

	healthCheckTimeout := cfg.GetDuration(cfgReqTimeout)
	healthCheckTimeout := a.config().GetDuration(cfgReqTimeout)
	if healthCheckTimeout <= 0 {
		healthCheckTimeout = defaultRequestTimeout
	}
	prm.SetHealthcheckTimeout(healthCheckTimeout)
	prmTree.SetHealthcheckTimeout(healthCheckTimeout)

	rebalanceInterval := cfg.GetDuration(cfgRebalance)
	rebalanceInterval := a.config().GetDuration(cfgRebalance)
	if rebalanceInterval <= 0 {
		rebalanceInterval = defaultRebalanceTimer
	}
	prm.SetClientRebalanceInterval(rebalanceInterval)
	prmTree.SetClientRebalanceInterval(rebalanceInterval)

	errorThreshold := cfg.GetUint32(cfgPoolErrorThreshold)
	errorThreshold := a.config().GetUint32(cfgPoolErrorThreshold)
	if errorThreshold <= 0 {
		errorThreshold = defaultPoolErrorThreshold
	}
	prm.SetErrorThreshold(errorThreshold)
	prm.SetLogger(logger)
	prmTree.SetLogger(logger)
	prm.SetLogger(a.log)
	prmTree.SetLogger(a.log)

	prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgTreePoolMaxAttempts))
	prmTree.SetMaxRequestAttempts(a.config().GetInt(cfgTreePoolMaxAttempts))

	interceptors := []grpc.DialOption{
		grpc.WithUnaryInterceptor(grpctracing.NewUnaryClientInteceptor()),
		grpc.WithStreamInterceptor(grpctracing.NewStreamClientInterceptor()),
		grpc.WithContextDialer(dialSource.GrpcContextDialer()),
		grpc.WithContextDialer(a.settings.dialerSource.GrpcContextDialer()),
	}
	prm.SetGRPCDialOptions(interceptors...)
	prmTree.SetGRPCDialOptions(interceptors...)

	p, err := pool.NewPool(prm)
	if err != nil {
		logger.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
		a.log.Fatal(logs.FailedToCreateConnectionPool, zap.Error(err))
	}

	if err = p.Dial(ctx); err != nil {
		logger.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
		a.log.Fatal(logs.FailedToDialConnectionPool, zap.Error(err))
	}

	if a.config().GetBool(cfgFeaturesTreePoolNetmapSupport) {
		prmTree.SetNetMapInfoSource(frostfs.NewSource(frostfs.NewFrostFS(p), cache.NewNetmapCache(getNetmapCacheOptions(a.config(), a.log)), a.bucketCache, a.log))
	}

	treePool, err := treepool.NewPool(prmTree)
	if err != nil {
		logger.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
		a.log.Fatal(logs.FailedToCreateTreePool, zap.Error(err))
	}
	if err = treePool.Dial(ctx); err != nil {
		logger.Fatal(logs.FailedToDialTreePool, zap.Error(err))
		a.log.Fatal(logs.FailedToDialTreePool, zap.Error(err))
	}

	return p, treePool, key
	a.pool = p
	a.treePool = treePool
	a.key = key
}

func fetchPeers(l *zap.Logger, v *viper.Viper) []pool.NodeParam {
@@ -730,7 +812,7 @@ func fetchSoftMemoryLimit(cfg *viper.Viper) int64 {
	return int64(softMemoryLimit)
}

func getCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config {
func getBucketCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config {
	cacheCfg := cache.DefaultBucketConfig(l)

	cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Lifetime)
@@ -739,6 +821,14 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config {
	return cacheCfg
}

func getNetmapCacheOptions(v *viper.Viper, l *zap.Logger) *cache.NetmapCacheConfig {
	cacheCfg := cache.DefaultNetmapConfig(l)

	cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgNetmapCacheLifetime, cacheCfg.Lifetime)

	return cacheCfg
}

func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
	if v.IsSet(cfgEntry) {
		lifetime := v.GetDuration(cfgEntry)
@ -825,3 +915,10 @@ func fetchTracingAttributes(v *viper.Viper) (map[string]string, error) {
|
|||
|
||||
return attributes, nil
|
||||
}
|
||||
|
||||
func fetchArchiveCompression(v *viper.Viper) bool {
|
||||
if v.IsSet(cfgZipCompression) {
|
||||
return v.GetBool(cfgZipCompression)
|
||||
}
|
||||
return v.GetBool(cfgArchiveCompression)
|
||||
}
|
||||
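`fetchArchiveCompression` gives the deprecated `zip.compression` key precedence whenever it is explicitly set. A tiny self-contained sketch of that precedence; the key strings are assumed to match `cfgZipCompression`/`cfgArchiveCompression`:

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

// Same logic as the gateway's fetchArchiveCompression, with literal keys.
func fetchArchiveCompression(v *viper.Viper) bool {
	if v.IsSet("zip.compression") {
		return v.GetBool("zip.compression")
	}
	return v.GetBool("archive.compression")
}

func main() {
	v := viper.New()
	v.Set("archive.compression", true)
	fmt.Println(fetchArchiveCompression(v)) // true: only the new key is set

	v.Set("zip.compression", false)
	fmt.Println(fetchArchiveCompression(v)) // false: explicit deprecated key wins
}
```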
60
cmd/http-gw/settings_test.go
Normal file
@@ -0,0 +1,60 @@
package main

import (
	"os"
	"testing"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
	"github.com/stretchr/testify/require"
)

func TestConfigReload(t *testing.T) {
	f, err := os.CreateTemp("", "conf")
	require.NoError(t, err)
	defer func() {
		require.NoError(t, os.Remove(f.Name()))
	}()

	confData := `
pprof:
  enabled: true

resolve_bucket:
  default_namespaces: [""]

resolve_order:
  - nns
`

	_, err = f.WriteString(confData)
	require.NoError(t, err)
	require.NoError(t, f.Close())

	cfg := settings()

	require.NoError(t, cfg.flags.Parse([]string{"--config", f.Name(), "--connect_timeout", "15s"}))
	require.NoError(t, cfg.reload())

	require.True(t, cfg.config().GetBool(cfgPprofEnabled))
	require.Equal(t, []string{""}, cfg.config().GetStringSlice(cfgResolveDefaultNamespaces))
	require.Equal(t, []string{resolver.NNSResolver}, cfg.config().GetStringSlice(cfgResolveOrder))
	require.Equal(t, 15*time.Second, cfg.config().GetDuration(cfgConTimeout))

	require.NoError(t, os.Truncate(f.Name(), 0))
	require.NoError(t, cfg.reload())

	require.False(t, cfg.config().GetBool(cfgPprofEnabled))
	require.Equal(t, []string{"", "root"}, cfg.config().GetStringSlice(cfgResolveDefaultNamespaces))
	require.Equal(t, []string{resolver.NNSResolver, resolver.DNSResolver}, cfg.config().GetStringSlice(cfgResolveOrder))
	require.Equal(t, 15*time.Second, cfg.config().GetDuration(cfgConTimeout))
}

func TestSetTLSEnabled(t *testing.T) {
	cfg := settings()

	require.NoError(t, cfg.flags.Parse([]string{"--" + cfgTLSCertFile, "tls.crt", "--" + cfgTLSKeyFile, "tls.key"}))
	require.NoError(t, cfg.reload())

	require.True(t, cfg.config().GetBool(cfgServer+".0."+cfgTLSEnabled))
}
@@ -97,9 +97,13 @@ HTTP_GW_REBALANCE_TIMER=30s
# The number of errors on connection after which node is considered as unhealthy
HTTP_GW_POOL_ERROR_THRESHOLD=100

-# Enable zip compression to download files by common prefix.
+# Enable archive compression to download files by common prefix.
+# DEPRECATED: Use HTTP_GW_ARCHIVE_COMPRESSION instead.
HTTP_GW_ZIP_COMPRESSION=false

+# Enable archive compression to download files by common prefix.
+HTTP_GW_ARCHIVE_COMPRESSION=false

HTTP_GW_TRACING_ENABLED=true
HTTP_GW_TRACING_ENDPOINT="localhost:4317"
HTTP_GW_TRACING_EXPORTER="otlp_grpc"

@@ -121,6 +125,8 @@ HTTP_GW_FROSTFS_BUFFER_MAX_SIZE_FOR_PUT=1048576
# Cache which contains mapping of bucket name to bucket info
HTTP_GW_CACHE_BUCKETS_LIFETIME=1m
HTTP_GW_CACHE_BUCKETS_SIZE=1000
+# Cache which stores netmap
+HTTP_GW_CACHE_NETMAP_LIFETIME=1m

# Header to determine zone to resolve bucket name
HTTP_GW_RESOLVE_BUCKET_NAMESPACE_HEADER=X-Frostfs-Namespace

@@ -158,4 +164,9 @@ HTTP_GW_WORKER_POOL_SIZE=1000
# Enable index page support
HTTP_GW_INDEX_PAGE_ENABLED=false
# Index page template path
HTTP_GW_INDEX_PAGE_TEMPLATE_PATH=internal/handler/templates/index.gotmpl

+# Enable using fallback path to search for a object by attribute
+HTTP_GW_FEATURES_ENABLE_FILEPATH_FALLBACK=false
+# Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
+HTTP_GW_FEATURES_TREE_POOL_NETMAP_SUPPORT=true
@@ -116,13 +116,19 @@ pool_error_threshold: 100 # The number of errors on connection after which node
# Number of workers in handler's worker pool
worker_pool_size: 1000

-# Enable index page to see objects list for specified container and prefix
+# Enables index page to see objects list for specified container and prefix
index_page:
  enabled: false
  template_path: internal/handler/templates/index.gotmpl

+# Deprecated: Use archive.compression instead
zip:
-  compression: false # Enable zip compression to download files by common prefix.
+  # Enables zip compression to download files by common prefix.
+  compression: false

+archive:
+  # Enables archive compression to download files by common prefix.
+  compression: false

runtime:
  soft_memory_limit: 1gb

@@ -143,6 +149,9 @@ cache:
  buckets:
    lifetime: 1m
    size: 1000
+  # Cache which stores netmap
+  netmap:
+    lifetime: 1m

resolve_bucket:
  namespace_header: X-Frostfs-Namespace

@@ -172,3 +181,9 @@ multinet:
      source_ips:
        - 1.2.3.4
        - 1.2.3.5

+features:
+  # Enable using fallback path to search for a object by attribute
+  enable_filepath_fallback: false
+  # Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service
+  tree_pool_netmap_support: true
41
docs/api.md
@@ -1,14 +1,14 @@
# HTTP Gateway Specification

-| Route                                           | Description                                  |
-|-------------------------------------------------|----------------------------------------------|
-| `/upload/{cid}`                                 | [Put object](#put-object)                    |
-| `/get/{cid}/{oid}`                              | [Get object](#get-object)                    |
-| `/get_by_attribute/{cid}/{attr_key}/{attr_val}` | [Search object](#search-object)              |
-| `/zip/{cid}/{prefix}`                           | [Download objects in archive](#download-zip) |
+| Route                                           | Description                                       |
+|-------------------------------------------------|---------------------------------------------------|
+| `/upload/{cid}`                                 | [Put object](#put-object)                         |
+| `/get/{cid}/{oid}`                              | [Get object](#get-object)                         |
+| `/get_by_attribute/{cid}/{attr_key}/{attr_val}` | [Search object](#search-object)                   |
+| `/zip/{cid}/{prefix}`, `/tar/{cid}/{prefix}`    | [Download objects in archive](#download-archive)  |

**Note:** `cid` parameter can be base58 encoded container ID or container name
-(the name must be registered in NNS, see appropriate section in [README](../README.md#nns)).
+(the name must be registered in NNS, see appropriate section in [nns.md](./nns.md)).

Route parameters can be:

@@ -18,7 +18,7 @@ Route parameters can be:

### Bearer token

-All routes can accept [bearer token](../README.md#authentication) from:
+All routes can accept [bearer token](./authentication.md) from:

* `Authorization` header with `Bearer` type and base64-encoded token in
  credentials field

@@ -56,12 +56,14 @@ Upload file as object with attributes to FrostFS.

###### Headers

| Header                 | Description |
|------------------------|-------------|
| Common headers         | See [bearer token](#bearer-token). |
| `X-Attribute-System-*` | Used to set system FrostFS object attributes <br/> (e.g. use "X-Attribute-System-Expiration-Epoch" to set `__SYSTEM__EXPIRATION_EPOCH` attribute). |
| `X-Attribute-*`        | Used to set regular object attributes <br/> (e.g. use "X-Attribute-My-Tag" to set `My-Tag` attribute). |
+| `X-Explode-Archive`    | If set, gate tries to read files from uploading `tar` archive and creates an object for each file in it. Uploading `tar` could be compressed via Gzip by setting a `Content-Encoding` header. Sets a `FilePath` attribute as a relative path from archive root and a `FileName` as the last path element of the `FilePath`. |
+| `Content-Encoding`     | If set and value is `gzip`, gate will handle uploading file as a `Gzip` compressed `tar` file. |
| `Date`                 | This header is used to calculate the right `__SYSTEM__EXPIRATION` attribute for object. If the header is missing, the current server time is used. |
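A hedged client-side sketch of the exploding upload described in the table above. The endpoint and container are placeholders, and the multipart form mirrors the gateway's documented curl examples; exact body requirements are not specified in this diff:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
)

func main() {
	f, err := os.Open("cat-archive.tar.gz") // gzip-compressed tar; one object per file inside
	if err != nil {
		panic(err)
	}
	defer f.Close()

	var body bytes.Buffer
	mw := multipart.NewWriter(&body)
	fw, err := mw.CreateFormFile("file", "cat-archive.tar.gz")
	if err != nil {
		panic(err)
	}
	if _, err = io.Copy(fw, f); err != nil {
		panic(err)
	}
	mw.Close()

	req, err := http.NewRequest(http.MethodPost, "http://localhost:8082/upload/<cid>", &body)
	if err != nil {
		panic(err)
	}
	req.Header.Set("Content-Type", mw.FormDataContentType())
	req.Header.Set("X-Explode-Archive", "true")
	req.Header.Set("Content-Encoding", "gzip") // the tar is gzip-compressed

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
```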
There are some reserved headers type of `X-Attribute-FROSTFS-*` (headers are arranged in descending order of priority):

@@ -269,9 +271,9 @@ If more than one object is found, an arbitrary one will be used to get attributes:
| 400 | Some error occurred during operation. |
| 404 | Container or object not found.        |

-## Download zip
+## Download archive

-Route: `/zip/{cid}/{prefix}`
+Route: `/zip/{cid}/{prefix}`, `/tar/{cid}/{prefix}`

| Route parameter | Type      | Description                                             |
|-----------------|-----------|---------------------------------------------------------|

@@ -282,12 +284,13 @@ Route: `/zip/{cid}/{prefix}`

#### GET

-Find objects by prefix for `FilePath` attributes. Return found objects in zip archive.
+Find objects by prefix for `FilePath` attributes. Return found objects in zip or tar archive.
Name of files in archive sets to `FilePath` attribute of objects.
Time of files sets to time when object has started downloading.
-You can download all files in container that have `FilePath` attribute by `/zip/{cid}/` route.
+You can download all files in container that have `FilePath` attribute by `/zip/{cid}/` or
+`/tar/{cid}/` route.

-Archive can be compressed (see http-gw [configuration](gate-configuration.md#zip-section)).
+Archive can be compressed (see http-gw [configuration](gate-configuration.md#archive-section)).
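A short client sketch for the tar route: stream `/tar/{cid}/{prefix}`, unwrap the gzip layer and list the entries. The endpoint and container are placeholders; the gzip wrapper matches the `application/gzip` response the gate produces:

```go
package main

import (
	"archive/tar"
	"compress/gzip"
	"fmt"
	"io"
	"net/http"
)

func main() {
	resp, err := http.Get("http://localhost:8082/tar/<cid>/")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	gz, err := gzip.NewReader(resp.Body) // the gate serves tar wrapped in gzip
	if err != nil {
		panic(err)
	}
	defer gz.Close()

	tr := tar.NewReader(gz)
	for {
		hdr, err := tr.Next()
		if err == io.EOF {
			break
		}
		if err != nil {
			panic(err)
		}
		// hdr.Name carries the object's FilePath attribute.
		fmt.Printf("%s (%d bytes)\n", hdr.Name, hdr.Size)
	}
}
```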
##### Request
108
docs/authentication.md
Normal file
@@ -0,0 +1,108 @@
# Request authentication

HTTP Gateway does not authorize requests. Gateway converts HTTP request to a
FrostFS request and signs it with its own private key.

You can always upload files to public containers (open for anyone to put
objects into), but for restricted containers you need to explicitly allow PUT
operations for a request signed with your HTTP Gateway keys.

If you don't want to manage gateway's secret keys and adjust policies when
gateway configuration changes (new gate, key rotation, etc) or you plan to use
public services, there is an option to let your application backend (or you)
issue Bearer Tokens and pass them from the client via gate down to FrostFS level
to grant access.

FrostFS Bearer Token basically is a container owner-signed policy (refer to FrostFS
documentation for more details). There are two options to pass them to gateway:
* "Authorization" header with "Bearer" type and base64-encoded token in
  credentials field
* "Bearer" cookie with base64-encoded token contents

For example, you have a mobile application frontend with a backend part storing
data in FrostFS. When a user authorizes in the mobile app, the backend issues a FrostFS
Bearer token and provides it to the frontend. Then, the mobile app may generate
some data and upload it via any available FrostFS HTTP Gateway by adding
the corresponding header to the upload request. Accessing policy protected data
works the same way.

##### Example
In order to generate a bearer token, you need to have a wallet (which will be used to sign the token)

1. Suppose you have a container with private policy for wallet key

```
$ frostfs-cli container create -r <endpoint> --wallet <wallet> -policy <policy> --basic-acl 0 --await
CID: 9dfzyvq82JnFqp5svxcREf2iy6XNuifYcJPusEDnGK9Z

$ frostfs-cli ape-manager add -r <endpoint> --wallet <wallet> \
  --target-type container --target-name 9dfzyvq82JnFqp5svxcREf2iy6XNuifYcJPusEDnGK9Z \
  --rule "allow Object.* RequestCondition:"\$Actor:publicKey"=03b09baabff3f6107c7e9acb8721a6fc5618d45b50247a314d82e548702cce8cd5 *" \
  --chain-id <chainID>
```

2. Form a Bearer token (10000 is lifetime expiration in epoch) to impersonate
   HTTP Gateway request as wallet signed request and save it to **bearer.json**:
```
{
    "body": {
        "allowImpersonate": true,
        "lifetime": {
            "exp": "10000",
            "nbf": "0",
            "iat": "0"
        }
    },
    "signature": null
}
```

3. Sign it with the wallet:
```
$ frostfs-cli util sign bearer-token --from bearer.json --to signed.json -w <wallet>
```

4. Encode to base64 to use in header:
```
$ base64 -w 0 signed.json
# output: Ck4KKgoECAIQBhIiCiCZGdlbN7DPGPMg9rsWqV+p2XdMzUqknRiexewSFp8kmBIbChk17MUri6OJ0X5ftsHzy7NERDNFB4C92PcaGgMIkE4SZgohAxpsb7vfAso1F0X6hrm6WpRS14WsT3/Ct1SMoqRsT89KEkEEGxKi8GjKSf52YqhppgaOTQHbUsL3jn7SHLqS3ndAQ7NtAATnmRHleZw2V2xRRSRBQdjDC05KK83LhdSax72Fsw==
```

After that, the Bearer token can be used:

```
$ curl -F 'file=@cat.jpeg;filename=cat.jpeg' -H "Authorization: Bearer Ck4KKgoECAIQBhIiCiCZGdlbN7DPGPMg9rsWqV+p2XdMzUqknRiexewSFp8kmBIbChk17MUri6OJ0X5ftsHzy7NERDNFB4C92PcaGgMIkE4SZgohAxpsb7vfAso1F0X6hrm6WpRS14WsT3/Ct1SMoqRsT89KEkEEGxKi8GjKSf52YqhppgaOTQHbUsL3jn7SHLqS3ndAQ7NtAATnmRHleZw2V2xRRSRBQdjDC05KK83LhdSax72Fsw==" \
  http://localhost:8082/upload/BJeErH9MWmf52VsR1mLWKkgF3pRm3FkubYxM7TZkBP4K
# output:
# {
#       "object_id": "DhfES9nVrFksxGDD2jQLunGADfrXExxNwqXbDafyBn9X",
#       "container_id": "BJeErH9MWmf52VsR1mLWKkgF3pRm3FkubYxM7TZkBP4K"
# }
```
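The same upload the curl example performs, as a small Go client. File names and container are taken from the example above; everything else is a placeholder:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"mime/multipart"
	"net/http"
	"os"
	"strings"
)

func main() {
	token, err := os.ReadFile("signed.base64") // output of `base64 -w 0 signed.json`
	if err != nil {
		panic(err)
	}

	var body bytes.Buffer
	mw := multipart.NewWriter(&body)
	fw, _ := mw.CreateFormFile("file", "cat.jpeg")
	img, err := os.Open("cat.jpeg")
	if err != nil {
		panic(err)
	}
	defer img.Close()
	if _, err = io.Copy(fw, img); err != nil {
		panic(err)
	}
	mw.Close()

	req, _ := http.NewRequest(http.MethodPost,
		"http://localhost:8082/upload/BJeErH9MWmf52VsR1mLWKkgF3pRm3FkubYxM7TZkBP4K", &body)
	req.Header.Set("Content-Type", mw.FormDataContentType())
	req.Header.Set("Authorization", "Bearer "+strings.TrimSpace(string(token)))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	out, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(out))
}
```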
##### Note: Bearer Token owner

You can specify the exact key that can use the Bearer Token (gateway wallet address).
To do this, encode the wallet address in base64 format

```
$ echo 'NhVtreTTCoqsMQV5Wp55fqnriiUCpEaKm3' | base58 --decode | base64
# output: NezFK4ujidF+X7bB88uzREQzRQeAvdj3Gg==
```

Then specify this value in the Bearer Token JSON
```
{
    "body": {
        "ownerID": {
            "value": "NezFK4ujidF+X7bB88uzREQzRQeAvdj3Gg=="
        },
        ...
```

##### Note: Policy override

Instead of impersonation, you can define the set of policies that will be applied
to the request sender. This allows restricting access to specific operations and
specific objects without giving full impersonation control to the token user.
@@ -59,7 +59,7 @@ $ cat http.log
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
| `index_page`     | [Index page configuration](#index_page-section)                |
| `multinet`       | [Multinet configuration](#multinet-section)                    |
+| `features`       | [Features configuration](#features-section)                    |

# General section

@@ -218,9 +218,10 @@ upload_header:
|-------------------------|--------|---------------|---------------|-------------------------------------------------------------|
| `use_default_timestamp` | `bool` | yes           | `false`       | Create timestamp for object if it isn't provided by header. |

# `zip` section

+> **_DEPRECATED:_** Use archive section instead

```yaml
zip:
  compression: false

@@ -230,6 +231,17 @@ zip:
|---------------|--------|---------------|---------------|---------------------------------------------------------------|
| `compression` | `bool` | yes           | `false`       | Enable zip compression when download files by common prefix.  |

+# `archive` section
+
+```yaml
+archive:
+  compression: false
+```
+
+| Parameter     | Type   | SIGHUP reload | Default value | Description                                                        |
+|---------------|--------|---------------|---------------|--------------------------------------------------------------------|
+| `compression` | `bool` | yes           | `false`       | Enable archive compression when download files by common prefix.  |

# `pprof` section

@@ -339,12 +351,14 @@ cache:
  buckets:
    lifetime: 1m
    size: 1000
+  netmap:
+    lifetime: 1m
```

-| Parameter | Type                              | Default value                   | Description                                                  |
-|-----------|-----------------------------------|---------------------------------|--------------------------------------------------------------|
-| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info.  |
+| Parameter | Type                              | Default value                   | Description                                                                 |
+|-----------|-----------------------------------|---------------------------------|-----------------------------------------------------------------------------|
+| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info.                 |
+| `netmap`  | [Cache config](#cache-subsection) | `lifetime: 1m`                  | Cache which stores netmap. `netmap.size` isn't applicable for this cache.   |

#### `cache` subsection

@@ -457,3 +471,18 @@ multinet:
|--------------|------------|---------------|---------------|------------------------------------------------------------------------|
| `mask`       | `string`   | yes           |               | Destination subnet.                                                     |
| `source_ips` | `[]string` | yes           |               | Array of source IP addresses to use when dialing destination subnet.   |

+# `features` section
+
+Contains parameters for enabling features.
+
+```yaml
+features:
+  enable_filepath_fallback: true
+  tree_pool_netmap_support: true
+```
+
+| Parameter                            | Type   | SIGHUP reload | Default value | Description |
+|--------------------------------------|--------|---------------|---------------|-------------|
+| `features.enable_filepath_fallback`  | `bool` | yes           | `false`       | Enable using fallback path to search for an object by attribute. If the value of the `FilePath` attribute in the request contains no `/` symbols or single leading `/` symbol and the object was not found, then an attempt is made to search for the object by the attribute `FileName`. |
+| `features.tree_pool_netmap_support`  | `bool` | no            | `false`       | Enable using new version of tree pool, which uses netmap to select nodes, for requests to tree service. |
36
docs/nns.md
Normal file
@@ -0,0 +1,36 @@
# Nicename Resolving with NNS

Steps to start using name resolving:

1. Enable NNS resolving in config (`rpc_endpoint` must be a valid neo rpc node, see [configs](./config) for other examples):

```yaml
rpc_endpoint: http://morph-chain.frostfs.devenv:30333
resolve_order:
  - nns
```

2. Make sure your container is registered in NNS contract. If you use [frostfs-dev-env](https://git.frostfs.info/TrueCloudLab/frostfs-dev-env)
   you can check if your container (e.g. with `container-name` name) is registered in NNS:

```shell
$ curl -s --data '{"id":1,"jsonrpc":"2.0","method":"getcontractstate","params":[1]}' \
  http://morph-chain.frostfs.devenv:30333 | jq -r '.result.hash'

0x8e6c3cd4b976b28e84a3788f6ea9e2676c15d667

$ docker exec -it morph_chain neo-go \
  contract testinvokefunction \
  -r http://morph-chain.frostfs.devenv:30333 0x8e6c3cd4b976b28e84a3788f6ea9e2676c15d667 \
  resolve string:container-name.container int:16 \
  | jq -r '.stack[0].value | if type=="array" then .[0].value else . end' \
  | base64 -d && echo

7f3vvkw4iTiS5ZZbu5BQXEmJtETWbi3uUjLNaSs29xrL
```

3. Use container name instead of its `$CID`. For example:

```shell
$ curl http://localhost:8082/get_by_attribute/container-name/FileName/object-name
```
73
go.mod
@@ -4,11 +4,12 @@ go 1.22

require (
	git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20241112082307-f17779933e88
-	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241206094944-81c423e7094d
+	git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20241218062344-42a0fc8c13ae
	git.frostfs.info/TrueCloudLab/multinet v0.0.0-20241015075604-6cb0d80e0972
	git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
	github.com/bluele/gcache v0.0.2
-	github.com/docker/go-units v0.4.0
+	github.com/docker/docker v27.1.1+incompatible
+	github.com/docker/go-units v0.5.0
	github.com/fasthttp/router v1.4.1
	github.com/nspcc-dev/neo-go v0.106.2
	github.com/panjf2000/ants/v2 v2.5.0

@@ -18,7 +19,7 @@ require (
	github.com/spf13/viper v1.15.0
	github.com/ssgreg/journald v1.0.0
	github.com/stretchr/testify v1.9.0
-	github.com/testcontainers/testcontainers-go v0.13.0
+	github.com/testcontainers/testcontainers-go v0.35.0
	github.com/trailofbits/go-fuzz-utils v0.0.0-20230413173806-58c38daa3cb4
	github.com/valyala/fasthttp v1.34.0
	go.opentelemetry.io/otel v1.28.0

@@ -26,79 +27,98 @@ require (
	go.uber.org/zap v1.27.0
	golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
	golang.org/x/net v0.26.0
-	golang.org/x/sys v0.22.0
+	golang.org/x/sys v0.28.0
	google.golang.org/grpc v1.66.2
)

require (
+	dario.cat/mergo v1.0.0 // indirect
	git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e // indirect
	git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
	git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
	git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
	git.frostfs.info/TrueCloudLab/tzhash v1.8.0 // indirect
	github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
-	github.com/Microsoft/go-winio v0.5.2 // indirect
-	github.com/Microsoft/hcsshim v0.9.2 // indirect
+	github.com/Microsoft/go-winio v0.6.2 // indirect
	github.com/VictoriaMetrics/easyproto v0.1.4 // indirect
	github.com/andybalholm/brotli v1.0.4 // indirect
	github.com/antlr4-go/antlr/v4 v4.13.1 // indirect
	github.com/beorn7/perks v1.0.1 // indirect
	github.com/cenkalti/backoff/v4 v4.3.0 // indirect
	github.com/cespare/xxhash/v2 v2.3.0 // indirect
-	github.com/containerd/cgroups v1.0.3 // indirect
-	github.com/containerd/containerd v1.6.2 // indirect
+	github.com/containerd/containerd v1.7.18 // indirect
+	github.com/containerd/log v0.1.0 // indirect
+	github.com/containerd/platforms v0.2.1 // indirect
	github.com/cpuguy83/dockercfg v0.3.2 // indirect
	github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
	github.com/davecgh/go-spew v1.1.1 // indirect
	github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
-	github.com/docker/distribution v2.8.1+incompatible // indirect
-	github.com/docker/docker v20.10.14+incompatible // indirect
-	github.com/docker/go-connections v0.4.0 // indirect
+	github.com/distribution/reference v0.6.0 // indirect
+	github.com/docker/go-connections v0.5.0 // indirect
	github.com/felixge/httpsnoop v1.0.4 // indirect
	github.com/fsnotify/fsnotify v1.6.0 // indirect
	github.com/go-logr/logr v1.4.2 // indirect
	github.com/go-logr/stdr v1.2.2 // indirect
	github.com/go-ole/go-ole v1.2.6 // indirect
	github.com/gogo/protobuf v1.3.2 // indirect
	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
	github.com/golang/snappy v0.0.4 // indirect
	github.com/google/uuid v1.6.0 // indirect
-	github.com/gorilla/mux v1.8.0 // indirect
	github.com/gorilla/websocket v1.5.1 // indirect
	github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect
	github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
	github.com/hashicorp/hcl v1.0.0 // indirect
	github.com/ipfs/go-cid v0.0.7 // indirect
	github.com/josharian/intern v1.0.0 // indirect
-	github.com/klauspost/compress v1.16.4 // indirect
+	github.com/klauspost/compress v1.17.4 // indirect
	github.com/klauspost/cpuid/v2 v2.2.6 // indirect
	github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
	github.com/magiconair/properties v1.8.7 // indirect
	github.com/mailru/easyjson v0.7.7 // indirect
	github.com/minio/sha256-simd v1.0.1 // indirect
	github.com/mitchellh/mapstructure v1.5.0 // indirect
-	github.com/moby/sys/mount v0.3.2 // indirect
-	github.com/moby/sys/mountinfo v0.6.1 // indirect
-	github.com/moby/term v0.0.0-20210619224110-3f7ff695adc6 // indirect
+	github.com/moby/docker-image-spec v1.3.1 // indirect
+	github.com/moby/patternmatcher v0.6.0 // indirect
+	github.com/moby/sys/sequential v0.5.0 // indirect
+	github.com/moby/sys/user v0.1.0 // indirect
+	github.com/moby/term v0.5.0 // indirect
	github.com/morikuni/aec v1.0.0 // indirect
	github.com/mr-tron/base58 v1.2.0 // indirect
	github.com/multiformats/go-base32 v0.1.0 // indirect
	github.com/multiformats/go-base36 v0.2.0 // indirect
	github.com/multiformats/go-multiaddr v0.14.0 // indirect
	github.com/multiformats/go-multibase v0.2.0 // indirect
	github.com/multiformats/go-multihash v0.2.3 // indirect
	github.com/multiformats/go-varint v0.0.7 // indirect
	github.com/nspcc-dev/go-ordered-json v0.0.0-20240301084351-0246b013f8b2 // indirect
	github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20240521091047-78685785716d // indirect
	github.com/nspcc-dev/rfc6979 v0.2.1 // indirect
	github.com/opencontainers/go-digest v1.0.0 // indirect
-	github.com/opencontainers/image-spec v1.0.2 // indirect
-	github.com/opencontainers/runc v1.1.1 // indirect
+	github.com/opencontainers/image-spec v1.1.0 // indirect
	github.com/pelletier/go-toml/v2 v2.0.6 // indirect
	github.com/pkg/errors v0.9.1 // indirect
	github.com/pmezard/go-difflib v1.0.0 // indirect
	github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
	github.com/prometheus/common v0.48.0 // indirect
	github.com/prometheus/procfs v0.12.0 // indirect
	github.com/russross/blackfriday/v2 v2.1.0 // indirect
	github.com/savsgio/gotils v0.0.0-20210617111740-97865ed5a873 // indirect
-	github.com/sirupsen/logrus v1.8.1 // indirect
+	github.com/shirou/gopsutil/v3 v3.23.12 // indirect
+	github.com/shoenig/go-m1cpu v0.1.6 // indirect
+	github.com/sirupsen/logrus v1.9.3 // indirect
	github.com/spaolacci/murmur3 v1.1.0 // indirect
	github.com/spf13/afero v1.9.3 // indirect
	github.com/spf13/cast v1.5.0 // indirect
	github.com/spf13/jwalterweatherman v1.1.0 // indirect
	github.com/subosito/gotenv v1.4.2 // indirect
	github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
+	github.com/tklauser/go-sysconf v0.3.12 // indirect
+	github.com/tklauser/numcpus v0.6.1 // indirect
	github.com/twmb/murmur3 v1.1.8 // indirect
-	github.com/urfave/cli v1.22.5 // indirect
+	github.com/urfave/cli v1.22.12 // indirect
	github.com/valyala/bytebufferpool v1.0.0 // indirect
+	github.com/yusufpapurcu/wmi v1.2.3 // indirect
	go.etcd.io/bbolt v1.3.9 // indirect
	go.opencensus.io v0.24.0 // indirect
	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
	go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.28.0 // indirect
	go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.28.0 // indirect
	go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.28.0 // indirect

@@ -106,14 +126,15 @@ require (
	go.opentelemetry.io/otel/sdk v1.28.0 // indirect
	go.opentelemetry.io/proto/otlp v1.3.1 // indirect
	go.uber.org/multierr v1.11.0 // indirect
-	golang.org/x/crypto v0.24.0 // indirect
-	golang.org/x/sync v0.7.0 // indirect
-	golang.org/x/term v0.21.0 // indirect
-	golang.org/x/text v0.16.0 // indirect
+	golang.org/x/crypto v0.31.0 // indirect
+	golang.org/x/sync v0.10.0 // indirect
+	golang.org/x/term v0.27.0 // indirect
+	golang.org/x/text v0.21.0 // indirect
	golang.org/x/time v0.3.0 // indirect
	google.golang.org/genproto/googleapis/api v0.0.0-20240701130421-f6361c86f094 // indirect
	google.golang.org/genproto/googleapis/rpc v0.0.0-20240701130421-f6361c86f094 // indirect
	google.golang.org/protobuf v1.34.2 // indirect
	gopkg.in/ini.v1 v1.67.0 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
	lukechampine.com/blake3 v1.2.1 // indirect
)
51
internal/cache/buckets.go
vendored
@@ -6,14 +6,16 @@ import (

	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
+	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"github.com/bluele/gcache"
	"go.uber.org/zap"
)

// BucketCache contains cache with objects and the lifetime of cache entries.
type BucketCache struct {
-	cache  gcache.Cache
-	logger *zap.Logger
+	cache    gcache.Cache
+	cidCache gcache.Cache
+	logger   *zap.Logger
}

// Config stores expiration params for cache.

@@ -40,14 +42,45 @@ func DefaultBucketConfig(logger *zap.Logger) *Config {
}

// NewBucketCache creates an object of BucketCache.
-func NewBucketCache(config *Config) *BucketCache {
-	gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).Build()
-	return &BucketCache{cache: gc, logger: config.Logger}
+func NewBucketCache(config *Config, cidCache bool) *BucketCache {
+	cache := &BucketCache{
+		cache:  gcache.New(config.Size).LRU().Expiration(config.Lifetime).Build(),
+		logger: config.Logger,
+	}
+
+	if cidCache {
+		cache.cidCache = gcache.New(config.Size).LRU().Expiration(config.Lifetime).Build()
+	}
+	return cache
}

// Get returns a cached object.
func (o *BucketCache) Get(ns, bktName string) *data.BucketInfo {
-	entry, err := o.cache.Get(formKey(ns, bktName))
+	return o.get(formKey(ns, bktName))
+}
+
+func (o *BucketCache) GetByCID(cnrID cid.ID) *data.BucketInfo {
+	if o.cidCache == nil {
+		return nil
+	}
+
+	entry, err := o.cidCache.Get(cnrID)
+	if err != nil {
+		return nil
+	}
+
+	key, ok := entry.(string)
+	if !ok {
+		o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
+			zap.String("expected", fmt.Sprintf("%T", key)))
+		return nil
+	}
+
+	return o.get(key)
+}
+
+func (o *BucketCache) get(key string) *data.BucketInfo {
+	entry, err := o.cache.Get(key)
	if err != nil {
		return nil
	}

@@ -64,6 +97,12 @@ func (o *BucketCache) Get(ns, bktName string) *data.BucketInfo {

// Put puts an object to cache.
func (o *BucketCache) Put(bkt *data.BucketInfo) error {
+	if o.cidCache != nil {
+		if err := o.cidCache.Set(bkt.CID, formKey(bkt.Zone, bkt.Name)); err != nil {
+			return err
+		}
+	}
+
	return o.cache.Set(formKey(bkt.Zone, bkt.Name), bkt)
}
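A usage sketch of the two-level lookup this change enables: the new boolean constructor argument turns on a CID-to-name index, so a bucket cached by name can also be found by container ID. `internal/cache` is not importable outside the gateway module, so this is illustrative only:

```go
package main

import (
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"go.uber.org/zap"
)

func demo(someCID cid.ID) {
	bktCache := cache.NewBucketCache(cache.DefaultBucketConfig(zap.NewNop()), true) // true enables the CID index

	bkt := &data.BucketInfo{Name: "photos", CID: someCID}
	if err := bktCache.Put(bkt); err != nil { // Put also records CID -> "zone/name"
		panic(err)
	}

	_ = bktCache.Get("", "photos")  // lookup by namespace + name
	_ = bktCache.GetByCID(someCID)  // lookup by container ID via the index
}
```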
65
internal/cache/netmap.go
vendored
Normal file
@@ -0,0 +1,65 @@
package cache

import (
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"github.com/bluele/gcache"
	"go.uber.org/zap"
)

type (
	// NetmapCache provides cache for netmap.
	NetmapCache struct {
		cache  gcache.Cache
		logger *zap.Logger
	}

	// NetmapCacheConfig stores expiration params for cache.
	NetmapCacheConfig struct {
		Lifetime time.Duration
		Logger   *zap.Logger
	}
)

const (
	DefaultNetmapCacheLifetime = time.Minute
	netmapCacheSize            = 1
	netmapKey                  = "netmap"
)

// DefaultNetmapConfig returns new default cache expiration values.
func DefaultNetmapConfig(logger *zap.Logger) *NetmapCacheConfig {
	return &NetmapCacheConfig{
		Lifetime: DefaultNetmapCacheLifetime,
		Logger:   logger,
	}
}

// NewNetmapCache creates an object of NetmapCache.
func NewNetmapCache(config *NetmapCacheConfig) *NetmapCache {
	gc := gcache.New(netmapCacheSize).LRU().Expiration(config.Lifetime).Build()
	return &NetmapCache{cache: gc, logger: config.Logger}
}

func (c *NetmapCache) Get() *netmap.NetMap {
	entry, err := c.cache.Get(netmapKey)
	if err != nil {
		return nil
	}

	result, ok := entry.(netmap.NetMap)
	if !ok {
		c.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
			zap.String("expected", fmt.Sprintf("%T", result)))
		return nil
	}

	return &result
}

func (c *NetmapCache) Put(nm netmap.NetMap) error {
	return c.cache.Set(netmapKey, nm)
}
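`NetmapCache` is a single-slot store (size 1, fixed key) with expiration handled by gcache. A sketch of the read-through pattern it supports; `fetchNetmap` is a hypothetical loader:

```go
package main

import (
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
	"go.uber.org/zap"
)

// getNetmap returns the cached netmap, falling back to the loader when the
// single cached entry has expired or was never set.
func getNetmap(c *cache.NetmapCache, fetchNetmap func() (netmap.NetMap, error)) (*netmap.NetMap, error) {
	if nm := c.Get(); nm != nil {
		return nm, nil // Get returns nil once the TTL has elapsed
	}
	nm, err := fetchNetmap()
	if err != nil {
		return nil, err
	}
	if err := c.Put(nm); err != nil {
		return nil, err
	}
	return &nm, nil
}

func main() {
	_ = cache.NewNetmapCache(cache.DefaultNetmapConfig(zap.NewNop()))
}
```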
@@ -2,6 +2,7 @@ package data

import (
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
)

type BucketInfo struct {

@@ -9,4 +10,5 @@ type BucketInfo struct {
	Zone                    string // container zone from system attribute
	CID                     cid.ID
	HomomorphicHashDisabled bool
+	PlacementPolicy         netmap.PlacementPolicy
}
@@ -1,4 +1,4 @@
-package api
+package data

import (
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"

@@ -7,12 +7,21 @@ import (
// NodeVersion represent node from tree service.
type NodeVersion struct {
	BaseNodeVersion
-	DeleteMarker bool
	IsPrefixNode bool
}

// BaseNodeVersion is minimal node info from tree service.
// Basically used for "system" object.
type BaseNodeVersion struct {
-	OID oid.ID
+	ID             uint64
+	OID            oid.ID
+	IsDeleteMarker bool
}

+type NodeInfo struct {
+	Meta []NodeMeta
+}
+
+type NodeMeta interface {
+	GetKey() string
+	GetValue() []byte
+}
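For illustration, a minimal `NodeMeta` implementation together with the key/value folding that browse.go performs over `node.Meta`; the `kv` type is hypothetical:

```go
package main

import "fmt"

// kv is a hypothetical NodeMeta implementation for examples or tests.
type kv struct {
	key string
	val []byte
}

func (m kv) GetKey() string   { return m.key }
func (m kv) GetValue() []byte { return m.val }

func main() {
	meta := []kv{
		{"FileName", []byte("cat.jpeg")},
		{"IsDeleteMarker", []byte("true")},
	}

	// The same folding getDirObjectsS3 does: tree-node meta becomes a string map.
	attrs := make(map[string]string, len(meta))
	for _, m := range meta {
		attrs[m.GetKey()] = string(m.GetValue())
	}

	// Entries whose IsDeleteMarker attribute is "true" are skipped by the index page.
	fmt.Println(attrs["IsDeleteMarker"] == "true")
}
```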
@@ -22,11 +22,13 @@ import (
)

const (
-	dateFormat   = "02-01-2006 15:04"
-	attrOID      = "OID"
-	attrCreated  = "Created"
-	attrFileName = "FileName"
-	attrSize     = "Size"
+	dateFormat       = "02-01-2006 15:04"
+	attrOID          = "OID"
+	attrCreated      = "Created"
+	attrFileName     = "FileName"
+	attrFilePath     = "FilePath"
+	attrSize         = "Size"
+	attrDeleteMarker = "IsDeleteMarker"
)

type (

@@ -38,23 +40,25 @@ type (
		Objects []ResponseObject
	}
	ResponseObject struct {
-		OID      string
-		Created  string
-		FileName string
-		FilePath string
-		Size     string
-		IsDir    bool
-		GetURL   string
+		OID            string
+		Created        string
+		FileName       string
+		FilePath       string
+		Size           string
+		IsDir          bool
+		GetURL         string
+		IsDeleteMarker bool
	}
)

func newListObjectsResponseS3(attrs map[string]string) ResponseObject {
	return ResponseObject{
-		Created:  formatTimestamp(attrs[attrCreated]),
-		OID:      attrs[attrOID],
-		FileName: attrs[attrFileName],
-		Size:     attrs[attrSize],
-		IsDir:    attrs[attrOID] == "",
+		Created:        formatTimestamp(attrs[attrCreated]),
+		OID:            attrs[attrOID],
+		FileName:       attrs[attrFileName],
+		Size:           attrs[attrSize],
+		IsDir:          attrs[attrOID] == "",
+		IsDeleteMarker: attrs[attrDeleteMarker] == "true",
	}
}

@@ -169,7 +173,7 @@ func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketIn
		objects: make([]ResponseObject, 0, len(nodes)),
	}
	for _, node := range nodes {
-		meta := node.GetMeta()
+		meta := node.Meta
		if meta == nil {
			continue
		}

@@ -178,6 +182,9 @@ func (h *Handler) getDirObjectsS3(ctx context.Context, bucketInfo *data.BucketIn
			attrs[m.GetKey()] = string(m.GetValue())
		}
		obj := newListObjectsResponseS3(attrs)
+		if obj.IsDeleteMarker {
+			continue
+		}
		obj.FilePath = prefix + obj.FileName
		obj.GetURL = "/get/" + bucketInfo.Name + urlencode(obj.FilePath)
		result.objects = append(result.objects, obj)
@@ -1,19 +1,21 @@
package handler

import (
+	"archive/tar"
	"archive/zip"
	"bufio"
+	"compress/gzip"
	"context"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
+	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
-	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"

@@ -23,24 +25,42 @@ import (

// DownloadByAddressOrBucketName handles download requests using simple cid/oid or bucketname/key format.
func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
-	oidURLParam := c.UserValue("oid").(string)
-	downloadQueryParam := c.QueryArgs().GetBool("download")
+	cidParam := c.UserValue("cid").(string)
+	oidParam := c.UserValue("oid").(string)
+	downloadParam := c.QueryArgs().GetBool("download")

-	switch {
-	case isObjectID(oidURLParam):
-		h.byNativeAddress(c, h.receiveFile)
-	case !isContainerRoot(oidURLParam) && (downloadQueryParam || !isDir(oidURLParam)):
-		h.byS3Path(c, h.receiveFile)
-	default:
-		h.browseIndex(c)
+	ctx := utils.GetContextFromRequest(c)
+	log := utils.GetReqLogOrDefault(ctx, h.log).With(
+		zap.String("cid", cidParam),
+		zap.String("oid", oidParam),
+	)
+
+	bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
+	if err != nil {
+		logAndSendBucketError(c, log, err)
+		return
+	}
+
+	checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
+	if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
+		logAndSendBucketError(c, log, checkS3Err)
+		return
+	}
+
+	req := newRequest(c, log)
+
+	var objID oid.ID
+	if checkS3Err == nil && shouldDownload(oidParam, downloadParam) {
+		h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.receiveFile)
+	} else if err = objID.DecodeString(oidParam); err == nil {
+		h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.receiveFile)
+	} else {
+		h.browseIndex(c, checkS3Err != nil)
+	}
}

-func (h *Handler) newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) *request {
-	return &request{
-		RequestCtx: ctx,
-		log:        log,
-	}
+func shouldDownload(oidParam string, downloadParam bool) bool {
+	return !isDir(oidParam) || downloadParam
}

// DownloadByAttribute handles attribute-based download requests.

@@ -64,13 +84,61 @@ func (h *Handler) search(ctx context.Context, cnrID cid.ID, key, val string, op
	return h.frostfs.SearchObjects(ctx, prm)
}

-func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
+// DownloadZip handles zip by prefix requests.
+func (h *Handler) DownloadZip(c *fasthttp.RequestCtx) {
+	scid, _ := c.UserValue("cid").(string)
+
+	ctx := utils.GetContextFromRequest(c)
+	log := utils.GetReqLogOrDefault(ctx, h.log)
+	bktInfo, err := h.getBucketInfo(ctx, scid, log)
+	if err != nil {
+		logAndSendBucketError(c, log, err)
+		return
+	}
+	resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
+	if err != nil {
+		return
+	}
+
+	c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
+	c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
+
+	c.SetBodyStreamWriter(h.getZipResponseWriter(ctx, log, resSearch, bktInfo))
+}
+
+func (h *Handler) getZipResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
+	return func(w *bufio.Writer) {
+		defer resSearch.Close()
+
+		buf := make([]byte, 3<<20)
+		zipWriter := zip.NewWriter(w)
+		var objectsWritten int
+
+		errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf,
+			func(obj *object.Object) (io.Writer, error) {
+				objectsWritten++
+				return h.createZipFile(zipWriter, obj)
+			}),
+		)
+		if errIter != nil {
+			log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter))
+			return
+		} else if objectsWritten == 0 {
+			log.Warn(logs.ObjectsNotFound)
+		}
+		if err := zipWriter.Close(); err != nil {
+			log.Error(logs.CloseZipWriter, zap.Error(err))
+		}
+	}
+}
+
+func (h *Handler) createZipFile(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
	method := zip.Store
-	if h.config.ZipCompression() {
+	if h.config.ArchiveCompression() {
		method = zip.Deflate
	}

-	filePath := getZipFilePath(obj)
+	filePath := getFilePath(obj)
	if len(filePath) == 0 || filePath[len(filePath)-1] == '/' {
		return nil, fmt.Errorf("invalid filepath '%s'", filePath)
	}

@@ -82,99 +150,139 @@ func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer,
	})
}

-// DownloadZipped handles zip by prefix requests.
-func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
+// DownloadTar forms tar.gz from objects by prefix.
+func (h *Handler) DownloadTar(c *fasthttp.RequestCtx) {
	scid, _ := c.UserValue("cid").(string)
-	prefix, _ := c.UserValue("prefix").(string)

	ctx := utils.GetContextFromRequest(c)
	log := utils.GetReqLogOrDefault(ctx, h.log)
-
-	prefix, err := url.QueryUnescape(prefix)
-	if err != nil {
-		log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Error(err))
-		response.Error(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
-		return
-	}
-
-	log = log.With(zap.String("cid", scid), zap.String("prefix", prefix))
-
	bktInfo, err := h.getBucketInfo(ctx, scid, log)
	if err != nil {
		logAndSendBucketError(c, log, err)
		return
	}

-	resSearch, err := h.search(ctx, bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
+	resSearch, err := h.searchObjectsByPrefix(c, log, bktInfo.CID)
	if err != nil {
-		log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
-		response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
		return
	}

-	c.Response.Header.Set(fasthttp.HeaderContentType, "application/zip")
-	c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.zip\"")
+	c.Response.SetStatusCode(http.StatusOK)
+	c.Response.Header.Set(fasthttp.HeaderContentType, "application/gzip")
+	c.Response.Header.Set(fasthttp.HeaderContentDisposition, "attachment; filename=\"archive.tar.gz\"")

-	c.SetBodyStreamWriter(func(w *bufio.Writer) {
+	c.SetBodyStreamWriter(h.getTarResponseWriter(ctx, log, resSearch, bktInfo))
+}
+
+func (h *Handler) getTarResponseWriter(ctx context.Context, log *zap.Logger, resSearch ResObjectSearch, bktInfo *data.BucketInfo) func(w *bufio.Writer) {
+	return func(w *bufio.Writer) {
		defer resSearch.Close()

-		zipWriter := zip.NewWriter(w)
-
-		var bufZip []byte
-		var addr oid.Address
-
-		empty := true
-		called := false
-		btoken := bearerToken(ctx)
-		addr.SetContainer(bktInfo.CID)
-
-		errIter := resSearch.Iterate(func(id oid.ID) bool {
-			called = true
-
-			if empty {
-				bufZip = make([]byte, 3<<20) // the same as for upload
-			}
-			empty = false
-
-			addr.SetObject(id)
-			if err = h.zipObject(ctx, zipWriter, addr, btoken, bufZip); err != nil {
-				log.Error(logs.FailedToAddObjectToArchive, zap.String("oid", id.EncodeToString()), zap.Error(err))
-			}
-
-			return false
-		})
+		compressionLevel := gzip.NoCompression
+		if h.config.ArchiveCompression() {
+			compressionLevel = gzip.DefaultCompression
+		}
+
+		// ignore error because it's not nil only if compressionLevel argument is invalid
+		gzipWriter, _ := gzip.NewWriterLevel(w, compressionLevel)
+		tarWriter := tar.NewWriter(gzipWriter)
+
+		defer func() {
+			if err := tarWriter.Close(); err != nil {
+				log.Error(logs.CloseTarWriter, zap.Error(err))
+			}
+			if err := gzipWriter.Close(); err != nil {
+				log.Error(logs.CloseGzipWriter, zap.Error(err))
+			}
+		}()
+
+		var objectsWritten int
+		buf := make([]byte, 3<<20) // the same as for upload
+
+		errIter := resSearch.Iterate(h.putObjectToArchive(ctx, log, bktInfo.CID, buf,
+			func(obj *object.Object) (io.Writer, error) {
+				objectsWritten++
+				return h.createTarFile(tarWriter, obj)
+			}),
+		)
		if errIter != nil {
			log.Error(logs.IteratingOverSelectedObjectsFailed, zap.Error(errIter))
-		} else if !called {
-			log.Error(logs.ObjectsNotFound)
+		} else if objectsWritten == 0 {
+			log.Warn(logs.ObjectsNotFound)
		}
+	}
+}

-		if err = zipWriter.Close(); err != nil {
-			log.Error(logs.CloseZipWriter, zap.Error(err))
-		}
+func (h *Handler) createTarFile(tw *tar.Writer, obj *object.Object) (io.Writer, error) {
+	filePath := getFilePath(obj)
+	if len(filePath) == 0 || filePath[len(filePath)-1] == '/' {
+		return nil, fmt.Errorf("invalid filepath '%s'", filePath)
+	}
+
+	return tw, tw.WriteHeader(&tar.Header{
+		Name: filePath,
+		Mode: 0655,
+		Size: int64(obj.PayloadSize()),
+	})
}
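The writer stacking above is order-sensitive: the tar writer must be closed before the gzip writer so the tar trailer is flushed through the compressor, which is exactly what the deferred closes do. The same pattern in a standalone sketch (file name illustrative):

```go
package main

import (
	"archive/tar"
	"compress/gzip"
	"os"
)

func main() {
	out, err := os.Create("archive.tar.gz")
	if err != nil {
		panic(err)
	}
	defer out.Close()

	gz, _ := gzip.NewWriterLevel(out, gzip.DefaultCompression) // err only for an invalid level
	tw := tar.NewWriter(gz)

	payload := []byte("hello")
	// Header first (with the final size), then exactly Size bytes of payload.
	if err := tw.WriteHeader(&tar.Header{Name: "dir/hello.txt", Mode: 0655, Size: int64(len(payload))}); err != nil {
		panic(err)
	}
	if _, err := tw.Write(payload); err != nil {
		panic(err)
	}

	// Close order matters: tar writes its trailer into gz, then gz flushes.
	if err := tw.Close(); err != nil {
		panic(err)
	}
	if err := gz.Close(); err != nil {
		panic(err)
	}
}
```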
-func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid.Address, btoken *bearer.Token, bufZip []byte) error {
-	prm := PrmObjectGet{
-		PrmAuth: PrmAuth{
-			BearerToken: btoken,
-		},
-		Address: addr,
-	}
+func (h *Handler) putObjectToArchive(ctx context.Context, log *zap.Logger, cnrID cid.ID, buf []byte, createArchiveHeader func(obj *object.Object) (io.Writer, error)) func(id oid.ID) bool {
+	return func(id oid.ID) bool {
+		log = log.With(zap.String("oid", id.EncodeToString()))

-	resGet, err := h.frostfs.GetObject(ctx, prm)
+		prm := PrmObjectGet{
+			PrmAuth: PrmAuth{
+				BearerToken: bearerToken(ctx),
+			},
+			Address: newAddress(cnrID, id),
+		}
+
+		resGet, err := h.frostfs.GetObject(ctx, prm)
+		if err != nil {
+			log.Error(logs.FailedToGetObject, zap.Error(err))
+			return false
+		}
+
+		fileWriter, err := createArchiveHeader(&resGet.Header)
+		if err != nil {
+			log.Error(logs.FailedToAddObjectToArchive, zap.Error(err))
+			return false
+		}
+
+		if err = writeToArchive(resGet, fileWriter, buf); err != nil {
+			log.Error(logs.FailedToAddObjectToArchive, zap.Error(err))
+			return false
+		}
+
+		return false
+	}
+}
+
+func (h *Handler) searchObjectsByPrefix(c *fasthttp.RequestCtx, log *zap.Logger, cnrID cid.ID) (ResObjectSearch, error) {
+	scid, _ := c.UserValue("cid").(string)
+	prefix, _ := c.UserValue("prefix").(string)
+
+	ctx := utils.GetContextFromRequest(c)
+
+	prefix, err := url.QueryUnescape(prefix)
	if err != nil {
-		return fmt.Errorf("get FrostFS object: %v", err)
+		log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Error(err))
+		ResponseError(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
+		return nil, err
	}

-	objWriter, err := h.addObjectToZip(zipWriter, &resGet.Header)
+	log = log.With(zap.String("cid", scid), zap.String("prefix", prefix))
+
+	resSearch, err := h.search(ctx, cnrID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
	if err != nil {
-		return fmt.Errorf("zip create header: %v", err)
+		log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
+		ResponseError(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
+		return nil, err
	}
+	return resSearch, nil
+}

-	if _, err = io.CopyBuffer(objWriter, resGet.Payload, bufZip); err != nil {
+func writeToArchive(resGet *Object, objWriter io.Writer, buf []byte) error {
+	var err error
+	if _, err = io.CopyBuffer(objWriter, resGet.Payload, buf); err != nil {
		return fmt.Errorf("copy object payload to zip file: %v", err)
	}

@@ -182,14 +290,10 @@ func (h *Handler) zipObject(ctx context.Context, zipWriter *zip.Writer, addr oid
		return fmt.Errorf("object body close error: %w", err)
	}

-	if err = zipWriter.Flush(); err != nil {
-		return fmt.Errorf("flush zip writer: %v", err)
-	}
-
	return nil
}

-func getZipFilePath(obj *object.Object) string {
+func getFilePath(obj *object.Object) string {
	for _, attr := range obj.Attributes() {
		if attr.Key() == object.AttributeFilePath {
			return attr.Value()
@@ -11,9 +11,8 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
+	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
-	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
-	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
	"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"

@@ -29,12 +28,13 @@ import (

type Config interface {
	DefaultTimestamp() bool
-	ZipCompression() bool
+	ArchiveCompression() bool
	ClientCut() bool
	IndexPageEnabled() bool
	IndexPageTemplate() string
	BufferMaxSizeForPut() uint64
	NamespaceHeader() string
+	EnableFilepathFallback() bool
}

// PrmContainer groups parameters of FrostFS.Container operation.

@@ -139,6 +139,8 @@ var (
	ErrAccessDenied = errors.New("access denied")
	// ErrGatewayTimeout is returned from FrostFS in case of timeout, deadline exceeded etc.
	ErrGatewayTimeout = errors.New("gateway timeout")
+	// ErrQuotaLimitReached is returned from FrostFS in case of quota exceeded.
+	ErrQuotaLimitReached = errors.New("quota limit reached")
)

// FrostFS represents virtual connection to FrostFS network.

@@ -164,7 +166,7 @@ type Handler struct {
	ownerID           *user.ID
	config            Config
	containerResolver ContainerResolver
-	tree              *tree.Tree
+	tree              layer.TreeService
	cache             *cache.BucketCache
	workerPool        *ants.Pool
}

@@ -177,7 +179,7 @@ type AppParams struct {
	Cache *cache.BucketCache
}

-func New(params *AppParams, config Config, tree *tree.Tree, workerPool *ants.Pool) *Handler {
+func New(params *AppParams, config Config, tree layer.TreeService, workerPool *ants.Pool) *Handler {
	return &Handler{
		log:     params.Logger,
		frostfs: params.FrostFS,

@@ -192,77 +194,34 @@ func New(params *AppParams, config Config, tree *tree.Tree, workerPool *ants.Poo

// byNativeAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it.
-func (h *Handler) byNativeAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
-	idCnr, _ := c.UserValue("cid").(string)
-	idObj, _ := url.PathUnescape(c.UserValue("oid").(string))
-
-	ctx := utils.GetContextFromRequest(c)
-	reqLog := utils.GetReqLogOrDefault(ctx, h.log)
-	log := reqLog.With(zap.String("cid", idCnr), zap.String("oid", idObj))
-
-	bktInfo, err := h.getBucketInfo(ctx, idCnr, log)
-	if err != nil {
-		logAndSendBucketError(c, log, err)
-		return
-	}
-
-	objID := new(oid.ID)
-	if err = objID.DecodeString(idObj); err != nil {
-		log.Error(logs.WrongObjectID, zap.Error(err))
-		response.Error(c, "wrong object id", fasthttp.StatusBadRequest)
-		return
-	}
-
-	addr := newAddress(bktInfo.CID, *objID)
-
-	f(ctx, *h.newRequest(c, log), addr)
+func (h *Handler) byNativeAddress(ctx context.Context, req request, cnrID cid.ID, objID oid.ID, handler func(context.Context, request, oid.Address)) {
+	addr := newAddress(cnrID, objID)
+	handler(ctx, req, addr)
}

// byS3Path is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// resolves object address from S3-like path <bucket name>/<object key>.
-func (h *Handler) byS3Path(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
-	bucketname := c.UserValue("cid").(string)
-	key := c.UserValue("oid").(string)
-
-	ctx := utils.GetContextFromRequest(c)
-	reqLog := utils.GetReqLogOrDefault(ctx, h.log)
-	log := reqLog.With(zap.String("bucketname", bucketname), zap.String("key", key))
-
-	unescapedKey, err := url.QueryUnescape(key)
-	if err != nil {
-		logAndSendBucketError(c, log, err)
-		return
-	}
-
-	bktInfo, err := h.getBucketInfo(ctx, bucketname, log)
-	if err != nil {
-		logAndSendBucketError(c, log, err)
-		return
-	}
-
-	foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, unescapedKey)
+func (h *Handler) byS3Path(ctx context.Context, req request, cnrID cid.ID, path string, handler func(context.Context, request, oid.Address)) {
+	c, log := req.RequestCtx, req.log
+
+	foundOID, err := h.tree.GetLatestVersion(ctx, &cnrID, path)
	if err != nil {
		if errors.Is(err, tree.ErrNodeAccessDenied) {
			response.Error(c, "Access Denied", fasthttp.StatusForbidden)
		} else {
			response.Error(c, "object wasn't found", fasthttp.StatusNotFound)
			log.Error(logs.GetLatestObjectVersion, zap.Error(err))
		}
		return
	}
-	if foundOid.DeleteMarker {
+	if foundOID.IsDeleteMarker {
		log.Error(logs.ObjectWasDeleted)
-		response.Error(c, "object deleted", fasthttp.StatusNotFound)
+		ResponseError(c, "object deleted", fasthttp.StatusNotFound)
		return
	}
-	addr := newAddress(bktInfo.CID, foundOid.OID)
-
-	f(ctx, *h.newRequest(c, log), addr)
+	addr := newAddress(cnrID, foundOID.OID)
+	handler(ctx, newRequest(c, log), addr)
}
// byAttribute is a wrapper similar to byNativeAddress.
|
||||
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
scid, _ := c.UserValue("cid").(string)
|
||||
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, handler func(context.Context, request, oid.Address)) {
|
||||
cidParam, _ := c.UserValue("cid").(string)
|
||||
key, _ := c.UserValue("attr_key").(string)
|
||||
val, _ := c.UserValue("attr_val").(string)
|
||||
|
||||
|
@@ -271,55 +230,78 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {

    key, err := url.QueryUnescape(key)
    if err != nil {
        log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_key", key), zap.Error(err))
        response.Error(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
        log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_key", key), zap.Error(err))
        ResponseError(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
        return
    }

    val, err = url.QueryUnescape(val)
    if err != nil {
        log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_val", val), zap.Error(err))
        response.Error(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
        log.Error(logs.FailedToUnescapeQuery, zap.String("cid", cidParam), zap.String("attr_val", val), zap.Error(err))
        ResponseError(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
        return
    }

    log = log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
    log = log.With(zap.String("cid", cidParam), zap.String("attr_key", key), zap.String("attr_val", val))

    bktInfo, err := h.getBucketInfo(ctx, scid, log)
    bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
    if err != nil {
        logAndSendBucketError(c, log, err)
        return
    }

    res, err := h.search(ctx, bktInfo.CID, key, val, object.MatchStringEqual)
    objID, err := h.findObjectByAttribute(ctx, log, bktInfo.CID, key, val)
    if err != nil {
        log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
        response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
        if errors.Is(err, io.EOF) {
            ResponseError(c, err.Error(), fasthttp.StatusNotFound)
            return
        }

        ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
        return
    }

    var addr oid.Address
    addr.SetContainer(bktInfo.CID)
    addr.SetObject(objID)

    handler(ctx, newRequest(c, log), addr)
}

func (h *Handler) findObjectByAttribute(ctx context.Context, log *zap.Logger, cnrID cid.ID, attrKey, attrVal string) (oid.ID, error) {
    res, err := h.search(ctx, cnrID, attrKey, attrVal, object.MatchStringEqual)
    if err != nil {
        log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
        return oid.ID{}, fmt.Errorf("could not search for objects: %w", err)
    }
    defer res.Close()

    buf := make([]oid.ID, 1)

    n, err := res.Read(buf)
    if n == 0 {
        if errors.Is(err, io.EOF) {
        switch {
        case errors.Is(err, io.EOF) && h.needSearchByFileName(attrKey, attrVal):
            log.Debug(logs.ObjectNotFoundByFilePathTrySearchByFileName)
            return h.findObjectByAttribute(ctx, log, cnrID, attrFileName, attrVal)
        case errors.Is(err, io.EOF):
            log.Error(logs.ObjectNotFound, zap.Error(err))
            response.Error(c, "object not found", fasthttp.StatusNotFound)
            return
            return oid.ID{}, fmt.Errorf("object not found: %w", err)
        default:
            log.Error(logs.ReadObjectListFailed, zap.Error(err))
            return oid.ID{}, fmt.Errorf("read object list failed: %w", err)
        }

        log.Error(logs.ReadObjectListFailed, zap.Error(err))
        response.Error(c, "read object list failed: "+err.Error(), fasthttp.StatusBadRequest)
        return
    }

    var addrObj oid.Address
    addrObj.SetContainer(bktInfo.CID)
    addrObj.SetObject(buf[0])
    return buf[0], nil
}

    f(ctx, *h.newRequest(c, log), addrObj)
func (h *Handler) needSearchByFileName(key, val string) bool {
    if key != attrFilePath || !h.config.EnableFilepathFallback() {
        return false
    }

    return strings.HasPrefix(val, "/") && strings.Count(val, "/") == 1 || !strings.Contains(val, "/")
}
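Read together, findObjectByAttribute and needSearchByFileName implement the FileName fallback: a FilePath search that ends in io.EOF is retried once with the FileName attribute, but only for flat values ("cat.png", "/cat.png"), never for nested paths. A hedged client-side sketch of a request that can trigger it (host and container ID are hypothetical; the route shape matches the /get_by_attribute/{cid}/{attr_key}/{attr_val:*} path registered in app.go):

package main

import (
    "fmt"
    "net/http"
)

func main() {
    // If no object carries FilePath=cat.png and the fallback is enabled,
    // the gateway repeats the search with FileName=cat.png before
    // answering 404.
    resp, err := http.Get("http://gateway.example/get_by_attribute/9sAB.../FilePath/cat.png")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    fmt.Println(resp.Status)
}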
// resolveContainer decode container id, if it's not a valid container id

@@ -384,11 +366,12 @@ func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.BucketInfo, error) {

    }

    bktInfo.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(*res)
    bktInfo.PlacementPolicy = res.PlacementPolicy()

    return bktInfo, err
}

func (h *Handler) browseIndex(c *fasthttp.RequestCtx) {
func (h *Handler) browseIndex(c *fasthttp.RequestCtx, isNativeList bool) {
    if !h.config.IndexPageEnabled() {
        c.SetStatusCode(fasthttp.StatusNotFound)
        return
@@ -414,18 +397,9 @@ func (h *Handler) browseIndex(c *fasthttp.RequestCtx) {

    }

    listFunc := h.getDirObjectsS3
    isNativeList := false

    err = h.tree.CheckSettingsNodeExist(ctx, bktInfo)
    if err != nil {
        if errors.Is(err, tree.ErrNodeNotFound) {
            // tree probe failed, try to use native
            listFunc = h.getDirObjectsNative
            isNativeList = true
        } else {
            logAndSendBucketError(c, log, err)
            return
        }
    if isNativeList {
        // tree probe failed, trying to use native
        listFunc = h.getDirObjectsNative
    }

    h.browseObjects(c, browseParams{
@@ -517,7 +517,7 @@ func DoFuzzDownloadZipped(input []byte) int {

    r.SetUserValue("cid", cid)
    r.SetUserValue("prefix", prefix)

    hc.Handler().DownloadZipped(r)
    hc.Handler().DownloadZip(r)

    return fuzzSuccessExitCode
}
@@ -14,8 +8,8 @@ import (

    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
@@ -32,25 +32,41 @@ import (

    "go.uber.org/zap"
)

type treeClientMock struct {
type treeServiceMock struct {
    system map[string]map[string]*data.BaseNodeVersion
}

func (t *treeClientMock) GetNodes(context.Context, *tree.GetNodesParams) ([]tree.NodeResponse, error) {
    return nil, nil
func newTreeService() *treeServiceMock {
    return &treeServiceMock{
        system: make(map[string]map[string]*data.BaseNodeVersion),
    }
}

func (t *treeClientMock) GetSubTree(context.Context, *data.BucketInfo, string, []uint64, uint32, bool) ([]tree.NodeResponse, error) {
func (t *treeServiceMock) CheckSettingsNodeExists(context.Context, *data.BucketInfo) error {
    _, ok := t.system["bucket-settings"]
    if !ok {
        return layer.ErrNodeNotFound
    }
    return nil
}

func (t *treeServiceMock) GetSubTreeByPrefix(context.Context, *data.BucketInfo, string, bool) ([]data.NodeInfo, string, error) {
    return nil, "", nil
}

func (t *treeServiceMock) GetLatestVersion(context.Context, *cid.ID, string) (*data.NodeVersion, error) {
    return nil, nil
}

type configMock struct {
    additionalSearch bool
}

func (c *configMock) DefaultTimestamp() bool {
    return false
}

func (c *configMock) ZipCompression() bool {
func (c *configMock) ArchiveCompression() bool {
    return false
}
@@ -78,13 +94,17 @@ func (c *configMock) NamespaceHeader() string {

    return ""
}

func (c *configMock) EnableFilepathFallback() bool {
    return c.additionalSearch
}

type handlerContext struct {
    key   *keys.PrivateKey
    owner user.ID

    h       *Handler
    frostfs *TestFrostFS
    tree    *treeClientMock
    tree    *treeServiceMock
    cfg     *configMock
}
@@ -122,17 +142,17 @@ func prepareHandlerContext() (*handlerContext, error) {

            Size:     1,
            Lifetime: 1,
            Logger:   logger,
        }),
        }, false),
    }

    treeMock := &treeClientMock{}
    treeMock := newTreeService()
    cfgMock := &configMock{}

    workerPool, err := ants.NewPool(1000)
    workerPool, err := ants.NewPool(1)
    if err != nil {
        return nil, err
    }
    handler := New(params, cfgMock, tree.NewTree(treeMock), workerPool)
    handler := New(params, cfgMock, treeMock, workerPool)

    return &handlerContext{
        key: key,
@@ -199,10 +219,8 @@ func TestBasic(t *testing.T) {

    require.NoError(t, err)

    obj := hc.frostfs.objects[putRes.ContainerID+"/"+putRes.ObjectID]
    attr := object.NewAttribute()
    attr.SetKey(object.AttributeFilePath)
    attr.SetValue(objFileName)
    obj.SetAttributes(append(obj.Attributes(), *attr)...)
    attr := prepareObjectAttributes(object.AttributeFilePath, objFileName)
    obj.SetAttributes(append(obj.Attributes(), attr)...)

    t.Run("get", func(t *testing.T) {
        r = prepareGetRequest(ctx, cnrID.EncodeToString(), putRes.ObjectID)
@@ -232,7 +250,7 @@ func TestBasic(t *testing.T) {

    t.Run("zip", func(t *testing.T) {
        r = prepareGetZipped(ctx, bktName, "")
        hc.Handler().DownloadZipped(r)
        hc.Handler().DownloadZip(r)

        readerAt := bytes.NewReader(r.Response.Body())
        zipReader, err := zip.NewReader(readerAt, int64(len(r.Response.Body())))

@@ -251,6 +269,159 @@ func TestBasic(t *testing.T) {

    })
}
func TestFindObjectByAttribute(t *testing.T) {
    hc, err := prepareHandlerContext()
    require.NoError(t, err)
    hc.cfg.additionalSearch = true

    bktName := "bucket"
    cnrID, cnr, err := hc.prepareContainer(bktName, acl.PublicRWExtended)
    require.NoError(t, err)
    hc.frostfs.SetContainer(cnrID, cnr)

    ctx := context.Background()
    ctx = middleware.SetNamespace(ctx, "")

    content := "hello"
    r, err := prepareUploadRequest(ctx, cnrID.EncodeToString(), content)
    require.NoError(t, err)

    hc.Handler().Upload(r)
    require.Equal(t, r.Response.StatusCode(), http.StatusOK)

    var putRes putResponse
    err = json.Unmarshal(r.Response.Body(), &putRes)
    require.NoError(t, err)

    testAttrVal1 := "test-attr-val1"
    testAttrVal2 := "test-attr-val2"
    testAttrVal3 := "test-attr-val3"

    for _, tc := range []struct {
        name             string
        firstAttr        object.Attribute
        secondAttr       object.Attribute
        reqAttrKey       string
        reqAttrValue     string
        err              string
        additionalSearch bool
    }{
        {
            name:             "success search by FileName",
            firstAttr:        prepareObjectAttributes(attrFilePath, testAttrVal1),
            secondAttr:       prepareObjectAttributes(attrFileName, testAttrVal2),
            reqAttrKey:       attrFileName,
            reqAttrValue:     testAttrVal2,
            additionalSearch: false,
        },
        {
            name:             "failed search by FileName",
            firstAttr:        prepareObjectAttributes(attrFilePath, testAttrVal1),
            secondAttr:       prepareObjectAttributes(attrFileName, testAttrVal2),
            reqAttrKey:       attrFileName,
            reqAttrValue:     testAttrVal3,
            err:              "not found",
            additionalSearch: false,
        },
        {
            name:             "success search by FilePath (with additional search)",
            firstAttr:        prepareObjectAttributes(attrFilePath, testAttrVal1),
            secondAttr:       prepareObjectAttributes(attrFileName, testAttrVal2),
            reqAttrKey:       attrFilePath,
            reqAttrValue:     testAttrVal2,
            additionalSearch: true,
        },
        {
            name:             "failed by FilePath (with additional search)",
            firstAttr:        prepareObjectAttributes(attrFilePath, testAttrVal1),
            secondAttr:       prepareObjectAttributes(attrFileName, testAttrVal2),
            reqAttrKey:       attrFilePath,
            reqAttrValue:     testAttrVal3,
            err:              "not found",
            additionalSearch: true,
        },
    } {
        t.Run(tc.name, func(t *testing.T) {
            obj := hc.frostfs.objects[putRes.ContainerID+"/"+putRes.ObjectID]
            obj.SetAttributes(tc.firstAttr, tc.secondAttr)
            hc.cfg.additionalSearch = tc.additionalSearch

            objID, err := hc.Handler().findObjectByAttribute(ctx, hc.Handler().log, cnrID, tc.reqAttrKey, tc.reqAttrValue)
            if tc.err != "" {
                require.Error(t, err)
                require.Contains(t, err.Error(), tc.err)
                return
            }

            require.NoError(t, err)
            require.Equal(t, putRes.ObjectID, objID.EncodeToString())
        })
    }
}
func TestNeedSearchByFileName(t *testing.T) {
    hc, err := prepareHandlerContext()
    require.NoError(t, err)

    for _, tc := range []struct {
        name             string
        attrKey          string
        attrVal          string
        additionalSearch bool
        expected         bool
    }{
        {
            name:             "need search - not contains slash",
            attrKey:          attrFilePath,
            attrVal:          "cat.png",
            additionalSearch: true,
            expected:         true,
        },
        {
            name:             "need search - single lead slash",
            attrKey:          attrFilePath,
            attrVal:          "/cat.png",
            additionalSearch: true,
            expected:         true,
        },
        {
            name:             "don't need search - single slash but not lead",
            attrKey:          attrFilePath,
            attrVal:          "cats/cat.png",
            additionalSearch: true,
            expected:         false,
        },
        {
            name:             "don't need search - more one slash",
            attrKey:          attrFilePath,
            attrVal:          "/cats/cat.png",
            additionalSearch: true,
            expected:         false,
        },
        {
            name:             "don't need search - incorrect attribute key",
            attrKey:          attrFileName,
            attrVal:          "cat.png",
            additionalSearch: true,
            expected:         false,
        },
        {
            name:             "don't need search - additional search disabled",
            attrKey:          attrFilePath,
            attrVal:          "cat.png",
            additionalSearch: false,
            expected:         false,
        },
    } {
        t.Run(tc.name, func(t *testing.T) {
            hc.cfg.additionalSearch = tc.additionalSearch

            res := hc.h.needSearchByFileName(tc.attrKey, tc.attrVal)
            require.Equal(t, tc.expected, res)
        })
    }
}
func prepareUploadRequest(ctx context.Context, bucket, content string) (*fasthttp.RequestCtx, error) {
    r := new(fasthttp.RequestCtx)
    utils.SetContextToRequest(ctx, r)

@@ -283,6 +454,13 @@ func prepareGetZipped(ctx context.Context, bucket, prefix string) *fasthttp.RequestCtx {

    return r
}

func prepareObjectAttributes(attrKey, attrValue string) object.Attribute {
    attr := object.NewAttribute()
    attr.SetKey(attrKey)
    attr.SetValue(attrValue)
    return *attr
}

const (
    keyAttr = "User-Attribute"
    valAttr = "user value"
@@ -2,11 +2,13 @@ package handler

import (
    "context"
    "errors"
    "io"
    "net/http"
    "strconv"
    "time"

    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@@ -43,7 +45,11 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) {

    }

    req.Response.Header.Set(fasthttp.HeaderContentLength, strconv.FormatUint(obj.PayloadSize(), 10))
    var contentType string
    var (
        contentType string
        filename    string
        filepath    string
    )
    for _, attr := range obj.Attributes() {
        key := attr.Key()
        val := attr.Value()

@@ -67,8 +73,15 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) {

            req.Response.Header.Set(fasthttp.HeaderLastModified, time.Unix(value, 0).UTC().Format(http.TimeFormat))
        case object.AttributeContentType:
            contentType = val
        case object.AttributeFilePath:
            filepath = val
        case object.AttributeFileName:
            filename = val
        }
    }
    if filename == "" {
        filename = filepath
    }

    idsToResponse(&req.Response, obj)
@@ -83,7 +96,7 @@ func (h *Handler) headObject(ctx context.Context, req request, objectAddress oid.Address) {

        }

        return h.frostfs.RangeObject(ctx, prmRange)
    })
    }, filename)
    if err != nil && err != io.EOF {
        req.handleFrostFSErr(err, start)
        return
@@ -102,14 +115,36 @@ func idsToResponse(resp *fasthttp.Response, obj *object.Object) {

// HeadByAddressOrBucketName handles head requests using simple cid/oid or bucketname/key format.
func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
    test, _ := c.UserValue("oid").(string)
    var id oid.ID
    cidParam, _ := c.UserValue("cid").(string)
    oidParam, _ := c.UserValue("oid").(string)

    err := id.DecodeString(test)
    ctx := utils.GetContextFromRequest(c)
    log := utils.GetReqLogOrDefault(ctx, h.log).With(
        zap.String("cid", cidParam),
        zap.String("oid", oidParam),
    )

    bktInfo, err := h.getBucketInfo(ctx, cidParam, log)
    if err != nil {
        h.byS3Path(c, h.headObject)
        logAndSendBucketError(c, log, err)
        return
    }
    checkS3Err := h.tree.CheckSettingsNodeExists(ctx, bktInfo)
    if checkS3Err != nil && !errors.Is(checkS3Err, layer.ErrNodeNotFound) {
        logAndSendBucketError(c, log, checkS3Err)
        return
    }

    req := newRequest(c, log)

    var objID oid.ID
    if checkS3Err == nil {
        h.byS3Path(ctx, req, bktInfo.CID, oidParam, h.headObject)
    } else if err = objID.DecodeString(oidParam); err == nil {
        h.byNativeAddress(ctx, req, bktInfo.CID, objID, h.headObject)
    } else {
        h.byNativeAddress(c, h.headObject)
        logAndSendBucketError(c, log, checkS3Err)
        return
    }
}
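A hedged sketch of the two request shapes this dispatcher now serves (host and identifiers are made up; that HEAD is bound to the /get/{cid}/{oid} path is an assumption based on the route list in app.go): when the container has S3 tree settings, the second segment is treated as an object key and resolved through the tree service; otherwise it must decode as a native object ID.

package main

import (
    "fmt"
    "net/http"
)

func main() {
    for _, url := range []string{
        // bucket-style: key resolved via the tree service
        "http://gateway.example/get/my-bucket/docs/readme.txt",
        // native: cid/oid pair decoded directly
        "http://gateway.example/get/9sAB.../4D2c...",
    } {
        resp, err := http.Head(url)
        if err != nil {
            panic(err)
        }
        resp.Body.Close()
        fmt.Println(url, "->", resp.Status)
    }
}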
@@ -42,7 +42,9 @@ func fetchMultipartFile(l *zap.Logger, r io.Reader, boundary string) (MultipartFile, error) {

        // ignore multipart/form-data values
        if filename == "" {
            l.Debug(logs.IgnorePartEmptyFilename, zap.String("form", name))

            if err = part.Close(); err != nil {
                l.Warn(logs.FailedToCloseReader, zap.Error(err))
            }
            continue
        }
@@ -4,13 +4,14 @@ import (

    "bytes"
    "context"
    "io"
    "mime"
    "net/http"
    "path"
    "strconv"
    "strings"
    "time"

    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"

@@ -25,7 +26,7 @@ type readCloser struct {

// initializes io.Reader with the limited size and detects Content-Type from it.
// Returns r's error directly. Also returns the processed data.
func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (string, []byte, error) {
func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error), filename string) (string, []byte, error) {
    if maxSize > sizeToDetectType {
        maxSize = sizeToDetectType
    }
@@ -44,7 +45,20 @@ func readContentType(maxSize uint64, rInit func(uint64) (io.Reader, error)) (string, []byte, error) {

    buf = buf[:n]

    return http.DetectContentType(buf), buf, err // to not lose io.EOF
    contentType := http.DetectContentType(buf)

    // Since the detector detects the "text/plain" content type for various types of text files,
    // including CSS, JavaScript, and CSV files,
    // we'll determine the final content type based on the file's extension.
    if strings.HasPrefix(contentType, "text/plain") {
        ext := path.Ext(filename)
        // If the file doesn't have a file extension, we'll keep the content type as is.
        if len(ext) > 0 {
            contentType = mime.TypeByExtension(ext)
        }
    }

    return contentType, buf, err // to not lose io.EOF
}
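The refinement leans on Go's mime package; a small standalone check of the values involved (the first two expectations follow from mime's built-in table, which the tests below also rely on; note that an unknown extension makes TypeByExtension return an empty string, which this function would then hand back as the content type):

package main

import (
    "fmt"
    "mime"
)

func main() {
    // mime.TypeByExtension consults a built-in table plus OS-specific
    // registrations, so text-like files get a more precise type than
    // http.DetectContentType's generic "text/plain; charset=utf-8".
    for _, ext := range []string{".css", ".js", ".txt", ".made-up-ext"} {
        fmt.Printf("%-13s -> %q\n", ext, mime.TypeByExtension(ext))
    }
    // Built-in table: ".css" -> "text/css; charset=utf-8",
    // ".js" -> "text/javascript; charset=utf-8", ".made-up-ext" -> "".
}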
type getMultiobjectBodyParams struct {

@@ -128,10 +142,10 @@ func (h *Handler) receiveFile(ctx context.Context, req request, objAddress oid.Address) {

    contentType, payloadHead, err = readContentType(payloadSize, func(uint64) (io.Reader, error) {
        return payload, nil
    })
    }, filename)
    if err != nil && err != io.EOF {
        req.log.Error(logs.CouldNotDetectContentTypeFromPayload, zap.Error(err))
        response.Error(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
        ResponseError(req.RequestCtx, "could not detect Content-Type from payload: "+err.Error(), fasthttp.StatusBadRequest)
        return
    }
@@ -10,39 +10,80 @@ import (

    "github.com/stretchr/testify/require"
)

const (
    txtContentType        = "text/plain; charset=utf-8"
    cssContentType        = "text/css; charset=utf-8"
    htmlContentType       = "text/html; charset=utf-8"
    javascriptContentType = "text/javascript; charset=utf-8"

    htmlBody = "<!DOCTYPE html><html ><head><meta charset=\"utf-8\"><title>Test Html</title>"
)

func TestDetector(t *testing.T) {
    txtContentType := "text/plain; charset=utf-8"
    sb := strings.Builder{}
    for i := 0; i < 10; i++ {
        sb.WriteString("Some txt content. Content-Type must be detected properly by detector.")
    }

    for _, tc := range []struct {
        Name        string
        ContentType string
        Expected    string
        Name                string
        ExpectedContentType string
        Content             string
        FileName            string
    }{
        {
            Name:        "less than 512b",
            ContentType: txtContentType,
            Expected:    sb.String()[:256],
            Name:                "less than 512b",
            ExpectedContentType: txtContentType,
            Content:             sb.String()[:256],
            FileName:            "test.txt",
        },
        {
            Name:        "more than 512b",
            ContentType: txtContentType,
            Expected:    sb.String(),
            Name:                "more than 512b",
            ExpectedContentType: txtContentType,
            Content:             sb.String(),
            FileName:            "test.txt",
        },
        {
            Name:                "css content type",
            ExpectedContentType: cssContentType,
            Content:             sb.String(),
            FileName:            "test.css",
        },
        {
            Name:                "javascript content type",
            ExpectedContentType: javascriptContentType,
            Content:             sb.String(),
            FileName:            "test.js",
        },
        {
            Name:                "html content type by file content",
            ExpectedContentType: htmlContentType,
            Content:             htmlBody,
            FileName:            "test.detect-by-content",
        },
        {
            Name:                "html content type by file extension",
            ExpectedContentType: htmlContentType,
            Content:             sb.String(),
            FileName:            "test.html",
        },
        {
            Name:                "empty file extension",
            ExpectedContentType: txtContentType,
            Content:             sb.String(),
            FileName:            "test",
        },
    } {
        t.Run(tc.Name, func(t *testing.T) {
            contentType, data, err := readContentType(uint64(len(tc.Expected)),
            contentType, data, err := readContentType(uint64(len(tc.Content)),
                func(uint64) (io.Reader, error) {
                    return strings.NewReader(tc.Expected), nil
                },
                    return strings.NewReader(tc.Content), nil
                }, tc.FileName,
            )

            require.NoError(t, err)
            require.Equal(t, tc.ContentType, contentType)
            require.True(t, strings.HasPrefix(tc.Expected, string(data)))
            require.Equal(t, tc.ExpectedContentType, contentType)
            require.True(t, strings.HasPrefix(tc.Content, string(data)))
        })
    }
}
@@ -1,15 +1,20 @@

package handler

import (
    "archive/tar"
    "bytes"
    "compress/gzip"
    "context"
    "encoding/json"
    "errors"
    "io"
    "net/http"
    "path/filepath"
    "strconv"
    "time"

    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"

@@ -20,8 +25,9 @@ import (

)

const (
    jsonHeader   = "application/json; charset=UTF-8"
    drainBufSize = 4096
    jsonHeader           = "application/json; charset=UTF-8"
    drainBufSize         = 4096
    explodeArchiveHeader = "X-Explode-Archive"
)

type putResponse struct {
@@ -44,11 +50,7 @@ func (pr *putResponse) encode(w io.Writer) error {

// Upload handles multipart upload request.
func (h *Handler) Upload(c *fasthttp.RequestCtx) {
    var (
        file  MultipartFile
        idObj oid.ID
        addr  oid.Address
    )
    var file MultipartFile

    scid, _ := c.UserValue("cid").(string)
    bodyStream := c.RequestBodyStream()
@@ -64,76 +66,78 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {

        return
    }

    defer func() {
        // If the temporary reader can be closed - let's close it.
        if file == nil {
            return
        }
        err := file.Close()
        log.Debug(
            logs.CloseTemporaryMultipartFormFile,
            zap.Stringer("address", addr),
            zap.String("filename", file.FileName()),
            zap.Error(err),
        )
    }()

    boundary := string(c.Request.Header.MultipartFormBoundary())
    if file, err = fetchMultipartFile(log, bodyStream, boundary); err != nil {
        log.Error(logs.CouldNotReceiveMultipartForm, zap.Error(err))
        response.Error(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
        ResponseError(c, "could not receive multipart/form: "+err.Error(), fasthttp.StatusBadRequest)
        return
    }

    filtered, err := filterHeaders(log, &c.Request.Header)
    if err != nil {
        log.Error(logs.CouldNotProcessHeaders, zap.Error(err))
        response.Error(c, err.Error(), fasthttp.StatusBadRequest)
        log.Error(logs.FailedToFilterHeaders, zap.Error(err))
        ResponseError(c, err.Error(), fasthttp.StatusBadRequest)
        return
    }

    now := time.Now()
    if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
        if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
            log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err))
        } else {
            now = parsed
    if c.Request.Header.Peek(explodeArchiveHeader) != nil {
        h.explodeArchive(request{c, log}, bktInfo, file, filtered)
    } else {
        h.uploadSingleObject(request{c, log}, bktInfo, file, filtered)
    }

    // Multipart is multipart and thus can contain more than one part which
    // we ignore at the moment. Also, when dealing with chunked encoding
    // the last zero-length chunk might be left unread (because multipart
    // reader only cares about its boundary and doesn't look further) and
    // it will be (erroneously) interpreted as the start of the next
    // pipelined header. Thus, we need to drain the body buffer.
    for {
        _, err = bodyStream.Read(drainBuf)
        if err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
            break
        }
    }
}

    if err = utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil {
        log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err))
        response.Error(c, "could not prepare expiration header: "+err.Error(), fasthttp.StatusBadRequest)
func (h *Handler) uploadSingleObject(req request, bkt *data.BucketInfo, file MultipartFile, filtered map[string]string) {
    c, log := req.RequestCtx, req.log
    setIfNotExist(filtered, object.AttributeFileName, file.FileName())

    attributes, err := h.extractAttributes(c, log, filtered)
    if err != nil {
        log.Error(logs.FailedToGetAttributes, zap.Error(err))
        ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
        return
    }

    attributes := make([]object.Attribute, 0, len(filtered))
    // prepares attributes from filtered headers
    for key, val := range filtered {
        attribute := object.NewAttribute()
        attribute.SetKey(key)
        attribute.SetValue(val)
        attributes = append(attributes, *attribute)
    idObj, err := h.uploadObject(c, bkt, attributes, file)
    if err != nil {
        h.handlePutFrostFSErr(c, err, log)
        return
    }
    // sets FileName attribute if it wasn't set from header
    if _, ok := filtered[object.AttributeFileName]; !ok {
        filename := object.NewAttribute()
        filename.SetKey(object.AttributeFileName)
        filename.SetValue(file.FileName())
        attributes = append(attributes, *filename)
    }
    // sets Timestamp attribute if it wasn't set from header and enabled by settings
    if _, ok := filtered[object.AttributeTimestamp]; !ok && h.config.DefaultTimestamp() {
        timestamp := object.NewAttribute()
        timestamp.SetKey(object.AttributeTimestamp)
        timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
        attributes = append(attributes, *timestamp)
    log.Debug(logs.ObjectUploaded,
        zap.String("oid", idObj.EncodeToString()),
        zap.String("FileName", file.FileName()),
    )

    addr := newAddress(bkt.CID, idObj)
    c.Response.Header.SetContentType(jsonHeader)
    // Try to return the response, otherwise, if something went wrong, throw an error.
    if err = newPutResponse(addr).encode(c); err != nil {
        log.Error(logs.CouldNotEncodeResponse, zap.Error(err))
        ResponseError(c, "could not encode response", fasthttp.StatusBadRequest)
        return
    }
}

func (h *Handler) uploadObject(c *fasthttp.RequestCtx, bkt *data.BucketInfo, attrs []object.Attribute, file io.Reader) (oid.ID, error) {
    ctx := utils.GetContextFromRequest(c)

    obj := object.New()
    obj.SetContainerID(bktInfo.CID)
    obj.SetContainerID(bkt.CID)
    obj.SetOwnerID(*h.ownerID)
    obj.SetAttributes(attributes...)
    obj.SetAttributes(attrs...)

    prm := PrmObjectCreate{
        PrmAuth: PrmAuth{
@@ -142,48 +146,128 @@ func (h *Handler) Upload(c *fasthttp.RequestCtx) {

        Object:                 obj,
        Payload:                file,
        ClientCut:              h.config.ClientCut(),
        WithoutHomomorphicHash: bktInfo.HomomorphicHashDisabled,
        WithoutHomomorphicHash: bkt.HomomorphicHashDisabled,
        BufferMaxSize:          h.config.BufferMaxSizeForPut(),
    }

    if idObj, err = h.frostfs.CreateObject(ctx, prm); err != nil {
        h.handlePutFrostFSErr(c, err, log)
        return
    idObj, err := h.frostfs.CreateObject(ctx, prm)
    if err != nil {
        return oid.ID{}, err
    }

    addr.SetObject(idObj)
    addr.SetContainer(bktInfo.CID)
    return idObj, nil
}

    // Try to return the response, otherwise, if something went wrong, throw an error.
    if err = newPutResponse(addr).encode(c); err != nil {
        log.Error(logs.CouldNotEncodeResponse, zap.Error(err))
        response.Error(c, "could not encode response", fasthttp.StatusBadRequest)

        return
    }
    // Multipart is multipart and thus can contain more than one part which
    // we ignore at the moment. Also, when dealing with chunked encoding
    // the last zero-length chunk might be left unread (because multipart
    // reader only cares about its boundary and doesn't look further) and
    // it will be (erroneously) interpreted as the start of the next
    // pipelined header. Thus we need to drain the body buffer.
    for {
        _, err = bodyStream.Read(drainBuf)
        if err == io.EOF || err == io.ErrUnexpectedEOF {
            break
func (h *Handler) extractAttributes(c *fasthttp.RequestCtx, log *zap.Logger, filtered map[string]string) ([]object.Attribute, error) {
    now := time.Now()
    if rawHeader := c.Request.Header.Peek(fasthttp.HeaderDate); rawHeader != nil {
        if parsed, err := time.Parse(http.TimeFormat, string(rawHeader)); err != nil {
            log.Warn(logs.CouldNotParseClientTime, zap.String("Date header", string(rawHeader)), zap.Error(err))
        } else {
            now = parsed
        }
    }
    // Report status code and content type.
    c.Response.SetStatusCode(fasthttp.StatusOK)
    c.Response.Header.SetContentType(jsonHeader)
    if err := utils.PrepareExpirationHeader(c, h.frostfs, filtered, now); err != nil {
        log.Error(logs.CouldNotPrepareExpirationHeader, zap.Error(err))
        return nil, err
    }
    attributes := make([]object.Attribute, 0, len(filtered))
    // prepares attributes from filtered headers
    for key, val := range filtered {
        attribute := newAttribute(key, val)
        attributes = append(attributes, attribute)
    }
    // sets Timestamp attribute if it wasn't set from header and enabled by settings
    if _, ok := filtered[object.AttributeTimestamp]; !ok && h.config.DefaultTimestamp() {
        timestamp := newAttribute(object.AttributeTimestamp, strconv.FormatInt(time.Now().Unix(), 10))
        attributes = append(attributes, timestamp)
    }

    return attributes, nil
}

func newAttribute(key string, val string) object.Attribute {
    attr := object.NewAttribute()
    attr.SetKey(key)
    attr.SetValue(val)
    return *attr
}

// explodeArchive reads files from archive and creates objects for each of them.
// Sets FilePath attribute with name from tar.Header.
func (h *Handler) explodeArchive(req request, bkt *data.BucketInfo, file io.ReadCloser, filtered map[string]string) {
    c, log := req.RequestCtx, req.log

    // remove user attributes which vary for each file in archive
    // to guarantee that they won't appear twice
    delete(filtered, object.AttributeFileName)
    delete(filtered, object.AttributeFilePath)

    commonAttributes, err := h.extractAttributes(c, log, filtered)
    if err != nil {
        log.Error(logs.FailedToGetAttributes, zap.Error(err))
        ResponseError(c, "could not extract attributes: "+err.Error(), fasthttp.StatusBadRequest)
        return
    }
    attributes := commonAttributes

    reader := file
    if bytes.EqualFold(c.Request.Header.Peek(fasthttp.HeaderContentEncoding), []byte("gzip")) {
        log.Debug(logs.GzipReaderSelected)
        gzipReader, err := gzip.NewReader(file)
        if err != nil {
            log.Error(logs.FailedToCreateGzipReader, zap.Error(err))
            ResponseError(c, "could not read gzip file: "+err.Error(), fasthttp.StatusBadRequest)
            return
        }
        defer func() {
            if err := gzipReader.Close(); err != nil {
                log.Warn(logs.FailedToCloseReader, zap.Error(err))
            }
        }()
        reader = gzipReader
    }

    tarReader := tar.NewReader(reader)
    for {
        obj, err := tarReader.Next()
        if errors.Is(err, io.EOF) {
            break
        } else if err != nil {
            log.Error(logs.FailedToReadFileFromTar, zap.Error(err))
            ResponseError(c, "could not get next entry: "+err.Error(), fasthttp.StatusBadRequest)
            return
        }

        if isDir(obj.Name) {
            continue
        }

        // set varying attributes
        attributes = attributes[:len(commonAttributes)]
        fileName := filepath.Base(obj.Name)
        attributes = append(attributes, newAttribute(object.AttributeFilePath, obj.Name))
        attributes = append(attributes, newAttribute(object.AttributeFileName, fileName))

        idObj, err := h.uploadObject(c, bkt, attributes, tarReader)
        if err != nil {
            h.handlePutFrostFSErr(c, err, log)
            return
        }

        log.Debug(logs.ObjectUploaded,
            zap.String("oid", idObj.EncodeToString()),
            zap.String("FileName", fileName),
        )
    }
}
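A hedged client-side sketch of the explode mode (host and container ID are hypothetical; the part name is arbitrary since the handler takes the first file part): the archive still travels as a multipart file, X-Explode-Archive switches the handler into explode mode, and Content-Encoding: gzip selects the gzip reader above.

package main

import (
    "io"
    "mime/multipart"
    "net/http"
    "os"
)

func main() {
    f, err := os.Open("site.tar.gz")
    if err != nil {
        panic(err)
    }
    defer f.Close()

    // Stream the gzipped tar as a multipart file part; the gateway
    // unpacks it and stores one object per file, setting FilePath from
    // the tar header and FileName from its base name.
    pr, pw := io.Pipe()
    mw := multipart.NewWriter(pw)
    go func() {
        part, _ := mw.CreateFormFile("file", "site.tar")
        _, _ = io.Copy(part, f)
        _ = mw.Close()
        _ = pw.Close()
    }()

    req, err := http.NewRequest(http.MethodPost, "http://gateway.example/upload/9sAB...", pr)
    if err != nil {
        panic(err)
    }
    req.Header.Set("Content-Type", mw.FormDataContentType())
    req.Header.Set("X-Explode-Archive", "true")
    req.Header.Set("Content-Encoding", "gzip")

    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        panic(err)
    }
    resp.Body.Close()
}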
func (h *Handler) handlePutFrostFSErr(r *fasthttp.RequestCtx, err error, log *zap.Logger) {
    statusCode, msg, additionalFields := response.FormErrorResponse("could not store file in frostfs", err)
    statusCode, msg, additionalFields := formErrorResponse("could not store file in frostfs", err)
    logFields := append([]zap.Field{zap.Error(err)}, additionalFields...)

    log.Error(logs.CouldNotStoreFileInFrostfs, logFields...)
    response.Error(r, msg, statusCode)
    ResponseError(r, msg, statusCode)
}

func (h *Handler) fetchBearerToken(ctx context.Context) *bearer.Token {
@@ -2,14 +2,16 @@ package handler

import (
    "context"
    "errors"
    "fmt"
    "strings"
    "time"

    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
    sdkstatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@@ -22,16 +24,23 @@ type request struct {

    log *zap.Logger
}

func newRequest(ctx *fasthttp.RequestCtx, log *zap.Logger) request {
    return request{
        RequestCtx: ctx,
        log:        log,
    }
}

func (r *request) handleFrostFSErr(err error, start time.Time) {
    logFields := []zap.Field{
        zap.Stringer("elapsed", time.Since(start)),
        zap.Error(err),
    }
    statusCode, msg, additionalFields := response.FormErrorResponse("could not receive object", err)
    statusCode, msg, additionalFields := formErrorResponse("could not receive object", err)
    logFields = append(logFields, additionalFields...)

    r.log.Error(logs.CouldNotReceiveObject, logFields...)
    response.Error(r.RequestCtx, msg, statusCode)
    ResponseError(r.RequestCtx, msg, statusCode)
}

func bearerToken(ctx context.Context) *bearer.Token {
@@ -42,16 +51,7 @@ func bearerToken(ctx context.Context) *bearer.Token {

    }

func isDir(name string) bool {
    return strings.HasSuffix(name, "/")
}

func isObjectID(s string) bool {
    var objID oid.ID
    return objID.DecodeString(s) == nil
}

func isContainerRoot(key string) bool {
    return key == ""
    return name == "" || strings.HasSuffix(name, "/")
}

func loadAttributes(attrs []object.Attribute) map[string]string {
@@ -88,10 +88,10 @@ func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) {

    log.Error(logs.CouldntGetBucket, zap.Error(err))

    if client.IsErrContainerNotFound(err) {
        response.Error(c, "Not Found", fasthttp.StatusNotFound)
        ResponseError(c, "Not Found", fasthttp.StatusNotFound)
        return
    }
    response.Error(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
    ResponseError(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
}

func newAddress(cnr cid.ID, obj oid.ID) oid.Address {
@@ -100,3 +100,43 @@ func newAddress(cnr cid.ID, obj oid.ID) oid.Address {

    addr.SetObject(obj)
    return addr
}

// setIfNotExist sets key value to map if key is not present yet.
func setIfNotExist(m map[string]string, key, value string) {
    if _, ok := m[key]; !ok {
        m[key] = value
    }
}

func ResponseError(r *fasthttp.RequestCtx, msg string, code int) {
    r.Error(msg+"\n", code)
}

func formErrorResponse(message string, err error) (int, string, []zap.Field) {
    var (
        msg        string
        statusCode int
        logFields  []zap.Field
    )

    st := new(sdkstatus.ObjectAccessDenied)

    switch {
    case errors.As(err, &st):
        statusCode = fasthttp.StatusForbidden
        reason := st.Reason()
        msg = fmt.Sprintf("%s: %v: %s", message, err, reason)
        logFields = append(logFields, zap.String("error_detail", reason))
    case errors.Is(err, ErrQuotaLimitReached):
        statusCode = fasthttp.StatusConflict
        msg = fmt.Sprintf("%s: %v", message, err)
    case client.IsErrObjectNotFound(err) || client.IsErrContainerNotFound(err):
        statusCode = fasthttp.StatusNotFound
        msg = "Not Found"
    default:
        statusCode = fasthttp.StatusBadRequest
        msg = fmt.Sprintf("%s: %v", message, err)
    }

    return statusCode, msg, logFields
}
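An illustrative test-style sketch (not part of the change) pinning down the statuses this mapping produces, including the new 409 Conflict for quota exhaustion:

package handler

import (
    "errors"
    "fmt"
    "testing"

    sdkstatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
    "github.com/stretchr/testify/require"
    "github.com/valyala/fasthttp"
)

// Illustrative only: each error class lands on a distinct HTTP status.
func TestFormErrorResponseMapping(t *testing.T) {
    for _, tc := range []struct {
        err      error
        expected int
    }{
        {fmt.Errorf("wrapped: %w", ErrQuotaLimitReached), fasthttp.StatusConflict},
        {new(sdkstatus.ObjectAccessDenied), fasthttp.StatusForbidden},
        {errors.New("anything else"), fasthttp.StatusBadRequest},
    } {
        code, _, _ := formErrorResponse("could not store file in frostfs", tc.err)
        require.Equal(t, tc.expected, code)
    }
}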
@@ -4,13 +4,15 @@ import (

    "context"
    "errors"

    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)

// TreeService provides an interface to interact with the tree service using S3 data models.
type TreeService interface {
    GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*api.NodeVersion, error)
    GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*data.NodeVersion, error)
    GetSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]data.NodeInfo, string, error)
    CheckSettingsNodeExists(ctx context.Context, bktInfo *data.BucketInfo) error
}

var (
@@ -1,83 +1,80 @@

package logs

const (
    CouldntParseCreationDate = "couldn't parse creation date" // Info in ../../downloader/*
    CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload" // Error in ../../downloader/download.go
    CouldNotReceiveObject = "could not receive object" // Error in ../../downloader/download.go
    WrongObjectID = "wrong object id" // Error in ../../downloader/download.go
    GetLatestObjectVersion = "get latest object version" // Error in ../../downloader/download.go
    ObjectWasDeleted = "object was deleted" // Error in ../../downloader/download.go
    CouldNotSearchForObjects = "could not search for objects" // Error in ../../downloader/download.go
    ObjectNotFound = "object not found" // Error in ../../downloader/download.go
    ReadObjectListFailed = "read object list failed" // Error in ../../downloader/download.go
    FailedToAddObjectToArchive = "failed to add object to archive" // Error in ../../downloader/download.go
    IteratingOverSelectedObjectsFailed = "iterating over selected objects failed" // Error in ../../downloader/download.go
    ObjectsNotFound = "objects not found" // Error in ../../downloader/download.go
    CloseZipWriter = "close zip writer" // Error in ../../downloader/download.go
    ServiceIsRunning = "service is running" // Info in ../../metrics/service.go
    ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port" // Warn in ../../metrics/service.go
    ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../metrics/service.go
    ShuttingDownService = "shutting down service" // Info in ../../metrics/service.go
    CantShutDownService = "can't shut down service" // Panic in ../../metrics/service.go
    CantGracefullyShutDownService = "can't gracefully shut down service, force stop" // Error in ../../metrics/service.go
    IgnorePartEmptyFormName = "ignore part, empty form name" // Debug in ../../uploader/upload.go
    IgnorePartEmptyFilename = "ignore part, empty filename" // Debug in ../../uploader/upload.go
    CloseTemporaryMultipartFormFile = "close temporary multipart/form file" // Debug in ../../uploader/upload.go
    CouldNotReceiveMultipartForm = "could not receive multipart/form" // Error in ../../uploader/upload.go
    CouldNotProcessHeaders = "could not process headers" // Error in ../../uploader/upload.go
    CouldNotParseClientTime = "could not parse client time" // Warn in ../../uploader/upload.go
    CouldNotPrepareExpirationHeader = "could not prepare expiration header" // Error in ../../uploader/upload.go
    CouldNotEncodeResponse = "could not encode response" // Error in ../../uploader/upload.go
    CouldNotStoreFileInFrostfs = "could not store file in frostfs" // Error in ../../uploader/upload.go
    AddAttributeToResultObject = "add attribute to result object" // Debug in ../../uploader/filter.go
    FailedToCreateResolver = "failed to create resolver" // Fatal in ../../app.go
    FailedToCreateWorkerPool = "failed to create worker pool" // Fatal in ../../app.go
    FailedToReadIndexPageTemplate = "failed to read index page template" // Error in ../../app.go
    SetCustomIndexPageTemplate = "set custom index page template" // Info in ../../app.go
    ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty" // Info in ../../app.go
    MetricsAreDisabled = "metrics are disabled" // Warn in ../../app.go
    NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun = "no wallet path specified, creating ephemeral key automatically for this run" // Info in ../../app.go
    StartingApplication = "starting application" // Info in ../../app.go
    StartingServer = "starting server" // Info in ../../app.go
    ListenAndServe = "listen and serve" // Fatal in ../../app.go
    ShuttingDownWebServer = "shutting down web server" // Info in ../../app.go
    FailedToShutdownTracing = "failed to shutdown tracing" // Warn in ../../app.go
    SIGHUPConfigReloadStarted = "SIGHUP config reload started" // Info in ../../app.go
    FailedToReloadConfigBecauseItsMissed = "failed to reload config because it's missed" // Warn in ../../app.go
    FailedToReloadConfig = "failed to reload config" // Warn in ../../app.go
    LogLevelWontBeUpdated = "log level won't be updated" // Warn in ../../app.go
    FailedToUpdateResolvers = "failed to update resolvers" // Warn in ../../app.go
    FailedToReloadServerParameters = "failed to reload server parameters" // Warn in ../../app.go
    SIGHUPConfigReloadCompleted = "SIGHUP config reload completed" // Info in ../../app.go
    AddedPathUploadCid = "added path /upload/{cid}" // Info in ../../app.go
    AddedPathGetCidOid = "added path /get/{cid}/{oid}" // Info in ../../app.go
    AddedPathGetByAttributeCidAttrKeyAttrVal = "added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}" // Info in ../../app.go
    AddedPathZipCidPrefix = "added path /zip/{cid}/{prefix}" // Info in ../../app.go
    Request = "request" // Info in ../../app.go
    CouldNotFetchAndStoreBearerToken = "could not fetch and store bearer token" // Error in ../../app.go
    FailedToAddServer = "failed to add server" // Warn in ../../app.go
    AddServer = "add server" // Info in ../../app.go
    NoHealthyServers = "no healthy servers" // Fatal in ../../app.go
    FailedToInitializeTracing = "failed to initialize tracing" // Warn in ../../app.go
    TracingConfigUpdated = "tracing config updated" // Info in ../../app.go
    ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided = "resolver nns won't be used since rpc_endpoint isn't provided" // Warn in ../../app.go
    RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped" // Warn in ../../app.go
    RuntimeSoftMemoryLimitUpdated = "soft runtime memory limit value updated" // Info in ../../app.go
    CouldNotLoadFrostFSPrivateKey = "could not load FrostFS private key" // Fatal in ../../settings.go
    UsingCredentials = "using credentials" // Info in ../../settings.go
    FailedToCreateConnectionPool = "failed to create connection pool" // Fatal in ../../settings.go
    FailedToDialConnectionPool = "failed to dial connection pool" // Fatal in ../../settings.go
    FailedToCreateTreePool = "failed to create tree pool" // Fatal in ../../settings.go
    FailedToDialTreePool = "failed to dial tree pool" // Fatal in ../../settings.go
    AddedStoragePeer = "added storage peer" // Info in ../../settings.go
    CouldntGetBucket = "could not get bucket" // Error in ../handler/utils.go
    CouldntPutBucketIntoCache = "couldn't put bucket info into cache" // Warn in ../handler/handler.go
    FailedToSumbitTaskToPool = "failed to submit task to pool" // Error in ../handler/browse.go
    FailedToHeadObject = "failed to head object" // Error in ../handler/browse.go
    FailedToIterateOverResponse = "failed to iterate over search response" // Error in ../handler/browse.go
    InvalidCacheEntryType = "invalid cache entry type" // Warn in ../cache/buckets.go
    InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
    InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
    CouldntParseCreationDate = "couldn't parse creation date"
    CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload"
    CouldNotReceiveObject = "could not receive object"
    ObjectWasDeleted = "object was deleted"
    CouldNotSearchForObjects = "could not search for objects"
    ObjectNotFound = "object not found"
    ReadObjectListFailed = "read object list failed"
    FailedToAddObjectToArchive = "failed to add object to archive"
    FailedToGetObject = "failed to get object"
    IteratingOverSelectedObjectsFailed = "iterating over selected objects failed"
    ObjectsNotFound = "objects not found"
    CloseZipWriter = "close zip writer"
    ServiceIsRunning = "service is running"
    ServiceCouldntStartOnConfiguredPort = "service couldn't start on configured port"
    ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled"
    ShuttingDownService = "shutting down service"
    CantShutDownService = "can't shut down service"
    CantGracefullyShutDownService = "can't gracefully shut down service, force stop"
    IgnorePartEmptyFormName = "ignore part, empty form name"
    IgnorePartEmptyFilename = "ignore part, empty filename"
    CouldNotReceiveMultipartForm = "could not receive multipart/form"
    CouldNotParseClientTime = "could not parse client time"
    CouldNotPrepareExpirationHeader = "could not prepare expiration header"
    CouldNotEncodeResponse = "could not encode response"
    CouldNotStoreFileInFrostfs = "could not store file in frostfs"
    AddAttributeToResultObject = "add attribute to result object"
    FailedToCreateResolver = "failed to create resolver"
    FailedToCreateWorkerPool = "failed to create worker pool"
    FailedToReadIndexPageTemplate = "failed to read index page template"
    SetCustomIndexPageTemplate = "set custom index page template"
    ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty = "container resolver will be disabled because of resolvers 'resolver_order' is empty"
    MetricsAreDisabled = "metrics are disabled"
    NoWalletPathSpecifiedCreatingEphemeralKeyAutomaticallyForThisRun = "no wallet path specified, creating ephemeral key automatically for this run"
    StartingApplication = "starting application"
    StartingServer = "starting server"
    ListenAndServe = "listen and serve"
    ShuttingDownWebServer = "shutting down web server"
    FailedToShutdownTracing = "failed to shutdown tracing"
    SIGHUPConfigReloadStarted = "SIGHUP config reload started"
    FailedToReloadConfigBecauseItsMissed = "failed to reload config because it's missed"
    FailedToReloadConfig = "failed to reload config"
    LogLevelWontBeUpdated = "log level won't be updated"
    FailedToUpdateResolvers = "failed to update resolvers"
    FailedToReloadServerParameters = "failed to reload server parameters"
    SIGHUPConfigReloadCompleted = "SIGHUP config reload completed"
    AddedPathUploadCid = "added path /upload/{cid}"
    AddedPathGetCidOid = "added path /get/{cid}/{oid}"
    AddedPathGetByAttributeCidAttrKeyAttrVal = "added path /get_by_attribute/{cid}/{attr_key}/{attr_val:*}"
    AddedPathZipCidPrefix = "added path /zip/{cid}/{prefix}"
    Request = "request"
    CouldNotFetchAndStoreBearerToken = "could not fetch and store bearer token"
    FailedToAddServer = "failed to add server"
    AddServer = "add server"
    NoHealthyServers = "no healthy servers"
    FailedToInitializeTracing = "failed to initialize tracing"
    TracingConfigUpdated = "tracing config updated"
    ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided = "resolver nns won't be used since rpc_endpoint isn't provided"
    RuntimeSoftMemoryDefinedWithGOMEMLIMIT = "soft runtime memory defined with GOMEMLIMIT environment variable, config value skipped"
    RuntimeSoftMemoryLimitUpdated = "soft runtime memory limit value updated"
    CouldNotLoadFrostFSPrivateKey = "could not load FrostFS private key"
    UsingCredentials = "using credentials"
    FailedToCreateConnectionPool = "failed to create connection pool"
    FailedToDialConnectionPool = "failed to dial connection pool"
    FailedToCreateTreePool = "failed to create tree pool"
    FailedToDialTreePool = "failed to dial tree pool"
    AddedStoragePeer = "added storage peer"
    CouldntGetBucket = "could not get bucket"
    CouldntPutBucketIntoCache = "couldn't put bucket info into cache"
    FailedToSumbitTaskToPool = "failed to submit task to pool"
    FailedToHeadObject = "failed to head object"
    FailedToIterateOverResponse = "failed to iterate over search response"
    InvalidCacheEntryType = "invalid cache entry type"
    InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)"
    InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value"
    FailedToUnescapeQuery = "failed to unescape query"
    ServerReconnecting = "reconnecting server..."
    ServerReconnectedSuccessfully = "server reconnected successfully"
@ -87,4 +84,15 @@ const (
    MultinetDialFail = "multinet dial failed"
    FailedToLoadMultinetConfig = "failed to load multinet config"
    MultinetConfigWontBeUpdated = "multinet config won't be updated"
    ObjectNotFoundByFilePathTrySearchByFileName = "object not found by filePath attribute, try search by fileName"
    CouldntCacheNetmap = "couldn't cache netmap"
    FailedToFilterHeaders = "failed to filter headers"
    FailedToReadFileFromTar = "failed to read file from tar"
    FailedToGetAttributes = "failed to get attributes"
    ObjectUploaded = "object uploaded"
    CloseGzipWriter = "close gzip writer"
    CloseTarWriter = "close tar writer"
    FailedToCloseReader = "failed to close reader"
    FailedToCreateGzipReader = "failed to create gzip reader"
    GzipReaderSelected = "gzip reader selected"
)
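These constants exist so every log line carries a fixed, greppable message, with variable data passed as structured fields. A minimal usage sketch (not part of this change; the field names and values here are illustrative):

    package main

    import (
        "errors"

        "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
        "go.uber.org/zap"
    )

    func main() {
        log, _ := zap.NewProduction()
        defer func() { _ = log.Sync() }()

        // Fixed message constant; request-specific detail goes into fields.
        log.Info(logs.ServiceIsRunning, zap.String("endpoint", ":8080"))

        if err := errors.New("address already in use"); err != nil {
            log.Error(logs.ServiceCouldntStartOnConfiguredPort, zap.Error(err))
        }
    }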
@ -11,6 +11,7 @@ import (
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
    apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
@ -173,6 +174,15 @@ func (x *FrostFS) GetEpochDurations(ctx context.Context) (*utils.EpochDurations,
    return res, nil
}

func (x *FrostFS) NetmapSnapshot(ctx context.Context) (netmap.NetMap, error) {
    netmapSnapshot, err := x.pool.NetMapSnapshot(ctx)
    if err != nil {
        return netmapSnapshot, handleObjectError("get netmap via connection pool", err)
    }

    return netmapSnapshot, nil
}

// ResolverFrostFS represents virtual connection to the FrostFS network.
// It implements resolver.FrostFS.
type ResolverFrostFS struct {
@ -205,6 +215,10 @@ func handleObjectError(msg string, err error) error {
    }

    if reason, ok := IsErrObjectAccessDenied(err); ok {
        if strings.Contains(reason, "limit reached") {
            return fmt.Errorf("%s: %w: %s", msg, handler.ErrQuotaLimitReached, reason)
        }

        return fmt.Errorf("%s: %w: %s", msg, handler.ErrAccessDenied, reason)
    }
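Because handleObjectError wraps the sentinel errors with %w, callers can branch on them with errors.Is. The mapping below is an illustrative sketch, not the gateway's actual response logic; the chosen status codes are assumptions:

    // Illustrative only: translate wrapped sentinel errors into HTTP status codes.
    func statusFromObjectError(err error) int {
        switch {
        case errors.Is(err, handler.ErrQuotaLimitReached):
            return fasthttp.StatusConflict // hypothetical choice for quota violations
        case errors.Is(err, handler.ErrAccessDenied):
            return fasthttp.StatusForbidden
        case errors.Is(err, handler.ErrGatewayTimeout):
            return fasthttp.StatusGatewayTimeout
        default:
            return fasthttp.StatusBadRequest
        }
    }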
83 internal/service/frostfs/frostfs_test.go Normal file
@ -0,0 +1,83 @@
package frostfs

import (
    "context"
    "errors"
    "fmt"
    "testing"
    "time"

    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
    apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
    "github.com/stretchr/testify/require"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
)

func TestHandleObjectError(t *testing.T) {
    msg := "some msg"

    t.Run("nil error", func(t *testing.T) {
        err := handleObjectError(msg, nil)
        require.Nil(t, err)
    })

    t.Run("simple access denied", func(t *testing.T) {
        reason := "some reason"
        inputErr := new(apistatus.ObjectAccessDenied)
        inputErr.WriteReason(reason)

        err := handleObjectError(msg, inputErr)
        require.ErrorIs(t, err, handler.ErrAccessDenied)
        require.Contains(t, err.Error(), reason)
        require.Contains(t, err.Error(), msg)
    })

    t.Run("access denied - quota reached", func(t *testing.T) {
        reason := "Quota limit reached"
        inputErr := new(apistatus.ObjectAccessDenied)
        inputErr.WriteReason(reason)

        err := handleObjectError(msg, inputErr)
        require.ErrorIs(t, err, handler.ErrQuotaLimitReached)
        require.Contains(t, err.Error(), reason)
        require.Contains(t, err.Error(), msg)
    })

    t.Run("simple timeout", func(t *testing.T) {
        inputErr := errors.New("timeout")

        err := handleObjectError(msg, inputErr)
        require.ErrorIs(t, err, handler.ErrGatewayTimeout)
        require.Contains(t, err.Error(), inputErr.Error())
        require.Contains(t, err.Error(), msg)
    })

    t.Run("deadline exceeded", func(t *testing.T) {
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
        defer cancel()
        <-ctx.Done()

        err := handleObjectError(msg, ctx.Err())
        require.ErrorIs(t, err, handler.ErrGatewayTimeout)
        require.Contains(t, err.Error(), ctx.Err().Error())
        require.Contains(t, err.Error(), msg)
    })

    t.Run("grpc deadline exceeded", func(t *testing.T) {
        inputErr := fmt.Errorf("wrap grpc error: %w", status.Error(codes.DeadlineExceeded, "error"))

        err := handleObjectError(msg, inputErr)
        require.ErrorIs(t, err, handler.ErrGatewayTimeout)
        require.Contains(t, err.Error(), inputErr.Error())
        require.Contains(t, err.Error(), msg)
    })

    t.Run("unknown error", func(t *testing.T) {
        inputErr := errors.New("unknown error")

        err := handleObjectError(msg, inputErr)
        require.ErrorIs(t, err, inputErr)
        require.Contains(t, err.Error(), msg)
    })
}
69 internal/service/frostfs/source.go Normal file
@ -0,0 +1,69 @@
package frostfs

import (
    "context"
    "fmt"

    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
    "go.uber.org/zap"
)

type Source struct {
    frostFS     *FrostFS
    netmapCache *cache.NetmapCache
    bucketCache *cache.BucketCache
    log         *zap.Logger
}

func NewSource(frostFS *FrostFS, netmapCache *cache.NetmapCache, bucketCache *cache.BucketCache, log *zap.Logger) *Source {
    return &Source{
        frostFS:     frostFS,
        netmapCache: netmapCache,
        bucketCache: bucketCache,
        log:         log,
    }
}

func (s *Source) NetMapSnapshot(ctx context.Context) (netmap.NetMap, error) {
    cachedNetmap := s.netmapCache.Get()
    if cachedNetmap != nil {
        return *cachedNetmap, nil
    }

    netmapSnapshot, err := s.frostFS.NetmapSnapshot(ctx)
    if err != nil {
        return netmap.NetMap{}, fmt.Errorf("get netmap: %w", err)
    }

    if err = s.netmapCache.Put(netmapSnapshot); err != nil {
        s.log.Warn(logs.CouldntCacheNetmap, zap.Error(err))
    }

    return netmapSnapshot, nil
}

func (s *Source) PlacementPolicy(ctx context.Context, cnrID cid.ID) (netmap.PlacementPolicy, error) {
    info := s.bucketCache.GetByCID(cnrID)
    if info != nil {
        return info.PlacementPolicy, nil
    }

    prm := handler.PrmContainer{
        ContainerID: cnrID,
    }
    res, err := s.frostFS.Container(ctx, prm)
    if err != nil {
        return netmap.PlacementPolicy{}, fmt.Errorf("get container: %w", err)
    }

    // We don't put the container back into the cache, to keep the cache
    // coherent with the requests made by users. FrostFS Source is used
    // by the SDK tree pool and should not fill the cache with possibly
    // irrelevant container values.

    return res.PlacementPolicy(), nil
}
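A wiring sketch (assumed, not shown in this diff): the Source is built once at startup from components the gateway already owns, then answers netmap and placement lookups, hitting the caches on repeat calls:

    // frostFSClient, netmapCache, bucketCache and logger are hypothetical
    // names for objects created elsewhere during gateway startup.
    src := NewSource(frostFSClient, netmapCache, bucketCache, logger)

    nm, err := src.NetMapSnapshot(ctx) // first call fetches, later calls hit the cache
    if err != nil {
        logger.Error("netmap lookup failed", zap.Error(err))
    }
    _ = nm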
@ -1,41 +0,0 @@
package response

import (
    "errors"
    "fmt"

    "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
    sdkstatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
    "github.com/valyala/fasthttp"
    "go.uber.org/zap"
)

func Error(r *fasthttp.RequestCtx, msg string, code int) {
    r.Error(msg+"\n", code)
}

func FormErrorResponse(message string, err error) (int, string, []zap.Field) {
    var (
        msg        string
        statusCode int
        logFields  []zap.Field
    )

    st := new(sdkstatus.ObjectAccessDenied)

    switch {
    case errors.As(err, &st):
        statusCode = fasthttp.StatusForbidden
        reason := st.Reason()
        msg = fmt.Sprintf("%s: %v: %s", message, err, reason)
        logFields = append(logFields, zap.String("error_detail", reason))
    case client.IsErrObjectNotFound(err) || client.IsErrContainerNotFound(err):
        statusCode = fasthttp.StatusNotFound
        msg = "Not Found"
    default:
        statusCode = fasthttp.StatusBadRequest
        msg = fmt.Sprintf("%s: %v", message, err)
    }

    return statusCode, msg, logFields
}
@ -82,14 +82,22 @@ func fetchBearerToken(ctx *fasthttp.RequestCtx) (*bearer.Token, error) {
    tkn = new(bearer.Token)
)
for _, parse := range []fromHandler{BearerTokenFromHeader, BearerTokenFromCookie} {
    if buf = parse(&ctx.Request.Header); buf == nil {
    buf = parse(&ctx.Request.Header)
    if buf == nil {
        continue
    } else if data, err := base64.StdEncoding.DecodeString(string(buf)); err != nil {
    }

    data, err := base64.StdEncoding.DecodeString(string(buf))
    if err != nil {
        lastErr = fmt.Errorf("can't base64-decode bearer token: %w", err)
        continue
    } else if err = tkn.Unmarshal(data); err != nil {
        lastErr = fmt.Errorf("can't unmarshal bearer token: %w", err)
        continue
    }

    if err = tkn.Unmarshal(data); err != nil {
        if err = tkn.UnmarshalJSON(data); err != nil {
            lastErr = fmt.Errorf("can't unmarshal bearer token: %w", err)
            continue
        }
    }

    return tkn, nil
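The fallback order above (binary first, then JSON) reads more easily as a standalone helper; this sketch is equivalent to the loop body and assumes the same frostfs-sdk-go bearer package:

    func decodeBearer(b64 string) (*bearer.Token, error) {
        data, err := base64.StdEncoding.DecodeString(b64)
        if err != nil {
            return nil, fmt.Errorf("can't base64-decode bearer token: %w", err)
        }

        tkn := new(bearer.Token)
        // Try the compact binary encoding first, then fall back to JSON.
        if err = tkn.Unmarshal(data); err != nil {
            if jsonErr := tkn.UnmarshalJSON(data); jsonErr != nil {
                return nil, fmt.Errorf("can't unmarshal bearer token: %w", jsonErr)
            }
        }
        return tkn, nil
    }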
@ -98,8 +98,14 @@ func TestFetchBearerToken(t *testing.T) {
    tkn := new(bearer.Token)
    tkn.ForUser(uid)

    t64 := base64.StdEncoding.EncodeToString(tkn.Marshal())
    require.NotEmpty(t, t64)
    jsonToken, err := tkn.MarshalJSON()
    require.NoError(t, err)

    jsonTokenBase64 := base64.StdEncoding.EncodeToString(jsonToken)
    binaryTokenBase64 := base64.StdEncoding.EncodeToString(tkn.Marshal())

    require.NotEmpty(t, jsonTokenBase64)
    require.NotEmpty(t, binaryTokenBase64)

    cases := []struct {
        name string
@ -143,25 +149,47 @@ func TestFetchBearerToken(t *testing.T) {
            error: "can't unmarshal bearer token",
        },
        {
            name:   "bad header, but good cookie",
            name:   "bad header, but good cookie with binary token",
            header: "dGVzdAo=",
            cookie: t64,
            cookie: binaryTokenBase64,
            expect: tkn,
        },
        {
            name:   "bad cookie, but good header",
            header: t64,
            name:   "bad cookie, but good header with binary token",
            header: binaryTokenBase64,
            cookie: "dGVzdAo=",
            expect: tkn,
        },
        {
            name:   "ok for header",
            header: t64,
            name:   "bad header, but good cookie with json token",
            header: "dGVzdAo=",
            cookie: jsonTokenBase64,
            expect: tkn,
        },
        {
            name:   "ok for cookie",
            cookie: t64,
            name:   "bad cookie, but good header with json token",
            header: jsonTokenBase64,
            cookie: "dGVzdAo=",
            expect: tkn,
        },
        {
            name:   "ok for header with binary token",
            header: binaryTokenBase64,
            expect: tkn,
        },
        {
            name:   "ok for cookie with binary token",
            cookie: binaryTokenBase64,
            expect: tkn,
        },
        {
            name:   "ok for header with json token",
            header: jsonTokenBase64,
            expect: tkn,
        },
        {
            name:   "ok for cookie with json token",
            cookie: jsonTokenBase64,
            expect: tkn,
        },
    }
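For clients, both encodings are produced from the same token; a minimal sketch grounded in the test above (signing and lifetime setup omitted):

    tkn := new(bearer.Token)
    // ... fill in and sign the token before encoding ...

    binaryB64 := base64.StdEncoding.EncodeToString(tkn.Marshal())

    jsonBytes, err := tkn.MarshalJSON()
    if err != nil {
        panic(err) // handle properly in real code
    }
    jsonB64 := base64.StdEncoding.EncodeToString(jsonBytes)

    // Either binaryB64 or jsonB64 may be presented; fetchBearerToken accepts both.
    _, _ = binaryB64, jsonB64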
52 tree/tree.go
@ -6,9 +6,8 @@ import (
    "fmt"
    "strings"

    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/api/layer"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
    "git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/layer"
    cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
    oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
@ -118,7 +117,7 @@ func (n *treeNode) FileName() (string, bool) {
    return value, ok
}

func newNodeVersion(node NodeResponse) (*api.NodeVersion, error) {
func newNodeVersion(node NodeResponse) (*data.NodeVersion, error) {
    tNode, err := newTreeNode(node)
    if err != nil {
        return nil, fmt.Errorf("invalid tree node: %w", err)
@ -127,20 +126,30 @@ func newNodeVersion(node NodeResponse) (*api.NodeVersion, error) {
    return newNodeVersionFromTreeNode(tNode), nil
}

func newNodeVersionFromTreeNode(treeNode *treeNode) *api.NodeVersion {
func newNodeVersionFromTreeNode(treeNode *treeNode) *data.NodeVersion {
    _, isDeleteMarker := treeNode.Get(isDeleteMarkerKV)
    size, _ := treeNode.Get(sizeKV)
    version := &api.NodeVersion{
        BaseNodeVersion: api.BaseNodeVersion{
            OID: treeNode.ObjID,
    version := &data.NodeVersion{
        BaseNodeVersion: data.BaseNodeVersion{
            OID:            treeNode.ObjID,
            IsDeleteMarker: isDeleteMarker,
        },
        DeleteMarker: isDeleteMarker,
        IsPrefixNode: size == "",
    }

    return version
}

func newNodeInfo(node NodeResponse) data.NodeInfo {
    nodeMeta := node.GetMeta()
    nodeInfo := data.NodeInfo{
        Meta: make([]data.NodeMeta, 0, len(nodeMeta)),
    }
    for _, meta := range nodeMeta {
        nodeInfo.Meta = append(nodeInfo.Meta, meta)
    }

    return nodeInfo
}

func newMultiNode(nodes []NodeResponse) (*multiSystemNode, error) {
    var (
        err error
@ -180,7 +189,7 @@ func (m *multiSystemNode) Old() []*treeNode {
    return m.nodes[1:]
}

func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*api.NodeVersion, error) {
func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*data.NodeVersion, error) {
    nodes, err := c.GetVersions(ctx, cnrID, objectName)
    if err != nil {
        return nil, err
@ -210,7 +219,7 @@ func (c *Tree) GetVersions(ctx context.Context, cnrID *cid.ID, objectName string
    return c.service.GetNodes(ctx, p)
}

func (c *Tree) CheckSettingsNodeExist(ctx context.Context, bktInfo *data.BucketInfo) error {
func (c *Tree) CheckSettingsNodeExists(ctx context.Context, bktInfo *data.BucketInfo) error {
    _, err := c.getSystemNode(ctx, bktInfo, settingsFileName)
    if err != nil {
        return err
@ -236,7 +245,7 @@ func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, name
    nodes = filterMultipartNodes(nodes)

    if len(nodes) == 0 {
        return nil, ErrNodeNotFound
        return nil, layer.ErrNodeNotFound
    }

    return newMultiNode(nodes)
@ -298,14 +307,14 @@ func pathFromName(objectName string) []string {
    return strings.Split(objectName, separator)
}

func (c *Tree) GetSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]NodeResponse, string, error) {
func (c *Tree) GetSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo, prefix string, latestOnly bool) ([]data.NodeInfo, string, error) {
    rootID, tailPrefix, err := c.determinePrefixNode(ctx, bktInfo, versionTree, prefix)
    if err != nil {
        return nil, "", err
    }
    subTree, err := c.service.GetSubTree(ctx, bktInfo, versionTree, rootID, 2, false)
    if err != nil {
        if errors.Is(err, layer.ErrNodeNotFound) {
        if errors.Is(err, ErrNodeNotFound) {
            return nil, "", nil
        }
        return nil, "", err
@ -340,14 +349,23 @@ func (c *Tree) GetSubTreeByPrefix(ctx context.Context, bktInfo *data.BucketInfo,
        nodesMap[fileName] = nodes
    }

    result := make([]NodeResponse, 0, len(subTree))
    result := make([]data.NodeInfo, 0, len(subTree))
    for _, nodes := range nodesMap {
        result = append(result, nodes...)
        result = append(result, nodeResponseToNodeInfo(nodes)...)
    }

    return result, strings.TrimSuffix(prefix, tailPrefix), nil
}

func nodeResponseToNodeInfo(nodes []NodeResponse) []data.NodeInfo {
    nodesInfo := make([]data.NodeInfo, 0, len(nodes))
    for _, node := range nodes {
        nodesInfo = append(nodesInfo, newNodeInfo(node))
    }

    return nodesInfo
}

func (c *Tree) determinePrefixNode(ctx context.Context, bktInfo *data.BucketInfo, treeID, prefix string) ([]uint64, string, error) {
    rootID := []uint64{0}
    path := strings.Split(prefix, separator)
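To show the effect of the new signature, a consumption sketch (treeClient, ctx and bktInfo are assumed names from surrounding gateway code; the prefix value is arbitrary):

    nodes, dirPrefix, err := treeClient.GetSubTreeByPrefix(ctx, bktInfo, "docs/2024/", true)
    if err != nil {
        return err
    }
    for _, node := range nodes {
        // Entries now arrive as data.NodeInfo with pre-collected metadata,
        // not as raw tree NodeResponse values.
        _ = node.Meta
    }
    _ = dirPrefix // the part of the prefix that maps to an existing directory node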