Compare commits

...

4 commits

Author SHA1 Message Date
8c3679867f Add PATCH method response
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
2024-07-01 12:05:42 +03:00
31b0455882 Use one get object in PATCH
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
2024-07-01 12:05:42 +03:00
6daa8b4698 Use Content-Range in PATCH
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
2024-07-01 12:05:36 +03:00
b2682e49ea Add PATCH method for object
Signed-off-by: Marina Biryukova <m.biryukova@yadro.com>
2024-06-11 12:23:44 +03:00
8 changed files with 454 additions and 0 deletions

164
api/handler/patch.go Normal file
View file

@ -0,0 +1,164 @@
package handler
import (
"fmt"
"net/http"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
"go.uber.org/zap"
)
func (h *handler) PatchHandler(w http.ResponseWriter, r *http.Request) {
var (
ctx = r.Context()
reqInfo = middleware.GetReqInfo(ctx)
)
byteRange, err := parseByteRange(r.Header.Get(api.ContentRange))
if err != nil {
h.logAndSendError(w, "could not parse byte range", reqInfo, errors.GetAPIError(errors.ErrInvalidRange), zap.Error(err))
return
}
if uint64(r.ContentLength) != (byteRange.End - byteRange.Start + 1) {
h.logAndSendError(w, "content-length must be equal to byte range length", reqInfo, errors.GetAPIError(errors.ErrBadRequest))
return
}
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
if err != nil {
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
return
}
settings, err := h.obj.GetBucketSettings(ctx, bktInfo)
if err != nil {
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
return
}
if !settings.VersioningEnabled() {
h.logAndSendError(w, "could not patch object in unversioned bucket", reqInfo, errors.GetAPIError(errors.ErrBadRequest))
return
}
srcObjPrm := &layer.HeadObjectParams{
Object: reqInfo.ObjectName,
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
BktInfo: bktInfo,
}
extendedSrcObjInfo, err := h.obj.GetExtendedObjectInfo(ctx, srcObjPrm)
if err != nil {
h.logAndSendError(w, "could not find object", reqInfo, err)
return
}
srcObjInfo := extendedSrcObjInfo.ObjectInfo
srcSize, err := layer.GetObjectSize(srcObjInfo)
if err != nil {
h.logAndSendError(w, "failed to get source object size", reqInfo, err)
return
}
if byteRange.Start > srcSize {
h.logAndSendError(w, "start byte is greater than object size", reqInfo, errors.GetAPIError(errors.ErrBadRequest))
return
}
if len(srcObjInfo.ContentType) > 0 {
srcObjInfo.Headers[api.ContentType] = srcObjInfo.ContentType
}
metadata := makeCopyMap(srcObjInfo.Headers)
filterMetadataMap(metadata)
var size uint64
if r.ContentLength > 0 {
size = uint64(r.ContentLength)
}
params := &layer.PatchObjectParams{
Object: srcObjInfo,
BktInfo: bktInfo,
SrcSize: srcSize,
Header: metadata,
NewBytes: r.Body,
NewBytesSize: size,
Range: byteRange,
}
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, reqInfo.Namespace, bktInfo.LocationConstraint)
if err != nil {
h.logAndSendError(w, "invalid copies number", reqInfo, err)
return
}
extendedObjInfo, err := h.obj.PatchObject(ctx, params)
if err != nil {
h.logAndSendError(w, "couldn't patch object", reqInfo, err)
return
}
w.Header().Set(api.AmzVersionID, extendedObjInfo.ObjectInfo.VersionID())
w.Header().Set(api.ETag, data.Quote(extendedObjInfo.ObjectInfo.ETag(h.cfg.MD5Enabled())))
resp := PatchObjectResult{
Object: PatchObject{
LastModified: extendedObjInfo.ObjectInfo.Created.UTC().Format(time.RFC3339),
ETag: data.Quote(extendedObjInfo.ObjectInfo.ETag(h.cfg.MD5Enabled())),
},
}
if err = middleware.EncodeToResponse(w, resp); err != nil {
h.logAndSendError(w, "could not encode PatchObjectResult to response", reqInfo, err)
return
}
}
func parseByteRange(rangeStr string) (*layer.RangeParams, error) {
const (
prefix = "bytes "
suffix = "/*"
)
if rangeStr == "" {
return nil, fmt.Errorf("empty range")
}
if !strings.HasPrefix(rangeStr, prefix) {
return nil, fmt.Errorf("unknown unit in range header")
}
if !strings.HasSuffix(rangeStr, suffix) {
return nil, fmt.Errorf("invalid size in range header")
}
parts := strings.Split(strings.TrimSuffix(strings.TrimPrefix(rangeStr, prefix), suffix), "-")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid range: %s", rangeStr)
}
start, err := strconv.ParseUint(parts[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid start byte: %s", parts[0])
}
end, err := strconv.ParseUint(parts[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid end byte: %s", parts[1])
}
if start > end {
return nil, fmt.Errorf("start byte is greater than end byte")
}
return &layer.RangeParams{
Start: start,
End: end,
}, nil
}

View file

@ -192,6 +192,15 @@ type PostResponse struct {
ETag string `xml:"Etag"`
}
type PatchObjectResult struct {
Object PatchObject `xml:"Object"`
}
type PatchObject struct {
LastModified string `xml:"LastModified"`
ETag string `xml:"ETag"`
}
// MarshalXML -- StringMap marshals into XML.
func (s StringMap) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
tokens := []xml.Token{start}

View file

@ -6,6 +6,7 @@ import (
"crypto/rand"
"encoding/json"
"encoding/xml"
stderrors "errors"
"fmt"
"io"
"net/url"
@ -164,6 +165,19 @@ type (
DstEncryption encryption.Params
CopiesNumbers []uint32
}
PatchObjectParams struct {
Object *data.ObjectInfo
BktInfo *data.BucketInfo
SrcSize uint64
Header map[string]string
NewBytes io.Reader
NewBytesSize uint64
Range *RangeParams
Encryption encryption.Params
CopiesNumbers []uint32
}
// CreateBucketParams stores bucket create request parameters.
CreateBucketParams struct {
Name string
@ -269,6 +283,8 @@ type (
PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error
GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error)
PatchObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error)
// Compound methods for optimizations
// GetObjectTaggingAndLock unifies GetObjectTagging and GetLock methods in single tree service invocation.
@ -639,6 +655,61 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Exte
})
}
func (n *layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
objPayload, err := n.GetObject(ctx, &GetObjectParams{
ObjectInfo: p.Object,
Versioned: true,
BucketInfo: p.BktInfo,
Encryption: p.Encryption,
})
if err != nil {
return nil, fmt.Errorf("get object to patch: %w", err)
}
if p.Range.Start == p.SrcSize {
return n.PutObject(ctx, &PutObjectParams{
BktInfo: p.BktInfo,
Object: p.Object.Name,
Size: p.SrcSize + p.NewBytesSize,
Reader: io.MultiReader(objPayload, p.NewBytes),
Header: p.Header,
Encryption: p.Encryption,
CopiesNumbers: p.CopiesNumbers,
})
}
var size uint64
if p.Range.Start == 0 {
if p.Range.End >= p.SrcSize-1 {
return n.PutObject(ctx, &PutObjectParams{
BktInfo: p.BktInfo,
Object: p.Object.Name,
Size: p.NewBytesSize,
Reader: p.NewBytes,
Header: p.Header,
Encryption: p.Encryption,
CopiesNumbers: p.CopiesNumbers,
})
}
size = p.SrcSize - 1 - p.Range.End + p.NewBytesSize
} else if p.Range.End >= p.SrcSize-1 {
size = p.Range.Start + p.NewBytesSize
} else {
size = p.SrcSize
}
return n.PutObject(ctx, &PutObjectParams{
BktInfo: p.BktInfo,
Object: p.Object.Name,
Size: size,
Reader: wrapPatchReader(objPayload, p.NewBytes, p.Range, 64*1024),
Header: p.Header,
Encryption: p.Encryption,
CopiesNumbers: p.CopiesNumbers,
})
}
func getRandomOID() (oid.ID, error) {
b := [32]byte{}
if _, err := rand.Read(b[:]); err != nil {
@ -875,3 +946,101 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
n.cache.DeleteBucket(p.BktInfo)
return n.frostFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken)
}
func wrapPatchReader(payload, rngPayload io.Reader, rng *RangeParams, bufSize int) io.Reader {
if payload == nil || rngPayload == nil {
return nil
}
r, w := io.Pipe()
go func() {
var buf = make([]byte, bufSize)
if rng.Start == 0 {
err := readRange(rngPayload, w, buf)
if err != nil {
_ = w.CloseWithError(err)
return
}
var readSize uint64
for {
n, err := payload.Read(buf)
if err != nil && !stderrors.Is(err, io.EOF) {
_ = w.CloseWithError(err)
break
}
readSize += uint64(n)
if readSize > rng.End+1 {
var start uint64
if readSize-rng.End-1 < uint64(n) {
start = uint64(n) - (readSize - rng.End - 1)
}
_, _ = w.Write(buf[start:n])
}
if stderrors.Is(err, io.EOF) {
_ = w.CloseWithError(err)
break
}
}
} else {
var (
readSize uint64
readRng bool
)
for {
n, err := payload.Read(buf)
if err != nil && !stderrors.Is(err, io.EOF) {
_ = w.CloseWithError(err)
break
}
readSize += uint64(n)
if readSize <= rng.Start {
_, _ = w.Write(buf[:n])
continue
}
if readSize-rng.Start < uint64(n) {
_, _ = w.Write(buf[:n-int(readSize-rng.Start)])
}
if !readRng {
err = readRange(rngPayload, w, buf)
if err != nil {
_ = w.CloseWithError(err)
break
}
readRng = true
}
if readSize > rng.End+1 {
var start uint64
if readSize-rng.End-1 < uint64(n) {
start = uint64(n) - (readSize - rng.End - 1)
}
_, _ = w.Write(buf[start:n])
}
if stderrors.Is(err, io.EOF) {
_ = w.CloseWithError(err)
break
}
}
}
}()
return r
}
func readRange(r io.Reader, w *io.PipeWriter, buf []byte) error {
for {
n, err := r.Read(buf)
if n > 0 {
_, _ = w.Write(buf[:n])
}
if err != nil {
if !stderrors.Is(err, io.EOF) {
return err
}
break
}
}
return nil
}

