From 36f3c43af56b72402f6af2ee5ea529547ff1eabf Mon Sep 17 00:00:00 2001 From: Denis Kirillov Date: Tue, 17 May 2022 17:56:05 +0300 Subject: [PATCH] [#413] Use tree service to put object Signed-off-by: Denis Kirillov --- api/layer/object.go | 36 ++------ api/layer/system_object.go | 22 ++--- api/layer/tree_service.go | 23 +++++ internal/neofs/tree.go | 166 +++++++++++++++++++++++++++++++++++++ 4 files changed, 201 insertions(+), 46 deletions(-) diff --git a/api/layer/object.go b/api/layer/object.go index f73206b..7a107af 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -171,18 +171,14 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object own := n.Owner(ctx) versioningEnabled := n.isVersioningEnabled(ctx, p.BktInfo) - versions, err := n.headVersions(ctx, p.BktInfo, p.Object) - if err != nil && !apiErrors.IsS3Error(err, apiErrors.ErrNoSuchKey) { - return nil, err - } - idsToDeleteArr := updateCRDT2PSetHeaders(p.Header, versions, versioningEnabled) + newVersion := &NodeVersion{IsUnversioned: !versioningEnabled} r := p.Reader if r != nil { if len(p.Header[api.ContentType]) == 0 { if contentType := MimeByFileName(p.Object); len(contentType) == 0 { d := newDetector(r) - if contentType, err = d.Detect(); err == nil { + if contentType, err := d.Detect(); err == nil { p.Header[api.ContentType] = contentType } r = d.MultiReader() @@ -206,17 +202,16 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object prm.Attributes = append(prm.Attributes, [2]string{k, v}) } - if p.Header[VersionsDeleteMarkAttr] == DelMarkFullObject { - if last := versions.getLast(); last != nil { - n.objCache.Delete(last.Address()) - } - } - id, hash, err := n.objectPutAndHash(ctx, prm, p.BktInfo) if err != nil { return nil, err } + newVersion.OID = id + if err = n.treeService.AddVersion(ctx, &p.BktInfo.CID, p.Object, newVersion); err != nil { + return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err) + } + currentEpoch, _, err := n.neoFS.TimeToEpoch(ctx, time.Now().Add(time.Minute)) if err != nil { n.log.Warn("couldn't get creation epoch", @@ -242,23 +237,6 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Object n.listsCache.CleanCacheEntriesContainingObject(p.Object, p.BktInfo.CID) - for _, id := range idsToDeleteArr { - if err = n.objectDelete(ctx, p.BktInfo, id); err != nil { - n.log.Warn("couldn't delete object", - zap.Stringer("version id", id), - zap.Error(err)) - } - if !versioningEnabled { - if objVersion := versions.getVersion(id); objVersion != nil { - if err = n.DeleteObjectTagging(ctx, p.BktInfo, objVersion); err != nil { - n.log.Warn("couldn't delete object tagging", - zap.Stringer("version id", id), - zap.Error(err)) - } - } - } - } - return &data.ObjectInfo{ ID: *id, CID: p.BktInfo.CID, diff --git a/api/layer/system_object.go b/api/layer/system_object.go index 2757571..4624368 100644 --- a/api/layer/system_object.go +++ b/api/layer/system_object.go @@ -75,14 +75,6 @@ func (n *layer) DeleteSystemObject(ctx context.Context, bktInfo *data.BucketInfo } func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObjectParams) (*data.ObjectInfo, error) { - versions, err := n.headSystemVersions(ctx, p.BktInfo, p.ObjName) - if err != nil && !errors.IsS3Error(err, errors.ErrNoSuchKey) { - return nil, err - } - - idsToDeleteArr := updateCRDT2PSetHeaders(p.Metadata, versions, false) // false means "last write wins" - // note that updateCRDT2PSetHeaders modifies p.Metadata and must be called further processing - prm := PrmObjectCreate{ Container: p.BktInfo.CID, Creator: p.BktInfo.Owner, @@ -121,6 +113,11 @@ func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObject return nil, err } + newVersion := &BaseNodeVersion{OID: id} + if err = n.treeService.AddSystemVersion(ctx, &p.BktInfo.CID, p.ObjName, newVersion); err != nil { + return nil, fmt.Errorf("couldn't add new verion to tree service: %w", err) + } + currentEpoch, _, err := n.neoFS.TimeToEpoch(ctx, time.Now().Add(time.Minute)) if err != nil { n.log.Warn("couldn't get creation epoch", @@ -129,15 +126,6 @@ func (n *layer) putSystemObjectIntoNeoFS(ctx context.Context, p *PutSystemObject zap.Error(err)) } - for _, id := range idsToDeleteArr { - if err = n.objectDelete(ctx, p.BktInfo, id); err != nil { - n.log.Warn("couldn't delete system object", - zap.Stringer("version id", id), - zap.String("name", misc.SanitizeString(p.ObjName)), - zap.Error(err)) - } - } - headers := make(map[string]string, len(p.Metadata)) for _, attr := range prm.Attributes { headers[attr[0]] = attr[1] diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 53a231a..321664d 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -29,6 +29,29 @@ type TreeService interface { PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) // DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in NeoFS DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) + + GetVersions(ctx context.Context, cnrID *cid.ID, objectName string) ([]*NodeVersion, error) + + //GetUnversioned(context.Context, *cid.ID, string) (*NodeVersion, error) + + AddVersion(ctx context.Context, cnrID *cid.ID, objectName string, newVersion *NodeVersion) error + + RemoveVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error + + AddSystemVersion(ctx context.Context, cnrID *cid.ID, objectName string, newVersion *BaseNodeVersion) error + + RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, nodeID uint64) error +} + +type NodeVersion struct { + BaseNodeVersion + IsDeleteMarker bool + IsUnversioned bool +} + +type BaseNodeVersion struct { + ID uint64 + OID *oid.ID } // ErrNodeNotFound is returned from Tree service in case of not found error. diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 5121c09..c7aacaa 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -36,6 +36,7 @@ const ( oidKv = "OID" fileNameKV = "FileName" systemNameKV = "SystemName" + isUnversionedKV = "IsUnversioned" settingsFileName = "bucket-settings" notifConfFileName = "bucket-notifications" @@ -44,6 +45,11 @@ const ( // bucketSystemObjectsTreeID -- ID of a tree with system objects for bucket // i.e. bucket settings with versioning and lock configuration, cors, notifications bucketSystemObjectsTreeID = "system-bucket" + + versionTree = "version" + systemTree = "system" + + separator = "/" ) // NewTreeClient creates instance of TreeClient using provided address and create grpc connection. @@ -201,6 +207,47 @@ func (c *TreeClient) DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid. return nil, nil } +func (c *TreeClient) GetVersions(ctx context.Context, cnrID *cid.ID, filepath string) ([]*layer.NodeVersion, error) { + return c.getVersions(ctx, cnrID, versionTree, filepath, false) +} + +func (c *TreeClient) getUnversioned(ctx context.Context, cnrID *cid.ID, treeID, filepath string) (*layer.NodeVersion, error) { + nodes, err := c.getVersions(ctx, cnrID, treeID, filepath, true) + if err != nil { + return nil, err + } + + if len(nodes) > 1 { + return nil, fmt.Errorf("found more than one unversioned node") + } + + if len(nodes) != 1 { + return nil, layer.ErrNotFound + } + + return nodes[0], nil +} + +func (c *TreeClient) AddVersion(ctx context.Context, cnrID *cid.ID, filepath string, version *layer.NodeVersion) error { + return c.addVersion(ctx, cnrID, versionTree, filepath, version) +} + +func (c *TreeClient) AddSystemVersion(ctx context.Context, cnrID *cid.ID, filepath string, version *layer.BaseNodeVersion) error { + newVersion := &layer.NodeVersion{ + BaseNodeVersion: *version, + IsUnversioned: true, + } + return c.addVersion(ctx, cnrID, systemTree, filepath, newVersion) +} + +func (c *TreeClient) RemoveVersion(ctx context.Context, cnrID *cid.ID, id uint64) error { + return c.removeVersion(ctx, cnrID, versionTree, id) +} + +func (c *TreeClient) RemoveSystemVersion(ctx context.Context, cnrID *cid.ID, id uint64) error { + return c.removeVersion(ctx, cnrID, systemTree, id) +} + func (c *TreeClient) Close() error { if c.conn != nil { return c.conn.Close() @@ -209,6 +256,110 @@ func (c *TreeClient) Close() error { return nil } +func (c *TreeClient) addVersion(ctx context.Context, cnrID *cid.ID, treeID, filepath string, version *layer.NodeVersion) error { + path := strings.Split(filepath, separator) + meta := map[string]string{ + oidKV: version.OID.EncodeToString(), + fileNameKV: path[len(path)-1], + } + + if version.IsUnversioned { + meta[isUnversionedKV] = "true" + + node, err := c.getUnversioned(ctx, cnrID, treeID, filepath) + if err == nil { + parentID, err := c.getParent(ctx, cnrID, treeID, node.ID) + if err != nil { + return err + } + + return c.moveNode(ctx, cnrID, treeID, version.ID, parentID, meta) + } + + if !errors.Is(err, layer.ErrNotFound) { + return err + } + } + + return c.addNodeByPath(ctx, cnrID, treeID, path[:len(path)-1], meta) +} + +func (c *TreeClient) removeVersion(ctx context.Context, cnrID *cid.ID, treeID string, id uint64) error { + request := &tree.RemoveRequest{ + Body: &tree.RemoveRequest_Body{ + ContainerId: []byte(cnrID.EncodeToString()), + TreeId: treeID, + NodeId: id, + }, + } + + _, err := c.service.Remove(ctx, request) + return err +} + +func (c *TreeClient) getVersions(ctx context.Context, cnrID *cid.ID, treeID, filepath string, onlyUnversioned bool) ([]*layer.NodeVersion, error) { + keysToReturn := []string{versioningEnabledKV, lockConfigurationKV} + path := strings.Split(filepath, separator) + nodes, err := c.getNodes(ctx, cnrID, treeID, fileNameKV, path, keysToReturn) + if err != nil { + if strings.Contains(err.Error(), "not found") { + return nil, nil + } + return nil, fmt.Errorf("couldn't get nodes: %w", err) + } + + result := make([]*layer.NodeVersion, 0, len(nodes)) + for _, node := range nodes { + treeNode := newNode(node) + + objIDStr, ok := treeNode.Get(oidKV) + if !ok { + continue + } + var objId oid.ID + if err = objId.DecodeString(objIDStr); err != nil { + return nil, fmt.Errorf("invalid object id '%s': %w", objIDStr, err) + } + + _, isUnversioned := treeNode.Get(isUnversionedKV) + if onlyUnversioned && !isUnversioned { + continue + } + + result = append(result, &layer.NodeVersion{ + BaseNodeVersion: layer.BaseNodeVersion{ + ID: node.NodeId, + OID: &objId, + }, + IsUnversioned: isUnversioned, + }) + } + + return result, nil +} + +func (c *TreeClient) getParent(ctx context.Context, cnrID *cid.ID, treeID string, id uint64) (uint64, error) { + request := &tree.GetSubTreeRequest{ + Body: &tree.GetSubTreeRequest_Body{ + ContainerId: []byte(cnrID.EncodeToString()), + TreeId: treeID, + RootId: id, + }, + } + + cli, err := c.service.GetSubTree(ctx, request) + if err != nil { + return 0, fmt.Errorf("failed to get sub tree client: %w", err) + } + + resp, err := cli.Recv() + if err != nil { + return 0, fmt.Errorf("failed to get sub tree: %w", err) + } + + return resp.GetBody().GetParentId(), nil +} + func metaFromSettings(settings *data.BucketSettings) map[string]string { results := make(map[string]string, 3) @@ -264,6 +415,21 @@ func (c *TreeClient) addNode(ctx context.Context, cnrID *cid.ID, treeID string, return resp.GetBody().GetNodeId(), nil } +func (c *TreeClient) addNodeByPath(ctx context.Context, cnrID *cid.ID, treeID string, path []string, meta map[string]string) error { + request := &tree.AddByPathRequest{ + Body: &tree.AddByPathRequest_Body{ + ContainerId: []byte(cnrID.EncodeToString()), + TreeId: treeID, + Path: path, + Meta: metaToKV(meta), + PathAttribute: fileNameKV, + }, + } + + _, err := c.service.AddByPath(ctx, request) + return err +} + func (c *TreeClient) moveNode(ctx context.Context, cnrID *cid.ID, treeID string, nodeID, parentID uint64, meta map[string]string) error { request := &tree.MoveRequest{ Body: &tree.MoveRequest_Body{