forked from TrueCloudLab/frostfs-s3-gw
[#429] Refactor TreeService for notif,cors,settings
Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
parent
8eff857e41
commit
ab5c44ac14
5 changed files with 88 additions and 127 deletions
|
@ -36,11 +36,6 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
|||
return err
|
||||
}
|
||||
|
||||
ids, nodeIds, err := n.treeService.GetBucketCORS(ctx, &p.BktInfo.CID, false)
|
||||
if err != nil && !errorsStd.Is(err, ErrNodeNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
s := &PutSystemObjectParams{
|
||||
BktInfo: p.BktInfo,
|
||||
ObjName: p.BktInfo.CORSObjectName(),
|
||||
|
@ -55,19 +50,17 @@ func (n *layer) PutBucketCORS(ctx context.Context, p *PutCORSParams) error {
|
|||
return fmt.Errorf("put system object: %w", err)
|
||||
}
|
||||
|
||||
if err = n.treeService.PutBucketCORS(ctx, &p.BktInfo.CID, &obj.ID); err != nil {
|
||||
objIDToDelete, err := n.treeService.PutBucketCORS(ctx, &p.BktInfo.CID, &obj.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < len(ids); i++ {
|
||||
if err = n.objectDelete(ctx, p.BktInfo, *ids[i]); err != nil {
|
||||
if objIDToDelete != nil {
|
||||
if err = n.objectDelete(ctx, p.BktInfo, *objIDToDelete); err != nil {
|
||||
n.log.Error("couldn't delete cors object", zap.Error(err),
|
||||
zap.String("cnrID", p.BktInfo.CID.EncodeToString()),
|
||||
zap.String("bucket name", p.BktInfo.Name),
|
||||
zap.String("objID", ids[i].EncodeToString()))
|
||||
}
|
||||
if err = n.treeService.DeleteBucketCORS(ctx, &p.BktInfo.CID, nodeIds[i]); err != nil {
|
||||
return err
|
||||
zap.String("objID", objIDToDelete.EncodeToString()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -91,16 +84,12 @@ func (n *layer) GetBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) (*d
|
|||
}
|
||||
|
||||
func (n *layer) DeleteBucketCORS(ctx context.Context, bktInfo *data.BucketInfo) error {
|
||||
ids, nodeIds, err := n.treeService.GetBucketCORS(ctx, &bktInfo.CID, false)
|
||||
if err != nil && !errorsStd.Is(err, ErrNodeNotFound) {
|
||||
objID, err := n.treeService.DeleteBucketCORS(ctx, &bktInfo.CID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < len(ids); i++ {
|
||||
if err = n.objectDelete(ctx, bktInfo, *ids[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
if err = n.treeService.DeleteBucketCORS(ctx, &bktInfo.CID, nodeIds[i]); err != nil {
|
||||
if objID != nil {
|
||||
if err = n.objectDelete(ctx, bktInfo, *objID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,11 +24,6 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
|
|||
return fmt.Errorf("marshal notify configuration: %w", err)
|
||||
}
|
||||
|
||||
ids, nodeIds, err := n.treeService.GetNotificationConfigurationNodes(ctx, &p.BktInfo.CID, false)
|
||||
if err != nil && !errorsStd.Is(err, ErrNodeNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
sysName := p.BktInfo.NotificationConfigurationObjectName()
|
||||
|
||||
s := &PutSystemObjectParams{
|
||||
|
@ -44,19 +39,17 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
|
|||
return err
|
||||
}
|
||||
|
||||
if err = n.treeService.PutNotificationConfigurationNode(ctx, &p.BktInfo.CID, &obj.ID); err != nil {
|
||||
objIDToDelete, err := n.treeService.PutNotificationConfigurationNode(ctx, &p.BktInfo.CID, &obj.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < len(ids); i++ {
|
||||
if err = n.objectDelete(ctx, p.BktInfo, *ids[i]); err != nil {
|
||||
if objIDToDelete != nil {
|
||||
if err = n.objectDelete(ctx, p.BktInfo, *objIDToDelete); err != nil {
|
||||
n.log.Error("couldn't delete notification configuration object", zap.Error(err),
|
||||
zap.String("cnrID", p.BktInfo.CID.EncodeToString()),
|
||||
zap.String("bucket name", p.BktInfo.Name),
|
||||
zap.String("objID", ids[i].EncodeToString()))
|
||||
}
|
||||
if err = n.treeService.DeleteNotificationConfigurationNode(ctx, &p.BktInfo.CID, nodeIds[i]); err != nil {
|
||||
return err
|
||||
zap.String("objID", objIDToDelete.EncodeToString()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -74,15 +67,15 @@ func (n *layer) GetBucketNotificationConfiguration(ctx context.Context, bktInfo
|
|||
return conf, nil
|
||||
}
|
||||
|
||||
ids, _, err := n.treeService.GetNotificationConfigurationNodes(ctx, &bktInfo.CID, true)
|
||||
objId, err := n.treeService.GetNotificationConfigurationNode(ctx, &bktInfo.CID)
|
||||
if err != nil && !errorsStd.Is(err, ErrNodeNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conf := &data.NotificationConfiguration{}
|
||||
|
||||
if len(ids) != 0 {
|
||||
obj, err := n.objectGet(ctx, bktInfo, *ids[0])
|
||||
if objId != nil {
|
||||
obj, err := n.objectGet(ctx, bktInfo, *objId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -181,15 +181,16 @@ func (n *layer) getCORS(ctx context.Context, bkt *data.BucketInfo, sysName strin
|
|||
if cors := n.systemCache.GetCORS(systemObjectKey(bkt, sysName)); cors != nil {
|
||||
return cors, nil
|
||||
}
|
||||
ids, _, err := n.treeService.GetBucketCORS(ctx, &bkt.CID, true)
|
||||
objID, err := n.treeService.GetBucketCORS(ctx, &bkt.CID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(ids) == 0 {
|
||||
|
||||
if objID == nil {
|
||||
return nil, errors.GetAPIError(errors.ErrNoSuchCORSConfiguration)
|
||||
}
|
||||
|
||||
obj, err := n.objectGet(ctx, bkt, *ids[0])
|
||||
obj, err := n.objectGet(ctx, bkt, *objID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -259,7 +260,7 @@ func (n *layer) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo)
|
|||
return settings, nil
|
||||
}
|
||||
|
||||
settings, err := n.treeService.GetSettingsNode(ctx, &bktInfo.CID, "version")
|
||||
settings, err := n.treeService.GetSettingsNode(ctx, &bktInfo.CID)
|
||||
if err != nil {
|
||||
if !errorsStd.Is(err, ErrNodeNotFound) {
|
||||
return nil, err
|
||||
|
@ -278,7 +279,7 @@ func (n *layer) GetBucketSettings(ctx context.Context, bktInfo *data.BucketInfo)
|
|||
}
|
||||
|
||||
func (n *layer) PutBucketSettings(ctx context.Context, p *PutSettingsParams) error {
|
||||
if err := n.treeService.PutSettingsNode(ctx, &p.BktInfo.CID, "version", p.Settings); err != nil {
|
||||
if err := n.treeService.PutSettingsNode(ctx, &p.BktInfo.CID, p.Settings); err != nil {
|
||||
return fmt.Errorf("failed to get settings node: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
@ -12,20 +12,23 @@ import (
|
|||
// TreeService provide interface to interact with tree service using s3 data models.
|
||||
type TreeService interface {
|
||||
// PutSettingsNode update or create new settings node in tree service.
|
||||
PutSettingsNode(context.Context, *cid.ID, string, *data.BucketSettings) error
|
||||
PutSettingsNode(context.Context, *cid.ID, *data.BucketSettings) error
|
||||
|
||||
// GetSettingsNode retrieves the settings node from the tree service and form data.BucketSettings.
|
||||
//
|
||||
// If node is not found returns ErrNodeNotFound error.
|
||||
GetSettingsNode(context.Context, *cid.ID, string) (*data.BucketSettings, error)
|
||||
GetSettingsNode(context.Context, *cid.ID) (*data.BucketSettings, error)
|
||||
|
||||
GetNotificationConfigurationNodes(ctx context.Context, cnrID *cid.ID, latestOnly bool) ([]*oid.ID, []uint64, error)
|
||||
PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) error
|
||||
DeleteNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, nodeID uint64) error
|
||||
GetNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID) (*oid.ID, error)
|
||||
// PutNotificationConfigurationNode puts a node to a system tree
|
||||
// and returns objectID of a previous notif config which must be deleted in NeoFS
|
||||
PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error)
|
||||
|
||||
GetBucketCORS(ctx context.Context, cnrID *cid.ID, latestOnly bool) ([]*oid.ID, []uint64, error)
|
||||
PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oid.ID) error
|
||||
DeleteBucketCORS(ctx context.Context, cnrID *cid.ID, nodeID uint64) error
|
||||
GetBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error)
|
||||
// PutBucketCORS puts a node to a system tree and returns objectID of a previous cors config which must be deleted in NeoFS
|
||||
PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error)
|
||||
// DeleteBucketCORS removes a node from a system tree and returns objID which must be deleted in NeoFS
|
||||
DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error)
|
||||
}
|
||||
|
||||
// ErrNodeNotFound is returned from Tree service in case of not found error.
|
||||
|
|
|
@ -41,8 +41,9 @@ const (
|
|||
notifConfFileName = "bucket-notifications"
|
||||
corsFilename = "bucket-cors"
|
||||
|
||||
notifTreeID = "notifications"
|
||||
corsTreeID = "cors"
|
||||
// bucketSystemObjectsTreeID -- ID of a tree with system objects for bucket
|
||||
// i.e. bucket settings with versioning and lock configuration, cors, notifications
|
||||
bucketSystemObjectsTreeID = "system-bucket"
|
||||
)
|
||||
|
||||
// NewTreeClient creates instance of TreeClient using provided address and create grpc connection.
|
||||
|
@ -90,9 +91,9 @@ func (n *TreeNode) Get(key string) (string, bool) {
|
|||
return value, ok
|
||||
}
|
||||
|
||||
func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID, treeID string) (*data.BucketSettings, error) {
|
||||
func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID) (*data.BucketSettings, error) {
|
||||
keysToReturn := []string{versioningEnabledKV, lockConfigurationKV}
|
||||
node, err := c.getSystemNode(ctx, cnrID, treeID, settingsFileName, keysToReturn)
|
||||
node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, settingsFileName, keysToReturn)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("couldn't get node: %w", err)
|
||||
}
|
||||
|
@ -114,8 +115,8 @@ func (c *TreeClient) GetSettingsNode(ctx context.Context, cnrID *cid.ID, treeID
|
|||
return settings, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) PutSettingsNode(ctx context.Context, cnrID *cid.ID, treeID string, settings *data.BucketSettings) error {
|
||||
node, err := c.getSystemNode(ctx, cnrID, treeID, settingsFileName, []string{})
|
||||
func (c *TreeClient) PutSettingsNode(ctx context.Context, cnrID *cid.ID, settings *data.BucketSettings) error {
|
||||
node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, settingsFileName, []string{})
|
||||
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
||||
if err != nil && !isErrNotFound {
|
||||
return fmt.Errorf("couldn't get node: %w", err)
|
||||
|
@ -124,71 +125,80 @@ func (c *TreeClient) PutSettingsNode(ctx context.Context, cnrID *cid.ID, treeID
|
|||
meta := metaFromSettings(settings)
|
||||
|
||||
if isErrNotFound {
|
||||
_, err = c.addNode(ctx, cnrID, treeID, 0, meta)
|
||||
_, err = c.addNode(ctx, cnrID, bucketSystemObjectsTreeID, 0, meta)
|
||||
return err
|
||||
}
|
||||
|
||||
return c.moveNode(ctx, cnrID, treeID, node.ID, 0, meta)
|
||||
return c.moveNode(ctx, cnrID, bucketSystemObjectsTreeID, node.ID, 0, meta)
|
||||
}
|
||||
|
||||
func (c *TreeClient) GetNotificationConfigurationNodes(ctx context.Context, cnrID *cid.ID, latestOnly bool) ([]*oid.ID, []uint64, error) {
|
||||
nodes, err := c.getSystemNodesWithOID(ctx, cnrID, notifTreeID, notifConfFileName, []string{}, latestOnly)
|
||||
func (c *TreeClient) GetNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) {
|
||||
node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, notifConfFileName, []string{oidKv})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ids := make([]*oid.ID, 0, len(nodes))
|
||||
nodeIds := make([]uint64, 0, len(nodes))
|
||||
|
||||
for _, n := range nodes {
|
||||
ids = append(ids, n.ObjID)
|
||||
nodeIds = append(nodeIds, n.ID)
|
||||
}
|
||||
|
||||
return ids, nodeIds, nil
|
||||
return node.ObjID, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) error {
|
||||
func (c *TreeClient) PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) {
|
||||
node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, notifConfFileName, []string{oidKv})
|
||||
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
||||
if err != nil && !isErrNotFound {
|
||||
return nil, fmt.Errorf("couldn't get node: %w", err)
|
||||
}
|
||||
|
||||
meta := make(map[string]string)
|
||||
meta[systemNameKV] = notifConfFileName
|
||||
meta[oidKv] = objID.EncodeToString()
|
||||
|
||||
_, err := c.addNode(ctx, cnrID, notifTreeID, 0, meta)
|
||||
return err
|
||||
if isErrNotFound {
|
||||
_, err = c.addNode(ctx, cnrID, bucketSystemObjectsTreeID, 0, meta)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return node.ObjID, c.moveNode(ctx, cnrID, bucketSystemObjectsTreeID, node.ID, 0, meta)
|
||||
}
|
||||
|
||||
func (c *TreeClient) DeleteNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, nodeID uint64) error {
|
||||
return c.removeNode(ctx, cnrID, notifTreeID, nodeID)
|
||||
}
|
||||
|
||||
func (c *TreeClient) GetBucketCORS(ctx context.Context, cnrID *cid.ID, latestOnly bool) ([]*oid.ID, []uint64, error) {
|
||||
nodes, err := c.getSystemNodesWithOID(ctx, cnrID, corsTreeID, corsFilename, []string{}, latestOnly)
|
||||
func (c *TreeClient) GetBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) {
|
||||
node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, corsFilename, []string{oidKv})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ids := make([]*oid.ID, 0, len(nodes))
|
||||
nodeIds := make([]uint64, 0, len(nodes))
|
||||
|
||||
for _, n := range nodes {
|
||||
ids = append(ids, n.ObjID)
|
||||
nodeIds = append(nodeIds, n.ID)
|
||||
}
|
||||
|
||||
return ids, nodeIds, nil
|
||||
return node.ObjID, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oid.ID) error {
|
||||
func (c *TreeClient) PutBucketCORS(ctx context.Context, cnrID *cid.ID, objID *oid.ID) (*oid.ID, error) {
|
||||
node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, corsFilename, []string{oidKv})
|
||||
isErrNotFound := errors.Is(err, layer.ErrNodeNotFound)
|
||||
if err != nil && !isErrNotFound {
|
||||
return nil, fmt.Errorf("couldn't get node: %w", err)
|
||||
}
|
||||
|
||||
meta := make(map[string]string)
|
||||
meta[systemNameKV] = corsFilename
|
||||
meta[oidKv] = objID.EncodeToString()
|
||||
|
||||
_, err := c.addNode(ctx, cnrID, corsTreeID, 0, meta)
|
||||
return err
|
||||
if isErrNotFound {
|
||||
_, err = c.addNode(ctx, cnrID, bucketSystemObjectsTreeID, 0, meta)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return node.ObjID, c.moveNode(ctx, cnrID, bucketSystemObjectsTreeID, node.ID, 0, meta)
|
||||
}
|
||||
|
||||
func (c *TreeClient) DeleteBucketCORS(ctx context.Context, cnrID *cid.ID, nodeID uint64) error {
|
||||
return c.removeNode(ctx, cnrID, corsTreeID, nodeID)
|
||||
func (c *TreeClient) DeleteBucketCORS(ctx context.Context, cnrID *cid.ID) (*oid.ID, error) {
|
||||
node, err := c.getSystemNode(ctx, cnrID, bucketSystemObjectsTreeID, corsFilename, []string{oidKv})
|
||||
if err != nil && !errors.Is(err, layer.ErrNodeNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if node != nil {
|
||||
return node.ObjID, c.removeNode(ctx, cnrID, bucketSystemObjectsTreeID, node.ID)
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) Close() error {
|
||||
|
@ -236,41 +246,6 @@ func (c *TreeClient) getSystemNode(ctx context.Context, cnrID *cid.ID, treeID, p
|
|||
return newTreeNode(resp.Body.Nodes[0])
|
||||
}
|
||||
|
||||
func (c *TreeClient) getSystemNodesWithOID(ctx context.Context, cnrID *cid.ID, treeID, path string, meta []string, latestOnly bool) ([]*TreeNode, error) {
|
||||
meta = append(meta, oidKv)
|
||||
|
||||
r := &tree.GetNodeByPathRequest{
|
||||
Body: &tree.GetNodeByPathRequest_Body{
|
||||
ContainerId: []byte(cnrID.EncodeToString()),
|
||||
TreeId: treeID,
|
||||
PathAttribute: systemNameKV,
|
||||
Path: []string{path},
|
||||
Attributes: meta,
|
||||
LatestOnly: latestOnly,
|
||||
AllAttributes: false,
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := c.service.GetNodeByPath(ctx, r)
|
||||
if err != nil {
|
||||
if strings.Contains(err.Error(), "not found") {
|
||||
return nil, layer.ErrNodeNotFound
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nodes := make([]*TreeNode, 0, len(resp.Body.Nodes))
|
||||
for _, n := range resp.Body.GetNodes() {
|
||||
node, err := newTreeNode(n)
|
||||
if err != nil {
|
||||
|
||||
}
|
||||
nodes = append(nodes, node)
|
||||
}
|
||||
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) addNode(ctx context.Context, cnrID *cid.ID, treeID string, parent uint64, meta map[string]string) (uint64, error) {
|
||||
request := &tree.AddRequest{
|
||||
Body: &tree.AddRequest_Body{
|
||||
|
|
Loading…
Reference in a new issue