From 752095279229f732cb517e2570cac38bb5946971 Mon Sep 17 00:00:00 2001 From: Angira Kekteeva Date: Thu, 12 May 2022 04:50:52 +0300 Subject: [PATCH] [#429] Add tree service for notifications Signed-off-by: Angira Kekteeva --- api/layer/notifications.go | 66 ++++++++++++++++++++++++-------------- api/layer/tree_service.go | 5 +++ internal/neofs/tree.go | 47 ++++++++++++++++++++++++++- 3 files changed, 93 insertions(+), 25 deletions(-) diff --git a/api/layer/notifications.go b/api/layer/notifications.go index 024cbbf8..588729a3 100644 --- a/api/layer/notifications.go +++ b/api/layer/notifications.go @@ -4,11 +4,11 @@ import ( "bytes" "context" "encoding/xml" + errorsStd "errors" "fmt" "github.com/nspcc-dev/neofs-s3-gw/api" "github.com/nspcc-dev/neofs-s3-gw/api/data" - "github.com/nspcc-dev/neofs-s3-gw/api/errors" "go.uber.org/zap" ) @@ -24,20 +24,43 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu return fmt.Errorf("marshal notify configuration: %w", err) } + ids, nodeIds, err := n.treeService.GetNotificationConfigurationNodes(ctx, &p.BktInfo.CID, false) + if err != nil && !errorsStd.Is(err, ErrNodeNotFound) { + return err + } + + sysName := p.BktInfo.NotificationConfigurationObjectName() + s := &PutSystemObjectParams{ BktInfo: p.BktInfo, - ObjName: p.BktInfo.NotificationConfigurationObjectName(), + ObjName: sysName, Metadata: map[string]string{}, Reader: bytes.NewReader(confXML), Size: int64(len(confXML)), } - _, err = n.putSystemObjectIntoNeoFS(ctx, s) + obj, err := n.putSystemObjectIntoNeoFS(ctx, s) if err != nil { return err } - if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(p.BktInfo, s.ObjName), p.Configuration); err != nil { + if err = n.treeService.PutNotificationConfigurationNode(ctx, &p.BktInfo.CID, &obj.ID); err != nil { + return err + } + + for i := 0; i < len(ids); i++ { + if err = n.objectDelete(ctx, p.BktInfo, *ids[i]); err != nil { + n.log.Error("couldn't delete notification configuration object", zap.Error(err), + zap.String("cnrID", p.BktInfo.CID.EncodeToString()), + zap.String("bucket name", p.BktInfo.Name), + zap.String("objID", ids[i].EncodeToString())) + } + if err = n.treeService.DeleteNotificationConfigurationNode(ctx, &p.BktInfo.CID, nodeIds[i]); err != nil { + return err + } + } + + if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(p.BktInfo, sysName), p.Configuration); err != nil { n.log.Error("couldn't cache system object", zap.Error(err)) } @@ -45,38 +68,33 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu } func (n *layer) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) { - conf, err := n.getNotificationConf(ctx, bktInfo, bktInfo.NotificationConfigurationObjectName()) - if err != nil { - if errors.IsS3Error(err, errors.ErrNoSuchKey) { - return &data.NotificationConfiguration{}, nil - } - return nil, err - } + systemCacheKey := systemObjectKey(bktInfo, bktInfo.NotificationConfigurationObjectName()) - return conf, nil -} - -func (n *layer) getNotificationConf(ctx context.Context, bkt *data.BucketInfo, sysName string) (*data.NotificationConfiguration, error) { - if conf := n.systemCache.GetNotificationConfiguration(systemObjectKey(bkt, sysName)); conf != nil { + if conf := n.systemCache.GetNotificationConfiguration(systemCacheKey); conf != nil { return conf, nil } - obj, err := n.getSystemObjectFromNeoFS(ctx, bkt, sysName) - if err != nil { + ids, _, err := n.treeService.GetNotificationConfigurationNodes(ctx, &bktInfo.CID, true) + if err != nil && !errorsStd.Is(err, ErrNodeNotFound) { return nil, err } conf := &data.NotificationConfiguration{} - if err = xml.Unmarshal(obj.Payload(), &conf); err != nil { - return nil, fmt.Errorf("unmarshal notify configuration: %w", err) + if len(ids) != 0 { + obj, err := n.objectGet(ctx, bktInfo, *ids[0]) + if err != nil { + return nil, err + } + + if err = xml.Unmarshal(obj.Payload(), &conf); err != nil { + return nil, fmt.Errorf("unmarshal notify configuration: %w", err) + } } - if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(bkt, sysName), conf); err != nil { - objID, _ := obj.ID() + if err = n.systemCache.PutNotificationConfiguration(systemCacheKey, conf); err != nil { n.log.Warn("couldn't put system meta to objects cache", - zap.Stringer("object id", &objID), - zap.Stringer("bucket id", bkt.CID), + zap.Stringer("bucket id", bktInfo.CID), zap.Error(err)) } diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 4a4d79f1..586a8ada 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -6,6 +6,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/data" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" ) // TreeService provide interface to interact with tree service using s3 data models. @@ -17,6 +18,10 @@ type TreeService interface { // // If node is not found returns ErrNodeNotFound error. GetSettingsNode(context.Context, *cid.ID, string) (*data.BucketSettings, error) + + GetNotificationConfigurationNodes(ctx context.Context, cnrID *cid.ID, latestOnly bool) ([]*oid.ID, []uint64, error) + PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) error + DeleteNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, nodeID uint64) error } // ErrNodeNotFound is returned from Tree service in case of not found error. diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 2d5e321d..9b1cbe93 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -37,7 +37,10 @@ const ( fileNameKV = "FileName" systemNameKV = "SystemName" - settingsFileName = "bucket-settings" + settingsFileName = "bucket-settings" + notifConfFileName = "bucket-notifications" + + notifTreeID = "notifications" ) // NewTreeClient creates instance of TreeClient using provided address and create grpc connection. @@ -126,6 +129,36 @@ func (c *TreeClient) PutSettingsNode(ctx context.Context, cnrID *cid.ID, treeID return c.moveNode(ctx, cnrID, treeID, node.ID, 0, meta) } +func (c *TreeClient) GetNotificationConfigurationNodes(ctx context.Context, cnrID *cid.ID, latestOnly bool) ([]*oid.ID, []uint64, error) { + nodes, err := c.getSystemNodesWithOID(ctx, cnrID, notifTreeID, notifConfFileName, []string{}, latestOnly) + if err != nil { + return nil, nil, err + } + + ids := make([]*oid.ID, 0, len(nodes)) + nodeIds := make([]uint64, 0, len(nodes)) + + for _, n := range nodes { + ids = append(ids, n.ObjID) + nodeIds = append(nodeIds, n.ID) + } + + return ids, nodeIds, nil +} + +func (c *TreeClient) PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) error { + meta := make(map[string]string) + meta[systemNameKV] = notifConfFileName + meta[oidKv] = objID.EncodeToString() + + _, err := c.addNode(ctx, cnrID, notifTreeID, 0, meta) + return err +} + +func (c *TreeClient) DeleteNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, nodeID uint64) error { + return c.removeNode(ctx, cnrID, notifTreeID, nodeID) +} + func (c *TreeClient) Close() error { if c.conn != nil { return c.conn.Close() @@ -239,6 +272,18 @@ func (c *TreeClient) moveNode(ctx context.Context, cnrID *cid.ID, treeID string, return err } +func (c *TreeClient) removeNode(ctx context.Context, cnrID *cid.ID, treeID string, nodeID uint64) error { + r := &tree.RemoveRequest{ + Body: &tree.RemoveRequest_Body{ + ContainerId: []byte(cnrID.EncodeToString()), + TreeId: treeID, + NodeId: nodeID, + }, + } + _, err := c.service.Remove(ctx, r) + return err +} + func metaToKV(meta map[string]string) []*tree.KeyValue { result := make([]*tree.KeyValue, 0, len(meta))