diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a1e94b..9c3f4c9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ This document outlines major changes between releases. - Handle negative `Content-Length` on put (#125) - Use `DisableURIPathEscaping` to presign urls (#125) - Use specific s3 errors instead of `InternalError` where possible (#143) +- Return appropriate 404 code when the object is missing from storage but still present in the gate cache (#158) ### Added - Implement chunk uploading (#106) diff --git a/api/handler/copy.go b/api/handler/copy.go index 494134c..21e277b 100644 --- a/api/handler/copy.go +++ b/api/handler/copy.go @@ -170,13 +170,14 @@ func (h *handler) CopyObjectHandler(w http.ResponseWriter, r *http.Request) { } params := &layer.CopyObjectParams{ - SrcObject: srcObjInfo, - ScrBktInfo: srcObjPrm.BktInfo, - DstBktInfo: dstBktInfo, - DstObject: reqInfo.ObjectName, - SrcSize: srcObjInfo.Size, - Header: metadata, - Encryption: encryptionParams, + SrcVersioned: srcObjPrm.Versioned(), + SrcObject: srcObjInfo, + ScrBktInfo: srcObjPrm.BktInfo, + DstBktInfo: dstBktInfo, + DstObject: reqInfo.ObjectName, + SrcSize: srcObjInfo.Size, + Header: metadata, + Encryption: encryptionParams, } params.CopiesNumbers, err = h.pickCopiesNumbers(metadata, dstBktInfo.LocationConstraint) diff --git a/api/handler/get.go b/api/handler/get.go index 1605dce..615f7f0 100644 --- a/api/handler/get.go +++ b/api/handler/get.go @@ -202,6 +202,20 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { return } + getPayloadParams := &layer.GetObjectParams{ + ObjectInfo: info, + Versioned: p.Versioned(), + Range: params, + BucketInfo: bktInfo, + Encryption: encryptionParams, + } + + objPayload, err := h.obj.GetObject(r.Context(), getPayloadParams) + if err != nil { + h.logAndSendError(w, "could not get object payload", reqInfo, err) + return + } + writeHeaders(w.Header(), r.Header, extendedInfo, len(tagSet), bktSettings.Unversioned()) if params != nil { 
writeRangeHeaders(w, params, info.Size) @@ -209,15 +223,9 @@ func (h *handler) GetObjectHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } - getParams := &layer.GetObjectParams{ - ObjectInfo: info, - Writer: w, - Range: params, - BucketInfo: bktInfo, - Encryption: encryptionParams, - } - if err = h.obj.GetObject(r.Context(), getParams); err != nil { - h.logAndSendError(w, "could not get object", reqInfo, err) + if err = objPayload.StreamTo(w); err != nil { + h.logAndSendError(w, "could not stream object payload", reqInfo, err) + return } } diff --git a/api/handler/get_test.go b/api/handler/get_test.go index 244a9f5..22c7784 100644 --- a/api/handler/get_test.go +++ b/api/handler/get_test.go @@ -5,12 +5,18 @@ import ( "fmt" "io" "net/http" + "net/http/httptest" + "net/url" "testing" "time" + "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" + apiErrors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" + s3errors "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/errors" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/layer" + apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "github.com/stretchr/testify/require" ) @@ -170,6 +176,24 @@ func TestGetRange(t *testing.T) { require.Equal(t, "bcdef", string(end)) } +func TestGetObject(t *testing.T) { + hc := prepareHandlerContext(t) + bktName, objName := "bucket", "obj" + bktInfo, objInfo := createVersionedBucketAndObject(hc.t, hc, bktName, objName) + + putObject(hc.t, hc, bktName, objName) + + checkFound(hc.t, hc, bktName, objName, objInfo.VersionID()) + checkFound(hc.t, hc, bktName, objName, emptyVersion) + + addr := getAddressOfLastVersion(hc, bktInfo, objName) + hc.tp.SetObjectError(addr, apistatus.ObjectNotFound{}) + hc.tp.SetObjectError(objInfo.Address(), apistatus.ObjectNotFound{}) + + getObjectAssertS3Error(hc, bktName, objName, objInfo.VersionID(), 
s3errors.ErrNoSuchVersion) + getObjectAssertS3Error(hc, bktName, objName, emptyVersion, s3errors.ErrNoSuchKey) +} + func putObjectContent(hc *handlerContext, bktName, objName, content string) { body := bytes.NewReader([]byte(content)) w, r := prepareTestPayloadRequest(hc, bktName, objName, body) @@ -186,3 +210,17 @@ func getObjectRange(t *testing.T, tc *handlerContext, bktName, objName string, s require.NoError(t, err) return content } + +func getObjectAssertS3Error(hc *handlerContext, bktName, objName, version string, code apiErrors.ErrorCode) { + w := getObjectBase(hc, bktName, objName, version) + assertS3Error(hc.t, w, apiErrors.GetAPIError(code)) +} + +func getObjectBase(hc *handlerContext, bktName, objName, version string) *httptest.ResponseRecorder { + query := make(url.Values) + query.Add(api.QueryVersionID, version) + + w, r := prepareTestFullRequest(hc, bktName, objName, query, nil) + hc.Handler().GetObjectHandler(w, r) + return w +} diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go index c394918..f700cf5 100644 --- a/api/handler/handlers_test.go +++ b/api/handler/handlers_test.go @@ -272,10 +272,16 @@ func existInMockedFrostFS(tc *handlerContext, bktInfo *data.BucketInfo, objInfo p := &layer.GetObjectParams{ BucketInfo: bktInfo, ObjectInfo: objInfo, - Writer: io.Discard, } - return tc.Layer().GetObject(tc.Context(), p) == nil + objPayload, err := tc.Layer().GetObject(tc.Context(), p) + if err != nil { + return false + } + + _, err = io.ReadAll(objPayload) + require.NoError(tc.t, err) + return true } func listOIDsFromMockedFrostFS(t *testing.T, tc *handlerContext, bktName string) []oid.ID { diff --git a/api/handler/head.go b/api/handler/head.go index 1152ea8..a2f1f14 100644 --- a/api/handler/head.go +++ b/api/handler/head.go @@ -1,7 +1,7 @@ package handler import ( - "bytes" + "io" "net/http" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api" @@ -84,18 +84,26 @@ func (h *handler) HeadObjectHandler(w http.ResponseWriter, r 
*http.Request) { if len(info.ContentType) == 0 { if info.ContentType = layer.MimeByFilePath(info.Name); len(info.ContentType) == 0 { - buffer := bytes.NewBuffer(make([]byte, 0, sizeToDetectType)) getParams := &layer.GetObjectParams{ ObjectInfo: info, - Writer: buffer, + Versioned: p.Versioned(), Range: getRangeToDetectContentType(info.Size), BucketInfo: bktInfo, } - if err = h.obj.GetObject(r.Context(), getParams); err != nil { + + objPayload, err := h.obj.GetObject(r.Context(), getParams) + if err != nil { h.logAndSendError(w, "could not get object", reqInfo, err, zap.Stringer("oid", info.ID)) return } - info.ContentType = http.DetectContentType(buffer.Bytes()) + + buffer, err := io.ReadAll(objPayload) + if err != nil { + h.logAndSendError(w, "could not partly read payload to detect content type", reqInfo, err, zap.Stringer("oid", info.ID)) + return + } + + info.ContentType = http.DetectContentType(buffer) } } diff --git a/api/handler/multipart_upload.go b/api/handler/multipart_upload.go index 9fc82ee..e97c46a 100644 --- a/api/handler/multipart_upload.go +++ b/api/handler/multipart_upload.go @@ -320,11 +320,13 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) { return } - srcInfo, err := h.obj.GetObjectInfo(ctx, &layer.HeadObjectParams{ + headPrm := &layer.HeadObjectParams{ BktInfo: srcBktInfo, Object: srcObject, VersionID: versionID, - }) + } + + srcInfo, err := h.obj.GetObjectInfo(ctx, headPrm) if err != nil { if errors.IsS3Error(err, errors.ErrNoSuchKey) && versionID != "" { h.logAndSendError(w, "could not head source object version", reqInfo, @@ -349,6 +351,7 @@ func (h *handler) UploadPartCopy(w http.ResponseWriter, r *http.Request) { } p := &layer.UploadCopyParams{ + Versioned: headPrm.Versioned(), Info: &layer.UploadInfoParams{ UploadID: uploadID, Bkt: bktInfo, diff --git a/api/layer/layer.go b/api/layer/layer.go index b28a182..17cc6fe 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -74,7 +74,7 @@ type ( Range 
*RangeParams ObjectInfo *data.ObjectInfo BucketInfo *data.BucketInfo - Writer io.Writer + Versioned bool Encryption encryption.Params } @@ -132,6 +132,7 @@ type ( // CopyObjectParams stores object copy request parameters. CopyObjectParams struct { + SrcVersioned bool SrcObject *data.ObjectInfo ScrBktInfo *data.BucketInfo DstBktInfo *data.BucketInfo @@ -185,6 +186,13 @@ type ( Error error } + ObjectPayload struct { + r io.Reader + params getParams + encrypted bool + decryptedLen uint64 + } + // Client provides S3 API client interface. Client interface { Initialize(ctx context.Context, c EventListener) error @@ -204,7 +212,7 @@ type ( CreateBucket(ctx context.Context, p *CreateBucketParams) (*data.BucketInfo, error) DeleteBucket(ctx context.Context, p *DeleteBucketParams) error - GetObject(ctx context.Context, p *GetObjectParams) error + GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) GetObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ObjectInfo, error) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) (*data.ExtendedObjectInfo, error) @@ -268,6 +276,10 @@ func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error return f(ctx, msg) } +func (p HeadObjectParams) Versioned() bool { + return len(p.VersionID) > 0 +} + // NewLayer creates an instance of a layer. It checks credentials // and establishes gRPC connection with the node. func NewLayer(log *zap.Logger, frostFS FrostFS, config *Config) Client { @@ -395,7 +407,7 @@ func (n *layer) ListBuckets(ctx context.Context) ([]*data.BucketInfo, error) { } // GetObject from storage. 
-func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error { +func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) (*ObjectPayload, error) { var params getParams params.oid = p.ObjectInfo.ID @@ -406,7 +418,7 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error { var err error decReader, err = getDecrypter(p) if err != nil { - return fmt.Errorf("creating decrypter: %w", err) + return nil, fmt.Errorf("creating decrypter: %w", err) } params.off = decReader.EncryptedOffset() params.ln = decReader.EncryptedLength() @@ -420,32 +432,58 @@ func (n *layer) GetObject(ctx context.Context, p *GetObjectParams) error { } } - payload, err := n.initObjectPayloadReader(ctx, params) + r, err := n.initObjectPayloadReader(ctx, params) if err != nil { - return fmt.Errorf("init object payload reader: %w", err) + if client.IsErrObjectNotFound(err) { + if p.Versioned { + err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchVersion), err.Error()) + } else { + err = fmt.Errorf("%w: %s", errors.GetAPIError(errors.ErrNoSuchKey), err.Error()) + } + } + + return nil, fmt.Errorf("init object payload reader: %w", err) } + var decryptedLen uint64 + if decReader != nil { + if err = decReader.SetReader(r); err != nil { + return nil, fmt.Errorf("set reader to decrypter: %w", err) + } + r = io.LimitReader(decReader, int64(decReader.DecryptedLength())) + decryptedLen = decReader.DecryptedLength() + } + + return &ObjectPayload{ + r: r, + params: params, + encrypted: decReader != nil, + decryptedLen: decryptedLen, + }, nil +} + +// Read implements io.Reader. If you want to use ObjectPayload as io.Reader +// you must not use ObjectPayload.StreamTo method and vice versa. +func (o *ObjectPayload) Read(p []byte) (int, error) { + return o.r.Read(p) +} + +// StreamTo reads all payload to provided writer. +// If you want to use this method you must not use ObjectPayload.Read and vice versa. 
+func (o *ObjectPayload) StreamTo(w io.Writer) error { bufSize := uint64(32 * 1024) // configure? - if params.ln != 0 && params.ln < bufSize { - bufSize = params.ln + if o.params.ln != 0 && o.params.ln < bufSize { + bufSize = o.params.ln } // alloc buffer for copying buf := make([]byte, bufSize) // sync-pool it? - r := payload - if decReader != nil { - if err = decReader.SetReader(payload); err != nil { - return fmt.Errorf("set reader to decrypter: %w", err) - } - r = io.LimitReader(decReader, int64(decReader.DecryptedLength())) - } - // copy full payload - written, err := io.CopyBuffer(p.Writer, r, buf) + written, err := io.CopyBuffer(w, o.r, buf) if err != nil { - if decReader != nil { - return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, decReader.DecryptedLength(), params.ln, err) + if o.encrypted { + return fmt.Errorf("copy object payload written: '%d', decLength: '%d', params.ln: '%d' : %w", written, o.decryptedLen, o.params.ln, err) } return fmt.Errorf("copy object payload written: '%d': %w", written, err) } @@ -497,10 +535,10 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) var objInfo *data.ExtendedObjectInfo var err error - if len(p.VersionID) == 0 { - objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object) - } else { + if p.Versioned() { objInfo, err = n.headVersion(ctx, p.BktInfo, p) + } else { + objInfo, err = n.headLastVersionIfNotDeleted(ctx, p.BktInfo, p.Object) } if err != nil { return nil, err @@ -515,27 +553,22 @@ func (n *layer) GetExtendedObjectInfo(ctx context.Context, p *HeadObjectParams) // CopyObject from one bucket into another bucket. 
func (n *layer) CopyObject(ctx context.Context, p *CopyObjectParams) (*data.ExtendedObjectInfo, error) { - pr, pw := io.Pipe() - - go func() { - err := n.GetObject(ctx, &GetObjectParams{ - ObjectInfo: p.SrcObject, - Writer: pw, - Range: p.Range, - BucketInfo: p.ScrBktInfo, - Encryption: p.Encryption, - }) - - if err = pw.CloseWithError(err); err != nil { - n.reqLogger(ctx).Error("could not get object", zap.Error(err)) - } - }() + objPayload, err := n.GetObject(ctx, &GetObjectParams{ + ObjectInfo: p.SrcObject, + Versioned: p.SrcVersioned, + Range: p.Range, + BucketInfo: p.ScrBktInfo, + Encryption: p.Encryption, + }) + if err != nil { + return nil, fmt.Errorf("get object to copy: %w", err) + } return n.PutObject(ctx, &PutObjectParams{ BktInfo: p.DstBktInfo, Object: p.DstObject, Size: p.SrcSize, - Reader: pr, + Reader: objPayload, Header: p.Header, Encryption: p.Encryption, CopiesNumbers: p.CopiesNumbers, diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index b453269..129efb5 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -64,6 +64,7 @@ type ( } UploadCopyParams struct { + Versioned bool Info *UploadInfoParams SrcObjInfo *data.ObjectInfo SrcBktInfo *data.BucketInfo @@ -292,26 +293,21 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data. 
return nil, fmt.Errorf("%w: %d/%d", s3errors.GetAPIError(s3errors.ErrEntityTooLarge), size, uploadMaxSize) } - pr, pw := io.Pipe() - - go func() { - err = n.GetObject(ctx, &GetObjectParams{ - ObjectInfo: p.SrcObjInfo, - Writer: pw, - Range: p.Range, - BucketInfo: p.SrcBktInfo, - }) - - if err = pw.CloseWithError(err); err != nil { - n.reqLogger(ctx).Error("could not get object", zap.Error(err)) - } - }() + objPayload, err := n.GetObject(ctx, &GetObjectParams{ + ObjectInfo: p.SrcObjInfo, + Versioned: p.Versioned, + Range: p.Range, + BucketInfo: p.SrcBktInfo, + }) + if err != nil { + return nil, fmt.Errorf("get object to upload copy: %w", err) + } params := &UploadPartParams{ Info: p.Info, PartNumber: p.PartNumber, Size: size, - Reader: pr, + Reader: objPayload, } return n.uploadPart(ctx, multipartInfo, params) diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go index 884f477..5b6e36f 100644 --- a/api/layer/versioning_test.go +++ b/api/layer/versioning_test.go @@ -3,6 +3,7 @@ package layer import ( "bytes" "context" + "io" "testing" "git.frostfs.info/TrueCloudLab/frostfs-s3-gw/api/data" @@ -31,26 +32,29 @@ func (tc *testContext) putObject(content []byte) *data.ObjectInfo { } func (tc *testContext) getObject(objectName, versionID string, needError bool) (*data.ObjectInfo, []byte) { - objInfo, err := tc.layer.GetObjectInfo(tc.ctx, &HeadObjectParams{ + headPrm := &HeadObjectParams{ BktInfo: tc.bktInfo, Object: objectName, VersionID: versionID, - }) + } + objInfo, err := tc.layer.GetObjectInfo(tc.ctx, headPrm) if needError { require.Error(tc.t, err) return nil, nil } require.NoError(tc.t, err) - content := bytes.NewBuffer(nil) - err = tc.layer.GetObject(tc.ctx, &GetObjectParams{ + objPayload, err := tc.layer.GetObject(tc.ctx, &GetObjectParams{ ObjectInfo: objInfo, - Writer: content, + Versioned: headPrm.Versioned(), BucketInfo: tc.bktInfo, }) require.NoError(tc.t, err) - return objInfo, content.Bytes() + payload, err := io.ReadAll(objPayload) + 
require.NoError(tc.t, err) + + return objInfo, payload } func (tc *testContext) deleteObject(objectName, versionID string, settings *data.BucketSettings) {