[#430] tree: Fix multipart having system name

Previously if multipart key has the same name as some system node
(e.g. bucket-settings, bucket-cors etc.) it shadows real system node
and bucket started to be unversioned again for example.

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
This commit is contained in:
Denis Kirillov 2024-07-16 17:36:52 +03:00
parent 456319d2f1
commit c0011ebb8d
3 changed files with 53 additions and 20 deletions

View file

@ -471,6 +471,16 @@ func putBucketVersioning(t *testing.T, tc *handlerContext, bktName string, enabl
assertStatus(t, w, http.StatusOK) assertStatus(t, w, http.StatusOK)
} }
func getBucketVersioning(hc *handlerContext, bktName string) *VersioningConfiguration {
w, r := prepareTestRequest(hc, bktName, "", nil)
hc.Handler().GetBucketVersioningHandler(w, r)
assertStatus(hc.t, w, http.StatusOK)
res := &VersioningConfiguration{}
parseTestResponse(hc.t, w, res)
return res
}
func deleteObject(t *testing.T, tc *handlerContext, bktName, objName, version string) (string, bool) { func deleteObject(t *testing.T, tc *handlerContext, bktName, objName, version string) (string, bool) {
query := make(url.Values) query := make(url.Values)
query.Add(api.QueryVersionID, version) query.Add(api.QueryVersionID, version)

View file

@ -68,6 +68,19 @@ func TestDeleteMultipartAllParts(t *testing.T) {
require.Empty(t, hc.tp.Objects()) require.Empty(t, hc.tp.Objects())
} }
func TestSpecialMultipartName(t *testing.T) {
hc := prepareHandlerContextWithMinCache(t)
bktName, objName := "bucket", "bucket-settings"
createTestBucket(hc, bktName)
putBucketVersioning(t, hc, bktName, true)
createMultipartUpload(hc, bktName, objName, nil)
res := getBucketVersioning(hc, bktName)
require.Equal(t, enabledValue, res.Status)
}
func TestMultipartReUploadPart(t *testing.T) { func TestMultipartReUploadPart(t *testing.T) {
hc := prepareHandlerContext(t) hc := prepareHandlerContext(t)

View file

@ -394,8 +394,7 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) {
} }
func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) { func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*data.BucketSettings, error) {
keysToReturn := []string{versioningKV, lockConfigurationKV, cannedACLKV, ownerKeyKV} node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName})
node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}, keysToReturn)
if err != nil { if err != nil {
return nil, fmt.Errorf("couldn't get node: %w", err) return nil, fmt.Errorf("couldn't get node: %w", err)
} }
@ -423,7 +422,7 @@ func (c *Tree) GetSettingsNode(ctx context.Context, bktInfo *data.BucketInfo) (*
} }
func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error { func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, settings *data.BucketSettings) error {
node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName}, []string{}) node, err := c.getSystemNode(ctx, bktInfo, []string{settingsFileName})
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound { if err != nil && !isErrNotFound {
return fmt.Errorf("couldn't get node: %w", err) return fmt.Errorf("couldn't get node: %w", err)
@ -445,7 +444,7 @@ func (c *Tree) PutSettingsNode(ctx context.Context, bktInfo *data.BucketInfo, se
} }
func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) { func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}, []string{oidKV}) node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
if err != nil { if err != nil {
return oid.ID{}, err return oid.ID{}, err
} }
@ -454,7 +453,7 @@ func (c *Tree) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid
} }
func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) { func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objID oid.ID) (oid.ID, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}, []string{oidKV}) node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound { if err != nil && !isErrNotFound {
return oid.ID{}, fmt.Errorf("couldn't get node: %w", err) return oid.ID{}, fmt.Errorf("couldn't get node: %w", err)
@ -480,7 +479,7 @@ func (c *Tree) PutBucketCORS(ctx context.Context, bktInfo *data.BucketInfo, objI
} }
func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) { func (c *Tree) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (oid.ID, error) {
node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename}, []string{oidKV}) node, err := c.getSystemNode(ctx, bktInfo, []string{corsFilename})
if err != nil && !errors.Is(err, layer.ErrNodeNotFound) { if err != nil && !errors.Is(err, layer.ErrNodeNotFound) {
return oid.ID{}, err return oid.ID{}, err
} }
@ -553,7 +552,7 @@ func (c *Tree) DeleteObjectTagging(ctx context.Context, bktInfo *data.BucketInfo
} }
func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) { func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (map[string]string, error) {
node, err := c.getSystemNodeWithAllAttributes(ctx, bktInfo, []string{bucketTaggingFilename}) node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -570,7 +569,7 @@ func (c *Tree) GetBucketTagging(ctx context.Context, bktInfo *data.BucketInfo) (
} }
func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error { func (c *Tree) PutBucketTagging(ctx context.Context, bktInfo *data.BucketInfo, tagSet map[string]string) error {
node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename}, []string{}) node, err := c.getSystemNode(ctx, bktInfo, []string{bucketTaggingFilename})
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
if err != nil && !isErrNotFound { if err != nil && !isErrNotFound {
return fmt.Errorf("couldn't get node: %w", err) return fmt.Errorf("couldn't get node: %w", err)
@ -1559,27 +1558,21 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str
return info.Meta return info.Meta
} }
func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path, meta []string) (*treeNode, error) { func (c *Tree) getSystemNode(ctx context.Context, bktInfo *data.BucketInfo, path []string) (*treeNode, error) {
return c.getNode(ctx, bktInfo, systemTree, path, meta, false)
}
func (c *Tree) getSystemNodeWithAllAttributes(ctx context.Context, bktInfo *data.BucketInfo, path []string) (*treeNode, error) {
return c.getNode(ctx, bktInfo, systemTree, path, []string{}, true)
}
func (c *Tree) getNode(ctx context.Context, bktInfo *data.BucketInfo, treeID string, path, meta []string, allAttrs bool) (*treeNode, error) {
p := &GetNodesParams{ p := &GetNodesParams{
BktInfo: bktInfo, BktInfo: bktInfo,
TreeID: treeID, TreeID: systemTree,
Path: path, Path: path,
Meta: meta,
LatestOnly: false, LatestOnly: false,
AllAttrs: allAttrs, AllAttrs: true,
} }
nodes, err := c.service.GetNodes(ctx, p) nodes, err := c.service.GetNodes(ctx, p)
if err != nil { if err != nil {
return nil, err return nil, err
} }
nodes = filterMultipartNodes(nodes)
if len(nodes) == 0 { if len(nodes) == 0 {
return nil, layer.ErrNodeNotFound return nil, layer.ErrNodeNotFound
} }
@ -1590,6 +1583,23 @@ func (c *Tree) getNode(ctx context.Context, bktInfo *data.BucketInfo, treeID str
return newTreeNode(getLatestNode(nodes)) return newTreeNode(getLatestNode(nodes))
} }
func filterMultipartNodes(nodes []NodeResponse) []NodeResponse {
res := make([]NodeResponse, 0, len(nodes))
LOOP:
for _, node := range nodes {
for _, meta := range node.GetMeta() {
if meta.GetKey() == uploadIDKV {
continue LOOP
}
}
res = append(res, node)
}
return res
}
func (c *Tree) reqLogger(ctx context.Context) *zap.Logger { func (c *Tree) reqLogger(ctx context.Context) *zap.Logger {
reqLogger := middleware.GetReqLog(ctx) reqLogger := middleware.GetReqLog(ctx)
if reqLogger != nil { if reqLogger != nil {