Compare commits

...

10 commits

Author SHA1 Message Date
4424aeeb71 [##] Fix of receiving VHS namespaces map
In the process of forming a map with namespaces
for which VHS is enabled, we resolve the alias
of the namespace. The problem is that to resolve,
we need default namespace names, which in turn do
not have time to decide by this time. Therefore,
the receipt of a card with VHS namespaces must
be postponed after reading the default names for
the namespace.

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
2024-09-16 13:45:35 +03:00
a0aba8bcb1 [##] Fix X-Frostfs-S3-VHS header processing
It is assumed that the X-Frostfs-S3-VHS header
will have the value enabled/disabled instead
of true/false.

Signed-off-by: Roman Loginov <r.loginov@yadro.com>
2024-09-16 13:45:17 +03:00
d0e4d55772 [#460] Add network info cache
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
2024-09-13 09:56:24 +00:00
42e72889a5 [#450] Add test for chunk-encoded object size
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2024-09-13 12:00:24 +03:00
98815d5473 [#450] Fix aws-chunked header workflow
Signed-off-by: Pavel Pogodaev <p.pogodaev@yadro.com>
2024-09-13 11:59:07 +03:00
62615d7ab7 [#369] Request reproducer
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
2024-09-11 15:25:09 +03:00
575ab4d294 [#369] Enhanced http requests logging
Signed-off-by: Nikita Zinkevich <n.zinkevich@yadro.com>
2024-09-11 15:25:09 +03:00
d919e6cce2 [#482] Fix containers resolving
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
2024-09-05 12:33:14 +03:00
056f168d77 [#448] multipart: Support removing duplicated parts
Previously after tree split we can have duplicated parts
(several objects and tree node referred to the same part number).
Some of them couldn't be deleted after abort or compete action.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2024-09-03 13:20:38 +00:00
9bdfe2a016 [#479] Update APE to support s3:PatchObject action
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
2024-09-03 11:57:59 +00:00
50 changed files with 1629 additions and 204 deletions

View file

@ -3,11 +3,12 @@ FROM golang:1.22 AS builder
ARG BUILD=now ARG BUILD=now
ARG REPO=git.frostfs.info/TrueCloudLab/frostfs-s3-gw ARG REPO=git.frostfs.info/TrueCloudLab/frostfs-s3-gw
ARG VERSION=dev ARG VERSION=dev
ARG GOFLAGS=""
WORKDIR /src WORKDIR /src
COPY . /src COPY . /src
RUN make RUN make GOFLAGS=${GOFLAGS}
# Executable image # Executable image
FROM alpine AS frostfs-s3-gw FROM alpine AS frostfs-s3-gw

View file

@ -14,6 +14,8 @@ METRICS_DUMP_OUT ?= ./metrics-dump.json
CMDS = $(addprefix frostfs-, $(notdir $(wildcard cmd/*))) CMDS = $(addprefix frostfs-, $(notdir $(wildcard cmd/*)))
BINS = $(addprefix $(BINDIR)/, $(CMDS)) BINS = $(addprefix $(BINDIR)/, $(CMDS))
GOFLAGS ?=
# Variables for docker # Variables for docker
REPO_BASENAME = $(shell basename `go list -m`) REPO_BASENAME = $(shell basename `go list -m`)
HUB_IMAGE ?= "truecloudlab/$(REPO_BASENAME)" HUB_IMAGE ?= "truecloudlab/$(REPO_BASENAME)"
@ -44,6 +46,7 @@ all: $(BINS)
$(BINS): $(BINDIR) dep $(BINS): $(BINDIR) dep
@echo "⇒ Build $@" @echo "⇒ Build $@"
CGO_ENABLED=0 \ CGO_ENABLED=0 \
GOFLAGS=$(GOFLAGS) \
go build -v -trimpath \ go build -v -trimpath \
-ldflags "-X $(REPO)/internal/version.Version=$(VERSION)" \ -ldflags "-X $(REPO)/internal/version.Version=$(VERSION)" \
-o $@ ./cmd/$(subst frostfs-,,$(notdir $@)) -o $@ ./cmd/$(subst frostfs-,,$(notdir $@))
@ -70,7 +73,7 @@ docker/%:
-w /src \ -w /src \
-u `stat -c "%u:%g" .` \ -u `stat -c "%u:%g" .` \
--env HOME=/src \ --env HOME=/src \
golang:$(GO_VERSION) make $*,\ golang:$(GO_VERSION) make GOFLAGS=$(GOFLAGS) $*,\
@echo "supported docker targets: all $(BINS) lint") @echo "supported docker targets: all $(BINS) lint")
# Run tests # Run tests
@ -121,6 +124,7 @@ image:
@docker build \ @docker build \
--build-arg REPO=$(REPO) \ --build-arg REPO=$(REPO) \
--build-arg VERSION=$(VERSION) \ --build-arg VERSION=$(VERSION) \
--build-arg GOFLAGS=$(GOFLAGS) \
--rm \ --rm \
-f .docker/Dockerfile \ -f .docker/Dockerfile \
-t $(HUB_IMAGE):$(HUB_TAG) . -t $(HUB_IMAGE):$(HUB_TAG) .

View file

@ -22,8 +22,8 @@ import (
"github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/credentials"
) )
// authorizationFieldRegexp -- is regexp for credentials with Base58 encoded cid and oid and '0' (zero) as delimiter. // AuthorizationFieldRegexp -- is regexp for credentials with Base58 encoded cid and oid and '0' (zero) as delimiter.
var authorizationFieldRegexp = regexp.MustCompile(`AWS4-HMAC-SHA256 Credential=(?P<access_key_id>[^/]+)/(?P<date>[^/]+)/(?P<region>[^/]*)/(?P<service>[^/]+)/aws4_request,\s*SignedHeaders=(?P<signed_header_fields>.+),\s*Signature=(?P<v4_signature>.+)`) var AuthorizationFieldRegexp = regexp.MustCompile(`AWS4-HMAC-SHA256 Credential=(?P<access_key_id>[^/]+)/(?P<date>[^/]+)/(?P<region>[^/]*)/(?P<service>[^/]+)/aws4_request,\s*SignedHeaders=(?P<signed_header_fields>.+),\s*Signature=(?P<v4_signature>.+)`)
// postPolicyCredentialRegexp -- is regexp for credentials when uploading file using POST with policy. // postPolicyCredentialRegexp -- is regexp for credentials when uploading file using POST with policy.
var postPolicyCredentialRegexp = regexp.MustCompile(`(?P<access_key_id>[^/]+)/(?P<date>[^/]+)/(?P<region>[^/]*)/(?P<service>[^/]+)/aws4_request`) var postPolicyCredentialRegexp = regexp.MustCompile(`(?P<access_key_id>[^/]+)/(?P<date>[^/]+)/(?P<region>[^/]*)/(?P<service>[^/]+)/aws4_request`)
@ -85,7 +85,7 @@ var ContentSHA256HeaderStandardValue = map[string]struct{}{
func New(creds tokens.Credentials, prefixes []string) *Center { func New(creds tokens.Credentials, prefixes []string) *Center {
return &Center{ return &Center{
cli: creds, cli: creds,
reg: NewRegexpMatcher(authorizationFieldRegexp), reg: NewRegexpMatcher(AuthorizationFieldRegexp),
postReg: NewRegexpMatcher(postPolicyCredentialRegexp), postReg: NewRegexpMatcher(postPolicyCredentialRegexp),
allowedAccessKeyIDPrefixes: prefixes, allowedAccessKeyIDPrefixes: prefixes,
} }

View file

@ -32,7 +32,7 @@ func TestAuthHeaderParse(t *testing.T) {
defaultHeader := "AWS4-HMAC-SHA256 Credential=oid0cid/20210809/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=2811ccb9e242f41426738fb1f" defaultHeader := "AWS4-HMAC-SHA256 Credential=oid0cid/20210809/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=2811ccb9e242f41426738fb1f"
center := &Center{ center := &Center{
reg: NewRegexpMatcher(authorizationFieldRegexp), reg: NewRegexpMatcher(AuthorizationFieldRegexp),
} }
for _, tc := range []struct { for _, tc := range []struct {

View file

@ -85,7 +85,7 @@ func TestCheckSign(t *testing.T) {
c := &Center{ c := &Center{
cli: mock, cli: mock,
reg: NewRegexpMatcher(authorizationFieldRegexp), reg: NewRegexpMatcher(AuthorizationFieldRegexp),
postReg: NewRegexpMatcher(postPolicyCredentialRegexp), postReg: NewRegexpMatcher(postPolicyCredentialRegexp),
} }
box, err := c.Authenticate(req) box, err := c.Authenticate(req)

65
api/cache/network_info.go vendored Normal file
View file

@ -0,0 +1,65 @@
package cache
import (
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
"github.com/bluele/gcache"
"go.uber.org/zap"
)
type (
// NetworkInfoCache provides cache for network info.
NetworkInfoCache struct {
cache gcache.Cache
logger *zap.Logger
}
// NetworkInfoCacheConfig stores expiration params for cache.
NetworkInfoCacheConfig struct {
Lifetime time.Duration
Logger *zap.Logger
}
)
const (
DefaultNetworkInfoCacheLifetime = 1 * time.Minute
networkInfoCacheSize = 1
networkInfoKey = "network_info"
)
// DefaultNetworkInfoConfig returns new default cache expiration values.
func DefaultNetworkInfoConfig(logger *zap.Logger) *NetworkInfoCacheConfig {
return &NetworkInfoCacheConfig{
Lifetime: DefaultNetworkInfoCacheLifetime,
Logger: logger,
}
}
// NewNetworkInfoCache creates an object of NetworkInfoCache.
func NewNetworkInfoCache(config *NetworkInfoCacheConfig) *NetworkInfoCache {
gc := gcache.New(networkInfoCacheSize).LRU().Expiration(config.Lifetime).Build()
return &NetworkInfoCache{cache: gc, logger: config.Logger}
}
func (c *NetworkInfoCache) Get() *netmap.NetworkInfo {
entry, err := c.cache.Get(networkInfoKey)
if err != nil {
return nil
}
result, ok := entry.(netmap.NetworkInfo)
if !ok {
c.logger.Warn(logs.InvalidCacheEntryType, zap.String("actual", fmt.Sprintf("%T", entry)),
zap.String("expected", fmt.Sprintf("%T", result)))
return nil
}
return &result
}
func (c *NetworkInfoCache) Put(info netmap.NetworkInfo) error {
return c.cache.Set(networkInfoKey, info)
}

View file

@ -126,6 +126,14 @@ type PartInfo struct {
Created time.Time `json:"created"` Created time.Time `json:"created"`
} }
type PartInfoExtended struct {
PartInfo
// Timestamp is used to find the latest version of part info in case of tree split
// when there are multiple nodes for the same part.
Timestamp uint64
}
// ToHeaderString form short part representation to use in S3-Completed-Parts header. // ToHeaderString form short part representation to use in S3-Completed-Parts header.
func (p *PartInfo) ToHeaderString() string { func (p *PartInfo) ToHeaderString() string {
// ETag value contains SHA256 checksum which is used while getting object parts attributes. // ETag value contains SHA256 checksum which is used while getting object parts attributes.

View file

@ -288,6 +288,21 @@ func completeMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID
return w return w
} }
func abortMultipartUpload(hc *handlerContext, bktName, objName, uploadID string) {
w := abortMultipartUploadBase(hc, bktName, objName, uploadID)
assertStatus(hc.t, w, http.StatusNoContent)
}
func abortMultipartUploadBase(hc *handlerContext, bktName, objName, uploadID string) *httptest.ResponseRecorder {
query := make(url.Values)
query.Set(uploadIDQuery, uploadID)
w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
hc.Handler().AbortMultipartUploadHandler(w, r)
return w
}
func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) { func uploadPartEncrypted(hc *handlerContext, bktName, objName, uploadID string, num, size int) (string, []byte) {
return uploadPartBase(hc, bktName, objName, true, uploadID, num, size) return uploadPartBase(hc, bktName, objName, true, uploadID, num, size)
} }

View file

@ -167,7 +167,7 @@ func prepareHandlerContextBase(cacheCfg *layer.CachesConfig) (*handlerContextBas
tp := layer.NewTestFrostFS(key) tp := layer.NewTestFrostFS(key)
testResolver := &resolver.Resolver{Name: "test_resolver"} testResolver := &resolver.Resolver{Name: "test_resolver"}
testResolver.SetResolveFunc(func(_ context.Context, name string) (cid.ID, error) { testResolver.SetResolveFunc(func(_ context.Context, _, name string) (cid.ID, error) {
return tp.ContainerID(name) return tp.ContainerID(name)
}) })
@ -243,6 +243,7 @@ func getMinCacheConfig(logger *zap.Logger) *layer.CachesConfig {
Buckets: minCacheCfg, Buckets: minCacheCfg,
System: minCacheCfg, System: minCacheCfg,
AccessControl: minCacheCfg, AccessControl: minCacheCfg,
NetworkInfo: &cache.NetworkInfoCacheConfig{Lifetime: minCacheCfg.Lifetime},
} }
} }

View file

@ -205,10 +205,7 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
return return
} }
var size uint64 size := h.getPutPayloadSize(r)
if r.ContentLength > 0 {
size = uint64(r.ContentLength)
}
p := &layer.UploadPartParams{ p := &layer.UploadPartParams{
Info: &layer.UploadInfoParams{ Info: &layer.UploadInfoParams{

View file

@ -17,6 +17,10 @@ import (
s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" s3Errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer/encryption"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@ -122,6 +126,108 @@ func TestMultipartReUploadPart(t *testing.T) {
equalDataSlices(t, append(data1, data2...), data) equalDataSlices(t, append(data1, data2...), data)
} }
func TestMultipartRemovePartsSplit(t *testing.T) {
bktName, objName := "bucket-to-upload-part", "object-multipart"
partSize := 8
t.Run("reupload part", func(t *testing.T) {
hc := prepareHandlerContext(t)
bktInfo := createTestBucket(hc, bktName)
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
require.NoError(t, err)
objID := oidtest.ID()
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
"Number": "1",
"OID": objID.EncodeToString(),
"Owner": usertest.ID().EncodeToString(),
"ETag": "etag",
})
require.NoError(t, err)
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
require.Len(t, hc.tp.Objects(), 2)
list := listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
require.Len(t, list.Parts, 1)
require.Equal(t, `"etag"`, list.Parts[0].ETag)
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
list = listParts(hc, bktName, objName, uploadInfo.UploadID, "0", http.StatusOK)
require.Len(t, list.Parts, 1)
require.Equal(t, etag1, list.Parts[0].ETag)
require.Len(t, hc.tp.Objects(), 1)
})
t.Run("abort multipart", func(t *testing.T) {
hc := prepareHandlerContext(t)
bktInfo := createTestBucket(hc, bktName)
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
require.NoError(t, err)
objID := oidtest.ID()
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
"Number": "1",
"OID": objID.EncodeToString(),
"Owner": usertest.ID().EncodeToString(),
"ETag": "etag",
})
require.NoError(t, err)
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
require.Len(t, hc.tp.Objects(), 2)
abortMultipartUpload(hc, bktName, objName, uploadInfo.UploadID)
require.Empty(t, hc.tp.Objects())
})
t.Run("complete multipart", func(t *testing.T) {
hc := prepareHandlerContext(t)
bktInfo := createTestBucket(hc, bktName)
uploadInfo := createMultipartUpload(hc, bktName, objName, map[string]string{})
etag1, _ := uploadPart(hc, bktName, objName, uploadInfo.UploadID, 1, partSize)
multipartInfo, err := hc.tree.GetMultipartUpload(hc.Context(), bktInfo, uploadInfo.Key, uploadInfo.UploadID)
require.NoError(t, err)
objID := oidtest.ID()
_, err = hc.treeMock.AddNode(hc.Context(), bktInfo, "system", multipartInfo.ID, map[string]string{
"Number": "1",
"OID": objID.EncodeToString(),
"Owner": usertest.ID().EncodeToString(),
"ETag": "etag",
})
require.NoError(t, err)
hc.tp.AddObject(bktInfo.CID.EncodeToString()+"/"+objID.EncodeToString(), object.New())
require.Len(t, hc.tp.Objects(), 2)
completeMultipartUpload(hc, bktName, objName, uploadInfo.UploadID, []string{etag1})
require.Falsef(t, containsOID(hc.tp.Objects(), objID), "frostfs contains '%s' object, but shouldn't", objID)
})
}
func containsOID(objects []*object.Object, objID oid.ID) bool {
for _, o := range objects {
oID, _ := o.ID()
if oID.Equals(objID) {
return true
}
}
return false
}
func TestListMultipartUploads(t *testing.T) { func TestListMultipartUploads(t *testing.T) {
hc := prepareHandlerContext(t) hc := prepareHandlerContext(t)

View file

@ -242,10 +242,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
metadata[api.ContentEncoding] = encodings metadata[api.ContentEncoding] = encodings
} }
var size uint64 size := h.getPutPayloadSize(r)
if r.ContentLength > 0 {
size = uint64(r.ContentLength)
}
params := &layer.PutObjectParams{ params := &layer.PutObjectParams{
BktInfo: bktInfo, BktInfo: bktInfo,

View file

@ -29,6 +29,11 @@ import (
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
const (
awsChunkedRequestExampleDecodedContentLength = 66560
awsChunkedRequestExampleContentLength = 66824
)
func TestCheckBucketName(t *testing.T) { func TestCheckBucketName(t *testing.T) {
for _, tc := range []struct { for _, tc := range []struct {
name string name string
@ -361,7 +366,12 @@ func TestPutObjectWithStreamBodyAWSExample(t *testing.T) {
hc.Handler().PutObjectHandler(w, req) hc.Handler().PutObjectHandler(w, req)
assertStatus(t, w, http.StatusOK) assertStatus(t, w, http.StatusOK)
data := getObjectRange(t, hc, bktName, objName, 0, 66824) w, req = prepareTestRequest(hc, bktName, objName, nil)
hc.Handler().HeadObjectHandler(w, req)
assertStatus(t, w, http.StatusOK)
require.Equal(t, strconv.Itoa(awsChunkedRequestExampleDecodedContentLength), w.Header().Get(api.ContentLength))
data := getObjectRange(t, hc, bktName, objName, 0, awsChunkedRequestExampleDecodedContentLength)
for i := range chunk { for i := range chunk {
require.Equal(t, chunk[i], data[i]) require.Equal(t, chunk[i], data[i])
} }
@ -397,6 +407,8 @@ func TestPutChunkedTestContentEncoding(t *testing.T) {
require.Equal(t, "gzip", resp.Header().Get(api.ContentEncoding)) require.Equal(t, "gzip", resp.Header().Get(api.ContentEncoding))
} }
// getChunkedRequest implements request example from
// https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-streaming.html
func getChunkedRequest(ctx context.Context, t *testing.T, bktName, objName string) (*httptest.ResponseRecorder, *http.Request, []byte) { func getChunkedRequest(ctx context.Context, t *testing.T, bktName, objName string) (*httptest.ResponseRecorder, *http.Request, []byte) {
chunk := make([]byte, 65*1024) chunk := make([]byte, 65*1024)
for i := range chunk { for i := range chunk {
@ -424,9 +436,9 @@ func getChunkedRequest(ctx context.Context, t *testing.T, bktName, objName strin
req, err := http.NewRequest("PUT", "https://s3.amazonaws.com/"+bktName+"/"+objName, nil) req, err := http.NewRequest("PUT", "https://s3.amazonaws.com/"+bktName+"/"+objName, nil)
require.NoError(t, err) require.NoError(t, err)
req.Header.Set("content-encoding", "aws-chunked") req.Header.Set("content-encoding", "aws-chunked")
req.Header.Set("content-length", "66824") req.Header.Set("content-length", strconv.Itoa(awsChunkedRequestExampleContentLength))
req.Header.Set("x-amz-content-sha256", "STREAMING-AWS4-HMAC-SHA256-PAYLOAD") req.Header.Set("x-amz-content-sha256", "STREAMING-AWS4-HMAC-SHA256-PAYLOAD")
req.Header.Set("x-amz-decoded-content-length", "66560") req.Header.Set("x-amz-decoded-content-length", strconv.Itoa(awsChunkedRequestExampleDecodedContentLength))
req.Header.Set("x-amz-storage-class", "REDUCED_REDUNDANCY") req.Header.Set("x-amz-storage-class", "REDUCED_REDUNDANCY")
signTime, err := time.Parse("20060102T150405Z", "20130524T000000Z") signTime, err := time.Parse("20060102T150405Z", "20130524T000000Z")

View file

@ -106,6 +106,23 @@ func (h *handler) getBucketAndCheckOwner(r *http.Request, bucket string, header
return bktInfo, checkOwner(bktInfo, expected) return bktInfo, checkOwner(bktInfo, expected)
} }
func (h *handler) getPutPayloadSize(r *http.Request) uint64 {
decodeContentSize := r.Header.Get(api.AmzDecodedContentLength)
decodedSize, err := strconv.Atoi(decodeContentSize)
if err != nil {
decodedSize = 0
}
var size uint64
if decodedSize > 0 {
size = uint64(decodedSize)
} else if r.ContentLength > 0 {
size = uint64(r.ContentLength)
}
return size
}
func parseRange(s string) (*layer.RangeParams, error) { func parseRange(s string) (*layer.RangeParams, error) {
if s == "" { if s == "" {
return nil, nil return nil, nil

View file

@ -5,6 +5,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"go.uber.org/zap" "go.uber.org/zap"
@ -19,6 +20,7 @@ type Cache struct {
bucketCache *cache.BucketCache bucketCache *cache.BucketCache
systemCache *cache.SystemCache systemCache *cache.SystemCache
accessCache *cache.AccessControlCache accessCache *cache.AccessControlCache
networkInfoCache *cache.NetworkInfoCache
} }
// CachesConfig contains params for caches. // CachesConfig contains params for caches.
@ -31,6 +33,7 @@ type CachesConfig struct {
Buckets *cache.Config Buckets *cache.Config
System *cache.Config System *cache.Config
AccessControl *cache.Config AccessControl *cache.Config
NetworkInfo *cache.NetworkInfoCacheConfig
} }
// DefaultCachesConfigs returns filled configs. // DefaultCachesConfigs returns filled configs.
@ -44,6 +47,7 @@ func DefaultCachesConfigs(logger *zap.Logger) *CachesConfig {
Buckets: cache.DefaultBucketConfig(logger), Buckets: cache.DefaultBucketConfig(logger),
System: cache.DefaultSystemConfig(logger), System: cache.DefaultSystemConfig(logger),
AccessControl: cache.DefaultAccessControlConfig(logger), AccessControl: cache.DefaultAccessControlConfig(logger),
NetworkInfo: cache.DefaultNetworkInfoConfig(logger),
} }
} }
@ -57,6 +61,7 @@ func NewCache(cfg *CachesConfig) *Cache {
bucketCache: cache.NewBucketCache(cfg.Buckets), bucketCache: cache.NewBucketCache(cfg.Buckets),
systemCache: cache.NewSystemCache(cfg.System), systemCache: cache.NewSystemCache(cfg.System),
accessCache: cache.NewAccessControlCache(cfg.AccessControl), accessCache: cache.NewAccessControlCache(cfg.AccessControl),
networkInfoCache: cache.NewNetworkInfoCache(cfg.NetworkInfo),
} }
} }
@ -283,3 +288,13 @@ func (c *Cache) PutLifecycleConfiguration(owner user.ID, bkt *data.BucketInfo, c
func (c *Cache) DeleteLifecycleConfiguration(bktInfo *data.BucketInfo) { func (c *Cache) DeleteLifecycleConfiguration(bktInfo *data.BucketInfo) {
c.systemCache.Delete(bktInfo.LifecycleConfigurationObjectName()) c.systemCache.Delete(bktInfo.LifecycleConfigurationObjectName())
} }
func (c *Cache) GetNetworkInfo() *netmap.NetworkInfo {
return c.networkInfoCache.Get()
}
func (c *Cache) PutNetworkInfo(info netmap.NetworkInfo) {
if err := c.networkInfoCache.Put(info); err != nil {
c.logger.Warn(logs.CouldntCacheNetworkInfo, zap.Error(err))
}
}

View file

@ -64,7 +64,7 @@ func (n *Layer) containerInfo(ctx context.Context, prm PrmContainer) (*data.Buck
} }
} }
zone, _ := n.features.FormContainerZone(reqInfo.Namespace) zone := n.features.FormContainerZone(reqInfo.Namespace)
if zone != info.Zone { if zone != info.Zone {
return nil, fmt.Errorf("ns '%s' and zone '%s' are mismatched for container '%s'", zone, info.Zone, prm.ContainerID) return nil, fmt.Errorf("ns '%s' and zone '%s' are mismatched for container '%s'", zone, info.Zone, prm.ContainerID)
} }
@ -111,7 +111,7 @@ func (n *Layer) createContainer(ctx context.Context, p *CreateBucketParams) (*da
p.LocationConstraint = api.DefaultLocationConstraint // s3tests_boto3.functional.test_s3:test_bucket_get_location p.LocationConstraint = api.DefaultLocationConstraint // s3tests_boto3.functional.test_s3:test_bucket_get_location
} }
zone, _ := n.features.FormContainerZone(p.Namespace) zone := n.features.FormContainerZone(p.Namespace)
bktInfo := &data.BucketInfo{ bktInfo := &data.BucketInfo{
Name: p.Name, Name: p.Name,

View file

@ -52,12 +52,12 @@ func (k *FeatureSettingsMock) SetMD5Enabled(md5Enabled bool) {
k.md5Enabled = md5Enabled k.md5Enabled = md5Enabled
} }
func (k *FeatureSettingsMock) FormContainerZone(ns string) (zone string, isDefault bool) { func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
if ns == "" { if ns == "" {
return v2container.SysAttributeZoneDefault, true return v2container.SysAttributeZoneDefault
} }
return ns + ".ns", false return ns + ".ns"
} }
type TestFrostFS struct { type TestFrostFS struct {

View file

@ -35,14 +35,14 @@ import (
type ( type (
BucketResolver interface { BucketResolver interface {
Resolve(ctx context.Context, name string) (cid.ID, error) Resolve(ctx context.Context, zone, name string) (cid.ID, error)
} }
FeatureSettings interface { FeatureSettings interface {
ClientCut() bool ClientCut() bool
BufferMaxSizeForPut() uint64 BufferMaxSizeForPut() uint64
MD5Enabled() bool MD5Enabled() bool
FormContainerZone(ns string) (zone string, isDefault bool) FormContainerZone(ns string) string
} }
Layer struct { Layer struct {
@ -322,13 +322,13 @@ func (n *Layer) GetBucketInfo(ctx context.Context, name string) (*data.BucketInf
} }
reqInfo := middleware.GetReqInfo(ctx) reqInfo := middleware.GetReqInfo(ctx)
zone, _ := n.features.FormContainerZone(reqInfo.Namespace) zone := n.features.FormContainerZone(reqInfo.Namespace)
if bktInfo := n.cache.GetBucket(zone, name); bktInfo != nil { if bktInfo := n.cache.GetBucket(zone, name); bktInfo != nil {
return bktInfo, nil return bktInfo, nil
} }
containerID, err := n.ResolveBucket(ctx, name) containerID, err := n.ResolveBucket(ctx, zone, name)
if err != nil { if err != nil {
if strings.Contains(err.Error(), "not found") { if strings.Contains(err.Error(), "not found") {
return nil, fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchBucket), err.Error()) return nil, fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchBucket), err.Error())
@ -352,13 +352,13 @@ func (n *Layer) ResolveCID(ctx context.Context, name string) (cid.ID, error) {
} }
reqInfo := middleware.GetReqInfo(ctx) reqInfo := middleware.GetReqInfo(ctx)
zone, _ := n.features.FormContainerZone(reqInfo.Namespace) zone := n.features.FormContainerZone(reqInfo.Namespace)
if bktInfo := n.cache.GetBucket(zone, name); bktInfo != nil { if bktInfo := n.cache.GetBucket(zone, name); bktInfo != nil {
return bktInfo.CID, nil return bktInfo.CID, nil
} }
return n.ResolveBucket(ctx, name) return n.ResolveBucket(ctx, zone, name)
} }
// ListBuckets returns all user containers. The name of the bucket is a container // ListBuckets returns all user containers. The name of the bucket is a container
@ -798,10 +798,10 @@ func (n *Layer) CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.
return nil, errors.GetAPIError(errors.ErrBucketAlreadyExists) return nil, errors.GetAPIError(errors.ErrBucketAlreadyExists)
} }
func (n *Layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error) { func (n *Layer) ResolveBucket(ctx context.Context, zone, name string) (cid.ID, error) {
var cnrID cid.ID var cnrID cid.ID
if err := cnrID.DecodeString(name); err != nil { if err := cnrID.DecodeString(name); err != nil {
if cnrID, err = n.resolver.Resolve(ctx, name); err != nil { if cnrID, err = n.resolver.Resolve(ctx, zone, name); err != nil {
return cid.ID{}, err return cid.ID{}, err
} }
@ -855,10 +855,17 @@ func (n *Layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
} }
func (n *Layer) GetNetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) { func (n *Layer) GetNetworkInfo(ctx context.Context) (netmap.NetworkInfo, error) {
cachedInfo := n.cache.GetNetworkInfo()
if cachedInfo != nil {
return *cachedInfo, nil
}
networkInfo, err := n.frostFS.NetworkInfo(ctx) networkInfo, err := n.frostFS.NetworkInfo(ctx)
if err != nil { if err != nil {
return networkInfo, fmt.Errorf("get network info: %w", err) return netmap.NetworkInfo{}, fmt.Errorf("get network info: %w", err)
} }
n.cache.PutNetworkInfo(networkInfo)
return networkInfo, nil return networkInfo, nil
} }

View file

@ -150,9 +150,9 @@ func (n *Layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
metaSize += len(p.Data.TagSet) metaSize += len(p.Data.TagSet)
} }
networkInfo, err := n.frostFS.NetworkInfo(ctx) networkInfo, err := n.GetNetworkInfo(ctx)
if err != nil { if err != nil {
return fmt.Errorf("get network info: %w", err) return err
} }
info := &data.MultipartInfo{ info := &data.MultipartInfo{
@ -290,18 +290,20 @@ func (n *Layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
MD5: hex.EncodeToString(createdObj.MD5Sum), MD5: hex.EncodeToString(createdObj.MD5Sum),
} }
oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) oldPartIDs, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo)
oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove) oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove)
if err != nil && !oldPartIDNotFound { if err != nil && !oldPartIDNotFound {
return nil, err return nil, err
} }
if !oldPartIDNotFound { if !oldPartIDNotFound {
for _, oldPartID := range oldPartIDs {
if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil { if err = n.objectDelete(ctx, bktInfo, oldPartID); err != nil {
n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err), n.reqLogger(ctx).Error(logs.CouldntDeleteOldPartObject, zap.Error(err),
zap.String("cid", bktInfo.CID.EncodeToString()), zap.String("cid", bktInfo.CID.EncodeToString()),
zap.String("oid", oldPartID.EncodeToString())) zap.String("oid", oldPartID.EncodeToString()))
} }
} }
}
objInfo := &data.ObjectInfo{ objInfo := &data.ObjectInfo{
ID: createdObj.ID, ID: createdObj.ID,
@ -385,16 +387,15 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
var multipartObjetSize uint64 var multipartObjetSize uint64
var encMultipartObjectSize uint64 var encMultipartObjectSize uint64
parts := make([]*data.PartInfo, 0, len(p.Parts)) parts := make([]*data.PartInfoExtended, 0, len(p.Parts))
var completedPartsHeader strings.Builder var completedPartsHeader strings.Builder
md5Hash := md5.New() md5Hash := md5.New()
for i, part := range p.Parts { for i, part := range p.Parts {
partInfo := partsInfo[part.PartNumber] partInfo := partsInfo.Extract(part.PartNumber, data.UnQuote(part.ETag), n.features.MD5Enabled())
if partInfo == nil || data.UnQuote(part.ETag) != partInfo.GetETag(n.features.MD5Enabled()) { if partInfo == nil {
return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber) return nil, nil, fmt.Errorf("%w: unknown part %d or etag mismatched", s3errors.GetAPIError(s3errors.ErrInvalidPart), part.PartNumber)
} }
delete(partsInfo, part.PartNumber)
// for the last part we have no minimum size limit // for the last part we have no minimum size limit
if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize { if i != len(p.Parts)-1 && partInfo.Size < UploadMinSize {
@ -475,7 +476,8 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
var addr oid.Address var addr oid.Address
addr.SetContainer(p.Info.Bkt.CID) addr.SetContainer(p.Info.Bkt.CID)
for _, partInfo := range partsInfo { for _, prts := range partsInfo {
for _, partInfo := range prts {
if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart, n.reqLogger(ctx).Warn(logs.CouldNotDeleteUploadPart,
zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID), zap.Stringer("cid", p.Info.Bkt.CID), zap.Stringer("oid", &partInfo.OID),
@ -484,6 +486,7 @@ func (n *Layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
addr.SetObject(partInfo.OID) addr.SetObject(partInfo.OID)
n.cache.DeleteObject(addr) n.cache.DeleteObject(addr)
} }
}
return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo) return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo)
} }
@ -554,12 +557,14 @@ func (n *Layer) AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) e
return err return err
} }
for _, info := range parts { for _, infos := range parts {
for _, info := range infos {
if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil { if err = n.objectDelete(ctx, p.Bkt, info.OID); err != nil {
n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()), n.reqLogger(ctx).Warn(logs.CouldntDeletePart, zap.String("cid", p.Bkt.CID.EncodeToString()),
zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err)) zap.String("oid", info.OID.EncodeToString()), zap.Int("part number", info.Number), zap.Error(err))
} }
} }
}
return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo) return n.treeService.DeleteMultipartUpload(ctx, p.Bkt, multipartInfo)
} }
@ -581,7 +586,12 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
parts := make([]*Part, 0, len(partsInfo)) parts := make([]*Part, 0, len(partsInfo))
for _, partInfo := range partsInfo { for _, infos := range partsInfo {
sort.Slice(infos, func(i, j int) bool {
return infos[i].Timestamp < infos[j].Timestamp
})
partInfo := infos[len(infos)-1]
parts = append(parts, &Part{ parts = append(parts, &Part{
ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())), ETag: data.Quote(partInfo.GetETag(n.features.MD5Enabled())),
LastModified: partInfo.Created.UTC().Format(time.RFC3339), LastModified: partInfo.Created.UTC().Format(time.RFC3339),
@ -618,7 +628,22 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn
return &res, nil return &res, nil
} }
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) { type PartsInfo map[int][]*data.PartInfoExtended
func (p PartsInfo) Extract(part int, etag string, md5Enabled bool) *data.PartInfoExtended {
parts := p[part]
for i, info := range parts {
if info.GetETag(md5Enabled) == etag {
p[part] = append(parts[:i], parts[i+1:]...)
return info
}
}
return nil
}
func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, PartsInfo, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID) multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID)
if err != nil { if err != nil {
if errors.Is(err, ErrNodeNotFound) { if errors.Is(err, ErrNodeNotFound) {
@ -632,11 +657,11 @@ func (n *Layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.
return nil, nil, err return nil, nil, err
} }
res := make(map[int]*data.PartInfo, len(parts)) res := make(map[int][]*data.PartInfoExtended, len(parts))
partsNumbers := make([]int, len(parts)) partsNumbers := make([]int, len(parts))
oids := make([]string, len(parts)) oids := make([]string, len(parts))
for i, part := range parts { for i, part := range parts {
res[part.Number] = part res[part.Number] = append(res[part.Number], part)
partsNumbers[i] = part.Number partsNumbers[i] = part.Number
oids[i] = part.OID.EncodeToString() oids[i] = part.OID.EncodeToString()
} }

View file

@ -12,6 +12,7 @@ import (
"fmt" "fmt"
"io" "io"
"mime" "mime"
"net/http"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings" "strings"
@ -21,6 +22,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -245,11 +247,11 @@ func (n *Layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend
if r != nil { if r != nil {
if len(p.Header[api.ContentType]) == 0 { if len(p.Header[api.ContentType]) == 0 {
if contentType := MimeByFilePath(p.Object); len(contentType) == 0 { if contentType := MimeByFilePath(p.Object); len(contentType) == 0 {
d := newDetector(r) d := detector.NewDetector(r, http.DetectContentType)
if contentType, err := d.Detect(); err == nil { if contentType, err := d.Detect(); err == nil {
p.Header[api.ContentType] = contentType p.Header[api.ContentType] = contentType
} }
r = d.MultiReader() r = d.RestoredReader()
} else { } else {
p.Header[api.ContentType] = contentType p.Header[api.ContentType] = contentType
} }

View file

@ -6,6 +6,7 @@ import (
"io" "io"
"sort" "sort"
"strings" "strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -33,7 +34,7 @@ type TreeServiceMock struct {
locks map[string]map[uint64]*data.LockInfo locks map[string]map[uint64]*data.LockInfo
tags map[string]map[uint64]map[string]string tags map[string]map[uint64]map[string]string
multiparts map[string]map[string][]*data.MultipartInfo multiparts map[string]map[string][]*data.MultipartInfo
parts map[string]map[int]*data.PartInfo parts map[string]map[int]*data.PartInfoExtended
} }
func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) { func (t *TreeServiceMock) GetObjectTaggingAndLock(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, *data.LockInfo, error) {
@ -92,7 +93,7 @@ func NewTreeService() *TreeServiceMock {
locks: make(map[string]map[uint64]*data.LockInfo), locks: make(map[string]map[uint64]*data.LockInfo),
tags: make(map[string]map[uint64]map[string]string), tags: make(map[string]map[uint64]map[string]string),
multiparts: make(map[string]map[string][]*data.MultipartInfo), multiparts: make(map[string]map[string][]*data.MultipartInfo),
parts: make(map[string]map[int]*data.PartInfo), parts: make(map[string]map[int]*data.PartInfoExtended),
} }
} }
@ -346,28 +347,31 @@ func (t *TreeServiceMock) GetMultipartUpload(_ context.Context, bktInfo *data.Bu
return nil, ErrNodeNotFound return nil, ErrNodeNotFound
} }
func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { func (t *TreeServiceMock) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID) multipartInfo, err := t.GetMultipartUpload(ctx, bktInfo, info.Key, info.UploadID)
if err != nil { if err != nil {
return oid.ID{}, err return nil, err
} }
if multipartInfo.ID != multipartNodeID { if multipartInfo.ID != multipartNodeID {
return oid.ID{}, fmt.Errorf("invalid multipart info id") return nil, fmt.Errorf("invalid multipart info id")
} }
partsMap, ok := t.parts[info.UploadID] partsMap, ok := t.parts[info.UploadID]
if !ok { if !ok {
partsMap = make(map[int]*data.PartInfo) partsMap = make(map[int]*data.PartInfoExtended)
} }
partsMap[info.Number] = info partsMap[info.Number] = &data.PartInfoExtended{
PartInfo: *info,
Timestamp: uint64(time.Now().UnixMicro()),
}
t.parts[info.UploadID] = partsMap t.parts[info.UploadID] = partsMap
return oid.ID{}, nil return nil, nil
} }
func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { func (t *TreeServiceMock) GetParts(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]
var foundMultipart *data.MultipartInfo var foundMultipart *data.MultipartInfo
@ -387,7 +391,7 @@ LOOP:
} }
partsMap := t.parts[foundMultipart.UploadID] partsMap := t.parts[foundMultipart.UploadID]
result := make([]*data.PartInfo, 0, len(partsMap)) result := make([]*data.PartInfoExtended, 0, len(partsMap))
for _, part := range partsMap { for _, part := range partsMap {
result = append(result, part) result = append(result, part)
} }

View file

@ -57,11 +57,11 @@ type TreeService interface {
GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, objectName, uploadID string) (*data.MultipartInfo, error)
// AddPart puts a node to a system tree as a child of appropriate multipart upload // AddPart puts a node to a system tree as a child of appropriate multipart upload
// and returns objectID of a previous part which must be deleted in FrostFS. // and returns objectIDs of a previous part/s which must be deleted in FrostFS.
// //
// If object id to remove is not found returns ErrNoNodeToRemove error. // If object ids to remove is not found returns ErrNoNodeToRemove error.
AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error)
GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error)
PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) PutBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error)
GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) GetBucketLifecycleConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error)

View file

@ -3,14 +3,18 @@ package middleware
import ( import (
"net/http" "net/http"
"net/url" "net/url"
"strconv"
"strings" "strings"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"go.uber.org/zap" "go.uber.org/zap"
) )
const wildcardPlaceholder = "<wildcard>" const (
wildcardPlaceholder = "<wildcard>"
enabledVHS = "enabled"
disabledVHS = "disabled"
)
type VHSSettings interface { type VHSSettings interface {
Domains() []string Domains() []string
@ -26,9 +30,9 @@ func PrepareAddressStyle(settings VHSSettings, log *zap.Logger) Func {
ctx := r.Context() ctx := r.Context()
reqInfo := GetReqInfo(ctx) reqInfo := GetReqInfo(ctx)
reqLogger := reqLogOrDefault(ctx, log) reqLogger := reqLogOrDefault(ctx, log)
headerVHSEnabled := r.Header.Get(settings.VHSHeader()) statusVHS := r.Header.Get(settings.VHSHeader())
if isVHSAddress(headerVHSEnabled, settings.GlobalVHS(), settings.VHSNamespacesEnabled(), reqInfo.Namespace) { if isVHSAddress(statusVHS, settings.GlobalVHS(), settings.VHSNamespacesEnabled(), reqInfo.Namespace) {
prepareVHSAddress(reqInfo, r, settings) prepareVHSAddress(reqInfo, r, settings)
} else { } else {
preparePathStyleAddress(reqInfo, r, reqLogger) preparePathStyleAddress(reqInfo, r, reqLogger)
@ -39,17 +43,20 @@ func PrepareAddressStyle(settings VHSSettings, log *zap.Logger) Func {
} }
} }
func isVHSAddress(headerVHSEnabled string, enabledFlag bool, vhsNamespaces map[string]bool, namespace string) bool { func isVHSAddress(statusVHS string, enabledFlag bool, vhsNamespaces map[string]bool, namespace string) bool {
if result, err := strconv.ParseBool(headerVHSEnabled); err == nil { switch statusVHS {
return result case enabledVHS:
} return true
case disabledVHS:
return false
default:
result := enabledFlag result := enabledFlag
if v, ok := vhsNamespaces[namespace]; ok { if v, ok := vhsNamespaces[namespace]; ok {
result = v result = v
} }
return result return result
}
} }
func prepareVHSAddress(reqInfo *ReqInfo, r *http.Request, settings VHSSettings) { func prepareVHSAddress(reqInfo *ReqInfo, r *http.Request, settings VHSSettings) {

View file

@ -42,7 +42,7 @@ func (v *VHSSettingsMock) VHSNamespacesEnabled() map[string]bool {
func TestIsVHSAddress(t *testing.T) { func TestIsVHSAddress(t *testing.T) {
for _, tc := range []struct { for _, tc := range []struct {
name string name string
headerVHSEnabled string headerStatusVHS string
vhsEnabledFlag bool vhsEnabledFlag bool
vhsNamespaced map[string]bool vhsNamespaced map[string]bool
namespace string namespace string
@ -76,7 +76,7 @@ func TestIsVHSAddress(t *testing.T) {
}, },
{ {
name: "vhs enabled (header)", name: "vhs enabled (header)",
headerVHSEnabled: "true", headerStatusVHS: enabledVHS,
vhsEnabledFlag: false, vhsEnabledFlag: false,
vhsNamespaced: map[string]bool{ vhsNamespaced: map[string]bool{
"kapusta": false, "kapusta": false,
@ -86,7 +86,7 @@ func TestIsVHSAddress(t *testing.T) {
}, },
{ {
name: "vhs disabled (header)", name: "vhs disabled (header)",
headerVHSEnabled: "false", headerStatusVHS: disabledVHS,
vhsEnabledFlag: true, vhsEnabledFlag: true,
vhsNamespaced: map[string]bool{ vhsNamespaced: map[string]bool{
"kapusta": true, "kapusta": true,
@ -96,7 +96,7 @@ func TestIsVHSAddress(t *testing.T) {
}, },
} { } {
t.Run(tc.name, func(t *testing.T) { t.Run(tc.name, func(t *testing.T) {
actual := isVHSAddress(tc.headerVHSEnabled, tc.vhsEnabledFlag, tc.vhsNamespaced, tc.namespace) actual := isVHSAddress(tc.headerStatusVHS, tc.vhsEnabledFlag, tc.vhsNamespaced, tc.namespace)
require.Equal(t, tc.expected, actual) require.Equal(t, tc.expected, actual)
}) })
} }

View file

@ -107,3 +107,9 @@ const (
PartNumberQuery = "partNumber" PartNumberQuery = "partNumber"
LegalHoldQuery = "legal-hold" LegalHoldQuery = "legal-hold"
) )
const (
StdoutPath = "stdout"
StderrPath = "stderr"
SinkName = "lumberjack"
)

237
api/middleware/log_http.go Normal file
View file

@ -0,0 +1,237 @@
//go:build loghttp
package middleware
import (
"bytes"
"io"
"net/http"
"os"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/xmlutils"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"gopkg.in/natefinch/lumberjack.v2"
)
type (
LogHTTPSettings interface {
LogHTTPConfig() LogHTTPConfig
}
LogHTTPConfig struct {
Enabled bool
MaxBody int64
MaxLogSize int
OutputPath string
UseGzip bool
log *httpLogger
}
httpLogger struct {
*zap.Logger
logRoller *lumberjack.Logger
}
// responseReadWriter helps read http response body.
responseReadWriter struct {
http.ResponseWriter
response *bytes.Buffer
statusCode int
}
)
const (
payloadLabel = "payload"
responseLabel = "response"
)
func (lc *LogHTTPConfig) InitHTTPLogger(log *zap.Logger) {
if err := lc.initHTTPLogger(); err != nil {
log.Error(logs.FailedToInitializeHTTPLogger, zap.Error(err))
}
}
// initHTTPLogger returns registers zap sink and returns new httpLogger.
func (lc *LogHTTPConfig) initHTTPLogger() (err error) {
lc.log = &httpLogger{
Logger: zap.NewNop(),
logRoller: &lumberjack.Logger{},
}
c := newLoggerConfig()
lc.log.Logger, err = c.Build()
if err != nil {
return err
}
lc.setLogOutput()
return nil
}
// newLoggerConfig creates new zap.Config with disabled base fields.
func newLoggerConfig() zap.Config {
c := zap.NewProductionConfig()
c.DisableCaller = true
c.DisableStacktrace = true
c.EncoderConfig = newEncoderConfig()
c.Sampling = nil
return c
}
func (lc *LogHTTPConfig) setLogOutput() {
var output zapcore.WriteSyncer
switch lc.OutputPath {
case "", StdoutPath:
output = zapcore.AddSync(os.Stdout)
case StderrPath:
output = zapcore.AddSync(os.Stderr)
default:
output = zapcore.AddSync(&lumberjack.Logger{
Filename: lc.OutputPath,
MaxSize: lc.MaxLogSize,
Compress: lc.UseGzip,
})
}
// create logger with new sync
lc.log.Logger = lc.log.Logger.WithOptions(zap.WrapCore(func(core zapcore.Core) zapcore.Core {
return zapcore.NewCore(zapcore.NewJSONEncoder(newEncoderConfig()), output, zapcore.InfoLevel)
}))
}
func newEncoderConfig() zapcore.EncoderConfig {
c := zap.NewProductionEncoderConfig()
c.MessageKey = zapcore.OmitKey
c.LevelKey = zapcore.OmitKey
c.TimeKey = zapcore.OmitKey
c.FunctionKey = zapcore.OmitKey
return c
}
func (ww *responseReadWriter) Write(data []byte) (int, error) {
ww.response.Write(data)
return ww.ResponseWriter.Write(data)
}
func (ww *responseReadWriter) WriteHeader(code int) {
ww.statusCode = code
ww.ResponseWriter.WriteHeader(code)
}
func (ww *responseReadWriter) Flush() {
if f, ok := ww.ResponseWriter.(http.Flusher); ok {
f.Flush()
}
}
// LogHTTP logs http parameters from s3 request.
func LogHTTP(l *zap.Logger, settings LogHTTPSettings) Func {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
config := settings.LogHTTPConfig()
if !config.Enabled || config.log == nil {
h.ServeHTTP(w, r)
return
}
httplog := config.log.getHTTPLogger(r).
withFieldIfExist("query", r.URL.Query()).
withFieldIfExist("headers", r.Header)
payload := getBody(r.Body, l)
r.Body = io.NopCloser(bytes.NewReader(payload))
payloadReader := io.LimitReader(bytes.NewReader(payload), config.MaxBody)
httplog = httplog.withProcessedBody(payloadLabel, payloadReader, l)
wr := newResponseReadWriter(w)
h.ServeHTTP(wr, r)
respReader := io.LimitReader(wr.response, config.MaxBody)
httplog = httplog.withProcessedBody(responseLabel, respReader, l)
httplog = httplog.with(zap.Int("status", wr.statusCode))
httplog.Info(logs.LogHTTP)
})
}
}
// withFieldIfExist checks whether data is not empty and attach it to log output.
func (lg *httpLogger) withFieldIfExist(label string, data map[string][]string) *httpLogger {
if len(data) != 0 {
return lg.with(zap.Any(label, data))
}
return lg
}
func (lg *httpLogger) with(fields ...zap.Field) *httpLogger {
return &httpLogger{
Logger: lg.Logger.With(fields...),
logRoller: lg.logRoller,
}
}
func (lg *httpLogger) getHTTPLogger(r *http.Request) *httpLogger {
return lg.with(
zap.String("from", r.RemoteAddr),
zap.String("URI", r.RequestURI),
zap.String("method", r.Method),
zap.String("protocol", r.Proto),
)
}
func (lg *httpLogger) withProcessedBody(label string, bodyReader io.Reader, l *zap.Logger) *httpLogger {
resp, err := processBody(bodyReader)
if err != nil {
l.Error(logs.FailedToProcessHTTPBody,
zap.Error(err),
zap.String("body type", payloadLabel))
return lg
}
return lg.with(zap.ByteString(label, resp))
}
func newResponseReadWriter(w http.ResponseWriter) *responseReadWriter {
return &responseReadWriter{
ResponseWriter: w,
response: &bytes.Buffer{},
}
}
func getBody(httpBody io.ReadCloser, l *zap.Logger) []byte {
defer func(httpBody io.ReadCloser) {
if err := httpBody.Close(); err != nil {
l.Error(logs.FailedToCloseHTTPBody, zap.Error(err))
}
}(httpBody)
body, err := io.ReadAll(httpBody)
if err != nil {
l.Error(logs.FailedToReadHTTPBody,
zap.Error(err),
zap.String("body type", payloadLabel))
return nil
}
return body
}
// processBody reads body and base64 encode it if it's not XML.
func processBody(bodyReader io.Reader) ([]byte, error) {
resultBody := &bytes.Buffer{}
detect := detector.NewDetector(bodyReader, xmlutils.DetectXML)
dataType, err := detect.Detect()
if err != nil {
return nil, err
}
writer := xmlutils.ChooseWriter(dataType, resultBody)
if _, err = io.Copy(writer, detect.RestoredReader()); err != nil {
return nil, err
}
if err = writer.Close(); err != nil {
return nil, err
}
return resultBody.Bytes(), nil
}

View file

@ -0,0 +1,36 @@
//go:build !loghttp
package middleware
import (
"net/http"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"go.uber.org/zap"
)
type (
LogHTTPSettings interface {
LogHTTPConfig() LogHTTPConfig
}
LogHTTPConfig struct {
Enabled bool
MaxBody int64
MaxLogSize int
OutputPath string
UseGzip bool
}
)
func LogHTTP(l *zap.Logger, _ LogHTTPSettings) Func {
l.Warn(logs.LogHTTPDisabledInThisBuild)
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
h.ServeHTTP(w, r)
})
}
}
func (*LogHTTPConfig) InitHTTPLogger(*zap.Logger) {
// ignore
}

View file

@ -6,7 +6,7 @@ import (
"fmt" "fmt"
"sync" "sync"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware" v2container "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
@ -29,20 +29,14 @@ type FrostFS interface {
SystemDNS(context.Context) (string, error) SystemDNS(context.Context) (string, error)
} }
type Settings interface {
FormContainerZone(ns string) (zone string, isDefault bool)
}
type Config struct { type Config struct {
FrostFS FrostFS FrostFS FrostFS
RPCAddress string RPCAddress string
Settings Settings
} }
type BucketResolver struct { type BucketResolver struct {
rpcAddress string rpcAddress string
frostfs FrostFS frostfs FrostFS
settings Settings
mu sync.RWMutex mu sync.RWMutex
resolvers []*Resolver resolvers []*Resolver
@ -50,15 +44,15 @@ type BucketResolver struct {
type Resolver struct { type Resolver struct {
Name string Name string
resolve func(context.Context, string) (cid.ID, error) resolve func(context.Context, string, string) (cid.ID, error)
} }
func (r *Resolver) SetResolveFunc(fn func(context.Context, string) (cid.ID, error)) { func (r *Resolver) SetResolveFunc(fn func(context.Context, string, string) (cid.ID, error)) {
r.resolve = fn r.resolve = fn
} }
func (r *Resolver) Resolve(ctx context.Context, name string) (cid.ID, error) { func (r *Resolver) Resolve(ctx context.Context, zone, name string) (cid.ID, error) {
return r.resolve(ctx, name) return r.resolve(ctx, zone, name)
} }
func NewBucketResolver(resolverNames []string, cfg *Config) (*BucketResolver, error) { func NewBucketResolver(resolverNames []string, cfg *Config) (*BucketResolver, error) {
@ -87,12 +81,12 @@ func createResolvers(resolverNames []string, cfg *Config) ([]*Resolver, error) {
return resolvers, nil return resolvers, nil
} }
func (r *BucketResolver) Resolve(ctx context.Context, bktName string) (cnrID cid.ID, err error) { func (r *BucketResolver) Resolve(ctx context.Context, zone, bktName string) (cnrID cid.ID, err error) {
r.mu.RLock() r.mu.RLock()
defer r.mu.RUnlock() defer r.mu.RUnlock()
for _, resolver := range r.resolvers { for _, resolver := range r.resolvers {
cnrID, resolverErr := resolver.Resolve(ctx, bktName) cnrID, resolverErr := resolver.Resolve(ctx, zone, bktName)
if resolverErr != nil { if resolverErr != nil {
resolverErr = fmt.Errorf("%s: %w", resolver.Name, resolverErr) resolverErr = fmt.Errorf("%s: %w", resolver.Name, resolverErr)
if err == nil { if err == nil {
@ -123,7 +117,6 @@ func (r *BucketResolver) UpdateResolvers(resolverNames []string) error {
cfg := &Config{ cfg := &Config{
FrostFS: r.frostfs, FrostFS: r.frostfs,
RPCAddress: r.rpcAddress, RPCAddress: r.rpcAddress,
Settings: r.settings,
} }
resolvers, err := createResolvers(resolverNames, cfg) resolvers, err := createResolvers(resolverNames, cfg)
@ -152,30 +145,25 @@ func (r *BucketResolver) equals(resolverNames []string) bool {
func newResolver(name string, cfg *Config) (*Resolver, error) { func newResolver(name string, cfg *Config) (*Resolver, error) {
switch name { switch name {
case DNSResolver: case DNSResolver:
return NewDNSResolver(cfg.FrostFS, cfg.Settings) return NewDNSResolver(cfg.FrostFS)
case NNSResolver: case NNSResolver:
return NewNNSResolver(cfg.RPCAddress, cfg.Settings) return NewNNSResolver(cfg.RPCAddress)
default: default:
return nil, fmt.Errorf("unknown resolver: %s", name) return nil, fmt.Errorf("unknown resolver: %s", name)
} }
} }
func NewDNSResolver(frostFS FrostFS, settings Settings) (*Resolver, error) { func NewDNSResolver(frostFS FrostFS) (*Resolver, error) {
if frostFS == nil { if frostFS == nil {
return nil, fmt.Errorf("pool must not be nil for DNS resolver") return nil, fmt.Errorf("pool must not be nil for DNS resolver")
} }
if settings == nil {
return nil, fmt.Errorf("resolver settings must not be nil for DNS resolver")
}
var dns ns.DNS var dns ns.DNS
resolveFunc := func(ctx context.Context, name string) (cid.ID, error) { resolveFunc := func(ctx context.Context, zone, name string) (cid.ID, error) {
var err error var err error
reqInfo := middleware.GetReqInfo(ctx)
zone, isDefault := settings.FormContainerZone(reqInfo.Namespace) if zone == v2container.SysAttributeZoneDefault {
if isDefault {
zone, err = frostFS.SystemDNS(ctx) zone, err = frostFS.SystemDNS(ctx)
if err != nil { if err != nil {
return cid.ID{}, fmt.Errorf("read system DNS parameter of the FrostFS: %w", err) return cid.ID{}, fmt.Errorf("read system DNS parameter of the FrostFS: %w", err)
@ -196,13 +184,10 @@ func NewDNSResolver(frostFS FrostFS, settings Settings) (*Resolver, error) {
}, nil }, nil
} }
func NewNNSResolver(address string, settings Settings) (*Resolver, error) { func NewNNSResolver(address string) (*Resolver, error) {
if address == "" { if address == "" {
return nil, fmt.Errorf("rpc address must not be empty for NNS resolver") return nil, fmt.Errorf("rpc address must not be empty for NNS resolver")
} }
if settings == nil {
return nil, fmt.Errorf("resolver settings must not be nil for NNS resolver")
}
var nns ns.NNS var nns ns.NNS
@ -210,12 +195,9 @@ func NewNNSResolver(address string, settings Settings) (*Resolver, error) {
return nil, fmt.Errorf("dial %s: %w", address, err) return nil, fmt.Errorf("dial %s: %w", address, err)
} }
resolveFunc := func(ctx context.Context, name string) (cid.ID, error) { resolveFunc := func(_ context.Context, zone, name string) (cid.ID, error) {
var d container.Domain var d container.Domain
d.SetName(name) d.SetName(name)
reqInfo := middleware.GetReqInfo(ctx)
zone, _ := settings.FormContainerZone(reqInfo.Namespace)
d.SetZone(zone) d.SetZone(zone)
cnrID, err := nns.ResolveContainerDomain(d) cnrID, err := nns.ResolveContainerDomain(d)

View file

@ -99,6 +99,7 @@ type Settings interface {
s3middleware.PolicySettings s3middleware.PolicySettings
s3middleware.MetricsSettings s3middleware.MetricsSettings
s3middleware.VHSSettings s3middleware.VHSSettings
s3middleware.LogHTTPSettings
} }
type FrostFSID interface { type FrostFSID interface {
@ -127,7 +128,9 @@ type Config struct {
func NewRouter(cfg Config) *chi.Mux { func NewRouter(cfg Config) *chi.Mux {
api := chi.NewRouter() api := chi.NewRouter()
api.Use( api.Use(
s3middleware.LogHTTP(cfg.Log, cfg.MiddlewareSettings),
s3middleware.Request(cfg.Log, cfg.MiddlewareSettings), s3middleware.Request(cfg.Log, cfg.MiddlewareSettings),
middleware.ThrottleWithOpts(cfg.Throttle), middleware.ThrottleWithOpts(cfg.Throttle),
middleware.Recoverer, middleware.Recoverer,

View file

@ -80,6 +80,7 @@ type middlewareSettingsMock struct {
domains []string domains []string
vhsEnabled bool vhsEnabled bool
vhsNamespacesEnabled map[string]bool vhsNamespacesEnabled map[string]bool
logHTTP middleware.LogHTTPConfig
} }
func (r *middlewareSettingsMock) SourceIPHeader() string { func (r *middlewareSettingsMock) SourceIPHeader() string {
@ -117,6 +118,9 @@ func (r *middlewareSettingsMock) ServernameHeader() string {
func (r *middlewareSettingsMock) VHSNamespacesEnabled() map[string]bool { func (r *middlewareSettingsMock) VHSNamespacesEnabled() map[string]bool {
return r.vhsNamespacesEnabled return r.vhsNamespacesEnabled
} }
func (r *middlewareSettingsMock) LogHTTPConfig() middleware.LogHTTPConfig {
return r.logHTTP
}
type frostFSIDMock struct { type frostFSIDMock struct {
tags map[string]string tags map[string]string

View file

@ -11,6 +11,7 @@ import (
"os" "os"
"os/signal" "os/signal"
"runtime/debug" "runtime/debug"
"strings"
"sync" "sync"
"syscall" "syscall"
"time" "time"
@ -86,6 +87,7 @@ type (
appSettings struct { appSettings struct {
logLevel zap.AtomicLevel logLevel zap.AtomicLevel
httpLogging s3middleware.LogHTTPConfig
maxClient maxClientsConfig maxClient maxClientsConfig
defaultMaxAge int defaultMaxAge int
reconnectInterval time.Duration reconnectInterval time.Duration
@ -216,6 +218,7 @@ func (a *App) initLayer(ctx context.Context) {
func newAppSettings(log *Logger, v *viper.Viper) *appSettings { func newAppSettings(log *Logger, v *viper.Viper) *appSettings {
settings := &appSettings{ settings := &appSettings{
logLevel: log.lvl, logLevel: log.lvl,
httpLogging: s3middleware.LogHTTPConfig{},
maxClient: newMaxClients(v), maxClient: newMaxClients(v),
defaultMaxAge: fetchDefaultMaxAge(v, log.logger), defaultMaxAge: fetchDefaultMaxAge(v, log.logger),
reconnectInterval: fetchReconnectInterval(v), reconnectInterval: fetchReconnectInterval(v),
@ -250,10 +253,20 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
vhsEnabled := v.GetBool(cfgVHSEnabled) vhsEnabled := v.GetBool(cfgVHSEnabled)
vhsHeader := v.GetString(cfgVHSHeader) vhsHeader := v.GetString(cfgVHSHeader)
servernameHeader := v.GetString(cfgServernameHeader) servernameHeader := v.GetString(cfgServernameHeader)
vhsNamespacesEnabled := s.prepareVHSNamespaces(v, log) httpLoggingEnabled := v.GetBool(cfgHTTPLoggingEnabled)
httpLoggingMaxBody := v.GetInt64(cfgHTTPLoggingMaxBody)
httpLoggingMaxLogSize := v.GetInt(cfgHTTPLoggingMaxLogSize)
httpLoggingOutputPath := v.GetString(cfgHTTPLoggingDestination)
httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip)
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock()
s.httpLogging.Enabled = httpLoggingEnabled
s.httpLogging.MaxBody = httpLoggingMaxBody
s.httpLogging.MaxLogSize = httpLoggingMaxLogSize
s.httpLogging.OutputPath = httpLoggingOutputPath
s.httpLogging.UseGzip = httpLoggingUseGzip
s.httpLogging.InitHTTPLogger(log)
s.namespaceHeader = namespaceHeader s.namespaceHeader = namespaceHeader
s.defaultNamespaces = defaultNamespaces s.defaultNamespaces = defaultNamespaces
@ -272,17 +285,22 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
s.vhsEnabled = vhsEnabled s.vhsEnabled = vhsEnabled
s.vhsHeader = vhsHeader s.vhsHeader = vhsHeader
s.servernameHeader = servernameHeader s.servernameHeader = servernameHeader
s.vhsNamespacesEnabled = vhsNamespacesEnabled
s.mu.Unlock()
s.prepareVHSNamespaces(v, log)
} }
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger) map[string]bool { func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger) {
nsMap := fetchVHSNamespaces(v, log) nsMap := fetchVHSNamespaces(v, log)
vhsNamespaces := make(map[string]bool, len(nsMap)) vhsNamespaces := make(map[string]bool, len(nsMap))
for ns, flag := range nsMap { for ns, flag := range nsMap {
vhsNamespaces[s.ResolveNamespaceAlias(ns)] = flag vhsNamespaces[s.ResolveNamespaceAlias(ns)] = flag
} }
return vhsNamespaces s.mu.Lock()
s.vhsNamespacesEnabled = vhsNamespaces
s.mu.Unlock()
} }
func (s *appSettings) Domains() []string { func (s *appSettings) Domains() []string {
@ -361,6 +379,13 @@ func (s *appSettings) DefaultCopiesNumbers(namespace string) []uint32 {
return s.namespaces[namespace].CopiesNumbers[defaultConstraintName] return s.namespaces[namespace].CopiesNumbers[defaultConstraintName]
} }
func (s *appSettings) LogHTTPConfig() s3middleware.LogHTTPConfig {
s.mu.RLock()
defer s.mu.RUnlock()
return s.httpLogging
}
func (s *appSettings) NewXMLDecoder(r io.Reader) *xml.Decoder { func (s *appSettings) NewXMLDecoder(r io.Reader) *xml.Decoder {
dec := xml.NewDecoder(r) dec := xml.NewDecoder(r)
dec.CharsetReader = func(charset string, reader io.Reader) (io.Reader, error) { dec.CharsetReader = func(charset string, reader io.Reader) (io.Reader, error) {
@ -404,12 +429,12 @@ func (s *appSettings) NamespaceHeader() string {
return s.namespaceHeader return s.namespaceHeader
} }
func (s *appSettings) FormContainerZone(ns string) (zone string, isDefault bool) { func (s *appSettings) FormContainerZone(ns string) string {
if len(ns) == 0 { if len(ns) == 0 {
return v2container.SysAttributeZoneDefault, true return v2container.SysAttributeZoneDefault
} }
return ns + ".ns", false return ns + ".ns"
} }
func (s *appSettings) isDefaultNamespace(ns string) bool { func (s *appSettings) isDefaultNamespace(ns string) bool {
@ -525,7 +550,6 @@ func (a *App) getResolverConfig() *resolver.Config {
return &resolver.Config{ return &resolver.Config{
FrostFS: frostfs.NewResolverFrostFS(a.pool), FrostFS: frostfs.NewResolverFrostFS(a.pool),
RPCAddress: a.cfg.GetString(cfgRPCEndpoint), RPCAddress: a.cfg.GetString(cfgRPCEndpoint),
Settings: a.settings,
} }
} }
@ -920,6 +944,8 @@ func getCacheOptions(v *viper.Viper, l *zap.Logger) *layer.CachesConfig {
cacheCfg.AccessControl.Lifetime = fetchCacheLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime) cacheCfg.AccessControl.Lifetime = fetchCacheLifetime(v, l, cfgAccessControlCacheLifetime, cacheCfg.AccessControl.Lifetime)
cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size) cacheCfg.AccessControl.Size = fetchCacheSize(v, l, cfgAccessControlCacheSize, cacheCfg.AccessControl.Size)
cacheCfg.NetworkInfo.Lifetime = fetchCacheLifetime(v, l, cfgNetworkInfoCacheLifetime, cacheCfg.NetworkInfo.Lifetime)
return cacheCfg return cacheCfg
} }
@ -1062,8 +1088,13 @@ func (a *App) fetchContainerInfo(ctx context.Context, cfgKey string) (info *data
var id cid.ID var id cid.ID
if err = id.DecodeString(containerString); err != nil { if err = id.DecodeString(containerString); err != nil {
if id, err = a.bucketResolver.Resolve(ctx, containerString); err != nil { i := strings.Index(containerString, ".")
return nil, fmt.Errorf("resolve container name %s: %w", containerString, err) if i < 0 {
return nil, fmt.Errorf("invalid container address: %s", containerString)
}
if id, err = a.bucketResolver.Resolve(ctx, containerString[i+1:], containerString[:i]); err != nil {
return nil, fmt.Errorf("resolve container address %s: %w", containerString, err)
} }
} }

View file

@ -81,6 +81,14 @@ const ( // Settings.
cfgLoggerLevel = "logger.level" cfgLoggerLevel = "logger.level"
cfgLoggerDestination = "logger.destination" cfgLoggerDestination = "logger.destination"
// HttpLogging.
cfgHTTPLoggingEnabled = "http_logging.enabled"
cfgHTTPLoggingMaxBody = "http_logging.max_body"
cfgHTTPLoggingMaxLogSize = "http_logging.max_log_size"
cfgHTTPLoggingDestination = "http_logging.destination"
cfgHTTPLoggingGzip = "http_logging.gzip"
cfgHTTPLoggingLogResponse = "http_logging.log_response"
// Wallet. // Wallet.
cfgWalletPath = "wallet.path" cfgWalletPath = "wallet.path"
cfgWalletAddress = "wallet.address" cfgWalletAddress = "wallet.address"
@ -122,6 +130,7 @@ const ( // Settings.
cfgMorphPolicyCacheSize = "cache.morph_policy.size" cfgMorphPolicyCacheSize = "cache.morph_policy.size"
cfgFrostfsIDCacheLifetime = "cache.frostfsid.lifetime" cfgFrostfsIDCacheLifetime = "cache.frostfsid.lifetime"
cfgFrostfsIDCacheSize = "cache.frostfsid.size" cfgFrostfsIDCacheSize = "cache.frostfsid.size"
cfgNetworkInfoCacheLifetime = "cache.network_info.lifetime"
cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval" cfgAccessBoxCacheRemovingCheckInterval = "cache.accessbox.removing_check_interval"
@ -779,6 +788,14 @@ func newSettings() *viper.Viper {
v.SetDefault(cfgLoggerLevel, "debug") v.SetDefault(cfgLoggerLevel, "debug")
v.SetDefault(cfgLoggerDestination, "stdout") v.SetDefault(cfgLoggerDestination, "stdout")
// http logger
v.SetDefault(cfgHTTPLoggingEnabled, false)
v.SetDefault(cfgHTTPLoggingMaxBody, 1024)
v.SetDefault(cfgHTTPLoggingMaxLogSize, 50)
v.SetDefault(cfgHTTPLoggingDestination, "stdout")
v.SetDefault(cfgHTTPLoggingGzip, false)
v.SetDefault(cfgHTTPLoggingLogResponse, true)
// pool: // pool:
v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold) v.SetDefault(cfgPoolErrorThreshold, defaultPoolErrorThreshold)
v.SetDefault(cfgStreamTimeout, defaultStreamTimeout) v.SetDefault(cfgStreamTimeout, defaultStreamTimeout)

View file

@ -0,0 +1,50 @@
package playback
import (
"encoding/xml"
"fmt"
"net/http"
)
type MultipartUpload struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ InitiateMultipartUploadResult" json:"-"`
Bucket string `json:"bucket" xml:"Bucket"`
Key string `json:"key" xml:"Key"`
UploadID string `json:"uploadId" xml:"UploadId"`
}
func HandleResponse(r *http.Request, mparts map[string]MultipartUpload, resp []byte, logResponse []byte) error {
var mpart, mpartOld MultipartUpload
if r.Method != "POST" || !r.URL.Query().Has("uploads") {
return nil
}
// get new uploadId from response
err := xml.Unmarshal(resp, &mpart)
if err != nil {
return fmt.Errorf("xml unmarshal error: %w", err)
}
// get old uploadId from logs
err = xml.Unmarshal(logResponse, &mpartOld)
if err != nil {
return fmt.Errorf("xml unmarshal error: %w", err)
}
if mpartOld.UploadID != "" {
mparts[mpartOld.UploadID] = mpart
}
return nil
}
func SwapUploadID(r *http.Request, settings *Settings) error {
var uploadID string
query := r.URL.Query()
uploadID = query.Get("uploadId")
mpart, ok := settings.Multiparts[uploadID]
if !ok {
return fmt.Errorf("no multipart upload with specified uploadId")
}
query.Set("uploadId", mpart.UploadID)
r.URL.RawQuery = query.Encode()
return nil
}

View file

@ -0,0 +1,97 @@
package playback
import (
"context"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/xmlutils"
v4 "github.com/aws/aws-sdk-go-v2/aws/signer/v4"
"github.com/aws/aws-sdk-go-v2/credentials"
)
type (
httpBody []byte
LoggedRequest struct {
From string `json:"from"`
URI string `json:"URI"`
Method string `json:"method"`
Payload httpBody `json:"payload,omitempty"`
Response httpBody `json:"response,omitempty"`
Query url.Values `json:"query"`
Header http.Header `json:"headers"`
}
Credentials struct {
AccessKey string
SecretKey string
}
Settings struct {
Endpoint string
Creds Credentials
Multiparts map[string]MultipartUpload
Client *http.Client
}
)
func (h *httpBody) UnmarshalJSON(data []byte) error {
unquoted, err := strconv.Unquote(string(data))
if err != nil {
return fmt.Errorf("failed to unquote data: %w", err)
}
detect := detector.NewDetector(strings.NewReader(unquoted), xmlutils.DetectXML)
dataType, err := detect.Detect()
if err != nil {
return fmt.Errorf("failed to detect data: %w", err)
}
reader := xmlutils.ChooseReader(dataType, detect.RestoredReader())
*h, err = io.ReadAll(reader)
if err != nil {
return fmt.Errorf("failed to unmarshal httpbody: %w", err)
}
return nil
}
// Sign replace Authorization header with new Access key id and Signature values.
func Sign(ctx context.Context, r *http.Request, creds Credentials) error {
credProvider := credentials.NewStaticCredentialsProvider(creds.AccessKey, creds.SecretKey, "")
awsCred, err := credProvider.Retrieve(ctx)
if err != nil {
return err
}
authHdr := r.Header.Get(auth.AuthorizationHdr)
authInfo, err := parseAuthHeader(authHdr)
if err != nil {
return err
}
newHeader := strings.Replace(authHdr, authInfo["access_key_id"], creds.AccessKey, 1)
r.Header.Set(auth.AuthorizationHdr, newHeader)
signer := v4.NewSigner()
signatureDateTimeStr := r.Header.Get(api.AmzDate)
signatureDateTime, err := time.Parse("20060102T150405Z", signatureDateTimeStr)
if err != nil {
return err
}
return signer.SignHTTP(ctx, awsCred, r, r.Header.Get(api.AmzContentSha256), authInfo["service"], authInfo["region"], signatureDateTime)
}
func parseAuthHeader(authHeader string) (map[string]string, error) {
authInfo := auth.NewRegexpMatcher(auth.AuthorizationFieldRegexp).GetSubmatches(authHeader)
if len(authInfo) == 0 {
return nil, errors.New("no matches found")
}
return authInfo, nil
}

View file

@ -0,0 +1,98 @@
package playback
import (
"errors"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
var errNoMatches = errors.New("no matches found")
func withoutValue(data map[string]string, field string) map[string]string {
result := make(map[string]string)
for k, v := range data {
result[k] = v
}
result[field] = ""
return result
}
func TestParseAuthHeader(t *testing.T) {
defaultHeader := "AWS4-HMAC-SHA256 Credential=oid0cid/20210809/us-east-1/s3/aws4_request, SignedHeaders=host;x-amz-content-sha256;x-amz-date, Signature=2811ccb9e242f41426738fb1f"
defaultAuthInfo := map[string]string{
"access_key_id": "oid0cid",
"service": "s3",
"region": "us-east-1",
"v4_signature": "2811ccb9e242f41426738fb1f",
"signed_header_fields": "host;x-amz-content-sha256;x-amz-date",
"date": "20210809",
}
for _, tc := range []struct {
title string
header string
err error
expected map[string]string
}{
{
title: "correct full header",
header: defaultHeader,
err: nil,
expected: defaultAuthInfo,
},
{
title: "correct with empty region",
header: strings.Replace(defaultHeader, "/us-east-1/", "//", 1),
err: nil,
expected: withoutValue(defaultAuthInfo, "region"),
},
{
title: "empty access key",
header: strings.Replace(defaultHeader, "oid0cid", "", 1),
err: errNoMatches,
expected: nil,
},
{
title: "empty service",
header: strings.Replace(defaultHeader, "/s3/", "//", 1),
err: errNoMatches,
expected: nil,
},
{
title: "empty date",
header: strings.Replace(defaultHeader, "/20210809/", "//", 1),
err: errNoMatches,
expected: nil,
},
{
title: "empty v4_signature",
header: strings.Replace(defaultHeader, "Signature=2811ccb9e242f41426738fb1f",
"Signature=", 1),
err: errNoMatches,
expected: nil,
},
{
title: "empty signed_fields",
header: strings.Replace(defaultHeader, "SignedHeaders=host;x-amz-content-sha256;x-amz-date",
"SignedHeaders=", 1),
err: errNoMatches,
expected: nil,
},
{
title: "empty signed_fields",
header: strings.Replace(defaultHeader, "SignedHeaders=host;x-amz-content-sha256;x-amz-date",
"SignedHeaders=", 1),
err: errNoMatches,
expected: nil,
},
} {
t.Run(tc.title, func(t *testing.T) {
authInfo, err := parseAuthHeader(tc.header)
require.Equal(t, err, tc.err, tc.header)
require.Equal(t, tc.expected, authInfo, tc.header)
})
}
}

20
cmd/s3-playback/main.go Normal file
View file

@ -0,0 +1,20 @@
package main
import (
"context"
"os"
"os/signal"
"syscall"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/cmd/s3-playback/modules"
)
func main() {
ctx, _ := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
if cmd, err := modules.Execute(ctx); err != nil {
cmd.PrintErrln("Error:", err.Error())
cmd.PrintErrf("Run '%v --help' for usage.\n", cmd.CommandPath())
os.Exit(1)
}
}

View file

@ -0,0 +1,67 @@
package modules
import (
"context"
"os"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/version"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/spf13/viper"
)
const (
defaultPrintResponseLimit = 1024
cfgConfigPath = "config"
cfgHTTPTimeoutFlag = "http-timeout"
cfgSkipVerifyTLS = "skip-verify-tls"
)
var (
cfgFile string
rootCmd = &cobra.Command{
Use: "frostfs-s3-playback",
Version: version.Version,
Short: "FrostFS S3 Traffic Playback",
Long: "Helps to reproduce s3 commands from log files",
Example: "frostfs-s3-playback [--skip-verify-tls] [--http-timeout <timeout>] " +
"[--version] --config <config_path> <command>",
SilenceUsage: true,
SilenceErrors: true,
PersistentPreRunE: func(cmd *cobra.Command, _ []string) error {
return viper.BindPFlags(cmd.Flags())
},
RunE: func(cmd *cobra.Command, _ []string) error {
return cmd.Help()
},
}
)
func Execute(ctx context.Context) (*cobra.Command, error) {
return rootCmd.ExecuteContextC(ctx)
}
func initConfig() {
viper.SetConfigFile(cfgFile)
_ = viper.ReadInConfig()
}
func init() {
cobra.OnInitialize(initConfig)
cobra.EnableTraverseRunHooks = true
rootCmd.SetGlobalNormalizationFunc(func(_ *pflag.FlagSet, name string) pflag.NormalizedName {
return pflag.NormalizedName(strings.ReplaceAll(name, "_", "-"))
})
rootCmd.PersistentFlags().StringVar(&cfgFile, cfgConfigPath, "", "configuration filepath")
_ = rootCmd.MarkPersistentFlagRequired(cfgConfigPath)
_ = rootCmd.MarkPersistentFlagFilename(cfgConfigPath)
rootCmd.PersistentFlags().Duration(cfgHTTPTimeoutFlag, time.Minute, "http request timeout")
rootCmd.PersistentFlags().Bool(cfgSkipVerifyTLS, false, "skip TLS certificate verification")
rootCmd.SetOut(os.Stdout)
initRunCmd()
rootCmd.AddCommand(runCmd)
}

View file

@ -0,0 +1,208 @@
package modules
import (
"bufio"
"bytes"
"context"
"crypto/md5"
"crypto/sha256"
"crypto/tls"
"encoding/base64"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"strconv"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/cmd/s3-playback/internal/playback"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/detector"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/xmlutils"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
const (
cfgPrintResponseLimit = "print-response-limit"
cfgLogPath = "log"
cfgEndpoint = "endpoint"
awsAccessKey = "credentials.access_key"
awsSecretKey = "credentials.secret_key"
)
var runCmd = &cobra.Command{
Use: "run",
Short: "Send requests from log file",
Long: "Reads the network log file and sends each request to the specified URL",
Example: "frostfs-s3-playback --config <config_path> run [--endpoint=<endpoint>] [--log=<log_path>]",
PersistentPreRunE: func(cmd *cobra.Command, _ []string) (err error) {
viper.SetDefault(cfgPrintResponseLimit, defaultPrintResponseLimit)
return viper.BindPFlags(cmd.Flags())
},
RunE: run,
}
func initRunCmd() {
runCmd.Flags().String(cfgLogPath, "./request.log", "log file path")
runCmd.Flags().String(cfgEndpoint, "", "endpoint URL")
runCmd.Flags().Int(cfgPrintResponseLimit, defaultPrintResponseLimit, "print limit for http response body")
}
func logResponse(cmd *cobra.Command, r *http.Request, resp *http.Response) {
cmd.Println(r.Method, r.URL.RequestURI())
cmd.Println(resp.Status)
if resp.ContentLength == 0 {
return
}
detect := detector.NewDetector(resp.Body, xmlutils.DetectXML)
dataType, err := detect.Detect()
if err != nil {
cmd.PrintErrln("type detection error:", err.Error())
return
}
body := &bytes.Buffer{}
resultWriter := xmlutils.ChooseWriter(dataType, body)
_, err = io.Copy(resultWriter, io.LimitReader(detect.RestoredReader(), viper.GetInt64(cfgPrintResponseLimit)))
if err != nil {
cmd.Println(err)
return
}
if err = resultWriter.Close(); err != nil {
cmd.Printf("could not close response body: %s\n", err)
return
}
cmd.Println(body.String())
cmd.Println()
}
func run(cmd *cobra.Command, _ []string) error {
ctx := cmd.Context()
settings := &playback.Settings{
Endpoint: viper.GetString(cfgEndpoint),
Creds: playback.Credentials{
AccessKey: viper.GetString(awsAccessKey),
SecretKey: viper.GetString(awsSecretKey),
},
Multiparts: make(map[string]playback.MultipartUpload),
Client: &http.Client{
Transport: http.DefaultTransport,
Timeout: viper.GetDuration(cfgHTTPTimeoutFlag),
},
}
file, err := os.Open(viper.GetString(cfgLogPath))
if err != nil {
return err
}
defer file.Close()
reader := bufio.NewReader(file)
if viper.GetBool(cfgSkipVerifyTLS) {
settings.Client.Transport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
id := 1
for {
logReq, err := getRequestFromLog(reader)
if err != nil {
if err == io.EOF {
break
}
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to parse request", err)
id++
continue
}
select {
case <-ctx.Done():
return fmt.Errorf("interrupted: %w", ctx.Err())
default:
r, resp, err := playbackRequest(ctx, logReq, settings)
if err != nil {
cmd.PrintErrln(strconv.Itoa(id)+")", "failed to playback request:", err)
id++
continue
}
cmd.Print(strconv.Itoa(id) + ") ")
logResponse(cmd, r, resp)
id++
}
}
return nil
}
func getRequestFromLog(reader *bufio.Reader) (playback.LoggedRequest, error) {
var logReq playback.LoggedRequest
req, err := reader.ReadString('\n')
if err != nil {
return logReq, err
}
err = json.Unmarshal([]byte(req), &logReq)
if err != nil {
return logReq, err
}
return logReq, nil
}
// playbackRequest creates http.Request from LoggedRequest and sends it to specified endpoint.
func playbackRequest(ctx context.Context, logReq playback.LoggedRequest, settings *playback.Settings) (*http.Request, *http.Response, error) {
r, err := prepareRequest(ctx, logReq, settings)
if err != nil {
return nil, nil, fmt.Errorf("failed to prepare request: %w", err)
}
resp, err := settings.Client.Do(r)
if err != nil {
return nil, nil, fmt.Errorf("failed to send request: %w", err)
}
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, nil, fmt.Errorf("failed to read response body: %w", err)
}
defer resp.Body.Close()
if err = playback.HandleResponse(r, settings.Multiparts, respBody, logReq.Response); err != nil {
return nil, nil, fmt.Errorf("failed to register multipart upload: %w", err)
}
resp.Body = io.NopCloser(bytes.NewBuffer(respBody))
return r, resp, nil
}
// prepareRequest creates request from logs and modifies its signature and uploadId (if presents).
func prepareRequest(ctx context.Context, logReq playback.LoggedRequest, settings *playback.Settings) (*http.Request, error) {
r, err := http.NewRequestWithContext(ctx, logReq.Method, settings.Endpoint+logReq.URI, bytes.NewReader(logReq.Payload))
if err != nil {
return nil, err
}
r.Header = logReq.Header
sha256hash := sha256.New()
sha256hash.Write(logReq.Payload)
r.Header.Set(auth.AmzContentSHA256, hex.EncodeToString(sha256hash.Sum(nil)))
if r.Header.Get(api.ContentMD5) != "" {
sha256hash.Reset()
md5hash := md5.New()
md5hash.Write(logReq.Payload)
r.Header.Set(api.ContentMD5, base64.StdEncoding.EncodeToString(md5hash.Sum(nil)))
}
if r.URL.Query().Has("uploadId") {
if err = playback.SwapUploadID(r, settings); err != nil {
return nil, err
}
}
if r.Header.Get(auth.AuthorizationHdr) != "" {
if err = playback.Sign(ctx, r, settings.Creds); err != nil {
return nil, err
}
}
return r, nil
}

View file

@ -52,6 +52,17 @@ S3_GW_CONFIG=/path/to/config/yaml
# Logger # Logger
S3_GW_LOGGER_LEVEL=debug S3_GW_LOGGER_LEVEL=debug
# HTTP logger
S3_GW_HTTP_LOGGING_ENABLED=false
# max body size to log
S3_GW_HTTP_LOGGING_MAX_BODY=1024
# max log size in Mb
S3_GW_HTTP_LOGGING_MAX_LOG_SIZE=20
# use log compression
S3_GW_HTTP_LOGGING_GZIP=true
# possible destination output values: filesystem path, url, "stdout", "stderr"
S3_GW_HTTP_LOGGING_DESTINATION=stdout
# RPC endpoint and order of resolving of bucket names # RPC endpoint and order of resolving of bucket names
S3_GW_RPC_ENDPOINT=http://morph-chain.frostfs.devenv:30333/ S3_GW_RPC_ENDPOINT=http://morph-chain.frostfs.devenv:30333/
S3_GW_RESOLVE_ORDER="nns dns" S3_GW_RESOLVE_ORDER="nns dns"
@ -111,6 +122,8 @@ S3_GW_CACHE_MORPH_POLICY_SIZE=10000
# Cache which stores frostfsid subject info # Cache which stores frostfsid subject info
S3_GW_CACHE_FROSTFSID_LIFETIME=1m S3_GW_CACHE_FROSTFSID_LIFETIME=1m
S3_GW_CACHE_FROSTFSID_SIZE=10000 S3_GW_CACHE_FROSTFSID_SIZE=10000
# Cache which stores network info
S3_GW_CACHE_NETWORK_INFO_LIFETIME=1m
# Default policy of placing containers in FrostFS # Default policy of placing containers in FrostFS
# If a user sends a request `CreateBucket` and doesn't define policy for placing of a container in FrostFS, the S3 Gateway # If a user sends a request `CreateBucket` and doesn't define policy for placing of a container in FrostFS, the S3 Gateway

View file

@ -56,6 +56,18 @@ logger:
level: debug level: debug
destination: stdout destination: stdout
# log http request data (URI, headers, query, etc)
http_logging:
enabled: false
# max body size to log
max_body: 1024
# max log size in Mb
max_log_size: 20
# use log compression
gzip: true
# possible output values: filesystem path, url, "stdout", "stderr"
destination: stdout
# RPC endpoint and order of resolving of bucket names # RPC endpoint and order of resolving of bucket names
rpc_endpoint: http://morph-chain.frostfs.devenv:30333 rpc_endpoint: http://morph-chain.frostfs.devenv:30333
resolve_order: resolve_order:
@ -135,6 +147,9 @@ cache:
frostfsid: frostfsid:
lifetime: 1m lifetime: 1m
size: 10000 size: 10000
# Cache which stores network info
network_info:
lifetime: 1m
# Parameters of FrostFS container placement policy # Parameters of FrostFS container placement policy
placement_policy: placement_policy:

View file

@ -0,0 +1,8 @@
endpoint: http://localhost:8084
log: ./log/request.log
credentials:
access_key: CAtUxDSSFtuVyVCjHTMhwx3eP3YSPo5ffwbPcnKfcdrD06WwUSn72T5EBNe3jcgjL54rmxFM6u3nUAoNBps8qJ1PD
secret_key: 560027d81c277de7378f71cbf12a32e4f7f541de724be59bcfdbfdc925425f30
http_timeout: 60s
skip_verify_tls: false
print_response_limit: 1024

View file

@ -176,6 +176,7 @@ There are some custom types used for brevity:
| `placement_policy` | [Placement policy configuration](#placement_policy-section) | | `placement_policy` | [Placement policy configuration](#placement_policy-section) |
| `server` | [Server configuration](#server-section) | | `server` | [Server configuration](#server-section) |
| `logger` | [Logger configuration](#logger-section) | | `logger` | [Logger configuration](#logger-section) |
| `http_logging` | [HTTP Request logger configuration](#http_logging-section) |
| `cache` | [Cache configuration](#cache-section) | | `cache` | [Cache configuration](#cache-section) |
| `cors` | [CORS configuration](#cors-section) | | `cors` | [CORS configuration](#cors-section) |
| `pprof` | [Pprof configuration](#pprof-section) | | `pprof` | [Pprof configuration](#pprof-section) |
@ -376,6 +377,32 @@ logger:
| `level` | `string` | yes | `debug` | Logging level.<br/>Possible values: `debug`, `info`, `warn`, `error`, `dpanic`, `panic`, `fatal`. | | `level` | `string` | yes | `debug` | Logging level.<br/>Possible values: `debug`, `info`, `warn`, `error`, `dpanic`, `panic`, `fatal`. |
| `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` | | `destination` | `string` | no | `stdout` | Destination for logger: `stdout` or `journald` |
### `http_logging` section
Could be enabled only in builds with `loghttp` build tag. To build with `loghttp` tag, pass `GOFLAGS` var to `make`:
```bash
make GOFLAGS="-tags=loghttp" [target]
```
```yaml
http_logging:
enabled: false
max_body: 1024
max_log_size: 20
gzip: true
destination: stdout
```
| Parameter | Type | SIGHUP reload | Default value | Description |
|----------------|----------|---------------|---------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `enabled` | `bool` | yes | `false` | Flag to enable the logger. |
| `max_body` | `int` | yes | `1024` | Max body size for log output in bytes. |
| `max_log_size` | `int` | yes | `50` | Log file size threshold (in megabytes) to be moved in backup file. After reaching threshold, initial filename is appended with timestamp. And new empty file with initial name is created. |
| `gzip` | `bool` | yes | `false` | Whether to enable Gzip compression to backup log files. |
| `destination` | `string` | yes | `stdout` | Specify path for log output. Accepts log file path, or "stdout" and "stderr" reserved words to print in output streams. File and folders are created if necessary. |
### `cache` section ### `cache` section
```yaml ```yaml
@ -411,6 +438,8 @@ cache:
frostfsid: frostfsid:
lifetime: 1m lifetime: 1m
size: 10000 size: 10000
network_info:
lifetime: 1m
``` ```
| Parameter | Type | Default value | Description | | Parameter | Type | Default value | Description |
@ -425,6 +454,7 @@ cache:
| `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 100000` | Cache which stores owner to cache operation mapping. | | `accesscontrol` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 100000` | Cache which stores owner to cache operation mapping. |
| `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores list of policy chains. | | `morph_policy` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores list of policy chains. |
| `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores FrostfsID subject info. | | `frostfsid` | [Cache config](#cache-subsection) | `lifetime: 1m`<br>`size: 10000` | Cache which stores FrostfsID subject info. |
| `network_info` | [Cache config](#cache-subsection) | `lifetime: 1m` | Cache which stores network info. |
#### `cache` subsection #### `cache` subsection
@ -743,7 +773,7 @@ vhs:
``` ```
| Parameter | Type | SIGHUP reload | Default value | Description | | Parameter | Type | SIGHUP reload | Default value | Description |
| ------------------- | ----------------- | ------------- | ---------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | |---------------------|-------------------|---------------|------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `enabled` | `bool` | yes | `false` | Enables the use of virtual host addressing for buckets at the application level. | | `enabled` | `bool` | yes | `false` | Enables the use of virtual host addressing for buckets at the application level. |
| `vhs_header` | `string` | yes | `X-Frostfs-S3-VHS` | Header for determining whether VHS is enabled for the request. | | `vhs_header` | `string` | yes | `X-Frostfs-S3-VHS` | Header for determining whether VHS is enabled for the request. |
| `servername_header` | `string` | yes | `X-Frostfs-Servername` | Header for determining servername. | | `servername_header` | `string` | yes | `X-Frostfs-Servername` | Header for determining servername. |

48
docs/playback.md Normal file
View file

@ -0,0 +1,48 @@
# FrostFS S3 Playback
Playback is a tool to reproduce queries to `frostfs-s3-gw` in dev environment. Network logs could be
gathered from `s3-gw` via HTTP Logger which could be enabled on build with `loghttp` build tag
and `http_logging.enabled` option set to `true` in `s3-gw` configuration.
## Commands
`run` - reads log file and reproduces send requests from it to specified endpoint
#### Example
```bash
frostfs-s3-playback --config <config_path> run [--endpoint=<endpoint>] [--log=<log_path>]
```
## Configuration
Playback accepts configuration file path in yaml with corresponding options:
```yaml
endpoint: http://localhost:8084
log: ./request.log
env: .env
credentials:
access_key: CAtUxDSSFtuVyVCjHTMhwx3eP3YSPo5ffwbPcnKfcdrD06WwUSn72T5EBNe3jcgjL54rmxFM6u3nUAoNBps8qJ1PD
secret_key: 560027d81c277de7378f71cbf12a32e4f7f541de724be59bcfdbfdc925425f30
http_timeout: 60s
skip_verify_tls: true
```
Configuration path is passed via required `--config` flag.
If corresponding flag is set, it overrides parameter from config.
### Configuration parameters
#### Global parameters
| Config parameter name | Flag name | Type | Default value | Description |
|-------------------------|-------------------|------------|---------------|-------------------------------------------------------------------------------|
| - | `config` | `string` | - | config file path (e.g. `./config/playback.yaml`) |
| `http_timeout` | `http-timeout` | `duration` | `60s` | http request timeout |
| `skip_verify_tls` | `skip-verify-tls` | `bool` | `false` | skips tls certificate verification for https endpoints |
| `credentials.accessKey` | - | `string` | - | AWS access key id |
| `credentials.secretKey` | - | `string` | - | AWS secret key |
| `print_response_limit` | - | `int` | `1024` | max response length to be printed in stdout, the rest of body will be omitted |
#### `run` command parameters
| Config parameter name | Flag name | Type | Default value | Description |
|-----------------------|-----------|--------|---------------|--------------------------------------------------------|
| `endpoint` | endpoint | string | - | s3-gw endpoint URL |
| `log` | log | string | ./request.log | path to log file, could be either absolute or relative |

12
go.mod
View file

@ -7,10 +7,11 @@ require (
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240822080251-28f140bf06c1 git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240822080251-28f140bf06c1
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240821072038-a1386f6d259a git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02 git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02
github.com/aws/aws-sdk-go v1.44.6 github.com/aws/aws-sdk-go v1.44.6
github.com/aws/aws-sdk-go-v2 v1.18.1 github.com/aws/aws-sdk-go-v2 v1.30.3
github.com/aws/aws-sdk-go-v2/credentials v1.17.27
github.com/bluele/gcache v0.0.2 github.com/bluele/gcache v0.0.2
github.com/go-chi/chi/v5 v5.0.8 github.com/go-chi/chi/v5 v5.0.8
github.com/google/uuid v1.6.0 github.com/google/uuid v1.6.0
@ -20,7 +21,7 @@ require (
github.com/panjf2000/ants/v2 v2.5.0 github.com/panjf2000/ants/v2 v2.5.0
github.com/prometheus/client_golang v1.19.0 github.com/prometheus/client_golang v1.19.0
github.com/prometheus/client_model v0.5.0 github.com/prometheus/client_model v0.5.0
github.com/spf13/cobra v1.7.0 github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0 github.com/spf13/viper v1.15.0
github.com/ssgreg/journald v1.0.0 github.com/ssgreg/journald v1.0.0
@ -36,6 +37,7 @@ require (
golang.org/x/text v0.14.0 golang.org/x/text v0.14.0
google.golang.org/grpc v1.63.2 google.golang.org/grpc v1.63.2
google.golang.org/protobuf v1.33.0 google.golang.org/protobuf v1.33.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
) )
require ( require (
@ -45,11 +47,11 @@ require (
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 // indirect git.frostfs.info/TrueCloudLab/tzhash v1.8.0 // indirect
github.com/VictoriaMetrics/easyproto v0.1.4 // indirect github.com/VictoriaMetrics/easyproto v0.1.4 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/aws/smithy-go v1.13.5 // indirect github.com/aws/smithy-go v1.20.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.4 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect

25
go.sum
View file

@ -48,8 +48,8 @@ git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240822080251-28f140bf06c1
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240822080251-28f140bf06c1/go.mod h1:Pl77loECndbgIC0Kljj1MFmGJKQ9gotaFINyveW1T8I= git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240822080251-28f140bf06c1/go.mod h1:Pl77loECndbgIC0Kljj1MFmGJKQ9gotaFINyveW1T8I=
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc= git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM= git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240821072038-a1386f6d259a h1:uuNs7xOVgFOqO6hUyyZT+/eZ9glXQ85J4GDVe+qKMCI= git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b h1:M50kdfrf/h8c3cz0bJ2AEUcbXvAlPFVC1Wp1WkfZ/8E=
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240821072038-a1386f6d259a/go.mod h1:SgioiGhQNWqiV5qpFAXRDJF81SEFRBhtwGEiU0FViyA= git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240822104152-a3bc3099bd5b/go.mod h1:GZTk55RI4dKzsK6BCn5h2xxE28UHNfgoq/NJxW/LQ6A=
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA= git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0 h1:M2KR3iBj7WpY3hP10IevfIB9MURr4O9mwVfJ+SjT3HA=
git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0/go.mod h1:okpbKfVYf/BpejtfFTfhZqFP+sZ8rsHrP8Rr/jYPNRc= git.frostfs.info/TrueCloudLab/rfc6979 v0.4.0/go.mod h1:okpbKfVYf/BpejtfFTfhZqFP+sZ8rsHrP8Rr/jYPNRc=
git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjqZzS4gsb4UA= git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjqZzS4gsb4UA=
@ -66,10 +66,12 @@ github.com/antlr4-go/antlr/v4 v4.13.0 h1:lxCg3LAv+EUK6t1i0y1V6/SLeUi0eKEKdhQAlS8
github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g= github.com/antlr4-go/antlr/v4 v4.13.0/go.mod h1:pfChB/xh/Unjila75QW7+VU4TSnWnnk9UTnmpPaOR2g=
github.com/aws/aws-sdk-go v1.44.6 h1:Y+uHxmZfhRTLX2X3khkdxCoTZAyGEX21aOUHe1U6geg= github.com/aws/aws-sdk-go v1.44.6 h1:Y+uHxmZfhRTLX2X3khkdxCoTZAyGEX21aOUHe1U6geg=
github.com/aws/aws-sdk-go v1.44.6/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/aws/aws-sdk-go v1.44.6/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go-v2 v1.18.1 h1:+tefE750oAb7ZQGzla6bLkOwfcQCEtC5y2RqoqCeqKo= github.com/aws/aws-sdk-go-v2 v1.30.3 h1:jUeBtG0Ih+ZIFH0F4UkmL9w3cSpaMv9tYYDbzILP8dY=
github.com/aws/aws-sdk-go-v2 v1.18.1/go.mod h1:uzbQtefpm44goOPmdKyAlXSNcwlRgF3ePWVW6EtJvvw= github.com/aws/aws-sdk-go-v2 v1.30.3/go.mod h1:nIQjQVp5sfpQcTc9mPSr1B0PaWK5ByX9MOoDadSN4lc=
github.com/aws/smithy-go v1.13.5 h1:hgz0X/DX0dGqTYpGALqXJoRKRj5oQ7150i5FdTePzO8= github.com/aws/aws-sdk-go-v2/credentials v1.17.27 h1:2raNba6gr2IfA0eqqiP2XiQ0UVOpGPgDSi0I9iAP+UI=
github.com/aws/smithy-go v1.13.5/go.mod h1:Tg+OJXh4MB2R/uN61Ko2f6hTZwB/ZYGOtib8J3gBHzA= github.com/aws/aws-sdk-go-v2/credentials v1.17.27/go.mod h1:gniiwbGahQByxan6YjQUMcW4Aov6bLC3m+evgcoN4r4=
github.com/aws/smithy-go v1.20.3 h1:ryHwveWzPV5BIof6fyDvor6V3iUL7nTfiTKXHiW05nE=
github.com/aws/smithy-go v1.20.3/go.mod h1:krry+ya/rV9RDcV/Q16kpu6ypI4K2czasz0NC3qS14E=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.8.0 h1:FD+XqgOZDUxxZ8hzoBFuV9+cGWY9CslN6d5MS5JVb4c= github.com/bits-and-blooms/bitset v1.8.0 h1:FD+XqgOZDUxxZ8hzoBFuV9+cGWY9CslN6d5MS5JVb4c=
@ -100,8 +102,8 @@ github.com/consensys/bavard v0.1.13/go.mod h1:9ItSMtA/dXMAiL7BG6bqW2m3NdSEObYWoH
github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb h1:f0BMgIjhZy4lSRHCXFbQst85f5agZAjtDMixQqBWNpc= github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb h1:f0BMgIjhZy4lSRHCXFbQst85f5agZAjtDMixQqBWNpc=
github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY= github.com/consensys/gnark-crypto v0.12.2-0.20231013160410-1f65e75b6dfb/go.mod h1:v2Gy7L/4ZRosZ7Ivs+9SfUDr0f5UlG+EM5t7MPHiLuY=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w= github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lVfRxZq4=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@ -177,7 +179,6 @@ github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
@ -295,8 +296,8 @@ github.com/spf13/afero v1.9.3 h1:41FoI0fD7OR7mGcKE/aOiLkGreyf8ifIOQmJANWogMk=
github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y= github.com/spf13/afero v1.9.3/go.mod h1:iUV7ddyEEZPO5gA3zD4fJt6iStLlL+Lg4m2cihcDf8Y=
github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w= github.com/spf13/cast v1.5.0 h1:rj3WzYc11XZaIZMPKmwP96zkFEnnAmV8s6XbB2aY32w=
github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU= github.com/spf13/cast v1.5.0/go.mod h1:SpXXQ5YoyJw6s3/6cMTQuxvgRl3PCJiyaX9p6b155UU=
github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I= github.com/spf13/cobra v1.8.1 h1:e5/vxKd/rZsfSJMUX1agtjeTDf+qv1/JdBF8gg5k9ZM=
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0= github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3kD9Y=
github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk= github.com/spf13/jwalterweatherman v1.1.0 h1:ue6voC5bR5F8YxI5S67j9i582FU4Qvo2bmqnqMYADFk=
github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo= github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0EXowPYD95IqWIGo=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
@ -707,6 +708,8 @@ gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA=
gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View file

@ -99,6 +99,13 @@ const (
FailedToPassAuthentication = "failed to pass authentication" // Error in ../../api/middleware/auth.go FailedToPassAuthentication = "failed to pass authentication" // Error in ../../api/middleware/auth.go
FailedToResolveCID = "failed to resolve CID" // Debug in ../../api/middleware/metrics.go FailedToResolveCID = "failed to resolve CID" // Debug in ../../api/middleware/metrics.go
RequestStart = "request start" // Info in ../../api/middleware/reqinfo.go RequestStart = "request start" // Info in ../../api/middleware/reqinfo.go
LogHTTP = "http log" // Info in ../../api/middleware/log_http.go
FailedToCloseHTTPBody = "failed to close http body" // Error in ../../api/middleware/log_http.go
FailedToInitializeHTTPLogger = "failed to initialize http logger" // Error in ../../api/middleware/log_http.go
FailedToReloadHTTPFileLogger = "failed to reload http file logger" // Error in ../../api/middleware/log_http.go
FailedToReadHTTPBody = "failed to read http body" // Error in ../../api/middleware/log_http.go
FailedToProcessHTTPBody = "failed to process http body" // Error in ../../api/middleware/log_http.go
LogHTTPDisabledInThisBuild = "http logging disabled in this build" // Warn in ../../api/middleware/log_http_stub.go
FailedToUnescapeObjectName = "failed to unescape object name" // Warn in ../../api/middleware/reqinfo.go FailedToUnescapeObjectName = "failed to unescape object name" // Warn in ../../api/middleware/reqinfo.go
InvalidDefaultMaxAge = "invalid defaultMaxAge" // Fatal in ../../cmd/s3-gw/app_settings.go InvalidDefaultMaxAge = "invalid defaultMaxAge" // Fatal in ../../cmd/s3-gw/app_settings.go
CantShutDownService = "can't shut down service" // Panic in ../../cmd/s3-gw/service.go CantShutDownService = "can't shut down service" // Panic in ../../cmd/s3-gw/service.go
@ -161,4 +168,6 @@ const (
WarnDuplicateNamespaceVHS = "duplicate namespace with enabled VHS, config value skipped" WarnDuplicateNamespaceVHS = "duplicate namespace with enabled VHS, config value skipped"
WarnValueVHSEnabledFlagWrongType = "the value of the VHS enable flag for the namespace is of the wrong type, config value skipped" WarnValueVHSEnabledFlagWrongType = "the value of the VHS enable flag for the namespace is of the wrong type, config value skipped"
WarnDomainContainsInvalidPlaceholder = "the domain contains an invalid placeholder, domain skipped" WarnDomainContainsInvalidPlaceholder = "the domain contains an invalid placeholder, domain skipped"
FailedToRemoveOldPartNode = "failed to remove old part node"
CouldntCacheNetworkInfo = "couldn't cache network info"
) )

View file

@ -1,15 +1,15 @@
package layer package detector
import ( import (
"io" "io"
"net/http"
) )
type ( type (
detector struct { Detector struct {
io.Reader io.Reader
err error err error
data []byte data []byte
detectFunc func([]byte) string
} }
errReader struct { errReader struct {
data []byte data []byte
@ -36,23 +36,24 @@ func (r *errReader) Read(b []byte) (int, error) {
return n, nil return n, nil
} }
func newDetector(reader io.Reader) *detector { func NewDetector(reader io.Reader, detectFunc func([]byte) string) *Detector {
return &detector{ return &Detector{
data: make([]byte, contentTypeDetectSize), data: make([]byte, contentTypeDetectSize),
Reader: reader, Reader: reader,
detectFunc: detectFunc,
} }
} }
func (d *detector) Detect() (string, error) { func (d *Detector) Detect() (string, error) {
n, err := d.Reader.Read(d.data) n, err := d.Reader.Read(d.data)
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
d.err = err d.err = err
return "", err return "", err
} }
d.data = d.data[:n] d.data = d.data[:n]
return http.DetectContentType(d.data), nil return d.detectFunc(d.data), nil
} }
func (d *detector) MultiReader() io.Reader { func (d *Detector) RestoredReader() io.Reader {
return io.MultiReader(newReader(d.data, d.err), d.Reader) return io.MultiReader(newReader(d.data, d.err), d.Reader)
} }

View file

@ -156,6 +156,10 @@ type NodeResponse interface {
} }
func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) { func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
if err := validateNodeResponse(nodeInfo); err != nil {
return nil, err
}
tNode := &treeNode{ tNode := &treeNode{
ID: nodeInfo.GetNodeID(), ID: nodeInfo.GetNodeID(),
ParentID: nodeInfo.GetParentID(), ParentID: nodeInfo.GetParentID(),
@ -163,14 +167,6 @@ func newTreeNode(nodeInfo NodeResponse) (*treeNode, error) {
Meta: make(map[string]string, len(nodeInfo.GetMeta())), Meta: make(map[string]string, len(nodeInfo.GetMeta())),
} }
if len(tNode.ID) == 0 || len(tNode.ParentID) == 0 || len(tNode.TimeStamp) == 0 {
return nil, errors.New("invalid tree node: missing id")
}
if len(tNode.ID) != len(tNode.ParentID) || len(tNode.ID) != len(tNode.TimeStamp) {
return nil, errors.New("invalid tree node: length multiple ids mismatch")
}
for _, kv := range nodeInfo.GetMeta() { for _, kv := range nodeInfo.GetMeta() {
switch kv.GetKey() { switch kv.GetKey() {
case oidKV: case oidKV:
@ -377,6 +373,10 @@ func newMultipartInfoFromTreeNode(log *zap.Logger, filePath string, treeNode *tr
} }
func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) { func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo, error) {
if err := validateNodeResponse(node); err != nil {
return nil, err
}
if len(node.GetNodeID()) != 1 { if len(node.GetNodeID()) != 1 {
return nil, errors.New("invalid multipart node: this is split node") return nil, errors.New("invalid multipart node: this is split node")
} }
@ -426,10 +426,36 @@ func newMultipartInfo(log *zap.Logger, node NodeResponse) (*data.MultipartInfo,
return multipartInfo, nil return multipartInfo, nil
} }
func newPartInfo(node NodeResponse) (*data.PartInfo, error) { func validateNodeResponse(node NodeResponse) error {
var err error ids := node.GetNodeID()
partInfo := &data.PartInfo{} parentIDs := node.GetParentID()
timestamps := node.GetTimestamp()
if len(ids) == 0 || len(parentIDs) == 0 || len(timestamps) == 0 {
return errors.New("invalid node response: missing ids")
}
if len(ids) != len(parentIDs) || len(parentIDs) != len(timestamps) {
return errors.New("invalid node response: multiple ids length mismatch")
}
return nil
}
func newPartInfo(node NodeResponse) (*data.PartInfoExtended, error) {
if err := validateNodeResponse(node); err != nil {
return nil, err
}
if len(node.GetNodeID()) != 1 {
return nil, errors.New("invalid part node: this is split node")
}
partInfo := &data.PartInfoExtended{
Timestamp: node.GetTimestamp()[0],
}
var err error
for _, kv := range node.GetMeta() { for _, kv := range node.GetMeta() {
value := string(kv.GetValue()) value := string(kv.GetValue())
switch kv.GetKey() { switch kv.GetKey() {
@ -1397,10 +1423,10 @@ func (c *Tree) GetMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo,
return nil, layer.ErrNodeNotFound return nil, layer.ErrNodeNotFound
} }
func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) { func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDsToDelete []oid.ID, err error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false) parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil { if err != nil {
return oid.ID{}, err return nil, err
} }
meta := map[string]string{ meta := map[string]string{
@ -1412,48 +1438,76 @@ func (c *Tree) AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartN
md5KV: info.MD5, md5KV: info.MD5,
} }
objToDelete := make([]oid.ID, 0, 1)
partsToDelete := make([]uint64, 0, 1)
var (
latestPartID uint64
maxTimestamp uint64
)
multiNodeID := MultiID{multipartNodeID}
for _, part := range parts { for _, part := range parts {
if len(part.GetNodeID()) != 1 { if multiNodeID.Equal(part.GetNodeID()) {
// multipart parts nodeID shouldn't have multiple values
c.reqLogger(ctx).Warn(logs.UnexpectedMultiNodeIDsInSubTreeMultiParts,
zap.String("key", info.Key),
zap.String("upload id", info.UploadID),
zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("node ids", part.GetNodeID()))
continue
}
nodeID := part.GetNodeID()[0]
if nodeID == multipartNodeID {
continue continue
} }
partInfo, err := newPartInfo(part) partInfo, err := newPartInfo(part)
if err != nil { if err != nil {
c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo, c.reqLogger(ctx).Warn(logs.FailedToParsePartInfo,
zap.String("key", info.Key), zap.String("key", info.Key),
zap.String("upload id", info.UploadID), zap.String("upload id", info.UploadID),
zap.Uint64("multipart node id ", multipartNodeID), zap.Uint64("multipart node id ", multipartNodeID),
zap.Uint64s("id", part.GetNodeID()),
zap.Error(err)) zap.Error(err))
continue continue
} }
if partInfo.Number == info.Number { if partInfo.Number == info.Number {
return partInfo.OID, c.service.MoveNode(ctx, bktInfo, systemTree, nodeID, multipartNodeID, meta) nodeID := part.GetNodeID()[0]
objToDelete = append(objToDelete, partInfo.OID)
partsToDelete = append(partsToDelete, nodeID)
timestamp := partInfo.Timestamp
if timestamp > maxTimestamp {
maxTimestamp = timestamp
latestPartID = nodeID
} }
} }
}
if len(objToDelete) != 0 {
if err = c.service.MoveNode(ctx, bktInfo, systemTree, latestPartID, multipartNodeID, meta); err != nil {
return nil, fmt.Errorf("move part node: %w", err)
}
for _, nodeID := range partsToDelete {
if nodeID == latestPartID {
continue
}
if err = c.service.RemoveNode(ctx, bktInfo, systemTree, nodeID); err != nil {
c.reqLogger(ctx).Warn(logs.FailedToRemoveOldPartNode,
zap.String("key", info.Key),
zap.String("upload id", info.UploadID),
zap.Uint64("id", nodeID))
}
}
return objToDelete, nil
}
if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil { if _, err = c.service.AddNode(ctx, bktInfo, systemTree, multipartNodeID, meta); err != nil {
return oid.ID{}, err return nil, err
} }
return oid.ID{}, layer.ErrNoNodeToRemove return nil, layer.ErrNoNodeToRemove
} }
func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) { func (c *Tree) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfoExtended, error) {
parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false) parts, err := c.service.GetSubTree(ctx, bktInfo, systemTree, []uint64{multipartNodeID}, 2, false)
if err != nil { if err != nil {
return nil, err return nil, err
} }
result := make([]*data.PartInfo, 0, len(parts)) result := make([]*data.PartInfoExtended, 0, len(parts))
for _, part := range parts { for _, part := range parts {
if len(part.GetNodeID()) != 1 { if len(part.GetNodeID()) != 1 {
// multipart parts nodeID shouldn't have multiple values // multipart parts nodeID shouldn't have multiple values

View file

@ -7,6 +7,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test" cidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id/test"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test" oidtest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id/test"
usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test" usertest "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user/test"
"github.com/nspcc-dev/neo-go/pkg/crypto/keys" "github.com/nspcc-dev/neo-go/pkg/crypto/keys"
@ -304,3 +305,57 @@ func TestGetLatestNode(t *testing.T) {
}) })
} }
} }
func TestSplitTreeMultiparts(t *testing.T) {
ctx := context.Background()
memCli, err := NewTreeServiceClientMemory()
require.NoError(t, err)
treeService := NewTree(memCli, zaptest.NewLogger(t))
bktInfo := &data.BucketInfo{
CID: cidtest.ID(),
}
multipartInfo := &data.MultipartInfo{
Key: "multipart",
UploadID: "id",
Meta: map[string]string{},
Owner: usertest.ID(),
}
err = treeService.CreateMultipartUpload(ctx, bktInfo, multipartInfo)
require.NoError(t, err)
multipartInfo, err = treeService.GetMultipartUpload(ctx, bktInfo, multipartInfo.Key, multipartInfo.UploadID)
require.NoError(t, err)
var objIDs []oid.ID
for i := 0; i < 2; i++ {
objID := oidtest.ID()
_, err = memCli.AddNode(ctx, bktInfo, systemTree, multipartInfo.ID, map[string]string{
partNumberKV: "1",
oidKV: objID.EncodeToString(),
ownerKV: usertest.ID().EncodeToString(),
})
require.NoError(t, err)
objIDs = append(objIDs, objID)
}
parts, err := treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
require.NoError(t, err)
require.Len(t, parts, 2)
objToDeletes, err := treeService.AddPart(ctx, bktInfo, multipartInfo.ID, &data.PartInfo{
Key: multipartInfo.Key,
UploadID: multipartInfo.UploadID,
Number: 1,
OID: oidtest.ID(),
})
require.NoError(t, err)
require.EqualValues(t, objIDs, objToDeletes, "oids to delete mismatched")
parts, err = treeService.GetParts(ctx, bktInfo, multipartInfo.ID)
require.NoError(t, err)
require.Len(t, parts, 1)
}

48
pkg/xmlutils/xmlutils.go Normal file
View file

@ -0,0 +1,48 @@
package xmlutils
import (
"bytes"
"encoding/base64"
"encoding/xml"
"io"
)
type nopCloseWriter struct {
io.Writer
}
func (b nopCloseWriter) Close() error {
return nil
}
const (
nonXML = "nonXML"
typeXML = "application/xml"
)
func DetectXML(data []byte) string {
token, err := xml.NewDecoder(bytes.NewReader(data)).RawToken()
if err != nil {
return nonXML
}
switch token.(type) {
case xml.StartElement, xml.ProcInst:
return typeXML
}
return nonXML
}
func ChooseWriter(dataType string, bodyWriter io.Writer) io.WriteCloser {
if dataType == typeXML {
return nopCloseWriter{bodyWriter}
}
return base64.NewEncoder(base64.StdEncoding, bodyWriter)
}
func ChooseReader(dataType string, bodyReader io.Reader) io.Reader {
if dataType == typeXML {
return bodyReader
}
return base64.NewDecoder(base64.StdEncoding, bodyReader)
}