[#429] Add tree service for notifications
Signed-off-by: Angira Kekteeva <kira@nspcc.ru>
This commit is contained in:
parent
b03ae827fb
commit
7520952792
3 changed files with 93 additions and 25 deletions
|
@ -4,11 +4,11 @@ import (
|
|||
"bytes"
|
||||
"context"
|
||||
"encoding/xml"
|
||||
errorsStd "errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/errors"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
|
@ -24,20 +24,43 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
|
|||
return fmt.Errorf("marshal notify configuration: %w", err)
|
||||
}
|
||||
|
||||
ids, nodeIds, err := n.treeService.GetNotificationConfigurationNodes(ctx, &p.BktInfo.CID, false)
|
||||
if err != nil && !errorsStd.Is(err, ErrNodeNotFound) {
|
||||
return err
|
||||
}
|
||||
|
||||
sysName := p.BktInfo.NotificationConfigurationObjectName()
|
||||
|
||||
s := &PutSystemObjectParams{
|
||||
BktInfo: p.BktInfo,
|
||||
ObjName: p.BktInfo.NotificationConfigurationObjectName(),
|
||||
ObjName: sysName,
|
||||
Metadata: map[string]string{},
|
||||
Reader: bytes.NewReader(confXML),
|
||||
Size: int64(len(confXML)),
|
||||
}
|
||||
|
||||
_, err = n.putSystemObjectIntoNeoFS(ctx, s)
|
||||
obj, err := n.putSystemObjectIntoNeoFS(ctx, s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(p.BktInfo, s.ObjName), p.Configuration); err != nil {
|
||||
if err = n.treeService.PutNotificationConfigurationNode(ctx, &p.BktInfo.CID, &obj.ID); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for i := 0; i < len(ids); i++ {
|
||||
if err = n.objectDelete(ctx, p.BktInfo, *ids[i]); err != nil {
|
||||
n.log.Error("couldn't delete notification configuration object", zap.Error(err),
|
||||
zap.String("cnrID", p.BktInfo.CID.EncodeToString()),
|
||||
zap.String("bucket name", p.BktInfo.Name),
|
||||
zap.String("objID", ids[i].EncodeToString()))
|
||||
}
|
||||
if err = n.treeService.DeleteNotificationConfigurationNode(ctx, &p.BktInfo.CID, nodeIds[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(p.BktInfo, sysName), p.Configuration); err != nil {
|
||||
n.log.Error("couldn't cache system object", zap.Error(err))
|
||||
}
|
||||
|
||||
|
@ -45,38 +68,33 @@ func (n *layer) PutBucketNotificationConfiguration(ctx context.Context, p *PutBu
|
|||
}
|
||||
|
||||
func (n *layer) GetBucketNotificationConfiguration(ctx context.Context, bktInfo *data.BucketInfo) (*data.NotificationConfiguration, error) {
|
||||
conf, err := n.getNotificationConf(ctx, bktInfo, bktInfo.NotificationConfigurationObjectName())
|
||||
if err != nil {
|
||||
if errors.IsS3Error(err, errors.ErrNoSuchKey) {
|
||||
return &data.NotificationConfiguration{}, nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
systemCacheKey := systemObjectKey(bktInfo, bktInfo.NotificationConfigurationObjectName())
|
||||
|
||||
if conf := n.systemCache.GetNotificationConfiguration(systemCacheKey); conf != nil {
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
func (n *layer) getNotificationConf(ctx context.Context, bkt *data.BucketInfo, sysName string) (*data.NotificationConfiguration, error) {
|
||||
if conf := n.systemCache.GetNotificationConfiguration(systemObjectKey(bkt, sysName)); conf != nil {
|
||||
return conf, nil
|
||||
}
|
||||
|
||||
obj, err := n.getSystemObjectFromNeoFS(ctx, bkt, sysName)
|
||||
if err != nil {
|
||||
ids, _, err := n.treeService.GetNotificationConfigurationNodes(ctx, &bktInfo.CID, true)
|
||||
if err != nil && !errorsStd.Is(err, ErrNodeNotFound) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
conf := &data.NotificationConfiguration{}
|
||||
|
||||
if len(ids) != 0 {
|
||||
obj, err := n.objectGet(ctx, bktInfo, *ids[0])
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = xml.Unmarshal(obj.Payload(), &conf); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal notify configuration: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err = n.systemCache.PutNotificationConfiguration(systemObjectKey(bkt, sysName), conf); err != nil {
|
||||
objID, _ := obj.ID()
|
||||
if err = n.systemCache.PutNotificationConfiguration(systemCacheKey, conf); err != nil {
|
||||
n.log.Warn("couldn't put system meta to objects cache",
|
||||
zap.Stringer("object id", &objID),
|
||||
zap.Stringer("bucket id", bkt.CID),
|
||||
zap.Stringer("bucket id", bktInfo.CID),
|
||||
zap.Error(err))
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
|
||||
"github.com/nspcc-dev/neofs-s3-gw/api/data"
|
||||
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
|
||||
oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
// TreeService provide interface to interact with tree service using s3 data models.
|
||||
|
@ -17,6 +18,10 @@ type TreeService interface {
|
|||
//
|
||||
// If node is not found returns ErrNodeNotFound error.
|
||||
GetSettingsNode(context.Context, *cid.ID, string) (*data.BucketSettings, error)
|
||||
|
||||
GetNotificationConfigurationNodes(ctx context.Context, cnrID *cid.ID, latestOnly bool) ([]*oid.ID, []uint64, error)
|
||||
PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) error
|
||||
DeleteNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, nodeID uint64) error
|
||||
}
|
||||
|
||||
// ErrNodeNotFound is returned from Tree service in case of not found error.
|
||||
|
|
|
@ -38,6 +38,9 @@ const (
|
|||
systemNameKV = "SystemName"
|
||||
|
||||
settingsFileName = "bucket-settings"
|
||||
notifConfFileName = "bucket-notifications"
|
||||
|
||||
notifTreeID = "notifications"
|
||||
)
|
||||
|
||||
// NewTreeClient creates instance of TreeClient using provided address and create grpc connection.
|
||||
|
@ -126,6 +129,36 @@ func (c *TreeClient) PutSettingsNode(ctx context.Context, cnrID *cid.ID, treeID
|
|||
return c.moveNode(ctx, cnrID, treeID, node.ID, 0, meta)
|
||||
}
|
||||
|
||||
func (c *TreeClient) GetNotificationConfigurationNodes(ctx context.Context, cnrID *cid.ID, latestOnly bool) ([]*oid.ID, []uint64, error) {
|
||||
nodes, err := c.getSystemNodesWithOID(ctx, cnrID, notifTreeID, notifConfFileName, []string{}, latestOnly)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
ids := make([]*oid.ID, 0, len(nodes))
|
||||
nodeIds := make([]uint64, 0, len(nodes))
|
||||
|
||||
for _, n := range nodes {
|
||||
ids = append(ids, n.ObjID)
|
||||
nodeIds = append(nodeIds, n.ID)
|
||||
}
|
||||
|
||||
return ids, nodeIds, nil
|
||||
}
|
||||
|
||||
func (c *TreeClient) PutNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, objID *oid.ID) error {
|
||||
meta := make(map[string]string)
|
||||
meta[systemNameKV] = notifConfFileName
|
||||
meta[oidKv] = objID.EncodeToString()
|
||||
|
||||
_, err := c.addNode(ctx, cnrID, notifTreeID, 0, meta)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *TreeClient) DeleteNotificationConfigurationNode(ctx context.Context, cnrID *cid.ID, nodeID uint64) error {
|
||||
return c.removeNode(ctx, cnrID, notifTreeID, nodeID)
|
||||
}
|
||||
|
||||
func (c *TreeClient) Close() error {
|
||||
if c.conn != nil {
|
||||
return c.conn.Close()
|
||||
|
@ -239,6 +272,18 @@ func (c *TreeClient) moveNode(ctx context.Context, cnrID *cid.ID, treeID string,
|
|||
return err
|
||||
}
|
||||
|
||||
func (c *TreeClient) removeNode(ctx context.Context, cnrID *cid.ID, treeID string, nodeID uint64) error {
|
||||
r := &tree.RemoveRequest{
|
||||
Body: &tree.RemoveRequest_Body{
|
||||
ContainerId: []byte(cnrID.EncodeToString()),
|
||||
TreeId: treeID,
|
||||
NodeId: nodeID,
|
||||
},
|
||||
}
|
||||
_, err := c.service.Remove(ctx, r)
|
||||
return err
|
||||
}
|
||||
|
||||
func metaToKV(meta map[string]string) []*tree.KeyValue {
|
||||
result := make([]*tree.KeyValue, 0, len(meta))
|
||||
|
||||
|
|
Loading…
Reference in a new issue