Compare commits
4 commits
master
...
feature/pa
Author | SHA1 | Date | |
---|---|---|---|
8c3679867f | |||
31b0455882 | |||
6daa8b4698 | |||
b2682e49ea |
8 changed files with 454 additions and 0 deletions
164
api/handler/patch.go
Normal file
164
api/handler/patch.go
Normal file
|
@ -0,0 +1,164 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/middleware"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (h *handler) PatchHandler(w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
ctx = r.Context()
|
||||
reqInfo = middleware.GetReqInfo(ctx)
|
||||
)
|
||||
|
||||
byteRange, err := parseByteRange(r.Header.Get(api.ContentRange))
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not parse byte range", reqInfo, errors.GetAPIError(errors.ErrInvalidRange), zap.Error(err))
|
||||
return
|
||||
}
|
||||
|
||||
if uint64(r.ContentLength) != (byteRange.End - byteRange.Start + 1) {
|
||||
h.logAndSendError(w, "content-length must be equal to byte range length", reqInfo, errors.GetAPIError(errors.ErrBadRequest))
|
||||
return
|
||||
}
|
||||
|
||||
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket info", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
settings, err := h.obj.GetBucketSettings(ctx, bktInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get bucket settings", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if !settings.VersioningEnabled() {
|
||||
h.logAndSendError(w, "could not patch object in unversioned bucket", reqInfo, errors.GetAPIError(errors.ErrBadRequest))
|
||||
return
|
||||
}
|
||||
|
||||
srcObjPrm := &layer.HeadObjectParams{
|
||||
Object: reqInfo.ObjectName,
|
||||
VersionID: reqInfo.URL.Query().Get(api.QueryVersionID),
|
||||
BktInfo: bktInfo,
|
||||
}
|
||||
|
||||
extendedSrcObjInfo, err := h.obj.GetExtendedObjectInfo(ctx, srcObjPrm)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not find object", reqInfo, err)
|
||||
return
|
||||
}
|
||||
srcObjInfo := extendedSrcObjInfo.ObjectInfo
|
||||
|
||||
srcSize, err := layer.GetObjectSize(srcObjInfo)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "failed to get source object size", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
if byteRange.Start > srcSize {
|
||||
h.logAndSendError(w, "start byte is greater than object size", reqInfo, errors.GetAPIError(errors.ErrBadRequest))
|
||||
return
|
||||
}
|
||||
|
||||
if len(srcObjInfo.ContentType) > 0 {
|
||||
srcObjInfo.Headers[api.ContentType] = srcObjInfo.ContentType
|
||||
}
|
||||
metadata := makeCopyMap(srcObjInfo.Headers)
|
||||
filterMetadataMap(metadata)
|
||||
|
||||
var size uint64
|
||||
if r.ContentLength > 0 {
|
||||
size = uint64(r.ContentLength)
|
||||
}
|
||||
|
||||
params := &layer.PatchObjectParams{
|
||||
Object: srcObjInfo,
|
||||
BktInfo: bktInfo,
|
||||
SrcSize: srcSize,
|
||||
Header: metadata,
|
||||
NewBytes: r.Body,
|
||||
NewBytesSize: size,
|
||||
Range: byteRange,
|
||||
}
|
||||
|
||||
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, reqInfo.Namespace, bktInfo.LocationConstraint)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "invalid copies number", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
extendedObjInfo, err := h.obj.PatchObject(ctx, params)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "couldn't patch object", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set(api.AmzVersionID, extendedObjInfo.ObjectInfo.VersionID())
|
||||
w.Header().Set(api.ETag, data.Quote(extendedObjInfo.ObjectInfo.ETag(h.cfg.MD5Enabled())))
|
||||
|
||||
resp := PatchObjectResult{
|
||||
Object: PatchObject{
|
||||
LastModified: extendedObjInfo.ObjectInfo.Created.UTC().Format(time.RFC3339),
|
||||
ETag: data.Quote(extendedObjInfo.ObjectInfo.ETag(h.cfg.MD5Enabled())),
|
||||
},
|
||||
}
|
||||
|
||||
if err = middleware.EncodeToResponse(w, resp); err != nil {
|
||||
h.logAndSendError(w, "could not encode PatchObjectResult to response", reqInfo, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func parseByteRange(rangeStr string) (*layer.RangeParams, error) {
|
||||
const (
|
||||
prefix = "bytes "
|
||||
suffix = "/*"
|
||||
)
|
||||
|
||||
if rangeStr == "" {
|
||||
return nil, fmt.Errorf("empty range")
|
||||
}
|
||||
if !strings.HasPrefix(rangeStr, prefix) {
|
||||
return nil, fmt.Errorf("unknown unit in range header")
|
||||
}
|
||||
if !strings.HasSuffix(rangeStr, suffix) {
|
||||
return nil, fmt.Errorf("invalid size in range header")
|
||||
}
|
||||
|
||||
parts := strings.Split(strings.TrimSuffix(strings.TrimPrefix(rangeStr, prefix), suffix), "-")
|
||||
if len(parts) != 2 {
|
||||
return nil, fmt.Errorf("invalid range: %s", rangeStr)
|
||||
}
|
||||
|
||||
start, err := strconv.ParseUint(parts[0], 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid start byte: %s", parts[0])
|
||||
}
|
||||
|
||||
end, err := strconv.ParseUint(parts[1], 10, 64)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid end byte: %s", parts[1])
|
||||
}
|
||||
|
||||
if start > end {
|
||||
return nil, fmt.Errorf("start byte is greater than end byte")
|
||||
}
|
||||
|
||||
return &layer.RangeParams{
|
||||
Start: start,
|
||||
End: end,
|
||||
}, nil
|
||||
}
|
|
@ -192,6 +192,15 @@ type PostResponse struct {
|
|||
ETag string `xml:"Etag"`
|
||||
}
|
||||
|
||||
type PatchObjectResult struct {
|
||||
Object PatchObject `xml:"Object"`
|
||||
}
|
||||
|
||||
type PatchObject struct {
|
||||
LastModified string `xml:"LastModified"`
|
||||
ETag string `xml:"ETag"`
|
||||
}
|
||||
|
||||
// MarshalXML -- StringMap marshals into XML.
|
||||
func (s StringMap) MarshalXML(e *xml.Encoder, start xml.StartElement) error {
|
||||
tokens := []xml.Token{start}
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"crypto/rand"
|
||||
"encoding/json"
|
||||
"encoding/xml"
|
||||
stderrors "errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/url"
|
||||
|
@ -164,6 +165,19 @@ type (
|
|||
DstEncryption encryption.Params
|
||||
CopiesNumbers []uint32
|
||||
}
|
||||
|
||||
PatchObjectParams struct {
|
||||
Object *data.ObjectInfo
|
||||
BktInfo *data.BucketInfo
|
||||
SrcSize uint64
|
||||
Header map[string]string
|
||||
NewBytes io.Reader
|
||||
NewBytesSize uint64
|
||||
Range *RangeParams
|
||||
Encryption encryption.Params
|
||||
CopiesNumbers []uint32
|
||||
}
|
||||
|
||||
// CreateBucketParams stores bucket create request parameters.
|
||||
CreateBucketParams struct {
|
||||
Name string
|
||||
|
@ -269,6 +283,8 @@ type (
|
|||
PutBucketNotificationConfiguration(ctx context.Context, p *PutBucketNotificationConfigurationParams) error
|
||||
GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error)
|
||||
|
||||
PatchObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error)
|
||||
|
||||
// Compound methods for optimizations
|
||||
|
||||
// GetObjectTaggingAndLock unifies GetObjectTagging and GetLock methods in single tree service invocation.
|
||||
|
@ -639,6 +655,61 @@ func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.Exte
|
|||
})
|
||||
}
|
||||
|
||||
func (n *layer) PatchObject(ctx context.Context, p *PatchObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||
objPayload, err := n.GetObject(ctx, &GetObjectParams{
|
||||
ObjectInfo: p.Object,
|
||||
Versioned: true,
|
||||
BucketInfo: p.BktInfo,
|
||||
Encryption: p.Encryption,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get object to patch: %w", err)
|
||||
}
|
||||
|
||||
if p.Range.Start == p.SrcSize {
|
||||
return n.PutObject(ctx, &PutObjectParams{
|
||||
BktInfo: p.BktInfo,
|
||||
Object: p.Object.Name,
|
||||
Size: p.SrcSize + p.NewBytesSize,
|
||||
Reader: io.MultiReader(objPayload, p.NewBytes),
|
||||
Header: p.Header,
|
||||
Encryption: p.Encryption,
|
||||
CopiesNumbers: p.CopiesNumbers,
|
||||
})
|
||||
}
|
||||
|
||||
var size uint64
|
||||
if p.Range.Start == 0 {
|
||||
if p.Range.End >= p.SrcSize-1 {
|
||||
return n.PutObject(ctx, &PutObjectParams{
|
||||
BktInfo: p.BktInfo,
|
||||
Object: p.Object.Name,
|
||||
Size: p.NewBytesSize,
|
||||
Reader: p.NewBytes,
|
||||
Header: p.Header,
|
||||
Encryption: p.Encryption,
|
||||
CopiesNumbers: p.CopiesNumbers,
|
||||
})
|
||||
}
|
||||
|
||||
size = p.SrcSize - 1 - p.Range.End + p.NewBytesSize
|
||||
} else if p.Range.End >= p.SrcSize-1 {
|
||||
size = p.Range.Start + p.NewBytesSize
|
||||
} else {
|
||||
size = p.SrcSize
|
||||
}
|
||||
|
||||
return n.PutObject(ctx, &PutObjectParams{
|
||||
BktInfo: p.BktInfo,
|
||||
Object: p.Object.Name,
|
||||
Size: size,
|
||||
Reader: wrapPatchReader(objPayload, p.NewBytes, p.Range, 64*1024),
|
||||
Header: p.Header,
|
||||
Encryption: p.Encryption,
|
||||
CopiesNumbers: p.CopiesNumbers,
|
||||
})
|
||||
}
|
||||
|
||||
func getRandomOID() (oid.ID, error) {
|
||||
b := [32]byte{}
|
||||
if _, err := rand.Read(b[:]); err != nil {
|
||||
|
@ -875,3 +946,101 @@ func (n *layer) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error {
|
|||
n.cache.DeleteBucket(p.BktInfo)
|
||||
return n.frostFS.DeleteContainer(ctx, p.BktInfo.CID, p.SessionToken)
|
||||
}
|
||||
|
||||
func wrapPatchReader(payload, rngPayload io.Reader, rng *RangeParams, bufSize int) io.Reader {
|
||||
if payload == nil || rngPayload == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
r, w := io.Pipe()
|
||||
go func() {
|
||||
var buf = make([]byte, bufSize)
|
||||
|
||||
if rng.Start == 0 {
|
||||
err := readRange(rngPayload, w, buf)
|
||||
if err != nil {
|
||||
_ = w.CloseWithError(err)
|
||||
return
|
||||
}
|
||||
|
||||
var readSize uint64
|
||||
for {
|
||||
n, err := payload.Read(buf)
|
||||
if err != nil && !stderrors.Is(err, io.EOF) {
|
||||
_ = w.CloseWithError(err)
|
||||
break
|
||||
}
|
||||
readSize += uint64(n)
|
||||
if readSize > rng.End+1 {
|
||||
var start uint64
|
||||
if readSize-rng.End-1 < uint64(n) {
|
||||
start = uint64(n) - (readSize - rng.End - 1)
|
||||
}
|
||||
_, _ = w.Write(buf[start:n])
|
||||
}
|
||||
if stderrors.Is(err, io.EOF) {
|
||||
_ = w.CloseWithError(err)
|
||||
break
|
||||
}
|
||||
}
|
||||
} else {
|
||||
var (
|
||||
readSize uint64
|
||||
readRng bool
|
||||
)
|
||||
|
||||
for {
|
||||
n, err := payload.Read(buf)
|
||||
if err != nil && !stderrors.Is(err, io.EOF) {
|
||||
_ = w.CloseWithError(err)
|
||||
break
|
||||
}
|
||||
readSize += uint64(n)
|
||||
if readSize <= rng.Start {
|
||||
_, _ = w.Write(buf[:n])
|
||||
continue
|
||||
}
|
||||
if readSize-rng.Start < uint64(n) {
|
||||
_, _ = w.Write(buf[:n-int(readSize-rng.Start)])
|
||||
}
|
||||
if !readRng {
|
||||
err = readRange(rngPayload, w, buf)
|
||||
if err != nil {
|
||||
_ = w.CloseWithError(err)
|
||||
break
|
||||
}
|
||||
readRng = true
|
||||
}
|
||||
if readSize > rng.End+1 {
|
||||
var start uint64
|
||||
if readSize-rng.End-1 < uint64(n) {
|
||||
start = uint64(n) - (readSize - rng.End - 1)
|
||||
}
|
||||
_, _ = w.Write(buf[start:n])
|
||||
}
|
||||
if stderrors.Is(err, io.EOF) {
|
||||
_ = w.CloseWithError(err)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
return r
|
||||
}
|
||||
|
||||
func readRange(r io.Reader, w *io.PipeWriter, buf []byte) error {
|
||||
for {
|
||||
n, err := r.Read(buf)
|
||||
if n > 0 {
|
||||
_, _ = w.Write(buf[:n])
|
||||
}
|
||||
if err != nil {
|
||||
if !stderrors.Is(err, io.EOF) {
|
||||
return err
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
102
api/layer/layer_test.go
Normal file
102
api/layer/layer_test.go
Normal file
|
@ -0,0 +1,102 @@
|
|||
package layer
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestWrapPatchReader(t *testing.T) {
|
||||
payload := "abcdefghijklmn"
|
||||
rngPayload := "123"
|
||||
|
||||
for _, tc := range []struct {
|
||||
name string
|
||||
rng *RangeParams
|
||||
bufSize int
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
name: "patch object start, buffer is less than range size",
|
||||
rng: &RangeParams{
|
||||
Start: 0,
|
||||
End: 2,
|
||||
},
|
||||
bufSize: 2,
|
||||
expected: "123defghijklmn",
|
||||
},
|
||||
{
|
||||
name: "patch object start, buffer is equal to range size",
|
||||
rng: &RangeParams{
|
||||
Start: 0,
|
||||
End: 2,
|
||||
},
|
||||
bufSize: 3,
|
||||
expected: "123defghijklmn",
|
||||
},
|
||||
{
|
||||
name: "patch object start, buffer is greater than range size",
|
||||
rng: &RangeParams{
|
||||
Start: 0,
|
||||
End: 2,
|
||||
},
|
||||
bufSize: 4,
|
||||
expected: "123defghijklmn",
|
||||
},
|
||||
{
|
||||
name: "patch object middle, range at the beginning of buffer",
|
||||
rng: &RangeParams{
|
||||
Start: 5,
|
||||
End: 7,
|
||||
},
|
||||
bufSize: 5,
|
||||
expected: "abcde123ijklmn",
|
||||
},
|
||||
{
|
||||
name: "patch object middle, range in the middle of buffer",
|
||||
rng: &RangeParams{
|
||||
Start: 6,
|
||||
End: 8,
|
||||
},
|
||||
bufSize: 5,
|
||||
expected: "abcdef123jklmn",
|
||||
},
|
||||
{
|
||||
name: "patch object middle, range in the end of buffer",
|
||||
rng: &RangeParams{
|
||||
Start: 7,
|
||||
End: 9,
|
||||
},
|
||||
bufSize: 5,
|
||||
expected: "abcdefg123klmn",
|
||||
},
|
||||
{
|
||||
name: "patch object end, increase size",
|
||||
rng: &RangeParams{
|
||||
Start: 12,
|
||||
End: 14,
|
||||
},
|
||||
bufSize: 4,
|
||||
expected: "abcdefghijkl123",
|
||||
},
|
||||
{
|
||||
name: "patch object end",
|
||||
rng: &RangeParams{
|
||||
Start: 11,
|
||||
End: 13,
|
||||
},
|
||||
bufSize: 4,
|
||||
expected: "abcdefghijk123",
|
||||
},
|
||||
} {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
wrappedReader := wrapPatchReader(bytes.NewBufferString(payload), bytes.NewBufferString(rngPayload), tc.rng, tc.bufSize)
|
||||
|
||||
res, err := io.ReadAll(wrappedReader)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expected, string(res))
|
||||
})
|
||||
}
|
||||
}
|
|
@ -74,6 +74,7 @@ const (
|
|||
AbortMultipartUploadOperation = "AbortMultipartUpload"
|
||||
DeleteObjectTaggingOperation = "DeleteObjectTagging"
|
||||
DeleteObjectOperation = "DeleteObject"
|
||||
PatchObjectOperation = "PatchObject"
|
||||
)
|
||||
|
||||
const (
|
||||
|
|
|
@ -358,6 +358,8 @@ func determineObjectOperation(r *http.Request) string {
|
|||
switch r.Method {
|
||||
case http.MethodOptions:
|
||||
return OptionsObjectOperation
|
||||
case http.MethodPatch:
|
||||
return PatchObjectOperation
|
||||
case http.MethodHead:
|
||||
return HeadObjectOperation
|
||||
case http.MethodGet:
|
||||
|
|
|
@ -87,6 +87,7 @@ type (
|
|||
AbortMultipartUploadHandler(http.ResponseWriter, *http.Request)
|
||||
ListPartsHandler(w http.ResponseWriter, r *http.Request)
|
||||
ListMultipartUploadsHandler(http.ResponseWriter, *http.Request)
|
||||
PatchHandler(http.ResponseWriter, *http.Request)
|
||||
|
||||
ResolveBucket(ctx context.Context, bucket string) (*data.BucketInfo, error)
|
||||
ResolveCID(ctx context.Context, bucket string) (cid.ID, error)
|
||||
|
@ -376,6 +377,8 @@ func objectRouter(h Handler, l *zap.Logger) chi.Router {
|
|||
|
||||
objRouter.Head("/*", named(s3middleware.HeadObjectOperation, h.HeadObjectHandler))
|
||||
|
||||
objRouter.Patch("/*", named(s3middleware.PatchObjectOperation, h.PatchHandler))
|
||||
|
||||
// GET method handlers
|
||||
objRouter.Group(func(r chi.Router) {
|
||||
r.Method(http.MethodGet, "/*", NewHandlerFilter().
|
||||
|
|
|
@ -540,6 +540,10 @@ func (h *handlerMock) ListMultipartUploadsHandler(w http.ResponseWriter, r *http
|
|||
h.writeResponse(w, res)
|
||||
}
|
||||
|
||||
func (h *handlerMock) PatchHandler(http.ResponseWriter, *http.Request) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (h *handlerMock) ResolveBucket(ctx context.Context, name string) (*data.BucketInfo, error) {
|
||||
reqInfo := middleware.GetReqInfo(ctx)
|
||||
bktInfo, ok := h.buckets[reqInfo.Namespace+name]
|
||||
|
|
Loading…
Reference in a new issue