[#158] Separate init object reader from read itself #164

10 changed files with 182 additions and 84 deletions

View file

@ -12,6 +12,7 @@ This document outlines major changes between releases.
- Handle negative `Content-Length` on put (#125)
- Use `DisableURIPathEscaping` to presign urls (#125)
- Use specific s3 errors instead of `InternalError` where possible (#143)
- Return appropriate 404 code when object missed in storage but there is in gate cache (#158)
### Added
- Implement chunk uploading (#106)

View file

@ -170,13 +170,14 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
}
params := &layer.CopyObjectParams{
SrcObject: srcObjInfo,
ScrBktInfo: srcObjPrm.BktInfo,
DstBktInfo: dstBktInfo,
DstObject: reqInfo.ObjectName,
SrcSize: srcObjInfo.Size,
Header: metadata,
Encryption: encryptionParams,
SrcVersioned: srcObjPrm.Versioned(),
SrcObject: srcObjInfo,
ScrBktInfo: srcObjPrm.BktInfo,
DstBktInfo: dstBktInfo,
DstObject: reqInfo.ObjectName,
SrcSize: srcObjInfo.Size,
Header: metadata,
Encryption: encryptionParams,
}
params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, dstBktInfo.LocationConstraint)

View file

@ -202,6 +202,20 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
getPayloadParams := &layer.GetObjectParams{
ObjectInfo: info,
Versioned: p.Versioned(),
Range: params,
BucketInfo: bktInfo,
Encryption: encryptionParams,
}
objPayload, err := h.obj.GetObject(r.Context(), getPayloadParams)
if err != nil {
h.logAndSendError(w, "could not get object payload", reqInfo, err)
return
}
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
if params != nil {
writeRangeHeaders(w, params, info.Size)
@ -209,15 +223,9 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
getParams := &layer.GetObjectParams{
ObjectInfo: info,
Writer: w,
Range: params,
BucketInfo: bktInfo,
Encryption: encryptionParams,
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err)
if err = objPayload.StreamTo(w); err != nil {
h.logAndSendError(w, "could not stream object payload", reqInfo, err)
return
}
}

View file

@ -5,12 +5,18 @@ import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"github.com/stretchr/testify/require"
)
@ -170,6 +176,24 @@ func TestGetRange(t *testing.T) {
require.Equal(t, "bcdef", string(end))
}
func TestGetObject(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "bucket", "obj"
bktInfo, objInfo := createVersionedBucketAndObject(hc.t, hc, bktName, objName)
putObject(hc.t, hc, bktName, objName)
checkFound(hc.t, hc, bktName, objName, objInfo.VersionID())
checkFound(hc.t, hc, bktName, objName, emptyVersion)
addr := getAddressOfLastVersion(hc, bktInfo, objName)
hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{})
hc.tp.SetObjectError(objInfo.Address(), apistatus.ObjectNotFound{})
getObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), s3errors.ErrNoSuchVersion)
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey)
}
func putObjectContent(hc *handlerContext, bktName, objName, content string) {
body := bytes.NewReader([]byte(content))
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
@ -186,3 +210,17 @@ func getObjectRange(t *testing.T, tc *handlerContext, bktName, objName string, s
require.NoError(t, err)
return content
}
func getObjectAssertS3Error(hc *handlerContext, bktName, objName, version string, code apiErrors.ErrorCode) {
w := getObjectBase(hc, bktName, objName, version)
assertS3Error(hc.t, w, apiErrors.GetAPIError(code))
}
func getObjectBase(hc *handlerContext, bktName, objName, version string) *httptest.ResponseRecorder {
query := make(url.Values)
query.Add(api.QueryVersionID, version)
w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
hc.Handler().GetObjectHandler(w, r)
return w
}

View file

@ -272,10 +272,16 @@ func existInMockedFrostFS(tc *handlerContext, bktInfo *data.BucketInfo, objInfo
p := &layer.GetObjectParams{
BucketInfo: bktInfo,
ObjectInfo: objInfo,
Writer: io.Discard,
}
return tc.Layer().GetObject(tc.Context(), p) == nil
objPayload, err := tc.Layer().GetObject(tc.Context(), p)
if err != nil {
return false
}
_, err = io.ReadAll(objPayload)
require.NoError(tc.t, err)
return true
}
func listOIDsFromMockedFrostFS(t *testing.T, tc *handlerContext, bktName string) []oid.ID {

View file

@ -1,7 +1,7 @@
package handler
import (
"bytes"
"io"
"net/http"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
@ -84,18 +84,26 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
if len(info.ContentType) == 0 {
if info.ContentType = layer.MimeByFilePath(info.Name); len(info.ContentType) == 0 {
buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType))
getParams := &layer.GetObjectParams{
ObjectInfo: info,
Writer: buffer,
Versioned: p.Versioned(),
Range: getRangeToDetectContentType(info.Size),
BucketInfo: bktInfo,
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
objPayload, err := h.obj.GetObject(r.Context(), getParams)
if err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", info.ID))
return
}
info.ContentType = http.DetectContentType(buffer.Bytes())
buffer, err := io.ReadAll(objPayload)
if err != nil {
h.logAndSendError(w, "could not partly read payload to detect content type", reqInfo, err, zap.Stringer("oid", info.ID))
return
}
info.ContentType = http.DetectContentType(buffer)
}
}

View file

@ -320,11 +320,13 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
return
}
srcInfo, err := h.obj.GetObjectInfo(ctx, &layer.HeadObjectParams{
headPrm := &layer.HeadObjectParams{
BktInfo: srcBktInfo,
Object: srcObject,
VersionID: versionID,
})
}
srcInfo, err := h.obj.GetObjectInfo(ctx, headPrm)
if err != nil {
if errors.IsS3Error(err, errors.ErrNoSuchKey) && versionID != "" {
h.logAndSendError(w, "could not head source object version", reqInfo,
@ -349,6 +351,7 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
}
p := &layer.UploadCopyParams{
Versioned: headPrm.Versioned(),
Info: &layer.UploadInfoParams{
UploadID: uploadID,
Bkt: bktInfo,

View file

@ -74,7 +74,7 @@ type (
Range *RangeParams
ObjectInfo *data.ObjectInfo
BucketInfo *data.BucketInfo
Writer io.Writer
Versioned bool
Encryption encryption.Params
}
@ -132,6 +132,7 @@ type (
// CopyObjectParams stores object copy request parameters.
CopyObjectParams struct {
SrcVersioned bool
SrcObject *data.ObjectInfo
ScrBktInfo *data.BucketInfo
DstBktInfo *data.BucketInfo
@ -185,6 +186,13 @@ type (
Error error
}
ObjectPayload struct {
r io.Reader
params getParams
encrypted bool
decryptedLen uint64
}
// Client provides S3 API client interface.
Client interface {
Initialize(ctx context.Context, c EventListener) error
@ -204,7 +212,7 @@ type (
CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error)
DeleteBucket(ctx context.Context, p *DeleteBucketParams) error
GetObject(ctx context.Context, p *GetObjectParams) error
GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error)
GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error)
GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error)
@ -268,6 +276,10 @@ func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error
return f(ctx, msg)
}
func (p HeadObjectParams) Versioned() bool {
return len(p.VersionID) > 0
}
// NewLayer creates an instance of a layer. It checks credentials
// and establishes gRPC connection with the node.
func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
@ -395,7 +407,7 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) {
}
// GetObject from storage.
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) {
var params getParams
params.oid = p.ObjectInfo.ID
@ -406,7 +418,7 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
var err error
decReader, err = getDecrypter(p)
if err != nil {
return fmt.Errorf("creating decrypter: %w", err)
return nil, fmt.Errorf("creating decrypter: %w", err)
}
params.off = decReader.EncryptedOffset()
params.ln = decReader.EncryptedLength()
@ -420,32 +432,58 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
}
}
payload, err := n.initObjectPayloadReader(ctx, params)
r, err := n.initObjectPayloadReader(ctx, params)
if err != nil {
return fmt.Errorf("init object payload reader: %w", err)
if client.IsErrObjectNotFound(err) {
if p.Versioned {
err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchVersion), err.Error())
} else {
err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchKey), err.Error())
}
}
return nil, fmt.Errorf("init object payload reader: %w", err)
}
var decryptedLen uint64
if decReader != nil {
if err = decReader.SetReader(r); err != nil {
return nil, fmt.Errorf("set reader to decrypter: %w", err)
}
r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
decryptedLen = decReader.DecryptedLength()
}
return &ObjectPayload{
r: r,
params: params,
encrypted: decReader != nil,
decryptedLen: decryptedLen,
}, nil
}
// Read implements io.Reader. If you want to use ObjectPayload as io.Reader
// you must not use ObjectPayload.StreamTo method and vice versa.
func (o *ObjectPayload) Read(p []byte) (int, error) {
return o.r.Read(p)
}
// StreamTo reads all payload to provided writer.
// If you want to use this method you must not use ObjectPayload.Read and vice versa.
func (o *ObjectPayload) StreamTo(w io.Writer) error {
bufSize := uint64(32 * 1024) // configure?
if params.ln != 0 && params.ln < bufSize {
bufSize = params.ln
if o.params.ln != 0 && o.params.ln < bufSize {
bufSize = o.params.ln
}
// alloc buffer for copying
buf := make([]byte, bufSize) // sync-pool it?
r := payload
if decReader != nil {
if err = decReader.SetReader(payload); err != nil {
return fmt.Errorf("set reader to decrypter: %w", err)
}
r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
}
// copy full payload
written, err := io.CopyBuffer(p.Writer, r, buf)
written, err := io.CopyBuffer(w, o.r, buf)
if err != nil {
if decReader != nil {
return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, decReader.DecryptedLength(), params.ln, err)
if o.encrypted {
return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, o.decryptedLen, o.params.ln, err)
}
return fmt.Errorf("copy object payload written: '%d': %w", written, err)
}
@ -497,10 +535,10 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
var objInfo *data.ExtendedObjectInfo
var err error
if len(p.VersionID) == 0 {
objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
} else {
if p.Versioned() {
objInfo, err = n.headVersion(ctx, p.BktInfo, p)
} else {
objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
}
if err != nil {
return nil, err
@ -515,27 +553,22 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
// CopyObject from one bucket into another bucket.
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) {
pr, pw := io.Pipe()
go func() {
err := n.GetObject(ctx, &GetObjectParams{
ObjectInfo: p.SrcObject,
Writer: pw,
Range: p.Range,
BucketInfo: p.ScrBktInfo,
Encryption: p.Encryption,
})
if err = pw.CloseWithError(err); err != nil {
n.reqLogger(ctx).Error("could not get object", zap.Error(err))
}
}()
objPayload, err := n.GetObject(ctx, &GetObjectParams{
ObjectInfo: p.SrcObject,
Versioned: p.SrcVersioned,
Range: p.Range,
BucketInfo: p.ScrBktInfo,
Encryption: p.Encryption,
})
if err != nil {
return nil, fmt.Errorf("get object to copy: %w", err)
}
return n.PutObject(ctx, &PutObjectParams{
BktInfo: p.DstBktInfo,
Object: p.DstObject,
Size: p.SrcSize,
Reader: pr,
Reader: objPayload,
Header: p.Header,
Encryption: p.Encryption,
CopiesNumbers: p.CopiesNumbers,

View file

@ -64,6 +64,7 @@ type (
}
UploadCopyParams struct {
Versioned bool
Info *UploadInfoParams
SrcObjInfo *data.ObjectInfo
SrcBktInfo *data.BucketInfo
@ -292,26 +293,21 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
return nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), size, uploadMaxSize)
}
pr, pw := io.Pipe()
go func() {
err = n.GetObject(ctx, &GetObjectParams{
ObjectInfo: p.SrcObjInfo,
Writer: pw,
Range: p.Range,
BucketInfo: p.SrcBktInfo,
})
if err = pw.CloseWithError(err); err != nil {
n.reqLogger(ctx).Error("could not get object", zap.Error(err))
}
}()
objPayload, err := n.GetObject(ctx, &GetObjectParams{
ObjectInfo: p.SrcObjInfo,
Versioned: p.Versioned,
Range: p.Range,
BucketInfo: p.SrcBktInfo,
})
if err != nil {
return nil, fmt.Errorf("get object to upload copy: %w", err)
}
params := &UploadPartParams{
Info: p.Info,
PartNumber: p.PartNumber,
Size: size,
Reader: pr,
Reader: objPayload,
}
return n.uploadPart(ctx, multipartInfo, params)

View file

@ -3,6 +3,7 @@ package layer
import (
"bytes"
"context"
"io"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
@ -31,26 +32,29 @@ func (tc *testContext) putObject(content []byte) *data.ObjectInfo {
}
func (tc *testContext) getObject(objectName, versionID string, needError bool) (*data.ObjectInfo, []byte) {
objInfo, err := tc.layer.GetObjectInfo(tc.ctx, &HeadObjectParams{
headPrm := &HeadObjectParams{
BktInfo: tc.bktInfo,
Object: objectName,
VersionID: versionID,
})
}
objInfo, err := tc.layer.GetObjectInfo(tc.ctx, headPrm)
if needError {
require.Error(tc.t, err)
return nil, nil
}
require.NoError(tc.t, err)
content := bytes.NewBuffer(nil)
err = tc.layer.GetObject(tc.ctx, &GetObjectParams{
objPayload, err := tc.layer.GetObject(tc.ctx, &GetObjectParams{
ObjectInfo: objInfo,
Writer: content,
Versioned: headPrm.Versioned(),
BucketInfo: tc.bktInfo,
})
require.NoError(tc.t, err)
return objInfo, content.Bytes()
payload, err := io.ReadAll(objPayload)
require.NoError(tc.t, err)
return objInfo, payload
}
func (tc *testContext) deleteObject(objectName, versionID string, settings *data.BucketSettings) {