diff --git a/api/layer/object.go b/api/layer/object.go index c522584c..06bd998d 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -2,6 +2,7 @@ package layer import ( "context" + "crypto/sha256" "encoding/hex" "errors" "fmt" @@ -199,7 +200,7 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object } } - id, err := n.objectPut(ctx, prm) + id, hash, err := n.objectPutAndHash(ctx, prm) if err != nil { return nil, err } @@ -219,15 +220,6 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object } } - meta, err := n.objectHead(ctx, p.BktInfo.CID, *id) - if err != nil { - return nil, err - } - - if err = n.objCache.Put(*meta); err != nil { - n.log.Error("couldn't cache an object", zap.Error(err)) - } - n.listsCache.CleanCacheEntriesContainingObject(p.Object, p.BktInfo.CID) for _, id := range idsToDeleteArr { @@ -247,21 +239,18 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object } } - payloadChecksum, _ := meta.PayloadChecksum() - return &data.ObjectInfo{ ID: id, CID: p.BktInfo.CID, - Owner: &own, - Bucket: p.BktInfo.Name, - Name: p.Object, - Size: p.Size, - Created: time.Now(), - CreationEpoch: meta.CreationEpoch(), - Headers: p.Header, - ContentType: p.Header[api.ContentType], - HashSum: hex.EncodeToString(payloadChecksum.Value()), + Owner: &own, + Bucket: p.BktInfo.Name, + Name: p.Object, + Size: p.Size, + Created: time.Now(), + Headers: p.Header, + ContentType: p.Header[api.ContentType], + HashSum: hex.EncodeToString(hash), }, nil } @@ -463,6 +452,18 @@ func (n *layer) objectPut(ctx context.Context, prm neofs.PrmObjectCreate) (*oid. return id, n.transformNeofsError(ctx, err) } +// objectPutAndHash prepare auth parameters and invoke neofs.CreateObject. +// Returns object ID and payload sha256 hash. +func (n *layer) objectPutAndHash(ctx context.Context, prm neofs.PrmObjectCreate) (*oid.ID, []byte, error) { + n.prepareAuthParameters(ctx, &prm.PrmAuth) + hash := sha256.New() + prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) { + hash.Write(buf) + }) + id, err := n.neoFS.CreateObject(ctx, prm) + return id, hash.Sum(nil), n.transformNeofsError(ctx, err) +} + // ListObjectsV1 returns objects in a bucket for requests of Version 1. func (n *layer) ListObjectsV1(ctx context.Context, p *ListObjectsParamsV1) (*ListObjectsInfoV1, error) { var ( @@ -712,3 +713,22 @@ func (n *layer) transformNeofsError(ctx context.Context, err error) error { return err } + +func wrapReader(input io.Reader, bufSize int, f func(buf []byte)) io.Reader { + r, w := io.Pipe() + go func() { + var buf = make([]byte, bufSize) + for { + n, err := input.Read(buf) + if n > 0 { + f(buf[:n]) + _, _ = w.Write(buf[:n]) // ignore error, input is not ReadCloser + } + if err != nil { + _ = w.CloseWithError(err) + break + } + } + }() + return r +} diff --git a/api/layer/object_test.go b/api/layer/object_test.go index b51ce7d1..942423a6 100644 --- a/api/layer/object_test.go +++ b/api/layer/object_test.go @@ -1,8 +1,10 @@ package layer import ( + "bytes" "crypto/rand" "crypto/sha256" + "io/ioutil" "testing" "github.com/nspcc-dev/neofs-s3-gw/api/data" @@ -112,3 +114,21 @@ func TestTrimAfterObjectID(t *testing.T) { require.Nil(t, actual) }) } + +func TestWrapReader(t *testing.T) { + src := make([]byte, 1024*1024+1) + _, err := rand.Read(src) + require.NoError(t, err) + h := sha256.Sum256(src) + + streamHash := sha256.New() + reader := bytes.NewReader(src) + wrappedReader := wrapReader(reader, 64*1024, func(buf []byte) { + streamHash.Write(buf) + }) + + dst, err := ioutil.ReadAll(wrappedReader) + require.NoError(t, err) + require.Equal(t, src, dst) + require.Equal(t, h[:], streamHash.Sum(nil)) +}