forked from TrueCloudLab/frostfs-http-gw
Compare commits
29 commits
support/v0
...
master
Author | SHA1 | Date | |
---|---|---|---|
11965deb41 | |||
a95dc6c8c7 | |||
f39b3aa93a | |||
6695ebe5a0 | |||
c6383fc135 | |||
5ded105c09 | |||
88e32ddd7f | |||
007d278caa | |||
7ec9b34d33 | |||
5470916361 | |||
c038957649 | |||
ce4ec032f9 | |||
4049255eed | |||
2c95250f72 | |||
5ae75eb9d8 | |||
627294bf70 | |||
0ef3e18ee1 | |||
2e28b2ac85 | |||
a375af7d98 | |||
dc8d0d4ab3 | |||
7fa973b261 | |||
1ced82a714 | |||
49d6a27562 | |||
9a5a2239bd | |||
8bc246f8f9 | |||
9b34413e17 | |||
e61b4867c9 | |||
84eb57475b | |||
e26577e753 |
29 changed files with 1247 additions and 189 deletions
|
@ -18,3 +18,6 @@ jobs:
|
||||||
|
|
||||||
- name: Build binary
|
- name: Build binary
|
||||||
run: make
|
run: make
|
||||||
|
|
||||||
|
- name: Check dirty suffix
|
||||||
|
run: if [[ $(make version) == *"dirty"* ]]; then echo "Version has dirty suffix" && exit 1; fi
|
||||||
|
|
|
@ -15,6 +15,6 @@ jobs:
|
||||||
go-version: '1.21'
|
go-version: '1.21'
|
||||||
|
|
||||||
- name: Run commit format checker
|
- name: Run commit format checker
|
||||||
uses: https://git.frostfs.info/TrueCloudLab/dco-go@v1
|
uses: https://git.frostfs.info/TrueCloudLab/dco-go@v3
|
||||||
with:
|
with:
|
||||||
from: adb95642d
|
from: 'origin/${{ github.event.pull_request.base.ref }}'
|
||||||
|
|
41
CHANGELOG.md
41
CHANGELOG.md
|
@ -4,20 +4,57 @@ This document outlines major changes between releases.
|
||||||
|
|
||||||
## [Unreleased]
|
## [Unreleased]
|
||||||
|
|
||||||
|
## [0.28.1] - 2024-01-24
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- Tree pool traversal limit (#92)
|
||||||
|
- Add new `reconnect_interval` config param (#100)
|
||||||
|
|
||||||
|
### Update from 0.28.0
|
||||||
|
See new `frostfs.tree_pool_max_attempts` config parameter.
|
||||||
|
|
||||||
|
### Fixed
|
||||||
|
- Fix possibility of panic during SIGHUP (#99)
|
||||||
|
- Handle query unescape and invalid bearer token errors (#107)
|
||||||
|
- Fix HTTP/2 requests (#110)
|
||||||
|
|
||||||
|
### Added
|
||||||
|
- Support client side object cut (#70)
|
||||||
|
- Add `frostfs.client_cut` config param
|
||||||
|
- Add `frostfs.buffer_max_size_for_put` config param
|
||||||
|
- Add bucket/container caching
|
||||||
|
- Disable homomorphic hash for PUT if it's disabled in container itself
|
||||||
|
- Add new `logger.destination` config param (#89)
|
||||||
|
- Add support namespaces (#91)
|
||||||
|
|
||||||
|
### Changed
|
||||||
|
|
||||||
|
### Removed
|
||||||
|
|
||||||
|
## [0.28.0] - Academy of Sciences - 2023-12-07
|
||||||
|
|
||||||
### Fixed
|
### Fixed
|
||||||
- `grpc` schemas in tree configuration (#62)
|
- `grpc` schemas in tree configuration (#62)
|
||||||
|
- `GetSubTree` failures (#67)
|
||||||
|
- Debian packaging (#69, #90)
|
||||||
|
- Get latest version of tree node (#85)
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- Support dump metrics descriptions (#29)
|
- Support dump metrics descriptions (#29)
|
||||||
- Support impersonate bearer token (#40, #45)
|
- Support impersonate bearer token (#40, #45)
|
||||||
- Tracing support (#20, #44, #60)
|
- Tracing support (#20, #44, #60)
|
||||||
- Object name resolving with tree service (#30)
|
- Object name resolving with tree service (#30)
|
||||||
|
- Metrics for current endpoint status (#77)
|
||||||
|
- Soft memory limit with `runtime.soft_memory_limit` (#72)
|
||||||
|
- Add selection of the node of the latest version of the object (#85)
|
||||||
|
|
||||||
### Changed
|
### Changed
|
||||||
- Update prometheus to v1.15.0 (#35)
|
- Update prometheus to v1.15.0 (#35)
|
||||||
- Update go version to 1.19 (#50)
|
- Update go version to 1.19 (#50)
|
||||||
- Finish rebranding (#2)
|
- Finish rebranding (#2)
|
||||||
- Use gate key to form object owner (#66)
|
- Use gate key to form object owner (#66)
|
||||||
|
- Move log messages to constants (#36)
|
||||||
|
- Uploader and downloader refactor (#73)
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
- Drop `tree.service` param (now endpoints from `peers` section are used) (#59)
|
- Drop `tree.service` param (now endpoints from `peers` section are used) (#59)
|
||||||
|
@ -61,4 +98,6 @@ This project is a fork of [NeoFS HTTP Gateway](https://github.com/nspcc-dev/neof
|
||||||
To see CHANGELOG for older versions, refer to https://github.com/nspcc-dev/neofs-http-gw/blob/master/CHANGELOG.md.
|
To see CHANGELOG for older versions, refer to https://github.com/nspcc-dev/neofs-http-gw/blob/master/CHANGELOG.md.
|
||||||
|
|
||||||
[0.27.0]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/72734ab4...v0.27.0
|
[0.27.0]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/72734ab4...v0.27.0
|
||||||
[Unreleased]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.27.0...master
|
[0.28.0]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.27.0...v0.28.0
|
||||||
|
[0.28.1]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.28.0...v0.28.1
|
||||||
|
[Unreleased]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.28.1...master
|
||||||
|
|
2
VERSION
2
VERSION
|
@ -1 +1 @@
|
||||||
v0.27.0
|
v0.28.1
|
||||||
|
|
|
@ -2,17 +2,23 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime/debug"
|
"runtime/debug"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
|
@ -33,6 +39,7 @@ import (
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
"golang.org/x/exp/slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
type (
|
type (
|
||||||
|
@ -50,8 +57,11 @@ type (
|
||||||
resolver *resolver.ContainerResolver
|
resolver *resolver.ContainerResolver
|
||||||
metrics *gateMetrics
|
metrics *gateMetrics
|
||||||
services []*metrics.Service
|
services []*metrics.Service
|
||||||
settings *handler.Settings
|
settings *appSettings
|
||||||
|
|
||||||
servers []Server
|
servers []Server
|
||||||
|
unbindServers []ServerInfo
|
||||||
|
mu sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// App is an interface for the main gateway function.
|
// App is an interface for the main gateway function.
|
||||||
|
@ -69,6 +79,19 @@ type (
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
enabled bool
|
enabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
|
||||||
|
appSettings struct {
|
||||||
|
reconnectInterval time.Duration
|
||||||
|
|
||||||
|
mu sync.RWMutex
|
||||||
|
defaultTimestamp bool
|
||||||
|
zipCompression bool
|
||||||
|
clientCut bool
|
||||||
|
bufferMaxSizeForPut uint64
|
||||||
|
namespaceHeader string
|
||||||
|
defaultNamespaces []string
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
// WithLogger returns Option to set a specific logger.
|
// WithLogger returns Option to set a specific logger.
|
||||||
|
@ -133,9 +156,58 @@ func newApp(ctx context.Context, opt ...Option) App {
|
||||||
return a
|
return a
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) initAppSettings() {
|
func (s *appSettings) DefaultTimestamp() bool {
|
||||||
a.settings = &handler.Settings{}
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.defaultTimestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) setDefaultTimestamp(val bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.defaultTimestamp = val
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) ZipCompression() bool {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.zipCompression
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) setZipCompression(val bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.zipCompression = val
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) ClientCut() bool {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.clientCut
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) setClientCut(val bool) {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.clientCut = val
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) BufferMaxSizeForPut() uint64 {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.bufferMaxSizeForPut
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) setBufferMaxSizeForPut(val uint64) {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.bufferMaxSizeForPut = val
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) initAppSettings() {
|
||||||
|
a.settings = &appSettings{
|
||||||
|
reconnectInterval: fetchReconnectInterval(a.cfg),
|
||||||
|
}
|
||||||
a.updateSettings()
|
a.updateSettings()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -151,6 +223,7 @@ func (a *app) getResolverConfig() ([]string, *resolver.Config) {
|
||||||
resolveCfg := &resolver.Config{
|
resolveCfg := &resolver.Config{
|
||||||
FrostFS: resolver.NewFrostFSResolver(a.pool),
|
FrostFS: resolver.NewFrostFSResolver(a.pool),
|
||||||
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
||||||
|
Settings: a.settings,
|
||||||
}
|
}
|
||||||
|
|
||||||
order := a.cfg.GetStringSlice(cfgResolveOrder)
|
order := a.cfg.GetStringSlice(cfgResolveOrder)
|
||||||
|
@ -333,16 +406,22 @@ func (a *app) Serve() {
|
||||||
a.startServices()
|
a.startServices()
|
||||||
a.initServers(a.ctx)
|
a.initServers(a.ctx)
|
||||||
|
|
||||||
for i := range a.servers {
|
servs := a.getServers()
|
||||||
|
|
||||||
|
for i := range servs {
|
||||||
go func(i int) {
|
go func(i int) {
|
||||||
a.log.Info(logs.StartingServer, zap.String("address", a.servers[i].Address()))
|
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
|
||||||
if err := a.webServer.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed {
|
if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
|
||||||
a.metrics.MarkUnhealthy(a.servers[i].Address())
|
a.metrics.MarkUnhealthy(servs[i].Address())
|
||||||
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
|
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
|
||||||
}
|
}
|
||||||
}(i)
|
}(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(a.unbindServers) != 0 {
|
||||||
|
a.scheduleReconnect(a.ctx, a.webServer)
|
||||||
|
}
|
||||||
|
|
||||||
sigs := make(chan os.Signal, 1)
|
sigs := make(chan os.Signal, 1)
|
||||||
signal.Notify(sigs, syscall.SIGHUP)
|
signal.Notify(sigs, syscall.SIGHUP)
|
||||||
|
|
||||||
|
@ -415,8 +494,12 @@ func (a *app) configReload(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) updateSettings() {
|
func (a *app) updateSettings() {
|
||||||
a.settings.SetDefaultTimestamp(a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp))
|
a.settings.setDefaultTimestamp(a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp))
|
||||||
a.settings.SetZipCompression(a.cfg.GetBool(cfgZipCompression))
|
a.settings.setZipCompression(a.cfg.GetBool(cfgZipCompression))
|
||||||
|
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
|
||||||
|
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
|
||||||
|
a.settings.setNamespaceHeader(a.cfg.GetString(cfgResolveNamespaceHeader))
|
||||||
|
a.settings.setDefaultNamespaces(a.cfg.GetStringSlice(cfgResolveDefaultNamespaces))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) startServices() {
|
func (a *app) startServices() {
|
||||||
|
@ -450,15 +533,15 @@ func (a *app) configureRouter(handler *handler.Handler) {
|
||||||
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
|
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(handler.Upload))))
|
r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.Upload)))))
|
||||||
a.log.Info(logs.AddedPathUploadCid)
|
a.log.Info(logs.AddedPathUploadCid)
|
||||||
r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadByAddressOrBucketName))))
|
r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadByAddressOrBucketName)))))
|
||||||
r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(handler.HeadByAddressOrBucketName))))
|
r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.HeadByAddressOrBucketName)))))
|
||||||
a.log.Info(logs.AddedPathGetCidOid)
|
a.log.Info(logs.AddedPathGetCidOid)
|
||||||
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadByAttribute))))
|
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadByAttribute)))))
|
||||||
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(handler.HeadByAttribute))))
|
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.HeadByAttribute)))))
|
||||||
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
|
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
|
||||||
r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(handler.DownloadZipped))))
|
r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadZipped)))))
|
||||||
a.log.Info(logs.AddedPathZipCidPrefix)
|
a.log.Info(logs.AddedPathZipCidPrefix)
|
||||||
|
|
||||||
a.webServer.Handler = r.Handler
|
a.webServer.Handler = r.Handler
|
||||||
|
@ -479,8 +562,9 @@ func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||||
return func(req *fasthttp.RequestCtx) {
|
return func(req *fasthttp.RequestCtx) {
|
||||||
appCtx, err := tokens.StoreBearerTokenAppCtx(a.ctx, req)
|
appCtx, err := tokens.StoreBearerTokenAppCtx(a.ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
a.log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err))
|
a.log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Uint64("id", req.ID()), zap.Error(err))
|
||||||
response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
utils.SetContextToRequest(appCtx, req)
|
utils.SetContextToRequest(appCtx, req)
|
||||||
h(req)
|
h(req)
|
||||||
|
@ -497,6 +581,20 @@ func (a *app) tracer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||||
span.End()
|
span.End()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
appCtx = treepool.SetRequestID(appCtx, strconv.FormatUint(req.ID(), 10))
|
||||||
|
|
||||||
|
utils.SetContextToRequest(appCtx, req)
|
||||||
|
h(req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) reqNamespace(h fasthttp.RequestHandler) fasthttp.RequestHandler {
|
||||||
|
return func(req *fasthttp.RequestCtx) {
|
||||||
|
appCtx := utils.GetContextFromRequest(req)
|
||||||
|
|
||||||
|
nsBytes := req.Request.Header.Peek(a.settings.NamespaceHeader())
|
||||||
|
appCtx = middleware.SetNamespace(appCtx, string(nsBytes))
|
||||||
|
|
||||||
utils.SetContextToRequest(appCtx, req)
|
utils.SetContextToRequest(appCtx, req)
|
||||||
h(req)
|
h(req)
|
||||||
}
|
}
|
||||||
|
@ -508,11 +606,12 @@ func (a *app) AppParams() *utils.AppParams {
|
||||||
Pool: a.pool,
|
Pool: a.pool,
|
||||||
Owner: a.owner,
|
Owner: a.owner,
|
||||||
Resolver: a.resolver,
|
Resolver: a.resolver,
|
||||||
|
Cache: cache.NewBucketCache(getCacheOptions(a.cfg, a.log)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) initServers(ctx context.Context) {
|
func (a *app) initServers(ctx context.Context) {
|
||||||
serversInfo := fetchServers(a.cfg)
|
serversInfo := fetchServers(a.cfg, a.log)
|
||||||
|
|
||||||
a.servers = make([]Server, 0, len(serversInfo))
|
a.servers = make([]Server, 0, len(serversInfo))
|
||||||
for _, serverInfo := range serversInfo {
|
for _, serverInfo := range serversInfo {
|
||||||
|
@ -522,6 +621,7 @@ func (a *app) initServers(ctx context.Context) {
|
||||||
}
|
}
|
||||||
srv, err := newServer(ctx, serverInfo)
|
srv, err := newServer(ctx, serverInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
a.unbindServers = append(a.unbindServers, serverInfo)
|
||||||
a.metrics.MarkUnhealthy(serverInfo.Address)
|
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||||
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
|
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
|
||||||
continue
|
continue
|
||||||
|
@ -538,22 +638,25 @@ func (a *app) initServers(ctx context.Context) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) updateServers() error {
|
func (a *app) updateServers() error {
|
||||||
serversInfo := fetchServers(a.cfg)
|
serversInfo := fetchServers(a.cfg, a.log)
|
||||||
|
|
||||||
|
a.mu.Lock()
|
||||||
|
defer a.mu.Unlock()
|
||||||
|
|
||||||
var found bool
|
var found bool
|
||||||
for _, serverInfo := range serversInfo {
|
for _, serverInfo := range serversInfo {
|
||||||
index := a.serverIndex(serverInfo.Address)
|
ser := a.getServer(serverInfo.Address)
|
||||||
if index == -1 {
|
if ser != nil {
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if serverInfo.TLS.Enabled {
|
if serverInfo.TLS.Enabled {
|
||||||
if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
|
if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
|
||||||
return fmt.Errorf("failed to update tls certs: %w", err)
|
return fmt.Errorf("failed to update tls certs: %w", err)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
found = true
|
found = true
|
||||||
}
|
}
|
||||||
|
} else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
|
||||||
|
found = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if !found {
|
if !found {
|
||||||
return fmt.Errorf("invalid servers configuration: no known server found")
|
return fmt.Errorf("invalid servers configuration: no known server found")
|
||||||
|
@ -562,13 +665,29 @@ func (a *app) updateServers() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) serverIndex(address string) int {
|
func (a *app) getServers() []Server {
|
||||||
|
a.mu.RLock()
|
||||||
|
defer a.mu.RUnlock()
|
||||||
|
return a.servers
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) getServer(address string) Server {
|
||||||
for i := range a.servers {
|
for i := range a.servers {
|
||||||
if a.servers[i].Address() == address {
|
if a.servers[i].Address() == address {
|
||||||
return i
|
return a.servers[i]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return -1
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) updateUnbindServerInfo(info ServerInfo) bool {
|
||||||
|
for i := range a.unbindServers {
|
||||||
|
if a.unbindServers[i].Address == info.Address {
|
||||||
|
a.unbindServers[i] = info
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (a *app) initTracing(ctx context.Context) {
|
func (a *app) initTracing(ctx context.Context) {
|
||||||
|
@ -608,3 +727,93 @@ func (a *app) setRuntimeParameters() {
|
||||||
zap.Int64("old_value", previous))
|
zap.Int64("old_value", previous))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) NamespaceHeader() string {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
return s.namespaceHeader
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) setNamespaceHeader(nsHeader string) {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.namespaceHeader = nsHeader
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool) {
|
||||||
|
s.mu.RLock()
|
||||||
|
namespaces := s.defaultNamespaces
|
||||||
|
s.mu.RUnlock()
|
||||||
|
if slices.Contains(namespaces, ns) {
|
||||||
|
return v2container.SysAttributeZoneDefault, true
|
||||||
|
}
|
||||||
|
|
||||||
|
return ns + ".ns", false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *appSettings) setDefaultNamespaces(namespaces []string) {
|
||||||
|
for i := range namespaces { // to be set namespaces in env variable as `HTTP_GW_RESOLVE_BUCKET_DEFAULT_NAMESPACES="" "root"`
|
||||||
|
namespaces[i] = strings.Trim(namespaces[i], "\"")
|
||||||
|
}
|
||||||
|
|
||||||
|
s.mu.Lock()
|
||||||
|
s.defaultNamespaces = namespaces
|
||||||
|
s.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) scheduleReconnect(ctx context.Context, srv *fasthttp.Server) {
|
||||||
|
go func() {
|
||||||
|
t := time.NewTicker(a.settings.reconnectInterval)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-t.C:
|
||||||
|
if a.tryReconnect(ctx, srv) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.Reset(a.settings.reconnectInterval)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
|
||||||
|
a.mu.Lock()
|
||||||
|
defer a.mu.Unlock()
|
||||||
|
|
||||||
|
a.log.Info(logs.ServerReconnecting)
|
||||||
|
var failedServers []ServerInfo
|
||||||
|
|
||||||
|
for _, serverInfo := range a.unbindServers {
|
||||||
|
fields := []zap.Field{
|
||||||
|
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
|
||||||
|
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
|
||||||
|
}
|
||||||
|
|
||||||
|
srv, err := newServer(ctx, serverInfo)
|
||||||
|
if err != nil {
|
||||||
|
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
|
||||||
|
failedServers = append(failedServers, serverInfo)
|
||||||
|
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
|
||||||
|
a.metrics.MarkHealthy(serverInfo.Address)
|
||||||
|
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
|
||||||
|
a.log.Warn(logs.ListenAndServe, zap.Error(err))
|
||||||
|
a.metrics.MarkUnhealthy(serverInfo.Address)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
a.servers = append(a.servers, srv)
|
||||||
|
a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
|
||||||
|
}
|
||||||
|
|
||||||
|
a.unbindServers = failedServers
|
||||||
|
|
||||||
|
return len(a.unbindServers) == 0
|
||||||
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import (
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"github.com/testcontainers/testcontainers-go"
|
"github.com/testcontainers/testcontainers-go"
|
||||||
"github.com/testcontainers/testcontainers-go/wait"
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
|
"go.uber.org/zap/zapcore"
|
||||||
)
|
)
|
||||||
|
|
||||||
type putResponse struct {
|
type putResponse struct {
|
||||||
|
@ -68,6 +69,7 @@ func TestIntegration(t *testing.T) {
|
||||||
t.Run("simple get "+version, func(t *testing.T) { simpleGet(ctx, t, clientPool, ownerID, CID, version) })
|
t.Run("simple get "+version, func(t *testing.T) { simpleGet(ctx, t, clientPool, ownerID, CID, version) })
|
||||||
t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID, version) })
|
t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID, version) })
|
||||||
t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID, version) })
|
t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID, version) })
|
||||||
|
t.Run("test namespaces "+version, func(t *testing.T) { checkNamespaces(ctx, t, clientPool, ownerID, CID, version) })
|
||||||
|
|
||||||
cancel()
|
cancel()
|
||||||
server.Wait()
|
server.Wait()
|
||||||
|
@ -81,7 +83,7 @@ func runServer() (App, context.CancelFunc) {
|
||||||
cancelCtx, cancel := context.WithCancel(context.Background())
|
cancelCtx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
v := getDefaultConfig()
|
v := getDefaultConfig()
|
||||||
l, lvl := newLogger(v)
|
l, lvl := newStdoutLogger(zapcore.DebugLevel)
|
||||||
application := newApp(cancelCtx, WithConfig(v), WithLogger(l, lvl))
|
application := newApp(cancelCtx, WithConfig(v), WithLogger(l, lvl))
|
||||||
go application.Serve()
|
go application.Serve()
|
||||||
|
|
||||||
|
@ -338,6 +340,40 @@ func checkZip(t *testing.T, data []byte, length int64, names, contents []string)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkNamespaces(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID, version string) {
|
||||||
|
content := "content of file"
|
||||||
|
attributes := map[string]string{
|
||||||
|
"some-attr": "some-get-value",
|
||||||
|
}
|
||||||
|
|
||||||
|
id := putObject(ctx, t, clientPool, ownerID, CID, content, attributes)
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodGet, testHost+"/get/"+testContainerName+"/"+id.String(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
req.Header.Set(defaultNamespaceHeader, "")
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
checkGetResponse(t, resp, content, attributes)
|
||||||
|
|
||||||
|
req, err = http.NewRequest(http.MethodGet, testHost+"/get/"+testContainerName+"/"+id.String(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
req.Header.Set(defaultNamespaceHeader, "root")
|
||||||
|
|
||||||
|
resp, err = http.DefaultClient.Do(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
checkGetResponse(t, resp, content, attributes)
|
||||||
|
|
||||||
|
req, err = http.NewRequest(http.MethodGet, testHost+"/get/"+testContainerName+"/"+id.String(), nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
req.Header.Set(defaultNamespaceHeader, "root2")
|
||||||
|
|
||||||
|
resp, err = http.DefaultClient.Do(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, http.StatusNotFound, resp.StatusCode)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
func createDockerContainer(ctx context.Context, t *testing.T, image string) testcontainers.Container {
|
func createDockerContainer(ctx context.Context, t *testing.T, image string) testcontainers.Container {
|
||||||
req := testcontainers.ContainerRequest{
|
req := testcontainers.ContainerRequest{
|
||||||
Image: image,
|
Image: image,
|
||||||
|
|
|
@ -9,7 +9,7 @@ import (
|
||||||
func main() {
|
func main() {
|
||||||
globalContext, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
globalContext, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
|
||||||
v := settings()
|
v := settings()
|
||||||
logger, atomicLevel := newLogger(v)
|
logger, atomicLevel := pickLogger(v)
|
||||||
|
|
||||||
application := newApp(globalContext, WithLogger(logger, atomicLevel), WithConfig(v))
|
application := newApp(globalContext, WithLogger(logger, atomicLevel), WithConfig(v))
|
||||||
go application.Serve()
|
go application.Serve()
|
||||||
|
|
|
@ -68,11 +68,13 @@ func newServer(ctx context.Context, serverInfo ServerInfo) (*server, error) {
|
||||||
|
|
||||||
if serverInfo.TLS.Enabled {
|
if serverInfo.TLS.Enabled {
|
||||||
if err = tlsProvider.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
|
if err = tlsProvider.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
|
||||||
return nil, fmt.Errorf("failed to update cert: %w", err)
|
lnErr := ln.Close()
|
||||||
|
return nil, fmt.Errorf("failed to update cert (listener close: %v): %w", lnErr, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
ln = tls.NewListener(ln, &tls.Config{
|
ln = tls.NewListener(ln, &tls.Config{
|
||||||
GetCertificate: tlsProvider.GetCertificate,
|
GetCertificate: tlsProvider.GetCertificate,
|
||||||
|
NextProtos: []string{"h2"}, // required to enable HTTP/2 requests in `http.Serve`
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
119
cmd/http-gw/server_test.go
Normal file
119
cmd/http-gw/server_test.go
Normal file
|
@ -0,0 +1,119 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/rand"
|
||||||
|
"crypto/rsa"
|
||||||
|
"crypto/tls"
|
||||||
|
"crypto/x509"
|
||||||
|
"crypto/x509/pkix"
|
||||||
|
"encoding/pem"
|
||||||
|
"fmt"
|
||||||
|
"math/big"
|
||||||
|
"net"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"path"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/net/http2"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
expHeaderKey = "Foo"
|
||||||
|
expHeaderValue = "Bar"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestHTTP2TLS(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
certPath, keyPath := prepareTestCerts(t)
|
||||||
|
|
||||||
|
srv := &http.Server{
|
||||||
|
Handler: http.HandlerFunc(testHandler),
|
||||||
|
}
|
||||||
|
|
||||||
|
tlsListener, err := newServer(ctx, ServerInfo{
|
||||||
|
Address: ":0",
|
||||||
|
TLS: ServerTLSInfo{
|
||||||
|
Enabled: true,
|
||||||
|
CertFile: certPath,
|
||||||
|
KeyFile: keyPath,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
port := tlsListener.Listener().Addr().(*net.TCPAddr).Port
|
||||||
|
addr := fmt.Sprintf("https://localhost:%d", port)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
_ = srv.Serve(tlsListener.Listener())
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Server is running, now send HTTP/2 request
|
||||||
|
|
||||||
|
tlsClientConfig := &tls.Config{
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
cliHTTP1 := http.Client{Transport: &http.Transport{TLSClientConfig: tlsClientConfig}}
|
||||||
|
cliHTTP2 := http.Client{Transport: &http2.Transport{TLSClientConfig: tlsClientConfig}}
|
||||||
|
|
||||||
|
req, err := http.NewRequest("GET", addr, nil)
|
||||||
|
require.NoError(t, err)
|
||||||
|
req.Header[expHeaderKey] = []string{expHeaderValue}
|
||||||
|
|
||||||
|
resp, err := cliHTTP1.Do(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||||
|
|
||||||
|
resp, err = cliHTTP2.Do(req)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func testHandler(resp http.ResponseWriter, req *http.Request) {
|
||||||
|
hdr, ok := req.Header[expHeaderKey]
|
||||||
|
if !ok || len(hdr) != 1 || hdr[0] != expHeaderValue {
|
||||||
|
resp.WriteHeader(http.StatusBadRequest)
|
||||||
|
} else {
|
||||||
|
resp.WriteHeader(http.StatusOK)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func prepareTestCerts(t *testing.T) (certPath, keyPath string) {
|
||||||
|
privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
template := x509.Certificate{
|
||||||
|
SerialNumber: big.NewInt(1),
|
||||||
|
Subject: pkix.Name{CommonName: "localhost"},
|
||||||
|
NotBefore: time.Now(),
|
||||||
|
NotAfter: time.Now().Add(time.Hour * 24 * 365),
|
||||||
|
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
|
||||||
|
BasicConstraintsValid: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
dir := t.TempDir()
|
||||||
|
certPath = path.Join(dir, "cert.pem")
|
||||||
|
keyPath = path.Join(dir, "key.pem")
|
||||||
|
|
||||||
|
certFile, err := os.Create(certPath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer certFile.Close()
|
||||||
|
|
||||||
|
keyFile, err := os.Create(keyPath)
|
||||||
|
require.NoError(t, err)
|
||||||
|
defer keyFile.Close()
|
||||||
|
|
||||||
|
err = pem.Encode(certFile, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
err = pem.Encode(keyFile, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey)})
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
return certPath, keyPath
|
||||||
|
}
|
|
@ -13,20 +13,28 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
||||||
|
"git.frostfs.info/TrueCloudLab/zapjournald"
|
||||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||||
"github.com/spf13/pflag"
|
"github.com/spf13/pflag"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
|
"github.com/ssgreg/journald"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
destinationStdout = "stdout"
|
||||||
|
destinationJournald = "journald"
|
||||||
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
defaultRebalanceTimer = 60 * time.Second
|
defaultRebalanceTimer = 60 * time.Second
|
||||||
defaultRequestTimeout = 15 * time.Second
|
defaultRequestTimeout = 15 * time.Second
|
||||||
|
@ -39,11 +47,19 @@ const (
|
||||||
|
|
||||||
defaultSoftMemoryLimit = math.MaxInt64
|
defaultSoftMemoryLimit = math.MaxInt64
|
||||||
|
|
||||||
|
defaultBufferMaxSizeForPut = 1024 * 1024 // 1mb
|
||||||
|
|
||||||
|
defaultNamespaceHeader = "X-Frostfs-Namespace"
|
||||||
|
|
||||||
|
defaultReconnectInterval = time.Minute
|
||||||
|
|
||||||
cfgServer = "server"
|
cfgServer = "server"
|
||||||
cfgTLSEnabled = "tls.enabled"
|
cfgTLSEnabled = "tls.enabled"
|
||||||
cfgTLSCertFile = "tls.cert_file"
|
cfgTLSCertFile = "tls.cert_file"
|
||||||
cfgTLSKeyFile = "tls.key_file"
|
cfgTLSKeyFile = "tls.key_file"
|
||||||
|
|
||||||
|
cfgReconnectInterval = "reconnect_interval"
|
||||||
|
|
||||||
// Web.
|
// Web.
|
||||||
cfgWebReadBufferSize = "web.read_buffer_size"
|
cfgWebReadBufferSize = "web.read_buffer_size"
|
||||||
cfgWebWriteBufferSize = "web.write_buffer_size"
|
cfgWebWriteBufferSize = "web.write_buffer_size"
|
||||||
|
@ -72,6 +88,7 @@ const (
|
||||||
|
|
||||||
// Logger.
|
// Logger.
|
||||||
cfgLoggerLevel = "logger.level"
|
cfgLoggerLevel = "logger.level"
|
||||||
|
cfgLoggerDestination = "logger.destination"
|
||||||
|
|
||||||
// Wallet.
|
// Wallet.
|
||||||
cfgWalletPassphrase = "wallet.passphrase"
|
cfgWalletPassphrase = "wallet.passphrase"
|
||||||
|
@ -96,6 +113,22 @@ const (
|
||||||
// Runtime.
|
// Runtime.
|
||||||
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
|
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
|
||||||
|
|
||||||
|
// Enabling client side object preparing for PUT operations.
|
||||||
|
cfgClientCut = "frostfs.client_cut"
|
||||||
|
// Sets max buffer size for read payload in put operations.
|
||||||
|
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
|
||||||
|
// Configuration of parameters of requests to FrostFS.
|
||||||
|
// Sets max attempt to make successful tree request.
|
||||||
|
cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"
|
||||||
|
|
||||||
|
// Caching.
|
||||||
|
cfgBucketsCacheLifetime = "cache.buckets.lifetime"
|
||||||
|
cfgBucketsCacheSize = "cache.buckets.size"
|
||||||
|
|
||||||
|
// Bucket resolving options.
|
||||||
|
cfgResolveNamespaceHeader = "resolve_bucket.namespace_header"
|
||||||
|
cfgResolveDefaultNamespaces = "resolve_bucket.default_namespaces"
|
||||||
|
|
||||||
// Command line args.
|
// Command line args.
|
||||||
cmdHelp = "help"
|
cmdHelp = "help"
|
||||||
cmdVersion = "version"
|
cmdVersion = "version"
|
||||||
|
@ -153,10 +186,14 @@ func settings() *viper.Viper {
|
||||||
|
|
||||||
// logger:
|
// logger:
|
||||||
v.SetDefault(cfgLoggerLevel, "debug")
|
v.SetDefault(cfgLoggerLevel, "debug")
|
||||||
|
v.SetDefault(cfgLoggerDestination, "stdout")
|
||||||
|
|
||||||
// pool:
|
// pool:
|
||||||
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
|
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
|
||||||
|
|
||||||
|
// frostfs:
|
||||||
|
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
|
||||||
|
|
||||||
// web-server:
|
// web-server:
|
||||||
v.SetDefault(cfgWebReadBufferSize, 4096)
|
v.SetDefault(cfgWebReadBufferSize, 4096)
|
||||||
v.SetDefault(cfgWebWriteBufferSize, 4096)
|
v.SetDefault(cfgWebWriteBufferSize, 4096)
|
||||||
|
@ -175,6 +212,10 @@ func settings() *viper.Viper {
|
||||||
v.SetDefault(cfgPprofAddress, "localhost:8083")
|
v.SetDefault(cfgPprofAddress, "localhost:8083")
|
||||||
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
|
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
|
||||||
|
|
||||||
|
// resolve bucket
|
||||||
|
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
|
||||||
|
v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})
|
||||||
|
|
||||||
// Binding flags
|
// Binding flags
|
||||||
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
|
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
@ -334,7 +375,25 @@ func mergeConfig(v *viper.Viper, fileName string) error {
|
||||||
return v.MergeConfig(cfgFile)
|
return v.MergeConfig(cfgFile)
|
||||||
}
|
}
|
||||||
|
|
||||||
// newLogger constructs a zap.Logger instance for current application.
|
func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
|
||||||
|
lvl, err := getLogLevel(v)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
dest := v.GetString(cfgLoggerDestination)
|
||||||
|
|
||||||
|
switch dest {
|
||||||
|
case destinationStdout:
|
||||||
|
return newStdoutLogger(lvl)
|
||||||
|
case destinationJournald:
|
||||||
|
return newJournaldLogger(lvl)
|
||||||
|
default:
|
||||||
|
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// newStdoutLogger constructs a zap.Logger instance for current application.
|
||||||
// Panics on failure.
|
// Panics on failure.
|
||||||
//
|
//
|
||||||
// Logger is built from zap's production logging configuration with:
|
// Logger is built from zap's production logging configuration with:
|
||||||
|
@ -345,12 +404,7 @@ func mergeConfig(v *viper.Viper, fileName string) error {
|
||||||
// Logger records a stack trace for all messages at or above fatal level.
|
// Logger records a stack trace for all messages at or above fatal level.
|
||||||
//
|
//
|
||||||
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
|
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
|
||||||
func newLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
|
func newStdoutLogger(lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
|
||||||
lvl, err := getLogLevel(v)
|
|
||||||
if err != nil {
|
|
||||||
panic(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
c := zap.NewProductionConfig()
|
c := zap.NewProductionConfig()
|
||||||
c.Level = zap.NewAtomicLevelAt(lvl)
|
c.Level = zap.NewAtomicLevelAt(lvl)
|
||||||
c.Encoding = "console"
|
c.Encoding = "console"
|
||||||
|
@ -366,6 +420,25 @@ func newLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
|
||||||
return l, c.Level
|
return l, c.Level
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newJournaldLogger(lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
|
||||||
|
c := zap.NewProductionConfig()
|
||||||
|
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
|
||||||
|
c.Level = zap.NewAtomicLevelAt(lvl)
|
||||||
|
|
||||||
|
encoder := zapjournald.NewPartialEncoder(zapcore.NewConsoleEncoder(c.EncoderConfig), zapjournald.SyslogFields)
|
||||||
|
|
||||||
|
core := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
|
||||||
|
coreWithContext := core.With([]zapcore.Field{
|
||||||
|
zapjournald.SyslogFacility(zapjournald.LogDaemon),
|
||||||
|
zapjournald.SyslogIdentifier(),
|
||||||
|
zapjournald.SyslogPid(),
|
||||||
|
})
|
||||||
|
|
||||||
|
l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
|
||||||
|
|
||||||
|
return l, c.Level
|
||||||
|
}
|
||||||
|
|
||||||
func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
|
func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
|
||||||
var lvl zapcore.Level
|
var lvl zapcore.Level
|
||||||
lvlStr := v.GetString(cfgLoggerLevel)
|
lvlStr := v.GetString(cfgLoggerLevel)
|
||||||
|
@ -385,8 +458,18 @@ func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
|
||||||
return lvl, nil
|
return lvl, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func fetchServers(v *viper.Viper) []ServerInfo {
|
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
|
||||||
|
reconnect := cfg.GetDuration(cfgReconnectInterval)
|
||||||
|
if reconnect <= 0 {
|
||||||
|
reconnect = defaultReconnectInterval
|
||||||
|
}
|
||||||
|
|
||||||
|
return reconnect
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
|
||||||
var servers []ServerInfo
|
var servers []ServerInfo
|
||||||
|
seen := make(map[string]struct{})
|
||||||
|
|
||||||
for i := 0; ; i++ {
|
for i := 0; ; i++ {
|
||||||
key := cfgServer + "." + strconv.Itoa(i) + "."
|
key := cfgServer + "." + strconv.Itoa(i) + "."
|
||||||
|
@ -401,6 +484,11 @@ func fetchServers(v *viper.Viper) []ServerInfo {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if _, ok := seen[serverInfo.Address]; ok {
|
||||||
|
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen[serverInfo.Address] = struct{}{}
|
||||||
servers = append(servers, serverInfo)
|
servers = append(servers, serverInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -461,6 +549,8 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.
|
||||||
prm.SetLogger(logger)
|
prm.SetLogger(logger)
|
||||||
prmTree.SetLogger(logger)
|
prmTree.SetLogger(logger)
|
||||||
|
|
||||||
|
prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgTreePoolMaxAttempts))
|
||||||
|
|
||||||
var apiGRPCDialOpts []grpc.DialOption
|
var apiGRPCDialOpts []grpc.DialOption
|
||||||
var treeGRPCDialOpts []grpc.DialOption
|
var treeGRPCDialOpts []grpc.DialOption
|
||||||
if cfg.GetBool(cfgTracingEnabled) {
|
if cfg.GetBool(cfgTracingEnabled) {
|
||||||
|
@ -531,3 +621,44 @@ func fetchSoftMemoryLimit(cfg *viper.Viper) int64 {
|
||||||
|
|
||||||
return int64(softMemoryLimit)
|
return int64(softMemoryLimit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config {
|
||||||
|
cacheCfg := cache.DefaultBucketConfig(l)
|
||||||
|
|
||||||
|
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Lifetime)
|
||||||
|
cacheCfg.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Size)
|
||||||
|
|
||||||
|
return cacheCfg
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
|
||||||
|
if v.IsSet(cfgEntry) {
|
||||||
|
lifetime := v.GetDuration(cfgEntry)
|
||||||
|
if lifetime <= 0 {
|
||||||
|
l.Error(logs.InvalidLifetimeUsingDefaultValue,
|
||||||
|
zap.String("parameter", cfgEntry),
|
||||||
|
zap.Duration("value in config", lifetime),
|
||||||
|
zap.Duration("default", defaultValue))
|
||||||
|
} else {
|
||||||
|
return lifetime
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return defaultValue
|
||||||
|
}
|
||||||
|
|
||||||
|
func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
|
||||||
|
if v.IsSet(cfgEntry) {
|
||||||
|
size := v.GetInt(cfgEntry)
|
||||||
|
if size <= 0 {
|
||||||
|
l.Error(logs.InvalidCacheSizeUsingDefaultValue,
|
||||||
|
zap.String("parameter", cfgEntry),
|
||||||
|
zap.Int("value in config", size),
|
||||||
|
zap.Int("default", defaultValue))
|
||||||
|
} else {
|
||||||
|
return size
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return defaultValue
|
||||||
|
}
|
||||||
|
|
|
@ -26,6 +26,9 @@ HTTP_GW_SERVER_1_TLS_ENABLED=true
|
||||||
HTTP_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert
|
HTTP_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert
|
||||||
HTTP_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key
|
HTTP_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key
|
||||||
|
|
||||||
|
# How often to reconnect to the servers
|
||||||
|
HTTP_GW_RECONNECT_INTERVAL: 1m
|
||||||
|
|
||||||
# Nodes configuration.
|
# Nodes configuration.
|
||||||
# This configuration make the gateway use the first node (grpc://s01.frostfs.devenv:8080)
|
# This configuration make the gateway use the first node (grpc://s01.frostfs.devenv:8080)
|
||||||
# while it's healthy. Otherwise, the gateway use the second node (grpc://s01.frostfs.devenv:8080)
|
# while it's healthy. Otherwise, the gateway use the second node (grpc://s01.frostfs.devenv:8080)
|
||||||
|
@ -98,3 +101,23 @@ HTTP_GW_TRACING_ENDPOINT="localhost:4317"
|
||||||
HTTP_GW_TRACING_EXPORTER="otlp_grpc"
|
HTTP_GW_TRACING_EXPORTER="otlp_grpc"
|
||||||
|
|
||||||
HTTP_GW_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
|
HTTP_GW_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
|
||||||
|
|
||||||
|
# Parameters of requests to FrostFS
|
||||||
|
# This flag enables client side object preparing.
|
||||||
|
HTTP_GW_FROSTFS_CLIENT_CUT=false
|
||||||
|
# Sets max buffer size for read payload in put operations.
|
||||||
|
HTTP_GW_FROSTFS_BUFFER_MAX_SIZE_FOR_PUT=1048576
|
||||||
|
|
||||||
|
# Caching
|
||||||
|
# Cache which contains mapping of bucket name to bucket info
|
||||||
|
HTTP_GW_CACHE_BUCKETS_LIFETIME=1m
|
||||||
|
HTTP_GW_CACHE_BUCKETS_SIZE=1000
|
||||||
|
|
||||||
|
# Header to determine zone to resolve bucket name
|
||||||
|
HTTP_GW_RESOLVE_BUCKET_NAMESPACE_HEADER=X-Frostfs-Namespace
|
||||||
|
# Namespaces that should be handled as default
|
||||||
|
HTTP_GW_RESOLVE_BUCKET_DEFAULT_NAMESPACES="" "root"
|
||||||
|
|
||||||
|
# Max attempt to make successful tree request.
|
||||||
|
# default value is 0 that means the number of attempts equals to number of nodes in pool.
|
||||||
|
HTTP_GW_FROSTFS_TREE_POOL_MAX_ATTEMPTS=0
|
||||||
|
|
|
@ -16,6 +16,7 @@ tracing:
|
||||||
|
|
||||||
logger:
|
logger:
|
||||||
level: debug # Log level.
|
level: debug # Log level.
|
||||||
|
destination: stdout
|
||||||
|
|
||||||
server:
|
server:
|
||||||
- address: 0.0.0.0:8080
|
- address: 0.0.0.0:8080
|
||||||
|
@ -54,6 +55,7 @@ peers:
|
||||||
priority: 2
|
priority: 2
|
||||||
weight: 9
|
weight: 9
|
||||||
|
|
||||||
|
reconnect_interval: 1m
|
||||||
|
|
||||||
web:
|
web:
|
||||||
# Per-connection buffer size for requests' reading.
|
# Per-connection buffer size for requests' reading.
|
||||||
|
@ -104,3 +106,24 @@ zip:
|
||||||
|
|
||||||
runtime:
|
runtime:
|
||||||
soft_memory_limit: 1gb
|
soft_memory_limit: 1gb
|
||||||
|
|
||||||
|
# Parameters of requests to FrostFS
|
||||||
|
frostfs:
|
||||||
|
# This flag enables client side object preparing.
|
||||||
|
client_cut: false
|
||||||
|
# Sets max buffer size for read payload in put operations.
|
||||||
|
buffer_max_size_for_put: 1048576
|
||||||
|
# Max attempt to make successful tree request.
|
||||||
|
# default value is 0 that means the number of attempts equals to number of nodes in pool.
|
||||||
|
tree_pool_max_attempts: 0
|
||||||
|
|
||||||
|
# Caching
|
||||||
|
cache:
|
||||||
|
# Cache which contains mapping of bucket name to bucket info
|
||||||
|
buckets:
|
||||||
|
lifetime: 1m
|
||||||
|
size: 1000
|
||||||
|
|
||||||
|
resolve_bucket:
|
||||||
|
namespace_header: X-Frostfs-Namespace
|
||||||
|
default_namespaces: [ "", "root" ]
|
|
@ -41,7 +41,7 @@ $ cat http.log
|
||||||
# Structure
|
# Structure
|
||||||
|
|
||||||
| Section | Description |
|
| Section | Description |
|
||||||
|-----------------|-------------------------------------------------------|
|
|------------------|----------------------------------------------------------------|
|
||||||
| no section | [General parameters](#general-section) |
|
| no section | [General parameters](#general-section) |
|
||||||
| `wallet` | [Wallet configuration](#wallet-section) |
|
| `wallet` | [Wallet configuration](#wallet-section) |
|
||||||
| `peers` | [Nodes configuration](#peers-section) |
|
| `peers` | [Nodes configuration](#peers-section) |
|
||||||
|
@ -54,6 +54,9 @@ $ cat http.log
|
||||||
| `prometheus` | [Prometheus configuration](#prometheus-section) |
|
| `prometheus` | [Prometheus configuration](#prometheus-section) |
|
||||||
| `tracing` | [Tracing configuration](#tracing-section) |
|
| `tracing` | [Tracing configuration](#tracing-section) |
|
||||||
| `runtime` | [Runtime configuration](#runtime-section) |
|
| `runtime` | [Runtime configuration](#runtime-section) |
|
||||||
|
| `frostfs` | [Frostfs configuration](#frostfs-section) |
|
||||||
|
| `cache` | [Cache configuration](#cache-section) |
|
||||||
|
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
|
||||||
|
|
||||||
|
|
||||||
# General section
|
# General section
|
||||||
|
@ -69,6 +72,7 @@ stream_timeout: 10s
|
||||||
request_timeout: 5s
|
request_timeout: 5s
|
||||||
rebalance_timer: 30s
|
rebalance_timer: 30s
|
||||||
pool_error_threshold: 100
|
pool_error_threshold: 100
|
||||||
|
reconnect_interval: 1m
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
@ -80,6 +84,7 @@ pool_error_threshold: 100
|
||||||
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
|
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
|
||||||
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
|
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
|
||||||
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
|
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
|
||||||
|
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
|
||||||
|
|
||||||
# `wallet` section
|
# `wallet` section
|
||||||
|
|
||||||
|
@ -158,12 +163,13 @@ server:
|
||||||
```yaml
|
```yaml
|
||||||
logger:
|
logger:
|
||||||
level: debug
|
level: debug
|
||||||
|
destination: stdout
|
||||||
```
|
```
|
||||||
|
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|-----------|----------|---------------|---------------|----------------------------------------------------------------------------------------------------|
|
|---------------|----------|---------------|---------------|----------------------------------------------------------------------------------------------------|
|
||||||
| `level` | `string` | yes | `debug` | Logging level.<br/>Possible values: `debug`, `info`, `warn`, `error`, `dpanic`, `panic`, `fatal`. |
|
| `level` | `string` | yes | `debug` | Logging level.<br/>Possible values: `debug`, `info`, `warn`, `error`, `dpanic`, `panic`, `fatal`. |
|
||||||
|
| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` |
|
||||||
|
|
||||||
# `web` section
|
# `web` section
|
||||||
|
|
||||||
|
@ -269,3 +275,64 @@ runtime:
|
||||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|---------------------|--------|---------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
|---------------------|--------|---------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||||
| `soft_memory_limit` | `size` | yes | maxint64 | Soft memory limit for the runtime. Zero or no value stands for no limit. If `GOMEMLIMIT` environment variable is set, the value from the configuration file will be ignored. |
|
| `soft_memory_limit` | `size` | yes | maxint64 | Soft memory limit for the runtime. Zero or no value stands for no limit. If `GOMEMLIMIT` environment variable is set, the value from the configuration file will be ignored. |
|
||||||
|
|
||||||
|
# `frostfs` section
|
||||||
|
|
||||||
|
Contains parameters of requests to FrostFS.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
frostfs:
|
||||||
|
client_cut: false
|
||||||
|
buffer_max_size_for_put: 1048576 # 1mb
|
||||||
|
tree_pool_max_attempts: 0
|
||||||
|
```
|
||||||
|
|
||||||
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
|---------------------------|----------|---------------|---------------|---------------------------------------------------------------------------------------------------------------------------|
|
||||||
|
| `client_cut` | `bool` | yes | `false` | This flag enables client side object preparing. |
|
||||||
|
| `buffer_max_size_for_put` | `uint64` | yes | `1048576` | Sets max buffer size for read payload in put operations. |
|
||||||
|
| `tree_pool_max_attempts` | `uint32` | no | `0` | Sets max attempt to make successful tree request. Value 0 means the number of attempts equals to number of nodes in pool. |
|
||||||
|
|
||||||
|
|
||||||
|
### `cache` section
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
cache:
|
||||||
|
buckets:
|
||||||
|
lifetime: 1m
|
||||||
|
size: 1000
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
| Parameter | Type | Default value | Description |
|
||||||
|
|-----------------|-----------------------------------|-----------------------------------|----------------------------------------------------------------------------------------|
|
||||||
|
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
|
||||||
|
|
||||||
|
|
||||||
|
#### `cache` subsection
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
lifetime: 1m
|
||||||
|
size: 1000
|
||||||
|
```
|
||||||
|
|
||||||
|
| Parameter | Type | Default value | Description |
|
||||||
|
|------------|------------|------------------|-------------------------------|
|
||||||
|
| `lifetime` | `duration` | depends on cache | Lifetime of entries in cache. |
|
||||||
|
| `size` | `int` | depends on cache | LRU cache size. |
|
||||||
|
|
||||||
|
|
||||||
|
# `resolve_bucket` section
|
||||||
|
|
||||||
|
Bucket name resolving parameters from and to container ID.
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
resolve_bucket:
|
||||||
|
namespace_header: X-Frostfs-Namespace
|
||||||
|
default_namespaces: [ "", "root" ]
|
||||||
|
```
|
||||||
|
|
||||||
|
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||||
|
|----------------------|------------|---------------|-----------------------|--------------------------------------------------------------------------------------------------------------------------|
|
||||||
|
| `namespace_header` | `string` | yes | `X-Frostfs-Namespace` | Header to determine zone to resolve bucket name. |
|
||||||
|
| `default_namespaces` | `[]string` | yes | ["","root"] | Namespaces that should be handled as default. |
|
11
go.mod
11
go.mod
|
@ -3,21 +3,26 @@ module git.frostfs.info/TrueCloudLab/frostfs-http-gw
|
||||||
go 1.20
|
go 1.20
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20231121085847-241a9f1ad0a4
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20231107114540-ab75edd70939
|
||||||
|
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||||
|
github.com/bluele/gcache v0.0.2
|
||||||
github.com/fasthttp/router v1.4.1
|
github.com/fasthttp/router v1.4.1
|
||||||
github.com/nspcc-dev/neo-go v0.101.2-0.20230601131642-a0117042e8fc
|
github.com/nspcc-dev/neo-go v0.101.2-0.20230601131642-a0117042e8fc
|
||||||
github.com/prometheus/client_golang v1.15.1
|
github.com/prometheus/client_golang v1.15.1
|
||||||
github.com/prometheus/client_model v0.3.0
|
github.com/prometheus/client_model v0.3.0
|
||||||
github.com/spf13/pflag v1.0.5
|
github.com/spf13/pflag v1.0.5
|
||||||
github.com/spf13/viper v1.15.0
|
github.com/spf13/viper v1.15.0
|
||||||
|
github.com/ssgreg/journald v1.0.0
|
||||||
github.com/stretchr/testify v1.8.3
|
github.com/stretchr/testify v1.8.3
|
||||||
github.com/testcontainers/testcontainers-go v0.13.0
|
github.com/testcontainers/testcontainers-go v0.13.0
|
||||||
github.com/valyala/fasthttp v1.34.0
|
github.com/valyala/fasthttp v1.34.0
|
||||||
go.opentelemetry.io/otel v1.16.0
|
go.opentelemetry.io/otel v1.16.0
|
||||||
go.opentelemetry.io/otel/trace v1.16.0
|
go.opentelemetry.io/otel/trace v1.16.0
|
||||||
go.uber.org/zap v1.24.0
|
go.uber.org/zap v1.24.0
|
||||||
|
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc
|
||||||
|
golang.org/x/net v0.10.0
|
||||||
google.golang.org/grpc v1.55.0
|
google.golang.org/grpc v1.55.0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -99,8 +104,6 @@ require (
|
||||||
go.uber.org/atomic v1.10.0 // indirect
|
go.uber.org/atomic v1.10.0 // indirect
|
||||||
go.uber.org/multierr v1.11.0 // indirect
|
go.uber.org/multierr v1.11.0 // indirect
|
||||||
golang.org/x/crypto v0.9.0 // indirect
|
golang.org/x/crypto v0.9.0 // indirect
|
||||||
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
|
|
||||||
golang.org/x/net v0.10.0 // indirect
|
|
||||||
golang.org/x/sync v0.2.0 // indirect
|
golang.org/x/sync v0.2.0 // indirect
|
||||||
golang.org/x/sys v0.8.0 // indirect
|
golang.org/x/sys v0.8.0 // indirect
|
||||||
golang.org/x/term v0.8.0 // indirect
|
golang.org/x/term v0.8.0 // indirect
|
||||||
|
|
14
go.sum
14
go.sum
|
@ -37,22 +37,24 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
|
||||||
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
|
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
|
||||||
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
|
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
|
||||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44 h1:v6JqBD/VzZx3QSxbaXnUwnnJ1KEYheU4LzLGr3IhsAE=
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20231121085847-241a9f1ad0a4 h1:wjLfZ3WCt7qNGsQv+Jl0TXnmtg0uVk/jToKPFTBc/jo=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44/go.mod h1:pKJJRLOChW4zDQsAt1e8k/snWKljJtpkiPfxV53ngjI=
|
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20231121085847-241a9f1ad0a4/go.mod h1:uY0AYmCznjZdghDnAk7THFIe1Vlg531IxUcus7ZfUJI=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb h1:S/TrbOOu9qEXZRZ9/Ddw7crnxbBUQLo68PSzQWYrc9M=
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb h1:S/TrbOOu9qEXZRZ9/Ddw7crnxbBUQLo68PSzQWYrc9M=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb/go.mod h1:nkR5gaGeez3Zv2SE7aceP0YwxG2FzIB5cGKpQO2vV2o=
|
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb/go.mod h1:nkR5gaGeez3Zv2SE7aceP0YwxG2FzIB5cGKpQO2vV2o=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
|
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6 h1:u6lzNotV6MEMNEG/XeS7g+FjPrrf+j4gnOHtvun2KJc=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20231107114540-ab75edd70939 h1:jZEepi9yWmqrWgLRQcHQu4YPJaudmd7d2AEhpmM3m4U=
|
||||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6/go.mod h1:LI2GOj0pEx0jYTjB3QHja2PNhQFYL2pCm71RAFwDv0M=
|
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20231107114540-ab75edd70939/go.mod h1:t1akKcUH7iBrFHX8rSXScYMP17k2kYQXMbZooiL5Juw=
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
||||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0/go.mod h1:okpbKfVYf/BpejtfFTfhZqFP+sZ8rsHrP8Rr/jYPNRc=
|
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0/go.mod h1:okpbKfVYf/BpejtfFTfhZqFP+sZ8rsHrP8Rr/jYPNRc=
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjqZzS4gsb4UA=
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjqZzS4gsb4UA=
|
||||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0/go.mod h1:dhY+oy274hV8wGvGL4MwwMpdL3GYvaX1a8GQZQHvlF8=
|
git.frostfs.info/TrueCloudLab/tzhash v1.8.0/go.mod h1:dhY+oy274hV8wGvGL4MwwMpdL3GYvaX1a8GQZQHvlF8=
|
||||||
|
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02 h1:HeY8n27VyPRQe49l/fzyVMkWEB2fsLJYKp64pwA7tz4=
|
||||||
|
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02/go.mod h1:rQFJJdEOV7KbbMtQYR2lNfiZk+ONRDJSbMCTWxKt8Fw=
|
||||||
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||||
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
|
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
|
||||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
|
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 h1:UQHMgLO+TxOElx5B5HZ4hJQsoJ/PvUvKRhJHDQXO8P8=
|
||||||
|
@ -138,6 +140,8 @@ github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngE
|
||||||
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
|
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
|
||||||
github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
||||||
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
||||||
|
github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
|
||||||
|
github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
|
||||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
||||||
github.com/bshuster-repo/logrus-logstash-hook v0.4.1/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk=
|
github.com/bshuster-repo/logrus-logstash-hook v0.4.1/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk=
|
||||||
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
|
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
|
||||||
|
@ -871,6 +875,8 @@ github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An
|
||||||
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
|
github.com/spf13/viper v1.4.0/go.mod h1:PTJ7Z/lr49W6bUbkmS1V3by4uWynFiR9p7+dSq/yZzE=
|
||||||
github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
|
github.com/spf13/viper v1.15.0 h1:js3yy885G8xwJa6iOISGFwd+qlUo5AvyXb7CiihdtiU=
|
||||||
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
|
github.com/spf13/viper v1.15.0/go.mod h1:fFcTBJxvhhzSJiZy8n+PeW6t8l+KeT/uTARa0jHOQLA=
|
||||||
|
github.com/ssgreg/journald v1.0.0 h1:0YmTDPJXxcWDPba12qNMdO6TxvfkFSYpFIJ31CwmLcU=
|
||||||
|
github.com/ssgreg/journald v1.0.0/go.mod h1:RUckwmTM8ghGWPslq2+ZBZzbb9/2KgjzYZ4JEP+oRt0=
|
||||||
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8=
|
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980/go.mod h1:AO3tvPzVZ/ayst6UlUKUv6rcPQInYe3IknH3jYhAKu8=
|
||||||
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.0.0-20180129172003-8a3f7159479f/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||||
|
|
72
internal/cache/buckets.go
vendored
Normal file
72
internal/cache/buckets.go
vendored
Normal file
|
@ -0,0 +1,72 @@
|
||||||
|
package cache
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
|
"github.com/bluele/gcache"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BucketCache contains cache with objects and the lifetime of cache entries.
|
||||||
|
type BucketCache struct {
|
||||||
|
cache gcache.Cache
|
||||||
|
logger *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
// Config stores expiration params for cache.
|
||||||
|
type Config struct {
|
||||||
|
Size int
|
||||||
|
Lifetime time.Duration
|
||||||
|
Logger *zap.Logger
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
// DefaultBucketCacheSize is a default maximum number of entries in cache.
|
||||||
|
DefaultBucketCacheSize = 1e3
|
||||||
|
// DefaultBucketCacheLifetime is a default lifetime of entries in cache.
|
||||||
|
DefaultBucketCacheLifetime = time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
// DefaultBucketConfig returns new default cache expiration values.
|
||||||
|
func DefaultBucketConfig(logger *zap.Logger) *Config {
|
||||||
|
return &Config{
|
||||||
|
Size: DefaultBucketCacheSize,
|
||||||
|
Lifetime: DefaultBucketCacheLifetime,
|
||||||
|
Logger: logger,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBucketCache creates an object of BucketCache.
|
||||||
|
func NewBucketCache(config *Config) *BucketCache {
|
||||||
|
gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).Build()
|
||||||
|
return &BucketCache{cache: gc, logger: config.Logger}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns a cached object.
|
||||||
|
func (o *BucketCache) Get(ns, bktName string) *data.BucketInfo {
|
||||||
|
entry, err := o.cache.Get(formKey(ns, bktName))
|
||||||
|
if err != nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
result, ok := entry.(*data.BucketInfo)
|
||||||
|
if !ok {
|
||||||
|
o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
||||||
|
zap.String("expected", fmt.Sprintf("%T", result)))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return result
|
||||||
|
}
|
||||||
|
|
||||||
|
// Put puts an object to cache.
|
||||||
|
func (o *BucketCache) Put(bkt *data.BucketInfo) error {
|
||||||
|
return o.cache.Set(formKey(bkt.Zone, bkt.Name), bkt)
|
||||||
|
}
|
||||||
|
|
||||||
|
func formKey(ns, name string) string {
|
||||||
|
return name + "." + ns
|
||||||
|
}
|
12
internal/data/bucket.go
Normal file
12
internal/data/bucket.go
Normal file
|
@ -0,0 +1,12 @@
|
||||||
|
package data
|
||||||
|
|
||||||
|
import (
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BucketInfo struct {
|
||||||
|
Name string // container name from system attribute
|
||||||
|
Zone string // container zone from system attribute
|
||||||
|
CID cid.ID
|
||||||
|
HomomorphicHashDisabled bool
|
||||||
|
}
|
|
@ -14,8 +14,6 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -30,7 +28,7 @@ func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||||
var id oid.ID
|
var id oid.ID
|
||||||
err := id.DecodeString(test)
|
err := id.DecodeString(test)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.byBucketname(c, h.receiveFile)
|
h.byObjectName(c, h.receiveFile)
|
||||||
} else {
|
} else {
|
||||||
h.byAddress(c, h.receiveFile)
|
h.byAddress(c, h.receiveFile)
|
||||||
}
|
}
|
||||||
|
@ -63,16 +61,9 @@ func (h *Handler) search(ctx context.Context, cid *cid.ID, key, val string, op o
|
||||||
return h.pool.SearchObjects(ctx, prm)
|
return h.pool.SearchObjects(ctx, prm)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) getContainer(ctx context.Context, cnrID cid.ID) (container.Container, error) {
|
|
||||||
var prm pool.PrmContainerGet
|
|
||||||
prm.SetContainerID(cnrID)
|
|
||||||
|
|
||||||
return h.pool.GetContainer(ctx, prm)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
|
func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
|
||||||
method := zip.Store
|
method := zip.Store
|
||||||
if h.settings.ZipCompression() {
|
if h.config.ZipCompression() {
|
||||||
method = zip.Deflate
|
method = zip.Deflate
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -91,32 +82,26 @@ func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer,
|
||||||
// DownloadZipped handles zip by prefix requests.
|
// DownloadZipped handles zip by prefix requests.
|
||||||
func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
|
func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
scid, _ := c.UserValue("cid").(string)
|
scid, _ := c.UserValue("cid").(string)
|
||||||
prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string))
|
prefix, _ := c.UserValue("prefix").(string)
|
||||||
log := h.log.With(zap.String("cid", scid), zap.String("prefix", prefix))
|
|
||||||
|
prefix, err := url.QueryUnescape(prefix)
|
||||||
|
if err != nil {
|
||||||
|
h.log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Uint64("id", c.ID()), zap.Error(err))
|
||||||
|
response.Error(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log := h.log.With(zap.String("cid", scid), zap.String("prefix", prefix), zap.Uint64("id", c.ID()))
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(c)
|
ctx := utils.GetContextFromRequest(c)
|
||||||
|
|
||||||
containerID, err := h.getContainerID(ctx, scid)
|
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
logAndSendBucketError(c, log, err)
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if container exists here to be able to return 404 error,
|
resSearch, err := h.search(ctx, &bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||||
// otherwise we get this error only in object iteration step
|
|
||||||
// and client get 200 OK.
|
|
||||||
if _, err = h.getContainer(ctx, *containerID); err != nil {
|
|
||||||
log.Error(logs.CouldNotCheckContainerExistence, zap.Error(err))
|
|
||||||
if client.IsErrContainerNotFound(err) {
|
|
||||||
response.Error(c, "Not Found", fasthttp.StatusNotFound)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
response.Error(c, "could not check container existence: "+err.Error(), fasthttp.StatusBadRequest)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
resSearch, err := h.search(ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
||||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
@ -138,7 +123,7 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
|
||||||
empty := true
|
empty := true
|
||||||
called := false
|
called := false
|
||||||
btoken := bearerToken(ctx)
|
btoken := bearerToken(ctx)
|
||||||
addr.SetContainer(*containerID)
|
addr.SetContainer(bktInfo.CID)
|
||||||
|
|
||||||
errIter := resSearch.Iterate(func(id oid.ID) bool {
|
errIter := resSearch.Iterate(func(id oid.ID) bool {
|
||||||
called = true
|
called = true
|
||||||
|
|
|
@ -3,15 +3,21 @@ package handler
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/url"
|
"net/url"
|
||||||
"sync/atomic"
|
"strings"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
@ -21,59 +27,36 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type Config interface {
|
||||||
|
DefaultTimestamp() bool
|
||||||
|
ZipCompression() bool
|
||||||
|
ClientCut() bool
|
||||||
|
BufferMaxSizeForPut() uint64
|
||||||
|
NamespaceHeader() string
|
||||||
|
}
|
||||||
|
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
log *zap.Logger
|
log *zap.Logger
|
||||||
pool *pool.Pool
|
pool *pool.Pool
|
||||||
ownerID *user.ID
|
ownerID *user.ID
|
||||||
settings *Settings
|
config Config
|
||||||
containerResolver *resolver.ContainerResolver
|
containerResolver *resolver.ContainerResolver
|
||||||
tree *tree.Tree
|
tree *tree.Tree
|
||||||
|
cache *cache.BucketCache
|
||||||
}
|
}
|
||||||
|
|
||||||
// Settings stores reloading parameters, so it has to provide atomic getters and setters.
|
func New(params *utils.AppParams, config Config, tree *tree.Tree) *Handler {
|
||||||
type Settings struct {
|
|
||||||
defaultTimestamp atomic.Bool
|
|
||||||
zipCompression atomic.Bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Settings) DefaultTimestamp() bool {
|
|
||||||
return s.defaultTimestamp.Load()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Settings) SetDefaultTimestamp(val bool) {
|
|
||||||
s.defaultTimestamp.Store(val)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Settings) ZipCompression() bool {
|
|
||||||
return s.zipCompression.Load()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Settings) SetZipCompression(val bool) {
|
|
||||||
s.zipCompression.Store(val)
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(params *utils.AppParams, settings *Settings, tree *tree.Tree) *Handler {
|
|
||||||
return &Handler{
|
return &Handler{
|
||||||
log: params.Logger,
|
log: params.Logger,
|
||||||
pool: params.Pool,
|
pool: params.Pool,
|
||||||
ownerID: params.Owner,
|
ownerID: params.Owner,
|
||||||
settings: settings,
|
config: config,
|
||||||
containerResolver: params.Resolver,
|
containerResolver: params.Resolver,
|
||||||
tree: tree,
|
tree: tree,
|
||||||
|
cache: params.Cache,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// getContainerID decode container id, if it's not a valid container id
|
|
||||||
// then trey to resolve name using provided resolver.
|
|
||||||
func (h *Handler) getContainerID(ctx context.Context, containerID string) (*cid.ID, error) {
|
|
||||||
cnrID := new(cid.ID)
|
|
||||||
err := cnrID.DecodeString(containerID)
|
|
||||||
if err != nil {
|
|
||||||
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
|
|
||||||
}
|
|
||||||
return cnrID, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||||
// prepares request and object address to it.
|
// prepares request and object address to it.
|
||||||
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||||
|
@ -85,10 +68,9 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(c)
|
ctx := utils.GetContextFromRequest(c)
|
||||||
|
|
||||||
cnrID, err := h.getContainerID(ctx, idCnr)
|
bktInfo, err := h.getBucketInfo(ctx, idCnr, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
logAndSendBucketError(c, log, err)
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,15 +82,15 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
||||||
}
|
}
|
||||||
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(*cnrID)
|
addr.SetContainer(bktInfo.CID)
|
||||||
addr.SetObject(*objID)
|
addr.SetObject(*objID)
|
||||||
|
|
||||||
f(ctx, *h.newRequest(c, log), addr)
|
f(ctx, *h.newRequest(c, log), addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// byBucketname is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||||
// prepares request and object address to it.
|
// prepares request and object address to it.
|
||||||
func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||||
var (
|
var (
|
||||||
bucketname = req.UserValue("cid").(string)
|
bucketname = req.UserValue("cid").(string)
|
||||||
key = req.UserValue("oid").(string)
|
key = req.UserValue("oid").(string)
|
||||||
|
@ -117,16 +99,20 @@ func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context,
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(req)
|
ctx := utils.GetContextFromRequest(req)
|
||||||
|
|
||||||
cnrID, err := h.getContainerID(ctx, bucketname)
|
bktInfo, err := h.getBucketInfo(ctx, bucketname, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
logAndSendBucketError(req, log, err)
|
||||||
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
foundOid, err := h.tree.GetLatestVersion(ctx, cnrID, key)
|
foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, key)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.ObjectWasntFound, zap.Error(err))
|
if errors.Is(err, tree.ErrNodeAccessDenied) {
|
||||||
|
response.Error(req, "Access Denied", fasthttp.StatusForbidden)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Error(logs.GetLatestObjectVersion, zap.Error(err))
|
||||||
|
|
||||||
response.Error(req, "object wasn't found", fasthttp.StatusNotFound)
|
response.Error(req, "object wasn't found", fasthttp.StatusNotFound)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -137,7 +123,7 @@ func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context,
|
||||||
}
|
}
|
||||||
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
addr.SetContainer(*cnrID)
|
addr.SetContainer(bktInfo.CID)
|
||||||
addr.SetObject(foundOid.OID)
|
addr.SetObject(foundOid.OID)
|
||||||
|
|
||||||
f(ctx, *h.newRequest(req, log), addr)
|
f(ctx, *h.newRequest(req, log), addr)
|
||||||
|
@ -145,23 +131,35 @@ func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context,
|
||||||
|
|
||||||
// byAttribute is a wrapper similar to byAddress.
|
// byAttribute is a wrapper similar to byAddress.
|
||||||
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||||
var (
|
scid, _ := c.UserValue("cid").(string)
|
||||||
scid, _ = c.UserValue("cid").(string)
|
key, _ := c.UserValue("attr_key").(string)
|
||||||
key, _ = url.QueryUnescape(c.UserValue("attr_key").(string))
|
val, _ := c.UserValue("attr_val").(string)
|
||||||
val, _ = url.QueryUnescape(c.UserValue("attr_val").(string))
|
|
||||||
log = h.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
|
||||||
)
|
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(c)
|
key, err := url.QueryUnescape(key)
|
||||||
|
|
||||||
containerID, err := h.getContainerID(ctx, scid)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
h.log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_key", key), zap.Uint64("id", c.ID()), zap.Error(err))
|
||||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
response.Error(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
res, err := h.search(ctx, containerID, key, val, object.MatchStringEqual)
|
val, err = url.QueryUnescape(val)
|
||||||
|
if err != nil {
|
||||||
|
h.log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_val", val), zap.Uint64("id", c.ID()), zap.Error(err))
|
||||||
|
response.Error(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log := h.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
|
||||||
|
|
||||||
|
ctx := utils.GetContextFromRequest(c)
|
||||||
|
|
||||||
|
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||||
|
if err != nil {
|
||||||
|
logAndSendBucketError(c, log, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := h.search(ctx, &bktInfo.CID, key, val, object.MatchStringEqual)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
||||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
@ -186,8 +184,74 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
|
||||||
}
|
}
|
||||||
|
|
||||||
var addrObj oid.Address
|
var addrObj oid.Address
|
||||||
addrObj.SetContainer(*containerID)
|
addrObj.SetContainer(bktInfo.CID)
|
||||||
addrObj.SetObject(buf[0])
|
addrObj.SetObject(buf[0])
|
||||||
|
|
||||||
f(ctx, *h.newRequest(c, log), addrObj)
|
f(ctx, *h.newRequest(c, log), addrObj)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// resolveContainer decode container id, if it's not a valid container id
|
||||||
|
// then trey to resolve name using provided resolver.
|
||||||
|
func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*cid.ID, error) {
|
||||||
|
cnrID := new(cid.ID)
|
||||||
|
err := cnrID.DecodeString(containerID)
|
||||||
|
if err != nil {
|
||||||
|
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
|
||||||
|
if err != nil && strings.Contains(err.Error(), "not found") {
|
||||||
|
err = fmt.Errorf("%w: %s", new(apistatus.ContainerNotFound), err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return cnrID, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
|
||||||
|
ns, err := middleware.GetNamespace(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if bktInfo := h.cache.Get(ns, containerName); bktInfo != nil {
|
||||||
|
return bktInfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cnrID, err := h.resolveContainer(ctx, containerName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
bktInfo, err := h.readContainer(ctx, *cnrID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err = h.cache.Put(bktInfo); err != nil {
|
||||||
|
log.Warn(logs.CouldntPutBucketIntoCache,
|
||||||
|
zap.String("bucket name", bktInfo.Name),
|
||||||
|
zap.Stringer("bucket cid", bktInfo.CID),
|
||||||
|
zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return bktInfo, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.BucketInfo, error) {
|
||||||
|
prm := pool.PrmContainerGet{ContainerID: cnrID}
|
||||||
|
res, err := h.pool.GetContainer(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("get frostfs container '%s': %w", cnrID.String(), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bktInfo := &data.BucketInfo{
|
||||||
|
CID: cnrID,
|
||||||
|
Name: cnrID.EncodeToString(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if domain := container.ReadDomain(res); domain.Name() != "" {
|
||||||
|
bktInfo.Name = domain.Name()
|
||||||
|
bktInfo.Zone = domain.Zone()
|
||||||
|
}
|
||||||
|
|
||||||
|
bktInfo.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(res)
|
||||||
|
|
||||||
|
return bktInfo, err
|
||||||
|
}
|
||||||
|
|
|
@ -110,7 +110,7 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
||||||
|
|
||||||
err := id.DecodeString(test)
|
err := id.DecodeString(test)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.byBucketname(c, h.headObject)
|
h.byObjectName(c, h.headObject)
|
||||||
} else {
|
} else {
|
||||||
h.byAddress(c, h.headObject)
|
h.byAddress(c, h.headObject)
|
||||||
}
|
}
|
||||||
|
|
26
internal/handler/middleware/util.go
Normal file
26
internal/handler/middleware/util.go
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
package middleware
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
)
|
||||||
|
|
||||||
|
// keyWrapper is wrapper for context keys.
|
||||||
|
type keyWrapper string
|
||||||
|
|
||||||
|
const nsKey = keyWrapper("namespace")
|
||||||
|
|
||||||
|
// GetNamespace extract namespace from context.
|
||||||
|
func GetNamespace(ctx context.Context) (string, error) {
|
||||||
|
ns, ok := ctx.Value(nsKey).(string)
|
||||||
|
if !ok {
|
||||||
|
return "", fmt.Errorf("couldn't get namespace from context")
|
||||||
|
}
|
||||||
|
|
||||||
|
return ns, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetNamespace sets namespace in the context.
|
||||||
|
func SetNamespace(ctx context.Context, ns string) context.Context {
|
||||||
|
return context.WithValue(ctx, nsKey, ns)
|
||||||
|
}
|
|
@ -57,10 +57,9 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
||||||
|
|
||||||
ctx := utils.GetContextFromRequest(req)
|
ctx := utils.GetContextFromRequest(req)
|
||||||
|
|
||||||
idCnr, err := h.getContainerID(ctx, scid)
|
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
logAndSendBucketError(req, log, err)
|
||||||
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,7 +120,7 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
||||||
attributes = append(attributes, *filename)
|
attributes = append(attributes, *filename)
|
||||||
}
|
}
|
||||||
// sets Timestamp attribute if it wasn't set from header and enabled by settings
|
// sets Timestamp attribute if it wasn't set from header and enabled by settings
|
||||||
if _, ok := filtered[object.AttributeTimestamp]; !ok && h.settings.DefaultTimestamp() {
|
if _, ok := filtered[object.AttributeTimestamp]; !ok && h.config.DefaultTimestamp() {
|
||||||
timestamp := object.NewAttribute()
|
timestamp := object.NewAttribute()
|
||||||
timestamp.SetKey(object.AttributeTimestamp)
|
timestamp.SetKey(object.AttributeTimestamp)
|
||||||
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
timestamp.SetValue(strconv.FormatInt(time.Now().Unix(), 10))
|
||||||
|
@ -129,13 +128,16 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
obj := object.New()
|
obj := object.New()
|
||||||
obj.SetContainerID(*idCnr)
|
obj.SetContainerID(bktInfo.CID)
|
||||||
obj.SetOwnerID(h.ownerID)
|
obj.SetOwnerID(h.ownerID)
|
||||||
obj.SetAttributes(attributes...)
|
obj.SetAttributes(attributes...)
|
||||||
|
|
||||||
var prm pool.PrmObjectPut
|
var prm pool.PrmObjectPut
|
||||||
prm.SetHeader(*obj)
|
prm.SetHeader(*obj)
|
||||||
prm.SetPayload(file)
|
prm.SetPayload(file)
|
||||||
|
prm.SetClientCut(h.config.ClientCut())
|
||||||
|
prm.SetBufferMaxSize(h.config.BufferMaxSizeForPut())
|
||||||
|
prm.WithoutHomomorphicHash(bktInfo.HomomorphicHashDisabled)
|
||||||
|
|
||||||
bt := h.fetchBearerToken(ctx)
|
bt := h.fetchBearerToken(ctx)
|
||||||
if bt != nil {
|
if bt != nil {
|
||||||
|
@ -148,7 +150,7 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
addr.SetObject(idObj)
|
addr.SetObject(idObj)
|
||||||
addr.SetContainer(*idCnr)
|
addr.SetContainer(bktInfo.CID)
|
||||||
|
|
||||||
// Try to return the response, otherwise, if something went wrong, throw an error.
|
// Try to return the response, otherwise, if something went wrong, throw an error.
|
||||||
if err = newPutResponse(addr).encode(req); err != nil {
|
if err = newPutResponse(addr).encode(req); err != nil {
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||||
"github.com/valyala/fasthttp"
|
"github.com/valyala/fasthttp"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
@ -58,3 +59,13 @@ func isValidValue(s string) bool {
|
||||||
}
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) {
|
||||||
|
log.Error(logs.CouldntGetBucket, zap.Error(err))
|
||||||
|
|
||||||
|
if client.IsErrContainerNotFound(err) {
|
||||||
|
response.Error(c, "Not Found", fasthttp.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
response.Error(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
|
||||||
|
}
|
||||||
|
|
|
@ -4,14 +4,12 @@ const (
|
||||||
CouldntParseCreationDate = "couldn't parse creation date" // Info in ../../downloader/*
|
CouldntParseCreationDate = "couldn't parse creation date" // Info in ../../downloader/*
|
||||||
CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload" // Error in ../../downloader/download.go
|
CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload" // Error in ../../downloader/download.go
|
||||||
CouldNotReceiveObject = "could not receive object" // Error in ../../downloader/download.go
|
CouldNotReceiveObject = "could not receive object" // Error in ../../downloader/download.go
|
||||||
WrongContainerID = "wrong container id" // Error in ../../downloader/download.go and uploader/upload.go
|
|
||||||
WrongObjectID = "wrong object id" // Error in ../../downloader/download.go
|
WrongObjectID = "wrong object id" // Error in ../../downloader/download.go
|
||||||
ObjectWasntFound = "object wasn't found" // Error in ../../downloader/download.go
|
GetLatestObjectVersion = "get latest object version" // Error in ../../downloader/download.go
|
||||||
ObjectWasDeleted = "object was deleted" // Error in ../../downloader/download.go
|
ObjectWasDeleted = "object was deleted" // Error in ../../downloader/download.go
|
||||||
CouldNotSearchForObjects = "could not search for objects" // Error in ../../downloader/download.go
|
CouldNotSearchForObjects = "could not search for objects" // Error in ../../downloader/download.go
|
||||||
ObjectNotFound = "object not found" // Error in ../../downloader/download.go
|
ObjectNotFound = "object not found" // Error in ../../downloader/download.go
|
||||||
ReadObjectListFailed = "read object list failed" // Error in ../../downloader/download.go
|
ReadObjectListFailed = "read object list failed" // Error in ../../downloader/download.go
|
||||||
CouldNotCheckContainerExistence = "could not check container existence" // Error in ../../downloader/download.go
|
|
||||||
FailedToAddObjectToArchive = "failed to add object to archive" // Error in ../../downloader/download.go
|
FailedToAddObjectToArchive = "failed to add object to archive" // Error in ../../downloader/download.go
|
||||||
IteratingOverSelectedObjectsFailed = "iterating over selected objects failed" // Error in ../../downloader/download.go
|
IteratingOverSelectedObjectsFailed = "iterating over selected objects failed" // Error in ../../downloader/download.go
|
||||||
ObjectsNotFound = "objects not found" // Error in ../../downloader/download.go
|
ObjectsNotFound = "objects not found" // Error in ../../downloader/download.go
|
||||||
|
@ -21,6 +19,7 @@ const (
|
||||||
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../metrics/service.go
|
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../metrics/service.go
|
||||||
ShuttingDownService = "shutting down service" // Info in ../../metrics/service.go
|
ShuttingDownService = "shutting down service" // Info in ../../metrics/service.go
|
||||||
CantShutDownService = "can't shut down service" // Panic in ../../metrics/service.go
|
CantShutDownService = "can't shut down service" // Panic in ../../metrics/service.go
|
||||||
|
CantGracefullyShutDownService = "can't gracefully shut down service, force stop" // Error in ../../metrics/service.go
|
||||||
IgnorePartEmptyFormName = "ignore part, empty form name" // Debug in ../../uploader/upload.go
|
IgnorePartEmptyFormName = "ignore part, empty form name" // Debug in ../../uploader/upload.go
|
||||||
IgnorePartEmptyFilename = "ignore part, empty filename" // Debug in ../../uploader/upload.go
|
IgnorePartEmptyFilename = "ignore part, empty filename" // Debug in ../../uploader/upload.go
|
||||||
CloseTemporaryMultipartFormFile = "close temporary multipart/form file" // Debug in ../../uploader/upload.go
|
CloseTemporaryMultipartFormFile = "close temporary multipart/form file" // Debug in ../../uploader/upload.go
|
||||||
|
@ -68,4 +67,14 @@ const (
|
||||||
FailedToCreateTreePool = "failed to create tree pool" // Fatal in ../../settings.go
|
FailedToCreateTreePool = "failed to create tree pool" // Fatal in ../../settings.go
|
||||||
FailedToDialTreePool = "failed to dial tree pool" // Fatal in ../../settings.go
|
FailedToDialTreePool = "failed to dial tree pool" // Fatal in ../../settings.go
|
||||||
AddedStoragePeer = "added storage peer" // Info in ../../settings.go
|
AddedStoragePeer = "added storage peer" // Info in ../../settings.go
|
||||||
|
CouldntGetBucket = "could not get bucket" // Error in ../handler/utils.go
|
||||||
|
CouldntPutBucketIntoCache = "couldn't put bucket info into cache" // Warn in ../handler/handler.go
|
||||||
|
InvalidCacheEntryType = "invalid cache entry type" // Warn in ../cache/buckets.go
|
||||||
|
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
|
||||||
|
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
|
||||||
|
FailedToUnescapeQuery = "failed to unescape query"
|
||||||
|
ServerReconnecting = "reconnecting server..."
|
||||||
|
ServerReconnectedSuccessfully = "server reconnected successfully"
|
||||||
|
ServerReconnectFailed = "failed to reconnect server"
|
||||||
|
WarnDuplicateAddress = "duplicate address"
|
||||||
)
|
)
|
||||||
|
|
|
@ -40,6 +40,9 @@ func (ms *Service) ShutDown(ctx context.Context) {
|
||||||
ms.log.Info(logs.ShuttingDownService, zap.String("endpoint", ms.Addr))
|
ms.log.Info(logs.ShuttingDownService, zap.String("endpoint", ms.Addr))
|
||||||
err := ms.Shutdown(ctx)
|
err := ms.Shutdown(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ms.log.Panic(logs.CantShutDownService)
|
ms.log.Error(logs.CantGracefullyShutDownService, zap.Error(err))
|
||||||
|
if err = ms.Close(); err != nil {
|
||||||
|
ms.log.Panic(logs.CantShutDownService, zap.Error(err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
|
||||||
|
@ -28,9 +29,14 @@ type FrostFS interface {
|
||||||
SystemDNS(context.Context) (string, error)
|
SystemDNS(context.Context) (string, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type Settings interface {
|
||||||
|
FormContainerZone(ns string) (zone string, isDefault bool)
|
||||||
|
}
|
||||||
|
|
||||||
type Config struct {
|
type Config struct {
|
||||||
FrostFS FrostFS
|
FrostFS FrostFS
|
||||||
RPCAddress string
|
RPCAddress string
|
||||||
|
Settings Settings
|
||||||
}
|
}
|
||||||
|
|
||||||
type ContainerResolver struct {
|
type ContainerResolver struct {
|
||||||
|
@ -135,29 +141,43 @@ func (r *ContainerResolver) equals(resolverNames []string) bool {
|
||||||
func newResolver(name string, cfg *Config) (*Resolver, error) {
|
func newResolver(name string, cfg *Config) (*Resolver, error) {
|
||||||
switch name {
|
switch name {
|
||||||
case DNSResolver:
|
case DNSResolver:
|
||||||
return NewDNSResolver(cfg.FrostFS)
|
return NewDNSResolver(cfg.FrostFS, cfg.Settings)
|
||||||
case NNSResolver:
|
case NNSResolver:
|
||||||
return NewNNSResolver(cfg.RPCAddress)
|
return NewNNSResolver(cfg.RPCAddress, cfg.Settings)
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unknown resolver: %s", name)
|
return nil, fmt.Errorf("unknown resolver: %s", name)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDNSResolver(frostFS FrostFS) (*Resolver, error) {
|
func NewDNSResolver(frostFS FrostFS, settings Settings) (*Resolver, error) {
|
||||||
if frostFS == nil {
|
if frostFS == nil {
|
||||||
return nil, fmt.Errorf("pool must not be nil for DNS resolver")
|
return nil, fmt.Errorf("pool must not be nil for DNS resolver")
|
||||||
}
|
}
|
||||||
|
if settings == nil {
|
||||||
|
return nil, fmt.Errorf("resolver settings must not be nil for DNS resolver")
|
||||||
|
}
|
||||||
|
|
||||||
var dns ns.DNS
|
var dns ns.DNS
|
||||||
|
|
||||||
resolveFunc := func(ctx context.Context, name string) (*cid.ID, error) {
|
resolveFunc := func(ctx context.Context, name string) (*cid.ID, error) {
|
||||||
domain, err := frostFS.SystemDNS(ctx)
|
var err error
|
||||||
|
|
||||||
|
namespace, err := middleware.GetNamespace(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
zone, isDefault := settings.FormContainerZone(namespace)
|
||||||
|
if isDefault {
|
||||||
|
zone, err = frostFS.SystemDNS(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("read system DNS parameter of the FrostFS: %w", err)
|
return nil, fmt.Errorf("read system DNS parameter of the FrostFS: %w", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
domain = name + "." + domain
|
domain := name + "." + zone
|
||||||
cnrID, err := dns.ResolveContainerName(domain)
|
cnrID, err := dns.ResolveContainerName(domain)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't resolve container '%s' as '%s': %w", name, domain, err)
|
return nil, fmt.Errorf("couldn't resolve container '%s' as '%s': %w", name, domain, err)
|
||||||
}
|
}
|
||||||
|
@ -170,17 +190,32 @@ func NewDNSResolver(frostFS FrostFS) (*Resolver, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNNSResolver(rpcAddress string) (*Resolver, error) {
|
func NewNNSResolver(rpcAddress string, settings Settings) (*Resolver, error) {
|
||||||
|
if rpcAddress == "" {
|
||||||
|
return nil, fmt.Errorf("rpc address must not be empty for NNS resolver")
|
||||||
|
}
|
||||||
|
if settings == nil {
|
||||||
|
return nil, fmt.Errorf("resolver settings must not be nil for NNS resolver")
|
||||||
|
}
|
||||||
|
|
||||||
var nns ns.NNS
|
var nns ns.NNS
|
||||||
|
|
||||||
if err := nns.Dial(rpcAddress); err != nil {
|
if err := nns.Dial(rpcAddress); err != nil {
|
||||||
return nil, fmt.Errorf("could not dial nns: %w", err)
|
return nil, fmt.Errorf("could not dial nns: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
resolveFunc := func(_ context.Context, name string) (*cid.ID, error) {
|
resolveFunc := func(ctx context.Context, name string) (*cid.ID, error) {
|
||||||
var d container.Domain
|
var d container.Domain
|
||||||
d.SetName(name)
|
d.SetName(name)
|
||||||
|
|
||||||
|
namespace, err := middleware.GetNamespace(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
zone, _ := settings.FormContainerZone(namespace)
|
||||||
|
d.SetZone(zone)
|
||||||
|
|
||||||
cnrID, err := nns.ResolveContainerDomain(d)
|
cnrID, err := nns.ResolveContainerDomain(d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("couldn't resolve container '%s': %w", name, err)
|
return nil, fmt.Errorf("couldn't resolve container '%s': %w", name, err)
|
||||||
|
|
39
tree/tree.go
39
tree/tree.go
|
@ -73,6 +73,7 @@ type Meta interface {
|
||||||
|
|
||||||
type NodeResponse interface {
|
type NodeResponse interface {
|
||||||
GetMeta() []Meta
|
GetMeta() []Meta
|
||||||
|
GetTimestamp() uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
||||||
|
@ -135,7 +136,7 @@ func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName s
|
||||||
TreeID: versionTree,
|
TreeID: versionTree,
|
||||||
Path: path,
|
Path: path,
|
||||||
Meta: meta,
|
Meta: meta,
|
||||||
LatestOnly: true,
|
LatestOnly: false,
|
||||||
AllAttrs: false,
|
AllAttrs: false,
|
||||||
}
|
}
|
||||||
nodes, err := c.service.GetNodes(ctx, p)
|
nodes, err := c.service.GetNodes(ctx, p)
|
||||||
|
@ -143,11 +144,43 @@ func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName s
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(nodes) == 0 {
|
latestNode, err := getLatestNode(nodes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return newNodeVersion(latestNode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getLatestNode(nodes []NodeResponse) (NodeResponse, error) {
|
||||||
|
var (
|
||||||
|
maxCreationTime uint64
|
||||||
|
targetIndexNode = -1
|
||||||
|
)
|
||||||
|
|
||||||
|
for i, node := range nodes {
|
||||||
|
currentCreationTime := node.GetTimestamp()
|
||||||
|
if checkExistOID(node.GetMeta()) && currentCreationTime > maxCreationTime {
|
||||||
|
maxCreationTime = currentCreationTime
|
||||||
|
targetIndexNode = i
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if targetIndexNode == -1 {
|
||||||
return nil, layer.ErrNodeNotFound
|
return nil, layer.ErrNodeNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
return newNodeVersion(nodes[0])
|
return nodes[targetIndexNode], nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkExistOID(meta []Meta) bool {
|
||||||
|
for _, kv := range meta {
|
||||||
|
if kv.GetKey() == "OID" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// pathFromName splits name by '/'.
|
// pathFromName splits name by '/'.
|
||||||
|
|
143
tree/tree_test.go
Normal file
143
tree/tree_test.go
Normal file
|
@ -0,0 +1,143 @@
|
||||||
|
package tree
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
type nodeMeta struct {
|
||||||
|
key string
|
||||||
|
value []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m nodeMeta) GetKey() string {
|
||||||
|
return m.key
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m nodeMeta) GetValue() []byte {
|
||||||
|
return m.value
|
||||||
|
}
|
||||||
|
|
||||||
|
type nodeResponse struct {
|
||||||
|
meta []nodeMeta
|
||||||
|
timestamp uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n nodeResponse) GetTimestamp() uint64 {
|
||||||
|
return n.timestamp
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n nodeResponse) GetMeta() []Meta {
|
||||||
|
res := make([]Meta, len(n.meta))
|
||||||
|
for i, value := range n.meta {
|
||||||
|
res[i] = value
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestGetLatestNode(t *testing.T) {
|
||||||
|
for _, tc := range []struct {
|
||||||
|
name string
|
||||||
|
nodes []NodeResponse
|
||||||
|
exceptedOID string
|
||||||
|
error bool
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "empty",
|
||||||
|
nodes: []NodeResponse{},
|
||||||
|
error: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "one node of the object version",
|
||||||
|
nodes: []NodeResponse{
|
||||||
|
nodeResponse{
|
||||||
|
timestamp: 1,
|
||||||
|
meta: []nodeMeta{
|
||||||
|
{
|
||||||
|
key: oidKV,
|
||||||
|
value: []byte("oid1"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exceptedOID: "oid1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "one node of the object version and one node of the secondary object",
|
||||||
|
nodes: []NodeResponse{
|
||||||
|
nodeResponse{
|
||||||
|
timestamp: 3,
|
||||||
|
meta: []nodeMeta{},
|
||||||
|
},
|
||||||
|
nodeResponse{
|
||||||
|
timestamp: 1,
|
||||||
|
meta: []nodeMeta{
|
||||||
|
{
|
||||||
|
key: oidKV,
|
||||||
|
value: []byte("oid1"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exceptedOID: "oid1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "all nodes represent a secondary object",
|
||||||
|
nodes: []NodeResponse{
|
||||||
|
nodeResponse{
|
||||||
|
timestamp: 3,
|
||||||
|
meta: []nodeMeta{},
|
||||||
|
},
|
||||||
|
nodeResponse{
|
||||||
|
timestamp: 5,
|
||||||
|
meta: []nodeMeta{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
error: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "several nodes of different types and with different timestamp",
|
||||||
|
nodes: []NodeResponse{
|
||||||
|
nodeResponse{
|
||||||
|
timestamp: 1,
|
||||||
|
meta: []nodeMeta{
|
||||||
|
{
|
||||||
|
key: oidKV,
|
||||||
|
value: []byte("oid1"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
nodeResponse{
|
||||||
|
timestamp: 3,
|
||||||
|
meta: []nodeMeta{},
|
||||||
|
},
|
||||||
|
nodeResponse{
|
||||||
|
timestamp: 4,
|
||||||
|
meta: []nodeMeta{
|
||||||
|
{
|
||||||
|
key: oidKV,
|
||||||
|
value: []byte("oid2"),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
nodeResponse{
|
||||||
|
timestamp: 6,
|
||||||
|
meta: []nodeMeta{},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
exceptedOID: "oid2",
|
||||||
|
},
|
||||||
|
} {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
actualNode, err := getLatestNode(tc.nodes)
|
||||||
|
if tc.error {
|
||||||
|
require.Error(t, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, tc.exceptedOID, string(actualNode.GetMeta()[0].GetValue()))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package utils
|
package utils
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||||
|
@ -12,4 +13,5 @@ type AppParams struct {
|
||||||
Pool *pool.Pool
|
Pool *pool.Pool
|
||||||
Owner *user.ID
|
Owner *user.ID
|
||||||
Resolver *resolver.ContainerResolver
|
Resolver *resolver.ContainerResolver
|
||||||
|
Cache *cache.BucketCache
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue