[#417] Upload part using tree service

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-05-24 14:30:37 +03:00 committed by Alex Vanin
parent e1b9a4432a
commit bc0bdc7767
8 changed files with 221 additions and 32 deletions

23
api/cache/objects.go vendored
View file

@ -5,6 +5,7 @@ import (
"time"
"github.com/bluele/gcache"
"github.com/nspcc-dev/neofs-s3-gw/api/data"
"github.com/nspcc-dev/neofs-sdk-go/object"
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
"go.uber.org/zap"
@ -55,6 +56,21 @@ func (o *ObjectsCache) Get(address oid.Address) *object.Object {
return &result
}
// GetObject returns a cached object info.
func (o *ObjectsCache) GetObject(address oid.Address) *data.ObjectInfo {
entry, err := o.cache.Get(address.EncodeToString())
if err != nil {
return nil
}
result, ok := entry.(*data.ObjectInfo)
if !ok {
return nil
}
return result
}
// Put puts an object to cache.
func (o *ObjectsCache) Put(obj object.Object) error {
cnrID, ok := obj.ContainerID()
@ -73,6 +89,13 @@ func (o *ObjectsCache) Put(obj object.Object) error {
return o.cache.Set(addr.EncodeToString(), obj)
}
// PutObject puts an object info to cache.
func (o *ObjectsCache) PutObject(obj *data.ObjectInfo) error {
cnrID := obj.CID.EncodeToString()
objID := obj.ID.EncodeToString()
return o.cache.Set(cnrID+"/"+objID, obj)
}
// Delete deletes an object from cache.
func (o *ObjectsCache) Delete(address oid.Address) bool {
return o.cache.Remove(address.EncodeToString())

View file

@ -45,9 +45,20 @@ type ObjectTaggingInfo struct {
// MultipartInfo is multipart upload information.
type MultipartInfo struct {
// ID is node id in tree service.
// It's ignored when creating a new multipart upload.
ID uint64
Key string
UploadID string
Owner user.ID
Created time.Time
Meta map[string]string
}
// PartInfo is upload information about part.
type PartInfo struct {
Key string
UploadID string
Number int
OID oid.ID
}

View file

@ -216,13 +216,13 @@ func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
Reader: r.Body,
}
info, err := h.obj.UploadPart(r.Context(), p)
hash, err := h.obj.UploadPart(r.Context(), p)
if err != nil {
h.logAndSendError(w, "could not upload a part", reqInfo, err, additional...)
return
}
w.Header().Set(api.ETag, info.HashSum)
w.Header().Set(api.ETag, hash)
api.WriteSuccessResponseHeadersOnly(w)
}

View file

@ -233,7 +233,7 @@ type (
CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error
CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error)
UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error)
UploadPart(ctx context.Context, p *UploadPartParams) (string, error)
UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error)
ListMultipartUploads(ctx context.Context, p *ListMultipartUploadsParams) (*ListMultipartUploadsInfo, error)
AbortMultipartUpload(ctx context.Context, p *UploadInfoParams) error

View file

@ -2,6 +2,7 @@ package layer
import (
"context"
"encoding/hex"
stderrors "errors"
"fmt"
"io"
@ -142,36 +143,91 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar
info.Meta[tagPrefix+key] = val
}
return n.treeService.CreateMultipart(ctx, &p.Info.Bkt.CID, info)
return n.treeService.CreateMultipartUpload(ctx, &p.Info.Bkt.CID, info)
}
func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error) {
if p.PartNumber != 0 {
if _, err := n.GetUploadInitInfo(ctx, p.Info); err != nil {
return nil, err
func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, &p.Info.Bkt.CID, p.Info.Key, p.Info.UploadID)
if err != nil {
if stderrors.Is(err, ErrNodeNotFound) {
return "", errors.GetAPIError(errors.ErrNoSuchUpload)
}
return "", err
}
if p.Size > uploadMaxSize {
return nil, errors.GetAPIError(errors.ErrEntityTooLarge)
return "", errors.GetAPIError(errors.ErrEntityTooLarge)
}
header := make(map[string]string)
appendUploadHeaders(header, p.Info.UploadID, p.Info.Key, p.PartNumber)
params := &PutSystemObjectParams{
BktInfo: p.Info.Bkt,
ObjName: FormUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber),
Metadata: header,
Reader: p.Reader,
Size: p.Size,
objInfo, err := n.uploadPart(ctx, multipartInfo, p)
if err != nil {
return "", err
}
return n.PutSystemObject(ctx, params)
return objInfo.HashSum, nil
}
func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInfo, p *UploadPartParams) (*data.ObjectInfo, error) {
bktInfo := p.Info.Bkt
prm := PrmObjectCreate{
Container: bktInfo.CID,
Creator: bktInfo.Owner,
Attributes: make([][2]string, 2),
Payload: p.Reader,
}
prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID
prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber)
id, hash, err := n.objectPutAndHash(ctx, prm, bktInfo)
if err != nil {
return nil, err
}
partInfo := &data.PartInfo{
Key: p.Info.Key,
UploadID: p.Info.UploadID,
Number: p.PartNumber,
OID: *id,
}
oldPartID, err := n.treeService.AddPart(ctx, &bktInfo.CID, multipartInfo.ID, partInfo)
if err != nil {
return nil, err
}
if oldPartID != nil {
if err = n.objectDelete(ctx, bktInfo, *oldPartID); err != nil {
n.log.Error("couldn't delete old part object", zap.Error(err),
zap.String("cnrID", bktInfo.CID.EncodeToString()),
zap.String("bucket name", bktInfo.Name),
zap.String("objID", oldPartID.EncodeToString()))
}
}
objInfo := &data.ObjectInfo{
ID: *id,
CID: bktInfo.CID,
Owner: bktInfo.Owner,
Bucket: bktInfo.Name,
Size: p.Size,
Created: time.Now(),
HashSum: hex.EncodeToString(hash),
}
if err = n.objCache.PutObject(objInfo); err != nil {
n.log.Error("couldn't cache system object", zap.Error(err))
}
return objInfo, nil
}
func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) {
if _, err := n.GetUploadInitInfo(ctx, p.Info); err != nil {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, &p.Info.Bkt.CID, p.Info.Key, p.Info.UploadID)
if err != nil {
if stderrors.Is(err, ErrNodeNotFound) {
return nil, errors.GetAPIError(errors.ErrNoSuchUpload)
}
return nil, err
}
@ -192,7 +248,7 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
pr, pw := io.Pipe()
go func() {
err := n.GetObject(ctx, &GetObjectParams{
err = n.GetObject(ctx, &GetObjectParams{
ObjectInfo: p.SrcObjInfo,
Writer: pw,
Range: p.Range,
@ -204,14 +260,14 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
}
}()
return n.PutSystemObject(ctx, &PutSystemObjectParams{
BktInfo: p.Info.Bkt,
ObjName: FormUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber),
Metadata: metadata,
Prefix: "",
Reader: pr,
Size: size,
})
params := &UploadPartParams{
Info: p.Info,
PartNumber: p.PartNumber,
Size: size,
Reader: pr,
}
return n.uploadPart(ctx, multipartInfo, params)
}
// implements io.Reader of payloads of the object list stored in the NeoFS network.

View file

@ -50,8 +50,14 @@ type TreeService interface {
GetSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*data.BaseNodeVersion, error)
RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error
CreateMultipart(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error
CreateMultipartUpload(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error
GetMultipartUploadsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.MultipartInfo, error)
GetMultipartUpload(ctx context.Context, cnrID *cid.ID, objectName, uploadID string) (*data.MultipartInfo, error)
// AddPart puts a node to a system tree as a child of appropriate multipart upload
// and returns objectID of a previous part which must be deleted in NeoFS.
// If a part is being added for the first time, the previous part ID will be nil.
AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete *oid.ID, err error)
}
// ErrNodeNotFound is returned from Tree service in case of not found error.

View file

@ -55,6 +55,7 @@ const (
isUnversionedKV = "IsUnversioned"
isTagKV = "isTag"
uploadIDKV = "UploadId"
partNumberKV = "Number"
// keys for delete marker nodes.
isDeleteMarkerKV = "IdDeleteMarker"
@ -179,6 +180,7 @@ func newNodeVersionFromTreeNode(treeNode *TreeNode) *data.NodeVersion {
func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
multipartInfo := &data.MultipartInfo{
ID: node.GetNodeId(),
Meta: make(map[string]string, len(node.GetMeta())),
}
@ -206,6 +208,27 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) {
return multipartInfo, nil
}
func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
partInfo := &data.PartInfo{}
for _, kv := range node.GetMeta() {
switch kv.GetKey() {
case partNumberKV:
partInfo.Number, _ = strconv.Atoi(string(kv.GetValue()))
case oidKV:
if err := partInfo.OID.DecodeString(string(kv.GetValue())); err != nil {
return nil, fmt.Errorf("invalid oid: %w", err)
}
}
}
if partInfo.Number <= 0 {
return nil, fmt.Errorf("it's not a part node")
}
return partInfo, nil
}
func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID) (*data.BucketSettings, error) {
keysToReturn := []string{versioningEnabledKV, lockConfigurationKV}
node, err := c.getSystemNode(ctx, cnrID, []string{settingsFileName}, keysToReturn)
@ -720,7 +743,7 @@ func (c *TreeClient) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, id
return c.removeNode(ctx, cnrID, systemTree, id)
}
func (c *TreeClient) CreateMultipart(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error {
func (c *TreeClient) CreateMultipartUpload(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error {
path := pathFromName(info.Key)
meta := metaFromMultipart(info)
@ -763,6 +786,68 @@ func (c *TreeClient) getSubTreeMultipartUploads(ctx context.Context, cnrID *cid.
return result, nil
}
func (c *TreeClient) GetMultipartUpload(ctx context.Context, cnrID *cid.ID, objectName, uploadID string) (*data.MultipartInfo, error) {
path := pathFromName(objectName)
p := &getNodesParams{
CnrID: cnrID,
TreeID: systemTree,
Path: path,
AllAttrs: true,
}
nodes, err := c.getNodes(ctx, p)
if err != nil {
return nil, err
}
for _, node := range nodes {
info, err := newMultipartInfo(node)
if err != nil {
continue
}
if info.UploadID == uploadID {
return info, nil
}
}
return nil, layer.ErrNodeNotFound
}
func (c *TreeClient) AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete *oid.ID, err error) {
parts, err := c.getSubTree(ctx, cnrID, systemTree, multipartNodeID, 1)
if err != nil {
return nil, err
}
meta := map[string]string{
partNumberKV: strconv.Itoa(info.Number),
oidKV: info.OID.EncodeToString(),
}
var foundPartID uint64
for _, part := range parts {
if part.GetNodeId() == multipartNodeID {
continue
}
partInfo, err := newPartInfo(part)
if err != nil {
continue
}
if partInfo.Number == info.Number {
foundPartID = part.GetNodeId()
oldObjIDToDelete = &partInfo.OID
break
}
}
if oldObjIDToDelete == nil {
_, err = c.addNode(ctx, cnrID, systemTree, multipartNodeID, meta)
return nil, err
}
return oldObjIDToDelete, c.moveNode(ctx, cnrID, systemTree, foundPartID, multipartNodeID, meta)
}
func (c *TreeClient) Close() error {
if c.conn != nil {
return c.conn.Close()

View file

@ -236,10 +236,18 @@ func (t *TreeServiceMock) GetAllVersionsByPrefix(ctx context.Context, cnrID *cid
panic("implement me")
}
func (t *TreeServiceMock) CreateMultipart(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error {
func (t *TreeServiceMock) CreateMultipartUpload(ctx context.Context, cnrID *cid.ID, info *data.MultipartInfo) error {
panic("implement me")
}
func (t *TreeServiceMock) GetMultipartUploadsByPrefix(ctx context.Context, cnrID *cid.ID, prefix string) ([]*data.MultipartInfo, error) {
panic("implement me")
}
func (t *TreeServiceMock) GetMultipartUpload(ctx context.Context, cnrID *cid.ID, objectName, uploadID string) (*data.MultipartInfo, error) {
panic("implement me")
}
func (t *TreeServiceMock) AddPart(ctx context.Context, cnrID *cid.ID, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete *oid.ID, err error) {
panic("implement me")
}