[#158] Separate init object reader from read itself #164

10 changed files with 182 additions and 84 deletions

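In short: layer.GetObject no longer copies the payload into a caller-supplied Writer; it returns an *ObjectPayload whose reader is initialized up front, so a storage "object not found" can be mapped to a proper S3 error before anything is written to the response. A minimal sketch of the resulting call pattern, based only on the signatures visible in this diff (the helper name getAndStream is illustrative and not part of the PR):

package handler

import (
    "context"
    "net/http"

    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
)

// getAndStream is an illustrative helper, not part of this PR. It shows the
// intended call pattern after the change: initialize the payload reader first,
// decide on status and headers, and only then stream the body.
func getAndStream(ctx context.Context, client layer.Client, w http.ResponseWriter,
    info *data.ObjectInfo, bktInfo *data.BucketInfo, versioned bool) error {
    objPayload, err := client.GetObject(ctx, &layer.GetObjectParams{
        ObjectInfo: info,
        Versioned:  versioned, // selects ErrNoSuchVersion vs ErrNoSuchKey on a storage 404
        BucketInfo: bktInfo,
    })
    if err != nil {
        return err // nothing has been written to w yet, so a real error response is still possible
    }

    w.WriteHeader(http.StatusOK)
    return objPayload.StreamTo(w) // alternatively consume objPayload as an io.Reader, but never both
}

The point of the split is ordering: with the old API, a storage error surfaced only while GetObject was already streaming into w, after the 200 status and headers had been sent.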
View file

@@ -12,6 +12,7 @@ This document outlines major changes between releases.
 - Handle negative `Content-Length` on put (#125)
 - Use `DisableURIPathEscaping` to presign urls (#125)
 - Use specific s3 errors instead of `InternalError` where possible (#143)
+- Return appropriate 404 code when the object is missing in storage but still present in the gate cache (#158)

 ### Added
 - Implement chunk uploading (#106)

View file

@@ -170,6 +170,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
     }

     params := &layer.CopyObjectParams{
+        SrcVersioned: srcObjPrm.Versioned(),
         SrcObject:    srcObjInfo,
         ScrBktInfo:   srcObjPrm.BktInfo,
         DstBktInfo:   dstBktInfo,

View file

@@ -202,6 +202,20 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
         return
     }

+    getPayloadParams := &layer.GetObjectParams{
+        ObjectInfo: info,
+        Versioned:  p.Versioned(),
+        Range:      params,
+        BucketInfo: bktInfo,
+        Encryption: encryptionParams,
+    }
+
+    objPayload, err := h.obj.GetObject(r.Context(), getPayloadParams)
+    if err != nil {
+        h.logAndSendError(w, "could not get object payload", reqInfo, err)
+        return
+    }
+
     writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
     if params != nil {
         writeRangeHeaders(w, params, info.Size)
@@ -209,15 +223,9 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
         w.WriteHeader(http.StatusOK)
     }

-    getParams := &layer.GetObjectParams{
-        ObjectInfo: info,
-        Writer:     w,
-        Range:      params,
-        BucketInfo: bktInfo,
-        Encryption: encryptionParams,
-    }
-
-    if err = h.obj.GetObject(r.Context(), getParams); err != nil {
-        h.logAndSendError(w, "could not get object", reqInfo, err)
+    if err = objPayload.StreamTo(w); err != nil {
+        h.logAndSendError(w, "could not stream object payload", reqInfo, err)
+        return
     }
 }

View file

@@ -5,12 +5,18 @@ import (
     "fmt"
     "io"
     "net/http"
+    "net/http/httptest"
+    "net/url"
     "testing"
     "time"

+    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
+    apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
+    s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
+    apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
     "github.com/stretchr/testify/require"
 )
@@ -170,6 +176,24 @@ func TestGetRange(t *testing.T) {
     require.Equal(t, "bcdef", string(end))
 }

+func TestGetObject(t *testing.T) {
+    hc := prepareHandlerContext(t)
+    bktName, objName := "bucket", "obj"
+    bktInfo, objInfo := createVersionedBucketAndObject(hc.t, hc, bktName, objName)
+
+    putObject(hc.t, hc, bktName, objName)
+
+    checkFound(hc.t, hc, bktName, objName, objInfo.VersionID())
+    checkFound(hc.t, hc, bktName, objName, emptyVersion)
+
+    addr := getAddressOfLastVersion(hc, bktInfo, objName)
+    hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{})
+    hc.tp.SetObjectError(objInfo.Address(), apistatus.ObjectNotFound{})
+
+    getObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), s3errors.ErrNoSuchVersion)
+    getObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey)
+}
+
 func putObjectContent(hc *handlerContext, bktName, objName, content string) {
     body := bytes.NewReader([]byte(content))
     w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
@@ -186,3 +210,17 @@ func getObjectRange(t *testing.T, tc *handlerContext, bktName, objName string, s
     require.NoError(t, err)
     return content
 }
+
+func getObjectAssertS3Error(hc *handlerContext, bktName, objName, version string, code apiErrors.ErrorCode) {
+    w := getObjectBase(hc, bktName, objName, version)
+    assertS3Error(hc.t, w, apiErrors.GetAPIError(code))
+}
+
+func getObjectBase(hc *handlerContext, bktName, objName, version string) *httptest.ResponseRecorder {
+    query := make(url.Values)
+    query.Add(api.QueryVersionID, version)
+
+    w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
+    hc.Handler().GetObjectHandler(w, r)
+    return w
+}

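A possible follow-up for the test above, reusing the new getObjectAssertS3Error helper in table-driven form; this is a hypothetical sketch, not part of the PR, and assumes the mocked transport behaves exactly as it does in TestGetObject:

package handler

import (
    "testing"

    apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
    apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
)

// TestGetObjectNotFoundMapping is a hypothetical, table-driven restatement of
// TestGetObject above, reusing the new helpers; it is not part of the PR.
func TestGetObjectNotFoundMapping(t *testing.T) {
    hc := prepareHandlerContext(t)
    bktName, objName := "bucket-404", "obj"
    bktInfo, objInfo := createVersionedBucketAndObject(hc.t, hc, bktName, objName)

    // Make the mocked storage report "not found" for every stored version,
    // while the gate tree/cache still knows about the object.
    hc.tp.SetObjectError(getAddressOfLastVersion(hc, bktInfo, objName), apistatus.ObjectNotFound{})
    hc.tp.SetObjectError(objInfo.Address(), apistatus.ObjectNotFound{})

    for _, tt := range []struct {
        version string
        code    apiErrors.ErrorCode
    }{
        {version: objInfo.VersionID(), code: apiErrors.ErrNoSuchVersion}, // versioned GET
        {version: emptyVersion, code: apiErrors.ErrNoSuchKey},            // unversioned GET
    } {
        getObjectAssertS3Error(hc, bktName, objName, tt.version, tt.code)
    }
}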
View file

@@ -272,10 +272,16 @@ func existInMockedFrostFS(tc *handlerContext, bktInfo *data.BucketInfo, objInfo
     p := &layer.GetObjectParams{
         BucketInfo: bktInfo,
         ObjectInfo: objInfo,
-        Writer:     io.Discard,
     }

-    return tc.Layer().GetObject(tc.Context(), p) == nil
+    objPayload, err := tc.Layer().GetObject(tc.Context(), p)
+    if err != nil {
+        return false
+    }
+
+    _, err = io.ReadAll(objPayload)
+    require.NoError(tc.t, err)
+    return true
 }

 func listOIDsFromMockedFrostFS(t *testing.T, tc *handlerContext, bktName string) []oid.ID {

View file

@@ -1,7 +1,7 @@
 package handler

 import (
-    "bytes"
+    "io"
     "net/http"

     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
@@ -84,18 +84,26 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
     if len(info.ContentType) == 0 {
         if info.ContentType = layer.MimeByFilePath(info.Name); len(info.ContentType) == 0 {
-            buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType))
             getParams := &layer.GetObjectParams{
                 ObjectInfo: info,
-                Writer:     buffer,
+                Versioned:  p.Versioned(),
                 Range:      getRangeToDetectContentType(info.Size),
                 BucketInfo: bktInfo,
             }
-            if err = h.obj.GetObject(r.Context(), getParams); err != nil {
+
+            objPayload, err := h.obj.GetObject(r.Context(), getParams)
+            if err != nil {
                 h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", info.ID))
                 return
             }
-            info.ContentType = http.DetectContentType(buffer.Bytes())
+
+            buffer, err := io.ReadAll(objPayload)
+            if err != nil {
+                h.logAndSendError(w, "could not partly read payload to detect content type", reqInfo, err, zap.Stringer("oid", info.ID))
+                return
+            }
+
+            info.ContentType = http.DetectContentType(buffer)
         }
     }

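For the HeadObject change above: the handler now gets a ranged payload reader, drains it with io.ReadAll, and sniffs the type with http.DetectContentType, which never inspects more than the first 512 bytes. A self-contained sketch of that sniffing step (sniffContentType is illustrative; the real handler relies on getRangeToDetectContentType and sizeToDetectType from this repo):

package handler

import (
    "io"
    "net/http"
)

// sniffContentType is an illustrative, stand-alone version of the sniffing step:
// read at most limit bytes of the payload and let net/http classify them.
// http.DetectContentType never looks past the first 512 bytes, so the limit only
// needs to cover that much (sizeToDetectType plays that role in the real handler).
func sniffContentType(payload io.Reader, limit int64) (string, error) {
    buf, err := io.ReadAll(io.LimitReader(payload, limit))
    if err != nil {
        return "", err
    }
    // DetectContentType always returns a valid MIME type,
    // falling back to "application/octet-stream".
    return http.DetectContentType(buf), nil
}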
View file

@@ -320,11 +320,13 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
         return
     }

-    srcInfo, err := h.obj.GetObjectInfo(ctx, &layer.HeadObjectParams{
+    headPrm := &layer.HeadObjectParams{
         BktInfo:   srcBktInfo,
         Object:    srcObject,
         VersionID: versionID,
-    })
+    }
+
+    srcInfo, err := h.obj.GetObjectInfo(ctx, headPrm)
     if err != nil {
         if errors.IsS3Error(err, errors.ErrNoSuchKey) && versionID != "" {
             h.logAndSendError(w, "could not head source object version", reqInfo,
@@ -349,6 +351,7 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
     }

     p := &layer.UploadCopyParams{
+        Versioned: headPrm.Versioned(),
         Info: &layer.UploadInfoParams{
             UploadID: uploadID,
             Bkt:      bktInfo,

View file

@@ -74,7 +74,7 @@ type (
         Range      *RangeParams
         ObjectInfo *data.ObjectInfo
         BucketInfo *data.BucketInfo
-        Writer     io.Writer
+        Versioned  bool
         Encryption encryption.Params
     }
@@ -132,6 +132,7 @@ type (
     // CopyObjectParams stores object copy request parameters.
     CopyObjectParams struct {
+        SrcVersioned bool
         SrcObject    *data.ObjectInfo
         ScrBktInfo   *data.BucketInfo
         DstBktInfo   *data.BucketInfo
@@ -185,6 +186,13 @@ type (
         Error error
     }

+    ObjectPayload struct {
+        r            io.Reader
+        params       getParams
+        encrypted    bool
+        decryptedLen uint64
+    }
+
     // Client provides S3 API client interface.
     Client interface {
         Initialize(ctx context.Context, c EventListener) error
@@ -204,7 +212,7 @@ type (
         CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error)
         DeleteBucket(ctx context.Context, p *DeleteBucketParams) error

-        GetObject(ctx context.Context, p *GetObjectParams) error
+        GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error)
         GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error)
         GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error)
@@ -268,6 +276,10 @@ func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error
     return f(ctx, msg)
 }

+func (p HeadObjectParams) Versioned() bool {
+    return len(p.VersionID) > 0
+}
+
 // NewLayer creates an instance of a layer. It checks credentials
 // and establishes gRPC connection with the node.
 func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
@@ -395,7 +407,7 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) {
 }

 // GetObject from storage.
-func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
+func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) {
     var params getParams

     params.oid = p.ObjectInfo.ID
@@ -406,7 +418,7 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
         var err error
         decReader, err = getDecrypter(p)
         if err != nil {
-            return fmt.Errorf("creating decrypter: %w", err)
+            return nil, fmt.Errorf("creating decrypter: %w", err)
         }
         params.off = decReader.EncryptedOffset()
         params.ln = decReader.EncryptedLength()
@@ -420,32 +432,58 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
         }
     }

-    payload, err := n.initObjectPayloadReader(ctx, params)
+    r, err := n.initObjectPayloadReader(ctx, params)
     if err != nil {
-        return fmt.Errorf("init object payload reader: %w", err)
+        if client.IsErrObjectNotFound(err) {
+            if p.Versioned {
+                err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchVersion), err.Error())
+            } else {
+                err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchKey), err.Error())
+            }
+        }
+
+        return nil, fmt.Errorf("init object payload reader: %w", err)
     }

+    var decryptedLen uint64
+    if decReader != nil {
+        if err = decReader.SetReader(r); err != nil {
+            return nil, fmt.Errorf("set reader to decrypter: %w", err)
+        }
+        r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
+        decryptedLen = decReader.DecryptedLength()
+    }
+
+    return &ObjectPayload{
+        r:            r,
+        params:       params,
+        encrypted:    decReader != nil,
+        decryptedLen: decryptedLen,
+    }, nil
+}
+
+// Read implements io.Reader. If you want to use ObjectPayload as io.Reader
+// you must not use ObjectPayload.StreamTo method and vice versa.
+func (o *ObjectPayload) Read(p []byte) (int, error) {
+    return o.r.Read(p)
+}
+
+// StreamTo reads all payload to provided writer.
+// If you want to use this method you must not use ObjectPayload.Read and vice versa.
+func (o *ObjectPayload) StreamTo(w io.Writer) error {
     bufSize := uint64(32 * 1024) // configure?
-    if params.ln != 0 && params.ln < bufSize {
-        bufSize = params.ln
+    if o.params.ln != 0 && o.params.ln < bufSize {
+        bufSize = o.params.ln
     }

     // alloc buffer for copying
     buf := make([]byte, bufSize) // sync-pool it?

-    r := payload
-    if decReader != nil {
-        if err = decReader.SetReader(payload); err != nil {
-            return fmt.Errorf("set reader to decrypter: %w", err)
-        }
-        r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
-    }
-
     // copy full payload
-    written, err := io.CopyBuffer(p.Writer, r, buf)
+    written, err := io.CopyBuffer(w, o.r, buf)
     if err != nil {
-        if decReader != nil {
-            return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, decReader.DecryptedLength(), params.ln, err)
+        if o.encrypted {
+            return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, o.decryptedLen, o.params.ln, err)
         }
         return fmt.Errorf("copy object payload written: '%d': %w", written, err)
     }
@@ -497,10 +535,10 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
     var objInfo *data.ExtendedObjectInfo
     var err error

-    if len(p.VersionID) == 0 {
-        objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
-    } else {
+    if p.Versioned() {
         objInfo, err = n.headVersion(ctx, p.BktInfo, p)
+    } else {
+        objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
     }
     if err != nil {
         return nil, err
@@ -515,27 +553,22 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)

 // CopyObject from one bucket into another bucket.
 func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) {
-    pr, pw := io.Pipe()
-
-    go func() {
-        err := n.GetObject(ctx, &GetObjectParams{
+    objPayload, err := n.GetObject(ctx, &GetObjectParams{
         ObjectInfo: p.SrcObject,
-        Writer:     pw,
+        Versioned:  p.SrcVersioned,
         Range:      p.Range,
         BucketInfo: p.ScrBktInfo,
         Encryption: p.Encryption,
     })
-
-        if err = pw.CloseWithError(err); err != nil {
-            n.reqLogger(ctx).Error("could not get object", zap.Error(err))
-        }
-    }()
+    if err != nil {
+        return nil, fmt.Errorf("get object to copy: %w", err)
+    }

     return n.PutObject(ctx, &PutObjectParams{
         BktInfo:       p.DstBktInfo,
         Object:        p.DstObject,
         Size:          p.SrcSize,
-        Reader:        pr,
+        Reader:        objPayload,
         Header:        p.Header,
         Encryption:    p.Encryption,
         CopiesNumbers: p.CopiesNumbers,

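The hunks above are the core of the change: GetObject stops after initializing the payload reader, maps a storage "object not found" to ErrNoSuchVersion or ErrNoSuchKey depending on GetObjectParams.Versioned, and returns an *ObjectPayload that must be consumed either through its io.Reader side or through StreamTo, never both. A hedged sketch of a caller that uses the Reader side and checks the mapped codes (readWholeObject is illustrative and not part of the PR; IsS3Error and the error codes are used elsewhere in this diff):

package layer

import (
    "context"
    "fmt"
    "io"

    s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
)

// readWholeObject is a hypothetical caller of the reworked GetObject, not part of
// the PR. It drains the payload through the io.Reader side (so StreamTo must not
// be used as well) and checks the S3 codes that a storage "not found" is mapped to.
func (n *layer) readWholeObject(ctx context.Context, prm *GetObjectParams) ([]byte, error) {
    payload, err := n.GetObject(ctx, prm)
    if err != nil {
        if s3errors.IsS3Error(err, s3errors.ErrNoSuchKey) || s3errors.IsS3Error(err, s3errors.ErrNoSuchVersion) {
            return nil, err // the handler layer turns this into an HTTP 404
        }
        return nil, fmt.Errorf("get object: %w", err)
    }

    return io.ReadAll(payload)
}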
View file

@@ -64,6 +64,7 @@ type (
     }

     UploadCopyParams struct {
+        Versioned  bool
         Info       *UploadInfoParams
         SrcObjInfo *data.ObjectInfo
         SrcBktInfo *data.BucketInfo
@@ -292,26 +293,21 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
         return nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), size, uploadMaxSize)
     }

-    pr, pw := io.Pipe()
-
-    go func() {
-        err = n.GetObject(ctx, &GetObjectParams{
+    objPayload, err := n.GetObject(ctx, &GetObjectParams{
         ObjectInfo: p.SrcObjInfo,
-        Writer:     pw,
+        Versioned:  p.Versioned,
         Range:      p.Range,
         BucketInfo: p.SrcBktInfo,
     })
-
-        if err = pw.CloseWithError(err); err != nil {
-            n.reqLogger(ctx).Error("could not get object", zap.Error(err))
+    if err != nil {
+        return nil, fmt.Errorf("get object to upload copy: %w", err)
     }
-    }()

     params := &UploadPartParams{
         Info:       p.Info,
         PartNumber: p.PartNumber,
         Size:       size,
-        Reader:     pr,
+        Reader:     objPayload,
     }

     return n.uploadPart(ctx, multipartInfo, params)

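Because *ObjectPayload implements io.Reader, the io.Pipe plus goroutine plumbing that CopyObject and UploadPartCopy previously needed is gone: the source payload is handed straight to the destination write. A condensed sketch of that pattern, assuming PutObject returns (*data.ExtendedObjectInfo, error) as its use in CopyObject above suggests (copyViaReader is illustrative only):

package layer

import (
    "context"
    "fmt"

    "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
)

// copyViaReader is an illustrative condensation of the new CopyObject and
// UploadPartCopy flow, not part of the PR: *ObjectPayload implements io.Reader,
// so it can feed the destination write directly, with no io.Pipe and no goroutine.
func (n *layer) copyViaReader(ctx context.Context, src *GetObjectParams, dst *PutObjectParams) (*data.ExtendedObjectInfo, error) {
    payload, err := n.GetObject(ctx, src)
    if err != nil {
        return nil, fmt.Errorf("get source object: %w", err)
    }

    dst.Reader = payload // consumed lazily by the uploading side
    return n.PutObject(ctx, dst)
}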
View file

@@ -3,6 +3,7 @@ package layer
 import (
     "bytes"
     "context"
+    "io"
     "testing"

     "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
@@ -31,26 +32,29 @@ func (tc *testContext) putObject(content []byte) *data.ObjectInfo {
 }

 func (tc *testContext) getObject(objectName, versionID string, needError bool) (*data.ObjectInfo, []byte) {
-    objInfo, err := tc.layer.GetObjectInfo(tc.ctx, &HeadObjectParams{
+    headPrm := &HeadObjectParams{
         BktInfo:   tc.bktInfo,
         Object:    objectName,
         VersionID: versionID,
-    })
+    }
+
+    objInfo, err := tc.layer.GetObjectInfo(tc.ctx, headPrm)
     if needError {
         require.Error(tc.t, err)
         return nil, nil
     }
     require.NoError(tc.t, err)

-    content := bytes.NewBuffer(nil)
-    err = tc.layer.GetObject(tc.ctx, &GetObjectParams{
+    objPayload, err := tc.layer.GetObject(tc.ctx, &GetObjectParams{
         ObjectInfo: objInfo,
-        Writer:     content,
+        Versioned:  headPrm.Versioned(),
         BucketInfo: tc.bktInfo,
     })
     require.NoError(tc.t, err)

-    return objInfo, content.Bytes()
+    payload, err := io.ReadAll(objPayload)
+    require.NoError(tc.t, err)
+    return objInfo, payload
 }

 func (tc *testContext) deleteObject(objectName, versionID string, settings *data.BucketSettings) {