[#417] Create multipart upload using tree service

Signed-off-by: Denis Kirillov <denis@nspcc.ru>
Signed-off-by: Alex Vanin <alexey@nspcc.ru>
This commit is contained in:
Denis Kirillov 2022-05-23 17:34:13 +03:00 committed by Alex Vanin
parent 24bad60048
commit 13e01164d7
6 changed files with 135 additions and 81 deletions

View file

@ -42,3 +42,11 @@ type ObjectTaggingInfo struct {
ObjName string
VersionID string
}
// MultipartInfo is multipart upload information.
type MultipartInfo struct {
UploadID string
Owner user.ID
Created time.Time
Meta map[string]string
}

View file

@ -100,8 +100,6 @@ const (
)
func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Request) {
/* initiation of multipart uploads is implemented via creation of "system" upload part with 0 part number
(min value of partNumber of a common part is 1) and holding data: metadata, acl, tagging */
reqInfo := api.GetReqInfo(r.Context())
bktInfo, err := h.getBucketAndCheckOwner(r, reqInfo.BucketName)
@ -110,22 +108,19 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
return
}
var (
hasData bool
b []byte
uploadID := uuid.New()
additional := []zap.Field{
zap.String("uploadID", uploadID.String()),
zap.String("Key", reqInfo.ObjectName),
}
uploadID = uuid.New()
data = &UploadData{}
additional = []zap.Field{
zap.String("uploadID", uploadID.String()),
zap.String("Key", reqInfo.ObjectName),
}
uploadInfo = &layer.UploadInfoParams{
p := &layer.CreateMultipartParams{
Info: &layer.UploadInfoParams{
UploadID: uploadID.String(),
Bkt: bktInfo,
Key: reqInfo.ObjectName,
}
)
},
}
if containsACLHeaders(r) {
key, err := h.bearerTokenIssuerKey(r.Context())
@ -133,53 +128,35 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
h.logAndSendError(w, "couldn't get gate key", reqInfo, err)
return
}
data.ACL, err = parseACLHeaders(r.Header, key)
if err != nil {
if _, err = parseACLHeaders(r.Header, key); err != nil {
h.logAndSendError(w, "could not parse acl", reqInfo, err)
return
}
hasData = true
p.ACLHeaders = formACLHeadersForMultipart(r.Header)
}
if len(r.Header.Get(api.AmzTagging)) > 0 {
data.TagSet, err = parseTaggingHeader(r.Header)
p.TagSet, err = parseTaggingHeader(r.Header)
if err != nil {
h.logAndSendError(w, "could not parse tagging", reqInfo, err, additional...)
return
}
hasData = true
}
metadata := parseMetadata(r)
p.Header = parseMetadata(r)
if contentType := r.Header.Get(api.ContentType); len(contentType) > 0 {
metadata[api.ContentType] = contentType
p.Header[api.ContentType] = contentType
}
p := &layer.UploadPartParams{
Info: uploadInfo,
PartNumber: 0,
Header: metadata,
}
if hasData {
b, err = json.Marshal(data)
if err != nil {
h.logAndSendError(w, "could not marshal json with acl and/or tagging", reqInfo, err, additional...)
return
}
p.Reader = bytes.NewReader(b)
}
info, err := h.obj.UploadPart(r.Context(), p)
if err != nil {
if err = h.obj.CreateMultipartUpload(r.Context(), p); err != nil {
h.logAndSendError(w, "could not upload a part", reqInfo, err, additional...)
return
}
resp := InitiateMultipartUploadResponse{
Bucket: info.Bucket,
Key: info.Headers[layer.UploadKeyAttributeName],
UploadID: info.Headers[layer.UploadIDAttributeName],
Bucket: reqInfo.BucketName,
Key: reqInfo.ObjectName,
UploadID: uploadID.String(),
}
if err = api.EncodeToResponse(w, resp); err != nil {
@ -188,6 +165,25 @@ func (h *handler) CreateMultipartUploadHandler(w http.ResponseWriter, r *http.Re
}
}
func formACLHeadersForMultipart(header http.Header) map[string]string {
result := make(map[string]string)
if value := header.Get(api.AmzACL); value != "" {
result[api.AmzACL] = value
}
if value := header.Get(api.AmzGrantRead); value != "" {
result[api.AmzGrantRead] = value
}
if value := header.Get(api.AmzGrantFullControl); value != "" {
result[api.AmzGrantFullControl] = value
}
if value := header.Get(api.AmzGrantWrite); value != "" {
result[api.AmzGrantWrite] = value
}
return result
}
func (h *handler) UploadPartHandler(w http.ResponseWriter, r *http.Request) {
reqInfo := api.GetReqInfo(r.Context())

View file

@ -231,6 +231,7 @@ type (
DeleteObjects(ctx context.Context, p *DeleteObjectParams) ([]*VersionedObject, error)
DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error
CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error
CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*data.ObjectInfo, error)
UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error)
UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error)

