From 14ef9ff091829487882d6f774e8e931399f8b2cb Mon Sep 17 00:00:00 2001
From: Denis Kirillov
Date: Thu, 6 Jul 2023 16:37:53 +0300
Subject: [PATCH] [#158] Separate init object reader from read itself

To handle the case when an object is missing in storage while the gate
cache still contains its metadata, and to return the appropriate HTTP
status code, we need to run code after the object reader has been
initialized. So we separate initializing the reader from the actual
reading.

Signed-off-by: Denis Kirillov
---
 CHANGELOG.md                    |   1 +
 api/handler/copy.go             |  15 +++--
 api/handler/get.go              |  26 +++++---
 api/handler/get_test.go         |  38 +++++++++++
 api/handler/handlers_test.go    |  10 ++-
 api/handler/head.go             |  18 ++++--
 api/handler/multipart_upload.go |   7 +-
 api/layer/layer.go              | 109 +++++++++++++++++++++-----------
 api/layer/multipart_upload.go   |  26 ++++----
 api/layer/versioning_test.go    |  16 +++--
 10 files changed, 182 insertions(+), 84 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0a1e94bb..9c3f4c9b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -12,6 +12,7 @@ This document outlines major changes between releases.
 - Handle negative `Content-Length` on put (#125)
 - Use `DisableURIPathEscaping` to presign urls (#125)
 - Use specific s3 errors instead of `InternalError` where possible (#143)
+- Return appropriate 404 code when object is missing in storage but is still present in gate cache (#158)
 
 ### Added
 - Implement chunk uploading (#106)
diff --git a/api/handler/copy.go b/api/handler/copy.go
index 494134c5..21e277bd 100644
--- a/api/handler/copy.go
+++ b/api/handler/copy.go
@@ -170,13 +170,14 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) {
 	}
 
 	params := &layer.CopyObjectParams{
-		SrcObject:  srcObjInfo,
-		ScrBktInfo: srcObjPrm.BktInfo,
-		DstBktInfo: dstBktInfo,
-		DstObject:  reqInfo.ObjectName,
-		SrcSize:    srcObjInfo.Size,
-		Header:     metadata,
-		Encryption: encryptionParams,
+		SrcVersioned: srcObjPrm.Versioned(),
+		SrcObject:    srcObjInfo,
+		ScrBktInfo:   srcObjPrm.BktInfo,
+		DstBktInfo:   dstBktInfo,
+		DstObject:    reqInfo.ObjectName,
+		SrcSize:      srcObjInfo.Size,
+		Header:       metadata,
+		Encryption:   encryptionParams,
 	}
 
 	params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, dstBktInfo.LocationConstraint)
diff --git a/api/handler/get.go b/api/handler/get.go
index 1605dce8..615f7f05 100644
--- a/api/handler/get.go
+++ b/api/handler/get.go
@@ -202,6 +202,20 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	getPayloadParams := &layer.GetObjectParams{
+		ObjectInfo: info,
+		Versioned:  p.Versioned(),
+		Range:      params,
+		BucketInfo: bktInfo,
+		Encryption: encryptionParams,
+	}
+
+	objPayload, err := h.obj.GetObject(r.Context(), getPayloadParams)
+	if err != nil {
+		h.logAndSendError(w, "could not get object payload", reqInfo, err)
+		return
+	}
+
 	writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned())
 	if params != nil {
 		writeRangeHeaders(w, params, info.Size)
@@ -209,15 +223,9 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) {
 		w.WriteHeader(http.StatusOK)
 	}
 
-	getParams := &layer.GetObjectParams{
-		ObjectInfo: info,
-		Writer:     w,
-		Range:      params,
-		BucketInfo: bktInfo,
-		Encryption: encryptionParams,
-	}
-	if err = h.obj.GetObject(r.Context(), getParams); err != nil {
-		h.logAndSendError(w, "could not get object", reqInfo, err)
+	if err = objPayload.StreamTo(w); err != nil {
+		h.logAndSendError(w, "could not stream object payload", reqInfo, err)
+		return
 	}
 }
 
diff --git a/api/handler/get_test.go b/api/handler/get_test.go
index 244a9f59..22c7784a 100644
--- a/api/handler/get_test.go
+++ b/api/handler/get_test.go
@@ -5,12 +5,18 @@ import (
 	"fmt"
 	"io"
 	"net/http"
+	"net/http/httptest"
+	"net/url"
 	"testing"
 	"time"
 
+	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
+	apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
+	s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors"
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
+	apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
 	"github.com/stretchr/testify/require"
 )
 
@@ -170,6 +176,24 @@ func TestGetRange(t *testing.T) {
 	require.Equal(t, "bcdef", string(end))
 }
 
+func TestGetObject(t *testing.T) {
+	hc := prepareHandlerContext(t)
+	bktName, objName := "bucket", "obj"
+	bktInfo, objInfo := createVersionedBucketAndObject(hc.t, hc, bktName, objName)
+
+	putObject(hc.t, hc, bktName, objName)
+
+	checkFound(hc.t, hc, bktName, objName, objInfo.VersionID())
+	checkFound(hc.t, hc, bktName, objName, emptyVersion)
+
+	addr := getAddressOfLastVersion(hc, bktInfo, objName)
+	hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{})
+	hc.tp.SetObjectError(objInfo.Address(), apistatus.ObjectNotFound{})
+
+	getObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), s3errors.ErrNoSuchVersion)
+	getObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey)
+}
+
 func putObjectContent(hc *handlerContext, bktName, objName, content string) {
 	body := bytes.NewReader([]byte(content))
 	w, r := prepareTestPayloadRequest(hc, bktName, objName, body)
@@ -186,3 +210,17 @@ func getObjectRange(t *testing.T, tc *handlerContext, bktName, objName string, s
 	require.NoError(t, err)
 	return content
 }
+
+func getObjectAssertS3Error(hc *handlerContext, bktName, objName, version string, code apiErrors.ErrorCode) {
+	w := getObjectBase(hc, bktName, objName, version)
+	assertS3Error(hc.t, w, apiErrors.GetAPIError(code))
+}
+
+func getObjectBase(hc *handlerContext, bktName, objName, version string) *httptest.ResponseRecorder {
+	query := make(url.Values)
+	query.Add(api.QueryVersionID, version)
+
+	w, r := prepareTestFullRequest(hc, bktName, objName, query, nil)
+	hc.Handler().GetObjectHandler(w, r)
+	return w
+}
diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go
index c394918b..f700cf5e 100644
--- a/api/handler/handlers_test.go
+++ b/api/handler/handlers_test.go
@@ -272,10 +272,16 @@ func existInMockedFrostFS(tc *handlerContext, bktInfo *data.BucketInfo, objInfo
 	p := &layer.GetObjectParams{
 		BucketInfo: bktInfo,
 		ObjectInfo: objInfo,
-		Writer:     io.Discard,
 	}
 
-	return tc.Layer().GetObject(tc.Context(), p) == nil
+	objPayload, err := tc.Layer().GetObject(tc.Context(), p)
+	if err != nil {
+		return false
+	}
+
+	_, err = io.ReadAll(objPayload)
+	require.NoError(tc.t, err)
+	return true
 }
 
 func listOIDsFromMockedFrostFS(t *testing.T, tc *handlerContext, bktName string) []oid.ID {
diff --git a/api/handler/head.go b/api/handler/head.go
index 1152ea8c..a2f1f143 100644
--- a/api/handler/head.go
+++ b/api/handler/head.go
@@ -1,7 +1,7 @@
 package handler
 
 import (
-	"bytes"
+	"io"
 	"net/http"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api"
@@ -84,18 +84,26 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r *http.Request) {
 
 	if len(info.ContentType) == 0 {
 		if info.ContentType = layer.MimeByFilePath(info.Name); len(info.ContentType) == 0 {
-			buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType))
 			getParams := &layer.GetObjectParams{
 				ObjectInfo: info,
-				Writer:     buffer,
+				Versioned:  p.Versioned(),
 				Range:      getRangeToDetectContentType(info.Size),
 				BucketInfo: bktInfo,
 			}
-			if err = h.obj.GetObject(r.Context(), getParams); err != nil {
+
+			objPayload, err := h.obj.GetObject(r.Context(), getParams)
+			if err != nil {
 				h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", info.ID))
 				return
 			}
-			info.ContentType = http.DetectContentType(buffer.Bytes())
+
+			buffer, err := io.ReadAll(objPayload)
+			if err != nil {
+				h.logAndSendError(w, "could not partly read payload to detect content type", reqInfo, err, zap.Stringer("oid", info.ID))
+				return
+			}
+
+			info.ContentType = http.DetectContentType(buffer)
 		}
 	}
 
diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go
index 9fc82ee9..e97c46a7 100644
--- a/api/handler/multipart_upload.go
+++ b/api/handler/multipart_upload.go
@@ -320,11 +320,13 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	srcInfo, err := h.obj.GetObjectInfo(ctx, &layer.HeadObjectParams{
+	headPrm := &layer.HeadObjectParams{
 		BktInfo:   srcBktInfo,
 		Object:    srcObject,
 		VersionID: versionID,
-	})
+	}
+
+	srcInfo, err := h.obj.GetObjectInfo(ctx, headPrm)
 	if err != nil {
 		if errors.IsS3Error(err, errors.ErrNoSuchKey) && versionID != "" {
 			h.logAndSendError(w, "could not head source object version", reqInfo,
@@ -349,6 +351,7 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) {
 	}
 
 	p := &layer.UploadCopyParams{
+		Versioned: headPrm.Versioned(),
 		Info: &layer.UploadInfoParams{
 			UploadID:  uploadID,
 			Bkt:       bktInfo,
diff --git a/api/layer/layer.go b/api/layer/layer.go
index b28a182f..17cc6fe7 100644
--- a/api/layer/layer.go
+++ b/api/layer/layer.go
@@ -74,7 +74,7 @@ type (
 		Range      *RangeParams
 		ObjectInfo *data.ObjectInfo
 		BucketInfo *data.BucketInfo
-		Writer     io.Writer
+		Versioned  bool
 		Encryption encryption.Params
 	}
 
@@ -132,6 +132,7 @@ type (
 
 	// CopyObjectParams stores object copy request parameters.
 	CopyObjectParams struct {
+		SrcVersioned bool
 		SrcObject  *data.ObjectInfo
 		ScrBktInfo *data.BucketInfo
 		DstBktInfo *data.BucketInfo
@@ -185,6 +186,13 @@ type (
 		Error error
 	}
 
+	ObjectPayload struct {
+		r            io.Reader
+		params       getParams
+		encrypted    bool
+		decryptedLen uint64
+	}
+
 	// Client provides S3 API client interface.
 	Client interface {
 		Initialize(ctx context.Context, c EventListener) error
@@ -204,7 +212,7 @@ type (
 		CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error)
 		DeleteBucket(ctx context.Context, p *DeleteBucketParams) error
 
-		GetObject(ctx context.Context, p *GetObjectParams) error
+		GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error)
 		GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error)
 		GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error)
 
@@ -268,6 +276,10 @@ func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error
 	return f(ctx, msg)
 }
 
+func (p HeadObjectParams) Versioned() bool {
+	return len(p.VersionID) > 0
+}
+
 // NewLayer creates an instance of a layer. It checks credentials
 // and establishes gRPC connection with the node.
 func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client {
@@ -395,7 +407,7 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) {
 }
 
 // GetObject from storage.
-func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
+func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) {
 	var params getParams
 
 	params.oid = p.ObjectInfo.ID
@@ -406,7 +418,7 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
 		var err error
 		decReader, err = getDecrypter(p)
 		if err != nil {
-			return fmt.Errorf("creating decrypter: %w", err)
+			return nil, fmt.Errorf("creating decrypter: %w", err)
 		}
 		params.off = decReader.EncryptedOffset()
 		params.ln = decReader.EncryptedLength()
@@ -420,32 +432,58 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error {
 		}
 	}
 
-	payload, err := n.initObjectPayloadReader(ctx, params)
+	r, err := n.initObjectPayloadReader(ctx, params)
 	if err != nil {
-		return fmt.Errorf("init object payload reader: %w", err)
+		if client.IsErrObjectNotFound(err) {
+			if p.Versioned {
+				err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchVersion), err.Error())
+			} else {
+				err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchKey), err.Error())
+			}
+		}
+
+		return nil, fmt.Errorf("init object payload reader: %w", err)
 	}
 
+	var decryptedLen uint64
+	if decReader != nil {
+		if err = decReader.SetReader(r); err != nil {
+			return nil, fmt.Errorf("set reader to decrypter: %w", err)
+		}
+		r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
+		decryptedLen = decReader.DecryptedLength()
+	}
+
+	return &ObjectPayload{
+		r:            r,
+		params:       params,
+		encrypted:    decReader != nil,
+		decryptedLen: decryptedLen,
+	}, nil
+}
+
+// Read implements io.Reader. If you want to use ObjectPayload as io.Reader
+// you must not use ObjectPayload.StreamTo method and vice versa.
+func (o *ObjectPayload) Read(p []byte) (int, error) {
+	return o.r.Read(p)
+}
+
+// StreamTo reads all payload to provided writer.
+// If you want to use this method you must not use ObjectPayload.Read and vice versa.
+func (o *ObjectPayload) StreamTo(w io.Writer) error {
 	bufSize := uint64(32 * 1024) // configure?
-	if params.ln != 0 && params.ln < bufSize {
-		bufSize = params.ln
+	if o.params.ln != 0 && o.params.ln < bufSize {
+		bufSize = o.params.ln
 	}
 
 	// alloc buffer for copying
 	buf := make([]byte, bufSize) // sync-pool it?
-	r := payload
-	if decReader != nil {
-		if err = decReader.SetReader(payload); err != nil {
-			return fmt.Errorf("set reader to decrypter: %w", err)
-		}
-		r = io.LimitReader(decReader, int64(decReader.DecryptedLength()))
-	}
-
 	// copy full payload
-	written, err := io.CopyBuffer(p.Writer, r, buf)
+	written, err := io.CopyBuffer(w, o.r, buf)
 	if err != nil {
-		if decReader != nil {
-			return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, decReader.DecryptedLength(), params.ln, err)
+		if o.encrypted {
+			return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, o.decryptedLen, o.params.ln, err)
 		}
 		return fmt.Errorf("copy object payload written: '%d': %w", written, err)
 	}
@@ -497,10 +535,10 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
 	var objInfo *data.ExtendedObjectInfo
 	var err error
 
-	if len(p.VersionID) == 0 {
-		objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
-	} else {
+	if p.Versioned() {
 		objInfo, err = n.headVersion(ctx, p.BktInfo, p)
+	} else {
+		objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object)
 	}
 	if err != nil {
 		return nil, err
@@ -515,27 +553,22 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams)
 
 // CopyObject from one bucket into another bucket.
 func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) {
-	pr, pw := io.Pipe()
-
-	go func() {
-		err := n.GetObject(ctx, &GetObjectParams{
-			ObjectInfo: p.SrcObject,
-			Writer:     pw,
-			Range:      p.Range,
-			BucketInfo: p.ScrBktInfo,
-			Encryption: p.Encryption,
-		})
-
-		if err = pw.CloseWithError(err); err != nil {
-			n.reqLogger(ctx).Error("could not get object", zap.Error(err))
-		}
-	}()
+	objPayload, err := n.GetObject(ctx, &GetObjectParams{
+		ObjectInfo: p.SrcObject,
+		Versioned:  p.SrcVersioned,
+		Range:      p.Range,
+		BucketInfo: p.ScrBktInfo,
+		Encryption: p.Encryption,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("get object to copy: %w", err)
+	}
 
 	return n.PutObject(ctx, &PutObjectParams{
 		BktInfo:       p.DstBktInfo,
 		Object:        p.DstObject,
 		Size:          p.SrcSize,
-		Reader:        pr,
+		Reader:        objPayload,
 		Header:        p.Header,
 		Encryption:    p.Encryption,
 		CopiesNumbers: p.CopiesNumbers,
diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go
index b453269b..129efb55 100644
--- a/api/layer/multipart_upload.go
+++ b/api/layer/multipart_upload.go
@@ -64,6 +64,7 @@ type (
 	}
 
 	UploadCopyParams struct {
+		Versioned  bool
 		Info       *UploadInfoParams
 		SrcObjInfo *data.ObjectInfo
 		SrcBktInfo *data.BucketInfo
@@ -292,26 +293,21 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
 		return nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), size, uploadMaxSize)
 	}
 
-	pr, pw := io.Pipe()
-
-	go func() {
-		err = n.GetObject(ctx, &GetObjectParams{
-			ObjectInfo: p.SrcObjInfo,
-			Writer:     pw,
-			Range:      p.Range,
-			BucketInfo: p.SrcBktInfo,
-		})
-
-		if err = pw.CloseWithError(err); err != nil {
-			n.reqLogger(ctx).Error("could not get object", zap.Error(err))
-		}
-	}()
+	objPayload, err := n.GetObject(ctx, &GetObjectParams{
+		ObjectInfo: p.SrcObjInfo,
+		Versioned:  p.Versioned,
+		Range:      p.Range,
+		BucketInfo: p.SrcBktInfo,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("get object to upload copy: %w", err)
+	}
 
 	params := &UploadPartParams{
 		Info:       p.Info,
 		PartNumber: p.PartNumber,
 		Size:       size,
-		Reader:     pr,
+		Reader:     objPayload,
 	}
 
 	return n.uploadPart(ctx, multipartInfo, params)
diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go
index 884f477d..5b6e36f6 100644
--- a/api/layer/versioning_test.go
+++ b/api/layer/versioning_test.go
@@ -3,6 +3,7 @@ package layer
 import (
 	"bytes"
 	"context"
+	"io"
 	"testing"
 
 	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data"
@@ -31,26 +32,29 @@ func (tc *testContext) putObject(content []byte) *data.ObjectInfo {
 }
 
 func (tc *testContext) getObject(objectName, versionID string, needError bool) (*data.ObjectInfo, []byte) {
-	objInfo, err := tc.layer.GetObjectInfo(tc.ctx, &HeadObjectParams{
+	headPrm := &HeadObjectParams{
 		BktInfo:   tc.bktInfo,
 		Object:    objectName,
 		VersionID: versionID,
-	})
+	}
+	objInfo, err := tc.layer.GetObjectInfo(tc.ctx, headPrm)
 	if needError {
 		require.Error(tc.t, err)
 		return nil, nil
 	}
 	require.NoError(tc.t, err)
 
-	content := bytes.NewBuffer(nil)
-	err = tc.layer.GetObject(tc.ctx, &GetObjectParams{
+	objPayload, err := tc.layer.GetObject(tc.ctx, &GetObjectParams{
 		ObjectInfo: objInfo,
-		Writer:     content,
+		Versioned:  headPrm.Versioned(),
 		BucketInfo: tc.bktInfo,
 	})
 	require.NoError(tc.t, err)
 
-	return objInfo, content.Bytes()
+	payload, err := io.ReadAll(objPayload)
+	require.NoError(tc.t, err)
+
+	return objInfo, payload
 }
 
 func (tc *testContext) deleteObject(objectName, versionID string, settings *data.BucketSettings) {
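
Reviewer note, not part of the patch: the sketch below illustrates the two-step call pattern this change enables. GetObject now only initializes the payload reader, so a handler can still choose the HTTP status (for example, 404 via ErrNoSuchKey or ErrNoSuchVersion) before any body bytes are written, and only then stream the payload. The helper name writeObject and its wiring are hypothetical; only the layer.Client.GetObject and ObjectPayload.StreamTo signatures come from the diff above.

package example

import (
	"context"
	"net/http"

	"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer"
)

// writeObject is a hypothetical helper showing the init-then-stream flow.
func writeObject(ctx context.Context, w http.ResponseWriter, ln layer.Client, prm *layer.GetObjectParams) error {
	// Step 1: initialize the payload reader only; a storage miss surfaces
	// here as ErrNoSuchVersion (when prm.Versioned is set) or ErrNoSuchKey,
	// so the caller can still answer with 404 instead of a broken 200.
	objPayload, err := ln.GetObject(ctx, prm)
	if err != nil {
		return err
	}

	// Step 2: headers are final now; commit to 200 and stream the body.
	// StreamTo and Read are mutually exclusive ways to consume the payload.
	w.WriteHeader(http.StatusOK)
	return objPayload.StreamTo(w)
}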