[#476] Fix parts info for GetObjectAttributes

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-06-01 12:54:27 +03:00 committed by Alex Vanin
parent 9b1ccd39be
commit c8e8ba9f6a
6 changed files with 217 additions and 101 deletions

View file

@ -26,7 +26,9 @@ type (
NextPartNumberMarker int `xml:"NextPartNumberMarker,omitempty"`
PartNumberMarker int `xml:"PartNumberMarker,omitempty"`
Parts []Part `xml:"Part,omitempty"`
PartsCount int `xml:"PartsCount,omitempty"`
// Only this field is used.
PartsCount int `xml:"PartsCount,omitempty"`
}
Part struct {
@ -35,17 +37,13 @@ type (
}
GetObjectAttributesArgs struct {
MaxParts int
PartNumberMarker int
Attributes []string
VersionID string
Conditional *conditionalArgs
Attributes []string
VersionID string
Conditional *conditionalArgs
}
)
const (
partNumberMarkerDefault = -1
eTag = "ETag"
checksum = "Checksum"
objectParts = "ObjectParts"
@ -123,16 +121,11 @@ func writeAttributesHeaders(h http.Header, info *data.ObjectInfo, params *GetObj
}
func parseGetObjectAttributeArgs(r *http.Request) (*GetObjectAttributesArgs, error) {
var (
err error
res = &GetObjectAttributesArgs{}
attributesVal = r.Header.Get("X-Amz-Object-Attributes")
maxPartsVal = r.Header.Get("X-Amz-Max-Parts")
markerVal = r.Header.Get("X-Amz-Part-Number-Marker")
queryValues = r.URL.Query()
)
res := &GetObjectAttributesArgs{
VersionID: r.URL.Query().Get(api.QueryVersionID),
}
attributesVal := r.Header.Get(api.AmzObjectAttributes)
if attributesVal == "" {
return nil, errors.GetAPIError(errors.ErrInvalidAttributeName)
}
@ -145,22 +138,7 @@ func parseGetObjectAttributeArgs(r *http.Request) (*GetObjectAttributesArgs, err
res.Attributes = append(res.Attributes, a)
}
if maxPartsVal == "" {
res.MaxParts = layer.MaxSizePartsList
} else if res.MaxParts, err = strconv.Atoi(maxPartsVal); err != nil || res.MaxParts < 0 {
return nil, errors.GetAPIError(errors.ErrInvalidMaxKeys)
}
if markerVal == "" {
res.PartNumberMarker = partNumberMarkerDefault
} else if res.PartNumberMarker, err = strconv.Atoi(markerVal); err != nil || res.PartNumberMarker < 0 {
return nil, errors.GetAPIError(errors.ErrInvalidPartNumberMarker)
}
res.VersionID = queryValues.Get(api.QueryVersionID)
res.Conditional, err = parseConditionalHeaders(r.Header)
return res, err
return res, nil
}
func encodeToObjectAttributesResponse(info *data.ObjectInfo, p *GetObjectAttributesArgs) (*GetObjectAttributesResponse, error) {
@ -175,7 +153,7 @@ func encodeToObjectAttributesResponse(info *data.ObjectInfo, p *GetObjectAttribu
case objectSize:
resp.ObjectSize = info.Size
case objectParts:
parts, err := formUploadAttributes(info, p.MaxParts, p.PartNumberMarker)
parts, err := formUploadAttributes(info)
if err != nil {
return nil, fmt.Errorf("form upload attributes: %w", err)
}
@ -188,54 +166,19 @@ func encodeToObjectAttributesResponse(info *data.ObjectInfo, p *GetObjectAttribu
return resp, nil
}
func formUploadAttributes(info *data.ObjectInfo, maxParts, marker int) (*ObjectParts, error) {
func formUploadAttributes(info *data.ObjectInfo) (*ObjectParts, error) {
var err error
res := ObjectParts{}
if _, ok := info.Headers[layer.UploadIDAttributeName]; !ok {
partsCountStr, ok := info.Headers[layer.UploadCompletedPartsCount]
if !ok {
return nil, nil
}
parts := make([]Part, 0)
val, ok := info.Headers[layer.UploadCompletedParts]
if ok {
pairs := strings.Split(val, ",")
for _, p := range pairs {
// nums[0] -- part number, nums[1] -- part size
nums := strings.Split(p, "=")
if len(nums) != 2 {
return nil, nil
}
num, err := strconv.Atoi(nums[0])
if err != nil {
return nil, err
}
size, err := strconv.Atoi(nums[1])
if err != nil {
return nil, fmt.Errorf("parse part size: %w", err)
}
parts = append(parts, Part{PartNumber: num, Size: size})
}
res.PartsCount, err = strconv.Atoi(partsCountStr)
if err != nil {
return nil, fmt.Errorf("invalid parts count header '%s': %w", partsCountStr, err)
}
res.PartsCount = len(parts)
if marker != partNumberMarkerDefault {
res.PartNumberMarker = marker
for i, n := range parts {
if n.PartNumber == marker {
parts = parts[i:]
break
}
}
}
res.MaxParts = maxParts
if len(parts) > maxParts {
res.IsTruncated = true
res.NextPartNumberMarker = parts[maxParts].PartNumber
parts = parts[:maxParts]
}
res.Parts = parts
return &res, nil
}

View file

@ -0,0 +1,71 @@
package handler
import (
"bytes"
"context"
"net/http"
"net/url"
"testing"
"github.com/nspcc-dev/neofs-s3-gw/api"
"github.com/nspcc-dev/neofs-s3-gw/api/layer"
"github.com/stretchr/testify/require"
)
func TestGetObjectPartsAttributes(t *testing.T) {
ctx := context.Background()
hc := prepareHandlerContext(t)
bktName := "bucket-get-attributes"
objName, objMultipartName := "object", "object-multipart"
createTestBucket(ctx, t, hc, bktName)
body := bytes.NewReader([]byte("content"))
w, r := prepareTestPayloadRequest(bktName, objName, body)
hc.Handler().PutObjectHandler(w, r)
assertStatus(t, w, http.StatusOK)
w, r = prepareTestRequest(t, bktName, objName, nil)
r.Header.Set(api.AmzObjectAttributes, objectParts)
hc.Handler().GetObjectAttributesHandler(w, r)
result := &GetObjectAttributesResponse{}
parseTestResponse(t, w, result)
require.Nil(t, result.ObjectParts)
w, r = prepareTestRequest(t, bktName, objMultipartName, nil)
hc.Handler().CreateMultipartUploadHandler(w, r)
multipartUpload := &InitiateMultipartUploadResponse{}
parseTestResponse(t, w, multipartUpload)
body2 := bytes.NewReader([]byte("content2"))
w, r = prepareTestPayloadRequest(bktName, objMultipartName, body2)
query := make(url.Values)
query.Add(uploadIDHeaderName, multipartUpload.UploadID)
query.Add(partNumberHeaderName, "1")
r.URL.RawQuery = query.Encode()
hc.Handler().UploadPartHandler(w, r)
assertStatus(t, w, http.StatusOK)
etag := w.Result().Header.Get(api.ETag)
completeUpload := &CompleteMultipartUpload{
Parts: []*layer.CompletedPart{{
ETag: etag,
PartNumber: 1,
}},
}
w, r = prepareTestRequest(t, bktName, objMultipartName, completeUpload)
query = make(url.Values)
query.Add(uploadIDHeaderName, multipartUpload.UploadID)
r.URL.RawQuery = query.Encode()
hc.Handler().CompleteMultipartUploadHandler(w, r)
assertStatus(t, w, http.StatusOK)
w, r = prepareTestRequest(t, bktName, objMultipartName, nil)
r.Header.Set(api.AmzObjectAttributes, objectParts)
hc.Handler().GetObjectAttributesHandler(w, r)
result = &GetObjectAttributesResponse{}
parseTestResponse(t, w, result)
require.NotNil(t, result.ObjectParts)
require.Equal(t, 1, result.ObjectParts.PartsCount)
}

View file

@ -144,10 +144,18 @@ func prepareTestRequest(t *testing.T, bktName, objName string, body interface{})
return w, r
}
func assertStatus(t *testing.T, w *httptest.ResponseRecorder, status int) {
if w.Code != status {
resp, err := io.ReadAll(w.Result().Body)
require.NoError(t, err)
require.Failf(t, string(resp), "assert status fail, expected: %d, actual: %d", status, w.Code)
}
func prepareTestPayloadRequest(bktName, objName string, payload io.Reader) (*httptest.ResponseRecorder, *http.Request) {
w := httptest.NewRecorder()
r := httptest.NewRequest(http.MethodPut, defaultURL, payload)
reqInfo := api.NewReqInfo(w, r, api.ObjectRequest{Bucket: bktName, Object: objName})
r = r.WithContext(api.SetReqInfo(r.Context(), reqInfo))
return w, r
}
func parseTestResponse(t *testing.T, response *httptest.ResponseRecorder, body interface{}) {
assertStatus(t, response, http.StatusOK)
err := xml.NewDecoder(response.Result().Body).Decode(body)
require.NoError(t, err)
}

View file

@ -52,6 +52,7 @@ const (
AmzObjectLockMode = "X-Amz-Object-Lock-Mode"
AmzObjectLockRetainUntilDate = "X-Amz-Object-Lock-Retain-Until-Date"
AmzBypassGovernanceRetention = "X-Amz-Bypass-Governance-Retention"
AmzObjectAttributes = "X-Amz-Object-Attributes"
ContainerID = "X-Container-Id"

View file

@ -22,7 +22,7 @@ import (
const (
UploadIDAttributeName = "S3-Upload-Id"
UploadPartNumberAttributeName = "S3-Upload-Part-Number"
UploadCompletedParts = "S3-Completed-Parts"
UploadCompletedPartsCount = "S3-Completed-Parts-Count"
metaPrefix = "meta-"
aclPrefix = "acl-"
@ -349,7 +349,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar
parts = append(parts, partInfo)
}
initMetadata := make(map[string]string, len(multipartInfo.Meta))
initMetadata := make(map[string]string, len(multipartInfo.Meta)+1)
initMetadata[UploadCompletedPartsCount] = strconv.Itoa(len(p.Parts))
uploadData := &UploadData{
TagSet: make(map[string]string),
ACLHeaders: make(map[string]string),

View file

@ -2,6 +2,7 @@ package layer
import (
"context"
"fmt"
"sort"
"strings"
@ -11,10 +12,12 @@ import (
)
type TreeServiceMock struct {
settings map[string]*data.BucketSettings
versions map[string]map[string][]*data.NodeVersion
system map[string]map[string]*data.BaseNodeVersion
locks map[string]map[uint64]*data.LockInfo
settings map[string]*data.BucketSettings
versions map[string]map[string][]*data.NodeVersion
system map[string]map[string]*data.BaseNodeVersion
locks map[string]map[uint64]*data.LockInfo
multiparts map[string]map[string][]*data.MultipartInfo
parts map[string]map[int]*data.PartInfo
}
func (t *TreeServiceMock) GetObjectTagging(ctx context.Context, cnrID *cid.ID, objVersion *data.NodeVersion) (map[string]string, error) {
@ -49,10 +52,12 @@ func (t *TreeServiceMock) DeleteBucketTagging(ctx context.Context, cnrID *cid.ID
func NewTreeService() *TreeServiceMock {
return &TreeServiceMock{
settings: make(map[string]*data.BucketSettings),
versions: make(map[string]map[string][]*data.NodeVersion),
system: make(map[string]map[string]*data.BaseNodeVersion),
locks: make(map[string]map[uint64]*data.LockInfo),
settings: make(map[string]*data.BucketSettings),
versions: make(map[string]map[string][]*data.NodeVersion),
system: make(map[string]map[string]*data.BaseNodeVersion),
locks: make(map[string]map[uint64]*data.LockInfo),
multiparts: make(map[string]map[string][]*data.MultipartInfo),
parts: make(map[string]map[int]*data.PartInfo),
}
}
@ -245,28 +250,115 @@ func (t *TreeServiceMock) GetAllVersionsByPrefix(_ context.Context, cnrID *cid.I
return result, nil
}
func (t *TreeServiceMock) CreateMultipartUpload(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error {
panic("implement me")
func (t *TreeServiceMock) CreateMultipartUpload(_ context.Context, cnrID *cid.ID, info *data.MultipartInfo) error {
cnrMultipartsMap, ok := t.multiparts[cnrID.EncodeToString()]
if !ok {
t.multiparts[cnrID.EncodeToString()] = map[string][]*data.MultipartInfo{
info.Key: {info},
}
return nil
}
multiparts := cnrMultipartsMap[info.Key]
if len(multiparts) != 0 {
info.ID = multiparts[len(multiparts)-1].ID + 1
}
cnrMultipartsMap[info.Key] = append(multiparts, info)
return nil
}
func (t *TreeServiceMock) GetMultipartUploadsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.MultipartInfo, error) {
panic("implement me")
}
func (t *TreeServiceMock) GetMultipartUpload(ctx context.Context, cnrID *cid.ID, objectName, uploadID string) (*data.MultipartInfo, error) {
panic("implement me")
func (t *TreeServiceMock) GetMultipartUpload(_ context.Context, cnrID *cid.ID, objectName, uploadID string) (*data.MultipartInfo, error) {
cnrMultipartsMap, ok := t.multiparts[cnrID.EncodeToString()]
if !ok {
return nil, ErrNodeNotFound
}
multiparts := cnrMultipartsMap[objectName]
for _, multipart := range multiparts {
if multipart.UploadID == uploadID {
return multipart, nil
}
}
return nil, ErrNodeNotFound
}
func (t *TreeServiceMock) AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete *oid.ID, err error) {
panic("implement me")
multipartInfo, err := t.GetMultipartUpload(ctx, cnrID, info.Key, info.UploadID)
if err != nil {
return nil, err
}
if multipartInfo.ID != multipartNodeID {
return nil, fmt.Errorf("invalid multipart info id")
}
partsMap, ok := t.parts[info.UploadID]
if !ok {
partsMap = make(map[int]*data.PartInfo)
}
partsMap[info.Number] = info
t.parts[info.UploadID] = partsMap
return nil, nil
}
func (t *TreeServiceMock) GetParts(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64) ([]*data.PartInfo, error) {
panic("implement me")
func (t *TreeServiceMock) GetParts(_ context.Context, cnrID *cid.ID, multipartNodeID uint64) ([]*data.PartInfo, error) {
cnrMultipartsMap := t.multiparts[cnrID.EncodeToString()]
var foundMultipart *data.MultipartInfo
LOOP:
for _, multiparts := range cnrMultipartsMap {
for _, multipart := range multiparts {
if multipart.ID == multipartNodeID {
foundMultipart = multipart
break LOOP
}
}
}
if foundMultipart == nil {
return nil, ErrNodeNotFound
}
partsMap := t.parts[foundMultipart.UploadID]
result := make([]*data.PartInfo, 0, len(partsMap))
for _, part := range partsMap {
result = append(result, part)
}
return result, nil
}
func (t *TreeServiceMock) DeleteMultipartUpload(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64) error {
panic("implement me")
func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, cnrID *cid.ID, multipartNodeID uint64) error {
cnrMultipartsMap := t.multiparts[cnrID.EncodeToString()]
var uploadID string
LOOP:
for key, multiparts := range cnrMultipartsMap {
for i, multipart := range multiparts {
if multipart.ID == multipartNodeID {
uploadID = multipart.UploadID
cnrMultipartsMap[key] = append(multiparts[:i], multiparts[i+1:]...)
break LOOP
}
}
}
if uploadID == "" {
return ErrNodeNotFound
}
delete(t.parts, uploadID)
return nil
}
func (t *TreeServiceMock) PutLock(ctx context.Context, cnrID *cid.ID, nodeID uint64, lock *data.LockInfo) error {