View file

@ -26,6 +26,9 @@ const (
UploadCompletedParts = "S3-Completed-Parts"
UploadPartKeyPrefix = ".upload-"
metaPrefix = "meta-"
aclPrefix = "acl-"
MaxSizeUploadsList = 1000
MaxSizePartsList = 1000
UploadMinPartNumber = 1
@ -41,12 +44,18 @@ type (
Key string
}
CreateMultipartParams struct {
Info *UploadInfoParams
Header map[string]string
TagSet map[string]string
ACLHeaders map[string]string
}
UploadPartParams struct {
Info *UploadInfoParams
PartNumber int
Size int64
Reader io.Reader
Header map[string]string
}
UploadCopyParams struct {
@ -113,6 +122,29 @@ type (
}
)
func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartParams) error {
info := &data.MultipartInfo{
UploadID: p.Info.UploadID,
Owner: n.Owner(ctx),
Created: time.Now(),
Meta: make(map[string]string, len(p.Header)+len(p.ACLHeaders)+len(p.TagSet)),
}
for key, val := range p.Header {
info.Meta[metaPrefix+key] = val
}
for key, val := range p.ACLHeaders {
info.Meta[aclPrefix+key] = val
}
for key, val := range p.TagSet {
info.Meta[tagPrefix+key] = val
}
return n.treeService.CreateMultipart(ctx, &p.Info.Bkt.CID, p.Info.Key, info)
}
func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.ObjectInfo, error) {
if p.PartNumber != 0 {
if _, err := n.GetUploadInitInfo(ctx, p.Info); err != nil {
@ -124,17 +156,13 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (*data.Obje
return nil, errors.GetAPIError(errors.ErrEntityTooLarge)
}
if p.Header == nil {
p.Header = make(map[string]string)
}
appendUploadHeaders(p.Header, p.Info.UploadID, p.Info.Key, p.PartNumber)
header := make(map[string]string)
appendUploadHeaders(header, p.Info.UploadID, p.Info.Key, p.PartNumber)
params := &PutSystemObjectParams{
BktInfo: p.Info.Bkt,
ObjName: FormUploadPartName(p.Info.UploadID, p.Info.Key, p.PartNumber),
Metadata: p.Header,
Prefix: "",
Metadata: header,
Reader: p.Reader,
Size: p.Size,
}
@ -609,5 +637,4 @@ func uploadInfoFromMeta(meta *object.Object, prefix, delimiter string) *UploadIn
func appendUploadHeaders(metadata map[string]string, uploadID, key string, partNumber int) {
metadata[UploadIDAttributeName] = uploadID
metadata[UploadPartNumberAttributeName] = strconv.Itoa(partNumber)
metadata[UploadKeyAttributeName] = key
}

View file

@ -49,6 +49,8 @@ type TreeService interface {
AddSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string, newVersion *data.BaseNodeVersion) error
GetSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*data.BaseNodeVersion, error)
RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error
CreateMultipart(ctx context.Context, cnrID *cid.ID, objectName string, info *data.MultipartInfo) error
}
// ErrNodeNotFound is returned from Tree service in case of not found error.

View file

@ -39,7 +39,6 @@ type (
getNodesParams struct {
CnrID *cid.ID
TreeID string
PathAttr string
Path []string
Meta []string
LatestOnly bool
@ -55,8 +54,9 @@ const (
systemNameKV = "SystemName"
isUnversionedKV = "IsUnversioned"
isTagKV = "isTag"
uploadIDKV = "UploadId"
// keys for delete marker nodes
// keys for delete marker nodes.
isDeleteMarkerKV = "IdDeleteMarker"
filePathKV = "FilePath"
ownerKV = "Owner"
@ -179,7 +179,7 @@ func newNodeVersionFromTreeNode(treeNode *TreeNode) *data.NodeVersion {
func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID) (*data.BucketSettings, error) {
keysToReturn := []string{versioningEnabledKV, lockConfigurationKV}
node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{settingsFileName}, keysToReturn)
node, err := c.getSystemNode(ctx, cnrID, []string{settingsFileName}, keysToReturn)
if err != nil {
return nil, fmt.Errorf("couldn't get node: %w", err)
}
@ -202,7 +202,7 @@ func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID) (*data.
}
func (c *TreeClient) PutSettingsNode(ctx context.Context, cnrID *cid.ID, settings *data.BucketSettings) error {
node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{settingsFileName}, []string{})
node, err := c.getSystemNode(ctx, cnrID, []string{settingsFileName}, []string{})
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound {
return fmt.Errorf("couldn't get node: %w", err)
@ -219,7 +219,7 @@ func (c *TreeClient) PutSettingsNode(ctx context.Context, cnrID *cid.ID, setting
}
func (c *TreeClient) GetNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) {
node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{notifConfFileName}, []string{oidKV})
node, err := c.getSystemNode(ctx, cnrID, []string{notifConfFileName}, []string{oidKV})
if err != nil {
return nil, err
}
@ -228,7 +228,7 @@ func (c *TreeClient) GetNotificationConfigurationNode(ctx context.Context, cnrID
}
func (c *TreeClient) PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) {
node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{notifConfFileName}, []string{oidKV})
node, err := c.getSystemNode(ctx, cnrID, []string{notifConfFileName}, []string{oidKV})
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound {
return nil, fmt.Errorf("couldn't get node: %w", err)
@ -247,7 +247,7 @@ func (c *TreeClient) PutNotificationConfigurationNode(ctx context.Context, cnrID
}
func (c *TreeClient) GetBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) {
node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{corsFilename}, []string{oidKV})
node, err := c.getSystemNode(ctx, cnrID, []string{corsFilename}, []string{oidKV})
if err != nil {
return nil, err
}
@ -256,7 +256,7 @@ func (c *TreeClient) GetBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID,
}
func (c *TreeClient) PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) {
node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{corsFilename}, []string{oidKV})
node, err := c.getSystemNode(ctx, cnrID, []string{corsFilename}, []string{oidKV})
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound {
return nil, fmt.Errorf("couldn't get node: %w", err)
@ -275,7 +275,7 @@ func (c *TreeClient) PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oi
}
func (c *TreeClient) DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) {
node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{corsFilename}, []string{oidKV})
node, err := c.getSystemNode(ctx, cnrID, []string{corsFilename}, []string{oidKV})
if err != nil && !errors.Is(err, layer.ErrNodeNotFound) {
return nil, err
}
@ -344,7 +344,7 @@ func (c *TreeClient) DeleteObjectTagging(ctx context.Context, cnrID *cid.ID, obj
}
func (c *TreeClient) GetBucketTagging(ctx context.Context, cnrID *cid.ID) (map[string]string, error) {
node, err := c.getSystemNodeWithAllAttributes(ctx, cnrID, systemTree, []string{bucketTaggingFilename})
node, err := c.getSystemNodeWithAllAttributes(ctx, cnrID, []string{bucketTaggingFilename})
if err != nil {
if strings.Contains(err.Error(), "not found") {
return nil, layer.ErrNodeNotFound
@ -364,7 +364,7 @@ func (c *TreeClient) GetBucketTagging(ctx context.Context, cnrID *cid.ID) (map[s
}
func (c *TreeClient) PutBucketTagging(ctx context.Context, cnrID *cid.ID, tagSet map[string]string) error {
node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{bucketTaggingFilename}, []string{})
node, err := c.getSystemNode(ctx, cnrID, []string{bucketTaggingFilename}, []string{})
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound {
return fmt.Errorf("couldn't get node: %w", err)
@ -387,7 +387,7 @@ func (c *TreeClient) PutBucketTagging(ctx context.Context, cnrID *cid.ID, tagSet
}
func (c *TreeClient) DeleteBucketTagging(ctx context.Context, cnrID *cid.ID) error {
node, err := c.getSystemNode(ctx, cnrID, systemTree, []string{bucketTaggingFilename}, nil)
node, err := c.getSystemNode(ctx, cnrID, []string{bucketTaggingFilename}, nil)
if err != nil && !errors.Is(err, layer.ErrNodeNotFound) {
return err
}
@ -429,7 +429,7 @@ func (c *TreeClient) GetLatestVersion(ctx context.Context, cnrID *cid.ID, object
meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV}
path := pathFromName(objectName)
return c.getLatestVersion(ctx, cnrID, versionTree, fileNameKV, path, meta)
return c.getLatestVersion(ctx, cnrID, versionTree, path, meta)
}
// pathFromName splits name by '/' and add an empty marker if name has trailing slash.
@ -483,7 +483,6 @@ func (c *TreeClient) getPrefixNodeID(ctx context.Context, cnrID *cid.ID, prefixP
p := &getNodesParams{
CnrID: cnrID,
TreeID: versionTree,
PathAttr: fileNameKV,
Path: prefixPath,
Meta: []string{fileNameKV, oidKV},
LatestOnly: false,
@ -617,18 +616,17 @@ func (c *TreeClient) GetSystemVersion(ctx context.Context, cnrID *cid.ID, object
meta := []string{oidKV}
path := pathFromName(objectName)
node, err := c.getLatestVersion(ctx, cnrID, systemTree, systemNameKV, path, meta)
node, err := c.getLatestVersion(ctx, cnrID, systemTree, path, meta)
if err != nil {
return nil, err
}
return &node.BaseNodeVersion, nil
}
func (c *TreeClient) getLatestVersion(ctx context.Context, cnrID *cid.ID, treeID, attrPath string, path, meta []string) (*data.NodeVersion, error) {
func (c *TreeClient) getLatestVersion(ctx context.Context, cnrID *cid.ID, treeID string, path, meta []string) (*data.NodeVersion, error) {
p := &getNodesParams{
CnrID: cnrID,
TreeID: treeID,
PathAttr: attrPath,
Path: path,
Meta: meta,
LatestOnly: true,
@ -671,7 +669,7 @@ func (c *TreeClient) getUnversioned(ctx context.Context, cnrID *cid.ID, treeID,
}
func (c *TreeClient) AddVersion(ctx context.Context, cnrID *cid.ID, filepath string, version *data.NodeVersion) error {
return c.addVersion(ctx, cnrID, versionTree, fileNameKV, filepath, version)
return c.addVersion(ctx, cnrID, versionTree, filepath, version)
}
func (c *TreeClient) AddSystemVersion(ctx context.Context, cnrID *cid.ID, filepath string, version *data.BaseNodeVersion) error {
@ -679,7 +677,7 @@ func (c *TreeClient) AddSystemVersion(ctx context.Context, cnrID *cid.ID, filepa
BaseNodeVersion: *version,
IsUnversioned: true,
}
return c.addVersion(ctx, cnrID, systemTree, systemNameKV, filepath, newVersion)
return c.addVersion(ctx, cnrID, systemTree, filepath, newVersion)
}
func (c *TreeClient) RemoveVersion(ctx context.Context, cnrID *cid.ID, id uint64) error {
@ -690,6 +688,13 @@ func (c *TreeClient) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, id
return c.removeNode(ctx, cnrID, systemTree, id)
}
func (c *TreeClient) CreateMultipart(ctx context.Context, cnrID *cid.ID, objectName string, info *data.MultipartInfo) error {
path := pathFromName(objectName)
meta := metaFromMultipart(info)
return c.addNodeByPath(ctx, cnrID, systemTree, path, meta)
}
func (c *TreeClient) Close() error {
if c.conn != nil {
return c.conn.Close()
@ -698,11 +703,11 @@ func (c *TreeClient) Close() error {
return nil
}
func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, attrPath, filepath string, version *data.NodeVersion) error {
func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, filepath string, version *data.NodeVersion) error {
path := pathFromName(filepath)
meta := map[string]string{
oidKV: version.OID.EncodeToString(),
attrPath: path[len(path)-1],
oidKV: version.OID.EncodeToString(),
pathAttributeFromTreeID(treeID): path[len(path)-1],
}
if version.DeleteMarker != nil {
@ -739,7 +744,6 @@ func (c *TreeClient) getVersions(ctx context.Context, cnrID *cid.ID, treeID, fil
p := &getNodesParams{
CnrID: cnrID,
TreeID: treeID,
PathAttr: fileNameKV,
Path: path,
Meta: keysToReturn,
LatestOnly: false,
@ -834,19 +838,26 @@ func metaFromSettings(settings *data.BucketSettings) map[string]string {
return results
}
func (c *TreeClient) getSystemNode(ctx context.Context, cnrID *cid.ID, treeID string, path, meta []string) (*TreeNode, error) {
return c.getNode(ctx, cnrID, treeID, systemNameKV, path, meta, false)
func metaFromMultipart(info *data.MultipartInfo) map[string]string {
info.Meta[uploadIDKV] = info.UploadID
info.Meta[ownerKV] = info.Owner.EncodeToString()
info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10)
return info.Meta
}
func (c *TreeClient) getSystemNodeWithAllAttributes(ctx context.Context, cnrID *cid.ID, treeID string, path []string) (*TreeNode, error) {
return c.getNode(ctx, cnrID, treeID, systemNameKV, path, []string{}, true)
func (c *TreeClient) getSystemNode(ctx context.Context, cnrID *cid.ID, path, meta []string) (*TreeNode, error) {
return c.getNode(ctx, cnrID, systemTree, path, meta, false)
}
func (c *TreeClient) getNode(ctx context.Context, cnrID *cid.ID, treeID, pathAttr string, path, meta []string, allAttrs bool) (*TreeNode, error) {
func (c *TreeClient) getSystemNodeWithAllAttributes(ctx context.Context, cnrID *cid.ID, path []string) (*TreeNode, error) {
return c.getNode(ctx, cnrID, systemTree, path, []string{}, true)
}
func (c *TreeClient) getNode(ctx context.Context, cnrID *cid.ID, treeID string, path, meta []string, allAttrs bool) (*TreeNode, error) {
p := &getNodesParams{
CnrID: cnrID,
TreeID: treeID,
PathAttr: pathAttr,
Path: path,
Meta: meta,
LatestOnly: false,
@ -876,7 +887,7 @@ func (c *TreeClient) getNodes(ctx context.Context, p *getNodesParams) ([]*tree.G
TreeId: p.TreeID,
Path: p.Path,
Attributes: p.Meta,
PathAttribute: p.PathAttr,
PathAttribute: pathAttributeFromTreeID(p.TreeID),
LatestOnly: p.LatestOnly,
AllAttributes: p.AllAttrs,
BearerToken: getBearer(ctx),
@ -943,7 +954,7 @@ func (c *TreeClient) addNodeByPath(ctx context.Context, cnrID *cid.ID, treeID st
TreeId: treeID,
Path: path,
Meta: metaToKV(meta),
PathAttribute: fileNameKV,
PathAttribute: pathAttributeFromTreeID(treeID),
BearerToken: getBearer(ctx),
},
}
@ -961,6 +972,15 @@ func (c *TreeClient) addNodeByPath(ctx context.Context, cnrID *cid.ID, treeID st
return err
}
func pathAttributeFromTreeID(treeID string) string {
switch treeID {
case systemTree:
return systemNameKV
default:
return fileNameKV
}
}
func (c *TreeClient) moveNode(ctx context.Context, cnrID *cid.ID, treeID string, nodeID, parentID uint64, meta map[string]string) error {
request := &tree.MoveRequest{
Body: &tree.MoveRequest_Body{