forked from TrueCloudLab/frostfs-s3-gw

Compare commits: performanc ... master (12 commits)

Commits (SHA1):
20719bd85c
4f27e34974
3dc989d7fe
69e77aecc9
c34680d157
f5326b9f04
51c5c227c2
c506620199
6cb0026007
971006a28c
527e0dc612
3213dd7236

55 changed files with 1034 additions and 504 deletions
@@ -1,4 +1,4 @@
-FROM golang:1.21 as builder
+FROM golang:1.21 AS builder

ARG BUILD=now
ARG REPO=git.frostfs.info/TrueCloudLab/frostfs-s3-gw
CHANGELOG.md (11 changed lines)

@@ -4,14 +4,6 @@ This document outlines major changes between releases.
## [Unreleased]

-## [0.30.1] - 2024-07-25
-
-### Fixed
-- Redundant system node removal in tree service (#437)
-
-### Added
-- Log details on SDK Pool health status change (#439)
-
## [0.30.0] - Kangshung -2024-07-19

### Fixed

@@ -241,5 +233,4 @@ To see CHANGELOG for older versions, refer to https://github.com/nspcc-dev/neofs
[0.29.2]: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/compare/v0.29.1...v0.29.2
[0.29.3]: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/compare/v0.29.2...v0.29.3
[0.30.0]: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/compare/v0.29.3...v0.30.0
-[0.30.1]: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/compare/v0.30.0...v0.30.1
-[Unreleased]: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/compare/v0.30.1...master
+[Unreleased]: https://git.frostfs.info/TrueCloudLab/frostfs-s3-gw/compare/v0.30.0...master
VERSION (2 changed lines)

@@ -1 +1 @@
-v0.30.1
+v0.30.0
@@ -270,7 +270,9 @@ func (c *Center) checkFormData(r *http.Request) (*middleware.Box, error) {
return nil, fmt.Errorf("failed to parse x-amz-date field: %w", err)
}

-addr, err := getAddress(submatches["access_key_id"])
+accessKeyID := submatches["access_key_id"]
+
+addr, err := getAddress(accessKeyID)
if err != nil {
return nil, err
}

@@ -283,14 +285,22 @@ func (c *Center) checkFormData(r *http.Request) (*middleware.Box, error) {
secret := box.Gate.SecretKey
service, region := submatches["service"], submatches["region"]

-signature := signStr(secret, service, region, signatureDateTime, policy)
+signature := SignStr(secret, service, region, signatureDateTime, policy)
reqSignature := MultipartFormValue(r, "x-amz-signature")
if signature != reqSignature {
return nil, fmt.Errorf("%w: %s != %s", apiErrors.GetAPIError(apiErrors.ErrSignatureDoesNotMatch),
reqSignature, signature)
}

-return &middleware.Box{AccessBox: box, Attributes: attrs}, nil
+return &middleware.Box{
+AccessBox: box,
+AuthHeaders: &middleware.AuthHeader{
+AccessKeyID: accessKeyID,
+Region: region,
+SignatureV4: signature,
+},
+Attributes: attrs,
+}, nil
}

func cloneRequest(r *http.Request, authHeader *AuthHeader) *http.Request {

@@ -349,7 +359,7 @@ func (c *Center) checkSign(authHeader *AuthHeader, box *accessbox.Box, request *
return nil
}

-func signStr(secret, service, region string, t time.Time, strToSign string) string {
+func SignStr(secret, service, region string, t time.Time, strToSign string) string {
creds := deriveKey(secret, service, region, t)
signature := hmacSHA256(creds, []byte(strToSign))
return hex.EncodeToString(signature)
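The last hunk exports the signing helper; the POST-object tests later in this diff call it as auth.SignStr. A minimal, self-contained sketch of computing such a signature (the secret, timestamp and policy values below are placeholders, not values from this change):

package main

import (
	"fmt"
	"time"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
)

func main() {
	// Placeholder inputs: a real caller passes the gate secret key and the exact
	// base64-encoded POST policy that was submitted in the multipart form.
	policy := "eyJleHBpcmF0aW9uIjogIi4uLiJ9"
	signTime, _ := time.Parse("20060102T150405Z", "20240725T120000Z")

	sig := auth.SignStr("secret-key", "s3", "us-east-1", signTime, policy)
	fmt.Println(sig) // hex-encoded HMAC-SHA256 of the policy under the derived signing key
}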
@@ -115,7 +115,7 @@ func TestSignature(t *testing.T) {
panic(err)
}

-signature := signStr(secret, "s3", "us-east-1", signTime, strToSign)
+signature := SignStr(secret, "s3", "us-east-1", signTime, strToSign)
require.Equal(t, "dfbe886241d9e369cf4b329ca0f15eb27306c97aa1022cc0bb5a914c4ef87634", signature)
}

@@ -434,7 +434,7 @@ func TestAuthenticate(t *testing.T) {

func TestHTTPPostAuthenticate(t *testing.T) {
const (
-policyBase64 = "eyAiZXhwaXJhdGlvbiI6ICIyMDA3LTEyLTAxVDEyOjAwOjAwLjAwMFoiLAogICJjb25kaXRpb25zIjogWwogICAgeyJhY2wiOiAicHVibGljLXJlYWQiIH0sCiAgICB7ImJ1Y2tldCI6ICJqb2huc21pdGgiIH0sCiAgICBbInN0YXJ0cy13aXRoIiwgIiRrZXkiLCAidXNlci9lcmljLyJdLAogIF0KfQ=="
+policyBase64 = "eyJleHBpcmF0aW9uIjogIjIwMjUtMTItMDFUMTI6MDA6MDAuMDAwWiIsImNvbmRpdGlvbnMiOiBbCiBbInN0YXJ0cy13aXRoIiwgIiR4LWFtei1jcmVkZW50aWFsIiwgIiJdLAogWyJzdGFydHMtd2l0aCIsICIkeC1hbXotZGF0ZSIsICIiXQpdfQ=="
invalidValue = "invalid-value"
defaultFieldName = "file"
service = "s3"

@@ -492,7 +492,7 @@ func TestHTTPPostAuthenticate(t *testing.T) {
name: "HTTP POST valid",
request: func() *http.Request {
creds := getCredsStr(accessKeyID, timeToSignStr, region, service)
-sign := signStr(secret.SecretKey, service, region, timeToSign, policyBase64)
+sign := SignStr(secret.SecretKey, service, region, timeToSign, policyBase64)

return getRequestWithMultipartForm(t, policyBase64, creds, timeToSignStr, sign, defaultFieldName)
}(),

@@ -501,7 +501,7 @@ func TestHTTPPostAuthenticate(t *testing.T) {
name: "HTTP POST valid with custom field name",
request: func() *http.Request {
creds := getCredsStr(accessKeyID, timeToSignStr, region, service)
-sign := signStr(secret.SecretKey, service, region, timeToSign, policyBase64)
+sign := SignStr(secret.SecretKey, service, region, timeToSign, policyBase64)

return getRequestWithMultipartForm(t, policyBase64, creds, timeToSignStr, sign, "files")
}(),

@@ -510,7 +510,7 @@ func TestHTTPPostAuthenticate(t *testing.T) {
name: "HTTP POST valid with field name with a capital letter",
request: func() *http.Request {
creds := getCredsStr(accessKeyID, timeToSignStr, region, service)
-sign := signStr(secret.SecretKey, service, region, timeToSign, policyBase64)
+sign := SignStr(secret.SecretKey, service, region, timeToSign, policyBase64)

return getRequestWithMultipartForm(t, policyBase64, creds, timeToSignStr, sign, "File")
}(),

@@ -530,7 +530,7 @@ func TestHTTPPostAuthenticate(t *testing.T) {
name: "HTTP POST invalid signature date time",
request: func() *http.Request {
creds := getCredsStr(accessKeyID, timeToSignStr, region, service)
-sign := signStr(secret.SecretKey, service, region, timeToSign, policyBase64)
+sign := SignStr(secret.SecretKey, service, region, timeToSign, policyBase64)

return getRequestWithMultipartForm(t, policyBase64, creds, invalidValue, sign, defaultFieldName)
}(),

@@ -539,7 +539,7 @@ func TestHTTPPostAuthenticate(t *testing.T) {
{
name: "HTTP POST invalid creds",
request: func() *http.Request {
-sign := signStr(secret.SecretKey, service, region, timeToSign, policyBase64)
+sign := SignStr(secret.SecretKey, service, region, timeToSign, policyBase64)

return getRequestWithMultipartForm(t, policyBase64, invalidValue, timeToSignStr, sign, defaultFieldName)
}(),

@@ -550,7 +550,7 @@ func TestHTTPPostAuthenticate(t *testing.T) {
name: "HTTP POST missing policy",
request: func() *http.Request {
creds := getCredsStr(accessKeyID, timeToSignStr, region, service)
-sign := signStr(secret.SecretKey, service, region, timeToSign, policyBase64)
+sign := SignStr(secret.SecretKey, service, region, timeToSign, policyBase64)

return getRequestWithMultipartForm(t, "", creds, timeToSignStr, sign, defaultFieldName)
}(),

@@ -560,7 +560,7 @@ func TestHTTPPostAuthenticate(t *testing.T) {
name: "HTTP POST invalid accessKeyId",
request: func() *http.Request {
creds := getCredsStr(invalidValue, timeToSignStr, region, service)
-sign := signStr(secret.SecretKey, service, region, timeToSign, policyBase64)
+sign := SignStr(secret.SecretKey, service, region, timeToSign, policyBase64)

return getRequestWithMultipartForm(t, policyBase64, creds, timeToSignStr, sign, defaultFieldName)
}(),

@@ -570,7 +570,7 @@ func TestHTTPPostAuthenticate(t *testing.T) {
name: "HTTP POST invalid accessKeyId - a non-existent box",
request: func() *http.Request {
creds := getCredsStr(invalidAccessKeyID, timeToSignStr, region, service)
-sign := signStr(secret.SecretKey, service, region, timeToSign, policyBase64)
+sign := SignStr(secret.SecretKey, service, region, timeToSign, policyBase64)

return getRequestWithMultipartForm(t, policyBase64, creds, timeToSignStr, sign, defaultFieldName)
}(),

@@ -580,7 +580,7 @@ func TestHTTPPostAuthenticate(t *testing.T) {
name: "HTTP POST invalid signature",
request: func() *http.Request {
creds := getCredsStr(accessKeyID, timeToSignStr, region, service)
-sign := signStr(secret.SecretKey, service, region, timeToSign, invalidValue)
+sign := SignStr(secret.SecretKey, service, region, timeToSign, invalidValue)

return getRequestWithMultipartForm(t, policyBase64, creds, timeToSignStr, sign, defaultFieldName)
}(),

@@ -602,6 +602,7 @@ func TestHTTPPostAuthenticate(t *testing.T) {
} else {
require.NoError(t, err)
require.Equal(t, secret.SecretKey, box.AccessBox.Gate.SecretKey)
+require.Equal(t, accessKeyID, box.AuthHeaders.AccessKeyID)
}
})
}

@@ -616,7 +617,7 @@ func getRequestWithMultipartForm(t *testing.T, policy, creds, date, sign, fieldN
writer := multipart.NewWriter(body)
defer writer.Close()

-err := writer.WriteField("Policy", policy)
+err := writer.WriteField("policy", policy)
require.NoError(t, err)
err = writer.WriteField(AmzCredential, creds)
require.NoError(t, err)
@@ -87,7 +87,9 @@ type (
func (b *BucketInfo) SettingsObjectName() string { return bktSettingsObject }

// CORSObjectName returns a system name for a bucket CORS configuration file.
-func (b *BucketInfo) CORSObjectName() string { return bktCORSConfigurationObject }
+func (b *BucketInfo) CORSObjectName() string {
+return b.CID.EncodeToString() + bktCORSConfigurationObject
+}

// VersionID returns object version from ObjectInfo.
func (o *ObjectInfo) VersionID() string { return o.ID.EncodeToString() }
@@ -41,6 +41,7 @@ type (
RetryMaxAttempts() int
RetryMaxBackoff() time.Duration
RetryStrategy() RetryStrategy
+Domains() []string
}

FrostFSID interface {
@@ -237,9 +237,18 @@ func (h *handler) DeleteBucketHandler(w http.ResponseWriter, r *http.Request) {
sessionToken = boxData.Gate.SessionTokenForDelete()
}

+skipObjCheck := false
+if value, ok := r.Header[api.AmzForceBucketDelete]; ok {
+s := value[0]
+if s == "true" {
+skipObjCheck = true
+}
+}
+
if err = h.obj.DeleteBucket(r.Context(), &layer.DeleteBucketParams{
BktInfo: bktInfo,
SessionToken: sessionToken,
+SkipCheck: skipObjCheck,
}); err != nil {
h.logAndSendError(w, "couldn't delete bucket", reqInfo, err)
return
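For context, a client opts into this behaviour by sending the X-Amz-Force-Delete-Bucket header with a DeleteBucket request. A rough, unauthenticated sketch (endpoint and bucket name are placeholders; a real request also needs AWS SigV4 headers):

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Placeholder endpoint and bucket; authentication headers are omitted here.
	req, _ := http.NewRequest(http.MethodDelete, "http://localhost:8084/my-bucket", nil)
	req.Header.Set("X-Amz-Force-Delete-Bucket", "true") // skip the bucket-empty check

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	// Per the test below: 204 No Content with "true", 409 Conflict with "false"
	// while objects remain in the bucket.
	fmt.Println(resp.Status)
}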
@@ -85,6 +85,24 @@ func TestDeleteBucketOnNotFoundError(t *testing.T) {
deleteBucket(t, hc, bktName, http.StatusNoContent)
}

+func TestForceDeleteBucket(t *testing.T) {
+hc := prepareHandlerContext(t)
+
+bktName, objName := "bucket-for-removal", "object-to-delete"
+bktInfo := createTestBucket(hc, bktName)
+
+putObject(hc, bktName, objName)
+
+nodeVersion, err := hc.tree.GetUnversioned(hc.context, bktInfo, objName)
+require.NoError(t, err)
+var addr oid.Address
+addr.SetContainer(bktInfo.CID)
+addr.SetObject(nodeVersion.OID)
+
+deleteBucketForce(t, hc, bktName, http.StatusConflict, "false")
+deleteBucketForce(t, hc, bktName, http.StatusNoContent, "true")
+}
+
func TestDeleteMultipleObjectCheckUniqueness(t *testing.T) {
hc := prepareHandlerContext(t)

@@ -517,6 +535,13 @@ func deleteObjectsBase(hc *handlerContext, bktName string, objVersions [][2]stri
return w
}

+func deleteBucketForce(t *testing.T, tc *handlerContext, bktName string, code int, value string) {
+w, r := prepareTestRequest(tc, bktName, "", nil)
+r.Header.Set(api.AmzForceBucketDelete, value)
+tc.Handler().DeleteBucketHandler(w, r)
+assertStatus(t, w, code)
+}
+
func deleteBucket(t *testing.T, tc *handlerContext, bktName string, code int) {
w, r := prepareTestRequest(tc, bktName, "", nil)
tc.Handler().DeleteBucketHandler(w, r)
@@ -37,7 +37,7 @@ func TestSimpleGetEncrypted(t *testing.T) {

objInfo, err := tc.Layer().GetObjectInfo(tc.Context(), &layer.HeadObjectParams{BktInfo: bktInfo, Object: objName})
require.NoError(t, err)
-obj, err := tc.MockedPool().ReadObject(tc.Context(), layer.PrmObjectRead{Container: bktInfo.CID, Object: objInfo.ID})
+obj, err := tc.MockedPool().GetObject(tc.Context(), layer.PrmObjectGet{Container: bktInfo.CID, Object: objInfo.ID})
require.NoError(t, err)
encryptedContent, err := io.ReadAll(obj.Payload)
require.NoError(t, err)
@@ -72,6 +72,7 @@ type configMock struct {
defaultCopiesNumbers []uint32
bypassContentEncodingInChunks bool
md5Enabled bool
+domains []string
}

func (c *configMock) DefaultPlacementPolicy(_ string) netmap.PlacementPolicy {

@@ -135,6 +136,10 @@ func (c *configMock) RetryStrategy() RetryStrategy {
return RetryStrategyConstant
}

+func (c *configMock) Domains() []string {
+return c.domains
+}
+
func prepareHandlerContext(t *testing.T) *handlerContext {
return prepareHandlerContextBase(t, layer.DefaultCachesConfigs(zap.NewExample()))
}
@@ -5,7 +5,9 @@ import (
"fmt"
"net/http"
+"net/url"
+"path"
"strconv"
"strings"
"time"

"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"

@@ -26,10 +28,11 @@ type (
}

CompleteMultipartUploadResponse struct {
-XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult" json:"-"`
-Bucket string `xml:"Bucket"`
-Key string `xml:"Key"`
-ETag string `xml:"ETag"`
+XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ CompleteMultipartUploadResult" json:"-"`
+Bucket string `xml:"Bucket"`
+Key string `xml:"Key"`
+ETag string `xml:"ETag"`
+Location string `xml:"Location"`
}

ListMultipartUploadsResponse struct {

@@ -54,11 +57,11 @@ type (
Initiator Initiator `xml:"Initiator"`
IsTruncated bool `xml:"IsTruncated"`
Key string `xml:"Key"`
-MaxParts int `xml:"MaxParts,omitempty"`
-NextPartNumberMarker int `xml:"NextPartNumberMarker,omitempty"`
+MaxParts int `xml:"MaxParts"`
+NextPartNumberMarker int `xml:"NextPartNumberMarker"`
Owner Owner `xml:"Owner"`
Parts []*layer.Part `xml:"Part"`
-PartNumberMarker int `xml:"PartNumberMarker,omitempty"`
+PartNumberMarker int `xml:"PartNumberMarker"`
StorageClass string `xml:"StorageClass"`
UploadID string `xml:"UploadId"`
}
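Dropping ",omitempty" changes what encoding/xml emits for zero-valued markers: they are now written out explicitly, which the ListParts assertions further down expect. A small illustration using a trimmed copy of the response type (not the gateway's actual struct):

package main

import (
	"encoding/xml"
	"fmt"
)

// listPartsSketch is a cut-down stand-in for ListPartsResponse.
type listPartsSketch struct {
	XMLName              xml.Name `xml:"ListPartsResult"`
	MaxParts             int      `xml:"MaxParts"`
	NextPartNumberMarker int      `xml:"NextPartNumberMarker"`
	PartNumberMarker     int      `xml:"PartNumberMarker"`
}

func main() {
	out, _ := xml.Marshal(listPartsSketch{MaxParts: 1000})
	fmt.Println(string(out))
	// With ",omitempty" removed the zero markers appear in the output:
	// <ListPartsResult><MaxParts>1000</MaxParts><NextPartNumberMarker>0</NextPartNumberMarker><PartNumberMarker>0</PartNumberMarker></ListPartsResult>
}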
@@ -423,9 +426,10 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
}

response := CompleteMultipartUploadResponse{
-Bucket: objInfo.Bucket,
-Key: objInfo.Name,
-ETag: data.Quote(objInfo.ETag(h.cfg.MD5Enabled())),
+Bucket: objInfo.Bucket,
+Key: objInfo.Name,
+ETag: data.Quote(objInfo.ETag(h.cfg.MD5Enabled())),
+Location: getObjectLocation(r, h.cfg.Domains(), reqInfo.BucketName, reqInfo.ObjectName),
}

if settings.VersioningEnabled() {

@@ -437,6 +441,35 @@ func (h *handler) CompleteMultipartUploadHandler(w http.ResponseWriter, r *http.
}
}

+// returns "https" if the tls boolean is true, "http" otherwise.
+func getURLScheme(r *http.Request) string {
+if r.TLS != nil {
+return "https"
+}
+return "http"
+}
+
+// getObjectLocation gets the fully qualified URL of an object.
+func getObjectLocation(r *http.Request, domains []string, bucket, object string) string {
+proto := middleware.GetSourceScheme(r)
+if proto == "" {
+proto = getURLScheme(r)
+}
+u := &url.URL{
+Host: r.Host,
+Path: path.Join("/", bucket, object),
+Scheme: proto,
+}
+// If domain is set then we need to use bucket DNS style.
+for _, domain := range domains {
+if strings.HasPrefix(r.Host, bucket+"."+domain) {
+u.Path = path.Join("/", object)
+break
+}
+}
+return u.String()
+}
+
func (h *handler) completeMultipartUpload(r *http.Request, c *layer.CompleteMultipartParams, bktInfo *data.BucketInfo) (*data.ObjectInfo, error) {
ctx := r.Context()
uploadData, extendedObjInfo, err := h.obj.CompleteMultipartUpload(ctx, c)
@@ -292,13 +292,19 @@ func TestListParts(t *testing.T) {
require.Len(t, list.Parts, 2)
require.Equal(t, etag1, list.Parts[0].ETag)
require.Equal(t, etag2, list.Parts[1].ETag)
+require.Zero(t, list.PartNumberMarker)
+require.Equal(t, 2, list.NextPartNumberMarker)

list = listParts(hc, bktName, objName, uploadInfo.UploadID, "1", http.StatusOK)
require.Len(t, list.Parts, 1)
require.Equal(t, etag2, list.Parts[0].ETag)
+require.Equal(t, 1, list.PartNumberMarker)
+require.Equal(t, 2, list.NextPartNumberMarker)

list = listParts(hc, bktName, objName, uploadInfo.UploadID, "2", http.StatusOK)
require.Len(t, list.Parts, 0)
+require.Equal(t, 2, list.PartNumberMarker)
+require.Equal(t, 0, list.NextPartNumberMarker)

list = listParts(hc, bktName, objName, uploadInfo.UploadID, "7", http.StatusOK)
require.Len(t, list.Parts, 0)

@@ -435,6 +441,80 @@ func TestUploadPartCheckContentSHA256(t *testing.T) {
}
}

+func TestMultipartObjectLocation(t *testing.T) {
+for _, tc := range []struct {
+req *http.Request
+bucket string
+object string
+domains []string
+expected string
+}{
+{
+req: &http.Request{
+Host: "127.0.0.1:8084",
+Header: map[string][]string{"X-Forwarded-Scheme": {"http"}},
+},
+bucket: "testbucket1",
+object: "test/1.txt",
+expected: "http://127.0.0.1:8084/testbucket1/test/1.txt",
+},
+{
+req: &http.Request{
+Host: "localhost:8084",
+Header: map[string][]string{"X-Forwarded-Scheme": {"https"}},
+},
+bucket: "testbucket1",
+object: "test/1.txt",
+expected: "https://localhost:8084/testbucket1/test/1.txt",
+},
+{
+req: &http.Request{
+Host: "s3.mybucket.org",
+Header: map[string][]string{"X-Forwarded-Scheme": {"http"}},
+},
+bucket: "mybucket",
+object: "test/1.txt",
+expected: "http://s3.mybucket.org/mybucket/test/1.txt",
+},
+{
+req: &http.Request{Host: "mys3.mybucket.org"},
+bucket: "mybucket",
+object: "test/1.txt",
+expected: "http://mys3.mybucket.org/mybucket/test/1.txt",
+},
+{
+req: &http.Request{Host: "s3.bucket.org", TLS: &tls.ConnectionState{}},
+bucket: "bucket",
+object: "obj",
+expected: "https://s3.bucket.org/bucket/obj",
+},
+{
+req: &http.Request{
+Host: "mybucket.s3dev.frostfs.devenv",
+},
+domains: []string{"s3dev.frostfs.devenv"},
+bucket: "mybucket",
+object: "test/1.txt",
+expected: "http://mybucket.s3dev.frostfs.devenv/test/1.txt",
+},
+{
+req: &http.Request{
+Host: "mybucket.s3dev.frostfs.devenv",
+Header: map[string][]string{"X-Forwarded-Scheme": {"https"}},
+},
+domains: []string{"s3dev.frostfs.devenv"},
+bucket: "mybucket",
+object: "test/1.txt",
+expected: "https://mybucket.s3dev.frostfs.devenv/test/1.txt",
+},
+} {
+t.Run("", func(t *testing.T) {
+location := getObjectLocation(tc.req, tc.domains, tc.bucket, tc.object)
+require.Equal(t, tc.expected, location)
+})
+}
+}
+
func uploadPartCopy(hc *handlerContext, bktName, objName, uploadID string, num int, srcObj string, start, end int) *UploadPartCopyResponse {
return uploadPartCopyBase(hc, bktName, objName, false, uploadID, num, srcObj, start, end)
}
@@ -232,7 +232,7 @@ func (h *handler) ListBucketObjectVersionsHandler(w http.ResponseWriter, r *http
return
}

-response := encodeListObjectVersionsToResponse(info, p.BktInfo.Name, h.cfg.MD5Enabled())
+response := encodeListObjectVersionsToResponse(p, info, p.BktInfo.Name, h.cfg.MD5Enabled())
if err = middleware.EncodeToResponse(w, response); err != nil {
h.logAndSendError(w, "something went wrong", reqInfo, err)
}

@@ -264,24 +264,28 @@ func parseListObjectVersionsRequest(reqInfo *middleware.ReqInfo) (*layer.ListObj
return &res, nil
}

-func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, bucketName string, md5Enabled bool) *ListObjectsVersionsResponse {
+func encodeListObjectVersionsToResponse(p *layer.ListObjectVersionsParams, info *layer.ListObjectVersionsInfo, bucketName string, md5Enabled bool) *ListObjectsVersionsResponse {
res := ListObjectsVersionsResponse{
Name: bucketName,
IsTruncated: info.IsTruncated,
-KeyMarker: info.KeyMarker,
-NextKeyMarker: info.NextKeyMarker,
+KeyMarker: s3PathEncode(info.KeyMarker, p.Encode),
+NextKeyMarker: s3PathEncode(info.NextKeyMarker, p.Encode),
NextVersionIDMarker: info.NextVersionIDMarker,
VersionIDMarker: info.VersionIDMarker,
+Prefix: s3PathEncode(p.Prefix, p.Encode),
+Delimiter: s3PathEncode(p.Delimiter, p.Encode),
+EncodingType: p.Encode,
+MaxKeys: p.MaxKeys,
}

for _, prefix := range info.CommonPrefixes {
-res.CommonPrefixes = append(res.CommonPrefixes, CommonPrefix{Prefix: prefix})
+res.CommonPrefixes = append(res.CommonPrefixes, CommonPrefix{Prefix: s3PathEncode(prefix, p.Encode)})
}

for _, ver := range info.Version {
res.Version = append(res.Version, ObjectVersionResponse{
IsLatest: ver.IsLatest,
-Key: ver.NodeVersion.FilePath,
+Key: s3PathEncode(ver.NodeVersion.FilePath, p.Encode),
LastModified: ver.NodeVersion.Created.UTC().Format(time.RFC3339),
Owner: Owner{
ID: ver.NodeVersion.Owner.String(),

@@ -297,7 +301,7 @@ func encodeListObjectVersionsToResponse(info *layer.ListObjectVersionsInfo, buck
for _, del := range info.DeleteMarker {
res.DeleteMarker = append(res.DeleteMarker, DeleteMarkerEntry{
IsLatest: del.IsLatest,
-Key: del.NodeVersion.FilePath,
+Key: s3PathEncode(del.NodeVersion.FilePath, p.Encode),
LastModified: del.NodeVersion.Created.UTC().Format(time.RFC3339),
Owner: Owner{
ID: del.NodeVersion.Owner.String(),
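With these changes ListObjectVersions honours encoding-type=url: keys, prefixes, delimiters and key markers come back percent-encoded, as the test below asserts. A client-side sketch of decoding such keys (the encoded values are taken from the test expectations):

package main

import (
	"fmt"
	"net/url"
)

func main() {
	// Keys as returned by the gateway when encoding-type=url is requested.
	for _, encoded := range []string{"foo%28%29/bar", "auux%20ab/thud", "asdf%2Bb"} {
		key, err := url.PathUnescape(encoded)
		if err != nil {
			panic(err)
		}
		fmt.Println(key) // foo()/bar, auux ab/thud, asdf+b
	}
}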
@@ -675,6 +675,49 @@ func TestMintVersioningListObjectVersionsVersionIDContinuation(t *testing.T) {
require.Equal(t, page1.NextVersionIDMarker, page2.VersionIDMarker)
}

+func TestListObjectVersionsEncoding(t *testing.T) {
+hc := prepareHandlerContext(t)
+
+bktName := "bucket-for-listing-versions-encoding"
+bktInfo := createTestBucket(hc, bktName)
+putBucketVersioning(t, hc, bktName, true)
+
+objects := []string{"foo()/bar", "foo()/bar/xyzzy", "auux ab/thud", "asdf+b"}
+for _, objName := range objects {
+createTestObject(hc, bktInfo, objName, encryption.Params{})
+}
+deleteObject(t, hc, bktName, "auux ab/thud", "")
+
+listResponse := listObjectsVersionsURL(hc, bktName, "foo(", ")", "", "", -1)
+
+require.Len(t, listResponse.CommonPrefixes, 1)
+require.Equal(t, "foo%28%29", listResponse.CommonPrefixes[0].Prefix)
+require.Len(t, listResponse.Version, 0)
+require.Len(t, listResponse.DeleteMarker, 0)
+require.Equal(t, "foo%28", listResponse.Prefix)
+require.Equal(t, "%29", listResponse.Delimiter)
+require.Equal(t, "url", listResponse.EncodingType)
+require.Equal(t, maxObjectList, listResponse.MaxKeys)
+
+listResponse = listObjectsVersions(hc, bktName, "", "", "", "", 1)
+require.Empty(t, listResponse.EncodingType)
+
+listResponse = listObjectsVersionsURL(hc, bktName, "", "", listResponse.NextKeyMarker, listResponse.NextVersionIDMarker, 3)
+
+require.Len(t, listResponse.CommonPrefixes, 0)
+require.Len(t, listResponse.Version, 2)
+require.Equal(t, "auux%20ab/thud", listResponse.Version[0].Key)
+require.False(t, listResponse.Version[0].IsLatest)
+require.Equal(t, "foo%28%29/bar", listResponse.Version[1].Key)
+require.Len(t, listResponse.DeleteMarker, 1)
+require.Equal(t, "auux%20ab/thud", listResponse.DeleteMarker[0].Key)
+require.True(t, listResponse.DeleteMarker[0].IsLatest)
+require.Equal(t, "asdf%2Bb", listResponse.KeyMarker)
+require.Equal(t, "foo%28%29/bar", listResponse.NextKeyMarker)
+require.Equal(t, "url", listResponse.EncodingType)
+require.Equal(t, 3, listResponse.MaxKeys)
+}
+
func checkVersionsNames(t *testing.T, versions *ListObjectsVersionsResponse, names []string) {
for i, v := range versions.Version {
require.Equal(t, names[i], v.Key)

@@ -777,6 +820,14 @@ func listObjectsV1(hc *handlerContext, bktName, prefix, delimiter, marker string
}

func listObjectsVersions(hc *handlerContext, bktName, prefix, delimiter, keyMarker, versionIDMarker string, maxKeys int) *ListObjectsVersionsResponse {
+return listObjectsVersionsBase(hc, bktName, prefix, delimiter, keyMarker, versionIDMarker, maxKeys, false)
+}
+
+func listObjectsVersionsURL(hc *handlerContext, bktName, prefix, delimiter, keyMarker, versionIDMarker string, maxKeys int) *ListObjectsVersionsResponse {
+return listObjectsVersionsBase(hc, bktName, prefix, delimiter, keyMarker, versionIDMarker, maxKeys, true)
+}
+
+func listObjectsVersionsBase(hc *handlerContext, bktName, prefix, delimiter, keyMarker, versionIDMarker string, maxKeys int, encode bool) *ListObjectsVersionsResponse {
query := prepareCommonListObjectsQuery(prefix, delimiter, maxKeys)
if len(keyMarker) != 0 {
query.Add("key-marker", keyMarker)

@@ -784,6 +835,9 @@ func listObjectsVersions(hc *handlerContext, bktName, prefix, delimiter, keyMark
if len(versionIDMarker) != 0 {
query.Add("version-id-marker", versionIDMarker)
}
+if encode {
+query.Add("encoding-type", "url")
+}

w, r := prepareTestFullRequest(hc, bktName, "", query, nil)
hc.Handler().ListBucketObjectVersionsHandler(w, r)
@@ -9,10 +9,10 @@ import (
stderrors "errors"
"fmt"
"io"
+"mime/multipart"
"net"
"net/http"
"net/url"
-"runtime/trace"
"strconv"
"strings"
"time"

@@ -183,12 +183,6 @@ type createBucketParams struct {
}

func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
-if trace.IsEnabled() {
-pctx, task := trace.NewTask(r.Context(), "PutObjectHandler")
-defer task.End()
-r = r.WithContext(pctx)
-}
-
var (
err error
cannedACLStatus = aclHeadersStatus(r)

@@ -476,21 +470,47 @@ func (h *handler) PostObject(w http.ResponseWriter, r *http.Request) {
return
}

+reqInfo.ObjectName = auth.MultipartFormValue(r, "key")
+
var contentReader io.Reader
var size uint64
+var filename string

if content, ok := r.MultipartForm.Value["file"]; ok {
-contentReader = bytes.NewBufferString(content[0])
-size = uint64(len(content[0]))
+fullContent := strings.Join(content, "")
+contentReader = bytes.NewBufferString(fullContent)
+size = uint64(len(fullContent))
+
+if reqInfo.ObjectName == "" || strings.Contains(reqInfo.ObjectName, "${filename}") {
+_, head, err := r.FormFile("file")
+if err != nil {
+h.logAndSendError(w, "could not parse file field", reqInfo, err)
+return
+}
+filename = head.Filename
+}
} else {
-file, head, err := r.FormFile("file")
+var head *multipart.FileHeader
+contentReader, head, err = r.FormFile("file")
if err != nil {
-h.logAndSendError(w, "could get uploading file", reqInfo, err)
+h.logAndSendError(w, "could not parse file field", reqInfo, err)
return
}
-contentReader = file
size = uint64(head.Size)
-reqInfo.ObjectName = strings.ReplaceAll(reqInfo.ObjectName, "${filename}", head.Filename)
+filename = head.Filename
}

+if reqInfo.ObjectName == "" {
+reqInfo.ObjectName = filename
+} else {
+reqInfo.ObjectName = strings.ReplaceAll(reqInfo.ObjectName, "${filename}", filename)
+}
+
+if reqInfo.ObjectName == "" {
+h.logAndSendError(w, "missing object name", reqInfo, errors.GetAPIError(errors.ErrInvalidArgument))
+return
+}
+
if !policy.CheckContentLength(size) {
h.logAndSendError(w, "invalid content-length", reqInfo, errors.GetAPIError(errors.ErrInvalidArgument))
return

@@ -606,10 +626,6 @@ func checkPostPolicy(r *http.Request, reqInfo *middleware.ReqInfo, metadata map[
if key == "content-type" {
metadata[api.ContentType] = value
}
-
-if key == "key" {
-reqInfo.ObjectName = value
-}
}

for _, cond := range policy.Conditions {
@@ -17,6 +17,7 @@ import (
"time"

"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
+"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
v4 "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth/signer/v4"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"

@@ -122,6 +123,92 @@ func TestEmptyPostPolicy(t *testing.T) {
require.NoError(t, err)
}

+// if content length is greater than this value
+// data will be writen to file location.
+const maxContentSizeForFormData = 10
+
+func TestPostObject(t *testing.T) {
+hc := prepareHandlerContext(t)
+
+ns, bktName := "", "bucket"
+createTestBucket(hc, bktName)
+
+for _, tc := range []struct {
+key string
+filename string
+content string
+objName string
+err bool
+}{
+{
+key: "user/user1/${filename}",
+filename: "object",
+content: "content",
+objName: "user/user1/object",
+},
+{
+key: "user/user1/${filename}",
+filename: "object",
+content: "maxContentSizeForFormData",
+objName: "user/user1/object",
+},
+{
+key: "user/user1/key-object",
+filename: "object",
+content: "",
+objName: "user/user1/key-object",
+},
+{
+key: "user/user1/key-object",
+filename: "object",
+content: "maxContentSizeForFormData",
+objName: "user/user1/key-object",
+},
+{
+key: "",
+filename: "object",
+content: "",
+objName: "object",
+},
+{
+key: "",
+filename: "object",
+content: "maxContentSizeForFormData",
+objName: "object",
+},
+{
+// RFC 7578, Section 4.2 requires that if a filename is provided, the
+// directory path information must not be used.
+key: "",
+filename: "dir/object",
+content: "content",
+objName: "object",
+},
+{
+key: "object",
+filename: "",
+content: "content",
+objName: "object",
+},
+{
+key: "",
+filename: "",
+err: true,
+},
+} {
+t.Run(tc.key+";"+tc.filename, func(t *testing.T) {
+w := postObjectBase(hc, ns, bktName, tc.key, tc.filename, tc.content)
+if tc.err {
+assertS3Error(hc.t, w, s3errors.GetAPIError(s3errors.ErrInternalError))
+return
+}
+assertStatus(hc.t, w, http.StatusNoContent)
+content, _ := getObject(hc, bktName, tc.objName)
+require.Equal(t, tc.content, string(content))
+})
+}
+}
+
func TestPutObjectOverrideCopiesNumber(t *testing.T) {
tc := prepareHandlerContext(t)
tc.Handler().HeadObjectHandler(w, r)
|
||||
require.Equal(t, expectedContentLanguage, w.Header().Get(api.ContentLanguage))
|
||||
}
|
||||
|
||||
func postObjectBase(hc *handlerContext, ns, bktName, key, filename, content string) *httptest.ResponseRecorder {
|
||||
policy := "eyJleHBpcmF0aW9uIjogIjIwMjUtMTItMDFUMTI6MDA6MDAuMDAwWiIsImNvbmRpdGlvbnMiOiBbCiBbInN0YXJ0cy13aXRoIiwgIiR4LWFtei1jcmVkZW50aWFsIiwgIiJdLAogWyJzdGFydHMtd2l0aCIsICIkeC1hbXotZGF0ZSIsICIiXSwKIFsic3RhcnRzLXdpdGgiLCAiJGtleSIsICIiXQpdfQ=="
|
||||
|
||||
timeToSign := time.Now()
|
||||
timeToSignStr := timeToSign.Format("20060102T150405Z")
|
||||
region := "default"
|
||||
service := "s3"
|
||||
|
||||
accessKeyID := "5jizSbYu8hX345aqCKDgRWKCJYHxnzxRS8e6SUYHZ8Fw0HiRkf3KbJAWBn5mRzmiyHQ3UHADGyzVXLusn1BrmAfLn"
|
||||
secretKey := "abf066d77c6744cd956a123a0b9612df587f5c14d3350ecb01b363f182dd7279"
|
||||
|
||||
creds := getCredsStr(accessKeyID, timeToSignStr, region, service)
|
||||
sign := auth.SignStr(secretKey, service, region, timeToSign, policy)
|
||||
|
||||
body, contentType, err := getMultipartFormBody(policy, creds, timeToSignStr, sign, key, filename, content)
|
||||
require.NoError(hc.t, err)
|
||||
|
||||
w, r := prepareTestPostRequest(hc, bktName, body)
|
||||
r.Header.Set(auth.ContentTypeHdr, contentType)
|
||||
r.Header.Set("X-Frostfs-Namespace", ns)
|
||||
|
||||
err = r.ParseMultipartForm(50 * 1024 * 1024)
|
||||
require.NoError(hc.t, err)
|
||||
|
||||
hc.Handler().PostObject(w, r)
|
||||
return w
|
||||
}
|
||||
|
||||
func getCredsStr(accessKeyID, timeToSign, region, service string) string {
|
||||
return accessKeyID + "/" + timeToSign + "/" + region + "/" + service + "/aws4_request"
|
||||
}
|
||||
|
||||
func getMultipartFormBody(policy, creds, date, sign, key, filename, content string) (io.Reader, string, error) {
|
||||
body := &bytes.Buffer{}
|
||||
writer := multipart.NewWriter(body)
|
||||
defer writer.Close()
|
||||
|
||||
if err := writer.WriteField("policy", policy); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
if err := writer.WriteField("key", key); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
if err := writer.WriteField(strings.ToLower(auth.AmzCredential), creds); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
if err := writer.WriteField(strings.ToLower(auth.AmzDate), date); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
if err := writer.WriteField(strings.ToLower(auth.AmzSignature), sign); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
file, err := writer.CreateFormFile("file", filename)
|
||||
if err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
|
||||
if len(content) < maxContentSizeForFormData {
|
||||
if err = writer.WriteField("file", content); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
} else {
|
||||
if _, err = file.Write([]byte(content)); err != nil {
|
||||
return nil, "", err
|
||||
}
|
||||
}
|
||||
|
||||
return body, writer.FormDataContentType(), nil
|
||||
}
|
||||
|
||||
func prepareTestPostRequest(hc *handlerContext, bktName string, payload io.Reader) (*httptest.ResponseRecorder, *http.Request) {
|
||||
w := httptest.NewRecorder()
|
||||
r := httptest.NewRequest(http.MethodPost, defaultURL+bktName, payload)
|
||||
|
||||
reqInfo := middleware.NewReqInfo(w, r, middleware.ObjectRequest{Bucket: bktName}, "")
|
||||
r = r.WithContext(middleware.SetReqInfo(hc.Context(), reqInfo))
|
||||
|
||||
return w, r
|
||||
}
|
||||
|
|
|
@@ -176,6 +176,9 @@ type ListObjectsVersionsResponse struct {
DeleteMarker []DeleteMarkerEntry `xml:"DeleteMarker"`
Version []ObjectVersionResponse `xml:"Version"`
CommonPrefixes []CommonPrefix `xml:"CommonPrefixes"`
+Prefix string `xml:"Prefix"`
+Delimiter string `xml:"Delimiter,omitempty"`
+MaxKeys int `xml:"MaxKeys"`
}

// VersioningConfiguration contains VersioningConfiguration XML representation.
@@ -62,6 +62,7 @@ const (
AmzMaxParts = "X-Amz-Max-Parts"
AmzPartNumberMarker = "X-Amz-Part-Number-Marker"
AmzStorageClass = "X-Amz-Storage-Class"
+AmzForceBucketDelete = "X-Amz-Force-Delete-Bucket"

AmzServerSideEncryptionCustomerAlgorithm = "x-amz-server-side-encryption-customer-algorithm"
AmzServerSideEncryptionCustomerKey = "x-amz-server-side-encryption-customer-key"
@@ -233,7 +233,7 @@ func (c *Cache) PutSettings(owner user.ID, bktInfo *data.BucketInfo, settings *d
}

func (c *Cache) GetCORS(owner user.ID, bkt *data.BucketInfo) *data.CORSConfiguration {
-key := bkt.Name + bkt.CORSObjectName()
+key := bkt.CORSObjectName()

if !c.accessCache.Get(owner, key) {
return nil

@@ -243,7 +243,7 @@ func (c *Cache) GetCORS(owner user.ID, bkt *data.BucketInfo) *data.CORSConfigura
}

func (c *Cache) PutCORS(owner user.ID, bkt *data.BucketInfo, cors *data.CORSConfiguration) {
-key := bkt.Name + bkt.CORSObjectName()
+key := bkt.CORSObjectName()

if err := c.systemCache.PutCORS(key, cors); err != nil {
c.logger.Warn(logs.CouldntCacheCors, zap.String("bucket", bkt.Name), zap.Error(err))

@@ -255,5 +255,5 @@ func (c *Cache) PutCORS(owner user.ID, bkt *data.BucketInfo, cors *data.CORSConf
}

func (c *Cache) DeleteCORS(bktInfo *data.BucketInfo) {
-c.systemCache.Delete(bktInfo.Name + bktInfo.CORSObjectName())
+c.systemCache.Delete(bktInfo.CORSObjectName())
}
@@ -10,6 +10,8 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
+cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
+oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)

@@ -37,31 +39,36 @@ func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
}

prm := PrmObjectCreate{
-Container: p.BktInfo.CID,
Payload: &buf,
Filepath: p.BktInfo.CORSObjectName(),
CreationTime: TimeNow(ctx),
-CopiesNumber: p.CopiesNumbers,
}

-_, objID, _, _, err := n.objectPutAndHash(ctx, prm, p.BktInfo)
+var corsBkt *data.BucketInfo
+if n.corsCnrInfo == nil {
+corsBkt = p.BktInfo
+prm.CopiesNumber = p.CopiesNumbers
+} else {
+corsBkt = n.corsCnrInfo
+prm.PrmAuth.PrivateKey = &n.gateKey.PrivateKey
+}
+
+prm.Container = corsBkt.CID
+
+_, objID, _, _, err := n.objectPutAndHash(ctx, prm, corsBkt)
if err != nil {
-return fmt.Errorf("put system object: %w", err)
+return fmt.Errorf("put cors object: %w", err)
}

-objIDsToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, objID)
-objIDToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
-if err != nil && !objIDToDeleteNotFound {
+objsToDelete, err := n.treeService.PutBucketCORS(ctx, p.BktInfo, newAddress(corsBkt.CID, objID))
+objToDeleteNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
+if err != nil && !objToDeleteNotFound {
return err
}

-if !objIDToDeleteNotFound {
-for _, id := range objIDsToDelete {
-if err = n.objectDelete(ctx, p.BktInfo, id); err != nil {
-n.reqLogger(ctx).Error(logs.CouldntDeleteCorsObject, zap.Error(err),
-zap.String("cnrID", p.BktInfo.CID.EncodeToString()),
-zap.String("objID", id.EncodeToString()))
-}
+if !objToDeleteNotFound {
+for _, addr := range objsToDelete {
+n.deleteCORSObject(ctx, p.BktInfo, addr)
}
}

@@ -70,12 +77,25 @@ func (n *Layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
return nil
}

+// deleteCORSObject removes object and logs in case of error.
+func (n *Layer) deleteCORSObject(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) {
+var prmAuth PrmAuth
+corsBkt := bktInfo
+if !addr.Container().Equals(bktInfo.CID) && !addr.Container().Equals(cid.ID{}) {
+corsBkt = &data.BucketInfo{CID: addr.Container()}
+prmAuth.PrivateKey = &n.gateKey.PrivateKey
+}
+
+if err := n.objectDeleteWithAuth(ctx, corsBkt, addr.Object(), prmAuth); err != nil {
+n.reqLogger(ctx).Error(logs.CouldntDeleteCorsObject, zap.Error(err),
+zap.String("cnrID", corsBkt.CID.EncodeToString()),
+zap.String("objID", addr.Object().EncodeToString()))
+}
+}
+
func (n *Layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*data.CORSConfiguration, error) {
cors, err := n.getCORS(ctx, bktInfo)
if err != nil {
if errorsStd.Is(err, ErrNodeNotFound) {
return nil, fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchCORSConfiguration), err.Error())
}
return nil, err
}

@@ -83,16 +103,15 @@ func (n *Layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*d
}

func (n *Layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error {
-objIDs, err := n.treeService.DeleteBucketCORS(ctx, bktInfo)
-objIDNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
-if err != nil && !objIDNotFound {
+objs, err := n.treeService.DeleteBucketCORS(ctx, bktInfo)
+objNotFound := errorsStd.Is(err, ErrNoNodeToRemove)
+if err != nil && !objNotFound {
return err
}
-if !objIDNotFound {
-for _, id := range objIDs {
-if err = n.objectDelete(ctx, bktInfo, id); err != nil {
-return err
-}
+
+if !objNotFound {
+for _, addr := range objs {
+n.deleteCORSObject(ctx, bktInfo, addr)
}
}
@@ -78,8 +78,32 @@ type PrmAuth struct {
PrivateKey *ecdsa.PrivateKey
}

-// PrmObjectRead groups parameters of FrostFS.ReadObject operation.
-type PrmObjectRead struct {
+// PrmObjectHead groups parameters of FrostFS.HeadObject operation.
+type PrmObjectHead struct {
// Authentication parameters.
PrmAuth

+// Container to read the object header from.
+Container cid.ID
+
+// ID of the object for which to read the header.
+Object oid.ID
+}
+
+// PrmObjectGet groups parameters of FrostFS.GetObject operation.
+type PrmObjectGet struct {
+// Authentication parameters.
+PrmAuth
+
+// Container to read the object header from.
+Container cid.ID
+
+// ID of the object for which to read the header.
+Object oid.ID
+}
+
+// PrmObjectRange groups parameters of FrostFS.RangeObject operation.
+type PrmObjectRange struct {
+// Authentication parameters.
+PrmAuth

@@ -89,20 +113,14 @@ type PrmObjectRead struct {
// ID of the object for which to read the header.
Object oid.ID

-// Flag to read object header.
-WithHeader bool
-
-// Flag to read object payload. False overlaps payload range.
-WithPayload bool
-
// Offset-length range of the object payload to be read.
PayloadRange [2]uint64
}

-// ObjectPart represents partially read FrostFS object.
-type ObjectPart struct {
-// Object header with optional in-memory payload part.
-Head *object.Object
+// Object represents full read FrostFS object.
+type Object struct {
+// Object header (doesn't contain payload).
+Header object.Object

// Object payload part encapsulated in io.Reader primitive.
// Returns ErrAccessDenied on read access violation.

@@ -213,13 +231,15 @@ type FrostFS interface {
// It returns any error encountered which prevented the removal request from being sent.
DeleteContainer(context.Context, cid.ID, *session.Container) error

-// ReadObject reads a part of the object from the FrostFS container by identifier.
-// Exact part is returned according to the parameters:
-// * with header only: empty payload (both in-mem and reader parts are nil);
-// * with payload only: header is nil (zero range means full payload);
-// * with header and payload: full in-mem object, payload reader is nil.
+// HeadObject reads an info of the object from the FrostFS container by identifier.
//
-// WithHeader or WithPayload is true. Range length is positive if offset is positive.
+// It returns ErrAccessDenied on read access violation.
//
// It returns exactly one non-nil value. It returns any error encountered which
// prevented the object header from being read.
+HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error)
+
+// GetObject reads an object from the FrostFS container by identifier.
+//
+// Payload reader should be closed if it is no longer needed.
+//

@@ -227,7 +247,17 @@ type FrostFS interface {
//
// It returns exactly one non-nil value. It returns any error encountered which
// prevented the object header from being read.
-ReadObject(context.Context, PrmObjectRead) (*ObjectPart, error)
+GetObject(ctx context.Context, prm PrmObjectGet) (*Object, error)
+
+// RangeObject reads a part of object from the FrostFS container by identifier.
+//
+// Payload reader should be closed if it is no longer needed.
+//
+// It returns ErrAccessDenied on read access violation.
+//
+// It returns exactly one non-nil value. It returns any error encountered which
+// prevented the object header from being read.
+RangeObject(ctx context.Context, prm PrmObjectRange) (io.ReadCloser, error)

// CreateObject creates and saves a parameterized object in the FrostFS container.
// It sets 'Timestamp' attribute to the current time.
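A rough sketch of how callers are expected to use the split read API. It is written as if it lived inside the layer package, and cnrID/objID are assumed to identify an existing object; it is an illustration, not code from this change:

package layer

import (
	"context"
	"io"

	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// readExample is illustrative only: fs is any FrostFS implementation.
func readExample(ctx context.Context, fs FrostFS, cnrID cid.ID, objID oid.ID) ([]byte, error) {
	// Header only: no payload is transferred.
	if _, err := fs.HeadObject(ctx, PrmObjectHead{Container: cnrID, Object: objID}); err != nil {
		return nil, err
	}

	// A bounded slice of the payload; the returned reader should be closed.
	rc, err := fs.RangeObject(ctx, PrmObjectRange{
		Container:    cnrID,
		Object:       objID,
		PayloadRange: [2]uint64{0, 128},
	})
	if err != nil {
		return nil, err
	}
	rc.Close()

	// Full object: header plus payload reader.
	obj, err := fs.GetObject(ctx, PrmObjectGet{Container: cnrID, Object: objID})
	if err != nil {
		return nil, err
	}
	return io.ReadAll(obj.Payload)
}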
@@ -204,10 +204,10 @@ func (t *TestFrostFS) UserContainers(context.Context, PrmUserContainers) ([]cid.
return res, nil
}

-func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*ObjectPart, error) {
+func (t *TestFrostFS) retrieveObject(ctx context.Context, cnrID cid.ID, objID oid.ID) (*object.Object, error) {
var addr oid.Address
-addr.SetContainer(prm.Container)
-addr.SetObject(prm.Object)
+addr.SetContainer(cnrID)
+addr.SetObject(objID)

sAddr := addr.EncodeToString()

@@ -217,26 +217,44 @@ func (t *TestFrostFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*Objec

if obj, ok := t.objects[sAddr]; ok {
owner := getBearerOwner(ctx)
-if !t.checkAccess(prm.Container, owner) {
+if !t.checkAccess(cnrID, owner) {
return nil, ErrAccessDenied
}

-payload := obj.Payload()
-
-if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 {
-off := prm.PayloadRange[0]
-payload = payload[off : off+prm.PayloadRange[1]]
-}
-
-return &ObjectPart{
-Head: obj,
-Payload: io.NopCloser(bytes.NewReader(payload)),
-}, nil
+return obj, nil
}

return nil, fmt.Errorf("%w: %s", &apistatus.ObjectNotFound{}, addr)
}

+func (t *TestFrostFS) HeadObject(ctx context.Context, prm PrmObjectHead) (*object.Object, error) {
+return t.retrieveObject(ctx, prm.Container, prm.Object)
+}
+
+func (t *TestFrostFS) GetObject(ctx context.Context, prm PrmObjectGet) (*Object, error) {
+obj, err := t.retrieveObject(ctx, prm.Container, prm.Object)
+if err != nil {
+return nil, err
+}
+
+return &Object{
+Header: *obj,
+Payload: io.NopCloser(bytes.NewReader(obj.Payload())),
+}, nil
+}
+
+func (t *TestFrostFS) RangeObject(ctx context.Context, prm PrmObjectRange) (io.ReadCloser, error) {
+obj, err := t.retrieveObject(ctx, prm.Container, prm.Object)
+if err != nil {
+return nil, err
+}
+
+off := prm.PayloadRange[0]
+payload := obj.Payload()[off : off+prm.PayloadRange[1]]
+
+return io.NopCloser(bytes.NewReader(payload)), nil
+}
+
func (t *TestFrostFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) {
b := make([]byte, 32)
if _, err := io.ReadFull(rand.Reader, b); err != nil {
@@ -54,6 +54,8 @@ type (
cache *Cache
treeService TreeService
features FeatureSettings
+gateKey *keys.PrivateKey
+corsCnrInfo *data.BucketInfo
}

Config struct {

@@ -64,6 +66,8 @@ type (
Resolver BucketResolver
TreeService TreeService
Features FeatureSettings
+GateKey *keys.PrivateKey
+CORSCnrInfo *data.BucketInfo
}

// AnonymousKey contains data for anonymous requests.

@@ -166,6 +170,7 @@ type (
DeleteBucketParams struct {
BktInfo *data.BucketInfo
SessionToken *session.Container
+SkipCheck bool
}

// ListObjectVersionsParams stores list objects versions parameters.

@@ -236,6 +241,8 @@ func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) *Layer {
cache: config.Cache,
treeService: config.TreeService,
features: config.Features,
+gateKey: config.GateKey,
+corsCnrInfo: config.CORSCnrInfo,
}
}

@@ -288,6 +295,10 @@ func (n *Layer) reqLogger(ctx context.Context) *zap.Logger {
}

func (n *Layer) prepareAuthParameters(ctx context.Context, prm *PrmAuth, bktOwner user.ID) {
+if prm.BearerToken != nil || prm.PrivateKey != nil {
+return
+}
+
if bd, err := middleware.GetBoxData(ctx); err == nil && bd.Gate.BearerToken != nil {
if bd.Gate.BearerToken.Impersonate() || bktOwner.Equals(bearer.ResolveIssuer(*bd.Gate.BearerToken)) {
prm.BearerToken = bd.Gate.BearerToken

@@ -732,7 +743,7 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
}

var parts []*data.PartInfo
-if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil {
+if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
return fmt.Errorf("unmarshal combined object parts: %w", err)
}

@@ -794,18 +805,35 @@ func (n *Layer) ResolveBucket(ctx context.Context, name string) (cid.ID, error)
}

func (n *Layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
-res, _, err := n.getAllObjectsVersions(ctx, commonVersionsListingParams{
-BktInfo: p.BktInfo,
-MaxKeys: 1,
-})
+if !p.SkipCheck {
+res, _, err := n.getAllObjectsVersions(ctx, commonVersionsListingParams{
+BktInfo: p.BktInfo,
+MaxKeys: 1,
+})

-if err != nil {
-return err
-}
-if len(res) != 0 {
-return errors.GetAPIError(errors.ErrBucketNotEmpty)
+if err != nil {
+return err
+}
+if len(res) != 0 {
+return errors.GetAPIError(errors.ErrBucketNotEmpty)
+}
}

n.cache.DeleteBucket(p.BktInfo)
-return n.frostFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken)
+
+corsObj, err := n.treeService.GetBucketCORS(ctx, p.BktInfo)
+if err != nil {
+n.reqLogger(ctx).Error(logs.GetBucketCors, zap.Error(err))
+}
+
+err = n.frostFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken)
+if err != nil {
+return fmt.Errorf("delete container: %w", err)
+}
+
+if !corsObj.Container().Equals(p.BktInfo.CID) && !corsObj.Container().Equals(cid.ID{}) {
+n.deleteCORSObject(ctx, p.BktInfo, corsObj)
+}
+
+return nil
}
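For reference, a sketch of invoking the layer directly with the new flag; in the gateway itself DeleteBucketHandler sets it from the X-Amz-Force-Delete-Bucket header. The session import path is an assumption based on the SDK layout; the types mirror those shown in this diff:

package layer

import (
	"context"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
)

// forceDeleteBucket is illustrative only.
func forceDeleteBucket(ctx context.Context, n *Layer, bktInfo *data.BucketInfo, token *session.Container) error {
	return n.DeleteBucket(ctx, &DeleteBucketParams{
		BktInfo:      bktInfo,
		SessionToken: token,
		SkipCheck:    true, // bypass the "bucket must be empty" listing
	})
}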
@@ -603,10 +603,10 @@ func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn

if len(parts) > p.MaxParts {
res.IsTruncated = true
-res.NextPartNumberMarker = parts[p.MaxParts-1].PartNumber
parts = parts[:p.MaxParts]
}

+res.NextPartNumberMarker = parts[len(parts)-1].PartNumber
res.Parts = parts

return &res, nil
@@ -68,20 +68,14 @@ func newAddress(cnr cid.ID, obj oid.ID) oid.Address {

// objectHead returns all object's headers.
func (n *Layer) objectHead(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) (*object.Object, error) {
-prm := PrmObjectRead{
-Container: bktInfo.CID,
-Object: idObj,
-WithHeader: true,
+prm := PrmObjectHead{
+Container: bktInfo.CID,
+Object: idObj,
}

n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)

-res, err := n.frostFS.ReadObject(ctx, prm)
-if err != nil {
-return nil, err
-}
-
-return res.Head, nil
+return n.frostFS.HeadObject(ctx, prm)
}

func (n *Layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Reader, error) {

@@ -100,7 +94,7 @@ func (n *Layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Re
}

var parts []*data.PartInfo
-if err = json.Unmarshal(combinedObj.Payload(), &parts); err != nil {
+if err = json.NewDecoder(combinedObj.Payload).Decode(&parts); err != nil {
return nil, fmt.Errorf("unmarshal combined object parts: %w", err)
}

@@ -132,16 +126,27 @@ func (n *Layer) initObjectPayloadReader(ctx context.Context, p getParams) (io.Re
// initializes payload reader of the FrostFS object.
// Zero range corresponds to full payload (panics if only offset is set).
func (n *Layer) initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFSParams) (io.Reader, error) {
-prm := PrmObjectRead{
-Container: p.bktInfo.CID,
-Object: p.oid,
-WithPayload: true,
-PayloadRange: [2]uint64{p.off, p.ln},
+var prmAuth PrmAuth
+n.prepareAuthParameters(ctx, &prmAuth, p.bktInfo.Owner)
+
+if p.off+p.ln != 0 {
+prm := PrmObjectRange{
+PrmAuth: prmAuth,
+Container: p.bktInfo.CID,
+Object: p.oid,
+PayloadRange: [2]uint64{p.off, p.ln},
}

+return n.frostFS.RangeObject(ctx, prm)
+}
+
-n.prepareAuthParameters(ctx, &prm.PrmAuth, p.bktInfo.Owner)
+prm := PrmObjectGet{
+PrmAuth: prmAuth,
+Container: p.bktInfo.CID,
+Object: p.oid,
+}

-res, err := n.frostFS.ReadObject(ctx, prm)
+res, err := n.frostFS.GetObject(ctx, prm)
if err != nil {
return nil, err
}

@@ -150,22 +155,25 @@ func (n *Layer) initFrostFSObjectPayloadReader(ctx context.Context, p getFrostFS
}

// objectGet returns an object with payload in the object.
-func (n *Layer) objectGet(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (*object.Object, error) {
-prm := PrmObjectRead{
-Container: bktInfo.CID,
-Object: objID,
-WithHeader: true,
-WithPayload: true,
+func (n *Layer) objectGet(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (*Object, error) {
+return n.objectGetBase(ctx, bktInfo, objID, PrmAuth{})
+}
+
+// objectGetWithAuth returns an object with payload in the object. Uses provided PrmAuth.
+func (n *Layer) objectGetWithAuth(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, auth PrmAuth) (*Object, error) {
+return n.objectGetBase(ctx, bktInfo, objID, auth)
+}
+
+func (n *Layer) objectGetBase(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID, auth PrmAuth) (*Object, error) {
+prm := PrmObjectGet{
+PrmAuth: auth,
+Container: bktInfo.CID,
+Object: objID,
}

n.prepareAuthParameters(ctx, &prm.PrmAuth, bktInfo.Owner)

-res, err := n.frostFS.ReadObject(ctx, prm)
-if err != nil {
-return nil, err
-}
-
-return res.Head, nil
+return n.frostFS.GetObject(ctx, prm)
}

// MimeByFilePath detect mime type by file path extension.

@@ -460,7 +468,17 @@ func (n *Layer) headVersion(ctx context.Context, bkt *data.BucketInfo, p *HeadOb

// objectDelete puts tombstone object into frostfs.
func (n *Layer) objectDelete(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID) error {
+return n.objectDeleteBase(ctx, bktInfo, idObj, PrmAuth{})
+}
+
+// objectDeleteWithAuth puts tombstone object into frostfs. Uses provided PrmAuth.
+func (n *Layer) objectDeleteWithAuth(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID, auth PrmAuth) error {
+return n.objectDeleteBase(ctx, bktInfo, idObj, auth)
+}
+
+func (n *Layer) objectDeleteBase(ctx context.Context, bktInfo *data.BucketInfo, idObj oid.ID, auth PrmAuth) error {
prm := PrmObjectDelete{
+PrmAuth: auth,
Container: bktInfo.CID,
Object: idObj,
}
@ -12,6 +12,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
|
@ -159,24 +160,30 @@ func (n *Layer) getCORS(ctx context.Context, bkt *data.BucketInfo) (*data.CORSCo
|
|||
return cors, nil
|
||||
}
|
||||
|
||||
objID, err := n.treeService.GetBucketCORS(ctx, bkt)
|
||||
objIDNotFound := errorsStd.Is(err, ErrNodeNotFound)
|
||||
if err != nil && !objIDNotFound {
|
||||
addr, err := n.treeService.GetBucketCORS(ctx, bkt)
|
||||
objNotFound := errorsStd.Is(err, ErrNodeNotFound)
|
||||
if err != nil && !objNotFound {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if objIDNotFound {
|
||||
if objNotFound {
|
||||
return nil, fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchCORSConfiguration), err.Error())
|
||||
}
|
||||
|
||||
obj, err := n.objectGet(ctx, bkt, objID)
|
||||
var prmAuth PrmAuth
|
||||
corsBkt := bkt
|
||||
if !addr.Container().Equals(bkt.CID) && !addr.Container().Equals(cid.ID{}) {
|
||||
corsBkt = &data.BucketInfo{CID: addr.Container()}
|
||||
prmAuth.PrivateKey = &n.gateKey.PrivateKey
|
||||
}
|
||||
|
||||
obj, err := n.objectGetWithAuth(ctx, corsBkt, addr.Object(), prmAuth)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("get cors object: %w", err)
|
||||
}
|
||||
|
||||
cors := &data.CORSConfiguration{}
|
||||
|
||||
if err = xml.Unmarshal(obj.Payload(), &cors); err != nil {
|
||||
if err = xml.NewDecoder(obj.Payload).Decode(&cors); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal cors: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -110,28 +110,31 @@ func (t *TreeServiceMock) GetSettingsNode(_ context.Context, bktInfo *data.Bucke
|
|||
return settings, nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
|
||||
func (t *TreeServiceMock) GetBucketCORS(_ context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
|
||||
systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
|
||||
if !ok {
|
||||
return oid.ID{}, nil
|
||||
return oid.Address{}, nil
|
||||
}
|
||||
|
||||
node, ok := systemMap["cors"]
|
||||
if !ok {
|
||||
return oid.ID{}, nil
|
||||
return oid.Address{}, nil
|
||||
}
|
||||
|
||||
return node.OID, nil
|
||||
var addr oid.Address
|
||||
addr.SetContainer(bktInfo.CID)
|
||||
addr.SetObject(node.OID)
|
||||
return addr, nil
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, objID oid.ID) ([]oid.ID, error) {
|
||||
func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
|
||||
systemMap, ok := t.system[bktInfo.CID.EncodeToString()]
|
||||
if !ok {
|
||||
systemMap = make(map[string]*data.BaseNodeVersion)
|
||||
}
|
||||
|
||||
systemMap["cors"] = &data.BaseNodeVersion{
|
||||
OID: objID,
|
||||
OID: addr.Object(),
|
||||
}
|
||||
|
||||
t.system[bktInfo.CID.EncodeToString()] = systemMap
|
||||
|
@ -139,7 +142,7 @@ func (t *TreeServiceMock) PutBucketCORS(_ context.Context, bktInfo *data.BucketI
|
|||
return nil, ErrNoNodeToRemove
|
||||
}
|
||||
|
||||
func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) ([]oid.ID, error) {
|
||||
func (t *TreeServiceMock) DeleteBucketCORS(context.Context, *data.BucketInfo) ([]oid.Address, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
|
|
|
@ -21,17 +21,17 @@ type TreeService interface {
|
|||
// GetBucketCORS gets an object id that corresponds to object with bucket CORS.
|
||||
//
|
||||
// If object id is not found returns ErrNodeNotFound error.
|
||||
GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error)
|
||||
GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error)
|
||||
|
||||
// PutBucketCORS puts a node to a system tree and returns objectID of a previous cors config which must be deleted in FrostFS.
|
||||
//
|
||||
// If object ids to remove is not found returns ErrNoNodeToRemove error.
|
||||
PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) ([]oid.ID, error)
|
||||
PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error)
|
||||
|
||||
// DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in FrostFS.
|
||||
//
|
||||
// If object ids to remove is not found returns ErrNoNodeToRemove error.
|
||||
DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.ID, error)
|
||||
DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error)
|
||||
|
||||
GetObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion) (map[string]string, error)
|
||||
PutObjectTagging(ctx context.Context, bktInfo *data.BucketInfo, objVersion *data.NodeVersion, tagSet map[string]string) error
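The interface comments above describe the contract callers are expected to follow: PutBucketCORS stores the new address and hands back the addresses of any previous CORS configuration objects that now have to be removed from FrostFS, with ErrNoNodeToRemove signalling that there was nothing to replace. A hedged caller-side sketch of that contract follows; the wrapper name, the `removeObject` callback, and the error handling are illustrative and not taken from the gateway code, and the sketch assumes the gateway's `layer` and `data` packages are importable:

```go
package corsexample

import (
	"context"
	"errors"
	"fmt"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// putCORSAndCleanup is an illustrative wrapper around the TreeService contract
// documented above: store the address of the new CORS object, then delete the
// objects whose addresses PutBucketCORS reports as superseded.
func putCORSAndCleanup(ctx context.Context, ts layer.TreeService, bkt *data.BucketInfo,
	newAddr oid.Address, removeObject func(context.Context, oid.Address) error) error {
	oldAddrs, err := ts.PutBucketCORS(ctx, bkt, newAddr)
	if err != nil && !errors.Is(err, layer.ErrNoNodeToRemove) {
		return fmt.Errorf("put bucket cors: %w", err)
	}
	for _, addr := range oldAddrs {
		if err := removeObject(ctx, addr); err != nil {
			return fmt.Errorf("remove old cors object: %w", err)
		}
	}
	return nil
}
```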
@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"net/http"
"runtime/trace"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/acl"

@ -49,8 +48,6 @@ var ErrNoAuthorizationHeader = errors.New("no authorization header")
func Auth(center Center, log *zap.Logger) Func {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reg := trace.StartRegion(r.Context(), "handler:auth")
ctx := r.Context()
reqInfo := GetReqInfo(ctx)
reqInfo.User = "anon"

@ -67,7 +64,6 @@ func Auth(center Center, log *zap.Logger) Func {
if _, wrErr := WriteErrorResponse(w, GetReqInfo(r.Context()), err); wrErr != nil {
reqLogOrDefault(ctx, log).Error(logs.FailedToWriteResponse, zap.Error(wrErr))
}
reg.End()
return
}
} else {

@ -79,8 +75,6 @@ func Auth(center Center, log *zap.Logger) Func {
reqLogOrDefault(ctx, log).Debug(logs.SuccessfulAuth, zap.String("accessKeyID", box.AuthHeaders.AccessKeyID))
}
reg.End()
h.ServeHTTP(w, r.WithContext(ctx))
})
}

@ -93,13 +87,11 @@ type FrostFSIDValidator interface {
func FrostfsIDValidation(frostfsID FrostFSIDValidator, log *zap.Logger) Func {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reg := trace.StartRegion(r.Context(), "handler:ffsid")
ctx := r.Context()
bd, err := GetBoxData(ctx)
if err != nil || bd.Gate.BearerToken == nil {
reqLogOrDefault(ctx, log).Debug(logs.AnonRequestSkipFrostfsIDValidation)
h.ServeHTTP(w, r)
reg.End()
return
}

@ -108,11 +100,9 @@ func FrostfsIDValidation(frostfsID FrostFSIDValidator, log *zap.Logger) Func {
if _, wrErr := WriteErrorResponse(w, GetReqInfo(r.Context()), err); wrErr != nil {
reqLogOrDefault(ctx, log).Error(logs.FailedToWriteResponse, zap.Error(wrErr))
}
reg.End()
return
}
reg.End()
h.ServeHTTP(w, r)
})
}

@ -8,7 +8,6 @@ import (
"io"
"net/http"
"net/url"
"runtime/trace"
"strings"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"

@ -84,7 +83,6 @@ type PolicyConfig struct {
func PolicyCheck(cfg PolicyConfig) Func {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reg := trace.StartRegion(r.Context(), "handler:policy")
ctx := r.Context()
if err := policyCheck(r, cfg); err != nil {
reqLogOrDefault(ctx, cfg.Log).Error(logs.PolicyValidationFailed, zap.Error(err))

@ -92,11 +90,9 @@ func PolicyCheck(cfg PolicyConfig) Func {
if _, wrErr := WriteErrorResponse(w, GetReqInfo(ctx), err); wrErr != nil {
reqLogOrDefault(ctx, cfg.Log).Error(logs.FailedToWriteResponse, zap.Error(wrErr))
}
reg.End()
return
}
reg.End()
h.ServeHTTP(w, r)
})
}

@ -69,8 +69,10 @@ var deploymentID = uuid.Must(uuid.NewRandom())
var (
// De-facto standard header keys.
xForwardedFor = http.CanonicalHeaderKey("X-Forwarded-For")
xRealIP = http.CanonicalHeaderKey("X-Real-IP")
xForwardedFor = http.CanonicalHeaderKey("X-Forwarded-For")
xRealIP = http.CanonicalHeaderKey("X-Real-IP")
xForwardedProto = http.CanonicalHeaderKey("X-Forwarded-Proto")
xForwardedScheme = http.CanonicalHeaderKey("X-Forwarded-Scheme")
// RFC7239 defines a new "Forwarded: " header designed to replace the
// existing use of X-Forwarded-* headers.

@ -79,6 +81,9 @@ var (
// Allows for a sub-match of the first value after 'for=' to the next
// comma, semi-colon or space. The match is case-insensitive.
forRegex = regexp.MustCompile(`(?i)(?:for=)([^(;|, )]+)(.*)`)
// Allows for a sub-match for the first instance of scheme (http|https)
// prefixed by 'proto='. The match is case-insensitive.
protoRegex = regexp.MustCompile(`(?i)^(;|,| )+(?:proto=)(https|http)`)
)
// NewReqInfo returns new ReqInfo based on parameters.

@ -291,3 +296,31 @@ func getSourceIP(r *http.Request) string {
}
return raddr
}
// GetSourceScheme retrieves the scheme from the X-Forwarded-Proto and RFC7239
// Forwarded headers (in that order).
func GetSourceScheme(r *http.Request) string {
var scheme string
// Retrieve the scheme from X-Forwarded-Proto.
if proto := r.Header.Get(xForwardedProto); proto != "" {
scheme = strings.ToLower(proto)
} else if proto = r.Header.Get(xForwardedScheme); proto != "" {
scheme = strings.ToLower(proto)
} else if proto := r.Header.Get(forwarded); proto != "" {
// match should contain at least two elements if the protocol was
// specified in the Forwarded header. The first element will always be
// the 'for=', which we ignore, subsequently we proceed to look for
// 'proto=' which should precede right after `for=` if not
// we simply ignore the values and return empty. This is in line
// with the approach we took for returning first ip from multiple
// params.
if match := forRegex.FindStringSubmatch(proto); len(match) > 1 {
if match = protoRegex.FindStringSubmatch(match[2]); len(match) > 1 {
scheme = strings.ToLower(match[2])
}
}
}
return scheme
}

@ -3,7 +3,6 @@ package middleware
import (
"context"
"net/http"
gotrace "runtime/trace"
"sync"
"git.frostfs.info/TrueCloudLab/frostfs-observability/tracing"

@ -17,12 +16,6 @@ import (
func Tracing() Func {
return func(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if gotrace.IsEnabled() {
pctx, task := gotrace.NewTask(r.Context(), "request")
defer task.End()
r = r.WithContext(pctx)
}
appCtx, span := StartHTTPServerSpan(r, "REQUEST S3")
reqInfo := GetReqInfo(r.Context())
reqInfo.TraceID = span.SpanContext().TraceID().String()

@ -4,7 +4,6 @@ import (
"context"
"fmt"
"net/http"
"runtime/trace"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"

@ -155,14 +154,6 @@ func NewRouter(cfg Config) *chi.Mux {
}))
defaultRouter := chi.NewRouter()
defaultRouter.Use(func(handler http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
reg := trace.StartRegion(r.Context(), "handler:s3")
defer reg.End()
handler.ServeHTTP(w, r)
})
})
defaultRouter.Mount(fmt.Sprintf("/{%s}", s3middleware.BucketURLPrm), bucketRouter(cfg.Handler, cfg.Log))
defaultRouter.Get("/", named("ListBuckets", cfg.Handler.ListBucketsHandler))
attachErrorHandler(defaultRouter)
@ -50,7 +50,7 @@ func createFrostFS(ctx context.Context, log *zap.Logger, cfg PoolConfig) (*frost
|
|||
return nil, fmt.Errorf("dial pool: %w", err)
|
||||
}
|
||||
|
||||
return frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(p, cfg.Key)), nil
|
||||
return frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(p, cfg.Key), log), nil
|
||||
}
|
||||
|
||||
func parsePolicies(val string) (authmate.ContainerPolicies, error) {
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/auth"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/cache"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/handler"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
s3middleware "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
|
@ -37,6 +38,8 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/wallet"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/metrics"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/tree"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
||||
|
@ -104,6 +107,7 @@ type (
|
|||
retryMaxAttempts int
|
||||
retryMaxBackoff time.Duration
|
||||
retryStrategy handler.RetryStrategy
|
||||
domains []string
|
||||
}
|
||||
|
||||
maxClientsConfig struct {
|
||||
|
@ -121,7 +125,7 @@ func newApp(ctx context.Context, log *Logger, v *viper.Viper) *App {
|
|||
objPool, treePool, key := getPools(ctx, log.logger, v)
|
||||
|
||||
cfg := tokens.Config{
|
||||
FrostFS: frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(objPool, key)),
|
||||
FrostFS: frostfs.NewAuthmateFrostFS(frostfs.NewFrostFS(objPool, key), log.logger),
|
||||
Key: key,
|
||||
CacheConfig: getAccessBoxCacheConfig(v, log.logger),
|
||||
RemovingCheckAfterDurations: fetchRemovingCheckInterval(v, log.logger),
|
||||
|
@ -153,13 +157,13 @@ func (a *App) init(ctx context.Context) {
|
|||
a.setRuntimeParameters()
|
||||
a.initFrostfsID(ctx)
|
||||
a.initPolicyStorage(ctx)
|
||||
a.initAPI()
|
||||
a.initAPI(ctx)
|
||||
a.initMetrics()
|
||||
a.initServers(ctx)
|
||||
a.initTracing(ctx)
|
||||
}
|
||||
|
||||
func (a *App) initLayer() {
|
||||
func (a *App) initLayer(ctx context.Context) {
|
||||
a.initResolver()
|
||||
|
||||
// prepare random key for anonymous requests
|
||||
|
@ -171,6 +175,14 @@ func (a *App) initLayer() {
|
|||
var gateOwner user.ID
|
||||
user.IDFromKey(&gateOwner, a.key.PrivateKey.PublicKey)
|
||||
|
||||
var corsCnrInfo *data.BucketInfo
|
||||
if a.cfg.IsSet(cfgContainersCORS) {
|
||||
corsCnrInfo, err = a.fetchContainerInfo(ctx, cfgContainersCORS)
|
||||
if err != nil {
|
||||
a.log.Fatal(logs.CouldNotFetchCORSContainerInfo, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
layerCfg := &layer.Config{
|
||||
Cache: layer.NewCache(getCacheOptions(a.cfg, a.log)),
|
||||
AnonKey: layer.AnonymousKey{
|
||||
|
@ -180,6 +192,8 @@ func (a *App) initLayer() {
|
|||
Resolver: a.bucketResolver,
|
||||
TreeService: tree.NewTree(services.NewPoolWrapper(a.treePool), a.log),
|
||||
Features: a.settings,
|
||||
GateKey: a.key,
|
||||
CORSCnrInfo: corsCnrInfo,
|
||||
}
|
||||
|
||||
// prepare object layer
|
||||
|
@ -218,6 +232,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
|||
s.setRetryMaxAttempts(fetchRetryMaxAttempts(v))
|
||||
s.setRetryMaxBackoff(fetchRetryMaxBackoff(v))
|
||||
s.setRetryStrategy(fetchRetryStrategy(v))
|
||||
s.setVHSSettings(v, log)
|
||||
}
|
||||
|
||||
func (s *appSettings) updateNamespacesSettings(v *viper.Viper, log *zap.Logger) {
|
||||
|
@ -232,6 +247,15 @@ func (s *appSettings) updateNamespacesSettings(v *viper.Viper, log *zap.Logger)
|
|||
s.namespaces = nsConfig.Namespaces
|
||||
}
|
||||
|
||||
func (s *appSettings) setVHSSettings(v *viper.Viper, _ *zap.Logger) {
|
||||
domains := v.GetStringSlice(cfgListenDomains)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
s.domains = domains
|
||||
}
|
||||
|
||||
func (s *appSettings) BypassContentEncodingInChunks() bool {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
@ -434,8 +458,14 @@ func (s *appSettings) RetryStrategy() handler.RetryStrategy {
|
|||
return s.retryStrategy
|
||||
}
|
||||
|
||||
func (a *App) initAPI() {
|
||||
a.initLayer()
|
||||
func (s *appSettings) Domains() []string {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.domains
|
||||
}
|
||||
|
||||
func (a *App) initAPI(ctx context.Context) {
|
||||
a.initLayer(ctx)
|
||||
a.initHandler()
|
||||
}
|
||||
|
||||
|
@ -443,7 +473,6 @@ func (a *App) initMetrics() {
|
|||
cfg := metrics.AppMetricsConfig{
|
||||
Logger: a.log,
|
||||
PoolStatistics: frostfs.NewPoolStatistic(a.pool),
|
||||
TreeStatistic: a.treePool,
|
||||
Enabled: a.cfg.GetBool(cfgPrometheusEnabled),
|
||||
}
|
||||
|
||||
|
@ -672,8 +701,7 @@ func (a *App) setHealthStatus() {
|
|||
// Serve runs HTTP server to handle S3 API requests.
|
||||
func (a *App) Serve(ctx context.Context) {
|
||||
// Attach S3 API:
|
||||
domains := a.cfg.GetStringSlice(cfgListenDomains)
|
||||
a.log.Info(logs.FetchDomainsPrepareToUseAPI, zap.Strings("domains", domains))
|
||||
a.log.Info(logs.FetchDomainsPrepareToUseAPI, zap.Strings("domains", a.settings.Domains()))
|
||||
|
||||
cfg := api.Config{
|
||||
Throttle: middleware.ThrottleOpts{
|
||||
|
@ -684,7 +712,7 @@ func (a *App) Serve(ctx context.Context) {
|
|||
Center: a.ctr,
|
||||
Log: a.log,
|
||||
Metrics: a.metrics,
|
||||
Domains: domains,
|
||||
Domains: a.settings.Domains(),
|
||||
|
||||
MiddlewareSettings: a.settings,
|
||||
PolicyChecker: a.policyStorage,
|
||||
|
@ -1035,3 +1063,32 @@ func (a *App) tryReconnect(ctx context.Context, sr *http.Server) bool {
|
|||
|
||||
return len(a.unbindServers) == 0
|
||||
}
|
||||
|
||||
func (a *App) fetchContainerInfo(ctx context.Context, cfgKey string) (info *data.BucketInfo, err error) {
|
||||
containerString := a.cfg.GetString(cfgKey)
|
||||
|
||||
var id cid.ID
|
||||
if err = id.DecodeString(containerString); err != nil {
|
||||
if id, err = a.bucketResolver.Resolve(ctx, containerString); err != nil {
|
||||
return nil, fmt.Errorf("resolve container name %s: %w", containerString, err)
|
||||
}
|
||||
}
|
||||
|
||||
return getContainerInfo(ctx, id, a.pool)
|
||||
}
|
||||
|
||||
func getContainerInfo(ctx context.Context, id cid.ID, frostFSPool *pool.Pool) (*data.BucketInfo, error) {
|
||||
prm := pool.PrmContainerGet{
|
||||
ContainerID: id,
|
||||
}
|
||||
|
||||
res, err := frostFSPool.GetContainer(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &data.BucketInfo{
|
||||
CID: id,
|
||||
HomomorphicHashDisabled: container.IsHomomorphicHashingDisabled(res),
|
||||
}, nil
|
||||
}
@ -176,6 +176,9 @@ const ( // Settings.
cfgSourceIPHeader = "source_ip_header"
// Containers.
cfgContainersCORS = "containers.cors"
// Command line args.
cmdHelp = "help"
cmdVersion = "version"

@ -216,3 +216,5 @@ S3_GW_RETRY_MAX_BACKOFF=30s
# Backoff strategy. `exponential` and `constant` are allowed.
S3_GW_RETRY_STRATEGY=exponential
# Containers properties
S3_GW_CONTAINERS_CORS=AZjLTXfK4vs4ovxMic2xEJKSymMNLqdwq9JT64ASFCRj

@ -252,3 +252,7 @@ retry:
max_backoff: 30s
# Backoff strategy. `exponential` and `constant` are allowed.
strategy: exponential
# Containers properties
containers:
  cors: AZjLTXfK4vs4ovxMic2xEJKSymMNLqdwq9JT64ASFCRj

@ -92,6 +92,7 @@ type FrostFS interface {
//
// It returns exactly one non-nil value. It returns any error encountered which
// prevented the object payload from being read.
// Object must contain full payload.
GetCredsObject(context.Context, oid.Address) (*object.Object, error)
}

@ -192,6 +192,7 @@ There are some custom types used for brevity:
| `proxy` | [Proxy contract configuration](#proxy-section) |
| `namespaces` | [Namespaces configuration](#namespaces-section) |
| `retry` | [Retry configuration](#retry-section) |
| `containers` | [Containers configuration](#containers-section) |

### General section

@ -708,3 +709,15 @@ retry:
| `max_backoff` | `duration` | yes | `30s` | Max delay before next attempt. |
| `strategy` | `string` | yes | `exponential` | Backoff strategy. `exponential` and `constant` are allowed. |

# `containers` section

Section for well-known containers to store s3-related data and settings.

```yaml
containers:
  cors: AZjLTXfK4vs4ovxMic2xEJKSymMNLqdwq9JT64ASFCRj
```

| Parameter | Type | SIGHUP reload | Default value | Description |
|-----------|----------|---------------|---------------|--------------------------------------------------------------------------------------|
| `cors` | `string` | no | | Container name for CORS configurations. If not set, container of the bucket is used. |
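The example value above is a base58 container ID, while the description calls the parameter a container name, so presumably either form is accepted, with names resolved through the naming service. A rough sketch of how such a value could be interpreted; the `resolve` callback stands in for whatever name resolver is configured and is purely illustrative:

```go
package main

import (
	"context"
	"fmt"

	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
)

// resolveContainer treats the configured string as a container ID first and
// falls back to name resolution; the resolver callback is a placeholder.
func resolveContainer(ctx context.Context, s string,
	resolve func(context.Context, string) (cid.ID, error)) (cid.ID, error) {
	var id cid.ID
	if err := id.DecodeString(s); err == nil {
		return id, nil
	}
	return resolve(ctx, s) // e.g. an NNS lookup
}

func main() {
	id, err := resolveContainer(context.Background(),
		"AZjLTXfK4vs4ovxMic2xEJKSymMNLqdwq9JT64ASFCRj",
		func(context.Context, string) (cid.ID, error) { return cid.ID{}, fmt.Errorf("no resolver configured") })
	fmt.Println(id, err)
}
```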
docs/extensions.md (new file, 52 lines)
@ -0,0 +1,52 @@
# S3 API Extension

## Bucket operations management

### Action to delete bucket (DeleteBucket)

Deletes bucket with all objects in it.

#### Request Parameters

- **Bucket**

  Specifies the bucket being deleted.

#### Errors

- **NoSuchEntity**

  The request was rejected because it referenced a resource entity that does not exist.

  HTTP Status Code: 404

- **ServiceFailure**

  The request processing has failed because of an unknown error, exception or failure.

  HTTP Status Code: 500

#### Example

Sample Request

```text
DELETE / HTTP/1.1
X-Amz-Force-Delete-Bucket: true
Host: data.s3.<Region>.frostfs-s3-gw.com
Date: Wed, 01 Mar 2024 12:00:00 GMT
Authorization: authorization string
```

Sample Response

```text
HTTP/1.1 204 No Content
x-amz-id-2: JuKZqmXuiwFeDQxhD7M8KtsKobSzWA1QEjLbTMTagkKdBX2z7Il/jGhDeJ3j6s80
x-amz-request-id: 32FE2CEB32F5EE25
Date: Wed, 01 Mar 2006 12:00:00 GMT
Connection: close
Server: AmazonS3
```
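For completeness, a client triggers the forced removal simply by adding the `X-Amz-Force-Delete-Bucket: true` header to an ordinary DeleteBucket request. A minimal sketch using Go's standard library is shown below; the endpoint is a placeholder, and a real request would still need an AWS SigV4 `Authorization` header (omitted here), for example produced by an SDK or a signing proxy:

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Placeholder endpoint: virtual-hosted-style address of the bucket on the gateway.
	req, err := http.NewRequestWithContext(ctx, http.MethodDelete, "https://data.s3.example.frostfs-s3-gw.com/", nil)
	if err != nil {
		panic(err)
	}
	// The extension header that asks the gateway to delete the bucket together
	// with all objects in it.
	req.Header.Set("X-Amz-Force-Delete-Bucket", "true")
	// NOTE: a valid AWS SigV4 Authorization header must be added here for the
	// gateway to accept the request; it is intentionally left out of this sketch.

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // expect "204 No Content" on success
}
```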
go.mod (4 lines changed)
@ -3,7 +3,7 @@ module git.frostfs.info/TrueCloudLab/frostfs-s3-gw
go 1.21
require (
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240716113920-f517e3949164
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722121227-fa89999d919c

@ -36,8 +36,6 @@ require (
google.golang.org/protobuf v1.33.0
)
replace git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722121227-fa89999d919c => git.frostfs.info/mbiryukova/frostfs-sdk-go v0.0.0-20240731133113-04024af80f29
require (
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 // indirect
git.frostfs.info/TrueCloudLab/hrw v1.2.1 // indirect
8
go.sum
8
go.sum
|
@ -36,14 +36,16 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
|
|||
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
|
||||
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
|
||||
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e h1:gEWT+70E/RvGkxtSv+PlyUN2vtJVymhQa1mypvrXukM=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240726072425-3dfa2f4fd65e/go.mod h1:OBDSr+DqV1z4VDouoX3YMleNc4DPBVBWTG3WDT2PK1o=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240716113920-f517e3949164 h1:XxvwQKJT/f16qS3df5PBQPRYKkhy0/A7zH6644QpKD0=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-api-go/v2 v2.16.1-0.20240716113920-f517e3949164/go.mod h1:OBDSr+DqV1z4VDouoX3YMleNc4DPBVBWTG3WDT2PK1o=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e h1:kcBqZBiFIUBATUqEuvVigtkJJWQ2Gug/eYXn967o3M4=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-contract v0.19.3-0.20240621131249-49e5270f673e/go.mod h1:F/fe1OoIDKr5Bz99q4sriuHDuf3aZefZy9ZsCqEtgxc=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0 h1:FxqFDhQYYgpe41qsIHVOcdzSVCB8JNSfPG7Uk4r2oSk=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-crypto v0.6.0/go.mod h1:RUIKZATQLJ+TaYQa60X2fTDwfuhMfm8Ar60bQ5fr+vU=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6 h1:aGQ6QaAnTerQ5Dq5b2/f9DUQtSqPkZZ/bkMx/HKuLCo=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-observability v0.0.0-20230531082742-c97d21411eb6/go.mod h1:W8Nn08/l6aQ7UlIbpF7FsQou7TVpcRD1ZT1KG4TrFhE=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722121227-fa89999d919c h1:8ZS6eUFnOhzUo9stFqwq1Zyq+Y5YNcYAidCGICcZVL4=
|
||||
git.frostfs.info/TrueCloudLab/frostfs-sdk-go v0.0.0-20240722121227-fa89999d919c/go.mod h1:vluJ/+yQMcq8ZIZZSA7Te+JKClr0lgtRErjICvb8wto=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1 h1:ccBRK21rFvY5R1WotI6LNoPlizk7qSvdfD8lNIRudVc=
|
||||
git.frostfs.info/TrueCloudLab/hrw v1.2.1/go.mod h1:C1Ygde2n843yTZEQ0FP69jYiuaYV0kriLvP4zm8JuvM=
|
||||
git.frostfs.info/TrueCloudLab/policy-engine v0.0.0-20240611102930-ac965e8d176a h1:Bk1fB4cQASPKgAVGCdlBOEp5ohZfDxqK6fZM8eP+Emo=
|
||||
|
@ -54,8 +56,6 @@ git.frostfs.info/TrueCloudLab/tzhash v1.8.0 h1:UFMnUIk0Zh17m8rjGHJMqku2hCgaXDqjq
|
|||
git.frostfs.info/TrueCloudLab/tzhash v1.8.0/go.mod h1:dhY+oy274hV8wGvGL4MwwMpdL3GYvaX1a8GQZQHvlF8=
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02 h1:HeY8n27VyPRQe49l/fzyVMkWEB2fsLJYKp64pwA7tz4=
|
||||
git.frostfs.info/TrueCloudLab/zapjournald v0.0.0-20240124114243-cb2e66427d02/go.mod h1:rQFJJdEOV7KbbMtQYR2lNfiZk+ONRDJSbMCTWxKt8Fw=
|
||||
git.frostfs.info/mbiryukova/frostfs-sdk-go v0.0.0-20240731133113-04024af80f29 h1:sYkxbvaKd9Q9flezuxy21Tr3Of+4Y+mtXFk6+JxFRHw=
|
||||
git.frostfs.info/mbiryukova/frostfs-sdk-go v0.0.0-20240731133113-04024af80f29/go.mod h1:DlJmgV4/qkFkx2ab+YWznlMijiF2yZHnrJswJOB7XGs=
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
|
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
|
||||
|
|
|
@ -4,18 +4,22 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
objectv2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/authmate"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/creds/tokens"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/frostfs/crdt"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/acl"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -25,11 +29,12 @@ const (
|
|||
// AuthmateFrostFS is a mediator which implements authmate.FrostFS through pool.Pool.
|
||||
type AuthmateFrostFS struct {
|
||||
frostFS layer.FrostFS
|
||||
log *zap.Logger
|
||||
}
|
||||
|
||||
// NewAuthmateFrostFS creates new AuthmateFrostFS using provided pool.Pool.
|
||||
func NewAuthmateFrostFS(frostFS layer.FrostFS) *AuthmateFrostFS {
|
||||
return &AuthmateFrostFS{frostFS: frostFS}
|
||||
func NewAuthmateFrostFS(frostFS layer.FrostFS, log *zap.Logger) *AuthmateFrostFS {
|
||||
return &AuthmateFrostFS{frostFS: frostFS, log: log}
|
||||
}
|
||||
|
||||
// ContainerExists implements authmate.FrostFS interface method.
|
||||
|
@ -79,17 +84,27 @@ func (x *AuthmateFrostFS) GetCredsObject(ctx context.Context, addr oid.Address)
|
|||
credObjID = last.ObjID
|
||||
}
|
||||
|
||||
res, err := x.frostFS.ReadObject(ctx, layer.PrmObjectRead{
|
||||
Container: addr.Container(),
|
||||
Object: credObjID,
|
||||
WithPayload: true,
|
||||
WithHeader: true,
|
||||
res, err := x.frostFS.GetObject(ctx, layer.PrmObjectGet{
|
||||
Container: addr.Container(),
|
||||
Object: credObjID,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Head, err
|
||||
defer func() {
|
||||
if closeErr := res.Payload.Close(); closeErr != nil {
|
||||
x.reqLogger(ctx).Warn(logs.CloseCredsObjectPayload, zap.Error(closeErr))
|
||||
}
|
||||
}()
|
||||
|
||||
data, err := io.ReadAll(res.Payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res.Header.SetPayload(data)
|
||||
|
||||
return &res.Header, err
|
||||
}
|
||||
|
||||
// CreateObject implements authmate.FrostFS interface method.
|
||||
|
@ -143,21 +158,28 @@ func (x *AuthmateFrostFS) getCredVersions(ctx context.Context, addr oid.Address)
|
|||
versions := crdt.NewObjectVersions(objCredSystemName)
|
||||
|
||||
for _, id := range credVersions {
|
||||
objVersion, err := x.frostFS.ReadObject(ctx, layer.PrmObjectRead{
|
||||
Container: addr.Container(),
|
||||
Object: id,
|
||||
WithHeader: true,
|
||||
objVersion, err := x.frostFS.HeadObject(ctx, layer.PrmObjectHead{
|
||||
Container: addr.Container(),
|
||||
Object: id,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("head crdt access box '%s': %w", id.EncodeToString(), err)
|
||||
}
|
||||
|
||||
versions.AppendVersion(crdt.NewObjectVersion(objVersion.Head))
|
||||
versions.AppendVersion(crdt.NewObjectVersion(objVersion))
|
||||
}
|
||||
|
||||
return versions, nil
|
||||
}
|
||||
|
||||
func (x *AuthmateFrostFS) reqLogger(ctx context.Context) *zap.Logger {
|
||||
reqLogger := middleware.GetReqLog(ctx)
|
||||
if reqLogger != nil {
|
||||
return reqLogger
|
||||
}
|
||||
return x.log
|
||||
}
|
||||
|
||||
func credVersionSysName(cnrID cid.ID, objID oid.ID) string {
|
||||
return cnrID.EncodeToString() + "0" + objID.EncodeToString()
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.uber.org/zap/zaptest"
|
||||
)
|
||||
|
||||
func TestGetCredsObject(t *testing.T) {
|
||||
|
@ -35,7 +36,7 @@ func TestGetCredsObject(t *testing.T) {
|
|||
},
|
||||
}})
|
||||
|
||||
frostfs := NewAuthmateFrostFS(layer.NewTestFrostFS(key))
|
||||
frostfs := NewAuthmateFrostFS(layer.NewTestFrostFS(key), zaptest.NewLogger(t))
|
||||
|
||||
cid, err := frostfs.CreateContainer(ctx, authmate.PrmContainerCreate{
|
||||
FriendlyName: bktName,
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"runtime/trace"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
|
@ -54,9 +53,6 @@ func NewFrostFS(p *pool.Pool, key *keys.PrivateKey) *FrostFS {
|
|||
|
||||
// TimeToEpoch implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (uint64, uint64, error) {
|
||||
reg := trace.StartRegion(ctx, "ffs:TimeToEpoch")
|
||||
defer reg.End()
|
||||
|
||||
dur := futureTime.Sub(now)
|
||||
if dur < 0 {
|
||||
return 0, 0, fmt.Errorf("time '%s' must be in the future (after %s)",
|
||||
|
@ -93,9 +89,6 @@ func (x *FrostFS) TimeToEpoch(ctx context.Context, now, futureTime time.Time) (u
|
|||
|
||||
// Container implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*container.Container, error) {
|
||||
reg := trace.StartRegion(ctx, "ffs:Container")
|
||||
defer reg.End()
|
||||
|
||||
prm := pool.PrmContainerGet{
|
||||
ContainerID: layerPrm.ContainerID,
|
||||
Session: layerPrm.SessionToken,
|
||||
|
@ -111,9 +104,6 @@ func (x *FrostFS) Container(ctx context.Context, layerPrm layer.PrmContainer) (*
|
|||
|
||||
// CreateContainer implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCreate) (*layer.ContainerCreateResult, error) {
|
||||
reg := trace.StartRegion(ctx, "ffs:CreateContainer")
|
||||
defer reg.End()
|
||||
|
||||
var cnr container.Container
|
||||
cnr.Init()
|
||||
cnr.SetPlacementPolicy(prm.Policy)
|
||||
|
@ -162,9 +152,6 @@ func (x *FrostFS) CreateContainer(ctx context.Context, prm layer.PrmContainerCre
|
|||
|
||||
// UserContainers implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserContainers) ([]cid.ID, error) {
|
||||
reg := trace.StartRegion(ctx, "ffs:UserContainers")
|
||||
defer reg.End()
|
||||
|
||||
prm := pool.PrmContainerList{
|
||||
OwnerID: layerPrm.UserID,
|
||||
Session: layerPrm.SessionToken,
|
||||
|
@ -176,9 +163,6 @@ func (x *FrostFS) UserContainers(ctx context.Context, layerPrm layer.PrmUserCont
|
|||
|
||||
// DeleteContainer implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session.Container) error {
|
||||
reg := trace.StartRegion(ctx, "ffs:DeleteContainer")
|
||||
defer reg.End()
|
||||
|
||||
prm := pool.PrmContainerDelete{ContainerID: id, Session: token, WaitParams: &x.await}
|
||||
|
||||
err := x.pool.DeleteContainer(ctx, prm)
|
||||
|
@ -187,9 +171,6 @@ func (x *FrostFS) DeleteContainer(ctx context.Context, id cid.ID, token *session
|
|||
|
||||
// CreateObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oid.ID, error) {
|
||||
reg := trace.StartRegion(ctx, "ffs:CreateObject")
|
||||
defer reg.End()
|
||||
|
||||
attrNum := len(prm.Attributes) + 1 // + creation time
|
||||
|
||||
if prm.Filepath != "" {
|
||||
|
@ -278,11 +259,31 @@ func (x payloadReader) Read(p []byte) (int, error) {
|
|||
return n, handleObjectError("read payload", err)
|
||||
}
|
||||
|
||||
// ReadObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*layer.ObjectPart, error) {
|
||||
reg := trace.StartRegion(ctx, "ffs:ReadObject")
|
||||
defer reg.End()
|
||||
// HeadObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) HeadObject(ctx context.Context, prm layer.PrmObjectHead) (*object.Object, error) {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(prm.Container)
|
||||
addr.SetObject(prm.Object)
|
||||
|
||||
var prmHead pool.PrmObjectHead
|
||||
prmHead.SetAddress(addr)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmHead.UseBearer(*prm.BearerToken)
|
||||
} else {
|
||||
prmHead.UseKey(prm.PrivateKey)
|
||||
}
|
||||
|
||||
res, err := x.pool.HeadObject(ctx, prmHead)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("read object header via connection pool", err)
|
||||
}
|
||||
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
// GetObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) GetObject(ctx context.Context, prm layer.PrmObjectGet) (*layer.Object, error) {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(prm.Container)
|
||||
addr.SetObject(prm.Object)
|
||||
|
@ -296,55 +297,23 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*lay
|
|||
prmGet.UseKey(prm.PrivateKey)
|
||||
}
|
||||
|
||||
if prm.WithHeader {
|
||||
if prm.WithPayload {
|
||||
res, err := x.pool.GetObject(ctx, prmGet)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init full object reading via connection pool", err)
|
||||
}
|
||||
|
||||
defer res.Payload.Close()
|
||||
|
||||
payload, err := io.ReadAll(res.Payload)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("read full object payload", err)
|
||||
}
|
||||
|
||||
res.Header.SetPayload(payload)
|
||||
|
||||
return &layer.ObjectPart{
|
||||
Head: &res.Header,
|
||||
}, nil
|
||||
}
|
||||
|
||||
var prmHead pool.PrmObjectHead
|
||||
prmHead.SetAddress(addr)
|
||||
|
||||
if prm.BearerToken != nil {
|
||||
prmHead.UseBearer(*prm.BearerToken)
|
||||
} else {
|
||||
prmHead.UseKey(prm.PrivateKey)
|
||||
}
|
||||
|
||||
hdr, err := x.pool.HeadObject(ctx, prmHead)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("read object header via connection pool", err)
|
||||
}
|
||||
|
||||
return &layer.ObjectPart{
|
||||
Head: &hdr,
|
||||
}, nil
|
||||
} else if prm.PayloadRange[0]+prm.PayloadRange[1] == 0 {
|
||||
res, err := x.pool.GetObject(ctx, prmGet)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init full payload range reading via connection pool", err)
|
||||
}
|
||||
|
||||
return &layer.ObjectPart{
|
||||
Payload: res.Payload,
|
||||
}, nil
|
||||
res, err := x.pool.GetObject(ctx, prmGet)
|
||||
if err != nil {
|
||||
return nil, handleObjectError("init full object reading via connection pool", err)
|
||||
}
|
||||
|
||||
return &layer.Object{
|
||||
Header: res.Header,
|
||||
Payload: res.Payload,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// RangeObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) RangeObject(ctx context.Context, prm layer.PrmObjectRange) (io.ReadCloser, error) {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(prm.Container)
|
||||
addr.SetObject(prm.Object)
|
||||
|
||||
var prmRange pool.PrmObjectRange
|
||||
prmRange.SetAddress(addr)
|
||||
prmRange.SetOffset(prm.PayloadRange[0])
|
||||
|
@ -361,16 +330,11 @@ func (x *FrostFS) ReadObject(ctx context.Context, prm layer.PrmObjectRead) (*lay
|
|||
return nil, handleObjectError("init payload range reading via connection pool", err)
|
||||
}
|
||||
|
||||
return &layer.ObjectPart{
|
||||
Payload: payloadReader{&res},
|
||||
}, nil
|
||||
return payloadReader{&res}, nil
|
||||
}
|
||||
|
||||
// DeleteObject implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) error {
|
||||
reg := trace.StartRegion(ctx, "ffs:DeleteObject")
|
||||
defer reg.End()
|
||||
|
||||
var addr oid.Address
|
||||
addr.SetContainer(prm.Container)
|
||||
addr.SetObject(prm.Object)
|
||||
|
@ -390,9 +354,6 @@ func (x *FrostFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) e
|
|||
|
||||
// SearchObjects implements frostfs.FrostFS interface method.
|
||||
func (x *FrostFS) SearchObjects(ctx context.Context, prm layer.PrmObjectSearch) ([]oid.ID, error) {
|
||||
reg := trace.StartRegion(ctx, "ffs:SearchObject")
|
||||
defer reg.End()
|
||||
|
||||
filters := object.NewSearchFilters()
|
||||
filters.AddRootFilter()
|
||||
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"runtime/trace"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
|
@ -79,9 +78,6 @@ func NewPoolWrapper(p *treepool.Pool) *PoolWrapper {
|
|||
}
|
||||
|
||||
func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([]tree.NodeResponse, error) {
|
||||
reg := trace.StartRegion(ctx, "tree:GetNodes")
|
||||
defer reg.End()
|
||||
|
||||
poolPrm := treepool.GetNodesParams{
|
||||
CID: prm.BktInfo.CID,
|
||||
TreeID: prm.TreeID,
|
||||
|
@ -107,9 +103,6 @@ func (w *PoolWrapper) GetNodes(ctx context.Context, prm *tree.GetNodesParams) ([
|
|||
}
|
||||
|
||||
func (w *PoolWrapper) GetSubTree(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) ([]tree.NodeResponse, error) {
|
||||
reg := trace.StartRegion(ctx, "tree:GetSubTree")
|
||||
defer reg.End()
|
||||
|
||||
poolPrm := treepool.GetSubTreeParams{
|
||||
CID: bktInfo.CID,
|
||||
TreeID: treeID,
|
||||
|
@ -184,9 +177,6 @@ func (s *SubTreeStreamImpl) Next() (tree.NodeResponse, error) {
|
|||
}
|
||||
|
||||
func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.BucketInfo, treeID string, rootID []uint64, depth uint32) (tree.SubTreeStream, error) {
|
||||
reg := trace.StartRegion(ctx, "tree:GetSubTreeStream")
|
||||
defer reg.End()
|
||||
|
||||
poolPrm := treepool.GetSubTreeParams{
|
||||
CID: bktInfo.CID,
|
||||
TreeID: treeID,
|
||||
|
@ -216,9 +206,6 @@ func (w *PoolWrapper) GetSubTreeStream(ctx context.Context, bktInfo *data.Bucket
|
|||
}
|
||||
|
||||
func (w *PoolWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, parent uint64, meta map[string]string) (uint64, error) {
|
||||
reg := trace.StartRegion(ctx, "tree:AddNode")
|
||||
defer reg.End()
|
||||
|
||||
nodeID, err := w.p.AddNode(ctx, treepool.AddNodeParams{
|
||||
CID: bktInfo.CID,
|
||||
TreeID: treeID,
|
||||
|
@ -230,9 +217,6 @@ func (w *PoolWrapper) AddNode(ctx context.Context, bktInfo *data.BucketInfo, tre
|
|||
}
|
||||
|
||||
func (w *PoolWrapper) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path []string, meta map[string]string) (uint64, error) {
|
||||
reg := trace.StartRegion(ctx, "tree:AddNodeByPath")
|
||||
defer reg.End()
|
||||
|
||||
nodeID, err := w.p.AddNodeByPath(ctx, treepool.AddNodeByPathParams{
|
||||
CID: bktInfo.CID,
|
||||
TreeID: treeID,
|
||||
|
@ -245,9 +229,6 @@ func (w *PoolWrapper) AddNodeByPath(ctx context.Context, bktInfo *data.BucketInf
|
|||
}
|
||||
|
||||
func (w *PoolWrapper) MoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID, parentID uint64, meta map[string]string) error {
|
||||
reg := trace.StartRegion(ctx, "tree:MoveNode")
|
||||
defer reg.End()
|
||||
|
||||
return handleError(w.p.MoveNode(ctx, treepool.MoveNodeParams{
|
||||
CID: bktInfo.CID,
|
||||
TreeID: treeID,
|
||||
|
@ -259,9 +240,6 @@ func (w *PoolWrapper) MoveNode(ctx context.Context, bktInfo *data.BucketInfo, tr
|
|||
}
|
||||
|
||||
func (w *PoolWrapper) RemoveNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, nodeID uint64) error {
|
||||
reg := trace.StartRegion(ctx, "tree:RemoveNode")
|
||||
defer reg.End()
|
||||
|
||||
return handleError(w.p.RemoveNode(ctx, treepool.RemoveNodeParams{
|
||||
CID: bktInfo.CID,
|
||||
TreeID: treeID,
|
||||
|
|
|
@ -148,7 +148,10 @@ const (
|
|||
BucketCORSNodeHasMultipleIDs = "bucket cors node has multiple ids"
|
||||
SystemNodeHasMultipleIDs = "system node has multiple ids"
|
||||
FailedToRemoveOldSystemNode = "failed to remove old system node"
|
||||
FailedToParseAddressInTreeNode = "failed to parse object addr in tree node"
|
||||
UnexpectedMultiNodeIDsInSubTreeMultiParts = "unexpected multi node ids in sub tree multi parts"
|
||||
FoundSeveralSystemNodes = "found several system nodes"
|
||||
FailedToParsePartInfo = "failed to parse part info"
|
||||
CouldNotFetchCORSContainerInfo = "couldn't fetch CORS container info"
|
||||
CloseCredsObjectPayload = "close creds object payload"
|
||||
)
|
||||
|
|
|
@ -20,7 +20,6 @@ type AppMetrics struct {
|
|||
type AppMetricsConfig struct {
|
||||
Logger *zap.Logger
|
||||
PoolStatistics StatisticScraper
|
||||
TreeStatistic TreePoolStatistic
|
||||
Registerer prometheus.Registerer
|
||||
Enabled bool
|
||||
}
|
||||
|
@ -37,7 +36,7 @@ func NewAppMetrics(cfg AppMetricsConfig) *AppMetrics {
|
|||
|
||||
return &AppMetrics{
|
||||
logger: cfg.Logger,
|
||||
gate: NewGateMetrics(cfg.PoolStatistics, cfg.TreeStatistic, registry),
|
||||
gate: NewGateMetrics(cfg.PoolStatistics, registry),
|
||||
enabled: cfg.Enabled,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,14 +48,6 @@ var appMetricsDesc = map[string]map[string]Description{
|
|||
Help: "Average request duration (in milliseconds) for specific method on node in pool",
|
||||
VariableLabels: []string{"node", "method"},
|
||||
},
|
||||
interAvgRequestDurationMetric: Description{
|
||||
Type: dto.MetricType_GAUGE,
|
||||
Namespace: namespace,
|
||||
Subsystem: poolSubsystem,
|
||||
Name: interAvgRequestDurationMetric,
|
||||
Help: "Intermediate average request duration (in milliseconds) for specific method on node in pool",
|
||||
VariableLabels: []string{"node", "method"},
|
||||
},
|
||||
currentNodesMetric: Description{
|
||||
Type: dto.MetricType_GAUGE,
|
||||
Namespace: namespace,
|
||||
|
@ -152,24 +144,6 @@ var appMetricsDesc = map[string]map[string]Description{
|
|||
VariableLabels: []string{"endpoint"},
|
||||
},
|
||||
},
|
||||
treePoolSubsystem: {
|
||||
avgRequestDurationMetric: Description{
|
||||
Type: dto.MetricType_GAUGE,
|
||||
Namespace: namespace,
|
||||
Subsystem: treePoolSubsystem,
|
||||
Name: avgRequestDurationMetric,
|
||||
Help: "Average request duration (in milliseconds) for specific method in tree pool",
|
||||
VariableLabels: []string{"method"},
|
||||
},
|
||||
interAvgRequestDurationMetric: Description{
|
||||
Type: dto.MetricType_GAUGE,
|
||||
Namespace: namespace,
|
||||
Subsystem: treePoolSubsystem,
|
||||
Name: interAvgRequestDurationMetric,
|
||||
Help: "Intermediate average request duration (in milliseconds) for specific method in tree pool",
|
||||
VariableLabels: []string{"method"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
type Description struct {
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"net/http"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
|
@ -16,10 +15,6 @@ type StatisticScraper interface {
|
|||
Statistic() pool.Statistic
|
||||
}
|
||||
|
||||
type TreePoolStatistic interface {
|
||||
Statistic() tree.Statistic
|
||||
}
|
||||
|
||||
type GateMetrics struct {
|
||||
registry prometheus.Registerer
|
||||
State *StateMetrics
|
||||
|
@ -27,10 +22,9 @@ type GateMetrics struct {
|
|||
Billing *billingMetrics
|
||||
Stats *APIStatMetrics
|
||||
HTTPServer *httpServerMetrics
|
||||
TreePool *treePoolMetricsCollector
|
||||
}
|
||||
|
||||
func NewGateMetrics(scraper StatisticScraper, treeScraper TreePoolStatistic, registry prometheus.Registerer) *GateMetrics {
|
||||
func NewGateMetrics(scraper StatisticScraper, registry prometheus.Registerer) *GateMetrics {
|
||||
stateMetric := newStateMetrics()
|
||||
registry.MustRegister(stateMetric)
|
||||
|
||||
|
@ -46,9 +40,6 @@ func NewGateMetrics(scraper StatisticScraper, treeScraper TreePoolStatistic, reg
|
|||
serverMetric := newHTTPServerMetrics()
|
||||
registry.MustRegister(serverMetric)
|
||||
|
||||
treePoolMetric := newTreePoolMetricsCollector(treeScraper)
|
||||
registry.MustRegister(treePoolMetric)
|
||||
|
||||
return &GateMetrics{
|
||||
registry: registry,
|
||||
State: stateMetric,
|
||||
|
@ -56,7 +47,6 @@ func NewGateMetrics(scraper StatisticScraper, treeScraper TreePoolStatistic, reg
|
|||
Billing: billingMetric,
|
||||
Stats: statsMetric,
|
||||
HTTPServer: serverMetric,
|
||||
TreePool: treePoolMetric,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -10,13 +10,12 @@ const (
|
|||
)
|
||||
|
||||
const (
|
||||
overallErrorsMetric = "overall_errors"
|
||||
overallNodeErrorsMetric = "overall_node_errors"
|
||||
overallNodeRequestsMetric = "overall_node_requests"
|
||||
currentErrorMetric = "current_errors"
|
||||
avgRequestDurationMetric = "avg_request_duration"
|
||||
interAvgRequestDurationMetric = "inter_avg_request_duration"
|
||||
currentNodesMetric = "current_nodes"
|
||||
overallErrorsMetric = "overall_errors"
|
||||
overallNodeErrorsMetric = "overall_node_errors"
|
||||
overallNodeRequestsMetric = "overall_node_requests"
|
||||
currentErrorMetric = "current_errors"
|
||||
avgRequestDurationMetric = "avg_request_duration"
|
||||
currentNodesMetric = "current_nodes"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -38,26 +37,24 @@ const (
|
|||
)
|
||||
|
||||
type poolMetricsCollector struct {
|
||||
poolStatScraper StatisticScraper
|
||||
overallErrors prometheus.Gauge
|
||||
overallNodeErrors *prometheus.GaugeVec
|
||||
overallNodeRequests *prometheus.GaugeVec
|
||||
currentErrors *prometheus.GaugeVec
|
||||
requestDuration *prometheus.GaugeVec
|
||||
interRequestDuration *prometheus.GaugeVec
|
||||
currentNodes *prometheus.GaugeVec
|
||||
poolStatScraper StatisticScraper
|
||||
overallErrors prometheus.Gauge
|
||||
overallNodeErrors *prometheus.GaugeVec
|
||||
overallNodeRequests *prometheus.GaugeVec
|
||||
currentErrors *prometheus.GaugeVec
|
||||
requestDuration *prometheus.GaugeVec
|
||||
currentNodes *prometheus.GaugeVec
|
||||
}
|
||||
|
||||
func newPoolMetricsCollector(scraper StatisticScraper) *poolMetricsCollector {
|
||||
return &poolMetricsCollector{
|
||||
poolStatScraper: scraper,
|
||||
overallErrors: mustNewGauge(appMetricsDesc[poolSubsystem][overallErrorsMetric]),
|
||||
overallNodeErrors: mustNewGaugeVec(appMetricsDesc[poolSubsystem][overallNodeErrorsMetric]),
|
||||
overallNodeRequests: mustNewGaugeVec(appMetricsDesc[poolSubsystem][overallNodeRequestsMetric]),
|
||||
currentErrors: mustNewGaugeVec(appMetricsDesc[poolSubsystem][currentErrorMetric]),
|
||||
requestDuration: mustNewGaugeVec(appMetricsDesc[poolSubsystem][avgRequestDurationMetric]),
|
||||
interRequestDuration: mustNewGaugeVec(appMetricsDesc[poolSubsystem][interAvgRequestDurationMetric]),
|
||||
currentNodes: mustNewGaugeVec(appMetricsDesc[poolSubsystem][currentNodesMetric]),
|
||||
poolStatScraper: scraper,
|
||||
overallErrors: mustNewGauge(appMetricsDesc[poolSubsystem][overallErrorsMetric]),
|
||||
overallNodeErrors: mustNewGaugeVec(appMetricsDesc[poolSubsystem][overallNodeErrorsMetric]),
		overallNodeRequests: mustNewGaugeVec(appMetricsDesc[poolSubsystem][overallNodeRequestsMetric]),
		currentErrors:       mustNewGaugeVec(appMetricsDesc[poolSubsystem][currentErrorMetric]),
		requestDuration:     mustNewGaugeVec(appMetricsDesc[poolSubsystem][avgRequestDurationMetric]),
		currentNodes:        mustNewGaugeVec(appMetricsDesc[poolSubsystem][currentNodesMetric]),
	}
}

@@ -68,7 +65,6 @@ func (m *poolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
	m.overallNodeRequests.Collect(ch)
	m.currentErrors.Collect(ch)
	m.requestDuration.Collect(ch)
	m.interRequestDuration.Collect(ch)
	m.currentNodes.Collect(ch)
}

@@ -78,7 +74,6 @@ func (m *poolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
	m.overallNodeRequests.Describe(descs)
	m.currentErrors.Describe(descs)
	m.requestDuration.Describe(descs)
	m.interRequestDuration.Describe(descs)
	m.currentNodes.Describe(descs)
}

@@ -89,7 +84,6 @@ func (m *poolMetricsCollector) updateStatistic() {
	m.overallNodeRequests.Reset()
	m.currentErrors.Reset()
	m.requestDuration.Reset()
	m.interRequestDuration.Reset()
	m.currentNodes.Reset()

	for _, node := range stat.Nodes() {

@@ -123,20 +117,4 @@ func (m *poolMetricsCollector) updateRequestsDuration(node pool.NodeStatistic) {
	m.requestDuration.WithLabelValues(node.Address(), methodHeadObject).Set(float64(node.AverageHeadObject().Milliseconds()))
	m.requestDuration.WithLabelValues(node.Address(), methodRangeObject).Set(float64(node.AverageRangeObject().Milliseconds()))
	m.requestDuration.WithLabelValues(node.Address(), methodCreateSession).Set(float64(node.AverageCreateSession().Milliseconds()))

	m.interRequestDuration.WithLabelValues(node.Address(), methodGetBalance).Set(float64(node.InterAverageGetBalance().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodPutContainer).Set(float64(node.InterAveragePutContainer().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodGetContainer).Set(float64(node.InterAverageGetContainer().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodListContainer).Set(float64(node.InterAverageListContainer().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodDeleteContainer).Set(float64(node.InterAverageDeleteContainer().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodGetContainerEacl).Set(float64(node.InterAverageGetContainerEACL().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodSetContainerEacl).Set(float64(node.InterAverageSetContainerEACL().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodEndpointInfo).Set(float64(node.InterAverageEndpointInfo().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodNetworkInfo).Set(float64(node.InterAverageNetworkInfo().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodPutObject).Set(float64(node.InterAveragePutObject().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodDeleteObject).Set(float64(node.InterAverageDeleteObject().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodGetObject).Set(float64(node.InterAverageGetObject().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodHeadObject).Set(float64(node.InterAverageHeadObject().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodRangeObject).Set(float64(node.InterAverageRangeObject().Milliseconds()))
	m.interRequestDuration.WithLabelValues(node.Address(), methodCreateSession).Set(float64(node.InterAverageCreateSession().Milliseconds()))
}
@@ -1,62 +0,0 @@
package metrics

import (
	"github.com/prometheus/client_golang/prometheus"
)

const (
	treePoolSubsystem = "tree_pool"

	methodGetNodes      = "get_nodes"
	methodGetSubTree    = "get_sub_tree"
	methodAddNode       = "add_node"
	methodAddNodeByPath = "add_node_by_path"
	methodMoveNode      = "move_node"
	methodRemoveNode    = "remove_node"
)

type treePoolMetricsCollector struct {
	statScraper          TreePoolStatistic
	requestDuration      *prometheus.GaugeVec
	interRequestDuration *prometheus.GaugeVec
}

func newTreePoolMetricsCollector(stat TreePoolStatistic) *treePoolMetricsCollector {
	return &treePoolMetricsCollector{
		statScraper:          stat,
		requestDuration:      mustNewGaugeVec(appMetricsDesc[treePoolSubsystem][avgRequestDurationMetric]),
		interRequestDuration: mustNewGaugeVec(appMetricsDesc[treePoolSubsystem][interAvgRequestDurationMetric]),
	}
}

func (m *treePoolMetricsCollector) Collect(ch chan<- prometheus.Metric) {
	m.updateStatistic()
	m.requestDuration.Collect(ch)
	m.interRequestDuration.Collect(ch)
}

func (m *treePoolMetricsCollector) Describe(descs chan<- *prometheus.Desc) {
	m.requestDuration.Describe(descs)
	m.interRequestDuration.Describe(descs)
}

func (m *treePoolMetricsCollector) updateStatistic() {
	stat := m.statScraper.Statistic()

	m.requestDuration.Reset()
	m.interRequestDuration.Reset()

	m.requestDuration.WithLabelValues(methodGetNodes).Set(float64(stat.AverageGetNodes().Milliseconds()))
	m.requestDuration.WithLabelValues(methodGetSubTree).Set(float64(stat.AverageGetSubTree().Milliseconds()))
	m.requestDuration.WithLabelValues(methodAddNode).Set(float64(stat.AverageAddNode().Milliseconds()))
	m.requestDuration.WithLabelValues(methodAddNodeByPath).Set(float64(stat.AverageAddNodeByPath().Milliseconds()))
	m.requestDuration.WithLabelValues(methodMoveNode).Set(float64(stat.AverageMoveNode().Milliseconds()))
	m.requestDuration.WithLabelValues(methodRemoveNode).Set(float64(stat.AverageRemoveNode().Milliseconds()))

	m.interRequestDuration.WithLabelValues(methodGetNodes).Set(float64(stat.InterAverageGetNodes().Milliseconds()))
	m.interRequestDuration.WithLabelValues(methodGetSubTree).Set(float64(stat.InterAverageGetSubTree().Milliseconds()))
	m.interRequestDuration.WithLabelValues(methodAddNode).Set(float64(stat.InterAverageAddNode().Milliseconds()))
	m.interRequestDuration.WithLabelValues(methodAddNodeByPath).Set(float64(stat.InterAverageAddNodeByPath().Milliseconds()))
	m.interRequestDuration.WithLabelValues(methodMoveNode).Set(float64(stat.InterAverageMoveNode().Milliseconds()))
	m.interRequestDuration.WithLabelValues(methodRemoveNode).Set(float64(stat.InterAverageRemoveNode().Milliseconds()))
}
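Both removed and remaining collectors follow the same pattern: a prometheus.GaugeVec is reset and refilled from a statistic snapshot inside Collect, so every scrape reports fresh averages. A minimal, self-contained sketch of that pattern with invented names (not the gateway's actual types or metric descriptions) might look like this:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// fakeStat stands in for the SDK pool statistic scraper (invented for the sketch).
type fakeStat struct{}

func (fakeStat) AverageGetObjectMillis() float64 { return 42 }

// exampleCollector mirrors the GaugeVec-based collector pattern above.
type exampleCollector struct {
	stat            fakeStat
	requestDuration *prometheus.GaugeVec
}

func newExampleCollector(stat fakeStat) *exampleCollector {
	return &exampleCollector{
		stat: stat,
		requestDuration: prometheus.NewGaugeVec(prometheus.GaugeOpts{
			Subsystem: "pool",
			Name:      "avg_request_duration",
			Help:      "average request duration in milliseconds per node and method",
		}, []string{"node", "method"}),
	}
}

// Describe forwards the descriptors of the underlying vector.
func (c *exampleCollector) Describe(descs chan<- *prometheus.Desc) {
	c.requestDuration.Describe(descs)
}

// Collect refreshes the gauges from the statistic source, then forwards the samples.
func (c *exampleCollector) Collect(ch chan<- prometheus.Metric) {
	c.requestDuration.Reset()
	c.requestDuration.WithLabelValues("127.0.0.1:8080", "get_object").Set(c.stat.AverageGetObjectMillis())
	c.requestDuration.Collect(ch)
}

func main() {
	prometheus.MustRegister(newExampleCollector(fakeStat{}))
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":9090", nil))
}
```

With this structure, dropping a vector, as the diff does with interRequestDuration, only touches the constructor and the Describe/Collect/update methods; registration stays the same.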
@@ -15,6 +15,7 @@ import (
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
	"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
	"github.com/nspcc-dev/neo-go/pkg/crypto/keys"

@@ -89,6 +90,7 @@ const (
	ownerKeyKV          = "ownerKey"
	lockConfigurationKV = "LockConfiguration"
	oidKV               = "OID"
	cidKV               = "CID"

	isCombinedKV    = "IsCombined"
	isUnversionedKV = "IsUnversioned"
@@ -496,16 +498,16 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
	return nil
}

func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.Address, error) {
	node, err := c.getSystemNode(ctx, bktInfo, corsFilename)
	if err != nil {
		return oid.ID{}, err
		return oid.Address{}, err
	}

	return node.Latest().ObjID, nil
	return getTreeNodeAddress(node.Latest())
}

func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) ([]oid.ID, error) {
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, addr oid.Address) ([]oid.Address, error) {
	multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
	isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
	if err != nil && !isErrNotFound {
@@ -514,7 +516,8 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objI

	meta := make(map[string]string)
	meta[FileNameKey] = corsFilename
	meta[oidKV] = objID.EncodeToString()
	meta[oidKV] = addr.Object().EncodeToString()
	meta[cidKV] = addr.Container().EncodeToString()

	if isErrNotFound {
		if _, err = c.service.AddNode(ctx, bktInfo, systemTree, 0, meta); err != nil {
@@ -533,15 +536,18 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objI
		return nil, fmt.Errorf("move cors node: %w", err)
	}

	objToDelete := make([]oid.ID, 1, len(multiNode.nodes))
	objToDelete[0] = latest.ObjID
	objToDelete := make([]oid.Address, 1, len(multiNode.nodes))
	objToDelete[0], err = getTreeNodeAddress(latest)
	if err != nil {
		return nil, fmt.Errorf("parse object addr of latest cors node in tree: %w", err)
	}

	objToDelete = append(objToDelete, c.cleanOldNodes(ctx, multiNode.Old(), bktInfo)...)

	return objToDelete, nil
}

func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.ID, error) {
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) ([]oid.Address, error) {
	multiNode, err := c.getSystemNode(ctx, bktInfo, corsFilename)
	isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
	if err != nil && !isErrNotFound {
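With PutBucketCORS and DeleteBucketCORS now returning full object addresses instead of bare object IDs, a caller can remove superseded CORS objects from whichever container they live in. A possible caller-side flow, sketched against assumed interfaces (the gateway's real layer code may differ in names and signatures), is shown below.

```go
package example

import (
	"context"
	"fmt"

	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// corsTree is the subset of the tree service used here (assumed shape).
type corsTree interface {
	PutBucketCORS(ctx context.Context, addr oid.Address) ([]oid.Address, error)
}

// objectRemover stands in for the storage client (assumed shape).
type objectRemover interface {
	DeleteObject(ctx context.Context, addr oid.Address) error
}

// replaceCORS records the freshly stored CORS object in the tree and then
// removes, best effort in this sketch, every address the tree reports as superseded.
func replaceCORS(ctx context.Context, tree corsTree, storage objectRemover, newAddr oid.Address) error {
	old, err := tree.PutBucketCORS(ctx, newAddr)
	if err != nil {
		return fmt.Errorf("put cors node: %w", err)
	}

	for _, addr := range old {
		if err := storage.DeleteObject(ctx, addr); err != nil {
			// A failed cleanup is only reported here, not returned to the caller.
			fmt.Printf("couldn't delete old CORS object %s: %v\n", addr.Object().EncodeToString(), err)
		}
	}

	return nil
}
```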
@@ -560,8 +566,23 @@ func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (
	return objToDelete, nil
}

func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *data.BucketInfo) []oid.ID {
	res := make([]oid.ID, 0, len(nodes))
func getTreeNodeAddress(node *treeNode) (oid.Address, error) {
	var addr oid.Address
	addr.SetObject(node.ObjID)

	if cidStr, ok := node.Get(cidKV); ok {
		var cnrID cid.ID
		if err := cnrID.DecodeString(cidStr); err != nil {
			return oid.Address{}, fmt.Errorf("couldn't decode cid: %w", err)
		}
		addr.SetContainer(cnrID)
	}

	return addr, nil
}

func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *data.BucketInfo) []oid.Address {
	res := make([]oid.Address, 0, len(nodes))

	for _, node := range nodes {
		ind := node.GetLatestNodeIndex()
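getTreeNodeAddress is the backward-compatibility point: nodes written before this change carry only an OID attribute, so the container part of the returned address stays zero, while new nodes also carry a CID. A small standalone sketch of the same reassembly (the helper name, sample values, and the use of SetSHA256 to mint demo IDs are all invented for the example):

```go
package main

import (
	"crypto/sha256"
	"fmt"

	cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
	oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)

// addressFromMeta rebuilds an object address from the "OID"/"CID" string
// attributes of a tree node, mirroring what getTreeNodeAddress does above.
// A missing "CID" leaves the container part of the address zero.
func addressFromMeta(meta map[string]string) (oid.Address, error) {
	var addr oid.Address

	var objID oid.ID
	if err := objID.DecodeString(meta["OID"]); err != nil {
		return oid.Address{}, fmt.Errorf("couldn't decode oid: %w", err)
	}
	addr.SetObject(objID)

	if cidStr, ok := meta["CID"]; ok {
		var cnrID cid.ID
		if err := cnrID.DecodeString(cidStr); err != nil {
			return oid.Address{}, fmt.Errorf("couldn't decode cid: %w", err)
		}
		addr.SetContainer(cnrID)
	}

	return addr, nil
}

func main() {
	// Generate throwaway but valid IDs just to have something to encode.
	var objID oid.ID
	objID.SetSHA256(sha256.Sum256([]byte("object")))
	var cnrID cid.ID
	cnrID.SetSHA256(sha256.Sum256([]byte("container")))

	addr, err := addressFromMeta(map[string]string{
		"OID": objID.EncodeToString(),
		"CID": cnrID.EncodeToString(),
	})
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(addr.Container().EncodeToString() + "/" + addr.Object().EncodeToString())
}
```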
@@ -571,7 +592,12 @@ func (c *Tree) cleanOldNodes(ctx context.Context, nodes []*treeNode, bktInfo *da
		if err := c.service.RemoveNode(ctx, bktInfo, systemTree, node.ID[ind]); err != nil {
			c.reqLogger(ctx).Warn(logs.FailedToRemoveOldSystemNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
		} else {
			res = append(res, node.ObjID)
			addr, err := getTreeNodeAddress(node)
			if err != nil {
				c.log.Warn(logs.FailedToParseAddressInTreeNode, zap.String("FileName", node.Meta[FileNameKey]), zap.Uint64("id", node.ID[ind]))
				continue
			}
			res = append(res, addr)
		}
	}