[#158] Separate init object reader from read itself

To be able to handle cases and return appropriate http status code
when object missed in storage but gate cache contains its metadata
we need write code after init object reader.
So we separate init reader from actual reading.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2023-07-06 16:37:53 +03:00
parent fc90981c03
commit 14ef9ff091
10 changed files with 182 additions and 84 deletions

View file

@ -12,6 +12,7 @@ This document outlines major changes between releases.
- Handle negative `Content-Length` on put (#125)
- Use `DisableURIPathEscaping` to presign urls (#125)
- Use specific s3 errors instead of `InternalError` where possible (#143)
- Return appropriate 404 code when object missed in storage but there is in gate cache (#158)
### Added
- Implement chunk uploading (#106)

View file

@ -170,6 +170,7 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
}
params := &layer.CopyObjectParams{
SrcVersioned: srcObjPrm.Versioned(),
SrcObject: srcObjInfo,
ScrBktInfo: srcObjPrm.BktInfo,
DstBktInfo: dstBktInfo,

View file

@ -202,6 +202,20 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
return
}
getPayloadParams := &layer.GetObjectParams{
ObjectInfo: info,
Versioned: p.Versioned(),
Range: params,
BucketInfo: bktInfo,
Encryption: encryptionParams,
}
objPayload, err := h.obj.GetObject(r.Context(), getPayloadParams)
if err != nil {
h.logAndSendError(w, "could not get object payload", reqInfo, err)
return
}
writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
if params != nil {
writeRangeHeaders(w, params, info.Size)
@ -209,15 +223,9 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
getParams := &layer.GetObjectParams{
ObjectInfo: info,
Writer: w,
Range: params,
BucketInfo: bktInfo,
Encryption: encryptionParams,
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err)
if err = objPayload.StreamTo(w); err != nil {
h.logAndSendError(w, "could not stream object payload", reqInfo, err)
return
}
}

View file

@ -5,12 +5,18 @@ import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"net/url"
"testing"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"github.com/stretchr/testify/require"
)
@ -170,6 +176,24 @@ func TestGetRange(t *testing.T) {
require.Equal(t, "bcdef", string(end))
}
func TestGetObject(t *testing.T) {
hc := prepareHandlerContext(t)
bktName, objName := "bucket", "obj"
bktInfo, objInfo := createVersionedBucketAndObject(hc.t, hc, bktName, objName)
putObject(hc.t, hc, bktName, objName)
checkFound(hc.t, hc, bktName, objName, objInfo.VersionID())
checkFound(hc.t, hc, bktName, objName, emptyVersion)
addr := getAddressOfLastVersion(hc, bktInfo, objName)
hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{})
hc.tp.SetObjectError(objInfo.Address(), apistatus.ObjectNotFound{})
getObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), s3errors.ErrNoSuchVersion)
getObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey)
}
func putObjectContent(hc *handlerContext, bktName, objName, content string) {
body := bytes.NewReader([]byte(content))
w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
@ -186,3 +210,17 @@ func getObjectRange(t *testing.T, tc *handlerContext, bktName, objName string, s
require.NoError(t, err)
return content
}
func getObjectAssertS3Error(hc *handlerContext, bktName, objName, version string, code apiErrors.ErrorCode) {
w := getObjectBase(hc, bktName, objName, version)
assertS3Error(hc.t, w, apiErrors.GetAPIError(code))
}
func getObjectBase(hc *handlerContext, bktName, objName, version string) *httptest.ResponseRecorder {
query := make(url.Values)
query.Add(api.QueryVersionID, version)
w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
hc.Handler().GetObjectHandler(w, r)
return w
}

View file

@ -272,10 +272,16 @@ func existInMockedFrostFS(tc *handlerContext, bktInfo *data.BucketInfo, objInfo
p := &layer.GetObjectParams{
BucketInfo: bktInfo,
ObjectInfo: objInfo,
Writer: io.Discard,
}
return tc.Layer().GetObject(tc.Context(), p) == nil
objPayload, err := tc.Layer().GetObject(tc.Context(), p)
if err != nil {
return false
}
_, err = io.ReadAll(objPayload)
require.NoError(tc.t, err)
return true
}
func listOIDsFromMockedFrostFS(t *testing.T, tc *handlerContext, bktName string) []oid.ID {

View file

@ -1,7 +1,7 @@
package handler
import (
"bytes"
"io"
"net/http"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
@ -84,18 +84,26 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
if len(info.ContentType) == 0 {
if info.ContentType = layer.MimeByFilePath(info.Name); len(info.ContentType) == 0 {
buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType))
getParams := &layer.GetObjectParams{
ObjectInfo: info,
Writer: buffer,
Versioned: p.Versioned(),
Range: getRangeToDetectContentType(info.Size),
BucketInfo: bktInfo,
}
if err = h.obj.GetObject(r.Context(), getParams); err != nil {
objPayload, err := h.obj.GetObject(r.Context(), getParams)
if err != nil {
h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", info.ID))
return
}
info.ContentType = http.DetectContentType(buffer.Bytes())
buffer, err := io.ReadAll(objPayload)
if err != nil {
h.logAndSendError(w, "could not partly read payload to detect content type", reqInfo, err, zap.Stringer("oid", info.ID))
return
}
info.ContentType = http.DetectContentType(buffer)
}
}

View file

@ -320,11 +320,13 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
return
}
srcInfo, err := h.obj.GetObjectInfo(ctx, &layer.HeadObjectParams{
headPrm := &layer.HeadObjectParams{
BktInfo: srcBktInfo,
Object: srcObject,
VersionID: versionID,
})
}
srcInfo, err := h.obj.GetObjectInfo(ctx, headPrm)
if err != nil {
if errors.IsS3Error(err, errors.ErrNoSuchKey) && versionID != "" {
h.logAndSendError(w, "could not head source object version", reqInfo,
@ -349,6 +351,7 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
}
p := &layer.UploadCopyParams{
Versioned: headPrm.Versioned(),
Info: &layer.UploadInfoParams{
UploadID: uploadID,
Bkt: bktInfo,

View file

@ -74,7 +74,7 @@ type (
Range *RangeParams
ObjectInfo *data.ObjectInfo
BucketInfo *data.BucketInfo
Writer io.Writer
Versioned bool
Encryption encryption.Params
}
@ -132,6 +132,7 @@ type (
// CopyObjectParams stores object copy request parameters.
CopyObjectParams struct {
SrcVersioned bool
SrcObject *data.ObjectInfo
ScrBktInfo *data.BucketInfo
DstBktInfo *data.BucketInfo
@ -185,6 +186,13 @@ type (
Error error
}
ObjectPayload struct {
r io.Reader
params getParams
encrypted bool
decryptedLen uint64
}
// Client provides S3 API client interface.
Client interface {
Initialize(ctx context.Context, c EventListener) error
@ -204,7 +212,7 @@ type (
CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error)
DeleteBucket(ctx context.Context, p *DeleteBucketParams) error
GetObject(ctx context.Context, p *GetObjectParams) error
GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error)
GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error)
GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error)
@ -268,6 +276,10 @@ func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error
return f(ctx, msg)
}
func (p HeadObjectParams) Versioned() bool {
return len(p.VersionID) > 0
}
// NewLayer creates an instance of a layer. It checks credentials
// and establishes gRPC connection with the node.
func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
@ -395,7 +407,7 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) {
}
// GetObject from storage.
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) {
var params getParams
params.oid = p.ObjectInfo.ID
@ -406,7 +418,7 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
var err error
decReader, err = getDecrypter(p)
if err != nil {
return fmt.Errorf("creating decrypter: %w", err)
return nil, fmt.Errorf("creating decrypter: %w", err)
}
params.off = decReader.EncryptedOffset()
params.ln = decReader.EncryptedLength()
@ -420,32 +432,58 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
}
}
payload, err := n.initObjectPayloadReader(ctx, params)
r, err := n.initObjectPayloadReader(ctx, params)
if err != nil {
return fmt.Errorf("init object payload reader: %w", err)
if client.IsErrObjectNotFound(err) {
if p.Versioned {
err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchVersion), err.Error())
} else {
err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchKey), err.Error())
}
}
return nil, fmt.Errorf("init object payload reader: %w", err)
}
var decryptedLen uint64
if decReader != nil {
if err = decReader.SetReader(r); err != nil {
return nil, fmt.Errorf("set reader to decrypter: %w", err)
}
r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
decryptedLen = decReader.DecryptedLength()
}
return &ObjectPayload{
r: r,
params: params,
encrypted: decReader != nil,
decryptedLen: decryptedLen,
}, nil
}
// Read implements io.Reader. If you want to use ObjectPayload as io.Reader
// you must not use ObjectPayload.StreamTo method and vice versa.
func (o *ObjectPayload) Read(p []byte) (int, error) {
return o.r.Read(p)
}
// StreamTo reads all payload to provided writer.
// If you want to use this method you must not use ObjectPayload.Read and vice versa.
func (o *ObjectPayload) StreamTo(w io.Writer) error {
bufSize := uint64(32 * 1024) // configure?
if params.ln != 0 && params.ln < bufSize {
bufSize = params.ln
if o.params.ln != 0 && o.params.ln < bufSize {
bufSize = o.params.ln
}
// alloc buffer for copying
buf := make([]byte, bufSize) // sync-pool it?
r := payload
if decReader != nil {
if err = decReader.SetReader(payload); err != nil {
return fmt.Errorf("set reader to decrypter: %w", err)
}
r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
}
// copy full payload
written, err := io.CopyBuffer(p.Writer, r, buf)
written, err := io.CopyBuffer(w, o.r, buf)
if err != nil {
if decReader != nil {
return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, decReader.DecryptedLength(), params.ln, err)
if o.encrypted {
return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, o.decryptedLen, o.params.ln, err)
}
return fmt.Errorf("copy object payload written: '%d': %w", written, err)
}
@ -497,10 +535,10 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
var objInfo *data.ExtendedObjectInfo
var err error
if len(p.VersionID) == 0 {
objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
} else {
if p.Versioned() {
objInfo, err = n.headVersion(ctx, p.BktInfo, p)
} else {
objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
}
if err != nil {
return nil, err
@ -515,27 +553,22 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
// CopyObject from one bucket into another bucket.
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) {
pr, pw := io.Pipe()
go func() {
err := n.GetObject(ctx, &GetObjectParams{
objPayload, err := n.GetObject(ctx, &GetObjectParams{
ObjectInfo: p.SrcObject,
Writer: pw,
Versioned: p.SrcVersioned,
Range: p.Range,
BucketInfo: p.ScrBktInfo,
Encryption: p.Encryption,
})
if err = pw.CloseWithError(err); err != nil {
n.reqLogger(ctx).Error("could not get object", zap.Error(err))
if err != nil {
return nil, fmt.Errorf("get object to copy: %w", err)
}
}()
return n.PutObject(ctx, &PutObjectParams{
BktInfo: p.DstBktInfo,
Object: p.DstObject,
Size: p.SrcSize,
Reader: pr,
Reader: objPayload,
Header: p.Header,
Encryption: p.Encryption,
CopiesNumbers: p.CopiesNumbers,

View file

@ -64,6 +64,7 @@ type (
}
UploadCopyParams struct {
Versioned bool
Info *UploadInfoParams
SrcObjInfo *data.ObjectInfo
SrcBktInfo *data.BucketInfo
@ -292,26 +293,21 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
return nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), size, uploadMaxSize)
}
pr, pw := io.Pipe()
go func() {
err = n.GetObject(ctx, &GetObjectParams{
objPayload, err := n.GetObject(ctx, &GetObjectParams{
ObjectInfo: p.SrcObjInfo,
Writer: pw,
Versioned: p.Versioned,
Range: p.Range,
BucketInfo: p.SrcBktInfo,
})
if err = pw.CloseWithError(err); err != nil {
n.reqLogger(ctx).Error("could not get object", zap.Error(err))
if err != nil {
return nil, fmt.Errorf("get object to upload copy: %w", err)
}
}()
params := &UploadPartParams{
Info: p.Info,
PartNumber: p.PartNumber,
Size: size,
Reader: pr,
Reader: objPayload,
}
return n.uploadPart(ctx, multipartInfo, params)

View file

@ -3,6 +3,7 @@ package layer
import (
"bytes"
"context"
"io"
"testing"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
@ -31,26 +32,29 @@ func (tc *testContext) putObject(content []byte) *data.ObjectInfo {
}
func (tc *testContext) getObject(objectName, versionID string, needError bool) (*data.ObjectInfo, []byte) {
objInfo, err := tc.layer.GetObjectInfo(tc.ctx, &HeadObjectParams{
headPrm := &HeadObjectParams{
BktInfo: tc.bktInfo,
Object: objectName,
VersionID: versionID,
})
}
objInfo, err := tc.layer.GetObjectInfo(tc.ctx, headPrm)
if needError {
require.Error(tc.t, err)
return nil, nil
}
require.NoError(tc.t, err)
content := bytes.NewBuffer(nil)
err = tc.layer.GetObject(tc.ctx, &GetObjectParams{
objPayload, err := tc.layer.GetObject(tc.ctx, &GetObjectParams{
ObjectInfo: objInfo,
Writer: content,
Versioned: headPrm.Versioned(),
BucketInfo: tc.bktInfo,
})
require.NoError(tc.t, err)
return objInfo, content.Bytes()
payload, err := io.ReadAll(objPayload)
require.NoError(tc.t, err)
return objInfo, payload
}
func (tc *testContext) deleteObject(objectName, versionID string, settings *data.BucketSettings) {