forked from TrueCloudLab/frostfs-s3-gw
[#637] Add header to override CopiesNumber
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
d2c68589b5
commit
c3ad6d2faf
7 changed files with 127 additions and 69 deletions
|
@ -67,12 +67,13 @@ type ObjectTaggingInfo struct {
|
||||||
type MultipartInfo struct {
|
type MultipartInfo struct {
|
||||||
// ID is node id in tree service.
|
// ID is node id in tree service.
|
||||||
// It's ignored when creating a new multipart upload.
|
// It's ignored when creating a new multipart upload.
|
||||||
ID uint64
|
ID uint64
|
||||||
Key string
|
Key string
|
||||||
UploadID string
|
UploadID string
|
||||||
Owner user.ID
|
Owner user.ID
|
||||||
Created time.Time
|
Created time.Time
|
||||||
Meta map[string]string
|
Meta map[string]string
|
||||||
|
CopiesNumber uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
// PartInfo is upload information about part.
|
// PartInfo is upload information about part.
|
||||||
|
|
|
@ -121,14 +121,21 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
metadata[api.ContentType] = contentType
|
metadata[api.ContentType] = contentType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
copiesNumber, err := getCopiesNumberOrDefault(metadata, h.cfg.CopiesNumber)
|
||||||
|
if err != nil {
|
||||||
|
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
params := &layer.CopyObjectParams{
|
params := &layer.CopyObjectParams{
|
||||||
SrcObject: objInfo,
|
SrcObject: objInfo,
|
||||||
ScrBktInfo: p.BktInfo,
|
ScrBktInfo: p.BktInfo,
|
||||||
DstBktInfo: dstBktInfo,
|
DstBktInfo: dstBktInfo,
|
||||||
DstObject: reqInfo.ObjectName,
|
DstObject: reqInfo.ObjectName,
|
||||||
SrcSize: objInfo.Size,
|
SrcSize: objInfo.Size,
|
||||||
Header: metadata,
|
Header: metadata,
|
||||||
Encryption: encryptionParams,
|
Encryption: encryptionParams,
|
||||||
|
CopiesNuber: copiesNumber,
|
||||||
}
|
}
|
||||||
|
|
||||||
settings, err := h.obj.GetBucketSettings(r.Context(), dstBktInfo)
|
settings, err := h.obj.GetBucketSettings(r.Context(), dstBktInfo)
|
||||||
|
|
|
@ -148,6 +148,12 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
|
||||||
p.Header[api.ContentType] = contentType
|
p.Header[api.ContentType] = contentType
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p.CopiesNumber, err = getCopiesNumberOrDefault(p.Header, h.cfg.CopiesNumber)
|
||||||
|
if err != nil {
|
||||||
|
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if err = h.obj.CreateMultipartUpload(r.Context(), p); err != nil {
|
if err = h.obj.CreateMultipartUpload(r.Context(), p); err != nil {
|
||||||
h.logAndSendError(w, "could create multipart upload", reqInfo, err, additional...)
|
h.logAndSendError(w, "could create multipart upload", reqInfo, err, additional...)
|
||||||
return
|
return
|
||||||
|
@ -215,10 +221,9 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
Bkt: bktInfo,
|
Bkt: bktInfo,
|
||||||
Key: reqInfo.ObjectName,
|
Key: reqInfo.ObjectName,
|
||||||
},
|
},
|
||||||
PartNumber: partNumber,
|
PartNumber: partNumber,
|
||||||
Size: r.ContentLength,
|
Size: r.ContentLength,
|
||||||
Reader: r.Body,
|
Reader: r.Body,
|
||||||
CopiesNumber: h.cfg.CopiesNumber,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Info.Encryption, err = h.formEncryptionParams(r.Header)
|
p.Info.Encryption, err = h.formEncryptionParams(r.Header)
|
||||||
|
@ -316,11 +321,10 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
||||||
Bkt: bktInfo,
|
Bkt: bktInfo,
|
||||||
Key: reqInfo.ObjectName,
|
Key: reqInfo.ObjectName,
|
||||||
},
|
},
|
||||||
SrcObjInfo: srcInfo,
|
SrcObjInfo: srcInfo,
|
||||||
SrcBktInfo: srcBktInfo,
|
SrcBktInfo: srcBktInfo,
|
||||||
PartNumber: partNumber,
|
PartNumber: partNumber,
|
||||||
Range: srcRange,
|
Range: srcRange,
|
||||||
CopiesNumber: h.cfg.CopiesNumber,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
p.Info.Encryption, err = h.formEncryptionParams(r.Header)
|
p.Info.Encryption, err = h.formEncryptionParams(r.Header)
|
||||||
|
|
|
@ -212,6 +212,12 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
metadata[api.Expires] = expires
|
metadata[api.Expires] = expires
|
||||||
}
|
}
|
||||||
|
|
||||||
|
copiesNumber, err := getCopiesNumberOrDefault(metadata, h.cfg.CopiesNumber)
|
||||||
|
if err != nil {
|
||||||
|
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
encryption, err := h.formEncryptionParams(r.Header)
|
encryption, err := h.formEncryptionParams(r.Header)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logAndSendError(w, "invalid sse headers", reqInfo, err)
|
h.logAndSendError(w, "invalid sse headers", reqInfo, err)
|
||||||
|
@ -225,7 +231,7 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
Size: r.ContentLength,
|
Size: r.ContentLength,
|
||||||
Header: metadata,
|
Header: metadata,
|
||||||
Encryption: encryption,
|
Encryption: encryption,
|
||||||
CopiesNumber: h.cfg.CopiesNumber,
|
CopiesNumber: copiesNumber,
|
||||||
}
|
}
|
||||||
|
|
||||||
settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
settings, err := h.obj.GetBucketSettings(r.Context(), bktInfo)
|
||||||
|
@ -299,6 +305,20 @@ func (h *handler) PutObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
api.WriteSuccessResponseHeadersOnly(w)
|
api.WriteSuccessResponseHeadersOnly(w)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getCopiesNumberOrDefault(metadata map[string]string, defaultCopiesNumber uint32) (uint32, error) {
|
||||||
|
copiesNumberStr, ok := metadata[layer.AttributeNeofsCopiesNumber]
|
||||||
|
if !ok {
|
||||||
|
return defaultCopiesNumber, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
copiesNumber, err := strconv.ParseUint(copiesNumberStr, 10, 32)
|
||||||
|
if err != nil {
|
||||||
|
return 0, fmt.Errorf("pasrse copies number: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return uint32(copiesNumber), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (h handler) formEncryptionParams(header http.Header) (enc encryption.Params, err error) {
|
func (h handler) formEncryptionParams(header http.Header) (enc encryption.Params, err error) {
|
||||||
sseCustomerAlgorithm := header.Get(api.AmzServerSideEncryptionCustomerAlgorithm)
|
sseCustomerAlgorithm := header.Get(api.AmzServerSideEncryptionCustomerAlgorithm)
|
||||||
sseCustomerKey := header.Get(api.AmzServerSideEncryptionCustomerKey)
|
sseCustomerKey := header.Get(api.AmzServerSideEncryptionCustomerKey)
|
||||||
|
|
|
@ -4,10 +4,12 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"mime/multipart"
|
"mime/multipart"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||||
|
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -104,3 +106,23 @@ func TestEmptyPostPolicy(t *testing.T) {
|
||||||
_, err := checkPostPolicy(r, reqInfo, metadata)
|
_, err := checkPostPolicy(r, reqInfo, metadata)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestPutObjectOverrideCopiesNumber(t *testing.T) {
|
||||||
|
tc := prepareHandlerContext(t)
|
||||||
|
|
||||||
|
bktName, objName := "bucket-for-copies-number", "object-for-copies-number"
|
||||||
|
bktInfo := createTestBucket(tc.Context(), t, tc, bktName)
|
||||||
|
|
||||||
|
w, r := prepareTestRequest(t, bktName, objName, nil)
|
||||||
|
r.Header.Set(api.MetadataPrefix+strings.ToUpper(layer.AttributeNeofsCopiesNumber), "1")
|
||||||
|
tc.Handler().PutObjectHandler(w, r)
|
||||||
|
|
||||||
|
p := &layer.HeadObjectParams{
|
||||||
|
BktInfo: bktInfo,
|
||||||
|
Object: objName,
|
||||||
|
}
|
||||||
|
|
||||||
|
objInfo, err := tc.Layer().GetObjectInfo(tc.Context(), p)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, "1", objInfo.Headers[layer.AttributeNeofsCopiesNumber])
|
||||||
|
}
|
||||||
|
|
|
@ -141,15 +141,16 @@ type (
|
||||||
|
|
||||||
// CopyObjectParams stores object copy request parameters.
|
// CopyObjectParams stores object copy request parameters.
|
||||||
CopyObjectParams struct {
|
CopyObjectParams struct {
|
||||||
SrcObject *data.ObjectInfo
|
SrcObject *data.ObjectInfo
|
||||||
ScrBktInfo *data.BucketInfo
|
ScrBktInfo *data.BucketInfo
|
||||||
DstBktInfo *data.BucketInfo
|
DstBktInfo *data.BucketInfo
|
||||||
DstObject string
|
DstObject string
|
||||||
SrcSize int64
|
SrcSize int64
|
||||||
Header map[string]string
|
Header map[string]string
|
||||||
Range *RangeParams
|
Range *RangeParams
|
||||||
Lock *data.ObjectLock
|
Lock *data.ObjectLock
|
||||||
Encryption encryption.Params
|
Encryption encryption.Params
|
||||||
|
CopiesNuber uint32
|
||||||
}
|
}
|
||||||
// CreateBucketParams stores bucket create request parameters.
|
// CreateBucketParams stores bucket create request parameters.
|
||||||
CreateBucketParams struct {
|
CreateBucketParams struct {
|
||||||
|
@ -264,6 +265,8 @@ const (
|
||||||
AttributeDecryptedSize = api.NeoFSSystemMetadataPrefix + "Decrypted-Size"
|
AttributeDecryptedSize = api.NeoFSSystemMetadataPrefix + "Decrypted-Size"
|
||||||
AttributeHMACSalt = api.NeoFSSystemMetadataPrefix + "HMAC-Salt"
|
AttributeHMACSalt = api.NeoFSSystemMetadataPrefix + "HMAC-Salt"
|
||||||
AttributeHMACKey = api.NeoFSSystemMetadataPrefix + "HMAC-Key"
|
AttributeHMACKey = api.NeoFSSystemMetadataPrefix + "HMAC-Key"
|
||||||
|
|
||||||
|
AttributeNeofsCopiesNumber = "neofs-copies-number" // such formate to match X-Amz-Meta-Neofs-Copies-Number header
|
||||||
)
|
)
|
||||||
|
|
||||||
func (t *VersionedObject) String() string {
|
func (t *VersionedObject) String() string {
|
||||||
|
@ -519,12 +522,13 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Obje
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return n.PutObject(ctx, &PutObjectParams{
|
return n.PutObject(ctx, &PutObjectParams{
|
||||||
BktInfo: p.DstBktInfo,
|
BktInfo: p.DstBktInfo,
|
||||||
Object: p.DstObject,
|
Object: p.DstObject,
|
||||||
Size: p.SrcSize,
|
Size: p.SrcSize,
|
||||||
Reader: pr,
|
Reader: pr,
|
||||||
Header: p.Header,
|
Header: p.Header,
|
||||||
Encryption: p.Encryption,
|
Encryption: p.Encryption,
|
||||||
|
CopiesNumber: p.CopiesNuber,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,9 +45,10 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
CreateMultipartParams struct {
|
CreateMultipartParams struct {
|
||||||
Info *UploadInfoParams
|
Info *UploadInfoParams
|
||||||
Header map[string]string
|
Header map[string]string
|
||||||
Data *UploadData
|
Data *UploadData
|
||||||
|
CopiesNumber uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
UploadData struct {
|
UploadData struct {
|
||||||
|
@ -56,20 +57,18 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
UploadPartParams struct {
|
UploadPartParams struct {
|
||||||
Info *UploadInfoParams
|
Info *UploadInfoParams
|
||||||
PartNumber int
|
PartNumber int
|
||||||
Size int64
|
Size int64
|
||||||
Reader io.Reader
|
Reader io.Reader
|
||||||
CopiesNumber uint32
|
|
||||||
}
|
}
|
||||||
|
|
||||||
UploadCopyParams struct {
|
UploadCopyParams struct {
|
||||||
Info *UploadInfoParams
|
Info *UploadInfoParams
|
||||||
SrcObjInfo *data.ObjectInfo
|
SrcObjInfo *data.ObjectInfo
|
||||||
SrcBktInfo *data.BucketInfo
|
SrcBktInfo *data.BucketInfo
|
||||||
PartNumber int
|
PartNumber int
|
||||||
Range *RangeParams
|
Range *RangeParams
|
||||||
CopiesNumber uint32
|
|
||||||
}
|
}
|
||||||
|
|
||||||
CompleteMultipartParams struct {
|
CompleteMultipartParams struct {
|
||||||
|
@ -141,11 +140,12 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
|
||||||
}
|
}
|
||||||
|
|
||||||
info := &data.MultipartInfo{
|
info := &data.MultipartInfo{
|
||||||
Key: p.Info.Key,
|
Key: p.Info.Key,
|
||||||
UploadID: p.Info.UploadID,
|
UploadID: p.Info.UploadID,
|
||||||
Owner: n.Owner(ctx),
|
Owner: n.Owner(ctx),
|
||||||
Created: time.Now(),
|
Created: time.Now(),
|
||||||
Meta: make(map[string]string, metaSize),
|
Meta: make(map[string]string, metaSize),
|
||||||
|
CopiesNumber: p.CopiesNumber,
|
||||||
}
|
}
|
||||||
|
|
||||||
for key, val := range p.Header {
|
for key, val := range p.Header {
|
||||||
|
@ -205,7 +205,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
|
||||||
Creator: bktInfo.Owner,
|
Creator: bktInfo.Owner,
|
||||||
Attributes: make([][2]string, 2),
|
Attributes: make([][2]string, 2),
|
||||||
Payload: p.Reader,
|
Payload: p.Reader,
|
||||||
CopiesNumber: p.CopiesNumber,
|
CopiesNumber: multipartInfo.CopiesNumber,
|
||||||
}
|
}
|
||||||
|
|
||||||
decSize := p.Size
|
decSize := p.Size
|
||||||
|
@ -301,11 +301,10 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
|
||||||
}()
|
}()
|
||||||
|
|
||||||
params := &UploadPartParams{
|
params := &UploadPartParams{
|
||||||
Info: p.Info,
|
Info: p.Info,
|
||||||
PartNumber: p.PartNumber,
|
PartNumber: p.PartNumber,
|
||||||
Size: size,
|
Size: size,
|
||||||
Reader: pr,
|
Reader: pr,
|
||||||
CopiesNumber: p.CopiesNumber,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return n.uploadPart(ctx, multipartInfo, params)
|
return n.uploadPart(ctx, multipartInfo, params)
|
||||||
|
@ -435,12 +434,13 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
|
||||||
r.prm.bktInfo = p.Info.Bkt
|
r.prm.bktInfo = p.Info.Bkt
|
||||||
|
|
||||||
obj, err := n.PutObject(ctx, &PutObjectParams{
|
obj, err := n.PutObject(ctx, &PutObjectParams{
|
||||||
BktInfo: p.Info.Bkt,
|
BktInfo: p.Info.Bkt,
|
||||||
Object: p.Info.Key,
|
Object: p.Info.Key,
|
||||||
Reader: r,
|
Reader: r,
|
||||||
Header: initMetadata,
|
Header: initMetadata,
|
||||||
Size: multipartObjetSize,
|
Size: multipartObjetSize,
|
||||||
Encryption: p.Info.Encryption,
|
Encryption: p.Info.Encryption,
|
||||||
|
CopiesNumber: multipartInfo.CopiesNumber,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
n.log.Error("could not put a completed object (multipart upload)",
|
n.log.Error("could not put a completed object (multipart upload)",
|
||||||
|
|
Loading…
Reference in a new issue