forked from TrueCloudLab/frostfs-s3-gw
Compare commits
3 commits
5b10514bd3
...
6755ae912d
Author | SHA1 | Date | |
---|---|---|---|
6755ae912d | |||
128939c01e | |||
4a4ce00994 |
20 changed files with 335 additions and 48 deletions
|
@ -41,6 +41,7 @@ type (
|
|||
RetryMaxAttempts() int
|
||||
RetryMaxBackoff() time.Duration
|
||||
RetryStrategy() RetryStrategy
|
||||
TLSTerminationHeader() string
|
||||
}
|
||||
|
||||
FrostFSID interface {
|
||||
|
|
|
@ -98,7 +98,7 @@ func (h *handler) GetObjectAttributesHandler(w http.ResponseWriter, r *http.Requ
|
|||
}
|
||||
info := extendedInfo.ObjectInfo
|
||||
|
||||
encryptionParams, err := formEncryptionParams(r)
|
||||
encryptionParams, err := h.formEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(ctx, w, "invalid sse headers", reqInfo, err)
|
||||
return
|
||||
|
|
|
@ -103,12 +103,12 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
srcObjInfo := extendedSrcObjInfo.ObjectInfo
|
||||
|
||||
srcEncryptionParams, err := formCopySourceEncryptionParams(r)
|
||||
srcEncryptionParams, err := h.formCopySourceEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(ctx, w, "invalid sse headers", reqInfo, err)
|
||||
return
|
||||
}
|
||||
dstEncryptionParams, err := formEncryptionParams(r)
|
||||
dstEncryptionParams, err := h.formEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(ctx, w, "invalid sse headers", reqInfo, err)
|
||||
return
|
||||
|
|
|
@ -202,7 +202,7 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
encryptionParams, err := formEncryptionParams(r)
|
||||
encryptionParams, err := h.formEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(ctx, w, "invalid sse headers", reqInfo, err)
|
||||
return
|
||||
|
|
|
@ -141,6 +141,10 @@ func (c *configMock) RetryStrategy() RetryStrategy {
|
|||
return RetryStrategyConstant
|
||||
}
|
||||
|
||||
func (c *configMock) TLSTerminationHeader() string {
|
||||
return "X-Frostfs-TLS-Termination"
|
||||
}
|
||||
|
||||
func prepareHandlerContext(t *testing.T) *handlerContext {
|
||||
hc, err := prepareHandlerContextBase(layer.DefaultCachesConfigs(zap.NewExample()))
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -51,7 +51,7 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
info := extendedInfo.ObjectInfo
|
||||
|
||||
encryptionParams, err := formEncryptionParams(r)
|
||||
encryptionParams, err := h.formEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(ctx, w, "invalid sse headers", reqInfo, err)
|
||||
return
|
||||
|
|
|
@ -138,7 +138,7 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
|
|||
}
|
||||
}
|
||||
|
||||
p.Info.Encryption, err = formEncryptionParams(r)
|
||||
p.Info.Encryption, err = h.formEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(ctx, w, "invalid sse headers", reqInfo, err, additional...)
|
||||
return
|
||||
|
@ -223,7 +223,7 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
|||
ContentSHA256Hash: r.Header.Get(api.AmzContentSha256),
|
||||
}
|
||||
|
||||
p.Info.Encryption, err = formEncryptionParams(r)
|
||||
p.Info.Encryption, err = h.formEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(ctx, w, "invalid sse headers", reqInfo, err, additional...)
|
||||
return
|
||||
|
@ -323,7 +323,7 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
srcEncryptionParams, err := formCopySourceEncryptionParams(r)
|
||||
srcEncryptionParams, err := h.formCopySourceEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(ctx, w, "invalid sse headers", reqInfo, err)
|
||||
return
|
||||
|
@ -348,7 +348,7 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
|||
Range: srcRange,
|
||||
}
|
||||
|
||||
p.Info.Encryption, err = formEncryptionParams(r)
|
||||
p.Info.Encryption, err = h.formEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(ctx, w, "invalid sse headers", reqInfo, err, additional...)
|
||||
return
|
||||
|
@ -593,7 +593,7 @@ func (h *handler) ListPartsHandler(w http.ResponseWriter, r *http.Request) {
|
|||
PartNumberMarker: partNumberMarker,
|
||||
}
|
||||
|
||||
p.Info.Encryption, err = formEncryptionParams(r)
|
||||
p.Info.Encryption, err = h.formEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(ctx, w, "invalid sse headers", reqInfo, err)
|
||||
return
|
||||
|
@ -629,7 +629,7 @@ func (h *handler) AbortMultipartUploadHandler(w http.ResponseWriter, r *http.Req
|
|||
Key: reqInfo.ObjectName,
|
||||
}
|
||||
|
||||
p.Encryption, err = formEncryptionParams(r)
|
||||
p.Encryption, err = h.formEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(ctx, w, "invalid sse headers", reqInfo, err)
|
||||
return
|
||||
|
|
|
@ -55,13 +55,17 @@ func TestDeleteMultipartAllParts(t *testing.T) {
|
|||
// unversioned bucket
|
||||
createTestBucket(hc, bktName)
|
||||
multipartUpload(hc, bktName, objName, nil, objLen, partSize)
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObject(t, hc, bktName, objName, emptyVersion)
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
|
||||
// encrypted multipart
|
||||
multipartUploadEncrypted(hc, bktName, objName, nil, objLen, partSize)
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObject(t, hc, bktName, objName, emptyVersion)
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
|
||||
// versions bucket
|
||||
createTestBucket(hc, bktName2)
|
||||
|
@ -69,8 +73,11 @@ func TestDeleteMultipartAllParts(t *testing.T) {
|
|||
multipartUpload(hc, bktName2, objName, nil, objLen, partSize)
|
||||
_, hdr := getObject(hc, bktName2, objName)
|
||||
versionID := hdr.Get("X-Amz-Version-Id")
|
||||
hc.tp.ClearTombstoneOIDCount()
|
||||
deleteObject(t, hc, bktName2, objName, emptyVersion)
|
||||
require.Equal(t, 0, hc.tp.TombstoneOIDCount())
|
||||
deleteObject(t, hc, bktName2, objName, versionID)
|
||||
require.Equal(t, objLen/partSize+1, hc.tp.TombstoneOIDCount())
|
||||
require.Empty(t, hc.tp.Objects())
|
||||
}
|
||||
|
||||
|
|
|
@ -228,7 +228,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
metadata[api.ContentLanguage] = contentLanguage
|
||||
}
|
||||
|
||||
encryptionParams, err := formEncryptionParams(r)
|
||||
encryptionParams, err := h.formEncryptionParams(r)
|
||||
if err != nil {
|
||||
h.logAndSendError(ctx, w, "invalid sse headers", reqInfo, err)
|
||||
return
|
||||
|
@ -363,15 +363,15 @@ func (h *handler) getBodyReader(r *http.Request) (io.ReadCloser, error) {
|
|||
return chunkReader, nil
|
||||
}
|
||||
|
||||
func formEncryptionParams(r *http.Request) (enc encryption.Params, err error) {
|
||||
return formEncryptionParamsBase(r, false)
|
||||
func (h *handler) formEncryptionParams(r *http.Request) (enc encryption.Params, err error) {
|
||||
return h.formEncryptionParamsBase(r, false)
|
||||
}
|
||||
|
||||
func formCopySourceEncryptionParams(r *http.Request) (enc encryption.Params, err error) {
|
||||
return formEncryptionParamsBase(r, true)
|
||||
func (h *handler) formCopySourceEncryptionParams(r *http.Request) (enc encryption.Params, err error) {
|
||||
return h.formEncryptionParamsBase(r, true)
|
||||
}
|
||||
|
||||
func formEncryptionParamsBase(r *http.Request, isCopySource bool) (enc encryption.Params, err error) {
|
||||
func (h *handler) formEncryptionParamsBase(r *http.Request, isCopySource bool) (enc encryption.Params, err error) {
|
||||
var sseCustomerAlgorithm, sseCustomerKey, sseCustomerKeyMD5 string
|
||||
if isCopySource {
|
||||
sseCustomerAlgorithm = r.Header.Get(api.AmzCopySourceServerSideEncryptionCustomerAlgorithm)
|
||||
|
@ -387,7 +387,17 @@ func formEncryptionParamsBase(r *http.Request, isCopySource bool) (enc encryptio
|
|||
return
|
||||
}
|
||||
|
||||
if r.TLS == nil {
|
||||
needCheckTLS := true
|
||||
if tlsTerminationStr := r.Header.Get(h.cfg.TLSTerminationHeader()); len(tlsTerminationStr) > 0 {
|
||||
tlsTermination, err := strconv.ParseBool(tlsTerminationStr)
|
||||
if err != nil {
|
||||
h.reqLogger(r.Context()).Warn(logs.WarnInvalidTypeTLSTerminationHeader, zap.String("header", tlsTerminationStr), zap.Error(err))
|
||||
} else {
|
||||
needCheckTLS = !tlsTermination
|
||||
}
|
||||
}
|
||||
|
||||
if needCheckTLS && r.TLS == nil {
|
||||
return enc, apierr.GetAPIError(apierr.ErrInsecureSSECustomerRequest)
|
||||
}
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"crypto/md5"
|
||||
"crypto/tls"
|
||||
"encoding/base64"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
|
@ -634,6 +635,186 @@ func TestPutObjectWithContentLanguage(t *testing.T) {
|
|||
require.Equal(t, expectedContentLanguage, w.Header().Get(api.ContentLanguage))
|
||||
}
|
||||
|
||||
func TestFormEncryptionParamsBase(t *testing.T) {
|
||||
hc := prepareHandlerContext(t)
|
||||
|
||||
userSecret := "test1customer2secret3with32char4"
|
||||
expectedEncKey := []byte(userSecret)
|
||||
emptyEncKey := []byte(nil)
|
||||
|
||||
validAlgo := "AES256"
|
||||
validKey := "dGVzdDFjdXN0b21lcjJzZWNyZXQzd2l0aDMyY2hhcjQ="
|
||||
validMD5 := "zcQmPqFhtJaxkOIg5tXm9g=="
|
||||
|
||||
invalidAlgo := "TTT111"
|
||||
invalidKeyBase64 := "dGVzdDFjdXN0b21lcjJzZWNyZXQzd2l0aDMyY2hhcjQ"
|
||||
invalidKeySize := "dGVzdDFjdXN0b21lcjJzZWNyZXQzd2l0aA=="
|
||||
invalidMD5Base64 := "zcQmPqFhtJaxkOIg5tXm9g"
|
||||
invalidMD5 := "zcQmPqPhtJaxkOIg5tXm9g=="
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
algo string
|
||||
key string
|
||||
md5 string
|
||||
tlsTermination string
|
||||
reqWithoutTLS bool
|
||||
reqWithoutSSE bool
|
||||
isCopySource bool
|
||||
err error
|
||||
}{
|
||||
{
|
||||
name: "valid requst copy source",
|
||||
algo: validAlgo,
|
||||
key: validKey,
|
||||
md5: validMD5,
|
||||
isCopySource: true,
|
||||
},
|
||||
{
|
||||
name: "valid request with TLS",
|
||||
algo: validAlgo,
|
||||
key: validKey,
|
||||
md5: validMD5,
|
||||
},
|
||||
{
|
||||
name: "valid request without TLS and valid termination header",
|
||||
algo: validAlgo,
|
||||
key: validKey,
|
||||
md5: validMD5,
|
||||
tlsTermination: "true",
|
||||
reqWithoutTLS: true,
|
||||
},
|
||||
{
|
||||
name: "request without tls and termination header",
|
||||
algo: validAlgo,
|
||||
key: validKey,
|
||||
md5: validMD5,
|
||||
reqWithoutTLS: true,
|
||||
err: apierr.GetAPIError(apierr.ErrInsecureSSECustomerRequest),
|
||||
},
|
||||
{
|
||||
name: "request without tls and invalid header",
|
||||
algo: validAlgo,
|
||||
key: validKey,
|
||||
md5: validMD5,
|
||||
tlsTermination: "invalid",
|
||||
reqWithoutTLS: true,
|
||||
err: apierr.GetAPIError(apierr.ErrInsecureSSECustomerRequest),
|
||||
},
|
||||
{
|
||||
name: "missing SSE customer algorithm",
|
||||
key: validKey,
|
||||
md5: validMD5,
|
||||
err: apierr.GetAPIError(apierr.ErrMissingSSECustomerAlgorithm),
|
||||
},
|
||||
{
|
||||
name: "missing SSE customer key",
|
||||
algo: validAlgo,
|
||||
md5: validMD5,
|
||||
err: apierr.GetAPIError(apierr.ErrMissingSSECustomerKey),
|
||||
},
|
||||
{
|
||||
name: "invalid encryption algorithm",
|
||||
algo: invalidAlgo,
|
||||
key: validKey,
|
||||
md5: validMD5,
|
||||
err: apierr.GetAPIError(apierr.ErrInvalidEncryptionAlgorithm),
|
||||
},
|
||||
{
|
||||
name: "invalid base64 SSE customer key",
|
||||
algo: validAlgo,
|
||||
key: invalidKeyBase64,
|
||||
md5: validMD5,
|
||||
err: apierr.GetAPIError(apierr.ErrInvalidSSECustomerKey),
|
||||
},
|
||||
{
|
||||
name: "invalid base64 SSE customer parameters",
|
||||
algo: validAlgo,
|
||||
key: invalidKeyBase64,
|
||||
md5: validMD5,
|
||||
isCopySource: true,
|
||||
err: apierr.GetAPIError(apierr.ErrInvalidSSECustomerParameters),
|
||||
},
|
||||
{
|
||||
name: "invalid size of custom key",
|
||||
algo: validAlgo,
|
||||
key: invalidKeySize,
|
||||
md5: validMD5,
|
||||
err: apierr.GetAPIError(apierr.ErrInvalidSSECustomerKey),
|
||||
},
|
||||
{
|
||||
name: "invalid size of custom key - copy source",
|
||||
algo: validAlgo,
|
||||
key: invalidKeySize,
|
||||
md5: validMD5,
|
||||
isCopySource: true,
|
||||
err: apierr.GetAPIError(apierr.ErrInvalidSSECustomerParameters),
|
||||
},
|
||||
{
|
||||
name: "invalid base64 key md5 of customer",
|
||||
algo: validAlgo,
|
||||
key: validKey,
|
||||
md5: invalidMD5Base64,
|
||||
err: apierr.GetAPIError(apierr.ErrSSECustomerKeyMD5Mismatch),
|
||||
},
|
||||
{
|
||||
name: "invalid md5 sum key of customer",
|
||||
algo: validAlgo,
|
||||
key: validKey,
|
||||
md5: invalidMD5,
|
||||
err: apierr.GetAPIError(apierr.ErrSSECustomerKeyMD5Mismatch),
|
||||
},
|
||||
{
|
||||
name: "request without sse",
|
||||
reqWithoutSSE: true,
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
r := prepareRequestForEncryption(tc.algo, tc.key, tc.md5, tc.tlsTermination, tc.reqWithoutTLS, tc.reqWithoutSSE, tc.isCopySource)
|
||||
|
||||
enc, err := hc.h.formEncryptionParamsBase(r, tc.isCopySource)
|
||||
if tc.err != nil {
|
||||
require.ErrorIs(t, tc.err, err)
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, err)
|
||||
|
||||
if tc.reqWithoutSSE {
|
||||
require.Equal(t, emptyEncKey, enc.Key())
|
||||
} else {
|
||||
require.Equal(t, expectedEncKey, enc.Key())
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func prepareRequestForEncryption(algo, key, md5, tlsTermination string, reqWithoutTLS, reqWithoutSSE, isCopySource bool) *http.Request {
|
||||
r := httptest.NewRequest(http.MethodPost, "/", nil)
|
||||
|
||||
if !reqWithoutTLS {
|
||||
r.TLS = &tls.ConnectionState{}
|
||||
}
|
||||
|
||||
if !reqWithoutSSE {
|
||||
if isCopySource {
|
||||
r.Header.Set(api.AmzCopySourceServerSideEncryptionCustomerAlgorithm, algo)
|
||||
r.Header.Set(api.AmzCopySourceServerSideEncryptionCustomerKey, key)
|
||||
r.Header.Set(api.AmzCopySourceServerSideEncryptionCustomerKeyMD5, md5)
|
||||
} else {
|
||||
r.Header.Set(api.AmzServerSideEncryptionCustomerAlgorithm, algo)
|
||||
r.Header.Set(api.AmzServerSideEncryptionCustomerKey, key)
|
||||
r.Header.Set(api.AmzServerSideEncryptionCustomerKeyMD5, md5)
|
||||
}
|
||||
}
|
||||
|
||||
if tlsTermination != "" {
|
||||
r.Header.Set("X-Frostfs-TLS-Termination", tlsTermination)
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
func postObjectBase(hc *handlerContext, ns, bktName, key, filename, content string) *httptest.ResponseRecorder {
|
||||
policy := "eyJleHBpcmF0aW9uIjogIjIwMjUtMTItMDFUMTI6MDA6MDAuMDAwWiIsImNvbmRpdGlvbnMiOiBbCiBbInN0YXJ0cy13aXRoIiwgIiR4LWFtei1jcmVkZW50aWFsIiwgIiJdLAogWyJzdGFydHMtd2l0aCIsICIkeC1hbXotZGF0ZSIsICIiXSwKIFsic3RhcnRzLXdpdGgiLCAiJGtleSIsICIiXQpdfQ=="
|
||||
|
||||
|
|
|
@ -75,13 +75,14 @@ func (k *FeatureSettingsMock) FormContainerZone(ns string) string {
|
|||
var _ frostfs.FrostFS = (*TestFrostFS)(nil)
|
||||
|
||||
type TestFrostFS struct {
|
||||
objects map[string]*object.Object
|
||||
objectErrors map[string]error
|
||||
objectPutErrors map[string]error
|
||||
containers map[string]*container.Container
|
||||
chains map[string][]chain.Chain
|
||||
currentEpoch uint64
|
||||
key *keys.PrivateKey
|
||||
objects map[string]*object.Object
|
||||
objectErrors map[string]error
|
||||
objectPutErrors map[string]error
|
||||
containers map[string]*container.Container
|
||||
chains map[string][]chain.Chain
|
||||
currentEpoch uint64
|
||||
key *keys.PrivateKey
|
||||
tombstoneOIDCount int
|
||||
}
|
||||
|
||||
func NewTestFrostFS(key *keys.PrivateKey) *TestFrostFS {
|
||||
|
@ -373,6 +374,7 @@ func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObject
|
|||
if err = t.DeleteObject(ctx, prmDelete); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
t.tombstoneOIDCount++
|
||||
}
|
||||
|
||||
return &frostfs.CreateObjectResult{
|
||||
|
@ -380,6 +382,14 @@ func (t *TestFrostFS) createTombstone(ctx context.Context, prm frostfs.PrmObject
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) TombstoneOIDCount() int {
|
||||
return t.tombstoneOIDCount
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) ClearTombstoneOIDCount() {
|
||||
t.tombstoneOIDCount = 0
|
||||
}
|
||||
|
||||
func (t *TestFrostFS) DeleteObject(ctx context.Context, prm frostfs.PrmObjectDelete) error {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(prm.Container)
|
||||
|
|
|
@ -763,25 +763,43 @@ func (n *Layer) removeCombinedObject(ctx context.Context, bkt *data.BucketInfo,
|
|||
}
|
||||
|
||||
tokens := prepareTokensParameter(ctx, bkt.Owner)
|
||||
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
|
||||
if err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
|
||||
}
|
||||
|
||||
members := append(oids, nodeVersion.OID)
|
||||
members := make([]oid.ID, 0)
|
||||
// First gateway tries to delete all object parts.
|
||||
// In case of errors, abort multipart removal.
|
||||
for _, part := range parts {
|
||||
oids, err = relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens)
|
||||
if err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||
zap.String("oid", part.OID.EncodeToString()), zap.Error(err))
|
||||
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, part.OID, tokens)
|
||||
if err == nil {
|
||||
members = append(members, append(oids, part.OID)...)
|
||||
continue
|
||||
}
|
||||
|
||||
members = append(members, append(oids, part.OID)...)
|
||||
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
|
||||
return fmt.Errorf("failed to list all object relations '%s': %w", part.OID.EncodeToString(), err)
|
||||
}
|
||||
|
||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||
zap.String("oid", part.OID.EncodeToString()), zap.Error(err))
|
||||
}
|
||||
|
||||
n.putTombstones(ctx, bkt, networkInfo, members)
|
||||
return nil
|
||||
if err = n.putTombstones(ctx, bkt, networkInfo, members); err != nil {
|
||||
return fmt.Errorf("put tombstones with parts: %w", err)
|
||||
}
|
||||
|
||||
// If all parts were removed successfully, remove multipart linking object.
|
||||
// Do not delete this object first, because gateway won't be able to find parts.
|
||||
oids, err := relations.ListAllRelations(ctx, n.frostFS.Relations(), bkt.CID, nodeVersion.OID, tokens)
|
||||
if err != nil {
|
||||
if !client.IsErrObjectAlreadyRemoved(err) && !client.IsErrObjectNotFound(err) {
|
||||
return fmt.Errorf("failed to list all object relations '%s': %w", nodeVersion.OID.EncodeToString(), err)
|
||||
}
|
||||
|
||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||
zap.String("oid", nodeVersion.OID.EncodeToString()), zap.Error(err))
|
||||
return nil
|
||||
}
|
||||
members = append(oids, nodeVersion.OID)
|
||||
|
||||
return n.putTombstones(ctx, bkt, networkInfo, members)
|
||||
}
|
||||
|
||||
// DeleteObjects from the storage.
|
||||
|
|
|
@ -586,12 +586,16 @@ func (n *Layer) deleteUploadedParts(ctx context.Context, bkt *data.BucketInfo, p
|
|||
if err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToListAllObjectRelations, zap.String("cid", bkt.CID.EncodeToString()),
|
||||
zap.String("oid", info.OID.EncodeToString()), zap.Error(err))
|
||||
continue
|
||||
}
|
||||
members = append(members, append(oids, info.OID)...)
|
||||
}
|
||||
}
|
||||
|
||||
n.putTombstones(ctx, bkt, networkInfo, members)
|
||||
err := n.putTombstones(ctx, bkt, networkInfo, members)
|
||||
if err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToPutTombstones, zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
func (n *Layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsInfo, error) {
|
||||
|
|
|
@ -21,23 +21,35 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) {
|
||||
func (n *Layer) putTombstones(ctx context.Context, bkt *data.BucketInfo, networkInfo netmap.NetworkInfo, members []oid.ID) error {
|
||||
var wg sync.WaitGroup
|
||||
tombstoneMembersSize := n.features.TombstoneMembersSize()
|
||||
tombstoneLifetime := n.features.TombstoneLifetime()
|
||||
tombstonesCount := len(members) / tombstoneMembersSize
|
||||
if len(members)%tombstoneMembersSize != 0 {
|
||||
tombstonesCount++
|
||||
}
|
||||
errCh := make(chan error, tombstonesCount)
|
||||
|
||||
for i := 0; i < len(members); i += tombstoneMembersSize {
|
||||
for i := 0; i < tombstonesCount; i++ {
|
||||
end := tombstoneMembersSize * (i + 1)
|
||||
if end > len(members) {
|
||||
end = len(members)
|
||||
}
|
||||
n.submitPutTombstone(ctx, bkt, members[i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg)
|
||||
n.submitPutTombstone(ctx, bkt, members[tombstoneMembersSize*i:end], networkInfo.CurrentEpoch()+tombstoneLifetime, &wg, errCh)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
close(errCh)
|
||||
|
||||
if err := <-errCh; err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup) {
|
||||
func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, members []oid.ID, expEpoch uint64, wg *sync.WaitGroup, errCh chan<- error) {
|
||||
tomb := object.NewTombstone()
|
||||
tomb.SetExpirationEpoch(expEpoch)
|
||||
tomb.SetMembers(members)
|
||||
|
@ -48,11 +60,13 @@ func (n *Layer) submitPutTombstone(ctx context.Context, bkt *data.BucketInfo, me
|
|||
|
||||
if err := n.putTombstoneObject(ctx, tomb, bkt); err != nil {
|
||||
n.reqLogger(ctx).Warn(logs.FailedToPutTombstoneObject, zap.String("cid", bkt.CID.EncodeToString()), zap.Error(err))
|
||||
errCh <- fmt.Errorf("put tombstone object: %w", err)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
wg.Done()
|
||||
n.reqLogger(ctx).Warn(logs.FailedToSubmitTaskToPool, zap.Error(err))
|
||||
errCh <- fmt.Errorf("submit task to pool: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -129,6 +129,7 @@ type (
|
|||
retryStrategy handler.RetryStrategy
|
||||
tombstoneMembersSize int
|
||||
tombstoneLifetime uint64
|
||||
tlsTerminationHeader string
|
||||
}
|
||||
|
||||
maxClientsConfig struct {
|
||||
|
@ -316,6 +317,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
|||
httpLoggingUseGzip := v.GetBool(cfgHTTPLoggingGzip)
|
||||
tombstoneMembersSize := fetchTombstoneMembersSize(v)
|
||||
tombstoneLifetime := fetchTombstoneLifetime(v)
|
||||
tlsTerminationHeader := v.GetString(cfgEncryptionTLSTerminationHeader)
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
@ -347,6 +349,7 @@ func (s *appSettings) update(v *viper.Viper, log *zap.Logger) {
|
|||
s.vhsNamespacesEnabled = vhsNamespacesEnabled
|
||||
s.tombstoneMembersSize = tombstoneMembersSize
|
||||
s.tombstoneLifetime = tombstoneLifetime
|
||||
s.tlsTerminationHeader = tlsTerminationHeader
|
||||
}
|
||||
|
||||
func (s *appSettings) prepareVHSNamespaces(v *viper.Viper, log *zap.Logger, defaultNamespaces []string) map[string]bool {
|
||||
|
@ -541,6 +544,12 @@ func (s *appSettings) RetryStrategy() handler.RetryStrategy {
|
|||
return s.retryStrategy
|
||||
}
|
||||
|
||||
func (s *appSettings) TLSTerminationHeader() string {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
return s.tlsTerminationHeader
|
||||
}
|
||||
|
||||
func (s *appSettings) AccessBoxContainer() (cid.ID, bool) {
|
||||
if s.accessbox != nil {
|
||||
return *s.accessbox, true
|
||||
|
|
|
@ -59,9 +59,10 @@ const (
|
|||
|
||||
defaultAccessBoxCacheRemovingCheckInterval = 5 * time.Minute
|
||||
|
||||
defaultNamespaceHeader = "X-Frostfs-Namespace"
|
||||
defaultVHSHeader = "X-Frostfs-S3-VHS"
|
||||
defaultServernameHeader = "X-Frostfs-Servername"
|
||||
defaultNamespaceHeader = "X-Frostfs-Namespace"
|
||||
defaultVHSHeader = "X-Frostfs-S3-VHS"
|
||||
defaultServernameHeader = "X-Frostfs-Servername"
|
||||
defaultTLSTerminationHeader = "X-Frostfs-TLS-Termination"
|
||||
|
||||
defaultMultinetFallbackDelay = 300 * time.Millisecond
|
||||
|
||||
|
@ -280,6 +281,9 @@ const ( // Settings.
|
|||
// Server.
|
||||
cfgReconnectInterval = "reconnect_interval"
|
||||
|
||||
// Encryption.
|
||||
cfgEncryptionTLSTerminationHeader = "encryption.tls_termination_header"
|
||||
|
||||
// envPrefix is an environment variables prefix used for configuration.
|
||||
envPrefix = "S3_GW"
|
||||
)
|
||||
|
@ -946,6 +950,9 @@ func newSettings() *viper.Viper {
|
|||
// multinet
|
||||
v.SetDefault(cfgMultinetFallbackDelay, defaultMultinetFallbackDelay)
|
||||
|
||||
// encryption
|
||||
v.SetDefault(cfgEncryptionTLSTerminationHeader, defaultTLSTerminationHeader)
|
||||
|
||||
// Bind flags
|
||||
if err := bindFlags(v, flags); err != nil {
|
||||
panic(fmt.Errorf("bind flags: %w", err))
|
||||
|
|
|
@ -269,3 +269,6 @@ S3_GW_MULTINET_FALLBACK_DELAY=300ms
|
|||
# List of subnets and IP addresses to use as source for those subnets
|
||||
S3_GW_MULTINET_SUBNETS_1_MASK=1.2.3.4/24
|
||||
S3_GW_MULTINET_SUBNETS_1_SOURCE_IPS=1.2.3.4 1.2.3.5
|
||||
|
||||
# Header for determining the termination of TLS.
|
||||
S3_GW_ENCRYPTION_TLS_TERMINATION_TLS_HEADER=X-Frostfs-TLS-Termination
|
||||
|
|
|
@ -318,3 +318,6 @@ multinet:
|
|||
source_ips:
|
||||
- 1.2.3.4
|
||||
- 1.2.3.5
|
||||
|
||||
encryption:
|
||||
tls_termination_header: X-Frostfs-TLS-Termination
|
||||
|
|
|
@ -196,6 +196,7 @@ There are some custom types used for brevity:
|
|||
| `containers` | [Containers configuration](#containers-section) |
|
||||
| `vhs` | [VHS configuration](#vhs-section) |
|
||||
| `multinet` | [Multinet configuration](#multinet-section) |
|
||||
| `encryption` | [Encryption configuration](#encryption-section) |
|
||||
|
||||
### General section
|
||||
|
||||
|
@ -681,7 +682,7 @@ web:
|
|||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|-----------------------|------------|---------------|---------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `read_timeout` | `duration` | no | `0` | The maximum duration for reading the entire request, including the body. A zero or negative value means there will be no timeout. |
|
||||
| `read_timeout` | `duration` | no | `0` | The maximum duration for reading the entire request, including the body. A zero or negative value means there will be no timeout. |
|
||||
| `read_header_timeout` | `duration` | no | `30s` | The amount of time allowed to read request headers. If `read_header_timeout` is zero, the value of `read_timeout` is used. If both are zero, there is no timeout. |
|
||||
| `write_timeout` | `duration` | no | `0` | The maximum duration before timing out writes of the response. A zero or negative value means there will be no timeout. |
|
||||
| `idle_timeout` | `duration` | no | `30s` | The maximum amount of time to wait for the next request when keep-alives are enabled. If `idle_timeout` is zero, the value of `read_timeout` is used. If both are zero, there is no timeout. |
|
||||
|
@ -858,3 +859,16 @@ multinet:
|
|||
|--------------|------------|---------------|---------------|----------------------------------------------------------------------|
|
||||
| `mask` | `string` | yes | | Destination subnet. |
|
||||
| `source_ips` | `[]string` | yes | | Array of source IP addresses to use when dialing destination subnet. |
|
||||
|
||||
# `encryption` section
|
||||
|
||||
Configuration of encryption.
|
||||
|
||||
```yaml
|
||||
encryption:
|
||||
tls_termination_header: X-Frostfs-TLS-Termination
|
||||
```
|
||||
|
||||
| Parameter | Type | SIGHUP reload | Default value | Description |
|
||||
|--------------------------|----------|---------------|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| `tls_termination_header` | `string` | yes | `X-Frostfs-TLS-Termination` | The header for determining whether TLS needs to be checked. If the system requests come through a proxy server and TLS can terminate at the proxy level, you should use this header to disable TLS verification at server-side encryption. |
|
||||
|
|
|
@ -181,4 +181,6 @@ const (
|
|||
FailedToPutTombstoneObject = "failed to put tombstone object"
|
||||
FailedToCreateWorkerPool = "failed to create worker pool"
|
||||
FailedToListAllObjectRelations = "failed to list all object relations"
|
||||
WarnInvalidTypeTLSTerminationHeader = "invalid type of value of tls termination header"
|
||||
FailedToPutTombstones = "failed to put tombstones"
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue