From 2bca4755f922aafe597f4c1bb889fbd408afec43 Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Wed, 25 May 2022 18:59:36 +0300 Subject: [PATCH] [#441] Optimize put objects Signed-off-by: Denis Kirillov --- api/layer/cors.go | 7 ++----- api/layer/layer.go | 1 + api/layer/multipart_upload.go | 14 +++++++------- api/layer/notifications.go | 7 ++----- api/layer/object.go | 29 ++++++++++++++++++++-------- api/layer/system_object.go | 36 ++++++++++++++++++++++++++--------- 6 files changed, 60 insertions(+), 34 deletions(-) diff --git a/api/layer/cors.go b/api/layer/cors.go index 1c21caf3..ab450fce 100644 --- a/api/layer/cors.go +++ b/api/layer/cors.go @@ -41,17 +41,14 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error { Metadata: map[string]string{}, Prefix: "", Reader: &buf, + Size: int64(buf.Len()), } - obj, err := n.putSystemObjectIntoNeoFS(ctx, s) + _, err := n.putSystemObjectIntoNeoFS(ctx, s) if err != nil { return err } - if obj.Size == 0 { - return errors.GetAPIError(errors.ErrInternalError) - } - if err = n.systemCache.PutCORS(systemObjectKey(p.BktInfo, s.ObjName), cors); err != nil { n.log.Error("couldn't cache system object", zap.Error(err)) } diff --git a/api/layer/layer.go b/api/layer/layer.go index 8448c614..42f91436 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -159,6 +159,7 @@ type ( Metadata map[string]string Prefix string Reader io.Reader + Size int64 Lock *data.ObjectLock } diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 86df4d41..ef664f42 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -134,6 +134,7 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.Obje Metadata: p.Header, Prefix: "", Reader: p.Reader, + Size: p.Size, } return n.PutSystemObject(ctx, params) @@ -144,17 +145,15 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data. return nil, err } + size := p.SrcObjInfo.Size if p.Range != nil { - if p.Range.End-p.Range.Start > uploadMaxSize { - return nil, errors.GetAPIError(errors.ErrEntityTooLarge) - } + size = int64(p.Range.End - p.Range.Start + 1) if p.Range.End > uint64(p.SrcObjInfo.Size) { return nil, errors.GetAPIError(errors.ErrInvalidCopyPartRangeSource) } - } else { - if p.SrcObjInfo.Size > uploadMaxSize { - return nil, errors.GetAPIError(errors.ErrEntityTooLarge) - } + } + if size > uploadMaxSize { + return nil, errors.GetAPIError(errors.ErrEntityTooLarge) } metadata := make(map[string]string) @@ -180,6 +179,7 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data. Metadata: metadata, Prefix: "", Reader: pr, + Size: size, }) } diff --git a/api/layer/notifications.go b/api/layer/notifications.go index 008204b0..3de6bfb0 100644 --- a/api/layer/notifications.go +++ b/api/layer/notifications.go @@ -28,17 +28,14 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu ObjName: p.BktInfo.NotificationConfigurationObjectName(), Metadata: map[string]string{}, Reader: bytes.NewReader(confXML), + Size: int64(len(confXML)), } - obj, err := n.putSystemObjectIntoNeoFS(ctx, s) + _, err = n.putSystemObjectIntoNeoFS(ctx, s) if err != nil { return err } - if obj.Size == 0 && !p.Configuration.IsEmpty() { - return errors.GetAPIError(errors.ErrInternalError) - } - if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(p.BktInfo, s.ObjName), p.Configuration); err != nil { n.log.Error("couldn't cache system object", zap.Error(err)) } diff --git a/api/layer/object.go b/api/layer/object.go index ec034aaa..546b9f11 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -202,6 +202,14 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object return nil, err } + currentEpoch, _, err := n.neoFS.TimeToEpoch(ctx, time.Now().Add(time.Minute)) + if err != nil { + n.log.Warn("couldn't get creation epoch", + zap.String("bucket", p.BktInfo.Name), + zap.String("object", p.Object), + zap.Error(err)) + } + if p.Lock != nil { objInfo := &data.ObjectInfo{ID: *id, Name: p.Object} p.Lock.Objects = append(p.Lock.Objects, *id) @@ -240,14 +248,15 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object ID: *id, CID: p.BktInfo.CID, - Owner: own, - Bucket: p.BktInfo.Name, - Name: p.Object, - Size: p.Size, - Created: time.Now(), - Headers: p.Header, - ContentType: p.Header[api.ContentType], - HashSum: hex.EncodeToString(hash), + Owner: own, + Bucket: p.BktInfo.Name, + Name: p.Object, + Size: p.Size, + Created: time.Now(), + CreationEpoch: currentEpoch, + Headers: p.Header, + ContentType: p.Header[api.ContentType], + HashSum: hex.EncodeToString(hash), }, nil } @@ -712,6 +721,10 @@ func (n *layer) transformNeofsError(ctx context.Context, err error) error { } func wrapReader(input io.Reader, bufSize int, f func(buf []byte)) io.Reader { + if input == nil { + return nil + } + r, w := io.Pipe() go func() { var buf = make([]byte, bufSize) diff --git a/api/layer/system_object.go b/api/layer/system_object.go index 1c13655a..998f89a5 100644 --- a/api/layer/system_object.go +++ b/api/layer/system_object.go @@ -3,6 +3,7 @@ package layer import ( "bytes" "context" + "encoding/hex" "encoding/json" "encoding/xml" "fmt" @@ -117,14 +118,17 @@ func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObject prm.Attributes = append(prm.Attributes, [2]string{k, v}) } - id, err := n.objectPut(ctx, prm) + id, hash, err := n.objectPutAndHash(ctx, prm) if err != nil { return nil, err } - meta, err := n.objectHead(ctx, p.BktInfo.CID, *id) + currentEpoch, _, err := n.neoFS.TimeToEpoch(ctx, time.Now().Add(time.Minute)) if err != nil { - return nil, err + n.log.Warn("couldn't get creation epoch", + zap.String("bucket", p.BktInfo.Name), + zap.String("object", p.ObjName), + zap.Error(err)) } for _, id := range idsToDeleteArr { @@ -136,7 +140,24 @@ func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObject } } - return objInfoFromMeta(p.BktInfo, meta), nil + headers := make(map[string]string, len(p.Metadata)) + for _, attr := range prm.Attributes { + headers[attr[0]] = attr[1] + } + + return &data.ObjectInfo{ + ID: *id, + CID: p.BktInfo.CID, + + Owner: p.BktInfo.Owner, + Bucket: p.BktInfo.Name, + Name: p.ObjName, + Created: time.Now(), + CreationEpoch: currentEpoch, + Size: p.Size, + Headers: headers, + HashSum: hex.EncodeToString(hash), + }, nil } func (n *layer) getSystemObjectFromNeoFS(ctx context.Context, bkt *data.BucketInfo, objName string) (*object.Object, error) { @@ -272,17 +293,14 @@ func (n *layer) PutBucketSettings(ctx context.Context, p *PutSettingsParams) err ObjName: p.BktInfo.SettingsObjectName(), Metadata: map[string]string{}, Reader: bytes.NewReader(rawSettings), + Size: int64(len(rawSettings)), } - obj, err := n.putSystemObjectIntoNeoFS(ctx, s) + _, err = n.putSystemObjectIntoNeoFS(ctx, s) if err != nil { return err } - if obj.Size == 0 { - return errors.GetAPIError(errors.ErrInternalError) - } - systemKey := systemObjectKey(p.BktInfo, p.BktInfo.SettingsObjectName()) if err = n.systemCache.PutSettings(systemKey, p.Settings); err != nil { n.log.Error("couldn't cache system object", zap.Error(err))