[#158] Separate init object reader from read itself #164
10 changed files with 182 additions and 84 deletions
|
@ -12,6 +12,7 @@ This document outlines major changes between releases.
|
||||||
- Handle negative `Content-Length` on put (#125)
|
- Handle negative `Content-Length` on put (#125)
|
||||||
- Use `DisableURIPathEscaping` to presign urls (#125)
|
- Use `DisableURIPathEscaping` to presign urls (#125)
|
||||||
- Use specific s3 errors instead of `InternalError` where possible (#143)
|
- Use specific s3 errors instead of `InternalError` where possible (#143)
|
||||||
|
- Return appropriate 404 code when object missed in storage but there is in gate cache (#158)
|
||||||
|
|
||||||
### Added
|
### Added
|
||||||
- Implement chunk uploading (#106)
|
- Implement chunk uploading (#106)
|
||||||
|
|
|
@ -170,13 +170,14 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
params := &layer.CopyObjectParams{
|
params := &layer.CopyObjectParams{
|
||||||
SrcObject: srcObjInfo,
|
SrcVersioned: srcObjPrm.Versioned(),
|
||||||
ScrBktInfo: srcObjPrm.BktInfo,
|
SrcObject: srcObjInfo,
|
||||||
DstBktInfo: dstBktInfo,
|
ScrBktInfo: srcObjPrm.BktInfo,
|
||||||
DstObject: reqInfo.ObjectName,
|
DstBktInfo: dstBktInfo,
|
||||||
SrcSize: srcObjInfo.Size,
|
DstObject: reqInfo.ObjectName,
|
||||||
Header: metadata,
|
SrcSize: srcObjInfo.Size,
|
||||||
Encryption: encryptionParams,
|
Header: metadata,
|
||||||
|
Encryption: encryptionParams,
|
||||||
}
|
}
|
||||||
|
|
||||||
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, dstBktInfo.LocationConstraint)
|
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, dstBktInfo.LocationConstraint)
|
||||||
|
|
|
@ -202,6 +202,20 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
getPayloadParams := &layer.GetObjectParams{
|
||||||
|
ObjectInfo: info,
|
||||||
|
Versioned: p.Versioned(),
|
||||||
|
Range: params,
|
||||||
|
BucketInfo: bktInfo,
|
||||||
|
Encryption: encryptionParams,
|
||||||
|
}
|
||||||
|
|
||||||
|
objPayload, err := h.obj.GetObject(r.Context(), getPayloadParams)
|
||||||
|
if err != nil {
|
||||||
|
h.logAndSendError(w, "could not get object payload", reqInfo, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
|
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
|
||||||
if params != nil {
|
if params != nil {
|
||||||
writeRangeHeaders(w, params, info.Size)
|
writeRangeHeaders(w, params, info.Size)
|
||||||
|
@ -209,15 +223,9 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
w.WriteHeader(http.StatusOK)
|
w.WriteHeader(http.StatusOK)
|
||||||
}
|
}
|
||||||
|
|
||||||
getParams := &layer.GetObjectParams{
|
if err = objPayload.StreamTo(w); err != nil {
|
||||||
ObjectInfo: info,
|
h.logAndSendError(w, "could not stream object payload", reqInfo, err)
|
||||||
Writer: w,
|
return
|
||||||
Range: params,
|
|
||||||
BucketInfo: bktInfo,
|
|
||||||
Encryption: encryptionParams,
|
|
||||||
}
|
|
||||||
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
|
|
||||||
h.logAndSendError(w, "could not get object", reqInfo, err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,12 +5,18 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/http/httptest"
|
||||||
|
"net/url"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||||
|
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||||
|
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -170,6 +176,24 @@ func TestGetRange(t *testing.T) {
|
||||||
require.Equal(t, "bcdef", string(end))
|
require.Equal(t, "bcdef", string(end))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGetObject(t *testing.T) {
|
||||||
|
hc := prepareHandlerContext(t)
|
||||||
|
bktName, objName := "bucket", "obj"
|
||||||
|
bktInfo, objInfo := createVersionedBucketAndObject(hc.t, hc, bktName, objName)
|
||||||
|
|
||||||
|
putObject(hc.t, hc, bktName, objName)
|
||||||
|
|
||||||
|
checkFound(hc.t, hc, bktName, objName, objInfo.VersionID())
|
||||||
|
checkFound(hc.t, hc, bktName, objName, emptyVersion)
|
||||||
|
|
||||||
|
addr := getAddressOfLastVersion(hc, bktInfo, objName)
|
||||||
|
hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{})
|
||||||
|
hc.tp.SetObjectError(objInfo.Address(), apistatus.ObjectNotFound{})
|
||||||
|
|
||||||
|
getObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), s3errors.ErrNoSuchVersion)
|
||||||
|
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey)
|
||||||
|
}
|
||||||
|
|
||||||
func putObjectContent(hc *handlerContext, bktName, objName, content string) {
|
func putObjectContent(hc *handlerContext, bktName, objName, content string) {
|
||||||
body := bytes.NewReader([]byte(content))
|
body := bytes.NewReader([]byte(content))
|
||||||
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
|
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
|
||||||
|
@ -186,3 +210,17 @@ func getObjectRange(t *testing.T, tc *handlerContext, bktName, objName string, s
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
return content
|
return content
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func getObjectAssertS3Error(hc *handlerContext, bktName, objName, version string, code apiErrors.ErrorCode) {
|
||||||
|
w := getObjectBase(hc, bktName, objName, version)
|
||||||
|
assertS3Error(hc.t, w, apiErrors.GetAPIError(code))
|
||||||
|
}
|
||||||
|
|
||||||
|
func getObjectBase(hc *handlerContext, bktName, objName, version string) *httptest.ResponseRecorder {
|
||||||
|
query := make(url.Values)
|
||||||
|
query.Add(api.QueryVersionID, version)
|
||||||
|
|
||||||
|
w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
|
||||||
|
hc.Handler().GetObjectHandler(w, r)
|
||||||
|
return w
|
||||||
|
}
|
||||||
|
|
|
@ -272,10 +272,16 @@ func existInMockedFrostFS(tc *handlerContext, bktInfo *data.BucketInfo, objInfo
|
||||||
p := &layer.GetObjectParams{
|
p := &layer.GetObjectParams{
|
||||||
BucketInfo: bktInfo,
|
BucketInfo: bktInfo,
|
||||||
ObjectInfo: objInfo,
|
ObjectInfo: objInfo,
|
||||||
Writer: io.Discard,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return tc.Layer().GetObject(tc.Context(), p) == nil
|
objPayload, err := tc.Layer().GetObject(tc.Context(), p)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = io.ReadAll(objPayload)
|
||||||
|
require.NoError(tc.t, err)
|
||||||
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func listOIDsFromMockedFrostFS(t *testing.T, tc *handlerContext, bktName string) []oid.ID {
|
func listOIDsFromMockedFrostFS(t *testing.T, tc *handlerContext, bktName string) []oid.ID {
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
package handler
|
package handler
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||||
|
@ -84,18 +84,26 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
|
||||||
if len(info.ContentType) == 0 {
|
if len(info.ContentType) == 0 {
|
||||||
if info.ContentType = layer.MimeByFilePath(info.Name); len(info.ContentType) == 0 {
|
if info.ContentType = layer.MimeByFilePath(info.Name); len(info.ContentType) == 0 {
|
||||||
buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType))
|
|
||||||
getParams := &layer.GetObjectParams{
|
getParams := &layer.GetObjectParams{
|
||||||
ObjectInfo: info,
|
ObjectInfo: info,
|
||||||
Writer: buffer,
|
Versioned: p.Versioned(),
|
||||||
Range: getRangeToDetectContentType(info.Size),
|
Range: getRangeToDetectContentType(info.Size),
|
||||||
BucketInfo: bktInfo,
|
BucketInfo: bktInfo,
|
||||||
}
|
}
|
||||||
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
|
|
||||||
|
objPayload, err := h.obj.GetObject(r.Context(), getParams)
|
||||||
|
if err != nil {
|
||||||
h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", info.ID))
|
h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", info.ID))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
info.ContentType = http.DetectContentType(buffer.Bytes())
|
|
||||||
|
buffer, err := io.ReadAll(objPayload)
|
||||||
|
if err != nil {
|
||||||
|
h.logAndSendError(w, "could not partly read payload to detect content type", reqInfo, err, zap.Stringer("oid", info.ID))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
info.ContentType = http.DetectContentType(buffer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -320,11 +320,13 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
srcInfo, err := h.obj.GetObjectInfo(ctx, &layer.HeadObjectParams{
|
headPrm := &layer.HeadObjectParams{
|
||||||
BktInfo: srcBktInfo,
|
BktInfo: srcBktInfo,
|
||||||
Object: srcObject,
|
Object: srcObject,
|
||||||
VersionID: versionID,
|
VersionID: versionID,
|
||||||
})
|
}
|
||||||
|
|
||||||
|
srcInfo, err := h.obj.GetObjectInfo(ctx, headPrm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.IsS3Error(err, errors.ErrNoSuchKey) && versionID != "" {
|
if errors.IsS3Error(err, errors.ErrNoSuchKey) && versionID != "" {
|
||||||
h.logAndSendError(w, "could not head source object version", reqInfo,
|
h.logAndSendError(w, "could not head source object version", reqInfo,
|
||||||
|
@ -349,6 +351,7 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
||||||
}
|
}
|
||||||
|
|
||||||
p := &layer.UploadCopyParams{
|
p := &layer.UploadCopyParams{
|
||||||
|
Versioned: headPrm.Versioned(),
|
||||||
Info: &layer.UploadInfoParams{
|
Info: &layer.UploadInfoParams{
|
||||||
UploadID: uploadID,
|
UploadID: uploadID,
|
||||||
Bkt: bktInfo,
|
Bkt: bktInfo,
|
||||||
|
|
|
@ -74,7 +74,7 @@ type (
|
||||||
Range *RangeParams
|
Range *RangeParams
|
||||||
ObjectInfo *data.ObjectInfo
|
ObjectInfo *data.ObjectInfo
|
||||||
BucketInfo *data.BucketInfo
|
BucketInfo *data.BucketInfo
|
||||||
Writer io.Writer
|
Versioned bool
|
||||||
Encryption encryption.Params
|
Encryption encryption.Params
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -132,6 +132,7 @@ type (
|
||||||
|
|
||||||
// CopyObjectParams stores object copy request parameters.
|
// CopyObjectParams stores object copy request parameters.
|
||||||
CopyObjectParams struct {
|
CopyObjectParams struct {
|
||||||
|
SrcVersioned bool
|
||||||
SrcObject *data.ObjectInfo
|
SrcObject *data.ObjectInfo
|
||||||
ScrBktInfo *data.BucketInfo
|
ScrBktInfo *data.BucketInfo
|
||||||
DstBktInfo *data.BucketInfo
|
DstBktInfo *data.BucketInfo
|
||||||
|
@ -185,6 +186,13 @@ type (
|
||||||
Error error
|
Error error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ObjectPayload struct {
|
||||||
|
r io.Reader
|
||||||
|
params getParams
|
||||||
|
encrypted bool
|
||||||
|
decryptedLen uint64
|
||||||
|
}
|
||||||
|
|
||||||
// Client provides S3 API client interface.
|
// Client provides S3 API client interface.
|
||||||
Client interface {
|
Client interface {
|
||||||
Initialize(ctx context.Context, c EventListener) error
|
Initialize(ctx context.Context, c EventListener) error
|
||||||
|
@ -204,7 +212,7 @@ type (
|
||||||
CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error)
|
CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error)
|
||||||
DeleteBucket(ctx context.Context, p *DeleteBucketParams) error
|
DeleteBucket(ctx context.Context, p *DeleteBucketParams) error
|
||||||
|
|
||||||
GetObject(ctx context.Context, p *GetObjectParams) error
|
GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error)
|
||||||
GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error)
|
GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error)
|
||||||
GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error)
|
GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error)
|
||||||
|
|
||||||
|
@ -268,6 +276,10 @@ func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error
|
||||||
return f(ctx, msg)
|
return f(ctx, msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (p HeadObjectParams) Versioned() bool {
|
||||||
|
return len(p.VersionID) > 0
|
||||||
|
}
|
||||||
|
|
||||||
// NewLayer creates an instance of a layer. It checks credentials
|
// NewLayer creates an instance of a layer. It checks credentials
|
||||||
// and establishes gRPC connection with the node.
|
// and establishes gRPC connection with the node.
|
||||||
func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
|
func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
|
||||||
|
@ -395,7 +407,7 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObject from storage.
|
// GetObject from storage.
|
||||||
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
|
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) {
|
||||||
var params getParams
|
var params getParams
|
||||||
|
|
||||||
params.oid = p.ObjectInfo.ID
|
params.oid = p.ObjectInfo.ID
|
||||||
|
@ -406,7 +418,7 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
|
||||||
var err error
|
var err error
|
||||||
decReader, err = getDecrypter(p)
|
decReader, err = getDecrypter(p)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("creating decrypter: %w", err)
|
return nil, fmt.Errorf("creating decrypter: %w", err)
|
||||||
}
|
}
|
||||||
params.off = decReader.EncryptedOffset()
|
params.off = decReader.EncryptedOffset()
|
||||||
params.ln = decReader.EncryptedLength()
|
params.ln = decReader.EncryptedLength()
|
||||||
|
@ -420,32 +432,58 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
payload, err := n.initObjectPayloadReader(ctx, params)
|
r, err := n.initObjectPayloadReader(ctx, params)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("init object payload reader: %w", err)
|
if client.IsErrObjectNotFound(err) {
|
||||||
|
if p.Versioned {
|
||||||
|
err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchVersion), err.Error())
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchKey), err.Error())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("init object payload reader: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var decryptedLen uint64
|
||||||
|
if decReader != nil {
|
||||||
|
if err = decReader.SetReader(r); err != nil {
|
||||||
|
return nil, fmt.Errorf("set reader to decrypter: %w", err)
|
||||||
|
}
|
||||||
|
r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
|
||||||
|
decryptedLen = decReader.DecryptedLength()
|
||||||
|
}
|
||||||
|
|
||||||
|
return &ObjectPayload{
|
||||||
|
r: r,
|
||||||
|
params: params,
|
||||||
|
encrypted: decReader != nil,
|
||||||
|
decryptedLen: decryptedLen,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read implements io.Reader. If you want to use ObjectPayload as io.Reader
|
||||||
|
// you must not use ObjectPayload.StreamTo method and vice versa.
|
||||||
|
func (o *ObjectPayload) Read(p []byte) (int, error) {
|
||||||
|
return o.r.Read(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
// StreamTo reads all payload to provided writer.
|
||||||
|
// If you want to use this method you must not use ObjectPayload.Read and vice versa.
|
||||||
|
func (o *ObjectPayload) StreamTo(w io.Writer) error {
|
||||||
bufSize := uint64(32 * 1024) // configure?
|
bufSize := uint64(32 * 1024) // configure?
|
||||||
if params.ln != 0 && params.ln < bufSize {
|
if o.params.ln != 0 && o.params.ln < bufSize {
|
||||||
bufSize = params.ln
|
bufSize = o.params.ln
|
||||||
}
|
}
|
||||||
|
|
||||||
// alloc buffer for copying
|
// alloc buffer for copying
|
||||||
buf := make([]byte, bufSize) // sync-pool it?
|
buf := make([]byte, bufSize) // sync-pool it?
|
||||||
|
|
||||||
r := payload
|
|
||||||
if decReader != nil {
|
|
||||||
if err = decReader.SetReader(payload); err != nil {
|
|
||||||
return fmt.Errorf("set reader to decrypter: %w", err)
|
|
||||||
}
|
|
||||||
r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
|
|
||||||
}
|
|
||||||
|
|
||||||
// copy full payload
|
// copy full payload
|
||||||
written, err := io.CopyBuffer(p.Writer, r, buf)
|
written, err := io.CopyBuffer(w, o.r, buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if decReader != nil {
|
if o.encrypted {
|
||||||
return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, decReader.DecryptedLength(), params.ln, err)
|
return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, o.decryptedLen, o.params.ln, err)
|
||||||
}
|
}
|
||||||
return fmt.Errorf("copy object payload written: '%d': %w", written, err)
|
return fmt.Errorf("copy object payload written: '%d': %w", written, err)
|
||||||
}
|
}
|
||||||
|
@ -497,10 +535,10 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
|
||||||
var objInfo *data.ExtendedObjectInfo
|
var objInfo *data.ExtendedObjectInfo
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if len(p.VersionID) == 0 {
|
if p.Versioned() {
|
||||||
objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
|
|
||||||
} else {
|
|
||||||
objInfo, err = n.headVersion(ctx, p.BktInfo, p)
|
objInfo, err = n.headVersion(ctx, p.BktInfo, p)
|
||||||
|
} else {
|
||||||
|
objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -515,27 +553,22 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
|
||||||
|
|
||||||
// CopyObject from one bucket into another bucket.
|
// CopyObject from one bucket into another bucket.
|
||||||
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) {
|
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||||
pr, pw := io.Pipe()
|
objPayload, err := n.GetObject(ctx, &GetObjectParams{
|
||||||
|
ObjectInfo: p.SrcObject,
|
||||||
go func() {
|
Versioned: p.SrcVersioned,
|
||||||
err := n.GetObject(ctx, &GetObjectParams{
|
Range: p.Range,
|
||||||
ObjectInfo: p.SrcObject,
|
BucketInfo: p.ScrBktInfo,
|
||||||
Writer: pw,
|
Encryption: p.Encryption,
|
||||||
Range: p.Range,
|
})
|
||||||
BucketInfo: p.ScrBktInfo,
|
if err != nil {
|
||||||
Encryption: p.Encryption,
|
return nil, fmt.Errorf("get object to copy: %w", err)
|
||||||
})
|
}
|
||||||
|
|
||||||
if err = pw.CloseWithError(err); err != nil {
|
|
||||||
n.reqLogger(ctx).Error("could not get object", zap.Error(err))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
return n.PutObject(ctx, &PutObjectParams{
|
return n.PutObject(ctx, &PutObjectParams{
|
||||||
BktInfo: p.DstBktInfo,
|
BktInfo: p.DstBktInfo,
|
||||||
Object: p.DstObject,
|
Object: p.DstObject,
|
||||||
Size: p.SrcSize,
|
Size: p.SrcSize,
|
||||||
Reader: pr,
|
Reader: objPayload,
|
||||||
Header: p.Header,
|
Header: p.Header,
|
||||||
Encryption: p.Encryption,
|
Encryption: p.Encryption,
|
||||||
CopiesNumbers: p.CopiesNumbers,
|
CopiesNumbers: p.CopiesNumbers,
|
||||||
|
|
|
@ -64,6 +64,7 @@ type (
|
||||||
}
|
}
|
||||||
|
|
||||||
UploadCopyParams struct {
|
UploadCopyParams struct {
|
||||||
|
Versioned bool
|
||||||
Info *UploadInfoParams
|
Info *UploadInfoParams
|
||||||
SrcObjInfo *data.ObjectInfo
|
SrcObjInfo *data.ObjectInfo
|
||||||
SrcBktInfo *data.BucketInfo
|
SrcBktInfo *data.BucketInfo
|
||||||
|
@ -292,26 +293,21 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
|
||||||
return nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), size, uploadMaxSize)
|
return nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), size, uploadMaxSize)
|
||||||
}
|
}
|
||||||
|
|
||||||
pr, pw := io.Pipe()
|
objPayload, err := n.GetObject(ctx, &GetObjectParams{
|
||||||
|
ObjectInfo: p.SrcObjInfo,
|
||||||
go func() {
|
Versioned: p.Versioned,
|
||||||
err = n.GetObject(ctx, &GetObjectParams{
|
Range: p.Range,
|
||||||
ObjectInfo: p.SrcObjInfo,
|
BucketInfo: p.SrcBktInfo,
|
||||||
Writer: pw,
|
})
|
||||||
Range: p.Range,
|
if err != nil {
|
||||||
BucketInfo: p.SrcBktInfo,
|
return nil, fmt.Errorf("get object to upload copy: %w", err)
|
||||||
})
|
}
|
||||||
|
|
||||||
if err = pw.CloseWithError(err); err != nil {
|
|
||||||
n.reqLogger(ctx).Error("could not get object", zap.Error(err))
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
params := &UploadPartParams{
|
params := &UploadPartParams{
|
||||||
Info: p.Info,
|
Info: p.Info,
|
||||||
PartNumber: p.PartNumber,
|
PartNumber: p.PartNumber,
|
||||||
Size: size,
|
Size: size,
|
||||||
Reader: pr,
|
Reader: objPayload,
|
||||||
}
|
}
|
||||||
|
|
||||||
return n.uploadPart(ctx, multipartInfo, params)
|
return n.uploadPart(ctx, multipartInfo, params)
|
||||||
|
|
|
@ -3,6 +3,7 @@ package layer
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"io"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||||
|
@ -31,26 +32,29 @@ func (tc *testContext) putObject(content []byte) *data.ObjectInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *testContext) getObject(objectName, versionID string, needError bool) (*data.ObjectInfo, []byte) {
|
func (tc *testContext) getObject(objectName, versionID string, needError bool) (*data.ObjectInfo, []byte) {
|
||||||
objInfo, err := tc.layer.GetObjectInfo(tc.ctx, &HeadObjectParams{
|
headPrm := &HeadObjectParams{
|
||||||
BktInfo: tc.bktInfo,
|
BktInfo: tc.bktInfo,
|
||||||
Object: objectName,
|
Object: objectName,
|
||||||
VersionID: versionID,
|
VersionID: versionID,
|
||||||
})
|
}
|
||||||
|
objInfo, err := tc.layer.GetObjectInfo(tc.ctx, headPrm)
|
||||||
if needError {
|
if needError {
|
||||||
require.Error(tc.t, err)
|
require.Error(tc.t, err)
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
require.NoError(tc.t, err)
|
require.NoError(tc.t, err)
|
||||||
|
|
||||||
content := bytes.NewBuffer(nil)
|
objPayload, err := tc.layer.GetObject(tc.ctx, &GetObjectParams{
|
||||||
err = tc.layer.GetObject(tc.ctx, &GetObjectParams{
|
|
||||||
ObjectInfo: objInfo,
|
ObjectInfo: objInfo,
|
||||||
Writer: content,
|
Versioned: headPrm.Versioned(),
|
||||||
BucketInfo: tc.bktInfo,
|
BucketInfo: tc.bktInfo,
|
||||||
})
|
})
|
||||||
require.NoError(tc.t, err)
|
require.NoError(tc.t, err)
|
||||||
|
|
||||||
return objInfo, content.Bytes()
|
payload, err := io.ReadAll(objPayload)
|
||||||
|
require.NoError(tc.t, err)
|
||||||
|
|
||||||
|
return objInfo, payload
|
||||||
}
|
}
|
||||||
|
|
||||||
func (tc *testContext) deleteObject(objectName, versionID string, settings *data.BucketSettings) {
|
func (tc *testContext) deleteObject(objectName, versionID string, settings *data.BucketSettings) {
|
||||||
|
|
Loading…
Reference in a new issue