forked from TrueCloudLab/frostfs-node
[#1105] apemanager: Implement apemanager service
* Introduce grpc server for apemanager service and its implementation in `pkg/services/apemanager`. Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
parent
51ade979e8
commit
542d3adcb2
8 changed files with 434 additions and 0 deletions
29
cmd/frostfs-node/apemanager.go
Normal file
29
cmd/frostfs-node/apemanager.go
Normal file
|
@ -0,0 +1,29 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
apemanager_grpc "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/apemanager/grpc"
|
||||
ape_contract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/contract_storage"
|
||||
morph "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/morph/client"
|
||||
apemanager_transport "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network/transport/apemanager/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/apemanager"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func initAPEManagerService(c *cfg) {
|
||||
contractStorage := ape_contract.NewProxyVerificationContractStorage(
|
||||
morph.NewSwitchRPCGuardedActor(c.cfgMorph.client),
|
||||
c.shared.key,
|
||||
c.cfgMorph.proxyScriptHash,
|
||||
c.cfgObject.cfgAccessPolicyEngine.policyContractHash)
|
||||
|
||||
execsvc := apemanager.New(c.cfgObject.cnrSource, contractStorage,
|
||||
apemanager.WithLogger(c.log))
|
||||
sigsvc := apemanager.NewSignService(&c.key.PrivateKey, execsvc)
|
||||
server := apemanager_transport.New(sigsvc)
|
||||
|
||||
c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) {
|
||||
apemanager_grpc.RegisterAPEManagerServiceServer(s, server)
|
||||
})
|
||||
}
|
|
@ -114,6 +114,7 @@ func initApp(ctx context.Context, c *cfg) {
|
|||
initAndLog(c, "notification", func(c *cfg) { initNotifications(ctx, c) })
|
||||
initAndLog(c, "object", initObjectService)
|
||||
initAndLog(c, "tree", initTreeService)
|
||||
initAndLog(c, "apemanager", initAPEManagerService)
|
||||
initAndLog(c, "control", initControlService)
|
||||
|
||||
initAndLog(c, "morph notifications", func(c *cfg) { listenMorphNotifications(ctx, c) })
|
||||
|
|
63
pkg/network/transport/apemanager/grpc/service.go
Normal file
63
pkg/network/transport/apemanager/grpc/service.go
Normal file
|
@ -0,0 +1,63 @@
|
|||
package apemanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
apemanager_v2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/apemanager"
|
||||
apemanager_grpc "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/apemanager/grpc"
|
||||
apemanager_svc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/apemanager"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
srv apemanager_svc.Server
|
||||
}
|
||||
|
||||
var _ apemanager_grpc.APEManagerServiceServer = (*Server)(nil)
|
||||
|
||||
func New(c apemanager_svc.Server) *Server {
|
||||
return &Server{
|
||||
srv: c,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) AddChain(ctx context.Context, req *apemanager_grpc.AddChainRequest) (*apemanager_grpc.AddChainResponse, error) {
|
||||
v2req := new(apemanager_v2.AddChainRequest)
|
||||
if err := v2req.FromGRPCMessage(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := s.srv.AddChain(ctx, v2req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.ToGRPCMessage().(*apemanager_grpc.AddChainResponse), nil
|
||||
}
|
||||
|
||||
func (s *Server) RemoveChain(ctx context.Context, req *apemanager_grpc.RemoveChainRequest) (*apemanager_grpc.RemoveChainResponse, error) {
|
||||
v2req := new(apemanager_v2.RemoveChainRequest)
|
||||
if err := v2req.FromGRPCMessage(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := s.srv.RemoveChain(ctx, v2req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.ToGRPCMessage().(*apemanager_grpc.RemoveChainResponse), nil
|
||||
}
|
||||
|
||||
func (s *Server) ListChains(ctx context.Context, req *apemanager_grpc.ListChainsRequest) (*apemanager_grpc.ListChainsResponse, error) {
|
||||
v2req := new(apemanager_v2.ListChainsRequest)
|
||||
if err := v2req.FromGRPCMessage(req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := s.srv.ListChains(ctx, v2req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp.ToGRPCMessage().(*apemanager_grpc.ListChainsResponse), nil
|
||||
}
|
11
pkg/services/apemanager/errors/errors.go
Normal file
11
pkg/services/apemanager/errors/errors.go
Normal file
|
@ -0,0 +1,11 @@
|
|||
package errors
|
||||
|
||||
import (
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
)
|
||||
|
||||
func ErrAPEManagerAccessDenied(reason string) error {
|
||||
err := new(apistatus.APEManagerAccessDenied)
|
||||
err.WriteReason(reason)
|
||||
return err
|
||||
}
|
245
pkg/services/apemanager/executor.go
Normal file
245
pkg/services/apemanager/executor.go
Normal file
|
@ -0,0 +1,245 @@
|
|||
package apemanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/elliptic"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
apemanager_v2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/apemanager"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
session "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||
ape_contract "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/ape/contract_storage"
|
||||
containercore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||
apemanager_errors "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/apemanager/errors"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
cidSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||||
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
policy_engine "git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
|
||||
"github.com/mr-tron/base58/base58"
|
||||
"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
var (
|
||||
errEmptyBodySignature = errors.New("malformed request: empty body signature")
|
||||
)
|
||||
|
||||
type cfg struct {
|
||||
log *logger.Logger
|
||||
}
|
||||
|
||||
type Service struct {
|
||||
cfg
|
||||
|
||||
cnrSrc containercore.Source
|
||||
|
||||
contractStorage ape_contract.ProxyAdaptedContractStorage
|
||||
}
|
||||
|
||||
type Option func(*cfg)
|
||||
|
||||
func New(cnrSrc containercore.Source, contractStorage ape_contract.ProxyAdaptedContractStorage, opts ...Option) *Service {
|
||||
s := &Service{
|
||||
cnrSrc: cnrSrc,
|
||||
|
||||
contractStorage: contractStorage,
|
||||
}
|
||||
|
||||
for i := range opts {
|
||||
opts[i](&s.cfg)
|
||||
}
|
||||
|
||||
if s.log == nil {
|
||||
s.log = &logger.Logger{Logger: zap.NewNop()}
|
||||
}
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func WithLogger(log *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = log
|
||||
}
|
||||
}
|
||||
|
||||
var _ Server = (*Service)(nil)
|
||||
|
||||
// validateContainerTargetRequest validates request for the container target.
|
||||
// It checks if request actor is the owner of the container, otherwise it denies the request.
|
||||
func (s *Service) validateContainerTargetRequest(cid string, pubKey *keys.PublicKey) error {
|
||||
var cidSDK cidSDK.ID
|
||||
if err := cidSDK.DecodeString(cid); err != nil {
|
||||
return fmt.Errorf("invalid CID format: %w", err)
|
||||
}
|
||||
isOwner, err := s.isActorContainerOwner(cidSDK, pubKey)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to check owner: %w", err)
|
||||
}
|
||||
if !isOwner {
|
||||
return apemanager_errors.ErrAPEManagerAccessDenied("actor must be container owner")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Service) AddChain(_ context.Context, req *apemanager_v2.AddChainRequest) (*apemanager_v2.AddChainResponse, error) {
|
||||
pub, err := getSignaturePublicKey(req.GetVerificationHeader())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
chain, err := decodeAndValidateChain(req.GetBody().GetChain().GetKind().(*apemanager_v2.ChainRaw).GetRaw())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(chain.ID) == 0 {
|
||||
const randomIDLength = 10
|
||||
randID, err := base58Str(randomIDLength)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("randomize chain ID error: %w", err)
|
||||
}
|
||||
chain.ID = apechain.ID(randID)
|
||||
}
|
||||
|
||||
var target policy_engine.Target
|
||||
|
||||
switch targetType := req.GetBody().GetTarget().GetTargetType(); targetType {
|
||||
case apemanager_v2.TargetTypeContainer:
|
||||
reqCID := req.GetBody().GetTarget().GetName()
|
||||
if err = s.validateContainerTargetRequest(reqCID, pub); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
target = policy_engine.ContainerTarget(reqCID)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported target type: %s", targetType)
|
||||
}
|
||||
|
||||
if _, _, err = s.contractStorage.AddMorphRuleChain(apechain.Ingress, target, &chain); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
body := new(apemanager_v2.AddChainResponseBody)
|
||||
body.SetChainID(chain.ID)
|
||||
|
||||
resp := new(apemanager_v2.AddChainResponse)
|
||||
resp.SetBody(body)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Service) RemoveChain(_ context.Context, req *apemanager_v2.RemoveChainRequest) (*apemanager_v2.RemoveChainResponse, error) {
|
||||
pub, err := getSignaturePublicKey(req.GetVerificationHeader())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var target policy_engine.Target
|
||||
|
||||
switch targetType := req.GetBody().GetTarget().GetTargetType(); targetType {
|
||||
case apemanager_v2.TargetTypeContainer:
|
||||
reqCID := req.GetBody().GetTarget().GetName()
|
||||
if err = s.validateContainerTargetRequest(reqCID, pub); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
target = policy_engine.ContainerTarget(reqCID)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported target type: %s", targetType)
|
||||
}
|
||||
|
||||
if _, _, err = s.contractStorage.RemoveMorphRuleChain(apechain.Ingress, target, req.GetBody().GetChainID()); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
body := new(apemanager_v2.RemoveChainResponseBody)
|
||||
|
||||
resp := new(apemanager_v2.RemoveChainResponse)
|
||||
resp.SetBody(body)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (s *Service) ListChains(_ context.Context, req *apemanager_v2.ListChainsRequest) (*apemanager_v2.ListChainsResponse, error) {
|
||||
pub, err := getSignaturePublicKey(req.GetVerificationHeader())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var target policy_engine.Target
|
||||
|
||||
switch targetType := req.GetBody().GetTarget().GetTargetType(); targetType {
|
||||
case apemanager_v2.TargetTypeContainer:
|
||||
reqCID := req.GetBody().GetTarget().GetName()
|
||||
if err = s.validateContainerTargetRequest(reqCID, pub); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
target = policy_engine.ContainerTarget(reqCID)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported target type: %s", targetType)
|
||||
}
|
||||
|
||||
chs, err := s.contractStorage.ListMorphRuleChains(apechain.Ingress, target)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := make([]*apemanager_v2.Chain, 0, len(chs))
|
||||
for _, ch := range chs {
|
||||
v2chraw := new(apemanager_v2.ChainRaw)
|
||||
v2chraw.SetRaw(ch.Bytes())
|
||||
|
||||
v2ch := new(apemanager_v2.Chain)
|
||||
v2ch.SetKind(v2chraw)
|
||||
|
||||
res = append(res, v2ch)
|
||||
}
|
||||
|
||||
body := new(apemanager_v2.ListChainsResponseBody)
|
||||
body.SetChains(res)
|
||||
|
||||
resp := new(apemanager_v2.ListChainsResponse)
|
||||
resp.SetBody(body)
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func getSignaturePublicKey(vh *session.RequestVerificationHeader) (*keys.PublicKey, error) {
|
||||
for vh.GetOrigin() != nil {
|
||||
vh = vh.GetOrigin()
|
||||
}
|
||||
sig := vh.GetBodySignature()
|
||||
if sig == nil {
|
||||
return nil, errEmptyBodySignature
|
||||
}
|
||||
key, err := keys.NewPublicKeyFromBytes(sig.GetKey(), elliptic.P256())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid signature key: %w", err)
|
||||
}
|
||||
|
||||
return key, nil
|
||||
}
|
||||
|
||||
func (s *Service) isActorContainerOwner(cid cidSDK.ID, pk *keys.PublicKey) (bool, error) {
|
||||
var actor user.ID
|
||||
user.IDFromKey(&actor, (ecdsa.PublicKey)(*pk))
|
||||
actorOwnerID := new(refs.OwnerID)
|
||||
actor.WriteToV2(actorOwnerID)
|
||||
|
||||
cnr, err := s.cnrSrc.Get(cid)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("get container error: %w", err)
|
||||
}
|
||||
return cnr.Value.Owner().Equals(actor), nil
|
||||
}
|
||||
|
||||
// base58Str generates base58 string.
|
||||
func base58Str(n int) (string, error) {
|
||||
b := make([]byte, n)
|
||||
_, err := rand.Read(b)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return base58.FastBase58Encoding(b), nil
|
||||
}
|
13
pkg/services/apemanager/server.go
Normal file
13
pkg/services/apemanager/server.go
Normal file
|
@ -0,0 +1,13 @@
|
|||
package apemanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
apemanager_v2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/apemanager"
|
||||
)
|
||||
|
||||
type Server interface {
|
||||
AddChain(context.Context, *apemanager_v2.AddChainRequest) (*apemanager_v2.AddChainResponse, error)
|
||||
RemoveChain(context.Context, *apemanager_v2.RemoveChainRequest) (*apemanager_v2.RemoveChainResponse, error)
|
||||
ListChains(context.Context, *apemanager_v2.ListChainsRequest) (*apemanager_v2.ListChainsResponse, error)
|
||||
}
|
49
pkg/services/apemanager/sign.go
Normal file
49
pkg/services/apemanager/sign.go
Normal file
|
@ -0,0 +1,49 @@
|
|||
package apemanager
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
|
||||
apemanager_v2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/apemanager"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
)
|
||||
|
||||
type signService struct {
|
||||
sigSvc *util.SignService
|
||||
|
||||
next Server
|
||||
}
|
||||
|
||||
func NewSignService(key *ecdsa.PrivateKey, next Server) Server {
|
||||
return &signService{
|
||||
sigSvc: util.NewUnarySignService(key),
|
||||
next: next,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *signService) AddChain(ctx context.Context, req *apemanager_v2.AddChainRequest) (*apemanager_v2.AddChainResponse, error) {
|
||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||
resp := new(apemanager_v2.AddChainResponse)
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
resp, err := util.EnsureNonNilResponse(s.next.AddChain(ctx, req))
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
|
||||
func (s *signService) RemoveChain(ctx context.Context, req *apemanager_v2.RemoveChainRequest) (*apemanager_v2.RemoveChainResponse, error) {
|
||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||
resp := new(apemanager_v2.RemoveChainResponse)
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
resp, err := util.EnsureNonNilResponse(s.next.RemoveChain(ctx, req))
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
|
||||
func (s *signService) ListChains(ctx context.Context, req *apemanager_v2.ListChainsRequest) (*apemanager_v2.ListChainsResponse, error) {
|
||||
if err := s.sigSvc.VerifyRequest(req); err != nil {
|
||||
resp := new(apemanager_v2.ListChainsResponse)
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
||||
resp, err := util.EnsureNonNilResponse(s.next.ListChains(ctx, req))
|
||||
return resp, s.sigSvc.SignResponse(resp, err)
|
||||
}
|
23
pkg/services/apemanager/validation.go
Normal file
23
pkg/services/apemanager/validation.go
Normal file
|
@ -0,0 +1,23 @@
|
|||
package apemanager
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control/server/ape"
|
||||
apechain "git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
|
||||
)
|
||||
|
||||
func decodeAndValidateChain(encodedChain []byte) (chain apechain.Chain, err error) {
|
||||
if err = chain.DecodeBytes(encodedChain); err != nil {
|
||||
return
|
||||
}
|
||||
for _, rule := range chain.Rules {
|
||||
for _, name := range rule.Resources.Names {
|
||||
if err = ape.ValidateResourceName(name); err != nil {
|
||||
err = fmt.Errorf("invalid resource: %w", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
Loading…
Reference in a new issue