feature/client-cut #70
17 changed files with 374 additions and 71 deletions
|
@ -12,6 +12,11 @@ This document outlines major changes between releases.
|
|||
- Support impersonate bearer token (#40, #45)
|
||||
- Tracing support (#20, #44, #60)
|
||||
- Object name resolving with tree service (#30)
|
||||
- Support client side object cut (#70)
|
||||
- Add `frostfs.client_cut` config param
|
||||
- Add `frostfs.buffer_max_size_for_put` config param
|
||||
- Add bucket/container caching
|
||||
- Disable homomorphic hash for PUT if it's disabled in container itself
|
||||
|
||||
### Changed
|
||||
- Update prometheus to v1.15.0 (#35)
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"syscall"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/frostfs/services"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
|
@ -72,9 +73,11 @@ type (
|
|||
|
||||
// appSettings stores reloading parameters, so it has to provide getters and setters which use RWMutex.
|
||||
appSettings struct {
|
||||
mu sync.RWMutex
|
||||
defaultTimestamp bool
|
||||
zipCompression bool
|
||||
mu sync.RWMutex
|
||||
defaultTimestamp bool
|
||||
zipCompression bool
|
||||
clientCut bool
|
||||
bufferMaxSizeForPut uint64
|
||||
}
|
||||
)
|
||||
|
||||
|
@ -164,6 +167,30 @@ func (s *appSettings) setZipCompression(val bool) {
|
|||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) ClientCut() bool {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.clientCut
|
||||
}
|
||||
|
||||
func (s *appSettings) setClientCut(val bool) {
|
||||
s.mu.Lock()
|
||||
s.clientCut = val
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) BufferMaxSizeForPut() uint64 {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.bufferMaxSizeForPut
|
||||
}
|
||||
|
||||
func (s *appSettings) setBufferMaxSizeForPut(val uint64) {
|
||||
s.mu.Lock()
|
||||
s.bufferMaxSizeForPut = val
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (a *app) initAppSettings() {
|
||||
a.settings = &appSettings{}
|
||||
|
||||
|
@ -448,6 +475,8 @@ func (a *app) configReload(ctx context.Context) {
|
|||
func (a *app) updateSettings() {
|
||||
a.settings.setDefaultTimestamp(a.cfg.GetBool(cfgUploaderHeaderEnableDefaultTimestamp))
|
||||
a.settings.setZipCompression(a.cfg.GetBool(cfgZipCompression))
|
||||
a.settings.setClientCut(a.cfg.GetBool(cfgClientCut))
|
||||
a.settings.setBufferMaxSizeForPut(a.cfg.GetUint64(cfgBufferMaxSizeForPut))
|
||||
}
|
||||
|
||||
func (a *app) startServices() {
|
||||
|
@ -539,6 +568,7 @@ func (a *app) AppParams() *utils.AppParams {
|
|||
Pool: a.pool,
|
||||
Owner: a.owner,
|
||||
Resolver: a.resolver,
|
||||
Cache: cache.NewBucketCache(getCacheOptions(a.cfg, a.log)),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||
grpctracing "git.frostfs.info/TrueCloudLab/frostfs-observability/tracing/grpc"
|
||||
|
@ -39,6 +40,8 @@ const (
|
|||
|
||||
defaultSoftMemoryLimit = math.MaxInt64
|
||||
|
||||
defaultBufferMaxSizeForPut = 1024 * 1024 // 1mb
|
||||
|
||||
cfgServer = "server"
|
||||
cfgTLSEnabled = "tls.enabled"
|
||||
cfgTLSCertFile = "tls.cert_file"
|
||||
|
@ -96,6 +99,15 @@ const (
|
|||
// Runtime.
|
||||
cfgSoftMemoryLimit = "runtime.soft_memory_limit"
|
||||
|
||||
// Enabling client side object preparing for PUT operations.
|
||||
cfgClientCut = "frostfs.client_cut"
|
||||
// Sets max buffer size for read payload in put operations.
|
||||
cfgBufferMaxSizeForPut = "frostfs.buffer_max_size_for_put"
|
||||
|
||||
// Caching.
|
||||
cfgBucketsCacheLifetime = "cache.buckets.lifetime"
|
||||
cfgBucketsCacheSize = "cache.buckets.size"
|
||||
|
||||
// Command line args.
|
||||
cmdHelp = "help"
|
||||
cmdVersion = "version"
|
||||
|
@ -157,6 +169,9 @@ func settings() *viper.Viper {
|
|||
// pool:
|
||||
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
|
||||
|
||||
// frostfs:
|
||||
v.SetDefault(cfgBufferMaxSizeForPut, defaultBufferMaxSizeForPut)
|
||||
|
||||
// web-server:
|
||||
v.SetDefault(cfgWebReadBufferSize, 4096)
|
||||
v.SetDefault(cfgWebWriteBufferSize, 4096)
|
||||
|
@ -531,3 +546,44 @@ func fetchSoftMemoryLimit(cfg *viper.Viper) int64 {
|
|||
|
||||
return int64(softMemoryLimit)
|
||||
}
|
||||
|
||||
func getCacheOptions(v *viper.Viper, l *zap.Logger) *cache.Config {
|
||||
cacheCfg := cache.DefaultBucketConfig(l)
|
||||
|
||||
cacheCfg.Lifetime = fetchCacheLifetime(v, l, cfgBucketsCacheLifetime, cacheCfg.Lifetime)
|
||||
cacheCfg.Size = fetchCacheSize(v, l, cfgBucketsCacheSize, cacheCfg.Size)
|
||||
|
||||
return cacheCfg
|
||||
}
|
||||
|
||||
func fetchCacheLifetime(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue time.Duration) time.Duration {
|
||||
if v.IsSet(cfgEntry) {
|
||||
lifetime := v.GetDuration(cfgEntry)
|
||||
if lifetime <= 0 {
|
||||
l.Error(logs.InvalidLifetimeUsingDefaultValue,
|
||||
zap.String("parameter", cfgEntry),
|
||||
zap.Duration("value in config", lifetime),
|
||||
zap.Duration("default", defaultValue))
|
||||
} else {
|
||||
return lifetime
|
||||
}
|
||||
}
|
||||
|
||||
return defaultValue
|
||||
}
|
||||
|
||||
func fetchCacheSize(v *viper.Viper, l *zap.Logger, cfgEntry string, defaultValue int) int {
|
||||
if v.IsSet(cfgEntry) {
|
||||
size := v.GetInt(cfgEntry)
|
||||
if size <= 0 {
|
||||
l.Error(logs.InvalidCacheSizeUsingDefaultValue,
|
||||
zap.String("parameter", cfgEntry),
|
||||
zap.Int("value in config", size),
|
||||
zap.Int("default", defaultValue))
|
||||
} else {
|
||||
return size
|
||||
}
|
||||
}
|
||||
|
||||
return defaultValue
|
||||
}
|
||||
|
|
|
@ -98,3 +98,14 @@ HTTP_GW_TRACING_ENDPOINT="localhost:4317"
|
|||
HTTP_GW_TRACING_EXPORTER="otlp_grpc"
|
||||
|
||||
HTTP_GW_RUNTIME_SOFT_MEMORY_LIMIT=1073741824
|
||||
|
||||
# Parameters of requests to FrostFS
|
||||
# This flag enables client side object preparing.
|
||||
HTTP_GW_FROSTFS_CLIENT_CUT=false
|
||||
# Sets max buffer size for read payload in put operations.
|
||||
HTTP_GW_FROSTFS_BUFFER_MAX_SIZE_FOR_PUT=1048576
|
||||
|
||||
# Caching
|
||||
# Cache which contains mapping of bucket name to bucket info
|
||||
HTTP_GW_CACHE_BUCKETS_LIFETIME=1m
|
||||
HTTP_GW_CACHE_BUCKETS_SIZE=1000
|
||||
|
|
|
@ -104,3 +104,17 @@ zip:
|
|||
|
||||
runtime:
|
||||
soft_memory_limit: 1gb
|
||||
|
||||
# Parameters of requests to FrostFS
|
||||
frostfs:
|
||||
# This flag enables client side object preparing.
|
||||
client_cut: false
|
||||
# Sets max buffer size for read payload in put operations.
|
||||
buffer_max_size_for_put: 1048576
|
||||
|
||||
# Caching
|
||||
cache:
|
||||
# Cache which contains mapping of bucket name to bucket info
|
||||
buckets:
|
||||
lifetime: 1m
|
||||
size: 1000
|
||||
|
|
|
@ -54,6 +54,8 @@ $ cat http.log
|
|||
| `prometheus` | [Prometheus configuration](#prometheus-section) |
|
||||
| `tracing` | [Tracing configuration](#tracing-section) |
|
||||
| `runtime` | [Runtime configuration](#runtime-section) |
|
||||
| `frostfs` | [Frostfs configuration](#frostfs-section) |
|
||||
| `cache` | [Cache configuration](#cache-section) |
|
||||
|
||||
|
||||
# General section
|
||||
|
@ -268,4 +270,47 @@ runtime:
|
|||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|---------------------|--------|---------------|---------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `soft_memory_limit` | `size` | yes | maxint64 | Soft memory limit for the runtime. Zero or no value stands for no limit. If `GOMEMLIMIT` environment variable is set, the value from the configuration file will be ignored. |
|
||||
| `soft_memory_limit` | `size` | yes | maxint64 | Soft memory limit for the runtime. Zero or no value stands for no limit. If `GOMEMLIMIT` environment variable is set, the value from the configuration file will be ignored. |
|
||||
|
||||
# `frostfs` section
|
||||
|
||||
Contains parameters of requests to FrostFS.
|
||||
|
||||
```yaml
|
||||
frostfs:
|
||||
client_cut: false
|
||||
buffer_max_size_for_put: 1048576 # 1mb
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|---------------------------|----------|---------------|---------------|----------------------------------------------------------|
|
||||
| `client_cut` | `bool` | yes | `false` | This flag enables client side object preparing. |
|
||||
| `buffer_max_size_for_put` | `uint64` | yes | `1048576` | Sets max buffer size for read payload in put operations. |
|
||||
|
||||
|
||||
### `cache` section
|
||||
|
||||
```yaml
|
||||
cache:
|
||||
buckets:
|
||||
lifetime: 1m
|
||||
size: 1000
|
||||
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|-----------------|-----------------------------------|-----------------------------------|----------------------------------------------------------------------------------------|
|
||||
| `buckets` | [Cache config](#cache-subsection) | `lifetime: 60s`<br>`size: 1000` | Cache which contains mapping of bucket name to bucket info. |
|
||||
|
||||
|
||||
#### `cache` subsection
|
||||
|
||||
```yaml
|
||||
lifetime: 1m
|
||||
size: 1000
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|------------|------------|------------------|-------------------------------|
|
||||
| `lifetime` | `duration` | depends on cache | Lifetime of entries in cache. |
|
||||
| `size` | `int` | depends on cache | LRU cache size. |
|
||||
|
|
3
go.mod
3
go.mod
|
@ -5,7 +5,8 @@ go 1.20
|
|||
require (
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.15.1-0.20230802075510-964c3edb3f44
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230825064515-46a214d065f8
|
||||
github.com/bluele/gcache v0.0.2
|
||||
github.com/fasthttp/router v1.4.1
|
||||
github.com/nspcc-dev/neo-go v0.101.2-0.20230601131642-a0117042e8fc
|
||||
github.com/prometheus/client_golang v1.15.1
|
||||
|
|
6
go.sum
6
go.sum
|
@ -45,8 +45,8 @@ git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSV
|
|||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6 h1:u6lzNotV6MEMNEG/XeS7g+FjPrrf+j4gnOHtvun2KJc=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230802103237-363f153eafa6/go.mod h1:LI2GOj0pEx0jYTjB3QHja2PNhQFYL2pCm71RAFwDv0M=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230825064515-46a214d065f8 h1:0s2RkATjdtK/5fHjRGsIi8qMvc9IoeMZgMX5ddMwI+I=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20230825064515-46a214d065f8/go.mod h1:t1akKcUH7iBrFHX8rSXScYMP17k2kYQXMbZooiL5Juw=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
||||
|
@ -138,6 +138,8 @@ github.com/bitly/go-simplejson v0.5.0/go.mod h1:cXHtHw4XUPsvGaxgjIAn8PhEWG9NfngE
|
|||
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
|
||||
github.com/blang/semver v3.1.0+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
||||
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
|
||||
github.com/bluele/gcache v0.0.2 h1:WcbfdXICg7G/DGBh1PFfcirkWOQV+v077yF1pSy3DGw=
|
||||
github.com/bluele/gcache v0.0.2/go.mod h1:m15KV+ECjptwSPxKhOhQoAFQVtUFjTVkc3H8o0t/fp0=
|
||||
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
|
||||
github.com/bshuster-repo/logrus-logstash-hook v0.4.1/go.mod h1:zsTqEiSzDgAa/8GZR7E1qaXrhYNDKBYy5/dWPTIflbk=
|
||||
github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ=
|
||||
|
|
68
internal/cache/buckets.go
vendored
Normal file
68
internal/cache/buckets.go
vendored
Normal file
|
@ -0,0 +1,68 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"github.com/bluele/gcache"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// BucketCache contains cache with objects and the lifetime of cache entries.
|
||||
type BucketCache struct {
|
||||
cache gcache.Cache
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// Config stores expiration params for cache.
|
||||
type Config struct {
|
||||
Size int
|
||||
Lifetime time.Duration
|
||||
Logger *zap.Logger
|
||||
}
|
||||
|
||||
const (
|
||||
// DefaultBucketCacheSize is a default maximum number of entries in cache.
|
||||
DefaultBucketCacheSize = 1e3
|
||||
// DefaultBucketCacheLifetime is a default lifetime of entries in cache.
|
||||
DefaultBucketCacheLifetime = time.Minute
|
||||
)
|
||||
|
||||
// DefaultBucketConfig returns new default cache expiration values.
|
||||
func DefaultBucketConfig(logger *zap.Logger) *Config {
|
||||
return &Config{
|
||||
Size: DefaultBucketCacheSize,
|
||||
Lifetime: DefaultBucketCacheLifetime,
|
||||
Logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// NewBucketCache creates an object of BucketCache.
|
||||
func NewBucketCache(config *Config) *BucketCache {
|
||||
gc := gcache.New(config.Size).LRU().Expiration(config.Lifetime).Build()
|
||||
return &BucketCache{cache: gc, logger: config.Logger}
|
||||
}
|
||||
|
||||
// Get returns a cached object.
|
||||
func (o *BucketCache) Get(key string) *data.BucketInfo {
|
||||
entry, err := o.cache.Get(key)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
result, ok := entry.(*data.BucketInfo)
|
||||
if !ok {
|
||||
o.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
||||
zap.String("expected", fmt.Sprintf("%T", result)))
|
||||
return nil
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// Put puts an object to cache.
|
||||
func (o *BucketCache) Put(bkt *data.BucketInfo) error {
|
||||
return o.cache.Set(bkt.Name, bkt)
|
||||
}
|
12
internal/data/bucket.go
Normal file
12
internal/data/bucket.go
Normal file
|
@ -0,0 +1,12 @@
|
|||
package data
|
||||
|
||||
import (
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
)
|
||||
|
||||
type BucketInfo struct {
|
||||
Name string // container name from system attribute
|
||||
Zone string // container zone from system attribute
|
||||
CID cid.ID
|
||||
HomomorphicHashDisabled bool
|
||||
}
|
|
@ -14,8 +14,6 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -30,7 +28,7 @@ func (h *Handler) DownloadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
|||
var id oid.ID
|
||||
err := id.DecodeString(test)
|
||||
if err != nil {
|
||||
h.byBucketname(c, h.receiveFile)
|
||||
h.byObjectName(c, h.receiveFile)
|
||||
} else {
|
||||
h.byAddress(c, h.receiveFile)
|
||||
}
|
||||
|
@ -63,13 +61,6 @@ func (h *Handler) search(ctx context.Context, cid *cid.ID, key, val string, op o
|
|||
return h.pool.SearchObjects(ctx, prm)
|
||||
}
|
||||
|
||||
func (h *Handler) getContainer(ctx context.Context, cnrID cid.ID) (container.Container, error) {
|
||||
var prm pool.PrmContainerGet
|
||||
prm.SetContainerID(cnrID)
|
||||
|
||||
return h.pool.GetContainer(ctx, prm)
|
||||
}
|
||||
|
||||
func (h *Handler) addObjectToZip(zw *zip.Writer, obj *object.Object) (io.Writer, error) {
|
||||
method := zip.Store
|
||||
if h.config.ZipCompression() {
|
||||
|
@ -96,27 +87,13 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
|
||||
containerID, err := h.getContainerID(ctx, scid)
|
||||
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||
if err != nil {
|
||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
// check if container exists here to be able to return 404 error,
|
||||
// otherwise we get this error only in object iteration step
|
||||
// and client get 200 OK.
|
||||
if _, err = h.getContainer(ctx, *containerID); err != nil {
|
||||
log.Error(logs.CouldNotCheckContainerExistence, zap.Error(err))
|
||||
if client.IsErrContainerNotFound(err) {
|
||||
response.Error(c, "Not Found", fasthttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
response.Error(c, "could not check container existence: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
resSearch, err := h.search(ctx, containerID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||
resSearch, err := h.search(ctx, &bktInfo.CID, object.AttributeFilePath, prefix, object.MatchCommonPrefix)
|
||||
if err != nil {
|
||||
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
|
@ -138,7 +115,7 @@ func (h *Handler) DownloadZipped(c *fasthttp.RequestCtx) {
|
|||
empty := true
|
||||
called := false
|
||||
btoken := bearerToken(ctx)
|
||||
addr.SetContainer(*containerID)
|
||||
addr.SetContainer(bktInfo.CID)
|
||||
|
||||
errIter := resSearch.Iterate(func(id oid.ID) bool {
|
||||
called = true
|
||||
|
|
|
@ -3,14 +3,20 @@ package handler
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/utils"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -23,6 +29,8 @@ import (
|
|||
type Config interface {
|
||||
DefaultTimestamp() bool
|
||||
ZipCompression() bool
|
||||
ClientCut() bool
|
||||
BufferMaxSizeForPut() uint64
|
||||
}
|
||||
|
||||
type Handler struct {
|
||||
|
@ -32,6 +40,7 @@ type Handler struct {
|
|||
config Config
|
||||
containerResolver *resolver.ContainerResolver
|
||||
tree *tree.Tree
|
||||
cache *cache.BucketCache
|
||||
}
|
||||
|
||||
func New(params *utils.AppParams, config Config, tree *tree.Tree) *Handler {
|
||||
|
@ -42,20 +51,10 @@ func New(params *utils.AppParams, config Config, tree *tree.Tree) *Handler {
|
|||
config: config,
|
||||
containerResolver: params.Resolver,
|
||||
tree: tree,
|
||||
cache: params.Cache,
|
||||
}
|
||||
}
|
||||
|
||||
// getContainerID decode container id, if it's not a valid container id
|
||||
// then trey to resolve name using provided resolver.
|
||||
func (h *Handler) getContainerID(ctx context.Context, containerID string) (*cid.ID, error) {
|
||||
cnrID := new(cid.ID)
|
||||
err := cnrID.DecodeString(containerID)
|
||||
if err != nil {
|
||||
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
|
||||
}
|
||||
return cnrID, err
|
||||
}
|
||||
|
||||
// byAddress is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// prepares request and object address to it.
|
||||
func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
|
@ -67,10 +66,9 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
|||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
|
||||
cnrID, err := h.getContainerID(ctx, idCnr)
|
||||
bktInfo, err := h.getBucketInfo(ctx, idCnr, log)
|
||||
if err != nil {
|
||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -82,15 +80,15 @@ func (h *Handler) byAddress(c *fasthttp.RequestCtx, f func(context.Context, requ
|
|||
}
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(*cnrID)
|
||||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(*objID)
|
||||
|
||||
f(ctx, *h.newRequest(c, log), addr)
|
||||
}
|
||||
|
||||
// byBucketname is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// byObjectName is a wrapper for function (e.g. request.headObject, request.receiveFile) that
|
||||
// prepares request and object address to it.
|
||||
func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
func (h *Handler) byObjectName(req *fasthttp.RequestCtx, f func(context.Context, request, oid.Address)) {
|
||||
var (
|
||||
bucketname = req.UserValue("cid").(string)
|
||||
key = req.UserValue("oid").(string)
|
||||
|
@ -99,16 +97,20 @@ func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context,
|
|||
|
||||
ctx := utils.GetContextFromRequest(req)
|
||||
|
||||
cnrID, err := h.getContainerID(ctx, bucketname)
|
||||
bktInfo, err := h.getBucketInfo(ctx, bucketname, log)
|
||||
if err != nil {
|
||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
||||
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
||||
logAndSendBucketError(req, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
foundOid, err := h.tree.GetLatestVersion(ctx, cnrID, key)
|
||||
foundOid, err := h.tree.GetLatestVersion(ctx, &bktInfo.CID, key)
|
||||
if err != nil {
|
||||
log.Error(logs.ObjectWasntFound, zap.Error(err))
|
||||
if errors.Is(err, tree.ErrNodeAccessDenied) {
|
||||
response.Error(req, "Access Denied", fasthttp.StatusForbidden)
|
||||
return
|
||||
}
|
||||
log.Error(logs.GetLatestObjectVersion, zap.Error(err))
|
||||
|
||||
response.Error(req, "object wasn't found", fasthttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
|
@ -119,7 +121,7 @@ func (h *Handler) byBucketname(req *fasthttp.RequestCtx, f func(context.Context,
|
|||
}
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(*cnrID)
|
||||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(foundOid.OID)
|
||||
|
||||
f(ctx, *h.newRequest(req, log), addr)
|
||||
|
@ -136,14 +138,13 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
|
|||
|
||||
ctx := utils.GetContextFromRequest(c)
|
||||
|
||||
containerID, err := h.getContainerID(ctx, scid)
|
||||
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||
if err != nil {
|
||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
||||
response.Error(c, "wrong container id", fasthttp.StatusBadRequest)
|
||||
logAndSendBucketError(c, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
res, err := h.search(ctx, containerID, key, val, object.MatchStringEqual)
|
||||
res, err := h.search(ctx, &bktInfo.CID, key, val, object.MatchStringEqual)
|
||||
if err != nil {
|
||||
log.Error(logs.CouldNotSearchForObjects, zap.Error(err))
|
||||
response.Error(c, "could not search for objects: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
|
@ -168,8 +169,69 @@ func (h *Handler) byAttribute(c *fasthttp.RequestCtx, f func(context.Context, re
|
|||
}
|
||||
|
||||
var addrObj oid.Address
|
||||
addrObj.SetContainer(*containerID)
|
||||
addrObj.SetContainer(bktInfo.CID)
|
||||
addrObj.SetObject(buf[0])
|
||||
|
||||
f(ctx, *h.newRequest(c, log), addrObj)
|
||||
}
|
||||
|
||||
// resolveContainer decode container id, if it's not a valid container id
|
||||
// then trey to resolve name using provided resolver.
|
||||
func (h *Handler) resolveContainer(ctx context.Context, containerID string) (*cid.ID, error) {
|
||||
cnrID := new(cid.ID)
|
||||
err := cnrID.DecodeString(containerID)
|
||||
if err != nil {
|
||||
cnrID, err = h.containerResolver.Resolve(ctx, containerID)
|
||||
if err != nil && strings.Contains(err.Error(), "not found") {
|
||||
err = fmt.Errorf("%w: %s", new(apistatus.ContainerNotFound), err.Error())
|
||||
}
|
||||
}
|
||||
return cnrID, err
|
||||
}
|
||||
|
||||
func (h *Handler) getBucketInfo(ctx context.Context, containerName string, log *zap.Logger) (*data.BucketInfo, error) {
|
||||
if bktInfo := h.cache.Get(containerName); bktInfo != nil {
|
||||
return bktInfo, nil
|
||||
}
|
||||
|
||||
cnrID, err := h.resolveContainer(ctx, containerName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bktInfo, err := h.readContainer(ctx, *cnrID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = h.cache.Put(bktInfo); err != nil {
|
||||
log.Warn(logs.CouldntPutBucketIntoCache,
|
||||
zap.String("bucket name", bktInfo.Name),
|
||||
zap.Stringer("bucket cid", bktInfo.CID),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
return bktInfo, nil
|
||||
}
|
||||
|
||||
func (h *Handler) readContainer(ctx context.Context, cnrID cid.ID) (*data.BucketInfo, error) {
|
||||
prm := pool.PrmContainerGet{ContainerID: cnrID}
|
||||
res, err := h.pool.GetContainer(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get frostfs container '%s': %w", cnrID.String(), err)
|
||||
}
|
||||
|
||||
bktInfo := &data.BucketInfo{
|
||||
CID: cnrID,
|
||||
Name: cnrID.EncodeToString(),
|
||||
}
|
||||
|
||||
if domain := container.ReadDomain(res); domain.Name() != "" {
|
||||
bktInfo.Name = domain.Name()
|
||||
bktInfo.Zone = domain.Zone()
|
||||
}
|
||||
|
||||
bktInfo.HomomorphicHashDisabled = container.IsHomomorphicHashingDisabled(res)
|
||||
|
||||
return bktInfo, err
|
||||
}
|
||||
|
|
|
@ -110,7 +110,7 @@ func (h *Handler) HeadByAddressOrBucketName(c *fasthttp.RequestCtx) {
|
|||
|
||||
err := id.DecodeString(test)
|
||||
if err != nil {
|
||||
h.byBucketname(c, h.headObject)
|
||||
h.byObjectName(c, h.headObject)
|
||||
} else {
|
||||
h.byAddress(c, h.headObject)
|
||||
}
|
||||
|
|
|
@ -57,10 +57,9 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
|||
|
||||
ctx := utils.GetContextFromRequest(req)
|
||||
|
||||
idCnr, err := h.getContainerID(ctx, scid)
|
||||
bktInfo, err := h.getBucketInfo(ctx, scid, log)
|
||||
if err != nil {
|
||||
log.Error(logs.WrongContainerID, zap.Error(err))
|
||||
response.Error(req, "wrong container id", fasthttp.StatusBadRequest)
|
||||
logAndSendBucketError(req, log, err)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -129,13 +128,16 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
|||
}
|
||||
|
||||
obj := object.New()
|
||||
obj.SetContainerID(*idCnr)
|
||||
obj.SetContainerID(bktInfo.CID)
|
||||
obj.SetOwnerID(h.ownerID)
|
||||
obj.SetAttributes(attributes...)
|
||||
|
||||
var prm pool.PrmObjectPut
|
||||
prm.SetHeader(*obj)
|
||||
prm.SetPayload(file)
|
||||
prm.SetClientCut(h.config.ClientCut())
|
||||
prm.SetBufferMaxSize(h.config.BufferMaxSizeForPut())
|
||||
prm.WithoutHomomorphicHash(bktInfo.HomomorphicHashDisabled)
|
||||
|
||||
bt := h.fetchBearerToken(ctx)
|
||||
if bt != nil {
|
||||
|
@ -148,7 +150,7 @@ func (h *Handler) Upload(req *fasthttp.RequestCtx) {
|
|||
}
|
||||
|
||||
addr.SetObject(idObj)
|
||||
addr.SetContainer(*idCnr)
|
||||
addr.SetContainer(bktInfo.CID)
|
||||
|
||||
// Try to return the response, otherwise, if something went wrong, throw an error.
|
||||
if err = newPutResponse(addr).encode(req); err != nil {
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/response"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/tokens"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
"github.com/valyala/fasthttp"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
@ -58,3 +59,13 @@ func isValidValue(s string) bool {
|
|||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func logAndSendBucketError(c *fasthttp.RequestCtx, log *zap.Logger, err error) {
|
||||
log.Error(logs.CouldntGetBucket, zap.Error(err))
|
||||
|
||||
if client.IsErrContainerNotFound(err) {
|
||||
response.Error(c, "Not Found", fasthttp.StatusNotFound)
|
||||
return
|
||||
}
|
||||
response.Error(c, "could not get bucket: "+err.Error(), fasthttp.StatusBadRequest)
|
||||
}
|
||||
|
|
|
@ -6,12 +6,11 @@ const (
|
|||
CouldNotReceiveObject = "could not receive object" // Error in ../../downloader/download.go
|
||||
WrongContainerID = "wrong container id" // Error in ../../downloader/download.go and uploader/upload.go
|
||||
WrongObjectID = "wrong object id" // Error in ../../downloader/download.go
|
||||
ObjectWasntFound = "object wasn't found" // Error in ../../downloader/download.go
|
||||
GetLatestObjectVersion = "get latest object version" // Error in ../../downloader/download.go
|
||||
ObjectWasDeleted = "object was deleted" // Error in ../../downloader/download.go
|
||||
CouldNotSearchForObjects = "could not search for objects" // Error in ../../downloader/download.go
|
||||
ObjectNotFound = "object not found" // Error in ../../downloader/download.go
|
||||
ReadObjectListFailed = "read object list failed" // Error in ../../downloader/download.go
|
||||
CouldNotCheckContainerExistence = "could not check container existence" // Error in ../../downloader/download.go
|
||||
FailedToAddObjectToArchive = "failed to add object to archive" // Error in ../../downloader/download.go
|
||||
IteratingOverSelectedObjectsFailed = "iterating over selected objects failed" // Error in ../../downloader/download.go
|
||||
ObjectsNotFound = "objects not found" // Error in ../../downloader/download.go
|
||||
|
@ -68,4 +67,10 @@ const (
|
|||
FailedToCreateTreePool = "failed to create tree pool" // Fatal in ../../settings.go
|
||||
FailedToDialTreePool = "failed to dial tree pool" // Fatal in ../../settings.go
|
||||
AddedStoragePeer = "added storage peer" // Info in ../../settings.go
|
||||
CouldntGetBucket = "could not get bucket" // Error in ../handler/utils.go
|
||||
CouldntPutBucketIntoCache = "couldn't put bucket info into cache" // Warn in ../handler/handler.go
|
||||
InvalidCacheEntryType = "invalid cache entry type" // Warn in ../cache/buckets.go
|
||||
InvalidLifetimeUsingDefaultValue = "invalid lifetime, using default value (in seconds)" // Error in ../../cmd/http-gw/settings.go
|
||||
InvalidCacheSizeUsingDefaultValue = "invalid cache size, using default value" // Error in ../../cmd/http-gw/settings.go
|
||||
|
||||
)
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package utils
|
||||
|
||||
import (
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/internal/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-http-gw/resolver"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
|
@ -12,4 +13,5 @@ type AppParams struct {
|
|||
Pool *pool.Pool
|
||||
Owner *user.ID
|
||||
Resolver *resolver.ContainerResolver
|
||||
Cache *cache.BucketCache
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue