forked from TrueCloudLab/frostfs-s3-gw
Compare commits
10 commits
d6b506f6d9
...
4424aeeb71
Author | SHA1 | Date | |
---|---|---|---|
4424aeeb71 | |||
a0aba8bcb1 | |||
d0e4d55772 | |||
42e72889a5 | |||
98815d5473 | |||
62615d7ab7 | |||
575ab4d294 | |||
d919e6cce2 | |||
056f168d77 | |||
9bdfe2a016 |
50 changed files with 1629 additions and 204 deletions
|
@ -3,11 +3,12 @@ FROM golang:1.22 AS builder
|
|||
ARG BUILD=now
|
||||
ARG REPO=git.frostfs.info/TrueCloudLab/frostfs-s3-gw
|
||||
ARG VERSION=dev
|
||||
ARG GOFLAGS=""
|
||||
|
||||
WORKDIR /src
|
||||
COPY . /src
|
||||
|
||||
RUN make
|
||||
RUN make GOFLAGS=${GOFLAGS}
|
||||
|
||||
# Executable image
|
||||
FROM alpine AS frostfs-s3-gw
|
||||
|
|
6
Makefile
6
Makefile
|
@ -14,6 +14,8 @@ METRICS_DUMP_OUT ?= ./metrics-dump.json
|
|||
CMDS = $(addprefix frostfs-, $(notdir $(wildcard cmd/*)))
|
||||
BINS = $(addprefix $(BINDIR)/, $(CMDS))
|
||||
|
||||
GOFLAGS ?=
|
||||
|
||||
# Variables for docker
|
||||
REPO_BASENAME = $(shell basename `go list -m`)
|
||||
HUB_IMAGE ?= "truecloudlab/$(REPO_BASENAME)"
|
||||
|
@ -44,6 +46,7 @@ all: $(BINS)
|
|||
$(BINS): $(BINDIR) dep
|
||||
@echo "⇒ Build $@"
|
||||
CGO_ENABLED=0 \
|
||||
GOFLAGS=$(GOFLAGS) \
|
||||
go build -v -trimpath \
|
||||
-ldflags "-X $(REPO)/internal/version.Version=$(VERSION)" \
|
||||
-o $@ ./cmd/$(subst frostfs-,,$(notdir $@))
|
||||
|
@ -70,7 +73,7 @@ docker/%:
|
|||
-w /src \
|
||||
-u `stat -c "%u:%g" .` \
|
||||
--env HOME=/src \
|
||||
golang:$(GO_VERSION) make $*,\
|
||||
golang:$(GO_VERSION) make GOFLAGS=$(GOFLAGS) $*,\
|
||||
@echo "supported docker targets: all $(BINS) lint")
|
||||
|
||||
# Run tests
|
||||
|
@ -121,6 +124,7 @@ image:
|
|||
@docker build \
|
||||
--build-arg REPO=$(REPO) \
|
||||
--build-arg VERSION=$(VERSION) \
|
||||
--build-arg GOFLAGS=$(GOFLAGS) \
|
||||
--rm \
|
||||
-f .docker/Dockerfile \
|
||||
-t $(HUB_IMAGE):$(HUB_TAG) .
|
||||
|
|
|
@ -22,8 +22,8 @@ import (
|
|||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
)
|
||||
|
||||
// authorizationFieldRegexp -- is regexp for credentials with Base58 encoded cid and oid and '0' (zero) as delimiter.
|
||||
var authorizationFieldRegexp = regexp.MustCompile(`AWS4-HMAC-SHA256 Credential=(?P<access_key_id>[^/]+)/(?P<date>[^/]+)/(?P<region>[^/]*)/(?P<service>[^/]+)/aws4_request,\s*SignedHeaders=(?P<signed_header_fields>.+),\s*Signature=(?P<v4_signature>.+)`)
|
||||
// AuthorizationFieldRegexp -- is regexp for credentials with Base58 encoded cid and oid and '0' (zero) as delimiter.
|
||||
var AuthorizationFieldRegexp = regexp.MustCompile(`AWS4-HMAC-SHA256 Credential=(?P<access_key_id>[^/]+)/(?P<date>[^/]+)/(?P<region>[^/]*)/(?P<service>[^/]+)/aws4_request,\s*SignedHeaders=(?P<signed_header_fields>.+),\s*Signature=(?P<v4_signature>.+)`)
|
||||
|
||||
// postPolicyCredentialRegexp -- is regexp for credentials when uploading file using POST with policy.
|
||||
var postPolicyCredentialRegexp = regexp.MustCompile(`(?P<access_key_id>[^/]+)/(?P<date>[^/]+)/(?P<region>[^/]*)/(?P<service>[^/]+)/aws4_request`)
|
||||
|
@ -85,7 +85,7 @@ var ContentSHA256HeaderStandardValue = map[string]struct{}{
|
|||
func New(creds tokens.Credentials, prefixes []string) *Center {
|
||||
return &Center{
|
||||
cli: creds,
|
||||
reg: NewRegexpMatcher(authorizationFieldRegexp),
|
||||
reg: NewRegexpMatcher(AuthorizationFieldRegexp),
|
||||
postReg: NewRegexpMatcher(postPolicyCredentialRegexp),
|
||||
allowedAccessKeyIDPrefixes: prefixes,
|
||||
}
|
||||
|
|
|
@ -32,7 +32,7 @@ func TestAuthHeaderParse(t *testing.T) {
|
|||
defaultHeader := "AWS4-HMAC-SHA256 Credential=oid0cid/20210809/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=2811ccb9e242f41426738fb1f"
|
||||
|
||||
center := &Center{
|
||||
reg: NewRegexpMatcher(authorizationFieldRegexp),
|
||||
reg: NewRegexpMatcher(AuthorizationFieldRegexp),
|
||||
}
|
||||
|
||||
for _, tc := range []struct {
|
||||
|
|
|
@ -85,7 +85,7 @@ func TestCheckSign(t *testing.T) {
|
|||
|
||||
c := &Center{
|
||||
cli: mock,
|
||||
reg: NewRegexpMatcher(authorizationFieldRegexp),
|
||||
reg: NewRegexpMatcher(AuthorizationFieldRegexp),
|
||||
postReg: NewRegexpMatcher(postPolicyCredentialRegexp),
|
||||
}
|
||||
box, err := c.Authenticate(req)
|
||||
|
|
65
api/cache/network_info.go
vendored
Normal file
65
api/cache/network_info.go
vendored
Normal file
|
@ -0,0 +1,65 @@
|
|||
package cache
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"github.com/bluele/gcache"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
// NetworkInfoCache provides cache for network info.
|
||||
NetworkInfoCache struct {
|
||||
cache gcache.Cache
|
||||
logger *zap.Logger
|
||||
}
|
||||
|
||||
// NetworkInfoCacheConfig stores expiration params for cache.
|
||||
NetworkInfoCacheConfig struct {
|
||||
Lifetime time.Duration
|
||||
Logger *zap.Logger
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
DefaultNetworkInfoCacheLifetime = 1 * time.Minute
|
||||
networkInfoCacheSize = 1
|
||||
networkInfoKey = "network_info"
|
||||
)
|
||||
|
||||
// DefaultNetworkInfoConfig returns new default cache expiration values.
|
||||
func DefaultNetworkInfoConfig(logger *zap.Logger) *NetworkInfoCacheConfig {
|
||||
return &NetworkInfoCacheConfig{
|
||||
Lifetime: DefaultNetworkInfoCacheLifetime,
|
||||
Logger: logger,
|
||||
}
|
||||
}
|
||||
|
||||
// NewNetworkInfoCache creates an object of NetworkInfoCache.
|
||||
func NewNetworkInfoCache(config *NetworkInfoCacheConfig) *NetworkInfoCache {
|
||||
gc := gcache.New(networkInfoCacheSize).LRU().Expiration(config.Lifetime).Build()
|
||||
return &NetworkInfoCache{cache: gc, logger: config.Logger}
|
||||
}
|
||||
|
||||
func (c *NetworkInfoCache) Get() *netmap.NetworkInfo {
|
||||
entry, err := c.cache.Get(networkInfoKey)
|
||||
if err != nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
result, ok := entry.(netmap.NetworkInfo)
|
||||
if !ok {
|
||||
c.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
|
||||
zap.String("expected", fmt.Sprintf("%T", result)))
|
||||
return nil
|
||||
}
|
||||
|
||||
return &result
|
||||
}
|
||||
|
||||
func (c *NetworkInfoCache) Put(info netmap.NetworkInfo) error {
|
||||
return c.cache.Set(networkInfoKey, info)
|
||||
}
|
|
@ -126,6 +126,14 @@ type PartInfo struct {
|
|||
Created time.Time `json:"created"`
|
||||
}
|
||||
|
||||
type PartInfoExtended struct {
|
||||
PartInfo
|
||||
|
||||
// Timestamp is used to find the latest version of part info in case of tree split
|
||||
// when there are multiple nodes for the same part.
|
||||
Timestamp uint64
|
||||
}
|
||||
|
||||
// ToHeaderString form short part representation to use in S3-Completed-Parts header.
|
||||
func (p *PartInfo) ToHeaderString() string {
|
||||
// ETag value contains SHA256 checksum which is used while getting object parts attributes.
|
||||
|
|
|
@ -288,6 +288,21 @@ func completeMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID
|
|||
return w
|
||||
}
|
||||
|
||||
func abortMultipartUpload(hc *handlerContext, bktName, objName, uploadID string) {
|
||||
w := abortMultipartUploadBase(hc, bktName, objName, uploadID)
|
||||
assertStatus(hc.t, w, http.StatusNoContent)
|
||||
}
|
||||
|
||||
func abortMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID string) *httptest.ResponseRecorder {
|
||||
query := make(url.Values)
|
||||
query.Set(uploadIDQuery, uploadID)
|
||||
|
||||
w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
|
||||
hc.Handler().AbortMultipartUploadHandler(w, r)
|
||||
|
||||
return w
|
||||
}
|
||||
|
||||
func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) {
|
||||
return uploadPartBase(hc, bktName, objName, true, uploadID, num, size)
|
||||
}
|
||||
|
|
|
@ -167,7 +167,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas
|
|||
tp := layer.NewTestFrostFS(key)
|
||||
|
||||
testResolver := &resolver.Resolver{Name: "test_resolver"}
|
||||
testResolver.SetResolveFunc(func(_ context.Context, name string) (cid.ID, error) {
|
||||
testResolver.SetResolveFunc(func(_ context.Context, _, name string) (cid.ID, error) {
|
||||
return tp.ContainerID(name)
|
||||
})
|
||||
|
||||
|
@ -243,6 +243,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
|
|||
Buckets: minCacheCfg,
|
||||
System: minCacheCfg,
|
||||
AccessControl: minCacheCfg,
|
||||
NetworkInfo: &cache.NetworkInfoCacheConfig{Lifetime: minCacheCfg.Lifetime},
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -205,10 +205,7 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
var size uint64
|
||||
if r.ContentLength > 0 {
|
||||
size = uint64(r.ContentLength)
|
||||
}
|
||||
size := h.getPutPayloadSize(r)
|
||||
|
||||
p := &layer.UploadPartParams{
|
||||
Info: &layer.UploadInfoParams{
|
||||
|
|
|
@ -17,6 +17,10 @@ import (
|
|||
s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -122,6 +126,108 @@ func TestMultipartReUploadPart(t *testing.T) {
|
|||
equalDataSlices(t, append(data1, data2...), data)
|
||||
}
|
||||
|
||||
func TestMultipartRemovePartsSplit(t *testing.T) {
|
||||
bktName, objName := "bucket-to-upload-part", "object-multipart"
|
||||
partSize := 8
|
||||
|
||||
t.Run("reupload part", func(t *testing.T) {
|
||||
hc := prepareHandlerContext(t)
|
||||
bktInfo := createTestBucket(hc, bktName)
|
||||
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
|
||||
|
||||
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
|
||||
|
||||
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
|
||||
require.NoError(t, err)
|
||||
|
||||
objID := oidtest.ID()
|
||||
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
|
||||
"Number": "1",
|
||||
"OID": objID.EncodeToString(),
|
||||
"Owner": usertest.ID().EncodeToString(),
|
||||
"ETag": "etag",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
|
||||
require.Len(t, hc.tp.Objects(), 2)
|
||||
|
||||
list := listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
|
||||
require.Len(t, list.Parts, 1)
|
||||
require.Equal(t, `"etag"`, list.Parts[0].ETag)
|
||||
|
||||
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
|
||||
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
|
||||
require.Len(t, list.Parts, 1)
|
||||
require.Equal(t, etag1, list.Parts[0].ETag)
|
||||
|
||||
require.Len(t, hc.tp.Objects(), 1)
|
||||
})
|
||||
|
||||
t.Run("abort multipart", func(t *testing.T) {
|
||||
hc := prepareHandlerContext(t)
|
||||
bktInfo := createTestBucket(hc, bktName)
|
||||
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
|
||||
|
||||
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
|
||||
|
||||
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
|
||||
require.NoError(t, err)
|
||||
|
||||
objID := oidtest.ID()
|
||||
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
|
||||
"Number": "1",
|
||||
"OID": objID.EncodeToString(),
|
||||
"Owner": usertest.ID().EncodeToString(),
|
||||
"ETag": "etag",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
|
||||
require.Len(t, hc.tp.Objects(), 2)
|
||||
|
||||
abortMultipartUpload(hc, bktName, objName, uploadInfo.UploadID)
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
})
|
||||
|
||||
t.Run("complete multipart", func(t *testing.T) {
|
||||
hc := prepareHandlerContext(t)
|
||||
bktInfo := createTestBucket(hc, bktName)
|
||||
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
|
||||
|
||||
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
|
||||
|
||||
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
|
||||
require.NoError(t, err)
|
||||
|
||||
objID := oidtest.ID()
|
||||
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
|
||||
"Number": "1",
|
||||
"OID": objID.EncodeToString(),
|
||||
"Owner": usertest.ID().EncodeToString(),
|
||||
"ETag": "etag",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
|
||||
require.Len(t, hc.tp.Objects(), 2)
|
||||
|
||||
completeMultipartUpload(hc, bktName, objName, uploadInfo.UploadID, []string{etag1})
|
||||
require.Falsef(t, containsOID(hc.tp.Objects(), objID), "frostfs contains '%s' object, but shouldn't", objID)
|
||||
})
|
||||
}
|
||||
|
||||
func containsOID(objects []*object.Object, objID oid.ID) bool {
|
||||
for _, o := range objects {
|
||||
oID, _ := o.ID()
|
||||
if oID.Equals(objID) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func TestListMultipartUploads(t *testing.T) {
|
||||
hc := prepareHandlerContext(t)
|
||||
|
||||
|
|
|
@ -242,10 +242,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
metadata[api.ContentEncoding] = encodings
|
||||
}
|
||||
|
||||
var size uint64
|
||||
if r.ContentLength > 0 {
|
||||
size = uint64(r.ContentLength)
|
||||
}
|
||||
size := h.getPutPayloadSize(r)
|
||||
|
||||
params := &layer.PutObjectParams{
|
||||
BktInfo: bktInfo,
|
||||
|
|
|
@ -29,6 +29,11 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
const (
|
||||
awsChunkedRequestExampleDecodedContentLength = 66560
|
||||
awsChunkedRequestExampleContentLength = 66824
|
||||
)
|
||||
|
||||
func TestCheckBucketName(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
|
@ -361,7 +366,12 @@ func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
|
|||
hc.Handler().PutObjectHandler(w, req)
|
||||
assertStatus(t, w, http.StatusOK)
|
||||
|
||||
data := getObjectRange(t, hc, bktName, objName, 0, 66824)
|
||||
w, req = prepareTestRequest(hc, bktName, objName, nil)
|
||||
hc.Handler().HeadObjectHandler(w, req)
|
||||
assertStatus(t, w, http.StatusOK)
|
||||
require.Equal(t, strconv.Itoa(awsChunkedRequestExampleDecodedContentLength), w.Header().Get(api.ContentLength))
|
||||
|
||||
data := getObjectRange(t, hc, bktName, objName, 0, awsChunkedRequestExampleDecodedContentLength)
|
||||
for i := range chunk {
|
||||
require.Equal(t, chunk[i], data[i])
|
||||
}
|
||||
|
@ -397,6 +407,8 @@ func TestPutChunkedTestContentEncoding(t *testing.T) {
|
|||
require.Equal(t, "gzip", resp.Header().Get(api.ContentEncoding))
|
||||
}
|
||||
|
||||
// getChunkedRequest implements request example from
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
|
||||
func getChunkedRequest(ctx context.Context, t *testing.T, bktName, objName string) (*httptest.ResponseRecorder, *http.Request, []byte) {
|
||||
chunk := make([]byte, 65*1024)
|
||||
for i := range chunk {
|
||||
|
@ -424,9 +436,9 @@ func getChunkedRequest(ctx context.Context, t *testing.T, bktName, objName strin
|
|||
req, err := http.NewRequest("PUT", "https://s3.amazonaws.com/"+bktName+"/"+objName, nil)
|
||||
require.NoError(t, err)
|
||||
req.Header.Set("content-encoding", "aws-chunked")
|
||||
req.Header.Set("content-length", "66824")
|
||||
req.Header.Set("content-length", strconv.Itoa(awsChunkedRequestExampleContentLength))
|
||||
req.Header.Set("x-amz-content-sha256", "STREAMING-AWS4-HMAC-SHA256-PAYLOAD")
|
||||
req.Header.Set("x-amz-decoded-content-length", "66560")
|
||||
req.Header.Set("x-amz-decoded-content-length", strconv.Itoa(awsChunkedRequestExampleDecodedContentLength))
|
||||
req.Header.Set("x-amz-storage-class", "REDUCED_REDUNDANCY")
|
||||
|
||||
signTime, err := time.Parse("20060102T150405Z", "20130524T000000Z")
|
||||
|
|
|
@ -106,6 +106,23 @@ func (h *handler) getBucketAndCheckOwner(r *http.Request, bucket string, header
|
|||
return bktInfo, checkOwner(bktInfo, expected)
|
||||
}
|
||||
|
||||
func (h *handler) getPutPayloadSize(r *http.Request) uint64 {
|
||||
decodeContentSize := r.Header.Get(api.AmzDecodedContentLength)
|
||||
decodedSize, err := strconv.Atoi(decodeContentSize)
|
||||
if err != nil {
|
||||
decodedSize = 0
|
||||
}
|
||||
|
||||
var size uint64
|
||||
if decodedSize > 0 {
|
||||
size = uint64(decodedSize)
|
||||
} else if r.ContentLength > 0 {
|
||||
size = uint64(r.ContentLength)
|
||||
}
|
||||
|
||||
return size
|
||||
}
|
||||
|
||||
func parseRange(s string) (*layer.RangeParams, error) {
|
||||
if s == "" {
|
||||
return nil, nil
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"go.uber.org/zap"
|
||||
|
@ -19,6 +20,7 @@ type Cache struct {
|
|||
bucketCache *cache.BucketCache
|
||||
systemCache *cache.SystemCache
|
||||
accessCache *cache.AccessControlCache
|
||||
networkInfoCache *cache.NetworkInfoCache
|
||||
}
|
||||
|
||||
// CachesConfig contains params for caches.
|
||||
|
@ -31,6 +33,7 @@ type CachesConfig struct {
|
|||
Buckets *cache.Config
|
||||
System *cache.Config
|
||||
AccessControl *cache.Config
|
||||
NetworkInfo *cache.NetworkInfoCacheConfig
|
||||
}
|
||||
|
||||
// DefaultCachesConfigs returns filled configs.
|
||||
|
@ -44,6 +47,7 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
|
|||
Buckets: cache.DefaultBucketConfig(logger),
|
||||
System: cache.DefaultSystemConfig(logger),
|
||||
AccessControl: cache.DefaultAccessControlConfig(logger),
|
||||
NetworkInfo: cache.DefaultNetworkInfoConfig(logger),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -57,6 +61,7 @@ func NewCache(cfg *CachesConfig) *Cache {
|
|||
bucketCache: cache.NewBucketCache(cfg.Buckets),
|
||||
systemCache: cache.NewSystemCache(cfg.System),
|
||||
accessCache: cache.NewAccessControlCache(cfg.AccessControl),
|
||||
networkInfoCache: cache.NewNetworkInfoCache(cfg.NetworkInfo),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -283,3 +288,13 @@ func (c *Cache) PutLifecycleConfiguration(owner user.ID, bkt *data.BucketInfo, c
|
|||
func (c *Cache) DeleteLifecycleConfiguration(bktInfo *data.BucketInfo) {
|
||||
c.systemCache.Delete(bktInfo.LifecycleConfigurationObjectName())
|
||||
}
|
||||
|
||||
func (c *Cache) GetNetworkInfo() *netmap.NetworkInfo {
|
||||
return c.networkInfoCache.Get()
|
||||
}
|
||||
|
||||
func (c *Cache) PutNetworkInfo(info netmap.NetworkInfo) {
|
||||
if err := c.networkInfoCache.Put(info); err != nil {
|
||||
c.logger.Warn(logs.CouldntCacheNetworkInfo, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -64,7 +64,7 @@ func (n *Layer) containerInfo(ctx context.Context, prm PrmContainer) (*data.Buck
|
|||
}
|
||||
}
|
||||
|
||||
zone, _ := n.features.FormContainerZone(reqInfo.Namespace)
|
||||
zone := n.features.FormContainerZone(reqInfo.Namespace)
|
||||
if zone != info.Zone {
|
||||
return nil, fmt.Errorf("ns '%s' and zone '%s' are mismatched for container '%s'", zone, info.Zone, prm.ContainerID)
|
||||
}
|
||||
|
@ -111,7 +111,7 @@ func (n *Layer) createContainer(ctx context.Context, p *CreateBucketParams) (*da
|
|||
p.LocationConstraint = api.DefaultLocationConstraint // s3tests_boto3.functional.test_s3:test_bucket_get_location
|
||||
}
|
||||
|
||||
zone, _ := n.features.FormContainerZone(p.Namespace)
|
||||
zone := n.features.FormContainerZone(p.Namespace)
|
||||
|
||||
bktInfo := &data.BucketInfo{
|
||||
Name: p.Name,
|
||||
|
|
|
@ -52,12 +52,12 @@ func (k *FeatureSettingsMock) SetMD5Enabled(md5Enabled bool) {
|
|||
k.md5Enabled = md5Enabled
|
||||
}
|
||||
|
||||
func (k *FeatureSettingsMock) FormContainerZone(ns string) (zone string, isDefault bool) {
|
||||
func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
|
||||
if ns == "" {
|
||||
return v2container.SysAttributeZoneDefault, true
|
||||
return v2container.SysAttributeZoneDefault
|
||||
}
|
||||
|
||||
return ns + ".ns", false
|
||||
return ns + ".ns"
|
||||
}
|
||||
|
||||
type TestFrostFS struct {
|
||||
|
|
|
@ -35,14 +35,14 @@ import (
|
|||
|
||||
type (
|
||||
BucketResolver interface {
|
||||
Resolve(ctx context.Context, name string) (cid.ID, error)
|
||||
Resolve(ctx context.Context, zone, name string) (cid.ID, error)
|
||||
}
|
||||
|
||||
FeatureSettings interface {
|
||||
ClientCut() bool
|
||||
BufferMaxSizeForPut() uint64
|
||||
MD5Enabled() bool
|
||||
FormContainerZone(ns string) (zone string, isDefault bool)
|
||||
FormContainerZone(ns string) string
|
||||
}
|
||||
|
||||
Layer struct {
|
||||
|
@ -322,13 +322,13 @@ func (n *Layer) GetBucketInfo(ctx context.Context, name string) (*data.BucketInf
|
|||
}
|
||||
|
||||
reqInfo := middleware.GetReqInfo(ctx)
|
||||
zone, _ := n.features.FormContainerZone(reqInfo.Namespace)
|
||||
zone := n.features.FormContainerZone(reqInfo.Namespace)
|
||||
|
||||
if bktInfo := n.cache.GetBucket(zone, name); bktInfo != nil {
|
||||
return bktInfo, nil
|
||||
}
|
||||
|
||||
containerID, err := n.ResolveBucket(ctx, name)
|
||||
containerID, err := n.ResolveBucket(ctx, zone, name)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
return nil, fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchBucket), err.Error())
|
||||
|
@ -352,13 +352,13 @@ func (n *Layer) ResolveCID(ctx context.Context, name string) (cid.ID, error) {
|
|||
}
|
||||
|
||||
reqInfo := middleware.GetReqInfo(ctx)
|
||||
zone, _ := n.features.FormContainerZone(reqInfo.Namespace)
|
||||
zone := n.features.FormContainerZone(reqInfo.Namespace)
|
||||
|
||||
if bktInfo := n.cache.GetBucket(zone, name); bktInfo != nil {
|
||||
return bktInfo.CID, nil
|
||||
}
|
||||
|
||||
return n.ResolveBucket(ctx, name)
|
||||
return n.ResolveBucket(ctx, zone, name)
|
||||
}
|
||||
|
||||
// ListBuckets returns all user containers. The name of the bucket is a container
|
||||
|
@ -798,10 +798,10 @@ func (n *Layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.
|
|||
return nil, errors.GetAPIError(errors.ErrBucketAlreadyExists)
|
||||
}
|
||||
|
||||
func (n *Layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error) {
|
||||
func (n *Layer) ResolveBucket(ctx context.Context, zone, name string) (cid.ID, error) {
|
||||
var cnrID cid.ID
|
||||
if err := cnrID.DecodeString(name); err != nil {
|
||||
if cnrID, err = n.resolver.Resolve(ctx, name); err != nil {
|
||||
if cnrID, err = n.resolver.Resolve(ctx, zone, name); err != nil {
|
||||
return cid.ID{}, err
|
||||
}
|
||||
|
||||
|
@ -855,10 +855,17 @@ func (n *Layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
|
|||
}
|
||||
|
||||
func (n *Layer) GetNetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
|
||||
cachedInfo := n.cache.GetNetworkInfo()
|
||||
if cachedInfo != nil {
|
||||
return *cachedInfo, nil
|
||||
}
|
||||
|
||||
networkInfo, err := n.frostFS.NetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return networkInfo, fmt.Errorf("get network info: %w", err)
|
||||
return netmap.NetworkInfo{}, fmt.Errorf("get network info: %w", err)
|
||||
}
|
||||
|
||||
n.cache.PutNetworkInfo(networkInfo)
|
||||
|
||||
return networkInfo, nil
|
||||
}
|
||||
|
|
|
@ -150,9 +150,9 @@ func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
|
|||
metaSize += len(p.Data.TagSet)
|
||||
}
|
||||
|
||||
networkInfo, err := n.frostFS.NetworkInfo(ctx)
|
||||
networkInfo, err := n.GetNetworkInfo(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get network info: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
info := &data.MultipartInfo{
|
||||
|
@ -290,16 +290,18 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
|||
MD5: hex.EncodeToString(createdObj.MD5Sum),
|
||||
}
|
||||
|
||||
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
|
||||
oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
|
||||
oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
|
||||
if err != nil && !oldPartIDNotFound {
|
||||
return nil, err
|
||||
}
|
||||
if !oldPartIDNotFound {
|
||||
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
|
||||
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
|
||||
zap.String("cid", bktInfo.CID.EncodeToString()),
|
||||
zap.String("oid", oldPartID.EncodeToString()))
|
||||
for _, oldPartID := range oldPartIDs {
|
||||
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
|
||||
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
|
||||
zap.String("cid", bktInfo.CID.EncodeToString()),
|
||||
zap.String("oid", oldPartID.EncodeToString()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -385,16 +387,15 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
|||
|
||||
var multipartObjetSize uint64
|
||||
var encMultipartObjectSize uint64
|
||||
parts := make([]*data.PartInfo, 0, len(p.Parts))
|
||||
parts := make([]*data.PartInfoExtended, 0, len(p.Parts))
|
||||
|
||||
var completedPartsHeader strings.Builder
|
||||
md5Hash := md5.New()
|
||||
for i, part := range p.Parts {
|
||||
partInfo := partsInfo[part.PartNumber]
|
||||
if partInfo == nil || data.UnQuote(part.ETag) != partInfo.GetETag(n.features.MD5Enabled()) {
|
||||
partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled())
|
||||
if partInfo == nil {
|
||||
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
|
||||
}
|
||||
delete(partsInfo, part.PartNumber)
|
||||
|
||||
// for the last part we have no minimum size limit
|
||||
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
|
||||
|
@ -475,14 +476,16 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
|||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(p.Info.Bkt.CID)
|
||||
for _, partInfo := range partsInfo {
|
||||
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
|
||||
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
|
||||
zap.Error(err))
|
||||
for _, prts := range partsInfo {
|
||||
for _, partInfo := range prts {
|
||||
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
|
||||
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
|
||||
zap.Error(err))
|
||||
}
|
||||
addr.SetObject(partInfo.OID)
|
||||
n.cache.DeleteObject(addr)
|
||||
}
|
||||
addr.SetObject(partInfo.OID)
|
||||
n.cache.DeleteObject(addr)
|
||||
}
|
||||
|
||||
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
|
||||
|
@ -554,10 +557,12 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
|
|||
return err
|
||||
}
|
||||
|
||||
for _, info := range parts {
|
||||
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
|
||||
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
|
||||
for _, infos := range parts {
|
||||
for _, info := range infos {
|
||||
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
|
||||
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -581,7 +586,12 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
|
|||
|
||||
parts := make([]*Part, 0, len(partsInfo))
|
||||
|
||||
for _, partInfo := range partsInfo {
|
||||
for _, infos := range partsInfo {
|
||||
sort.Slice(infos, func(i, j int) bool {
|
||||
return infos[i].Timestamp < infos[j].Timestamp
|
||||
})
|
||||
|
||||
partInfo := infos[len(infos)-1]
|
||||
parts = append(parts, &Part{
|
||||
ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
|
||||
LastModified: partInfo.Created.UTC().Format(time.RFC3339),
|
||||
|
@ -618,7 +628,22 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
|
|||
return &res, nil
|
||||
}
|
||||
|
||||
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) {
|
||||
type PartsInfo map[int][]*data.PartInfoExtended
|
||||
|
||||
func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended {
|
||||
parts := p[part]
|
||||
|
||||
for i, info := range parts {
|
||||
if info.GetETag(md5Enabled) == etag {
|
||||
p[part] = append(parts[:i], parts[i+1:]...)
|
||||
return info
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) {
|
||||
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
|
||||
if err != nil {
|
||||
if errors.Is(err, ErrNodeNotFound) {
|
||||
|
@ -632,11 +657,11 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
|
|||
return nil, nil, err
|
||||
}
|
||||
|
||||
res := make(map[int]*data.PartInfo, len(parts))
|
||||
res := make(map[int][]*data.PartInfoExtended, len(parts))
|
||||
partsNumbers := make([]int, len(parts))
|
||||
oids := make([]string, len(parts))
|
||||
for i, part := range parts {
|
||||
res[part.Number] = part
|
||||
res[part.Number] = append(res[part.Number], part)
|
||||
partsNumbers[i] = part.Number
|
||||
oids[i] = part.OID.EncodeToString()
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"mime"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
@ -21,6 +22,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
|
@ -245,11 +247,11 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
|
|||
if r != nil {
|
||||
if len(p.Header[api.ContentType]) == 0 {
|
||||
if contentType := MimeByFilePath(p.Object); len(contentType) == 0 {
|
||||
d := newDetector(r)
|
||||
d := detector.NewDetector(r, http.DetectContentType)
|
||||
if contentType, err := d.Detect(); err == nil {
|
||||
p.Header[api.ContentType] = contentType
|
||||
}
|
||||
r = d.MultiReader()
|
||||
r = d.RestoredReader()
|
||||
} else {
|
||||
p.Header[api.ContentType] = contentType
|
||||
}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"io"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
|
@ -33,7 +34,7 @@ type TreeServiceMock struct {
|
|||
locks map[string]map[uint64]*data.LockInfo
|
||||
tags map[string]map[uint64]map[string]string
|
||||
multiparts map[string]map[string][]*data.MultipartInfo
|
||||
parts map[string]map[int]*data.PartInfo
|
||||
parts map[string]map[int]*data.PartInfoExtended
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
|
||||
|
@ -92,7 +93,7 @@ func NewTreeService() *TreeServiceMock {
|
|||
locks: make(map[string]map[uint64]*data.LockInfo),
|
||||
tags: make(map[string]map[uint64]map[string]string),
|
||||
multiparts: make(map[string]map[string][]*data.MultipartInfo),
|
||||
parts: make(map[string]map[int]*data.PartInfo),
|
||||
parts: make(map[string]map[int]*data.PartInfoExtended),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -346,28 +347,31 @@ func (t *TreeServiceMock) GetMultipartUpload(_ context.Context, bktInfo *data.Bu
|
|||
return nil, ErrNodeNotFound
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
|
||||
func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
|
||||
multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID)
|
||||
if err != nil {
|
||||
return oid.ID{}, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if multipartInfo.ID != multipartNodeID {
|
||||
return oid.ID{}, fmt.Errorf("invalid multipart info id")
|
||||
return nil, fmt.Errorf("invalid multipart info id")
|
||||
}
|
||||
|
||||
partsMap, ok := t.parts[info.UploadID]
|
||||
if !ok {
|
||||
partsMap = make(map[int]*data.PartInfo)
|
||||
partsMap = make(map[int]*data.PartInfoExtended)
|
||||
}
|
||||
|
||||
partsMap[info.Number] = info
|
||||
partsMap[info.Number] = &data.PartInfoExtended{
|
||||
PartInfo: *info,
|
||||
Timestamp: uint64(time.Now().UnixMicro()),
|
||||
}
|
||||
|
||||
t.parts[info.UploadID] = partsMap
|
||||
return oid.ID{}, nil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
|
||||
func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
|
||||
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
|
||||
|
||||
var foundMultipart *data.MultipartInfo
|
||||
|
@ -387,7 +391,7 @@ LOOP:
|
|||
}
|
||||
|
||||
partsMap := t.parts[foundMultipart.UploadID]
|
||||
result := make([]*data.PartInfo, 0, len(partsMap))
|
||||
result := make([]*data.PartInfoExtended, 0, len(partsMap))
|
||||
for _, part := range partsMap {
|
||||
result = append(result, part)
|
||||
}
|
||||
|
|
|
@ -57,11 +57,11 @@ type TreeService interface {
|
|||
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
|
||||
|
||||
// AddPart puts a node to a system tree as a child of appropriate multipart upload
|
||||
// and returns objectID of a previous part which must be deleted in FrostFS.
|
||||
// and returns objectIDs of a previous part/s which must be deleted in FrostFS.
|
||||
//
|
||||
// If object id to remove is not found returns ErrNoNodeToRemove error.
|
||||
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error)
|
||||
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error)
|
||||
// If object ids to remove is not found returns ErrNoNodeToRemove error.
|
||||
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error)
|
||||
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error)
|
||||
|
||||
PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error)
|
||||
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error)
|
||||
|
|
|
@ -3,14 +3,18 @@ package middleware
|
|||
import (
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const wildcardPlaceholder = "<wildcard>"
|
||||
const (
|
||||
wildcardPlaceholder = "<wildcard>"
|
||||
|
||||
enabledVHS = "enabled"
|
||||
disabledVHS = "disabled"
|
||||
)
|
||||
|
||||
type VHSSettings interface {
|
||||
Domains() []string
|
||||
|
@ -26,9 +30,9 @@ func PrepareAddressStyle(settings VHSSettings, log *zap.Logger) Func {
|
|||
ctx := r.Context()
|
||||
reqInfo := GetReqInfo(ctx)
|
||||
reqLogger := reqLogOrDefault(ctx, log)
|
||||
headerVHSEnabled := r.Header.Get(settings.VHSHeader())
|
||||
statusVHS := r.Header.Get(settings.VHSHeader())
|
||||
|
||||
if isVHSAddress(headerVHSEnabled, settings.GlobalVHS(), settings.VHSNamespacesEnabled(), reqInfo.Namespace) {
|
||||
if isVHSAddress(statusVHS, settings.GlobalVHS(), settings.VHSNamespacesEnabled(), reqInfo.Namespace) {
|
||||
prepareVHSAddress(reqInfo, r, settings)
|
||||
} else {
|
||||
preparePathStyleAddress(reqInfo, r, reqLogger)
|
||||
|
@ -39,17 +43,20 @@ func PrepareAddressStyle(settings VHSSettings, log *zap.Logger) Func {
|
|||
}
|
||||
}
|
||||
|
||||
func isVHSAddress(headerVHSEnabled string, enabledFlag bool, vhsNamespaces map[string]bool, namespace string) bool {
|
||||
if result, err := strconv.ParseBool(headerVHSEnabled); err == nil {
|
||||
func isVHSAddress(statusVHS string, enabledFlag bool, vhsNamespaces map[string]bool, namespace string) bool {
|
||||
switch statusVHS {
|
||||
case enabledVHS:
|
||||
return true
|
||||
case disabledVHS:
|
||||
return false
|
||||
default:
|
||||
result := enabledFlag
|
||||
if v, ok := vhsNamespaces[namespace]; ok {
|
||||
result = v
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
result := enabledFlag
|
||||
if v, ok := vhsNamespaces[namespace]; ok {
|
||||
result = v
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func prepareVHSAddress(reqInfo *ReqInfo, r *http.Request, settings VHSSettings) {
|
||||
|
|
|
@ -41,12 +41,12 @@ func (v *VHSSettingsMock) VHSNamespacesEnabled() map[string]bool {
|
|||
|
||||
func TestIsVHSAddress(t *testing.T) {
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
headerVHSEnabled string
|
||||
vhsEnabledFlag bool
|
||||
vhsNamespaced map[string]bool
|
||||
namespace string
|
||||
expected bool
|
||||
name string
|
||||
headerStatusVHS string
|
||||
vhsEnabledFlag bool
|
||||
vhsNamespaced map[string]bool
|
||||
namespace string
|
||||
expected bool
|
||||
}{
|
||||
{
|
||||
name: "vhs disabled",
|
||||
|
@ -75,9 +75,9 @@ func TestIsVHSAddress(t *testing.T) {
|
|||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "vhs enabled (header)",
|
||||
headerVHSEnabled: "true",
|
||||
vhsEnabledFlag: false,
|
||||
name: "vhs enabled (header)",
|
||||
headerStatusVHS: enabledVHS,
|
||||
vhsEnabledFlag: false,
|
||||
vhsNamespaced: map[string]bool{
|
||||
"kapusta": false,
|
||||
},
|
||||
|
@ -85,9 +85,9 @@ func TestIsVHSAddress(t *testing.T) {
|
|||
expected: true,
|
||||
},
|
||||
{
|
||||
name: "vhs disabled (header)",
|
||||
headerVHSEnabled: "false",
|
||||
vhsEnabledFlag: true,
|
||||
name: "vhs disabled (header)",
|
||||
headerStatusVHS: disabledVHS,
|
||||
vhsEnabledFlag: true,
|
||||
vhsNamespaced: map[string]bool{
|
||||
"kapusta": true,
|
||||
},
|
||||
|
@ -96,7 +96,7 @@ func TestIsVHSAddress(t *testing.T) {
|
|||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
actual := isVHSAddress(tc.headerVHSEnabled, tc.vhsEnabledFlag, tc.vhsNamespaced, tc.namespace)
|
||||
actual := isVHSAddress(tc.headerStatusVHS, tc.vhsEnabledFlag, tc.vhsNamespaced, tc.namespace)
|
||||
require.Equal(t, tc.expected, actual)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -107,3 +107,9 @@ const (
|
|||
PartNumberQuery = "partNumber"
|
||||
LegalHoldQuery = "legal-hold"
|
||||
)
|
||||
|
||||
const (
|
||||
StdoutPath = "stdout"
|
||||
StderrPath = "stderr"
|
||||
SinkName = "lumberjack"
|
||||
)
|
||||
|
|
237
api/middleware/log_http.go
Normal file
237
api/middleware/log_http.go
Normal file
|
@ -0,0 +1,237 @@
|
|||
//go:build loghttp
|
||||
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/xmlutils"
|
||||
"go.uber.org/zap"
|
||||
"go.uber.org/zap/zapcore"
|
||||
"gopkg.in/natefinch/lumberjack.v2"
|
||||
)
|
||||
|
||||
type (
|
||||
LogHTTPSettings interface {
|
||||
LogHTTPConfig() LogHTTPConfig
|
||||
}
|
||||
LogHTTPConfig struct {
|
||||
Enabled bool
|
||||
MaxBody int64
|
||||
MaxLogSize int
|
||||
OutputPath string
|
||||
UseGzip bool
|
||||
log *httpLogger
|
||||
}
|
||||
httpLogger struct {
|
||||
*zap.Logger
|
||||
logRoller *lumberjack.Logger
|
||||
}
|
||||
// responseReadWriter helps read http response body.
|
||||
responseReadWriter struct {
|
||||
http.ResponseWriter
|
||||
response *bytes.Buffer
|
||||
statusCode int
|
||||
}
|
||||
)
|
||||
|
||||
const (
|
||||
payloadLabel = "payload"
|
||||
responseLabel = "response"
|
||||
)
|
||||
|
||||
func (lc *LogHTTPConfig) InitHTTPLogger(log *zap.Logger) {
|
||||
if err := lc.initHTTPLogger(); err != nil {
|
||||
log.Error(logs.FailedToInitializeHTTPLogger, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// initHTTPLogger returns registers zap sink and returns new httpLogger.
|
||||
func (lc *LogHTTPConfig) initHTTPLogger() (err error) {
|
||||
lc.log = &httpLogger{
|
||||
Logger: zap.NewNop(),
|
||||
logRoller: &lumberjack.Logger{},
|
||||
}
|
||||
c := newLoggerConfig()
|
||||
lc.log.Logger, err = c.Build()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lc.setLogOutput()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// newLoggerConfig creates new zap.Config with disabled base fields.
|
||||
func newLoggerConfig() zap.Config {
|
||||
c := zap.NewProductionConfig()
|
||||
c.DisableCaller = true
|
||||
c.DisableStacktrace = true
|
||||
c.EncoderConfig = newEncoderConfig()
|
||||
c.Sampling = nil
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (lc *LogHTTPConfig) setLogOutput() {
|
||||
var output zapcore.WriteSyncer
|
||||
switch lc.OutputPath {
|
||||
case "", StdoutPath:
|
||||
output = zapcore.AddSync(os.Stdout)
|
||||
case StderrPath:
|
||||
output = zapcore.AddSync(os.Stderr)
|
||||
default:
|
||||
output = zapcore.AddSync(&lumberjack.Logger{
|
||||
Filename: lc.OutputPath,
|
||||
MaxSize: lc.MaxLogSize,
|
||||
Compress: lc.UseGzip,
|
||||
})
|
||||
}
|
||||
|
||||
// create logger with new sync
|
||||
lc.log.Logger = lc.log.Logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
|
||||
return zapcore.NewCore(zapcore.NewJSONEncoder(newEncoderConfig()), output, zapcore.InfoLevel)
|
||||
}))
|
||||
}
|
||||
|
||||
func newEncoderConfig() zapcore.EncoderConfig {
|
||||
c := zap.NewProductionEncoderConfig()
|
||||
c.MessageKey = zapcore.OmitKey
|
||||
c.LevelKey = zapcore.OmitKey
|
||||
c.TimeKey = zapcore.OmitKey
|
||||
c.FunctionKey = zapcore.OmitKey
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (ww *responseReadWriter) Write(data []byte) (int, error) {
|
||||
ww.response.Write(data)
|
||||
return ww.ResponseWriter.Write(data)
|
||||
}
|
||||
|
||||
func (ww *responseReadWriter) WriteHeader(code int) {
|
||||
ww.statusCode = code
|
||||
ww.ResponseWriter.WriteHeader(code)
|
||||
}
|
||||
|
||||
func (ww *responseReadWriter) Flush() {
|
||||
if f, ok := ww.ResponseWriter.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
// LogHTTP logs http parameters from s3 request.
|
||||
func LogHTTP(l *zap.Logger, settings LogHTTPSettings) Func {
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
config := settings.LogHTTPConfig()
|
||||
if !config.Enabled || config.log == nil {
|
||||
h.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
httplog := config.log.getHTTPLogger(r).
|
||||
withFieldIfExist("query", r.URL.Query()).
|
||||
withFieldIfExist("headers", r.Header)
|
||||
|
||||
payload := getBody(r.Body, l)
|
||||
r.Body = io.NopCloser(bytes.NewReader(payload))
|
||||
|
||||
payloadReader := io.LimitReader(bytes.NewReader(payload), config.MaxBody)
|
||||
httplog = httplog.withProcessedBody(payloadLabel, payloadReader, l)
|
||||
|
||||
wr := newResponseReadWriter(w)
|
||||
h.ServeHTTP(wr, r)
|
||||
|
||||
respReader := io.LimitReader(wr.response, config.MaxBody)
|
||||
httplog = httplog.withProcessedBody(responseLabel, respReader, l)
|
||||
httplog = httplog.with(zap.Int("status", wr.statusCode))
|
||||
|
||||
httplog.Info(logs.LogHTTP)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// withFieldIfExist checks whether data is not empty and attach it to log output.
|
||||
func (lg *httpLogger) withFieldIfExist(label string, data map[string][]string) *httpLogger {
|
||||
if len(data) != 0 {
|
||||
return lg.with(zap.Any(label, data))
|
||||
}
|
||||
return lg
|
||||
}
|
||||
|
||||
func (lg *httpLogger) with(fields ...zap.Field) *httpLogger {
|
||||
return &httpLogger{
|
||||
Logger: lg.Logger.With(fields...),
|
||||
logRoller: lg.logRoller,
|
||||
}
|
||||
}
|
||||
|
||||
func (lg *httpLogger) getHTTPLogger(r *http.Request) *httpLogger {
|
||||
return lg.with(
|
||||
zap.String("from", r.RemoteAddr),
|
||||
zap.String("URI", r.RequestURI),
|
||||
zap.String("method", r.Method),
|
||||
zap.String("protocol", r.Proto),
|
||||
)
|
||||
}
|
||||
|
||||
func (lg *httpLogger) withProcessedBody(label string, bodyReader io.Reader, l *zap.Logger) *httpLogger {
|
||||
resp, err := processBody(bodyReader)
|
||||
if err != nil {
|
||||
l.Error(logs.FailedToProcessHTTPBody,
|
||||
zap.Error(err),
|
||||
zap.String("body type", payloadLabel))
|
||||
return lg
|
||||
}
|
||||
|
||||
return lg.with(zap.ByteString(label, resp))
|
||||
}
|
||||
|
||||
func newResponseReadWriter(w http.ResponseWriter) *responseReadWriter {
|
||||
return &responseReadWriter{
|
||||
ResponseWriter: w,
|
||||
response: &bytes.Buffer{},
|
||||
}
|
||||
}
|
||||
|
||||
func getBody(httpBody io.ReadCloser, l *zap.Logger) []byte {
|
||||
defer func(httpBody io.ReadCloser) {
|
||||
if err := httpBody.Close(); err != nil {
|
||||
l.Error(logs.FailedToCloseHTTPBody, zap.Error(err))
|
||||
}
|
||||
}(httpBody)
|
||||
|
||||
body, err := io.ReadAll(httpBody)
|
||||
if err != nil {
|
||||
l.Error(logs.FailedToReadHTTPBody,
|
||||
zap.Error(err),
|
||||
zap.String("body type", payloadLabel))
|
||||
return nil
|
||||
}
|
||||
return body
|
||||
}
|
||||
|
||||
// processBody reads body and base64 encode it if it's not XML.
|
||||
func processBody(bodyReader io.Reader) ([]byte, error) {
|
||||
resultBody := &bytes.Buffer{}
|
||||
detect := detector.NewDetector(bodyReader, xmlutils.DetectXML)
|
||||
dataType, err := detect.Detect()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
writer := xmlutils.ChooseWriter(dataType, resultBody)
|
||||
if _, err = io.Copy(writer, detect.RestoredReader()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = writer.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resultBody.Bytes(), nil
|
||||
}
|
36
api/middleware/log_http_stub.go
Normal file
36
api/middleware/log_http_stub.go
Normal file
|
@ -0,0 +1,36 @@
|
|||
//go:build !loghttp
|
||||
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type (
|
||||
LogHTTPSettings interface {
|
||||
LogHTTPConfig() LogHTTPConfig
|
||||
}
|
||||
LogHTTPConfig struct {
|
||||
Enabled bool
|
||||
MaxBody int64
|
||||
MaxLogSize int
|
||||
OutputPath string
|
||||
UseGzip bool
|
||||
}
|
||||
)
|
||||
|
||||
func LogHTTP(l *zap.Logger, _ LogHTTPSettings) Func {
|
||||
l.Warn(logs.LogHTTPDisabledInThisBuild)
|
||||
return func(h http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
h.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (*LogHTTPConfig) InitHTTPLogger(*zap.Logger) {
|
||||
// ignore
|
||||
}
|
|
@ -6,7 +6,7 @@ import (
|
|||
"fmt"
|
||||
"sync"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
|
||||
|
@ -29,20 +29,14 @@ type FrostFS interface {
|
|||
SystemDNS(context.Context) (string, error)
|
||||
}
|
||||
|
||||
type Settings interface {
|
||||
FormContainerZone(ns string) (zone string, isDefault bool)
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
FrostFS FrostFS
|
||||
RPCAddress string
|
||||
Settings Settings
|
||||
}
|
||||
|
||||
type BucketResolver struct {
|
||||
rpcAddress string
|
||||
frostfs FrostFS
|
||||
settings Settings
|
||||
|
||||
mu sync.RWMutex
|
||||
resolvers []*Resolver
|
||||
|
@ -50,15 +44,15 @@ type BucketResolver struct {
|
|||
|
||||
type Resolver struct {
|
||||
Name string
|
||||
resolve func(context.Context, string) (cid.ID, error)
|
||||
resolve func(context.Context, string, string) (cid.ID, error)
|
||||
}
|
||||
|
||||
func (r *Resolver) SetResolveFunc(fn func(context.Context, string) (cid.ID, error)) {
|
||||
func (r *Resolver) SetResolveFunc(fn func(context.Context, string, string) (cid.ID, error)) {
|
||||
r.resolve = fn
|
||||
}
|
||||
|
||||
func (r *Resolver) Resolve(ctx context.Context, name string) (cid.ID, error) {
|
||||
return r.resolve(ctx, name)
|
||||
func (r *Resolver) Resolve(ctx context.Context, zone, name string) (cid.ID, error) {
|
||||
return r.resolve(ctx, zone, name)
|
||||
}
|
||||
|
||||
func NewBucketResolver(resolverNames []string, cfg *Config) (*BucketResolver, error) {
|
||||
|
@ -87,12 +81,12 @@ func createResolvers(resolverNames []string, cfg *Config) ([]*Resolver, error) {
|
|||
return resolvers, nil
|
||||
}
|
||||
|
||||
func (r *BucketResolver) Resolve(ctx context.Context, bktName string) (cnrID cid.ID, err error) {
|
||||
func (r *BucketResolver) Resolve(ctx context.Context, zone, bktName string) (cnrID cid.ID, err error) {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
for _, resolver := range r.resolvers {
|
||||
cnrID, resolverErr := resolver.Resolve(ctx, bktName)
|
||||
cnrID, resolverErr := resolver.Resolve(ctx, zone, bktName)
|
||||
if resolverErr != nil {
|
||||
resolverErr = fmt.Errorf("%s: %w", resolver.Name, resolverErr)
|
||||
if err == nil {
|
||||
|
@ -123,7 +117,6 @@ func (r *BucketResolver) UpdateResolvers(resolverNames []string) error {
|
|||
cfg := &Config{
|
||||
FrostFS: r.frostfs,
|
||||
RPCAddress: r.rpcAddress,
|
||||
Settings: r.settings,
|
||||
}
|
||||
|
||||
resolvers, err := createResolvers(resolverNames, cfg)
|
||||
|
@ -152,30 +145,25 @@ func (r *BucketResolver) equals(resolverNames []string) bool {
|
|||
func newResolver(name string, cfg *Config) (*Resolver, error) {
|
||||
switch name {
|
||||
case DNSResolver:
|
||||
return NewDNSResolver(cfg.FrostFS, cfg.Settings)
|
||||
return NewDNSResolver(cfg.FrostFS)
|
||||
case NNSResolver:
|
||||
return NewNNSResolver(cfg.RPCAddress, cfg.Settings)
|
||||
return NewNNSResolver(cfg.RPCAddress)
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown resolver: %s", name)
|
||||
}
|
||||
}
|
||||
|
||||
func NewDNSResolver(frostFS FrostFS, settings Settings) (*Resolver, error) {
|
||||
func NewDNSResolver(frostFS FrostFS) (*Resolver, error) {
|
||||
if frostFS == nil {
|
||||
return nil, fmt.Errorf("pool must not be nil for DNS resolver")
|
||||
}
|
||||
if settings == nil {
|
||||
return nil, fmt.Errorf("resolver settings must not be nil for DNS resolver")
|
||||
}
|
||||
|
||||
var dns ns.DNS
|
||||
|
||||
resolveFunc := func(ctx context.Context, name string) (cid.ID, error) {
|
||||
resolveFunc := func(ctx context.Context, zone, name string) (cid.ID, error) {
|
||||
var err error
|
||||
reqInfo := middleware.GetReqInfo(ctx)
|
||||
|
||||
zone, isDefault := settings.FormContainerZone(reqInfo.Namespace)
|
||||
if isDefault {
|
||||
if zone == v2container.SysAttributeZoneDefault {
|
||||
zone, err = frostFS.SystemDNS(ctx)
|
||||
if err != nil {
|
||||
return cid.ID{}, fmt.Errorf("read system DNS parameter of the FrostFS: %w", err)
|
||||
|
@ -196,13 +184,10 @@ func NewDNSResolver(frostFS FrostFS, settings Settings) (*Resolver, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func NewNNSResolver(address string, settings Settings) (*Resolver, error) {
|
||||
func NewNNSResolver(address string) (*Resolver, error) {
|
||||
if address == "" {
|
||||
return nil, fmt.Errorf("rpc address must not be empty for NNS resolver")
|
||||
}
|
||||
if settings == nil {
|
||||
return nil, fmt.Errorf("resolver settings must not be nil for NNS resolver")
|
||||
}
|
||||
|
||||
var nns ns.NNS
|
||||
|
||||
|
@ -210,12 +195,9 @@ func NewNNSResolver(address string, settings Settings) (*Resolver, error) {
|
|||
return nil, fmt.Errorf("dial %s: %w", address, err)
|
||||
}
|
||||
|
||||
resolveFunc := func(ctx context.Context, name string) (cid.ID, error) {
|
||||
resolveFunc := func(_ context.Context, zone, name string) (cid.ID, error) {
|
||||
var d container.Domain
|
||||
d.SetName(name)
|
||||
|
||||
reqInfo := middleware.GetReqInfo(ctx)
|
||||
zone, _ := settings.FormContainerZone(reqInfo.Namespace)
|
||||
d.SetZone(zone)
|
||||
|
||||
cnrID, err := nns.ResolveContainerDomain(d)
|
||||
|
|
|
@ -99,6 +99,7 @@ type Settings interface {
|
|||
s3middleware.PolicySettings
|
||||
s3middleware.MetricsSettings
|
||||
s3middleware.VHSSettings
|
||||
s3middleware.LogHTTPSettings
|
||||
}
|
||||
|
||||
type FrostFSID interface {
|
||||
|
@ -127,7 +128,9 @@ type Config struct {
|
|||
|
||||
func NewRouter(cfg Config) *chi.Mux {
|
||||
api := chi.NewRouter()
|
||||
|
||||
api.Use(
|
||||
s3middleware.LogHTTP(cfg.Log, cfg.MiddlewareSettings),
|
||||
s3middleware.Request(cfg.Log, cfg.MiddlewareSettings),
|
||||
middleware.ThrottleWithOpts(cfg.Throttle),
|
||||
middleware.Recoverer,
|
||||
|
|
|
@ -80,6 +80,7 @@ type middlewareSettingsMock struct {
|
|||
domains []string
|
||||
vhsEnabled bool
|
||||
vhsNamespacesEnabled map[string]bool
|
||||
logHTTP middleware.LogHTTPConfig
|
||||
}
|
||||
|
||||
func (r *middlewareSettingsMock) SourceIPHeader() string {
|
||||
|
@ -117,6 +118,9 @@ func (r *middlewareSettingsMock) ServernameHeader() string {
|
|||
func (r *middlewareSettingsMock) VHSNamespacesEnabled() map[string]bool {
|
||||
return r.vhsNamespacesEnabled
|
||||
}
|
||||
func (r *middlewareSettingsMock) LogHTTPConfig() middleware.LogHTTPConfig {
|
||||
return r.logHTTP
|
||||
}
|
||||
|
||||
type frostFSIDMock struct {
|
||||
tags map[string]string
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"os"
|
||||
"os/signal"
|
||||
"runtime/debug"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
@ -86,6 +87,7 @@ type (
|
|||
|
||||
appSettings struct {
|
||||
logLevel zap.AtomicLevel
|
||||
httpLogging s3middleware.LogHTTPConfig
|
||||
maxClient maxClientsConfig
|
||||
defaultMaxAge int
|
||||
reconnectInterval time.Duration
|
||||
|
@ -216,6 +218,7 @@ func (a *App) initLayer(ctx context.Context) {
|
|||
func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
|
||||
settings := &appSettings{
|
||||
logLevel: log.lvl,
|
||||
httpLogging: s3middleware.LogHTTPConfig{},
|
||||
maxClient: newMaxClients(v),
|
||||
defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
|
||||
reconnectInterval: fetchReconnectInterval(v),
|
||||
|
@ -250,10 +253,20 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
|||
vhsEnabled := v.GetBool(cfgVHSEnabled)
|
||||
vhsHeader := v.GetString(cfgVHSHeader)
|
||||
servernameHeader := v.GetString(cfgServernameHeader)
|
||||
vhsNamespacesEnabled := s.prepareVHSNamespaces(v, log)
|
||||
httpLoggingEnabled := v.GetBool(cfgHTTPLoggingEnabled)
|
||||
httpLoggingMaxBody := v.GetInt64(cfgHTTPLoggingMaxBody)
|
||||
httpLoggingMaxLogSize := v.GetInt(cfgHTTPLoggingMaxLogSize)
|
||||
httpLoggingOutputPath := v.GetString(cfgHTTPLoggingDestination)
|
||||
httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.httpLogging.Enabled = httpLoggingEnabled
|
||||
s.httpLogging.MaxBody = httpLoggingMaxBody
|
||||
s.httpLogging.MaxLogSize = httpLoggingMaxLogSize
|
||||
s.httpLogging.OutputPath = httpLoggingOutputPath
|
||||
s.httpLogging.UseGzip = httpLoggingUseGzip
|
||||
s.httpLogging.InitHTTPLogger(log)
|
||||
|
||||
s.namespaceHeader = namespaceHeader
|
||||
s.defaultNamespaces = defaultNamespaces
|
||||
|
@ -272,17 +285,22 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
|||
s.vhsEnabled = vhsEnabled
|
||||
s.vhsHeader = vhsHeader
|
||||
s.servernameHeader = servernameHeader
|
||||
s.vhsNamespacesEnabled = vhsNamespacesEnabled
|
||||
|
||||
s.mu.Unlock()
|
||||
|
||||
s.prepareVHSNamespaces(v, log)
|
||||
}
|
||||
|
||||
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger) map[string]bool {
|
||||
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger) {
|
||||
nsMap := fetchVHSNamespaces(v, log)
|
||||
vhsNamespaces := make(map[string]bool, len(nsMap))
|
||||
for ns, flag := range nsMap {
|
||||
vhsNamespaces[s.ResolveNamespaceAlias(ns)] = flag
|
||||
}
|
||||
|
||||
return vhsNamespaces
|
||||
s.mu.Lock()
|
||||
s.vhsNamespacesEnabled = vhsNamespaces
|
||||
s.mu.Unlock()
|
||||
}
|
||||
|
||||
func (s *appSettings) Domains() []string {
|
||||
|
@ -361,6 +379,13 @@ func (s *appSettings) DefaultCopiesNumbers(namespace string) []uint32 {
|
|||
return s.namespaces[namespace].CopiesNumbers[defaultConstraintName]
|
||||
}
|
||||
|
||||
func (s *appSettings) LogHTTPConfig() s3middleware.LogHTTPConfig {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
return s.httpLogging
|
||||
}
|
||||
|
||||
func (s *appSettings) NewXMLDecoder(r io.Reader) *xml.Decoder {
|
||||
dec := xml.NewDecoder(r)
|
||||
dec.CharsetReader = func(charset string, reader io.Reader) (io.Reader, error) {
|
||||
|
@ -404,12 +429,12 @@ func (s *appSettings) NamespaceHeader() string {
|
|||
return s.namespaceHeader
|
||||
}
|
||||
|
||||
func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool) {
|
||||
func (s *appSettings) FormContainerZone(ns string) string {
|
||||
if len(ns) == 0 {
|
||||
return v2container.SysAttributeZoneDefault, true
|
||||
return v2container.SysAttributeZoneDefault
|
||||
}
|
||||
|
||||
return ns + ".ns", false
|
||||
return ns + ".ns"
|
||||
}
|
||||
|
||||
func (s *appSettings) isDefaultNamespace(ns string) bool {
|
||||
|
@ -525,7 +550,6 @@ func (a *App) getResolverConfig() *resolver.Config {
|
|||
return &resolver.Config{
|
||||
FrostFS: frostfs.NewResolverFrostFS(a.pool),
|
||||
RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
|
||||
Settings: a.settings,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -920,6 +944,8 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
|
|||
cacheCfg.AccessControl.Lifetime = fetchCacheLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime)
|
||||
cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
|
||||
|
||||
cacheCfg.NetworkInfo.Lifetime = fetchCacheLifetime(v, l, cfgNetworkInfoCacheLifetime, cacheCfg.NetworkInfo.Lifetime)
|
||||
|
||||
return cacheCfg
|
||||
}
|
||||
|
||||
|
@ -1062,8 +1088,13 @@ func (a *App) fetchContainerInfo(ctx context.Context, cfgKey string) (info *data
|
|||
|
||||
var id cid.ID
|
||||
if err = id.DecodeString(containerString); err != nil {
|
||||
if id, err = a.bucketResolver.Resolve(ctx, containerString); err != nil {
|
||||
return nil, fmt.Errorf("resolve container name %s: %w", containerString, err)
|
||||
i := strings.Index(containerString, ".")
|
||||
if i < 0 {
|
||||
return nil, fmt.Errorf("invalid container address: %s", containerString)
|
||||
}
|
||||
|
||||
if id, err = a.bucketResolver.Resolve(ctx, containerString[i+1:], containerString[:i]); err != nil {
|
||||
return nil, fmt.Errorf("resolve container address %s: %w", containerString, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -81,6 +81,14 @@ const ( // Settings.
|
|||
cfgLoggerLevel = "logger.level"
|
||||
cfgLoggerDestination = "logger.destination"
|
||||
|
||||
// HttpLogging.
|
||||
cfgHTTPLoggingEnabled = "http_logging.enabled"
|
||||
cfgHTTPLoggingMaxBody = "http_logging.max_body"
|
||||
cfgHTTPLoggingMaxLogSize = "http_logging.max_log_size"
|
||||
cfgHTTPLoggingDestination = "http_logging.destination"
|
||||
cfgHTTPLoggingGzip = "http_logging.gzip"
|
||||
cfgHTTPLoggingLogResponse = "http_logging.log_response"
|
||||
|
||||
// Wallet.
|
||||
cfgWalletPath = "wallet.path"
|
||||
cfgWalletAddress = "wallet.address"
|
||||
|
@ -122,6 +130,7 @@ const ( // Settings.
|
|||
cfgMorphPolicyCacheSize = "cache.morph_policy.size"
|
||||
cfgFrostfsIDCacheLifetime = "cache.frostfsid.lifetime"
|
||||
cfgFrostfsIDCacheSize = "cache.frostfsid.size"
|
||||
cfgNetworkInfoCacheLifetime = "cache.network_info.lifetime"
|
||||
|
||||
cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval"
|
||||
|
||||
|
@ -779,6 +788,14 @@ func newSettings() *viper.Viper {
|
|||
v.SetDefault(cfgLoggerLevel, "debug")
|
||||
v.SetDefault(cfgLoggerDestination, "stdout")
|
||||
|
||||
// http logger
|
||||
v.SetDefault(cfgHTTPLoggingEnabled, false)
|
||||
v.SetDefault(cfgHTTPLoggingMaxBody, 1024)
|
||||
v.SetDefault(cfgHTTPLoggingMaxLogSize, 50)
|
||||
v.SetDefault(cfgHTTPLoggingDestination, "stdout")
|
||||
v.SetDefault(cfgHTTPLoggingGzip, false)
|
||||
v.SetDefault(cfgHTTPLoggingLogResponse, true)
|
||||
|
||||
// pool:
|
||||
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
|
||||
v.SetDefault(cfgStreamTimeout, defaultStreamTimeout)
|
||||
|
|
50
cmd/s3-playback/internal/playback/multipart.go
Normal file
50
cmd/s3-playback/internal/playback/multipart.go
Normal file
|
@ -0,0 +1,50 @@
|
|||
package playback
|
||||
|
||||
import (
|
||||
"encoding/xml"
|
||||
"fmt"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type MultipartUpload struct {
|
||||
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult" json:"-"`
|
||||
Bucket string `json:"bucket" xml:"Bucket"`
|
||||
Key string `json:"key" xml:"Key"`
|
||||
UploadID string `json:"uploadId" xml:"UploadId"`
|
||||
}
|
||||
|
||||
func HandleResponse(r *http.Request, mparts map[string]MultipartUpload, resp []byte, logResponse []byte) error {
|
||||
var mpart, mpartOld MultipartUpload
|
||||
if r.Method != "POST" || !r.URL.Query().Has("uploads") {
|
||||
return nil
|
||||
}
|
||||
// get new uploadId from response
|
||||
err := xml.Unmarshal(resp, &mpart)
|
||||
if err != nil {
|
||||
return fmt.Errorf("xml unmarshal error: %w", err)
|
||||
}
|
||||
// get old uploadId from logs
|
||||
err = xml.Unmarshal(logResponse, &mpartOld)
|
||||
if err != nil {
|
||||
return fmt.Errorf("xml unmarshal error: %w", err)
|
||||
}
|
||||
if mpartOld.UploadID != "" {
|
||||
mparts[mpartOld.UploadID] = mpart
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func SwapUploadID(r *http.Request, settings *Settings) error {
|
||||
var uploadID string
|
||||
query := r.URL.Query()
|
||||
uploadID = query.Get("uploadId")
|
||||
mpart, ok := settings.Multiparts[uploadID]
|
||||
if !ok {
|
||||
return fmt.Errorf("no multipart upload with specified uploadId")
|
||||
}
|
||||
query.Set("uploadId", mpart.UploadID)
|
||||
r.URL.RawQuery = query.Encode()
|
||||
|
||||
return nil
|
||||
}
|
97
cmd/s3-playback/internal/playback/request.go
Normal file
97
cmd/s3-playback/internal/playback/request.go
Normal file
|
@ -0,0 +1,97 @@
|
|||
package playback
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/xmlutils"
|
||||
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
)
|
||||
|
||||
type (
|
||||
httpBody []byte
|
||||
LoggedRequest struct {
|
||||
From string `json:"from"`
|
||||
URI string `json:"URI"`
|
||||
Method string `json:"method"`
|
||||
Payload httpBody `json:"payload,omitempty"`
|
||||
Response httpBody `json:"response,omitempty"`
|
||||
Query url.Values `json:"query"`
|
||||
Header http.Header `json:"headers"`
|
||||
}
|
||||
Credentials struct {
|
||||
AccessKey string
|
||||
SecretKey string
|
||||
}
|
||||
Settings struct {
|
||||
Endpoint string
|
||||
Creds Credentials
|
||||
Multiparts map[string]MultipartUpload
|
||||
Client *http.Client
|
||||
}
|
||||
)
|
||||
|
||||
func (h *httpBody) UnmarshalJSON(data []byte) error {
|
||||
unquoted, err := strconv.Unquote(string(data))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unquote data: %w", err)
|
||||
}
|
||||
detect := detector.NewDetector(strings.NewReader(unquoted), xmlutils.DetectXML)
|
||||
dataType, err := detect.Detect()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to detect data: %w", err)
|
||||
}
|
||||
reader := xmlutils.ChooseReader(dataType, detect.RestoredReader())
|
||||
*h, err = io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to unmarshal httpbody: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Sign replace Authorization header with new Access key id and Signature values.
|
||||
func Sign(ctx context.Context, r *http.Request, creds Credentials) error {
|
||||
credProvider := credentials.NewStaticCredentialsProvider(creds.AccessKey, creds.SecretKey, "")
|
||||
awsCred, err := credProvider.Retrieve(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
authHdr := r.Header.Get(auth.AuthorizationHdr)
|
||||
authInfo, err := parseAuthHeader(authHdr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
newHeader := strings.Replace(authHdr, authInfo["access_key_id"], creds.AccessKey, 1)
|
||||
r.Header.Set(auth.AuthorizationHdr, newHeader)
|
||||
|
||||
signer := v4.NewSigner()
|
||||
signatureDateTimeStr := r.Header.Get(api.AmzDate)
|
||||
signatureDateTime, err := time.Parse("20060102T150405Z", signatureDateTimeStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return signer.SignHTTP(ctx, awsCred, r, r.Header.Get(api.AmzContentSha256), authInfo["service"], authInfo["region"], signatureDateTime)
|
||||
}
|
||||
|
||||
func parseAuthHeader(authHeader string) (map[string]string, error) {
|
||||
authInfo := auth.NewRegexpMatcher(auth.AuthorizationFieldRegexp).GetSubmatches(authHeader)
|
||||
if len(authInfo) == 0 {
|
||||
return nil, errors.New("no matches found")
|
||||
}
|
||||
|
||||
return authInfo, nil
|
||||
}
|
98
cmd/s3-playback/internal/playback/request_test.go
Normal file
98
cmd/s3-playback/internal/playback/request_test.go
Normal file
|
@ -0,0 +1,98 @@
|
|||
package playback
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var errNoMatches = errors.New("no matches found")
|
||||
|
||||
func withoutValue(data map[string]string, field string) map[string]string {
|
||||
result := make(map[string]string)
|
||||
for k, v := range data {
|
||||
result[k] = v
|
||||
}
|
||||
result[field] = ""
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func TestParseAuthHeader(t *testing.T) {
|
||||
defaultHeader := "AWS4-HMAC-SHA256 Credential=oid0cid/20210809/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=2811ccb9e242f41426738fb1f"
|
||||
|
||||
defaultAuthInfo := map[string]string{
|
||||
"access_key_id": "oid0cid",
|
||||
"service": "s3",
|
||||
"region": "us-east-1",
|
||||
"v4_signature": "2811ccb9e242f41426738fb1f",
|
||||
"signed_header_fields": "host;x-amz-content-sha256;x-amz-date",
|
||||
"date": "20210809",
|
||||
}
|
||||
for _, tc := range []struct {
|
||||
title string
|
||||
header string
|
||||
err error
|
||||
expected map[string]string
|
||||
}{
|
||||
{
|
||||
title: "correct full header",
|
||||
header: defaultHeader,
|
||||
err: nil,
|
||||
expected: defaultAuthInfo,
|
||||
},
|
||||
{
|
||||
title: "correct with empty region",
|
||||
header: strings.Replace(defaultHeader, "/us-east-1/", "//", 1),
|
||||
err: nil,
|
||||
expected: withoutValue(defaultAuthInfo, "region"),
|
||||
},
|
||||
{
|
||||
title: "empty access key",
|
||||
header: strings.Replace(defaultHeader, "oid0cid", "", 1),
|
||||
err: errNoMatches,
|
||||
expected: nil,
|
||||
},
|
||||
{
|
||||
title: "empty service",
|
||||
header: strings.Replace(defaultHeader, "/s3/", "//", 1),
|
||||
err: errNoMatches,
|
||||
expected: nil,
|
||||
},
|
||||
{
|
||||
title: "empty date",
|
||||
header: strings.Replace(defaultHeader, "/20210809/", "//", 1),
|
||||
err: errNoMatches,
|
||||
expected: nil,
|
||||
},
|
||||
{
|
||||
title: "empty v4_signature",
|
||||
header: strings.Replace(defaultHeader, "Signature=2811ccb9e242f41426738fb1f",
|
||||
"Signature=", 1),
|
||||
err: errNoMatches,
|
||||
expected: nil,
|
||||
},
|
||||
{
|
||||
title: "empty signed_fields",
|
||||
header: strings.Replace(defaultHeader, "SignedHeaders=host;x-amz-content-sha256;x-amz-date",
|
||||
"SignedHeaders=", 1),
|
||||
err: errNoMatches,
|
||||
expected: nil,
|
||||
},
|
||||
{
|
||||
title: "empty signed_fields",
|
||||
header: strings.Replace(defaultHeader, "SignedHeaders=host;x-amz-content-sha256;x-amz-date",
|
||||
"SignedHeaders=", 1),
|
||||
err: errNoMatches,
|
||||
expected: nil,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.title, func(t *testing.T) {
|
||||
authInfo, err := parseAuthHeader(tc.header)
|
||||
require.Equal(t, err, tc.err, tc.header)
|
||||
require.Equal(t, tc.expected, authInfo, tc.header)
|
||||
})
|
||||
}
|
||||
}
|
20
cmd/s3-playback/main.go
Normal file
20
cmd/s3-playback/main.go
Normal file
|
@ -0,0 +1,20 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"os/signal"
|
||||
"syscall"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/cmd/s3-playback/modules"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
|
||||
|
||||
if cmd, err := modules.Execute(ctx); err != nil {
|
||||
cmd.PrintErrln("Error:", err.Error())
|
||||
cmd.PrintErrf("Run '%v --help' for usage.\n", cmd.CommandPath())
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
67
cmd/s3-playback/modules/root.go
Normal file
67
cmd/s3-playback/modules/root.go
Normal file
|
@ -0,0 +1,67 @@
|
|||
package modules
|
||||
|
||||
import (
|
||||
"context"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultPrintResponseLimit = 1024
|
||||
cfgConfigPath = "config"
|
||||
cfgHTTPTimeoutFlag = "http-timeout"
|
||||
cfgSkipVerifyTLS = "skip-verify-tls"
|
||||
)
|
||||
|
||||
var (
|
||||
cfgFile string
|
||||
rootCmd = &cobra.Command{
|
||||
Use: "frostfs-s3-playback",
|
||||
Version: version.Version,
|
||||
Short: "FrostFS S3 Traffic Playback",
|
||||
Long: "Helps to reproduce s3 commands from log files",
|
||||
Example: "frostfs-s3-playback [--skip-verify-tls] [--http-timeout <timeout>] " +
|
||||
"[--version] --config <config_path> <command>",
|
||||
SilenceUsage: true,
|
||||
SilenceErrors: true,
|
||||
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
|
||||
return viper.BindPFlags(cmd.Flags())
|
||||
},
|
||||
RunE: func(cmd *cobra.Command, _ []string) error {
|
||||
return cmd.Help()
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func Execute(ctx context.Context) (*cobra.Command, error) {
|
||||
return rootCmd.ExecuteContextC(ctx)
|
||||
}
|
||||
|
||||
func initConfig() {
|
||||
viper.SetConfigFile(cfgFile)
|
||||
_ = viper.ReadInConfig()
|
||||
}
|
||||
|
||||
func init() {
|
||||
cobra.OnInitialize(initConfig)
|
||||
cobra.EnableTraverseRunHooks = true
|
||||
rootCmd.SetGlobalNormalizationFunc(func(_ *pflag.FlagSet, name string) pflag.NormalizedName {
|
||||
return pflag.NormalizedName(strings.ReplaceAll(name, "_", "-"))
|
||||
})
|
||||
|
||||
rootCmd.PersistentFlags().StringVar(&cfgFile, cfgConfigPath, "", "configuration filepath")
|
||||
_ = rootCmd.MarkPersistentFlagRequired(cfgConfigPath)
|
||||
_ = rootCmd.MarkPersistentFlagFilename(cfgConfigPath)
|
||||
rootCmd.PersistentFlags().Duration(cfgHTTPTimeoutFlag, time.Minute, "http request timeout")
|
||||
rootCmd.PersistentFlags().Bool(cfgSkipVerifyTLS, false, "skip TLS certificate verification")
|
||||
rootCmd.SetOut(os.Stdout)
|
||||
|
||||
initRunCmd()
|
||||
rootCmd.AddCommand(runCmd)
|
||||
}
|
208
cmd/s3-playback/modules/run.go
Normal file
208
cmd/s3-playback/modules/run.go
Normal file
|
@ -0,0 +1,208 @@
|
|||
package modules
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"crypto/sha256"
|
||||
"crypto/tls"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/cmd/s3-playback/internal/playback"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/xmlutils"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
const (
|
||||
cfgPrintResponseLimit = "print-response-limit"
|
||||
cfgLogPath = "log"
|
||||
cfgEndpoint = "endpoint"
|
||||
awsAccessKey = "credentials.access_key"
|
||||
awsSecretKey = "credentials.secret_key"
|
||||
)
|
||||
|
||||
var runCmd = &cobra.Command{
|
||||
Use: "run",
|
||||
Short: "Send requests from log file",
|
||||
Long: "Reads the network log file and sends each request to the specified URL",
|
||||
Example: "frostfs-s3-playback --config <config_path> run [--endpoint=<endpoint>] [--log=<log_path>]",
|
||||
PersistentPreRunE: func(cmd *cobra.Command, _ []string) (err error) {
|
||||
viper.SetDefault(cfgPrintResponseLimit, defaultPrintResponseLimit)
|
||||
return viper.BindPFlags(cmd.Flags())
|
||||
},
|
||||
RunE: run,
|
||||
}
|
||||
|
||||
func initRunCmd() {
|
||||
runCmd.Flags().String(cfgLogPath, "./request.log", "log file path")
|
||||
runCmd.Flags().String(cfgEndpoint, "", "endpoint URL")
|
||||
runCmd.Flags().Int(cfgPrintResponseLimit, defaultPrintResponseLimit, "print limit for http response body")
|
||||
}
|
||||
|
||||
func logResponse(cmd *cobra.Command, r *http.Request, resp *http.Response) {
|
||||
cmd.Println(r.Method, r.URL.RequestURI())
|
||||
cmd.Println(resp.Status)
|
||||
if resp.ContentLength == 0 {
|
||||
return
|
||||
}
|
||||
detect := detector.NewDetector(resp.Body, xmlutils.DetectXML)
|
||||
dataType, err := detect.Detect()
|
||||
if err != nil {
|
||||
cmd.PrintErrln("type detection error:", err.Error())
|
||||
return
|
||||
}
|
||||
body := &bytes.Buffer{}
|
||||
resultWriter := xmlutils.ChooseWriter(dataType, body)
|
||||
_, err = io.Copy(resultWriter, io.LimitReader(detect.RestoredReader(), viper.GetInt64(cfgPrintResponseLimit)))
|
||||
if err != nil {
|
||||
cmd.Println(err)
|
||||
return
|
||||
}
|
||||
if err = resultWriter.Close(); err != nil {
|
||||
cmd.Printf("could not close response body: %s\n", err)
|
||||
return
|
||||
}
|
||||
|
||||
cmd.Println(body.String())
|
||||
cmd.Println()
|
||||
}
|
||||
|
||||
func run(cmd *cobra.Command, _ []string) error {
|
||||
ctx := cmd.Context()
|
||||
settings := &playback.Settings{
|
||||
Endpoint: viper.GetString(cfgEndpoint),
|
||||
Creds: playback.Credentials{
|
||||
AccessKey: viper.GetString(awsAccessKey),
|
||||
SecretKey: viper.GetString(awsSecretKey),
|
||||
},
|
||||
Multiparts: make(map[string]playback.MultipartUpload),
|
||||
Client: &http.Client{
|
||||
Transport: http.DefaultTransport,
|
||||
Timeout: viper.GetDuration(cfgHTTPTimeoutFlag),
|
||||
},
|
||||
}
|
||||
|
||||
file, err := os.Open(viper.GetString(cfgLogPath))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
reader := bufio.NewReader(file)
|
||||
|
||||
if viper.GetBool(cfgSkipVerifyTLS) {
|
||||
settings.Client.Transport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
|
||||
}
|
||||
|
||||
id := 1
|
||||
for {
|
||||
logReq, err := getRequestFromLog(reader)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to parse request", err)
|
||||
id++
|
||||
continue
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return fmt.Errorf("interrupted: %w", ctx.Err())
|
||||
default:
|
||||
r, resp, err := playbackRequest(ctx, logReq, settings)
|
||||
if err != nil {
|
||||
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to playback request:", err)
|
||||
id++
|
||||
continue
|
||||
}
|
||||
cmd.Print(strconv.Itoa(id) + ") ")
|
||||
logResponse(cmd, r, resp)
|
||||
id++
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getRequestFromLog(reader *bufio.Reader) (playback.LoggedRequest, error) {
|
||||
var logReq playback.LoggedRequest
|
||||
req, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
return logReq, err
|
||||
}
|
||||
|
||||
err = json.Unmarshal([]byte(req), &logReq)
|
||||
if err != nil {
|
||||
return logReq, err
|
||||
}
|
||||
|
||||
return logReq, nil
|
||||
}
|
||||
|
||||
// playbackRequest creates http.Request from LoggedRequest and sends it to specified endpoint.
|
||||
func playbackRequest(ctx context.Context, logReq playback.LoggedRequest, settings *playback.Settings) (*http.Request, *http.Response, error) {
|
||||
r, err := prepareRequest(ctx, logReq, settings)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to prepare request: %w", err)
|
||||
}
|
||||
resp, err := settings.Client.Do(r)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to send request: %w", err)
|
||||
}
|
||||
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if err = playback.HandleResponse(r, settings.Multiparts, respBody, logReq.Response); err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to register multipart upload: %w", err)
|
||||
}
|
||||
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
|
||||
|
||||
return r, resp, nil
|
||||
}
|
||||
|
||||
// prepareRequest creates request from logs and modifies its signature and uploadId (if presents).
|
||||
func prepareRequest(ctx context.Context, logReq playback.LoggedRequest, settings *playback.Settings) (*http.Request, error) {
|
||||
r, err := http.NewRequestWithContext(ctx, logReq.Method, settings.Endpoint+logReq.URI, bytes.NewReader(logReq.Payload))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
r.Header = logReq.Header
|
||||
sha256hash := sha256.New()
|
||||
sha256hash.Write(logReq.Payload)
|
||||
r.Header.Set(auth.AmzContentSHA256, hex.EncodeToString(sha256hash.Sum(nil)))
|
||||
if r.Header.Get(api.ContentMD5) != "" {
|
||||
sha256hash.Reset()
|
||||
md5hash := md5.New()
|
||||
md5hash.Write(logReq.Payload)
|
||||
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString(md5hash.Sum(nil)))
|
||||
}
|
||||
if r.URL.Query().Has("uploadId") {
|
||||
if err = playback.SwapUploadID(r, settings); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if r.Header.Get(auth.AuthorizationHdr) != "" {
|
||||
if err = playback.Sign(ctx, r, settings.Creds); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return r, nil
|
||||
}
|
|
@ -52,6 +52,17 @@ S3_GW_CONFIG=/path/to/config/yaml
|
|||
# Logger
|
||||
S3_GW_LOGGER_LEVEL=debug
|
||||
|
||||
# HTTP logger
|
||||
S3_GW_HTTP_LOGGING_ENABLED=false
|
||||
# max body size to log
|
||||
S3_GW_HTTP_LOGGING_MAX_BODY=1024
|
||||
# max log size in Mb
|
||||
S3_GW_HTTP_LOGGING_MAX_LOG_SIZE=20
|
||||
# use log compression
|
||||
S3_GW_HTTP_LOGGING_GZIP=true
|
||||
# possible destination output values: filesystem path, url, "stdout", "stderr"
|
||||
S3_GW_HTTP_LOGGING_DESTINATION=stdout
|
||||
|
||||
# RPC endpoint and order of resolving of bucket names
|
||||
S3_GW_RPC_ENDPOINT=http://morph-chain.frostfs.devenv:30333/
|
||||
S3_GW_RESOLVE_ORDER="nns dns"
|
||||
|
@ -111,6 +122,8 @@ S3_GW_CACHE_MORPH_POLICY_SIZE=10000
|
|||
# Cache which stores frostfsid subject info
|
||||
S3_GW_CACHE_FROSTFSID_LIFETIME=1m
|
||||
S3_GW_CACHE_FROSTFSID_SIZE=10000
|
||||
# Cache which stores network info
|
||||
S3_GW_CACHE_NETWORK_INFO_LIFETIME=1m
|
||||
|
||||
# Default policy of placing containers in FrostFS
|
||||
# If a user sends a request `CreateBucket` and doesn't define policy for placing of a container in FrostFS, the S3 Gateway
|
||||
|
|
|
@ -56,6 +56,18 @@ logger:
|
|||
level: debug
|
||||
destination: stdout
|
||||
|
||||
# log http request data (URI, headers, query, etc)
|
||||
http_logging:
|
||||
enabled: false
|
||||
# max body size to log
|
||||
max_body: 1024
|
||||
# max log size in Mb
|
||||
max_log_size: 20
|
||||
# use log compression
|
||||
gzip: true
|
||||
# possible output values: filesystem path, url, "stdout", "stderr"
|
||||
destination: stdout
|
||||
|
||||
# RPC endpoint and order of resolving of bucket names
|
||||
rpc_endpoint: http://morph-chain.frostfs.devenv:30333
|
||||
resolve_order:
|
||||
|
@ -135,6 +147,9 @@ cache:
|
|||
frostfsid:
|
||||
lifetime: 1m
|
||||
size: 10000
|
||||
# Cache which stores network info
|
||||
network_info:
|
||||
lifetime: 1m
|
||||
|
||||
# Parameters of FrostFS container placement policy
|
||||
placement_policy:
|
||||
|
|
8
config/playback/playback.yaml
Normal file
8
config/playback/playback.yaml
Normal file
|
@ -0,0 +1,8 @@
|
|||
endpoint: http://localhost:8084
|
||||
log: ./log/request.log
|
||||
credentials:
|
||||
access_key: CAtUxDSSFtuVyVCjHTMhwx3eP3YSPo5ffwbPcnKfcdrD06WwUSn72T5EBNe3jcgjL54rmxFM6u3nUAoNBps8qJ1PD
|
||||
secret_key: 560027d81c277de7378f71cbf12a32e4f7f541de724be59bcfdbfdc925425f30
|
||||
http_timeout: 60s
|
||||
skip_verify_tls: false
|
||||
print_response_limit: 1024
|
|
@ -176,6 +176,7 @@ There are some custom types used for brevity:
|
|||
| `placement_policy` | [Placement policy configuration](#placement_policy-section) |
|
||||
| `server` | [Server configuration](#server-section) |
|
||||
| `logger` | [Logger configuration](#logger-section) |
|
||||
| `http_logging` | [HTTP Request logger configuration](#http_logging-section) |
|
||||
| `cache` | [Cache configuration](#cache-section) |
|
||||
| `cors` | [CORS configuration](#cors-section) |
|
||||
| `pprof` | [Pprof configuration](#pprof-section) |
|
||||
|
@ -220,7 +221,7 @@ max_clients_deadline: 30s
|
|||
allowed_access_key_id_prefixes:
|
||||
- Ck9BHsgKcnwfCTUSFm6pxhoNS4cBqgN2NQ8zVgPjqZDX
|
||||
- 3stjWenX15YwYzczMr88gy3CQr4NYFBQ8P7keGzH5QFn
|
||||
|
||||
|
||||
reconnect_interval: 1m
|
||||
|
||||
source_ip_header: "Source-Ip"
|
||||
|
@ -376,6 +377,32 @@ logger:
|
|||
| `level` | `string` | yes | `debug` | Logging level.<br/>Possible values: `debug`, `info`, `warn`, `error`, `dpanic`, `panic`, `fatal`. |
|
||||
| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` |
|
||||
|
||||
|
||||
### `http_logging` section
|
||||
|
||||
Could be enabled only in builds with `loghttp` build tag. To build with `loghttp` tag, pass `GOFLAGS` var to `make`:
|
||||
```bash
|
||||
make GOFLAGS="-tags=loghttp" [target]
|
||||
```
|
||||
|
||||
```yaml
|
||||
http_logging:
|
||||
enabled: false
|
||||
max_body: 1024
|
||||
max_log_size: 20
|
||||
gzip: true
|
||||
destination: stdout
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|----------------|----------|---------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `enabled` | `bool` | yes | `false` | Flag to enable the logger. |
|
||||
| `max_body` | `int` | yes | `1024` | Max body size for log output in bytes. |
|
||||
| `max_log_size` | `int` | yes | `50` | Log file size threshold (in megabytes) to be moved in backup file. After reaching threshold, initial filename is appended with timestamp. And new empty file with initial name is created. |
|
||||
| `gzip` | `bool` | yes | `false` | Whether to enable Gzip compression to backup log files. |
|
||||
| `destination` | `string` | yes | `stdout` | Specify path for log output. Accepts log file path, or "stdout" and "stderr" reserved words to print in output streams. File and folders are created if necessary. |
|
||||
|
||||
|
||||
### `cache` section
|
||||
|
||||
```yaml
|
||||
|
@ -411,6 +438,8 @@ cache:
|
|||
frostfsid:
|
||||
lifetime: 1m
|
||||
size: 10000
|
||||
network_info:
|
||||
lifetime: 1m
|
||||
```
|
||||
|
||||
| Parameter | Type | Default value | Description |
|
||||
|
@ -425,6 +454,7 @@ cache:
|
|||
| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 100000` | Cache which stores owner to cache operation mapping. |
|
||||
| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores list of policy chains. |
|
||||
| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores FrostfsID subject info. |
|
||||
| `network_info` | [Cache config](#cache-subsection) | `lifetime: 1m` | Cache which stores network info. |
|
||||
|
||||
#### `cache` subsection
|
||||
|
||||
|
@ -743,7 +773,7 @@ vhs:
|
|||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
| ------------------- | ----------------- | ------------- | ---------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
|
||||
|---------------------|-------------------|---------------|------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `enabled` | `bool` | yes | `false` | Enables the use of virtual host addressing for buckets at the application level. |
|
||||
| `vhs_header` | `string` | yes | `X-Frostfs-S3-VHS` | Header for determining whether VHS is enabled for the request. |
|
||||
| `servername_header` | `string` | yes | `X-Frostfs-Servername` | Header for determining servername. |
|
||||
|
|
48
docs/playback.md
Normal file
48
docs/playback.md
Normal file
|
@ -0,0 +1,48 @@
|
|||
# FrostFS S3 Playback
|
||||
|
||||
Playback is a tool to reproduce queries to `frostfs-s3-gw` in dev environment. Network logs could be
|
||||
gathered from `s3-gw` via HTTP Logger which could be enabled on build with `loghttp` build tag
|
||||
and `http_logging.enabled` option set to `true` in `s3-gw` configuration.
|
||||
|
||||
## Commands
|
||||
|
||||
`run` - reads log file and reproduces send requests from it to specified endpoint
|
||||
|
||||
#### Example
|
||||
```bash
|
||||
frostfs-s3-playback --config <config_path> run [--endpoint=<endpoint>] [--log=<log_path>]
|
||||
```
|
||||
|
||||
## Configuration
|
||||
|
||||
Playback accepts configuration file path in yaml with corresponding options:
|
||||
```yaml
|
||||
endpoint: http://localhost:8084
|
||||
log: ./request.log
|
||||
env: .env
|
||||
credentials:
|
||||
access_key: CAtUxDSSFtuVyVCjHTMhwx3eP3YSPo5ffwbPcnKfcdrD06WwUSn72T5EBNe3jcgjL54rmxFM6u3nUAoNBps8qJ1PD
|
||||
secret_key: 560027d81c277de7378f71cbf12a32e4f7f541de724be59bcfdbfdc925425f30
|
||||
http_timeout: 60s
|
||||
skip_verify_tls: true
|
||||
```
|
||||
Configuration path is passed via required `--config` flag.
|
||||
If corresponding flag is set, it overrides parameter from config.
|
||||
|
||||
### Configuration parameters
|
||||
|
||||
#### Global parameters
|
||||
| Config parameter name | Flag name | Type | Default value | Description |
|
||||
|-------------------------|-------------------|------------|---------------|-------------------------------------------------------------------------------|
|
||||
| - | `config` | `string` | - | config file path (e.g. `./config/playback.yaml`) |
|
||||
| `http_timeout` | `http-timeout` | `duration` | `60s` | http request timeout |
|
||||
| `skip_verify_tls` | `skip-verify-tls` | `bool` | `false` | skips tls certificate verification for https endpoints |
|
||||
| `credentials.accessKey` | - | `string` | - | AWS access key id |
|
||||
| `credentials.secretKey` | - | `string` | - | AWS secret key |
|
||||
| `print_response_limit` | - | `int` | `1024` | max response length to be printed in stdout, the rest of body will be omitted |
|
||||
|
||||
#### `run` command parameters
|
||||
| Config parameter name | Flag name | Type | Default value | Description |
|
||||
|-----------------------|-----------|--------|---------------|--------------------------------------------------------|
|
||||
| `endpoint` | endpoint | string | - | s3-gw endpoint URL |
|
||||
| `log` | log | string | ./request.log | path to log file, could be either absolute or relative |
|
12
go.mod
12
go.mod
|
@ -7,10 +7,11 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240822080251-28f140bf06c1
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240821072038-a1386f6d259a
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
|
||||
github.com/aws/aws-sdk-go v1.44.6
|
||||
github.com/aws/aws-sdk-go-v2 v1.18.1
|
||||
github.com/aws/aws-sdk-go-v2 v1.30.3
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.27
|
||||
github.com/bluele/gcache v0.0.2
|
||||
github.com/go-chi/chi/v5 v5.0.8
|
||||
github.com/google/uuid v1.6.0
|
||||
|
@ -20,7 +21,7 @@ require (
|
|||
github.com/panjf2000/ants/v2 v2.5.0
|
||||
github.com/prometheus/client_golang v1.19.0
|
||||
github.com/prometheus/client_model v0.5.0
|
||||
github.com/spf13/cobra v1.7.0
|
||||
github.com/spf13/cobra v1.8.1
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/spf13/viper v1.15.0
|
||||
github.com/ssgreg/journald v1.0.0
|
||||
|
@ -36,6 +37,7 @@ require (
|
|||
golang.org/x/text v0.14.0
|
||||
google.golang.org/grpc v1.63.2
|
||||
google.golang.org/protobuf v1.33.0
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1
|
||||
)
|
||||
|
||||
require (
|
||||
|
@ -45,11 +47,11 @@ require (
|
|||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 // indirect
|
||||
github.com/VictoriaMetrics/easyproto v0.1.4 // indirect
|
||||
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
|
||||
github.com/aws/smithy-go v1.13.5 // indirect
|
||||
github.com/aws/smithy-go v1.20.3 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.2.0 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.6.0 // indirect
|
||||
|
|
25
go.sum
25
go.sum
|
@ -48,8 +48,8 @@ git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240822080251-28f140bf06c1
|
|||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240822080251-28f140bf06c1/go.mod h1:Pl77loECndbgIC0Kljj1MFmGJKQ9gotaFINyveW1T8I=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240821072038-a1386f6d259a h1:uuNs7xOVgFOqO6hUyyZT+/eZ9glXQ85J4GDVe+qKMCI=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240821072038-a1386f6d259a/go.mod h1:SgioiGhQNWqiV5qpFAXRDJF81SEFRBhtwGEiU0FViyA=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b h1:M50kdfrf/h8c3cz0bJ2AEUcbXvAlPFVC1Wp1WkfZ/8E=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b/go.mod h1:GZTk55RI4dKzsK6BCn5h2xxE28UHNfgoq/NJxW/LQ6A=
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
|
||||
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0/go.mod h1:okpbKfVYf/BpejtfFTfhZqFP+sZ8rsHrP8Rr/jYPNRc=
|
||||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjqZzS4gsb4UA=
|
||||
|
@ -66,10 +66,12 @@ github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8
|
|||
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
|
||||
github.com/aws/aws-sdk-go v1.44.6 h1:Y+uHxmZfhRTLX2X3khkdxCoTZAyGEX21aOUHe1U6geg=
|
||||
github.com/aws/aws-sdk-go v1.44.6/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
|
||||
github.com/aws/aws-sdk-go-v2 v1.18.1 h1:+tefE750oAb7ZQGzla6bLkOwfcQCEtC5y2RqoqCeqKo=
|
||||
github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw=
|
||||
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8=
|
||||
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA=
|
||||
github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY=
|
||||
github.com/aws/aws-sdk-go-v2 v1.30.3/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.27 h1:2raNba6gr2IfA0eqqiP2XiQ0UVOpGPgDSi0I9iAP+UI=
|
||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.27/go.mod h1:gniiwbGahQByxan6YjQUMcW4Aov6bLC3m+evgcoN4r4=
|
||||
github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE=
|
||||
github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
|
||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||
github.com/bits-and-blooms/bitset v1.8.0 h1:FD+XqgOZDUxxZ8hzoBFuV9+cGWY9CslN6d5MS5JVb4c=
|
||||
|
@ -100,8 +102,8 @@ github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH
|
|||
github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb h1:f0BMgIjhZy4lSRHCXFbQst85f5agZAjtDMixQqBWNpc=
|
||||
github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
|
||||
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
|
@ -177,7 +179,6 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
|
|||
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
|
||||
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
|
||||
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
|
||||
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
|
||||
|
@ -295,8 +296,8 @@ github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk=
|
|||
github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
|
||||
github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=
|
||||
github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU=
|
||||
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
|
||||
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
|
||||
github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
|
||||
github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
|
||||
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
|
||||
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
|
||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||
|
@ -707,6 +708,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
|
|||
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
|
||||
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
|
||||
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
|
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
|
||||
|
|
|
@ -99,6 +99,13 @@ const (
|
|||
FailedToPassAuthentication = "failed to pass authentication" // Error in ../../api/middleware/auth.go
|
||||
FailedToResolveCID = "failed to resolve CID" // Debug in ../../api/middleware/metrics.go
|
||||
RequestStart = "request start" // Info in ../../api/middleware/reqinfo.go
|
||||
LogHTTP = "http log" // Info in ../../api/middleware/log_http.go
|
||||
FailedToCloseHTTPBody = "failed to close http body" // Error in ../../api/middleware/log_http.go
|
||||
FailedToInitializeHTTPLogger = "failed to initialize http logger" // Error in ../../api/middleware/log_http.go
|
||||
FailedToReloadHTTPFileLogger = "failed to reload http file logger" // Error in ../../api/middleware/log_http.go
|
||||
FailedToReadHTTPBody = "failed to read http body" // Error in ../../api/middleware/log_http.go
|
||||
FailedToProcessHTTPBody = "failed to process http body" // Error in ../../api/middleware/log_http.go
|
||||
LogHTTPDisabledInThisBuild = "http logging disabled in this build" // Warn in ../../api/middleware/log_http_stub.go
|
||||
FailedToUnescapeObjectName = "failed to unescape object name" // Warn in ../../api/middleware/reqinfo.go
|
||||
InvalidDefaultMaxAge = "invalid defaultMaxAge" // Fatal in ../../cmd/s3-gw/app_settings.go
|
||||
CantShutDownService = "can't shut down service" // Panic in ../../cmd/s3-gw/service.go
|
||||
|
@ -161,4 +168,6 @@ const (
|
|||
WarnDuplicateNamespaceVHS = "duplicate namespace with enabled VHS, config value skipped"
|
||||
WarnValueVHSEnabledFlagWrongType = "the value of the VHS enable flag for the namespace is of the wrong type, config value skipped"
|
||||
WarnDomainContainsInvalidPlaceholder = "the domain contains an invalid placeholder, domain skipped"
|
||||
FailedToRemoveOldPartNode = "failed to remove old part node"
|
||||
CouldntCacheNetworkInfo = "couldn't cache network info"
|
||||
)
|
||||
|
|
|
@ -1,15 +1,15 @@
|
|||
package layer
|
||||
package detector
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
)
|
||||
|
||||
type (
|
||||
detector struct {
|
||||
Detector struct {
|
||||
io.Reader
|
||||
err error
|
||||
data []byte
|
||||
err error
|
||||
data []byte
|
||||
detectFunc func([]byte) string
|
||||
}
|
||||
errReader struct {
|
||||
data []byte
|
||||
|
@ -36,23 +36,24 @@ func (r *errReader) Read(b []byte) (int, error) {
|
|||
return n, nil
|
||||
}
|
||||
|
||||
func newDetector(reader io.Reader) *detector {
|
||||
return &detector{
|
||||
data: make([]byte, contentTypeDetectSize),
|
||||
Reader: reader,
|
||||
func NewDetector(reader io.Reader, detectFunc func([]byte) string) *Detector {
|
||||
return &Detector{
|
||||
data: make([]byte, contentTypeDetectSize),
|
||||
Reader: reader,
|
||||
detectFunc: detectFunc,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *detector) Detect() (string, error) {
|
||||
func (d *Detector) Detect() (string, error) {
|
||||
n, err := d.Reader.Read(d.data)
|
||||
if err != nil && err != io.EOF {
|
||||
d.err = err
|
||||
return "", err
|
||||
}
|
||||
d.data = d.data[:n]
|
||||
return http.DetectContentType(d.data), nil
|
||||
return d.detectFunc(d.data), nil
|
||||
}
|
||||
|
||||
func (d *detector) MultiReader() io.Reader {
|
||||
func (d *Detector) RestoredReader() io.Reader {
|
||||
return io.MultiReader(newReader(d.data, d.err), d.Reader)
|
||||
}
|
|
@ -156,6 +156,10 @@ type NodeResponse interface {
|
|||
}
|
||||
|
||||
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
||||
if err := validateNodeResponse(nodeInfo); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tNode := &treeNode{
|
||||
ID: nodeInfo.GetNodeID(),
|
||||
ParentID: nodeInfo.GetParentID(),
|
||||
|
@ -163,14 +167,6 @@ func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
|
|||
Meta: make(map[string]string, len(nodeInfo.GetMeta())),
|
||||
}
|
||||
|
||||
if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 {
|
||||
return nil, errors.New("invalid tree node: missing id")
|
||||
}
|
||||
|
||||
if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) {
|
||||
return nil, errors.New("invalid tree node: length multiple ids mismatch")
|
||||
}
|
||||
|
||||
for _, kv := range nodeInfo.GetMeta() {
|
||||
switch kv.GetKey() {
|
||||
case oidKV:
|
||||
|
@ -377,6 +373,10 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
|
|||
}
|
||||
|
||||
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
|
||||
if err := validateNodeResponse(node); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(node.GetNodeID()) != 1 {
|
||||
return nil, errors.New("invalid multipart node: this is split node")
|
||||
}
|
||||
|
@ -426,10 +426,36 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo,
|
|||
return multipartInfo, nil
|
||||
}
|
||||
|
||||
func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
|
||||
var err error
|
||||
partInfo := &data.PartInfo{}
|
||||
func validateNodeResponse(node NodeResponse) error {
|
||||
ids := node.GetNodeID()
|
||||
parentIDs := node.GetParentID()
|
||||
timestamps := node.GetTimestamp()
|
||||
|
||||
if len(ids) == 0 || len(parentIDs) == 0 || len(timestamps) == 0 {
|
||||
return errors.New("invalid node response: missing ids")
|
||||
}
|
||||
|
||||
if len(ids) != len(parentIDs) || len(parentIDs) != len(timestamps) {
|
||||
return errors.New("invalid node response: multiple ids length mismatch")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) {
|
||||
if err := validateNodeResponse(node); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(node.GetNodeID()) != 1 {
|
||||
return nil, errors.New("invalid part node: this is split node")
|
||||
}
|
||||
|
||||
partInfo := &data.PartInfoExtended{
|
||||
Timestamp: node.GetTimestamp()[0],
|
||||
}
|
||||
|
||||
var err error
|
||||
for _, kv := range node.GetMeta() {
|
||||
value := string(kv.GetValue())
|
||||
switch kv.GetKey() {
|
||||
|
@ -1397,10 +1423,10 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
|
|||
return nil, layer.ErrNodeNotFound
|
||||
}
|
||||
|
||||
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) {
|
||||
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
|
||||
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
|
||||
if err != nil {
|
||||
return oid.ID{}, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
meta := map[string]string{
|
||||
|
@ -1412,48 +1438,76 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
|
|||
md5KV: info.MD5,
|
||||
}
|
||||
|
||||
objToDelete := make([]oid.ID, 0, 1)
|
||||
partsToDelete := make([]uint64, 0, 1)
|
||||
var (
|
||||
latestPartID uint64
|
||||
maxTimestamp uint64
|
||||
)
|
||||
|
||||
multiNodeID := MultiID{multipartNodeID}
|
||||
|
||||
for _, part := range parts {
|
||||
if len(part.GetNodeID()) != 1 {
|
||||
// multipart parts nodeID shouldn't have multiple values
|
||||
c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
|
||||
zap.String("key", info.Key),
|
||||
zap.String("upload id", info.UploadID),
|
||||
zap.Uint64("multipart node id ", multipartNodeID),
|
||||
zap.Uint64s("node ids", part.GetNodeID()))
|
||||
continue
|
||||
}
|
||||
nodeID := part.GetNodeID()[0]
|
||||
if nodeID == multipartNodeID {
|
||||
if multiNodeID.Equal(part.GetNodeID()) {
|
||||
continue
|
||||
}
|
||||
|
||||
partInfo, err := newPartInfo(part)
|
||||
if err != nil {
|
||||
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
|
||||
zap.String("key", info.Key),
|
||||
zap.String("upload id", info.UploadID),
|
||||
zap.Uint64("multipart node id ", multipartNodeID),
|
||||
zap.Uint64s("id", part.GetNodeID()),
|
||||
zap.Error(err))
|
||||
continue
|
||||
}
|
||||
if partInfo.Number == info.Number {
|
||||
return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta)
|
||||
nodeID := part.GetNodeID()[0]
|
||||
objToDelete = append(objToDelete, partInfo.OID)
|
||||
partsToDelete = append(partsToDelete, nodeID)
|
||||
timestamp := partInfo.Timestamp
|
||||
if timestamp > maxTimestamp {
|
||||
maxTimestamp = timestamp
|
||||
latestPartID = nodeID
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
|
||||
return oid.ID{}, err
|
||||
if len(objToDelete) != 0 {
|
||||
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latestPartID, multipartNodeID, meta); err != nil {
|
||||
return nil, fmt.Errorf("move part node: %w", err)
|
||||
}
|
||||
|
||||
for _, nodeID := range partsToDelete {
|
||||
if nodeID == latestPartID {
|
||||
continue
|
||||
}
|
||||
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, nodeID); err != nil {
|
||||
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode,
|
||||
zap.String("key", info.Key),
|
||||
zap.String("upload id", info.UploadID),
|
||||
zap.Uint64("id", nodeID))
|
||||
}
|
||||
}
|
||||
|
||||
return objToDelete, nil
|
||||
}
|
||||
|
||||
return oid.ID{}, layer.ErrNoNodeToRemove
|
||||
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, layer.ErrNoNodeToRemove
|
||||
}
|
||||
|
||||
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) {
|
||||
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
|
||||
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result := make([]*data.PartInfo, 0, len(parts))
|
||||
result := make([]*data.PartInfoExtended, 0, len(parts))
|
||||
for _, part := range parts {
|
||||
if len(part.GetNodeID()) != 1 {
|
||||
// multipart parts nodeID shouldn't have multiple values
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
|
||||
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
|
@ -304,3 +305,57 @@ func TestGetLatestNode(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestSplitTreeMultiparts(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
memCli, err := NewTreeServiceClientMemory()
|
||||
require.NoError(t, err)
|
||||
treeService := NewTree(memCli, zaptest.NewLogger(t))
|
||||
|
||||
bktInfo := &data.BucketInfo{
|
||||
CID: cidtest.ID(),
|
||||
}
|
||||
|
||||
multipartInfo := &data.MultipartInfo{
|
||||
Key: "multipart",
|
||||
UploadID: "id",
|
||||
Meta: map[string]string{},
|
||||
Owner: usertest.ID(),
|
||||
}
|
||||
|
||||
err = treeService.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
|
||||
require.NoError(t, err)
|
||||
|
||||
multipartInfo, err = treeService.GetMultipartUpload(ctx, bktInfo, multipartInfo.Key, multipartInfo.UploadID)
|
||||
require.NoError(t, err)
|
||||
|
||||
var objIDs []oid.ID
|
||||
for i := 0; i < 2; i++ {
|
||||
objID := oidtest.ID()
|
||||
_, err = memCli.AddNode(ctx, bktInfo, systemTree, multipartInfo.ID, map[string]string{
|
||||
partNumberKV: "1",
|
||||
oidKV: objID.EncodeToString(),
|
||||
ownerKV: usertest.ID().EncodeToString(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
objIDs = append(objIDs, objID)
|
||||
}
|
||||
|
||||
parts, err := treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, parts, 2)
|
||||
|
||||
objToDeletes, err := treeService.AddPart(ctx, bktInfo, multipartInfo.ID, &data.PartInfo{
|
||||
Key: multipartInfo.Key,
|
||||
UploadID: multipartInfo.UploadID,
|
||||
Number: 1,
|
||||
OID: oidtest.ID(),
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, objIDs, objToDeletes, "oids to delete mismatched")
|
||||
|
||||
parts, err = treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, parts, 1)
|
||||
}
|
||||
|
|
48
pkg/xmlutils/xmlutils.go
Normal file
48
pkg/xmlutils/xmlutils.go
Normal file
|
@ -0,0 +1,48 @@
|
|||
package xmlutils
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/base64"
|
||||
"encoding/xml"
|
||||
"io"
|
||||
)
|
||||
|
||||
type nopCloseWriter struct {
|
||||
io.Writer
|
||||
}
|
||||
|
||||
func (b nopCloseWriter) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
const (
|
||||
nonXML = "nonXML"
|
||||
typeXML = "application/xml"
|
||||
)
|
||||
|
||||
func DetectXML(data []byte) string {
|
||||
token, err := xml.NewDecoder(bytes.NewReader(data)).RawToken()
|
||||
if err != nil {
|
||||
return nonXML
|
||||
}
|
||||
|
||||
switch token.(type) {
|
||||
case xml.StartElement, xml.ProcInst:
|
||||
return typeXML
|
||||
}
|
||||
return nonXML
|
||||
}
|
||||
|
||||
func ChooseWriter(dataType string, bodyWriter io.Writer) io.WriteCloser {
|
||||
if dataType == typeXML {
|
||||
return nopCloseWriter{bodyWriter}
|
||||
}
|
||||
return base64.NewEncoder(base64.StdEncoding, bodyWriter)
|
||||
}
|
||||
|
||||
func ChooseReader(dataType string, bodyReader io.Reader) io.Reader {
|
||||
if dataType == typeXML {
|
||||
return bodyReader
|
||||
}
|
||||
return base64.NewDecoder(base64.StdEncoding, bodyReader)
|
||||
}
|
Loading…
Add table
Reference in a new issue