Compare commits

..

3 commits

Author SHA1 Message Date
ca46dc5ec1 [#XX] Don't reload rpc_endpoint for resolvers
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-10-12 18:08:11 +03:00
eccc3f8077 [#XX] Use contract resolver from sdk
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-10-12 18:04:22 +03:00
e3b6f534cc [#XX] Support frostfsid validation
Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-10-05 17:37:34 +03:00
31 changed files with 1043 additions and 1231 deletions

View file

@ -18,6 +18,3 @@ jobs:
- name: Build binary
run: make
- name: Check dirty suffix
run: if [[ $(make version) == *"dirty"* ]]; then echo "Version has dirty suffix" && exit 1; fi

View file

@ -15,6 +15,6 @@ jobs:
go-version: '1.21'
- name: Run commit format checker
uses: https://git.frostfs.info/TrueCloudLab/dco-go@v3
uses: https://git.frostfs.info/TrueCloudLab/dco-go@v1
with:
from: 'origin/${{ github.event.pull_request.base.ref }}'
from: adb95642d

View file

@ -4,57 +4,20 @@ This document outlines major changes between releases.
## [Unreleased]
## [0.28.1] - 2024-01-24
### Added
- Tree pool traversal limit (#92)
- Add new `reconnect_interval` config param (#100)
### Update from 0.28.0
See new `frostfs.tree_pool_max_attempts` config parameter.
### Fixed
- Fix possibility of panic during SIGHUP (#99)
- Handle query unescape and invalid bearer token errors (#107)
- Fix HTTP/2 requests (#110)
### Added
- Support client side object cut (#70)
- Add `frostfs.client_cut` config param
- Add `frostfs.buffer_max_size_for_put` config param
- Add bucket/container caching
- Disable homomorphic hash for PUT if it's disabled in container itself
- Add new `logger.destination` config param (#89)
- Add support namespaces (#91)
### Changed
### Removed
## [0.28.0] - Academy of Sciences - 2023-12-07
### Fixed
- `grpc` schemas in tree configuration (#62)
- `GetSubTree` failures (#67)
- Debian packaging (#69, #90)
- Get latest version of tree node (#85)
### Added
- Support dump metrics descriptions (#29)
- Support impersonate bearer token (#40, #45)
- Tracing support (#20, #44, #60)
- Object name resolving with tree service (#30)
- Metrics for current endpoint status (#77)
- Soft memory limit with `runtime.soft_memory_limit` (#72)
- Add selection of the node of the latest version of the object (#85)
### Changed
- Update prometheus to v1.15.0 (#35)
- Update go version to 1.19 (#50)
- Finish rebranding (#2)
- Use gate key to form object owner (#66)
- Move log messages to constants (#36)
- Uploader and downloader refactor (#73)
### Removed
- Drop `tree.service` param (now endpoints from `peers` section are used) (#59)
@ -98,6 +61,4 @@ This project is a fork of [NeoFS HTTP Gateway](https://github.com/nspcc-dev/neof
To see CHANGELOG for older versions, refer to https://github.com/nspcc-dev/neofs-http-gw/blob/master/CHANGELOG.md.
[0.27.0]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/72734ab4...v0.27.0
[0.28.0]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.27.0...v0.28.0
[0.28.1]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.28.0...v0.28.1
[Unreleased]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.28.1...master
[Unreleased]: https://git.frostfs.info/TrueCloudLab/frostfs-http-gw/compare/v0.27.0...master

View file

@ -486,7 +486,7 @@ the corresponding header to the upload request. Accessing the ACL protected data
works the same way.
##### Example
In order to generate a bearer token, you need to have wallet (which will be used to sign the token) and
In order to generate a bearer token, you need to have a wallet (which will be used to sign the token) and
the address of the sender who will do the request to FrostFS (in our case, it's a gateway wallet address).
Suppose we have:

View file

@ -1 +1 @@
v0.28.1
v0.27.0

View file

@ -2,23 +2,20 @@ package main
import (
"context"
"errors"
"crypto/elliptic"
"fmt"
"net/http"
"os"
"os/signal"
"runtime/debug"
"strconv"
"strings"
"sync"
"syscall"
"time"
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/frostfsid"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/metrics"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
@ -27,6 +24,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@ -39,7 +37,6 @@ import (
"github.com/spf13/viper"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
type (
@ -58,10 +55,8 @@ type (
metrics *gateMetrics
services []*metrics.Service
settings *appSettings
servers []Server
unbindServers []ServerInfo
mu sync.RWMutex
frostfsid *frostfsid.FrostFSID
}
// App is an interface for the main gateway function.
@ -82,15 +77,9 @@ type (
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
appSettings struct {
reconnectInterval time.Duration
mu sync.RWMutex
defaultTimestamp bool
zipCompression bool
clientCut bool
bufferMaxSizeForPut uint64
namespaceHeader string
defaultNamespaces []string
}
)
@ -151,6 +140,7 @@ func newApp(ctx context.Context, opt ...Option) App {
a.initAppSettings()
a.initResolver()
a.initMetrics()
a.initIAM(ctx)
a.initTracing(ctx)
return a
@ -180,54 +170,30 @@ func (s *appSettings) setZipCompression(val bool) {
s.mu.Unlock()
}
func (s *appSettings) ClientCut() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.clientCut
}
func (s *appSettings) setClientCut(val bool) {
s.mu.Lock()
s.clientCut = val
s.mu.Unlock()
}
func (s *appSettings) BufferMaxSizeForPut() uint64 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.bufferMaxSizeForPut
}
func (s *appSettings) setBufferMaxSizeForPut(val uint64) {
s.mu.Lock()
s.bufferMaxSizeForPut = val
s.mu.Unlock()
}
func (a *app) initAppSettings() {
a.settings = &appSettings{
reconnectInterval: fetchReconnectInterval(a.cfg),
}
a.settings = &appSettings{}
a.updateSettings()
}
func (a *app) initResolver() {
var err error
a.resolver, err = resolver.NewContainerResolver(a.getResolverConfig())
a.resolver, err = resolver.NewContainerResolver(a.getResolverOrder(), a.getResolverConfig())
if err != nil {
a.log.Fatal(logs.FailedToCreateResolver, zap.Error(err))
}
}
func (a *app) getResolverConfig() ([]string, *resolver.Config) {
resolveCfg := &resolver.Config{
func (a *app) getResolverConfig() *resolver.Config {
return &resolver.Config{
FrostFS: resolver.NewFrostFSResolver(a.pool),
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
Settings: a.settings,
}
}
func (a *app) getResolverOrder() []string {
order := a.cfg.GetStringSlice(cfgResolveOrder)
if resolveCfg.RPCAddress == "" {
if a.cfg.GetString(cfgRPCEndpoint) == "" {
order = remove(order, resolver.NNSResolver)
a.log.Warn(logs.ResolverNNSWontBeUsedSinceRPCEndpointIsntProvided)
}
@ -236,7 +202,7 @@ func (a *app) getResolverConfig() ([]string, *resolver.Config) {
a.log.Info(logs.ContainerResolverWillBeDisabledBecauseOfResolversResolverOrderIsEmpty)
}
return order, resolveCfg
return order
}
func (a *app) initMetrics() {
@ -245,6 +211,22 @@ func (a *app) initMetrics() {
a.metrics.SetHealth(metrics.HealthStatusStarting)
}
func (a *app) initIAM(ctx context.Context) {
if !a.cfg.GetBool(cfgFrostfsIDEnabled) {
return
}
var err error
a.frostfsid, err = frostfsid.New(ctx, frostfsid.Config{
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
Contract: a.cfg.GetString(cfgFrostfsIDContract),
Key: a.key,
})
if err != nil {
a.log.Fatal("init frostfsid contract", zap.Error(err))
}
}
func newGateMetrics(logger *zap.Logger, provider *metrics.GateMetrics, enabled bool) *gateMetrics {
if !enabled {
logger.Warn(logs.MetricsAreDisabled)
@ -406,22 +388,16 @@ func (a *app) Serve() {
a.startServices()
a.initServers(a.ctx)
servs := a.getServers()
for i := range servs {
for i := range a.servers {
go func(i int) {
a.log.Info(logs.StartingServer, zap.String("address", servs[i].Address()))
if err := a.webServer.Serve(servs[i].Listener()); err != nil && err != http.ErrServerClosed {
a.metrics.MarkUnhealthy(servs[i].Address())
a.log.Info(logs.StartingServer, zap.String("address", a.servers[i].Address()))
if err := a.webServer.Serve(a.servers[i].Listener()); err != nil && err != http.ErrServerClosed {
a.metrics.MarkUnhealthy(a.servers[i].Address())
a.log.Fatal(logs.ListenAndServe, zap.Error(err))
}
}(i)
}
if len(a.unbindServers) != 0 {
a.scheduleReconnect(a.ctx, a.webServer)
}
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGHUP)
@ -471,7 +447,7 @@ func (a *app) configReload(ctx context.Context) {
a.logLevel.SetLevel(lvl)
}
if err := a.resolver.UpdateResolvers(a.getResolverConfig()); err != nil {
if err := a.resolver.UpdateResolvers(a.getResolverOrder()); err != nil {
a.log.Warn(logs.FailedToUpdateResolvers, zap.Error(err))
}
@ -496,10 +472,6 @@ func (a *app) configReload(ctx context.Context) {
func (a *app) updateSettings() {
a.settings.setDefaultTimestamp(a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp))
a.settings.setZipCompression(a.cfg.GetBool(cfgZipCompression))
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
a.settings.setNamespaceHeader(a.cfg.GetString(cfgResolveNamespaceHeader))
a.settings.setDefaultNamespaces(a.cfg.GetStringSlice(cfgResolveDefaultNamespaces))
}
func (a *app) startServices() {
@ -533,20 +505,75 @@ func (a *app) configureRouter(handler *handler.Handler) {
response.Error(r, "Method Not Allowed", fasthttp.StatusMethodNotAllowed)
}
r.POST("/upload/{cid}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.Upload)))))
r.POST("/upload/{cid}", a.addMiddlewares(handler.Upload))
a.log.Info(logs.AddedPathUploadCid)
r.GET("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadByAddressOrBucketName)))))
r.HEAD("/get/{cid}/{oid:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.HeadByAddressOrBucketName)))))
r.GET("/get/{cid}/{oid:*}", a.addMiddlewares(handler.DownloadByAddressOrBucketName))
r.HEAD("/get/{cid}/{oid:*}", a.addMiddlewares(handler.HeadByAddressOrBucketName))
a.log.Info(logs.AddedPathGetCidOid)
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadByAttribute)))))
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.HeadByAttribute)))))
r.GET("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(handler.DownloadByAttribute))
r.HEAD("/get_by_attribute/{cid}/{attr_key}/{attr_val:*}", a.addMiddlewares(handler.HeadByAttribute))
a.log.Info(logs.AddedPathGetByAttributeCidAttrKeyAttrVal)
r.GET("/zip/{cid}/{prefix:*}", a.logger(a.tokenizer(a.tracer(a.reqNamespace(handler.DownloadZipped)))))
r.GET("/zip/{cid}/{prefix:*}", a.addMiddlewares(handler.DownloadZipped))
a.log.Info(logs.AddedPathZipCidPrefix)
a.webServer.Handler = r.Handler
}
func (a *app) addMiddlewares(handler fasthttp.RequestHandler) fasthttp.RequestHandler {
list := []func(fasthttp.RequestHandler) fasthttp.RequestHandler{
a.logger,
a.tokenizer,
a.tracer,
}
if a.frostfsid != nil {
list = append(list, a.iam)
}
res := handler
for i := len(list) - 1; i >= 0; i-- {
res = list[i](res)
}
return res
}
func (a *app) iam(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
ctx := utils.GetContextFromRequest(req)
tkn, err := tokens.LoadBearerToken(ctx)
if err != nil || tkn == nil {
a.log.Debug(logs.AnonRequestSkipIAMValidation, zap.Uint64("id", req.ID()))
h(req)
return
}
if err = validateBearerToken(a.frostfsid, tkn); err != nil {
a.log.Error(logs.IAMValidationFailed, zap.Uint64("id", req.ID()), zap.Error(err))
response.Error(req, "iam validation failed: "+err.Error(), fasthttp.StatusForbidden)
return
}
h(req)
}
}
func validateBearerToken(frostfsID *frostfsid.FrostFSID, bt *bearer.Token) error {
m := new(acl.BearerToken)
bt.WriteToV2(m)
pk, err := keys.NewPublicKeyFromBytes(m.GetSignature().GetKey(), elliptic.P256())
if err != nil {
return fmt.Errorf("invalid bearer token public key: %w", err)
}
if err = frostfsID.ValidatePublicKey(pk); err != nil {
return fmt.Errorf("validation data user key failed: %w", err)
}
return nil
}
func (a *app) logger(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
a.log.Info(logs.Request, zap.String("remote", req.RemoteAddr().String()),
@ -562,9 +589,8 @@ func (a *app) tokenizer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
appCtx, err := tokens.StoreBearerTokenAppCtx(a.ctx, req)
if err != nil {
a.log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Uint64("id", req.ID()), zap.Error(err))
a.log.Error(logs.CouldNotFetchAndStoreBearerToken, zap.Error(err))
response.Error(req, "could not fetch and store bearer token: "+err.Error(), fasthttp.StatusBadRequest)
return
}
utils.SetContextToRequest(appCtx, req)
h(req)
@ -581,20 +607,6 @@ func (a *app) tracer(h fasthttp.RequestHandler) fasthttp.RequestHandler {
span.End()
}()
appCtx = treepool.SetRequestID(appCtx, strconv.FormatUint(req.ID(), 10))
utils.SetContextToRequest(appCtx, req)
h(req)
}
}
func (a *app) reqNamespace(h fasthttp.RequestHandler) fasthttp.RequestHandler {
return func(req *fasthttp.RequestCtx) {
appCtx := utils.GetContextFromRequest(req)
nsBytes := req.Request.Header.Peek(a.settings.NamespaceHeader())
appCtx = middleware.SetNamespace(appCtx, string(nsBytes))
utils.SetContextToRequest(appCtx, req)
h(req)
}
@ -606,12 +618,11 @@ func (a *app) AppParams() *utils.AppParams {
Pool: a.pool,
Owner: a.owner,
Resolver: a.resolver,
Cache: cache.NewBucketCache(getCacheOptions(a.cfg, a.log)),
}
}
func (a *app) initServers(ctx context.Context) {
serversInfo := fetchServers(a.cfg, a.log)
serversInfo := fetchServers(a.cfg)
a.servers = make([]Server, 0, len(serversInfo))
for _, serverInfo := range serversInfo {
@ -621,7 +632,6 @@ func (a *app) initServers(ctx context.Context) {
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
a.unbindServers = append(a.unbindServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
a.log.Warn(logs.FailedToAddServer, append(fields, zap.Error(err))...)
continue
@ -638,25 +648,22 @@ func (a *app) initServers(ctx context.Context) {
}
func (a *app) updateServers() error {
serversInfo := fetchServers(a.cfg, a.log)
a.mu.Lock()
defer a.mu.Unlock()
serversInfo := fetchServers(a.cfg)
var found bool
for _, serverInfo := range serversInfo {
ser := a.getServer(serverInfo.Address)
if ser != nil {
index := a.serverIndex(serverInfo.Address)
if index == -1 {
continue
}
if serverInfo.TLS.Enabled {
if err := ser.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
if err := a.servers[index].UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
return fmt.Errorf("failed to update tls certs: %w", err)
}
found = true
}
} else if unbind := a.updateUnbindServerInfo(serverInfo); unbind {
found = true
}
}
if !found {
return fmt.Errorf("invalid servers configuration: no known server found")
@ -665,29 +672,13 @@ func (a *app) updateServers() error {
return nil
}
func (a *app) getServers() []Server {
a.mu.RLock()
defer a.mu.RUnlock()
return a.servers
}
func (a *app) getServer(address string) Server {
func (a *app) serverIndex(address string) int {
for i := range a.servers {
if a.servers[i].Address() == address {
return a.servers[i]
return i
}
}
return nil
}
func (a *app) updateUnbindServerInfo(info ServerInfo) bool {
for i := range a.unbindServers {
if a.unbindServers[i].Address == info.Address {
a.unbindServers[i] = info
return true
}
}
return false
return -1
}
func (a *app) initTracing(ctx context.Context) {
@ -727,93 +718,3 @@ func (a *app) setRuntimeParameters() {
zap.Int64("old_value", previous))
}
}
func (s *appSettings) NamespaceHeader() string {
s.mu.RLock()
defer s.mu.RUnlock()
return s.namespaceHeader
}
func (s *appSettings) setNamespaceHeader(nsHeader string) {
s.mu.Lock()
s.namespaceHeader = nsHeader
s.mu.Unlock()
}
func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool) {
s.mu.RLock()
namespaces := s.defaultNamespaces
s.mu.RUnlock()
if slices.Contains(namespaces, ns) {
return v2container.SysAttributeZoneDefault, true
}
return ns + ".ns", false
}
func (s *appSettings) setDefaultNamespaces(namespaces []string) {
for i := range namespaces { // to be set namespaces in env variable as `HTTP_GW_RESOLVE_BUCKET_DEFAULT_NAMESPACES="" "root"`
namespaces[i] = strings.Trim(namespaces[i], "\"")
}
s.mu.Lock()
s.defaultNamespaces = namespaces
s.mu.Unlock()
}
func (a *app) scheduleReconnect(ctx context.Context, srv *fasthttp.Server) {
go func() {
t := time.NewTicker(a.settings.reconnectInterval)
defer t.Stop()
for {
select {
case <-t.C:
if a.tryReconnect(ctx, srv) {
return
}
t.Reset(a.settings.reconnectInterval)
case <-ctx.Done():
return
}
}
}()
}
func (a *app) tryReconnect(ctx context.Context, sr *fasthttp.Server) bool {
a.mu.Lock()
defer a.mu.Unlock()
a.log.Info(logs.ServerReconnecting)
var failedServers []ServerInfo
for _, serverInfo := range a.unbindServers {
fields := []zap.Field{
zap.String("address", serverInfo.Address), zap.Bool("tls enabled", serverInfo.TLS.Enabled),
zap.String("tls cert", serverInfo.TLS.CertFile), zap.String("tls key", serverInfo.TLS.KeyFile),
}
srv, err := newServer(ctx, serverInfo)
if err != nil {
a.log.Warn(logs.ServerReconnectFailed, zap.Error(err))
failedServers = append(failedServers, serverInfo)
a.metrics.MarkUnhealthy(serverInfo.Address)
continue
}
go func() {
a.log.Info(logs.StartingServer, zap.String("address", srv.Address()))
a.metrics.MarkHealthy(serverInfo.Address)
if err = sr.Serve(srv.Listener()); err != nil && !errors.Is(err, http.ErrServerClosed) {
a.log.Warn(logs.ListenAndServe, zap.Error(err))
a.metrics.MarkUnhealthy(serverInfo.Address)
}
}()
a.servers = append(a.servers, srv)
a.log.Info(logs.ServerReconnectedSuccessfully, fields...)
}
a.unbindServers = failedServers
return len(a.unbindServers) == 0
}

View file

@ -29,7 +29,6 @@ import (
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
"go.uber.org/zap/zapcore"
)
type putResponse struct {
@ -69,7 +68,6 @@ func TestIntegration(t *testing.T) {
t.Run("simple get "+version, func(t *testing.T) { simpleGet(ctx, t, clientPool, ownerID, CID, version) })
t.Run("get by attribute "+version, func(t *testing.T) { getByAttr(ctx, t, clientPool, ownerID, CID, version) })
t.Run("get zip "+version, func(t *testing.T) { getZip(ctx, t, clientPool, ownerID, CID, version) })
t.Run("test namespaces "+version, func(t *testing.T) { checkNamespaces(ctx, t, clientPool, ownerID, CID, version) })
cancel()
server.Wait()
@ -83,7 +81,7 @@ func runServer() (App, context.CancelFunc) {
cancelCtx, cancel := context.WithCancel(context.Background())
v := getDefaultConfig()
l, lvl := newStdoutLogger(zapcore.DebugLevel)
l, lvl := newLogger(v)
application := newApp(cancelCtx, WithConfig(v), WithLogger(l, lvl))
go application.Serve()
@ -340,40 +338,6 @@ func checkZip(t *testing.T, data []byte, length int64, names, contents []string)
}
}
func checkNamespaces(ctx context.Context, t *testing.T, clientPool *pool.Pool, ownerID user.ID, CID cid.ID, version string) {
content := "content of file"
attributes := map[string]string{
"some-attr": "some-get-value",
}
id := putObject(ctx, t, clientPool, ownerID, CID, content, attributes)
req, err := http.NewRequest(http.MethodGet, testHost+"/get/"+testContainerName+"/"+id.String(), nil)
require.NoError(t, err)
req.Header.Set(defaultNamespaceHeader, "")
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
checkGetResponse(t, resp, content, attributes)
req, err = http.NewRequest(http.MethodGet, testHost+"/get/"+testContainerName+"/"+id.String(), nil)
require.NoError(t, err)
req.Header.Set(defaultNamespaceHeader, "root")
resp, err = http.DefaultClient.Do(req)
require.NoError(t, err)
checkGetResponse(t, resp, content, attributes)
req, err = http.NewRequest(http.MethodGet, testHost+"/get/"+testContainerName+"/"+id.String(), nil)
require.NoError(t, err)
req.Header.Set(defaultNamespaceHeader, "root2")
resp, err = http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusNotFound, resp.StatusCode)
}
func createDockerContainer(ctx context.Context, t *testing.T, image string) testcontainers.Container {
req := testcontainers.ContainerRequest{
Image: image,

View file

@ -9,7 +9,7 @@ import (
func main() {
globalContext, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
v := settings()
logger, atomicLevel := pickLogger(v)
logger, atomicLevel := newLogger(v)
application := newApp(globalContext, WithLogger(logger, atomicLevel), WithConfig(v))
go application.Serve()

View file

@ -68,13 +68,11 @@ func newServer(ctx context.Context, serverInfo ServerInfo) (*server, error) {
if serverInfo.TLS.Enabled {
if err = tlsProvider.UpdateCert(serverInfo.TLS.CertFile, serverInfo.TLS.KeyFile); err != nil {
lnErr := ln.Close()
return nil, fmt.Errorf("failed to update cert (listener close: %v): %w", lnErr, err)
return nil, fmt.Errorf("failed to update cert: %w", err)
}
ln = tls.NewListener(ln, &tls.Config{
GetCertificate: tlsProvider.GetCertificate,
NextProtos: []string{"h2"}, // required to enable HTTP/2 requests in `http.Serve`
})
}

View file

@ -1,119 +0,0 @@
package main
import (
"context"
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"fmt"
"math/big"
"net"
"net/http"
"os"
"path"
"testing"
"time"
"github.com/stretchr/testify/require"
"golang.org/x/net/http2"
)
const (
expHeaderKey = "Foo"
expHeaderValue = "Bar"
)
func TestHTTP2TLS(t *testing.T) {
ctx := context.Background()
certPath, keyPath := prepareTestCerts(t)
srv := &http.Server{
Handler: http.HandlerFunc(testHandler),
}
tlsListener, err := newServer(ctx, ServerInfo{
Address: ":0",
TLS: ServerTLSInfo{
Enabled: true,
CertFile: certPath,
KeyFile: keyPath,
},
})
require.NoError(t, err)
port := tlsListener.Listener().Addr().(*net.TCPAddr).Port
addr := fmt.Sprintf("https://localhost:%d", port)
go func() {
_ = srv.Serve(tlsListener.Listener())
}()
// Server is running, now send HTTP/2 request
tlsClientConfig := &tls.Config{
InsecureSkipVerify: true,
}
cliHTTP1 := http.Client{Transport: &http.Transport{TLSClientConfig: tlsClientConfig}}
cliHTTP2 := http.Client{Transport: &http2.Transport{TLSClientConfig: tlsClientConfig}}
req, err := http.NewRequest("GET", addr, nil)
require.NoError(t, err)
req.Header[expHeaderKey] = []string{expHeaderValue}
resp, err := cliHTTP1.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
resp, err = cliHTTP2.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusOK, resp.StatusCode)
}
func testHandler(resp http.ResponseWriter, req *http.Request) {
hdr, ok := req.Header[expHeaderKey]
if !ok || len(hdr) != 1 || hdr[0] != expHeaderValue {
resp.WriteHeader(http.StatusBadRequest)
} else {
resp.WriteHeader(http.StatusOK)
}
}
func prepareTestCerts(t *testing.T) (certPath, keyPath string) {
privateKey, err := rsa.GenerateKey(rand.Reader, 2048)
require.NoError(t, err)
template := x509.Certificate{
SerialNumber: big.NewInt(1),
Subject: pkix.Name{CommonName: "localhost"},
NotBefore: time.Now(),
NotAfter: time.Now().Add(time.Hour * 24 * 365),
KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageCertSign,
BasicConstraintsValid: true,
}
derBytes, err := x509.CreateCertificate(rand.Reader, &template, &template, &privateKey.PublicKey, privateKey)
require.NoError(t, err)
dir := t.TempDir()
certPath = path.Join(dir, "cert.pem")
keyPath = path.Join(dir, "key.pem")
certFile, err := os.Create(certPath)
require.NoError(t, err)
defer certFile.Close()
keyFile, err := os.Create(keyPath)
require.NoError(t, err)
defer keyFile.Close()
err = pem.Encode(certFile, &pem.Block{Type: "CERTIFICATE", Bytes: derBytes})
require.NoError(t, err)
err = pem.Encode(keyFile, &pem.Block{Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(privateKey)})
require.NoError(t, err)
return certPath, keyPath
}

View file

@ -13,28 +13,20 @@ import (
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
"git.frostfs.info/TrueCloudLab/zapjournald"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/ssgreg/journald"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"google.golang.org/grpc"
)
const (
destinationStdout = "stdout"
destinationJournald = "journald"
)
const (
defaultRebalanceTimer = 60 * time.Second
defaultRequestTimeout = 15 * time.Second
@ -47,19 +39,11 @@ const (
defaultSoftMemoryLimit = math.MaxInt64
defaultBufferMaxSizeForPut = 1024 * 1024 // 1mb
defaultNamespaceHeader = "X-Frostfs-Namespace"
defaultReconnectInterval = time.Minute
cfgServer = "server"
cfgTLSEnabled = "tls.enabled"
cfgTLSCertFile = "tls.cert_file"
cfgTLSKeyFile = "tls.key_file"
cfgReconnectInterval = "reconnect_interval"
// Web.
cfgWebReadBufferSize = "web.read_buffer_size"
cfgWebWriteBufferSize = "web.write_buffer_size"
@ -88,7 +72,6 @@ const (
// Logger.
cfgLoggerLevel = "logger.level"
cfgLoggerDestination = "logger.destination"
// Wallet.
cfgWalletPassphrase = "wallet.passphrase"
@ -113,21 +96,9 @@ const (
// Runtime.
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
// Enabling client side object preparing for PUT operations.
cfgClientCut = "frostfs.client_cut"
// Sets max buffer size for read payload in put operations.
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
// Configuration of parameters of requests to FrostFS.
// Sets max attempt to make successful tree request.
cfgTreePoolMaxAttempts = "frostfs.tree_pool_max_attempts"
// Caching.
cfgBucketsCacheLifetime = "cache.buckets.lifetime"
cfgBucketsCacheSize = "cache.buckets.size"
// Bucket resolving options.
cfgResolveNamespaceHeader = "resolve_bucket.namespace_header"
cfgResolveDefaultNamespaces = "resolve_bucket.default_namespaces"
// FrostfsID.
cfgFrostfsIDEnabled = "frostfsid.enabled"
cfgFrostfsIDContract = "frostfsid.contract"
// Command line args.
cmdHelp = "help"
@ -186,14 +157,10 @@ func settings() *viper.Viper {
// logger:
v.SetDefault(cfgLoggerLevel, "debug")
v.SetDefault(cfgLoggerDestination, "stdout")
// pool:
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
// frostfs:
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
// web-server:
v.SetDefault(cfgWebReadBufferSize, 4096)
v.SetDefault(cfgWebWriteBufferSize, 4096)
@ -212,9 +179,8 @@ func settings() *viper.Viper {
v.SetDefault(cfgPprofAddress, "localhost:8083")
v.SetDefault(cfgPrometheusAddress, "localhost:8084")
// resolve bucket
v.SetDefault(cfgResolveNamespaceHeader, defaultNamespaceHeader)
v.SetDefault(cfgResolveDefaultNamespaces, []string{"", "root"})
// frostfsid
v.SetDefault(cfgFrostfsIDContract, "frostfsid.frostfs")
// Binding flags
if err := v.BindPFlag(cfgPprofEnabled, flags.Lookup(cmdPprof)); err != nil {
@ -375,25 +341,7 @@ func mergeConfig(v *viper.Viper, fileName string) error {
return v.MergeConfig(cfgFile)
}
func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
lvl, err := getLogLevel(v)
if err != nil {
panic(err)
}
dest := v.GetString(cfgLoggerDestination)
switch dest {
case destinationStdout:
return newStdoutLogger(lvl)
case destinationJournald:
return newJournaldLogger(lvl)
default:
panic(fmt.Sprintf("wrong destination for logger: %s", dest))
}
}
// newStdoutLogger constructs a zap.Logger instance for current application.
// newLogger constructs a zap.Logger instance for current application.
// Panics on failure.
//
// Logger is built from zap's production logging configuration with:
@ -404,7 +352,12 @@ func pickLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
// Logger records a stack trace for all messages at or above fatal level.
//
// See also zapcore.Level, zap.NewProductionConfig, zap.AddStacktrace.
func newStdoutLogger(lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
func newLogger(v *viper.Viper) (*zap.Logger, zap.AtomicLevel) {
lvl, err := getLogLevel(v)
if err != nil {
panic(err)
}
c := zap.NewProductionConfig()
c.Level = zap.NewAtomicLevelAt(lvl)
c.Encoding = "console"
@ -420,25 +373,6 @@ func newStdoutLogger(lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
return l, c.Level
}
func newJournaldLogger(lvl zapcore.Level) (*zap.Logger, zap.AtomicLevel) {
c := zap.NewProductionConfig()
c.EncoderConfig.EncodeTime = zapcore.ISO8601TimeEncoder
c.Level = zap.NewAtomicLevelAt(lvl)
encoder := zapjournald.NewPartialEncoder(zapcore.NewConsoleEncoder(c.EncoderConfig), zapjournald.SyslogFields)
core := zapjournald.NewCore(c.Level, encoder, &journald.Journal{}, zapjournald.SyslogFields)
coreWithContext := core.With([]zapcore.Field{
zapjournald.SyslogFacility(zapjournald.LogDaemon),
zapjournald.SyslogIdentifier(),
zapjournald.SyslogPid(),
})
l := zap.New(coreWithContext, zap.AddStacktrace(zap.NewAtomicLevelAt(zap.FatalLevel)))
return l, c.Level
}
func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
var lvl zapcore.Level
lvlStr := v.GetString(cfgLoggerLevel)
@ -458,18 +392,8 @@ func getLogLevel(v *viper.Viper) (zapcore.Level, error) {
return lvl, nil
}
func fetchReconnectInterval(cfg *viper.Viper) time.Duration {
reconnect := cfg.GetDuration(cfgReconnectInterval)
if reconnect <= 0 {
reconnect = defaultReconnectInterval
}
return reconnect
}
func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
func fetchServers(v *viper.Viper) []ServerInfo {
var servers []ServerInfo
seen := make(map[string]struct{})
for i := 0; ; i++ {
key := cfgServer + "." + strconv.Itoa(i) + "."
@ -484,11 +408,6 @@ func fetchServers(v *viper.Viper, log *zap.Logger) []ServerInfo {
break
}
if _, ok := seen[serverInfo.Address]; ok {
log.Warn(logs.WarnDuplicateAddress, zap.String("address", serverInfo.Address))
continue
}
seen[serverInfo.Address] = struct{}{}
servers = append(servers, serverInfo)
}
@ -549,8 +468,6 @@ func getPools(ctx context.Context, logger *zap.Logger, cfg *viper.Viper) (*pool.
prm.SetLogger(logger)
prmTree.SetLogger(logger)
prmTree.SetMaxRequestAttempts(cfg.GetInt(cfgTreePoolMaxAttempts))
var apiGRPCDialOpts []grpc.DialOption
var treeGRPCDialOpts []grpc.DialOption
if cfg.GetBool(cfgTracingEnabled) {
@ -621,44 +538,3 @@ func fetchSoftMemoryLimit(cfg *viper.Viper) int64 {
return int64(softMemoryLimit)
}
func getCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config {
cacheCfg := cache.DefaultBucketConfig(l)
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Lifetime)
cacheCfg.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Size)
return cacheCfg
}
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
if v.IsSet(cfgEntry) {
lifetime := v.GetDuration(cfgEntry)
if lifetime <= 0 {
l.Error(logs.InvalidLifetimeUsingDefaultValue,
zap.String("parameter", cfgEntry),
zap.Duration("value in config", lifetime),
zap.Duration("default", defaultValue))
} else {
return lifetime
}
}
return defaultValue
}
func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
if v.IsSet(cfgEntry) {
size := v.GetInt(cfgEntry)
if size <= 0 {
l.Error(logs.InvalidCacheSizeUsingDefaultValue,
zap.String("parameter", cfgEntry),
zap.Int("value in config", size),
zap.Int("default", defaultValue))
} else {
return size
}
}
return defaultValue
}

View file

@ -26,9 +26,6 @@ HTTP_GW_SERVER_1_TLS_ENABLED=true
HTTP_GW_SERVER_1_TLS_CERT_FILE=/path/to/tls/cert
HTTP_GW_SERVER_1_TLS_KEY_FILE=/path/to/tls/key
# How often to reconnect to the servers
HTTP_GW_RECONNECT_INTERVAL: 1m
# Nodes configuration.
# This configuration make the gateway use the first node (grpc://s01.frostfs.devenv:8080)
# while it's healthy. Otherwise, the gateway use the second node (grpc://s01.frostfs.devenv:8080)
@ -102,22 +99,8 @@ HTTP_GW_TRACING_EXPORTER="otlp_grpc"
HTTP_GW_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
# Parameters of requests to FrostFS
# This flag enables client side object preparing.
HTTP_GW_FROSTFS_CLIENT_CUT=false
# Sets max buffer size for read payload in put operations.
HTTP_GW_FROSTFS_BUFFER_MAX_SIZE_FOR_PUT=1048576
# Caching
# Cache which contains mapping of bucket name to bucket info
HTTP_GW_CACHE_BUCKETS_LIFETIME=1m
HTTP_GW_CACHE_BUCKETS_SIZE=1000
# Header to determine zone to resolve bucket name
HTTP_GW_RESOLVE_BUCKET_NAMESPACE_HEADER=X-Frostfs-Namespace
# Namespaces that should be handled as default
HTTP_GW_RESOLVE_BUCKET_DEFAULT_NAMESPACES="" "root"
# Max attempt to make successful tree request.
# default value is 0 that means the number of attempts equals to number of nodes in pool.
HTTP_GW_FROSTFS_TREE_POOL_MAX_ATTEMPTS=0
# FrostfsID contract configuration. To enable this functionality the `rpc_endpoint` param must be also set.
# Enables check that allow requests only users that is registered in FrostfsID contract.
HTTP_GW_FROSTFSID_ENABLED=false
# FrostfsID contract hash (LE) or name in NNS.
HTTP_GW_FROSTFSID_CONTRACT=frostfsid.frostfs

View file

@ -16,7 +16,6 @@ tracing:
logger:
level: debug # Log level.
destination: stdout
server:
- address: 0.0.0.0:8080
@ -55,7 +54,6 @@ peers:
priority: 2
weight: 9
reconnect_interval: 1m
web:
# Per-connection buffer size for requests' reading.
@ -107,23 +105,10 @@ zip:
runtime:
soft_memory_limit: 1gb
# Parameters of requests to FrostFS
frostfs:
# This flag enables client side object preparing.
client_cut: false
# Sets max buffer size for read payload in put operations.
buffer_max_size_for_put: 1048576
# Max attempt to make successful tree request.
# default value is 0 that means the number of attempts equals to number of nodes in pool.
tree_pool_max_attempts: 0
# FrostfsID contract configuration. To enable this functionality the `rpc_endpoint` param must be also set.
frostfsid:
# Enables check that allow requests only users that is registered in FrostfsID contract.
enabled: false
# FrostfsID contract hash (LE) or name in NNS.
contract: frostfsid.frostfs
# Caching
cache:
# Cache which contains mapping of bucket name to bucket info
buckets:
lifetime: 1m
size: 1000
resolve_bucket:
namespace_header: X-Frostfs-Namespace
default_namespaces: [ "", "root" ]

View file

@ -41,7 +41,7 @@ $ cat http.log
# Structure
| Section | Description |
|------------------|----------------------------------------------------------------|
|-----------------|-------------------------------------------------------|
| no section | [General parameters](#general-section) |
| `wallet` | [Wallet configuration](#wallet-section) |
| `peers` | [Nodes configuration](#peers-section) |
@ -54,9 +54,7 @@ $ cat http.log
| `prometheus` | [Prometheus configuration](#prometheus-section) |
| `tracing` | [Tracing configuration](#tracing-section) |
| `runtime` | [Runtime configuration](#runtime-section) |
| `frostfs` | [Frostfs configuration](#frostfs-section) |
| `cache` | [Cache configuration](#cache-section) |
| `resolve_bucket` | [Bucket name resolving configuration](#resolve_bucket-section) |
| `frostfsid` | [FrostfsID configuration](#frostfsid-section) |
# General section
@ -72,19 +70,17 @@ stream_timeout: 10s
request_timeout: 5s
rebalance_timer: 30s
pool_error_threshold: 100
reconnect_interval: 1m
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|------------------------|------------|---------------|----------------|------------------------------------------------------------------------------------|
| `rpc_endpoint` | `string` | yes | | The address of the RPC host to which the gateway connects to resolve bucket names. |
| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. |
| `connect_timeout` | `duration` | | `10s` | Timeout to connect to a node. |
| `stream_timeout` | `duration` | | `10s` | Timeout for individual operations in streaming RPC. |
| `request_timeout` | `duration` | | `15s` | Timeout to check node health during rebalance. |
| `rebalance_timer` | `duration` | | `60s` | Interval to check node health. |
| `pool_error_threshold` | `uint32` | | `100` | The number of errors on connection after which node is considered as unhealthy. |
| `reconnect_interval` | `duration` | no | `1m` | Listeners reconnection interval. |
|------------------------|------------|---------------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `rpc_endpoint` | `string` | no | | The address of the RPC host to which the gateway connects to resolve bucket names and interact with frostfs contracts (required to use the `nns` resolver and `frostfsid` contract). |
| `resolve_order` | `[]string` | yes | `[nns, dns]` | Order of bucket name resolvers to use. Available resolvers: `dns`, `nns`. |
| `connect_timeout` | `duration` | no | `10s` | Timeout to connect to a node. |
| `stream_timeout` | `duration` | no | `10s` | Timeout for individual operations in streaming RPC. |
| `request_timeout` | `duration` | no | `15s` | Timeout to check node health during rebalance. |
| `rebalance_timer` | `duration` | no | `60s` | Interval to check node health. |
| `pool_error_threshold` | `uint32` | no | `100` | The number of errors on connection after which node is considered as unhealthy. |
# `wallet` section
@ -163,13 +159,12 @@ server:
```yaml
logger:
level: debug
destination: stdout
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|---------------|----------|---------------|---------------|----------------------------------------------------------------------------------------------------|
|-----------|----------|---------------|---------------|----------------------------------------------------------------------------------------------------|
| `level` | `string` | yes | `debug` | Logging level.<br/>Possible values: `debug`, `info`, `warn`, `error`, `dpanic`, `panic`, `fatal`. |
| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` |
# `web` section
@ -276,63 +271,17 @@ runtime:
|---------------------|--------|---------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `soft_memory_limit` | `size` | yes | maxint64 | Soft memory limit for the runtime. Zero or no value stands for no limit. If `GOMEMLIMIT` environment variable is set, the value from the configuration file will be ignored. |
# `frostfs` section
# `frostfsid` section
Contains parameters of requests to FrostFS.
FrostfsID contract configuration. To enable this functionality the `rpc_endpoint` param must be also set.
```yaml
frostfs:
client_cut: false
buffer_max_size_for_put: 1048576 # 1mb
tree_pool_max_attempts: 0
frostfsid:
enabled: false
contract: frostfsid.frostfs
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|---------------------------|----------|---------------|---------------|---------------------------------------------------------------------------------------------------------------------------|
| `client_cut` | `bool` | yes | `false` | This flag enables client side object preparing. |
| `buffer_max_size_for_put` | `uint64` | yes | `1048576` | Sets max buffer size for read payload in put operations. |
| `tree_pool_max_attempts` | `uint32` | no | `0` | Sets max attempt to make successful tree request. Value 0 means the number of attempts equals to number of nodes in pool. |
### `cache` section
```yaml
cache:
buckets:
lifetime: 1m
size: 1000
```
| Parameter | Type | Default value | Description |
|-----------------|-----------------------------------|-----------------------------------|----------------------------------------------------------------------------------------|
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
#### `cache` subsection
```yaml
lifetime: 1m
size: 1000
```
| Parameter | Type | Default value | Description |
|------------|------------|------------------|-------------------------------|
| `lifetime` | `duration` | depends on cache | Lifetime of entries in cache. |
| `size` | `int` | depends on cache | LRU cache size. |
# `resolve_bucket` section
Bucket name resolving parameters from and to container ID.
```yaml
resolve_bucket:
namespace_header: X-Frostfs-Namespace
default_namespaces: [ "", "root" ]
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|----------------------|------------|---------------|-----------------------|--------------------------------------------------------------------------------------------------------------------------|
| `namespace_header` | `string` | yes | `X-Frostfs-Namespace` | Header to determine zone to resolve bucket name. |
| `default_namespaces` | `[]string` | yes | ["","root"] | Namespaces that should be handled as default. |
|------------|----------|---------------|-------------------|----------------------------------------------------------------------------------------|
| `enabled` | `bool` | no | false | Enables check that allow requests only users that is registered in FrostfsID contract. |
| `contract` | `string` | no | frostfsid.frostfs | FrostfsID contract hash (LE) or name in NNS. |

21
go.mod
View file

@ -3,31 +3,26 @@ module git.frostfs.info/TrueCloudLab/frostfs-http-gw
go 1.20
require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20231121085847-241a9f1ad0a4
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44
git.frostfs.info/TrueCloudLab/frostfs-contract v0.18.1-0.20231004065251-4194633db7bb
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20231107114540-ab75edd70939
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
github.com/bluele/gcache v0.0.2
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6
github.com/fasthttp/router v1.4.1
github.com/nspcc-dev/neo-go v0.101.2-0.20230601131642-a0117042e8fc
github.com/nspcc-dev/neo-go v0.101.5-0.20230808195420-5fc61be5f6c5
github.com/prometheus/client_golang v1.15.1
github.com/prometheus/client_model v0.3.0
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/ssgreg/journald v1.0.0
github.com/stretchr/testify v1.8.3
github.com/testcontainers/testcontainers-go v0.13.0
github.com/valyala/fasthttp v1.34.0
go.opentelemetry.io/otel v1.16.0
go.opentelemetry.io/otel/trace v1.16.0
go.uber.org/zap v1.24.0
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc
golang.org/x/net v0.10.0
go.uber.org/zap v1.26.0
google.golang.org/grpc v1.55.0
)
require (
git.frostfs.info/TrueCloudLab/frostfs-contract v0.0.0-20230307110621-19a8ef2d02fb // indirect
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 // indirect
@ -42,7 +37,7 @@ require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.0.3 // indirect
github.com/containerd/containerd v1.6.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.1 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/docker/distribution v2.8.1+incompatible // indirect
@ -72,7 +67,7 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/nspcc-dev/go-ordered-json v0.0.0-20220111165707-25110be27d22 // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20230615193820-9185820289ce // indirect
github.com/nspcc-dev/neo-go/pkg/interop v0.0.0-20230808195420-5fc61be5f6c5 // indirect
github.com/nspcc-dev/rfc6979 v0.2.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
@ -104,6 +99,8 @@ require (
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/exp v0.0.0-20230515195305-f3d0a9c9a5cc // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.2.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/term v0.8.0 // indirect

689
go.sum

File diff suppressed because it is too large Load diff

View file

@ -1,72 +0,0 @@
package cache
import (
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"github.com/bluele/gcache"
"go.uber.org/zap"
)
// BucketCache contains cache with objects and the lifetime of cache entries.
type BucketCache struct {
cache gcache.Cache
logger *zap.Logger
}
// Config stores expiration params for cache.
type Config struct {
Size int
Lifetime time.Duration
Logger *zap.Logger
}
const (
// DefaultBucketCacheSize is a default maximum number of entries in cache.
DefaultBucketCacheSize = 1e3
// DefaultBucketCacheLifetime is a default lifetime of entries in cache.
DefaultBucketCacheLifetime = time.Minute
)
// DefaultBucketConfig returns new default cache expiration values.
func DefaultBucketConfig(logger *zap.Logger) *Config {
return &Config{
Size: DefaultBucketCacheSize,
Lifetime: DefaultBucketCacheLifetime,
Logger: logger,
}
}
// NewBucketCache creates an object of BucketCache.
func NewBucketCache(config *Config) *BucketCache {
gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).Build()
return &BucketCache{cache: gc, logger: config.Logger}
}
// Get returns a cached object.
func (o *BucketCache) Get(ns, bktName string) *data.BucketInfo {
entry, err := o.cache.Get(formKey(ns, bktName))
if err != nil {
return nil
}
result, ok := entry.(*data.BucketInfo)
if !ok {
o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
zap.String("expected", fmt.Sprintf("%T", result)))
return nil
}
return result
}
// Put puts an object to cache.
func (o *BucketCache) Put(bkt *data.BucketInfo) error {
return o.cache.Set(formKey(bkt.Zone, bkt.Name), bkt)
}
func formKey(ns, name string) string {
return name + "." + ns
}

View file

@ -1,12 +0,0 @@
package data
import (
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)
type BucketInfo struct {
Name string // container name from system attribute
Zone string // container zone from system attribute
CID cid.ID
HomomorphicHashDisabled bool
}

View file

@ -0,0 +1,87 @@
package frostfsid
import (
"context"
"fmt"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-contract/frostfsid/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
"github.com/nspcc-dev/neo-go/pkg/rpcclient"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
)
type FrostFSID struct {
cli *client.Client
}
type Config struct {
// RPCAddress is an endpoint to connect to neo rpc.
RPCAddress string
// Contract is hash of contract or its name in NNS.
Contract string
// Key is used to interact with frostfsid contract.
// If this is nil than random key will be generated.
Key *keys.PrivateKey
}
// New creates new FrostfsID contract wrapper that implements auth.FrostFSID interface.
func New(ctx context.Context, cfg Config) (*FrostFSID, error) {
contractHash, err := fetchContractHash(cfg)
if err != nil {
return nil, fmt.Errorf("resolve frostfs contract hash: %w", err)
}
key := cfg.Key
if key == nil {
if key, err = keys.NewPrivateKey(); err != nil {
return nil, fmt.Errorf("generate anon private key for frostfsid: %w", err)
}
}
rpcCli, err := rpcclient.New(ctx, cfg.RPCAddress, rpcclient.Options{})
if err != nil {
return nil, fmt.Errorf("init rpc client: %w", err)
}
cli, err := client.New(rpcCli, wallet.NewAccountFromPrivateKey(key), contractHash, nil)
if err != nil {
return nil, fmt.Errorf("init frostfsid client: %w", err)
}
return &FrostFSID{
cli: cli,
}, nil
}
func (f *FrostFSID) ValidatePublicKey(key *keys.PublicKey) error {
_, err := f.cli.GetSubjectByKey(key)
return err
}
func fetchContractHash(cfg Config) (util.Uint160, error) {
if hash, err := util.Uint160DecodeStringLE(cfg.Contract); err == nil {
return hash, nil
}
splitName := strings.Split(cfg.Contract, ".")
if len(splitName) != 2 {
return util.Uint160{}, fmt.Errorf("invalid contract name: '%s'", cfg.Contract)
}
var domain container.Domain
domain.SetName(splitName[0])
domain.SetZone(splitName[1])
var nns ns.NNS
if err := nns.Dial(cfg.RPCAddress); err != nil {
return util.Uint160{}, fmt.Errorf("dial nns %s: %w", cfg.RPCAddress, err)
}
return nns.ResolveContractHash(domain)
}

View file

@ -14,6 +14,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -28,7 +30,7 @@ func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
var id oid.ID
err := id.DecodeString(test)
if err != nil {
h.byObjectName(c, h.receiveFile)
h.byBucketname(c, h.receiveFile)
} else {
h.byAddress(c, h.receiveFile)
}
@ -61,6 +63,13 @@ func (h *Handler) search(ctx context.Context, cid *cid.ID, key, val string, op o
return h.pool.SearchObjects(ctx, prm)
}
func (h *Handler) getContainer(ctx context.Context, cnrID cid.ID) (container.Container, error) {
var prm pool.PrmContainerGet
prm.SetContainerID(cnrID)
return h.pool.GetContainer(ctx, prm)
}
func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
method := zip.Store
if h.config.ZipCompression() {
@ -82,26 +91,32 @@ func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer,
// DownloadZipped handles zip by prefix requests.
func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
scid, _ := c.UserValue("cid").(string)
prefix, _ := c.UserValue("prefix").(string)
prefix, err := url.QueryUnescape(prefix)
if err != nil {
h.log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("prefix", prefix), zap.Uint64("id", c.ID()), zap.Error(err))
response.Error(c, "could not unescape prefix: "+err.Error(), fasthttp.StatusBadRequest)
return
}
log := h.log.With(zap.String("cid", scid), zap.String("prefix", prefix), zap.Uint64("id", c.ID()))
prefix, _ := url.QueryUnescape(c.UserValue("prefix").(string))
log := h.log.With(zap.String("cid", scid), zap.String("prefix", prefix))
ctx := utils.GetContextFromRequest(c)
bktInfo, err := h.getBucketInfo(ctx, scid, log)
containerID, err := h.getContainerID(ctx, scid)
if err != nil {
logAndSendBucketError(c, log, err)
log.Error(logs.WrongContainerID, zap.Error(err))
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
return
}
resSearch, err := h.search(ctx, &bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
// check if container exists here to be able to return 404 error,
// otherwise we get this error only in object iteration step
// and client get 200 OK.
if _, err = h.getContainer(ctx, *containerID); err != nil {
log.Error(logs.CouldNotCheckContainerExistence, zap.Error(err))
if client.IsErrContainerNotFound(err) {
response.Error(c, "Not Found", fasthttp.StatusNotFound)
return
}
response.Error(c, "could not check container existence: "+err.Error(), fasthttp.StatusBadRequest)
return
}
resSearch, err := h.search(ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
@ -123,7 +138,7 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
empty := true
called := false
btoken := bearerToken(ctx)
addr.SetContainer(bktInfo.CID)
addr.SetContainer(*containerID)
errIter := resSearch.Iterate(func(id oid.ID) bool {
called = true

View file

@ -3,21 +3,14 @@ package handler
import (
"context"
"errors"
"fmt"
"io"
"net/url"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -30,9 +23,6 @@ import (
type Config interface {
DefaultTimestamp() bool
ZipCompression() bool
ClientCut() bool
BufferMaxSizeForPut() uint64
NamespaceHeader() string
}
type Handler struct {
@ -42,7 +32,6 @@ type Handler struct {
config Config
containerResolver *resolver.ContainerResolver
tree *tree.Tree
cache *cache.BucketCache
}
func New(params *utils.AppParams, config Config, tree *tree.Tree) *Handler {
@ -53,10 +42,20 @@ func New(params *utils.AppParams, config Config, tree *tree.Tree) *Handler {
config: config,
containerResolver: params.Resolver,
tree: tree,
cache: params.Cache,
}
}
// getContainerID decode container id, if it's not a valid container id
// then trey to resolve name using provided resolver.
func (h *Handler) getContainerID(ctx context.Context, containerID string) (*cid.ID, error) {
cnrID := new(cid.ID)
err := cnrID.DecodeString(containerID)
if err != nil {
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
}
return cnrID, err
}
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it.
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
@ -68,9 +67,10 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
ctx := utils.GetContextFromRequest(c)
bktInfo, err := h.getBucketInfo(ctx, idCnr, log)
cnrID, err := h.getContainerID(ctx, idCnr)
if err != nil {
logAndSendBucketError(c, log, err)
log.Error(logs.WrongContainerID, zap.Error(err))
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
return
}
@ -82,15 +82,15 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
}
var addr oid.Address
addr.SetContainer(bktInfo.CID)
addr.SetContainer(*cnrID)
addr.SetObject(*objID)
f(ctx, *h.newRequest(c, log), addr)
}
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// byBucketname is a wrapper for function (e.g. request.headObject, request.receiveFile) that
// prepares request and object address to it.
func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
var (
bucketname = req.UserValue("cid").(string)
key = req.UserValue("oid").(string)
@ -99,20 +99,16 @@ func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context,
ctx := utils.GetContextFromRequest(req)
bktInfo, err := h.getBucketInfo(ctx, bucketname, log)
cnrID, err := h.getContainerID(ctx, bucketname)
if err != nil {
logAndSendBucketError(req, log, err)
log.Error(logs.WrongContainerID, zap.Error(err))
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
return
}
foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, key)
foundOid, err := h.tree.GetLatestVersion(ctx, cnrID, key)
if err != nil {
if errors.Is(err, tree.ErrNodeAccessDenied) {
response.Error(req, "Access Denied", fasthttp.StatusForbidden)
return
}
log.Error(logs.GetLatestObjectVersion, zap.Error(err))
log.Error(logs.ObjectWasntFound, zap.Error(err))
response.Error(req, "object wasn't found", fasthttp.StatusNotFound)
return
}
@ -123,7 +119,7 @@ func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context,
}
var addr oid.Address
addr.SetContainer(bktInfo.CID)
addr.SetContainer(*cnrID)
addr.SetObject(foundOid.OID)
f(ctx, *h.newRequest(req, log), addr)
@ -131,35 +127,23 @@ func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context,
// byAttribute is a wrapper similar to byAddress.
func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
scid, _ := c.UserValue("cid").(string)
key, _ := c.UserValue("attr_key").(string)
val, _ := c.UserValue("attr_val").(string)
key, err := url.QueryUnescape(key)
if err != nil {
h.log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_key", key), zap.Uint64("id", c.ID()), zap.Error(err))
response.Error(c, "could not unescape attr_key: "+err.Error(), fasthttp.StatusBadRequest)
return
}
val, err = url.QueryUnescape(val)
if err != nil {
h.log.Error(logs.FailedToUnescapeQuery, zap.String("cid", scid), zap.String("attr_val", val), zap.Uint64("id", c.ID()), zap.Error(err))
response.Error(c, "could not unescape attr_val: "+err.Error(), fasthttp.StatusBadRequest)
return
}
log := h.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
var (
scid, _ = c.UserValue("cid").(string)
key, _ = url.QueryUnescape(c.UserValue("attr_key").(string))
val, _ = url.QueryUnescape(c.UserValue("attr_val").(string))
log = h.log.With(zap.String("cid", scid), zap.String("attr_key", key), zap.String("attr_val", val))
)
ctx := utils.GetContextFromRequest(c)
bktInfo, err := h.getBucketInfo(ctx, scid, log)
containerID, err := h.getContainerID(ctx, scid)
if err != nil {
logAndSendBucketError(c, log, err)
log.Error(logs.WrongContainerID, zap.Error(err))
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
return
}
res, err := h.search(ctx, &bktInfo.CID, key, val, object.MatchStringEqual)
res, err := h.search(ctx, containerID, key, val, object.MatchStringEqual)
if err != nil {
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
@ -184,74 +168,8 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
}
var addrObj oid.Address
addrObj.SetContainer(bktInfo.CID)
addrObj.SetContainer(*containerID)
addrObj.SetObject(buf[0])
f(ctx, *h.newRequest(c, log), addrObj)
}
// resolveContainer decode container id, if it's not a valid container id
// then trey to resolve name using provided resolver.
func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*cid.ID, error) {
cnrID := new(cid.ID)
err := cnrID.DecodeString(containerID)
if err != nil {
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
if err != nil && strings.Contains(err.Error(), "not found") {
err = fmt.Errorf("%w: %s", new(apistatus.ContainerNotFound), err.Error())
}
}
return cnrID, err
}
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
ns, err := middleware.GetNamespace(ctx)
if err != nil {
return nil, err
}
if bktInfo := h.cache.Get(ns, containerName); bktInfo != nil {
return bktInfo, nil
}
cnrID, err := h.resolveContainer(ctx, containerName)
if err != nil {
return nil, err
}
bktInfo, err := h.readContainer(ctx, *cnrID)
if err != nil {
return nil, err
}
if err = h.cache.Put(bktInfo); err != nil {
log.Warn(logs.CouldntPutBucketIntoCache,
zap.String("bucket name", bktInfo.Name),
zap.Stringer("bucket cid", bktInfo.CID),
zap.Error(err))
}
return bktInfo, nil
}
func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.BucketInfo, error) {
prm := pool.PrmContainerGet{ContainerID: cnrID}
res, err := h.pool.GetContainer(ctx, prm)
if err != nil {
return nil, fmt.Errorf("get frostfs container '%s': %w", cnrID.String(), err)
}
bktInfo := &data.BucketInfo{
CID: cnrID,
Name: cnrID.EncodeToString(),
}
if domain := container.ReadDomain(res); domain.Name() != "" {
bktInfo.Name = domain.Name()
bktInfo.Zone = domain.Zone()
}
bktInfo.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(res)
return bktInfo, err
}

View file

@ -110,7 +110,7 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
err := id.DecodeString(test)
if err != nil {
h.byObjectName(c, h.headObject)
h.byBucketname(c, h.headObject)
} else {
h.byAddress(c, h.headObject)
}

View file

@ -1,26 +0,0 @@
package middleware
import (
"context"
"fmt"
)
// keyWrapper is wrapper for context keys.
type keyWrapper string
const nsKey = keyWrapper("namespace")
// GetNamespace extract namespace from context.
func GetNamespace(ctx context.Context) (string, error) {
ns, ok := ctx.Value(nsKey).(string)
if !ok {
return "", fmt.Errorf("couldn't get namespace from context")
}
return ns, nil
}
// SetNamespace sets namespace in the context.
func SetNamespace(ctx context.Context, ns string) context.Context {
return context.WithValue(ctx, nsKey, ns)
}

View file

@ -57,9 +57,10 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
ctx := utils.GetContextFromRequest(req)
bktInfo, err := h.getBucketInfo(ctx, scid, log)
idCnr, err := h.getContainerID(ctx, scid)
if err != nil {
logAndSendBucketError(req, log, err)
log.Error(logs.WrongContainerID, zap.Error(err))
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
return
}
@ -128,16 +129,13 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
}
obj := object.New()
obj.SetContainerID(bktInfo.CID)
obj.SetContainerID(*idCnr)
obj.SetOwnerID(h.ownerID)
obj.SetAttributes(attributes...)
var prm pool.PrmObjectPut
prm.SetHeader(*obj)
prm.SetPayload(file)
prm.SetClientCut(h.config.ClientCut())
prm.SetBufferMaxSize(h.config.BufferMaxSizeForPut())
prm.WithoutHomomorphicHash(bktInfo.HomomorphicHashDisabled)
bt := h.fetchBearerToken(ctx)
if bt != nil {
@ -150,7 +148,7 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
}
addr.SetObject(idObj)
addr.SetContainer(bktInfo.CID)
addr.SetContainer(*idCnr)
// Try to return the response, otherwise, if something went wrong, throw an error.
if err = newPutResponse(addr).encode(req); err != nil {

View file

@ -9,7 +9,6 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
"github.com/valyala/fasthttp"
"go.uber.org/zap"
)
@ -59,13 +58,3 @@ func isValidValue(s string) bool {
}
return true
}
func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) {
log.Error(logs.CouldntGetBucket, zap.Error(err))
if client.IsErrContainerNotFound(err) {
response.Error(c, "Not Found", fasthttp.StatusNotFound)
return
}
response.Error(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
}

View file

@ -4,12 +4,14 @@ const (
CouldntParseCreationDate = "couldn't parse creation date" // Info in ../../downloader/*
CouldNotDetectContentTypeFromPayload = "could not detect Content-Type from payload" // Error in ../../downloader/download.go
CouldNotReceiveObject = "could not receive object" // Error in ../../downloader/download.go
WrongContainerID = "wrong container id" // Error in ../../downloader/download.go and uploader/upload.go
WrongObjectID = "wrong object id" // Error in ../../downloader/download.go
GetLatestObjectVersion = "get latest object version" // Error in ../../downloader/download.go
ObjectWasntFound = "object wasn't found" // Error in ../../downloader/download.go
ObjectWasDeleted = "object was deleted" // Error in ../../downloader/download.go
CouldNotSearchForObjects = "could not search for objects" // Error in ../../downloader/download.go
ObjectNotFound = "object not found" // Error in ../../downloader/download.go
ReadObjectListFailed = "read object list failed" // Error in ../../downloader/download.go
CouldNotCheckContainerExistence = "could not check container existence" // Error in ../../downloader/download.go
FailedToAddObjectToArchive = "failed to add object to archive" // Error in ../../downloader/download.go
IteratingOverSelectedObjectsFailed = "iterating over selected objects failed" // Error in ../../downloader/download.go
ObjectsNotFound = "objects not found" // Error in ../../downloader/download.go
@ -19,7 +21,6 @@ const (
ServiceHasntStartedSinceItsDisabled = "service hasn't started since it's disabled" // Info in ../../metrics/service.go
ShuttingDownService = "shutting down service" // Info in ../../metrics/service.go
CantShutDownService = "can't shut down service" // Panic in ../../metrics/service.go
CantGracefullyShutDownService = "can't gracefully shut down service, force stop" // Error in ../../metrics/service.go
IgnorePartEmptyFormName = "ignore part, empty form name" // Debug in ../../uploader/upload.go
IgnorePartEmptyFilename = "ignore part, empty filename" // Debug in ../../uploader/upload.go
CloseTemporaryMultipartFormFile = "close temporary multipart/form file" // Debug in ../../uploader/upload.go
@ -67,14 +68,6 @@ const (
FailedToCreateTreePool = "failed to create tree pool" // Fatal in ../../settings.go
FailedToDialTreePool = "failed to dial tree pool" // Fatal in ../../settings.go
AddedStoragePeer = "added storage peer" // Info in ../../settings.go
CouldntGetBucket = "could not get bucket" // Error in ../handler/utils.go
CouldntPutBucketIntoCache = "couldn't put bucket info into cache" // Warn in ../handler/handler.go
InvalidCacheEntryType = "invalid cache entry type" // Warn in ../cache/buckets.go
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
FailedToUnescapeQuery = "failed to unescape query"
ServerReconnecting = "reconnecting server..."
ServerReconnectedSuccessfully = "server reconnected successfully"
ServerReconnectFailed = "failed to reconnect server"
WarnDuplicateAddress = "duplicate address"
AnonRequestSkipIAMValidation = "anon request, skip IAM validation" // Debug in ../../app.go
IAMValidationFailed = "IAM validation failed" // Error in ../../app.go
)

View file

@ -40,9 +40,6 @@ func (ms *Service) ShutDown(ctx context.Context) {
ms.log.Info(logs.ShuttingDownService, zap.String("endpoint", ms.Addr))
err := ms.Shutdown(ctx)
if err != nil {
ms.log.Error(logs.CantGracefullyShutDownService, zap.Error(err))
if err = ms.Close(); err != nil {
ms.log.Panic(logs.CantShutDownService, zap.Error(err))
}
ms.log.Panic(logs.CantShutDownService)
}
}

View file

@ -6,7 +6,6 @@ import (
"fmt"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler/middleware"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
@ -29,17 +28,15 @@ type FrostFS interface {
SystemDNS(context.Context) (string, error)
}
type Settings interface {
FormContainerZone(ns string) (zone string, isDefault bool)
}
type Config struct {
FrostFS FrostFS
RPCAddress string
Settings Settings
}
type ContainerResolver struct {
rpcAddress string
frostfs FrostFS
mu sync.RWMutex
resolvers []*Resolver
}
@ -64,6 +61,8 @@ func NewContainerResolver(resolverNames []string, cfg *Config) (*ContainerResolv
}
return &ContainerResolver{
rpcAddress: cfg.RPCAddress,
frostfs: cfg.FrostFS,
resolvers: resolvers,
}, nil
}
@ -107,7 +106,7 @@ func (r *ContainerResolver) Resolve(ctx context.Context, cnrName string) (*cid.I
return nil, ErrNoResolvers
}
func (r *ContainerResolver) UpdateResolvers(resolverNames []string, cfg *Config) error {
func (r *ContainerResolver) UpdateResolvers(resolverNames []string) error {
r.mu.Lock()
defer r.mu.Unlock()
@ -115,7 +114,7 @@ func (r *ContainerResolver) UpdateResolvers(resolverNames []string, cfg *Config)
return nil
}
resolvers, err := createResolvers(resolverNames, cfg)
resolvers, err := createResolvers(resolverNames, &Config{FrostFS: r.frostfs, RPCAddress: r.rpcAddress})
if err != nil {
return err
}
@ -141,43 +140,29 @@ func (r *ContainerResolver) equals(resolverNames []string) bool {
func newResolver(name string, cfg *Config) (*Resolver, error) {
switch name {
case DNSResolver:
return NewDNSResolver(cfg.FrostFS, cfg.Settings)
return NewDNSResolver(cfg.FrostFS)
case NNSResolver:
return NewNNSResolver(cfg.RPCAddress, cfg.Settings)
return NewNNSResolver(cfg.RPCAddress)
default:
return nil, fmt.Errorf("unknown resolver: %s", name)
}
}
func NewDNSResolver(frostFS FrostFS, settings Settings) (*Resolver, error) {
func NewDNSResolver(frostFS FrostFS) (*Resolver, error) {
if frostFS == nil {
return nil, fmt.Errorf("pool must not be nil for DNS resolver")
}
if settings == nil {
return nil, fmt.Errorf("resolver settings must not be nil for DNS resolver")
}
var dns ns.DNS
resolveFunc := func(ctx context.Context, name string) (*cid.ID, error) {
var err error
namespace, err := middleware.GetNamespace(ctx)
if err != nil {
return nil, err
}
zone, isDefault := settings.FormContainerZone(namespace)
if isDefault {
zone, err = frostFS.SystemDNS(ctx)
domain, err := frostFS.SystemDNS(ctx)
if err != nil {
return nil, fmt.Errorf("read system DNS parameter of the FrostFS: %w", err)
}
}
domain := name + "." + zone
domain = name + "." + domain
cnrID, err := dns.ResolveContainerName(domain)
if err != nil {
return nil, fmt.Errorf("couldn't resolve container '%s' as '%s': %w", name, domain, err)
}
@ -190,32 +175,17 @@ func NewDNSResolver(frostFS FrostFS, settings Settings) (*Resolver, error) {
}, nil
}
func NewNNSResolver(rpcAddress string, settings Settings) (*Resolver, error) {
if rpcAddress == "" {
return nil, fmt.Errorf("rpc address must not be empty for NNS resolver")
}
if settings == nil {
return nil, fmt.Errorf("resolver settings must not be nil for NNS resolver")
}
func NewNNSResolver(rpcAddress string) (*Resolver, error) {
var nns ns.NNS
if err := nns.Dial(rpcAddress); err != nil {
return nil, fmt.Errorf("could not dial nns: %w", err)
}
resolveFunc := func(ctx context.Context, name string) (*cid.ID, error) {
resolveFunc := func(_ context.Context, name string) (*cid.ID, error) {
var d container.Domain
d.SetName(name)
namespace, err := middleware.GetNamespace(ctx)
if err != nil {
return nil, err
}
zone, _ := settings.FormContainerZone(namespace)
d.SetZone(zone)
cnrID, err := nns.ResolveContainerDomain(d)
if err != nil {
return nil, fmt.Errorf("couldn't resolve container '%s': %w", name, err)

View file

@ -73,7 +73,6 @@ type Meta interface {
type NodeResponse interface {
GetMeta() []Meta
GetTimestamp() uint64
}
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
@ -136,7 +135,7 @@ func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName s
TreeID: versionTree,
Path: path,
Meta: meta,
LatestOnly: false,
LatestOnly: true,
AllAttrs: false,
}
nodes, err := c.service.GetNodes(ctx, p)
@ -144,43 +143,11 @@ func (c *Tree) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName s
return nil, err
}
latestNode, err := getLatestNode(nodes)
if err != nil {
return nil, err
}
return newNodeVersion(latestNode)
}
func getLatestNode(nodes []NodeResponse) (NodeResponse, error) {
var (
maxCreationTime uint64
targetIndexNode = -1
)
for i, node := range nodes {
currentCreationTime := node.GetTimestamp()
if checkExistOID(node.GetMeta()) && currentCreationTime > maxCreationTime {
maxCreationTime = currentCreationTime
targetIndexNode = i
}
}
if targetIndexNode == -1 {
if len(nodes) == 0 {
return nil, layer.ErrNodeNotFound
}
return nodes[targetIndexNode], nil
}
func checkExistOID(meta []Meta) bool {
for _, kv := range meta {
if kv.GetKey() == "OID" {
return true
}
}
return false
return newNodeVersion(nodes[0])
}
// pathFromName splits name by '/'.

View file

@ -1,143 +0,0 @@
package tree
import (
"testing"
"github.com/stretchr/testify/require"
)
type nodeMeta struct {
key string
value []byte
}
func (m nodeMeta) GetKey() string {
return m.key
}
func (m nodeMeta) GetValue() []byte {
return m.value
}
type nodeResponse struct {
meta []nodeMeta
timestamp uint64
}
func (n nodeResponse) GetTimestamp() uint64 {
return n.timestamp
}
func (n nodeResponse) GetMeta() []Meta {
res := make([]Meta, len(n.meta))
for i, value := range n.meta {
res[i] = value
}
return res
}
func TestGetLatestNode(t *testing.T) {
for _, tc := range []struct {
name string
nodes []NodeResponse
exceptedOID string
error bool
}{
{
name: "empty",
nodes: []NodeResponse{},
error: true,
},
{
name: "one node of the object version",
nodes: []NodeResponse{
nodeResponse{
timestamp: 1,
meta: []nodeMeta{
{
key: oidKV,
value: []byte("oid1"),
},
},
},
},
exceptedOID: "oid1",
},
{
name: "one node of the object version and one node of the secondary object",
nodes: []NodeResponse{
nodeResponse{
timestamp: 3,
meta: []nodeMeta{},
},
nodeResponse{
timestamp: 1,
meta: []nodeMeta{
{
key: oidKV,
value: []byte("oid1"),
},
},
},
},
exceptedOID: "oid1",
},
{
name: "all nodes represent a secondary object",
nodes: []NodeResponse{
nodeResponse{
timestamp: 3,
meta: []nodeMeta{},
},
nodeResponse{
timestamp: 5,
meta: []nodeMeta{},
},
},
error: true,
},
{
name: "several nodes of different types and with different timestamp",
nodes: []NodeResponse{
nodeResponse{
timestamp: 1,
meta: []nodeMeta{
{
key: oidKV,
value: []byte("oid1"),
},
},
},
nodeResponse{
timestamp: 3,
meta: []nodeMeta{},
},
nodeResponse{
timestamp: 4,
meta: []nodeMeta{
{
key: oidKV,
value: []byte("oid2"),
},
},
},
nodeResponse{
timestamp: 6,
meta: []nodeMeta{},
},
},
exceptedOID: "oid2",
},
} {
t.Run(tc.name, func(t *testing.T) {
actualNode, err := getLatestNode(tc.nodes)
if tc.error {
require.Error(t, err)
return
}
require.NoError(t, err)
require.Equal(t, tc.exceptedOID, string(actualNode.GetMeta()[0].GetValue()))
})
}
}

View file

@ -1,7 +1,6 @@
package utils
import (
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
@ -13,5 +12,4 @@ type AppParams struct {
Pool *pool.Pool
Owner *user.ID
Resolver *resolver.ContainerResolver
Cache *cache.BucketCache
}