From 385437aacedc093cf558ebe8f2e12110824180f6 Mon Sep 17 00:00:00 2001 From: Alex Vanin Date: Thu, 19 May 2022 17:56:42 +0300 Subject: [PATCH] [#441] Don't produce extra object.Head request at regular object upload Hash can be calculated locally in S3 gateway. Creation epoch used for versioning and will be fetched during get and list requests. To avoid conflicts, put method do not update cache anymore. Signed-off-by: Alex Vanin --- api/layer/object.go | 62 ++++++++++++++++++++++++++-------------- api/layer/object_test.go | 20 +++++++++++++ 2 files changed, 61 insertions(+), 21 deletions(-) diff --git a/api/layer/object.go b/api/layer/object.go index c522584c..06bd998d 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -2,6 +2,7 @@ package layer import ( "context" + "crypto/sha256" "encoding/hex" "errors" "fmt" @@ -199,7 +200,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object } } - id, err := n.objectPut(ctx, prm) + id, hash, err := n.objectPutAndHash(ctx, prm) if err != nil { return nil, err } @@ -219,15 +220,6 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object } } - meta, err := n.objectHead(ctx, p.BktInfo.CID, *id) - if err != nil { - return nil, err - } - - if err = n.objCache.Put(*meta); err != nil { - n.log.Error("couldn't cache an object", zap.Error(err)) - } - n.listsCache.CleanCacheEntriesContainingObject(p.Object, p.BktInfo.CID) for _, id := range idsToDeleteArr { @@ -247,21 +239,18 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object } } - payloadChecksum, _ := meta.PayloadChecksum() - return &data.ObjectInfo{ ID: id, CID: p.BktInfo.CID, - Owner: &own, - Bucket: p.BktInfo.Name, - Name: p.Object, - Size: p.Size, - Created: time.Now(), - CreationEpoch: meta.CreationEpoch(), - Headers: p.Header, - ContentType: p.Header[api.ContentType], - HashSum: hex.EncodeToString(payloadChecksum.Value()), + Owner: &own, + Bucket: p.BktInfo.Name, + Name: p.Object, + Size: p.Size, + Created: time.Now(), + Headers: p.Header, + ContentType: p.Header[api.ContentType], + HashSum: hex.EncodeToString(hash), }, nil } @@ -463,6 +452,18 @@ func (n *layer) objectPut(ctx context.Context, prm neofs.PrmObjectCreate) (*oid. return id, n.transformNeofsError(ctx, err) } +// objectPutAndHash prepare auth parameters and invoke neofs.CreateObject. +// Returns object ID and payload sha256 hash. +func (n *layer) objectPutAndHash(ctx context.Context, prm neofs.PrmObjectCreate) (*oid.ID, []byte, error) { + n.prepareAuthParameters(ctx, &prm.PrmAuth) + hash := sha256.New() + prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) { + hash.Write(buf) + }) + id, err := n.neoFS.CreateObject(ctx, prm) + return id, hash.Sum(nil), n.transformNeofsError(ctx, err) +} + // ListObjectsV1 returns objects in a bucket for requests of Version 1. func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) { var ( @@ -712,3 +713,22 @@ func (n *layer) transformNeofsError(ctx context.Context, err error) error { return err } + +func wrapReader(input io.Reader, bufSize int, f func(buf []byte)) io.Reader { + r, w := io.Pipe() + go func() { + var buf = make([]byte, bufSize) + for { + n, err := input.Read(buf) + if n > 0 { + f(buf[:n]) + _, _ = w.Write(buf[:n]) // ignore error, input is not ReadCloser + } + if err != nil { + _ = w.CloseWithError(err) + break + } + } + }() + return r +} diff --git a/api/layer/object_test.go b/api/layer/object_test.go index b51ce7d1..942423a6 100644 --- a/api/layer/object_test.go +++ b/api/layer/object_test.go @@ -1,8 +1,10 @@ package layer import ( + "bytes" "crypto/rand" "crypto/sha256" + "io/ioutil" "testing" "github.com/nspcc-dev/neofs-s3-gw/api/data" @@ -112,3 +114,21 @@ func TestTrimAfterObjectID(t *testing.T) { require.Nil(t, actual) }) } + +func TestWrapReader(t *testing.T) { + src := make([]byte, 1024*1024+1) + _, err := rand.Read(src) + require.NoError(t, err) + h := sha256.Sum256(src) + + streamHash := sha256.New() + reader := bytes.NewReader(src) + wrappedReader := wrapReader(reader, 64*1024, func(buf []byte) { + streamHash.Write(buf) + }) + + dst, err := ioutil.ReadAll(wrappedReader) + require.NoError(t, err) + require.Equal(t, src, dst) + require.Equal(t, h[:], streamHash.Sum(nil)) +}