forked from TrueCloudLab/frostfs-s3-gw
[#441] Optimize put objects
Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
parent
6cf7dc4010
commit
2bca4755f9
6 changed files with 60 additions and 34 deletions
|
@ -41,17 +41,14 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
||||||
Metadata: map[string]string{},
|
Metadata: map[string]string{},
|
||||||
Prefix: "",
|
Prefix: "",
|
||||||
Reader: &buf,
|
Reader: &buf,
|
||||||
|
Size: int64(buf.Len()),
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := n.putSystemObjectIntoNeoFS(ctx, s)
|
_, err := n.putSystemObjectIntoNeoFS(ctx, s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj.Size == 0 {
|
|
||||||
return errors.GetAPIError(errors.ErrInternalError)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = n.systemCache.PutCORS(systemObjectKey(p.BktInfo, s.ObjName), cors); err != nil {
|
if err = n.systemCache.PutCORS(systemObjectKey(p.BktInfo, s.ObjName), cors); err != nil {
|
||||||
n.log.Error("couldn't cache system object", zap.Error(err))
|
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
|
@ -159,6 +159,7 @@ type (
|
||||||
Metadata map[string]string
|
Metadata map[string]string
|
||||||
Prefix string
|
Prefix string
|
||||||
Reader io.Reader
|
Reader io.Reader
|
||||||
|
Size int64
|
||||||
Lock *data.ObjectLock
|
Lock *data.ObjectLock
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -134,6 +134,7 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.Obje
|
||||||
Metadata: p.Header,
|
Metadata: p.Header,
|
||||||
Prefix: "",
|
Prefix: "",
|
||||||
Reader: p.Reader,
|
Reader: p.Reader,
|
||||||
|
Size: p.Size,
|
||||||
}
|
}
|
||||||
|
|
||||||
return n.PutSystemObject(ctx, params)
|
return n.PutSystemObject(ctx, params)
|
||||||
|
@ -144,17 +145,15 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size := p.SrcObjInfo.Size
|
||||||
if p.Range != nil {
|
if p.Range != nil {
|
||||||
if p.Range.End-p.Range.Start > uploadMaxSize {
|
size = int64(p.Range.End - p.Range.Start + 1)
|
||||||
return nil, errors.GetAPIError(errors.ErrEntityTooLarge)
|
|
||||||
}
|
|
||||||
if p.Range.End > uint64(p.SrcObjInfo.Size) {
|
if p.Range.End > uint64(p.SrcObjInfo.Size) {
|
||||||
return nil, errors.GetAPIError(errors.ErrInvalidCopyPartRangeSource)
|
return nil, errors.GetAPIError(errors.ErrInvalidCopyPartRangeSource)
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
if p.SrcObjInfo.Size > uploadMaxSize {
|
if size > uploadMaxSize {
|
||||||
return nil, errors.GetAPIError(errors.ErrEntityTooLarge)
|
return nil, errors.GetAPIError(errors.ErrEntityTooLarge)
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
metadata := make(map[string]string)
|
metadata := make(map[string]string)
|
||||||
|
@ -180,6 +179,7 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
|
||||||
Metadata: metadata,
|
Metadata: metadata,
|
||||||
Prefix: "",
|
Prefix: "",
|
||||||
Reader: pr,
|
Reader: pr,
|
||||||
|
Size: size,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,17 +28,14 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
|
||||||
ObjName: p.BktInfo.NotificationConfigurationObjectName(),
|
ObjName: p.BktInfo.NotificationConfigurationObjectName(),
|
||||||
Metadata: map[string]string{},
|
Metadata: map[string]string{},
|
||||||
Reader: bytes.NewReader(confXML),
|
Reader: bytes.NewReader(confXML),
|
||||||
|
Size: int64(len(confXML)),
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := n.putSystemObjectIntoNeoFS(ctx, s)
|
_, err = n.putSystemObjectIntoNeoFS(ctx, s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj.Size == 0 && !p.Configuration.IsEmpty() {
|
|
||||||
return errors.GetAPIError(errors.ErrInternalError)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(p.BktInfo, s.ObjName), p.Configuration); err != nil {
|
if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(p.BktInfo, s.ObjName), p.Configuration); err != nil {
|
||||||
n.log.Error("couldn't cache system object", zap.Error(err))
|
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
|
@ -202,6 +202,14 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
currentEpoch, _, err := n.neoFS.TimeToEpoch(ctx, time.Now().Add(time.Minute))
|
||||||
|
if err != nil {
|
||||||
|
n.log.Warn("couldn't get creation epoch",
|
||||||
|
zap.String("bucket", p.BktInfo.Name),
|
||||||
|
zap.String("object", p.Object),
|
||||||
|
zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
if p.Lock != nil {
|
if p.Lock != nil {
|
||||||
objInfo := &data.ObjectInfo{ID: *id, Name: p.Object}
|
objInfo := &data.ObjectInfo{ID: *id, Name: p.Object}
|
||||||
p.Lock.Objects = append(p.Lock.Objects, *id)
|
p.Lock.Objects = append(p.Lock.Objects, *id)
|
||||||
|
@ -240,14 +248,15 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object
|
||||||
ID: *id,
|
ID: *id,
|
||||||
CID: p.BktInfo.CID,
|
CID: p.BktInfo.CID,
|
||||||
|
|
||||||
Owner: own,
|
Owner: own,
|
||||||
Bucket: p.BktInfo.Name,
|
Bucket: p.BktInfo.Name,
|
||||||
Name: p.Object,
|
Name: p.Object,
|
||||||
Size: p.Size,
|
Size: p.Size,
|
||||||
Created: time.Now(),
|
Created: time.Now(),
|
||||||
Headers: p.Header,
|
CreationEpoch: currentEpoch,
|
||||||
ContentType: p.Header[api.ContentType],
|
Headers: p.Header,
|
||||||
HashSum: hex.EncodeToString(hash),
|
ContentType: p.Header[api.ContentType],
|
||||||
|
HashSum: hex.EncodeToString(hash),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -712,6 +721,10 @@ func (n *layer) transformNeofsError(ctx context.Context, err error) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func wrapReader(input io.Reader, bufSize int, f func(buf []byte)) io.Reader {
|
func wrapReader(input io.Reader, bufSize int, f func(buf []byte)) io.Reader {
|
||||||
|
if input == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
r, w := io.Pipe()
|
r, w := io.Pipe()
|
||||||
go func() {
|
go func() {
|
||||||
var buf = make([]byte, bufSize)
|
var buf = make([]byte, bufSize)
|
||||||
|
|
|
@ -3,6 +3,7 @@ package layer
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"encoding/xml"
|
"encoding/xml"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -117,14 +118,17 @@ func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObject
|
||||||
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
prm.Attributes = append(prm.Attributes, [2]string{k, v})
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := n.objectPut(ctx, prm)
|
id, hash, err := n.objectPutAndHash(ctx, prm)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
meta, err := n.objectHead(ctx, p.BktInfo.CID, *id)
|
currentEpoch, _, err := n.neoFS.TimeToEpoch(ctx, time.Now().Add(time.Minute))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
n.log.Warn("couldn't get creation epoch",
|
||||||
|
zap.String("bucket", p.BktInfo.Name),
|
||||||
|
zap.String("object", p.ObjName),
|
||||||
|
zap.Error(err))
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, id := range idsToDeleteArr {
|
for _, id := range idsToDeleteArr {
|
||||||
|
@ -136,7 +140,24 @@ func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObject
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return objInfoFromMeta(p.BktInfo, meta), nil
|
headers := make(map[string]string, len(p.Metadata))
|
||||||
|
for _, attr := range prm.Attributes {
|
||||||
|
headers[attr[0]] = attr[1]
|
||||||
|
}
|
||||||
|
|
||||||
|
return &data.ObjectInfo{
|
||||||
|
ID: *id,
|
||||||
|
CID: p.BktInfo.CID,
|
||||||
|
|
||||||
|
Owner: p.BktInfo.Owner,
|
||||||
|
Bucket: p.BktInfo.Name,
|
||||||
|
Name: p.ObjName,
|
||||||
|
Created: time.Now(),
|
||||||
|
CreationEpoch: currentEpoch,
|
||||||
|
Size: p.Size,
|
||||||
|
Headers: headers,
|
||||||
|
HashSum: hex.EncodeToString(hash),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *layer) getSystemObjectFromNeoFS(ctx context.Context, bkt *data.BucketInfo, objName string) (*object.Object, error) {
|
func (n *layer) getSystemObjectFromNeoFS(ctx context.Context, bkt *data.BucketInfo, objName string) (*object.Object, error) {
|
||||||
|
@ -272,17 +293,14 @@ func (n *layer) PutBucketSettings(ctx context.Context, p *PutSettingsParams) err
|
||||||
ObjName: p.BktInfo.SettingsObjectName(),
|
ObjName: p.BktInfo.SettingsObjectName(),
|
||||||
Metadata: map[string]string{},
|
Metadata: map[string]string{},
|
||||||
Reader: bytes.NewReader(rawSettings),
|
Reader: bytes.NewReader(rawSettings),
|
||||||
|
Size: int64(len(rawSettings)),
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := n.putSystemObjectIntoNeoFS(ctx, s)
|
_, err = n.putSystemObjectIntoNeoFS(ctx, s)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj.Size == 0 {
|
|
||||||
return errors.GetAPIError(errors.ErrInternalError)
|
|
||||||
}
|
|
||||||
|
|
||||||
systemKey := systemObjectKey(p.BktInfo, p.BktInfo.SettingsObjectName())
|
systemKey := systemObjectKey(p.BktInfo, p.BktInfo.SettingsObjectName())
|
||||||
if err = n.systemCache.PutSettings(systemKey, p.Settings); err != nil {
|
if err = n.systemCache.PutSettings(systemKey, p.Settings); err != nil {
|
||||||
n.log.Error("couldn't cache system object", zap.Error(err))
|
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||||
|
|
Loading…
Reference in a new issue