102
api/layer/layer_test.go Normal file
View file

@ -0,0 +1,102 @@
package layer
import (
"bytes"
"io"
"testing"
"github.com/stretchr/testify/require"
)
func TestWrapPatchReader(t *testing.T) {
payload := "abcdefghijklmn"
rngPayload := "123"
for _, tc := range []struct {
name string
rng *RangeParams
bufSize int
expected string
}{
{
name: "patch object start, buffer is less than range size",
rng: &RangeParams{
Start: 0,
End: 2,
},
bufSize: 2,
expected: "123defghijklmn",
},
{
name: "patch object start, buffer is equal to range size",
rng: &RangeParams{
Start: 0,
End: 2,
},
bufSize: 3,
expected: "123defghijklmn",
},
{
name: "patch object start, buffer is greater than range size",
rng: &RangeParams{
Start: 0,
End: 2,
},
bufSize: 4,
expected: "123defghijklmn",
},
{
name: "patch object middle, range at the beginning of buffer",
rng: &RangeParams{
Start: 5,
End: 7,
},
bufSize: 5,
expected: "abcde123ijklmn",
},
{
name: "patch object middle, range in the middle of buffer",
rng: &RangeParams{
Start: 6,
End: 8,
},
bufSize: 5,
expected: "abcdef123jklmn",
},
{
name: "patch object middle, range in the end of buffer",
rng: &RangeParams{
Start: 7,
End: 9,
},
bufSize: 5,
expected: "abcdefg123klmn",
},
{
name: "patch object end, increase size",
rng: &RangeParams{
Start: 12,
End: 14,
},
bufSize: 4,
expected: "abcdefghijkl123",
},
{
name: "patch object end",
rng: &RangeParams{
Start: 11,
End: 13,
},
bufSize: 4,
expected: "abcdefghijk123",
},
} {
t.Run(tc.name, func(t *testing.T) {
wrappedReader := wrapPatchReader(bytes.NewBufferString(payload), bytes.NewBufferString(rngPayload), tc.rng, tc.bufSize)
res, err := io.ReadAll(wrappedReader)
require.NoError(t, err)
require.Equal(t, tc.expected, string(res))
})
}
}

