From 49bd77d9cf63e41b0c4d1a9de09f1653ad952dba Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 17 May 2022 17:56:05 +0300 Subject: [PATCH] [#413] Use tree service to head objects Signed-off-by: Denis Kirillov --- api/layer/object.go | 25 ++++-- api/layer/system_object.go | 12 ++- api/layer/tree_service.go | 6 +- internal/neofs/tree.go | 166 +++++++++++++++++++++++++------------ 4 files changed, 147 insertions(+), 62 deletions(-) diff --git a/api/layer/object.go b/api/layer/object.go index 7a107af1..fb653723 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -328,23 +328,34 @@ func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.Bucke } } - versions, err := n.headVersions(ctx, bkt, objectName) + node, err := n.treeService.GetLatestVersion(ctx, &bkt.CID, objectName) if err != nil { return nil, err } - lastVersion := versions.getLast() - if lastVersion == nil { + if node.IsDeleteMarker { return nil, apiErrors.GetAPIError(apiErrors.ErrNoSuchKey) } - if err = n.namesCache.Put(lastVersion.NiceName(), lastVersion.Address()); err != nil { - n.log.Warn("couldn't put obj address to head cache", - zap.String("obj nice name", lastVersion.NiceName()), + meta, err := n.objectHead(ctx, bkt, *node.OID) + if err != nil { + return nil, err + } + if err = n.objCache.Put(*meta); err != nil { + n.log.Warn("couldn't put meta to objects cache", + zap.Stringer("object id", node.OID), + zap.Stringer("bucket id", bkt.CID), zap.Error(err)) } - return lastVersion, nil + objInfo := objInfoFromMeta(bkt, meta) + if err = n.namesCache.Put(objInfo.NiceName(), objInfo.Address()); err != nil { + n.log.Warn("couldn't put obj address to head cache", + zap.String("obj nice name", objInfo.NiceName()), + zap.Error(err)) + } + + return objInfo, nil } func (n *layer) headVersions(ctx context.Context, bkt *data.BucketInfo, objectName string) (*objectVersions, error) { diff --git a/api/layer/system_object.go b/api/layer/system_object.go index 46243684..e8057925 100644 --- a/api/layer/system_object.go +++ b/api/layer/system_object.go @@ -42,16 +42,22 @@ func (n *layer) HeadSystemObject(ctx context.Context, bkt *data.BucketInfo, objN return objInfo, nil } - versions, err := n.headSystemVersions(ctx, bkt, objName) + node, err := n.treeService.GetSystemVersion(ctx, &bkt.CID, objName) if err != nil { return nil, err } - if err = n.systemCache.PutObject(systemObjectKey(bkt, objName), versions.getLast()); err != nil { + meta, err := n.objectHead(ctx, bkt, *node.OID) + if err != nil { + return nil, err + } + + objInfo := objInfoFromMeta(bkt, meta) + if err = n.systemCache.PutObject(systemObjectKey(bkt, objName), objInfo); err != nil { n.log.Error("couldn't cache system object", zap.Error(err)) } - return versions.getLast(), nil + return objInfo, nil } func (n *layer) DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo, name string) error { diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 321664d4..fde0af82 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -32,7 +32,9 @@ type TreeService interface { GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]*NodeVersion, error) - //GetUnversioned(context.Context, *cid.ID, string) (*NodeVersion, error) + GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*NodeVersion, error) + + GetUnversioned(ctx context.Context, cnrID *cid.ID, objectName string) (*NodeVersion, error) AddVersion(ctx context.Context, cnrID *cid.ID, objectName string, newVersion *NodeVersion) error @@ -40,6 +42,8 @@ type TreeService interface { AddSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string, newVersion *BaseNodeVersion) error + GetSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*BaseNodeVersion, error) + RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error } diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index c7aacaa0..a1357632 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -33,10 +33,11 @@ type ( const ( versioningEnabledKV = "versioning_enabled" lockConfigurationKV = "lock_configuration" - oidKv = "OID" + oidKV = "OID" fileNameKV = "FileName" systemNameKV = "SystemName" isUnversionedKV = "IsUnversioned" + isDeleteMarkerKV = "IdDeleteMarker" settingsFileName = "bucket-settings" notifConfFileName = "bucket-notifications" @@ -72,7 +73,7 @@ func newTreeNode(nodeInfo *tree.GetNodeByPathResponse_Info) (*TreeNode, error) { meta := make(map[string]string, len(nodeInfo.GetMeta())) for _, kv := range nodeInfo.GetMeta() { - if kv.GetKey() == oidKv { + if kv.GetKey() == oidKV { objID = new(oid.ID) err := objID.DecodeString(string(kv.GetValue())) if err != nil { @@ -97,9 +98,29 @@ func (n *TreeNode) Get(key string) (string, bool) { return value, ok } +func newNodeVersion(node *tree.GetNodeByPathResponse_Info) (*layer.NodeVersion, error) { + treeNode, err := newTreeNode(node) + if err != nil { + return nil, fmt.Errorf("invalid tree node: %w", err) + } + + _, isUnversioned := treeNode.Get(isUnversionedKV) + _, isDeleteMarker := treeNode.Get(isDeleteMarkerKV) + + return &layer.NodeVersion{ + BaseNodeVersion: layer.BaseNodeVersion{ + ID: node.NodeId, + OID: treeNode.ObjID, + }, + IsUnversioned: isUnversioned, + IsDeleteMarker: isDeleteMarker, + }, nil +} + func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID) (*data.BucketSettings, error) { keysToReturn := []string{versioningEnabledKV, lockConfigurationKV} - node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, settingsFileName, keysToReturn) + path := []string{settingsFileName} + node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, path, keysToReturn) if err != nil { return nil, fmt.Errorf("couldn't get node: %w", err) } @@ -122,7 +143,8 @@ func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID) (*data. } func (c *TreeClient) PutSettingsNode(ctx context.Context, cnrID *cid.ID, settings *data.BucketSettings) error { - node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, settingsFileName, []string{}) + path := []string{settingsFileName} + node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, path, []string{}) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return fmt.Errorf("couldn't get node: %w", err) @@ -139,7 +161,8 @@ func (c *TreeClient) PutSettingsNode(ctx context.Context, cnrID *cid.ID, setting } func (c *TreeClient) GetNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) { - node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, notifConfFileName, []string{oidKv}) + path := []string{notifConfFileName} + node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, path, []string{oidKV}) if err != nil { return nil, err } @@ -148,7 +171,8 @@ func (c *TreeClient) GetNotificationConfigurationNode(ctx context.Context, cnrID } func (c *TreeClient) PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) { - node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, notifConfFileName, []string{oidKv}) + path := []string{notifConfFileName} + node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, path, []string{oidKV}) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return nil, fmt.Errorf("couldn't get node: %w", err) @@ -156,7 +180,7 @@ func (c *TreeClient) PutNotificationConfigurationNode(ctx context.Context, cnrID meta := make(map[string]string) meta[systemNameKV] = notifConfFileName - meta[oidKv] = objID.EncodeToString() + meta[oidKV] = objID.EncodeToString() if isErrNotFound { _, err = c.addNode(ctx, cnrID, bucketSystemObjectsTreeID, 0, meta) @@ -167,7 +191,7 @@ func (c *TreeClient) PutNotificationConfigurationNode(ctx context.Context, cnrID } func (c *TreeClient) GetBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) { - node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, corsFilename, []string{oidKv}) + node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, []string{corsFilename}, []string{oidKV}) if err != nil { return nil, err } @@ -176,7 +200,7 @@ func (c *TreeClient) GetBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, } func (c *TreeClient) PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) { - node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, corsFilename, []string{oidKv}) + node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, []string{corsFilename}, []string{oidKV}) isErrNotFound := errors.Is(err, layer.ErrNodeNotFound) if err != nil && !isErrNotFound { return nil, fmt.Errorf("couldn't get node: %w", err) @@ -184,7 +208,7 @@ func (c *TreeClient) PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oi meta := make(map[string]string) meta[systemNameKV] = corsFilename - meta[oidKv] = objID.EncodeToString() + meta[oidKV] = objID.EncodeToString() if isErrNotFound { _, err = c.addNode(ctx, cnrID, bucketSystemObjectsTreeID, 0, meta) @@ -195,7 +219,7 @@ func (c *TreeClient) PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oi } func (c *TreeClient) DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) { - node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, corsFilename, []string{oidKv}) + node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, []string{corsFilename}, []string{oidKV}) if err != nil && !errors.Is(err, layer.ErrNodeNotFound) { return nil, err } @@ -211,6 +235,40 @@ func (c *TreeClient) GetVersions(ctx context.Context, cnrID *cid.ID, filepath st return c.getVersions(ctx, cnrID, versionTree, filepath, false) } +func (c *TreeClient) GetLatestVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*layer.NodeVersion, error) { + meta := []string{oidKV, isUnversionedKV, isDeleteMarkerKV} + path := strings.Split(objectName, separator) + + return c.getLatestVersion(ctx, cnrID, versionTree, fileNameKV, path, meta) +} + +func (c *TreeClient) GetSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string) (*layer.BaseNodeVersion, error) { + meta := []string{oidKV} + path := strings.Split(objectName, separator) + + node, err := c.getLatestVersion(ctx, cnrID, systemTree, systemNameKV, path, meta) + if err != nil { + return nil, err + } + return &node.BaseNodeVersion, nil +} + +func (c *TreeClient) getLatestVersion(ctx context.Context, cnrID *cid.ID, treeID, attrPath string, path, meta []string) (*layer.NodeVersion, error) { + nodes, err := c.getNodes(ctx, cnrID, treeID, attrPath, path, meta, true) + if err != nil { + if strings.Contains(err.Error(), "not found") { + return nil, layer.ErrNodeNotFound + } + return nil, fmt.Errorf("couldn't get nodes: %w", err) + } + + return newNodeVersion(nodes[0]) +} + +func (c *TreeClient) GetUnversioned(ctx context.Context, cnrID *cid.ID, filepath string) (*layer.NodeVersion, error) { + return c.getUnversioned(ctx, cnrID, versionTree, filepath) +} + func (c *TreeClient) getUnversioned(ctx context.Context, cnrID *cid.ID, treeID, filepath string) (*layer.NodeVersion, error) { nodes, err := c.getVersions(ctx, cnrID, treeID, filepath, true) if err != nil { @@ -222,14 +280,14 @@ func (c *TreeClient) getUnversioned(ctx context.Context, cnrID *cid.ID, treeID, } if len(nodes) != 1 { - return nil, layer.ErrNotFound + return nil, layer.ErrNodeNotFound } return nodes[0], nil } func (c *TreeClient) AddVersion(ctx context.Context, cnrID *cid.ID, filepath string, version *layer.NodeVersion) error { - return c.addVersion(ctx, cnrID, versionTree, filepath, version) + return c.addVersion(ctx, cnrID, versionTree, fileNameKV, filepath, version) } func (c *TreeClient) AddSystemVersion(ctx context.Context, cnrID *cid.ID, filepath string, version *layer.BaseNodeVersion) error { @@ -237,7 +295,7 @@ func (c *TreeClient) AddSystemVersion(ctx context.Context, cnrID *cid.ID, filepa BaseNodeVersion: *version, IsUnversioned: true, } - return c.addVersion(ctx, cnrID, systemTree, filepath, newVersion) + return c.addVersion(ctx, cnrID, systemTree, systemNameKV, filepath, newVersion) } func (c *TreeClient) RemoveVersion(ctx context.Context, cnrID *cid.ID, id uint64) error { @@ -256,11 +314,11 @@ func (c *TreeClient) Close() error { return nil } -func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, filepath string, version *layer.NodeVersion) error { +func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, attrPath, filepath string, version *layer.NodeVersion) error { path := strings.Split(filepath, separator) meta := map[string]string{ - oidKV: version.OID.EncodeToString(), - fileNameKV: path[len(path)-1], + oidKV: version.OID.EncodeToString(), + attrPath: path[len(path)-1], } if version.IsUnversioned { @@ -276,7 +334,7 @@ func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, file return c.moveNode(ctx, cnrID, treeID, version.ID, parentID, meta) } - if !errors.Is(err, layer.ErrNotFound) { + if !errors.Is(err, layer.ErrNodeNotFound) { return err } } @@ -300,7 +358,7 @@ func (c *TreeClient) removeVersion(ctx context.Context, cnrID *cid.ID, treeID st func (c *TreeClient) getVersions(ctx context.Context, cnrID *cid.ID, treeID, filepath string, onlyUnversioned bool) ([]*layer.NodeVersion, error) { keysToReturn := []string{versioningEnabledKV, lockConfigurationKV} path := strings.Split(filepath, separator) - nodes, err := c.getNodes(ctx, cnrID, treeID, fileNameKV, path, keysToReturn) + nodes, err := c.getNodes(ctx, cnrID, treeID, fileNameKV, path, keysToReturn, false) if err != nil { if strings.Contains(err.Error(), "not found") { return nil, nil @@ -310,29 +368,16 @@ func (c *TreeClient) getVersions(ctx context.Context, cnrID *cid.ID, treeID, fil result := make([]*layer.NodeVersion, 0, len(nodes)) for _, node := range nodes { - treeNode := newNode(node) - - objIDStr, ok := treeNode.Get(oidKV) - if !ok { - continue - } - var objId oid.ID - if err = objId.DecodeString(objIDStr); err != nil { - return nil, fmt.Errorf("invalid object id '%s': %w", objIDStr, err) + nodeVersion, err := newNodeVersion(node) + if err != nil { + return nil, err } - _, isUnversioned := treeNode.Get(isUnversionedKV) - if onlyUnversioned && !isUnversioned { + if onlyUnversioned && !nodeVersion.IsUnversioned { continue } - result = append(result, &layer.NodeVersion{ - BaseNodeVersion: layer.BaseNodeVersion{ - ID: node.NodeId, - OID: &objId, - }, - IsUnversioned: isUnversioned, - }) + result = append(result, nodeVersion) } return result, nil @@ -370,31 +415,50 @@ func metaFromSettings(settings *data.BucketSettings) map[string]string { return results } -func (c *TreeClient) getSystemNode(ctx context.Context, cnrID *cid.ID, treeID, path string, meta []string) (*TreeNode, error) { - request := &tree.GetNodeByPathRequest{ - Body: &tree.GetNodeByPathRequest_Body{ - ContainerId: []byte(cnrID.EncodeToString()), - TreeId: treeID, - Path: []string{path}, - Attributes: meta, - PathAttribute: systemNameKV, - }, - } - resp, err := c.service.GetNodeByPath(ctx, request) +func (c *TreeClient) getSystemNode(ctx context.Context, cnrID *cid.ID, treeID string, path, meta []string) (*TreeNode, error) { + return c.getNode(ctx, cnrID, treeID, systemNameKV, path, meta) +} + +func (c *TreeClient) getRegularNode(ctx context.Context, cnrID *cid.ID, treeID string, path, meta []string) (*TreeNode, error) { + return c.getNode(ctx, cnrID, treeID, fileNameKV, path, meta) +} + +func (c *TreeClient) getNode(ctx context.Context, cnrID *cid.ID, treeID, pathAttr string, path, meta []string) (*TreeNode, error) { + nodes, err := c.getNodes(ctx, cnrID, treeID, pathAttr, path, meta, false) if err != nil { if strings.Contains(err.Error(), "not found") { return nil, layer.ErrNodeNotFound } return nil, fmt.Errorf("couldn't get nodes: %w", err) } - if len(resp.Body.GetNodes()) == 0 { + if len(nodes) == 0 { return nil, layer.ErrNodeNotFound } - if len(resp.Body.GetNodes()) != 1 { + if len(nodes) != 1 { return nil, fmt.Errorf("found more than one node") } - return newTreeNode(resp.Body.Nodes[0]) + return newTreeNode(nodes[0]) +} + +func (c *TreeClient) getNodes(ctx context.Context, cnrID *cid.ID, treeID, pathAttr string, path, meta []string, latestOnly bool) ([]*tree.GetNodeByPathResponse_Info, error) { + request := &tree.GetNodeByPathRequest{ + Body: &tree.GetNodeByPathRequest_Body{ + ContainerId: []byte(cnrID.EncodeToString()), + TreeId: treeID, + Path: path, + Attributes: meta, + PathAttribute: pathAttr, + LatestOnly: latestOnly, + }, + } + + resp, err := c.service.GetNodeByPath(ctx, request) + if err != nil { + return nil, fmt.Errorf("failed to get node path: %w", err) + } + + return resp.GetBody().GetNodes(), nil } func (c *TreeClient) addNode(ctx context.Context, cnrID *cid.ID, treeID string, parent uint64, meta map[string]string) (uint64, error) {