[#158] Separate init object reader from read itself #164
10 changed files with 182 additions and 84 deletions
|
@ -12,6 +12,7 @@ This document outlines major changes between releases.
|
|||
- Handle negative `Content-Length` on put (#125)
|
||||
- Use `DisableURIPathEscaping` to presign urls (#125)
|
||||
- Use specific s3 errors instead of `InternalError` where possible (#143)
|
||||
- Return appropriate 404 code when object missed in storage but there is in gate cache (#158)
|
||||
|
||||
### Added
|
||||
- Implement chunk uploading (#106)
|
||||
|
|
|
@ -170,6 +170,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
params := &layer.CopyObjectParams{
|
||||
SrcVersioned: srcObjPrm.Versioned(),
|
||||
SrcObject: srcObjInfo,
|
||||
ScrBktInfo: srcObjPrm.BktInfo,
|
||||
DstBktInfo: dstBktInfo,
|
||||
|
|
|
@ -202,6 +202,20 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
getPayloadParams := &layer.GetObjectParams{
|
||||
ObjectInfo: info,
|
||||
Versioned: p.Versioned(),
|
||||
Range: params,
|
||||
BucketInfo: bktInfo,
|
||||
Encryption: encryptionParams,
|
||||
}
|
||||
|
||||
objPayload, err := h.obj.GetObject(r.Context(), getPayloadParams)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get object payload", reqInfo, err)
|
||||
return
|
||||
}
|
||||
|
||||
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
|
||||
if params != nil {
|
||||
writeRangeHeaders(w, params, info.Size)
|
||||
|
@ -209,15 +223,9 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
getParams := &layer.GetObjectParams{
|
||||
ObjectInfo: info,
|
||||
Writer: w,
|
||||
Range: params,
|
||||
BucketInfo: bktInfo,
|
||||
Encryption: encryptionParams,
|
||||
}
|
||||
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
|
||||
h.logAndSendError(w, "could not get object", reqInfo, err)
|
||||
if err = objPayload.StreamTo(w); err != nil {
|
||||
h.logAndSendError(w, "could not stream object payload", reqInfo, err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -5,12 +5,18 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
|
@ -170,6 +176,24 @@ func TestGetRange(t *testing.T) {
|
|||
require.Equal(t, "bcdef", string(end))
|
||||
}
|
||||
|
||||
func TestGetObject(t *testing.T) {
|
||||
hc := prepareHandlerContext(t)
|
||||
bktName, objName := "bucket", "obj"
|
||||
bktInfo, objInfo := createVersionedBucketAndObject(hc.t, hc, bktName, objName)
|
||||
|
||||
putObject(hc.t, hc, bktName, objName)
|
||||
|
||||
checkFound(hc.t, hc, bktName, objName, objInfo.VersionID())
|
||||
checkFound(hc.t, hc, bktName, objName, emptyVersion)
|
||||
|
||||
addr := getAddressOfLastVersion(hc, bktInfo, objName)
|
||||
hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{})
|
||||
hc.tp.SetObjectError(objInfo.Address(), apistatus.ObjectNotFound{})
|
||||
|
||||
getObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), s3errors.ErrNoSuchVersion)
|
||||
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey)
|
||||
}
|
||||
|
||||
func putObjectContent(hc *handlerContext, bktName, objName, content string) {
|
||||
body := bytes.NewReader([]byte(content))
|
||||
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
|
||||
|
@ -186,3 +210,17 @@ func getObjectRange(t *testing.T, tc *handlerContext, bktName, objName string, s
|
|||
require.NoError(t, err)
|
||||
return content
|
||||
}
|
||||
|
||||
func getObjectAssertS3Error(hc *handlerContext, bktName, objName, version string, code apiErrors.ErrorCode) {
|
||||
w := getObjectBase(hc, bktName, objName, version)
|
||||
assertS3Error(hc.t, w, apiErrors.GetAPIError(code))
|
||||
}
|
||||
|
||||
func getObjectBase(hc *handlerContext, bktName, objName, version string) *httptest.ResponseRecorder {
|
||||
query := make(url.Values)
|
||||
query.Add(api.QueryVersionID, version)
|
||||
|
||||
w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
|
||||
hc.Handler().GetObjectHandler(w, r)
|
||||
return w
|
||||
}
|
||||
|
|
|
@ -272,10 +272,16 @@ func existInMockedFrostFS(tc *handlerContext, bktInfo *data.BucketInfo, objInfo
|
|||
p := &layer.GetObjectParams{
|
||||
BucketInfo: bktInfo,
|
||||
ObjectInfo: objInfo,
|
||||
Writer: io.Discard,
|
||||
}
|
||||
|
||||
return tc.Layer().GetObject(tc.Context(), p) == nil
|
||||
objPayload, err := tc.Layer().GetObject(tc.Context(), p)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
_, err = io.ReadAll(objPayload)
|
||||
require.NoError(tc.t, err)
|
||||
return true
|
||||
}
|
||||
|
||||
func listOIDsFromMockedFrostFS(t *testing.T, tc *handlerContext, bktName string) []oid.ID {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
package handler
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"net/http"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
|
||||
|
@ -84,18 +84,26 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
if len(info.ContentType) == 0 {
|
||||
if info.ContentType = layer.MimeByFilePath(info.Name); len(info.ContentType) == 0 {
|
||||
buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType))
|
||||
getParams := &layer.GetObjectParams{
|
||||
ObjectInfo: info,
|
||||
Writer: buffer,
|
||||
Versioned: p.Versioned(),
|
||||
Range: getRangeToDetectContentType(info.Size),
|
||||
BucketInfo: bktInfo,
|
||||
}
|
||||
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
|
||||
|
||||
objPayload, err := h.obj.GetObject(r.Context(), getParams)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", info.ID))
|
||||
return
|
||||
}
|
||||
info.ContentType = http.DetectContentType(buffer.Bytes())
|
||||
|
||||
buffer, err := io.ReadAll(objPayload)
|
||||
if err != nil {
|
||||
h.logAndSendError(w, "could not partly read payload to detect content type", reqInfo, err, zap.Stringer("oid", info.ID))
|
||||
return
|
||||
}
|
||||
|
||||
info.ContentType = http.DetectContentType(buffer)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -320,11 +320,13 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
srcInfo, err := h.obj.GetObjectInfo(ctx, &layer.HeadObjectParams{
|
||||
headPrm := &layer.HeadObjectParams{
|
||||
BktInfo: srcBktInfo,
|
||||
Object: srcObject,
|
||||
VersionID: versionID,
|
||||
})
|
||||
}
|
||||
|
||||
srcInfo, err := h.obj.GetObjectInfo(ctx, headPrm)
|
||||
if err != nil {
|
||||
if errors.IsS3Error(err, errors.ErrNoSuchKey) && versionID != "" {
|
||||
h.logAndSendError(w, "could not head source object version", reqInfo,
|
||||
|
@ -349,6 +351,7 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
|
||||
p := &layer.UploadCopyParams{
|
||||
Versioned: headPrm.Versioned(),
|
||||
Info: &layer.UploadInfoParams{
|
||||
UploadID: uploadID,
|
||||
Bkt: bktInfo,
|
||||
|
|
|
@ -74,7 +74,7 @@ type (
|
|||
Range *RangeParams
|
||||
ObjectInfo *data.ObjectInfo
|
||||
BucketInfo *data.BucketInfo
|
||||
Writer io.Writer
|
||||
Versioned bool
|
||||
Encryption encryption.Params
|
||||
}
|
||||
|
||||
|
@ -132,6 +132,7 @@ type (
|
|||
|
||||
// CopyObjectParams stores object copy request parameters.
|
||||
CopyObjectParams struct {
|
||||
SrcVersioned bool
|
||||
SrcObject *data.ObjectInfo
|
||||
ScrBktInfo *data.BucketInfo
|
||||
DstBktInfo *data.BucketInfo
|
||||
|
@ -185,6 +186,13 @@ type (
|
|||
Error error
|
||||
}
|
||||
|
||||
ObjectPayload struct {
|
||||
r io.Reader
|
||||
params getParams
|
||||
encrypted bool
|
||||
decryptedLen uint64
|
||||
}
|
||||
|
||||
// Client provides S3 API client interface.
|
||||
Client interface {
|
||||
Initialize(ctx context.Context, c EventListener) error
|
||||
|
@ -204,7 +212,7 @@ type (
|
|||
CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error)
|
||||
DeleteBucket(ctx context.Context, p *DeleteBucketParams) error
|
||||
|
||||
GetObject(ctx context.Context, p *GetObjectParams) error
|
||||
GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error)
|
||||
GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error)
|
||||
GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error)
|
||||
|
||||
|
@ -268,6 +276,10 @@ func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error
|
|||
return f(ctx, msg)
|
||||
}
|
||||
|
||||
func (p HeadObjectParams) Versioned() bool {
|
||||
return len(p.VersionID) > 0
|
||||
}
|
||||
|
||||
// NewLayer creates an instance of a layer. It checks credentials
|
||||
// and establishes gRPC connection with the node.
|
||||
func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
|
||||
|
@ -395,7 +407,7 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) {
|
|||
}
|
||||
|
||||
// GetObject from storage.
|
||||
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
|
||||
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) {
|
||||
var params getParams
|
||||
|
||||
params.oid = p.ObjectInfo.ID
|
||||
|
@ -406,7 +418,7 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
|
|||
var err error
|
||||
decReader, err = getDecrypter(p)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating decrypter: %w", err)
|
||||
return nil, fmt.Errorf("creating decrypter: %w", err)
|
||||
}
|
||||
params.off = decReader.EncryptedOffset()
|
||||
params.ln = decReader.EncryptedLength()
|
||||
|
@ -420,32 +432,58 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
|
|||
}
|
||||
}
|
||||
|
||||
payload, err := n.initObjectPayloadReader(ctx, params)
|
||||
r, err := n.initObjectPayloadReader(ctx, params)
|
||||
if err != nil {
|
||||
return fmt.Errorf("init object payload reader: %w", err)
|
||||
if client.IsErrObjectNotFound(err) {
|
||||
if p.Versioned {
|
||||
err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchVersion), err.Error())
|
||||
} else {
|
||||
err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchKey), err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("init object payload reader: %w", err)
|
||||
}
|
||||
|
||||
var decryptedLen uint64
|
||||
if decReader != nil {
|
||||
if err = decReader.SetReader(r); err != nil {
|
||||
return nil, fmt.Errorf("set reader to decrypter: %w", err)
|
||||
}
|
||||
r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
|
||||
decryptedLen = decReader.DecryptedLength()
|
||||
}
|
||||
|
||||
return &ObjectPayload{
|
||||
r: r,
|
||||
params: params,
|
||||
encrypted: decReader != nil,
|
||||
decryptedLen: decryptedLen,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Read implements io.Reader. If you want to use ObjectPayload as io.Reader
|
||||
// you must not use ObjectPayload.StreamTo method and vice versa.
|
||||
func (o *ObjectPayload) Read(p []byte) (int, error) {
|
||||
return o.r.Read(p)
|
||||
}
|
||||
|
||||
// StreamTo reads all payload to provided writer.
|
||||
// If you want to use this method you must not use ObjectPayload.Read and vice versa.
|
||||
func (o *ObjectPayload) StreamTo(w io.Writer) error {
|
||||
bufSize := uint64(32 * 1024) // configure?
|
||||
if params.ln != 0 && params.ln < bufSize {
|
||||
bufSize = params.ln
|
||||
if o.params.ln != 0 && o.params.ln < bufSize {
|
||||
bufSize = o.params.ln
|
||||
}
|
||||
|
||||
// alloc buffer for copying
|
||||
buf := make([]byte, bufSize) // sync-pool it?
|
||||
|
||||
r := payload
|
||||
if decReader != nil {
|
||||
if err = decReader.SetReader(payload); err != nil {
|
||||
return fmt.Errorf("set reader to decrypter: %w", err)
|
||||
}
|
||||
r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
|
||||
}
|
||||
|
||||
// copy full payload
|
||||
written, err := io.CopyBuffer(p.Writer, r, buf)
|
||||
written, err := io.CopyBuffer(w, o.r, buf)
|
||||
if err != nil {
|
||||
if decReader != nil {
|
||||
return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, decReader.DecryptedLength(), params.ln, err)
|
||||
if o.encrypted {
|
||||
return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, o.decryptedLen, o.params.ln, err)
|
||||
}
|
||||
return fmt.Errorf("copy object payload written: '%d': %w", written, err)
|
||||
}
|
||||
|
@ -497,10 +535,10 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
|
|||
var objInfo *data.ExtendedObjectInfo
|
||||
var err error
|
||||
|
||||
if len(p.VersionID) == 0 {
|
||||
objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
|
||||
} else {
|
||||
if p.Versioned() {
|
||||
objInfo, err = n.headVersion(ctx, p.BktInfo, p)
|
||||
} else {
|
||||
objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -515,27 +553,22 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
|
|||
|
||||
// CopyObject from one bucket into another bucket.
|
||||
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) {
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
err := n.GetObject(ctx, &GetObjectParams{
|
||||
objPayload, err := n.GetObject(ctx, &GetObjectParams{
|
||||
ObjectInfo: p.SrcObject,
|
||||
Writer: pw,
|
||||
Versioned: p.SrcVersioned,
|
||||
Range: p.Range,
|
||||
BucketInfo: p.ScrBktInfo,
|
||||
Encryption: p.Encryption,
|
||||
})
|
||||
|
||||
if err = pw.CloseWithError(err); err != nil {
|
||||
n.reqLogger(ctx).Error("could not get object", zap.Error(err))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get object to copy: %w", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return n.PutObject(ctx, &PutObjectParams{
|
||||
BktInfo: p.DstBktInfo,
|
||||
Object: p.DstObject,
|
||||
Size: p.SrcSize,
|
||||
Reader: pr,
|
||||
Reader: objPayload,
|
||||
Header: p.Header,
|
||||
Encryption: p.Encryption,
|
||||
CopiesNumbers: p.CopiesNumbers,
|
||||
|
|
|
@ -64,6 +64,7 @@ type (
|
|||
}
|
||||
|
||||
UploadCopyParams struct {
|
||||
Versioned bool
|
||||
Info *UploadInfoParams
|
||||
SrcObjInfo *data.ObjectInfo
|
||||
SrcBktInfo *data.BucketInfo
|
||||
|
@ -292,26 +293,21 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
|
|||
return nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), size, uploadMaxSize)
|
||||
}
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
err = n.GetObject(ctx, &GetObjectParams{
|
||||
objPayload, err := n.GetObject(ctx, &GetObjectParams{
|
||||
ObjectInfo: p.SrcObjInfo,
|
||||
Writer: pw,
|
||||
Versioned: p.Versioned,
|
||||
Range: p.Range,
|
||||
BucketInfo: p.SrcBktInfo,
|
||||
})
|
||||
|
||||
if err = pw.CloseWithError(err); err != nil {
|
||||
n.reqLogger(ctx).Error("could not get object", zap.Error(err))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get object to upload copy: %w", err)
|
||||
}
|
||||
}()
|
||||
|
||||
params := &UploadPartParams{
|
||||
Info: p.Info,
|
||||
PartNumber: p.PartNumber,
|
||||
Size: size,
|
||||
Reader: pr,
|
||||
Reader: objPayload,
|
||||
}
|
||||
|
||||
return n.uploadPart(ctx, multipartInfo, params)
|
||||
|
|
|
@ -3,6 +3,7 @@ package layer
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
|
||||
|
@ -31,26 +32,29 @@ func (tc *testContext) putObject(content []byte) *data.ObjectInfo {
|
|||
}
|
||||
|
||||
func (tc *testContext) getObject(objectName, versionID string, needError bool) (*data.ObjectInfo, []byte) {
|
||||
objInfo, err := tc.layer.GetObjectInfo(tc.ctx, &HeadObjectParams{
|
||||
headPrm := &HeadObjectParams{
|
||||
BktInfo: tc.bktInfo,
|
||||
Object: objectName,
|
||||
VersionID: versionID,
|
||||
})
|
||||
}
|
||||
objInfo, err := tc.layer.GetObjectInfo(tc.ctx, headPrm)
|
||||
if needError {
|
||||
require.Error(tc.t, err)
|
||||
return nil, nil
|
||||
}
|
||||
require.NoError(tc.t, err)
|
||||
|
||||
content := bytes.NewBuffer(nil)
|
||||
err = tc.layer.GetObject(tc.ctx, &GetObjectParams{
|
||||
objPayload, err := tc.layer.GetObject(tc.ctx, &GetObjectParams{
|
||||
ObjectInfo: objInfo,
|
||||
Writer: content,
|
||||
Versioned: headPrm.Versioned(),
|
||||
BucketInfo: tc.bktInfo,
|
||||
})
|
||||
require.NoError(tc.t, err)
|
||||
|
||||
return objInfo, content.Bytes()
|
||||
payload, err := io.ReadAll(objPayload)
|
||||
require.NoError(tc.t, err)
|
||||
|
||||
return objInfo, payload
|
||||
}
|
||||
|
||||
func (tc *testContext) deleteObject(objectName, versionID string, settings *data.BucketSettings) {
|
||||
|
|
Loading…
Reference in a new issue