View file

@ -74,6 +74,7 @@ const (
AbortMultipartUploadOperation = "AbortMultipartUpload"
DeleteObjectTaggingOperation = "DeleteObjectTagging"
DeleteObjectOperation = "DeleteObject"
PatchObjectOperation = "PatchObject"
)
const (

View file

@ -358,6 +358,8 @@ func determineObjectOperation(r *http.Request) string {
switch r.Method {
case http.MethodOptions:
return OptionsObjectOperation
case http.MethodPatch:
return PatchObjectOperation
case http.MethodHead:
return HeadObjectOperation
case http.MethodGet:

View file

@ -87,6 +87,7 @@ type (
AbortMultipartUploadHandler(http.ResponseWriter, *http.Request)
ListPartsHandler(w http.ResponseWriter, r *http.Request)
ListMultipartUploadsHandler(http.ResponseWriter, *http.Request)
PatchHandler(http.ResponseWriter, *http.Request)
ResolveBucket(ctx context.Context, bucket string) (*data.BucketInfo, error)
ResolveCID(ctx context.Context, bucket string) (cid.ID, error)
@ -376,6 +377,8 @@ func objectRouter(h Handler, l *zap.Logger) chi.Router {
objRouter.Head("/*", named(s3middleware.HeadObjectOperation, h.HeadObjectHandler))
objRouter.Patch("/*", named(s3middleware.PatchObjectOperation, h.PatchHandler))
// GET method handlers
objRouter.Group(func(r chi.Router) {
r.Method(http.MethodGet, "/*", NewHandlerFilter().

View file

@ -540,6 +540,10 @@ func (h *handlerMock) ListMultipartUploadsHandler(w http.ResponseWriter, r *http
h.writeResponse(w, res)
}
func (h *handlerMock) PatchHandler(http.ResponseWriter, *http.Request) {
panic("implement me")
}
func (h *handlerMock) ResolveBucket(ctx context.Context, name string) (*data.BucketInfo, error) {
reqInfo := middleware.GetReqInfo(ctx)
bktInfo, ok := h.buckets[reqInfo.Namespace+name]