frostfs-s3-gw/pkg/service/control/server/server.go
Alex Vanin 13d00dd7ce [#362] Expand control service
Signed-off-by: Alex Vanin <a.vanin@yadro.com>
2024-04-12 16:52:08 +03:00

352 lines
9.6 KiB
Go

package server
import (
"bytes"
"context"
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/control"
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type Server struct {
*cfg
}
type Settings interface {
ResolveNamespaceAlias(ns string) string
FetchRawKeys() [][]byte
}
type defaultSettings struct{}
func (f defaultSettings) FetchRawKeys() [][]byte { return nil }
func (f defaultSettings) ResolveNamespaceAlias(ns string) string { return ns }
// Option of the Server's constructor.
type Option func(*cfg)
type cfg struct {
log *zap.Logger
settings Settings
chainStorage engine.LocalOverrideStorage
}
func defaultCfg() *cfg {
return &cfg{
log: zap.NewNop(),
settings: defaultSettings{},
chainStorage: inmemory.NewInmemoryLocalStorage(),
}
}
// New creates, initializes and returns new Server instance.
func New(opts ...Option) *Server {
c := defaultCfg()
for _, opt := range opts {
opt(c)
}
c.log = c.log.With(zap.String("service", "control API"))
return &Server{
cfg: c,
}
}
// WithSettings returns option to add settings to use Control service.
func WithSettings(settings Settings) Option {
return func(c *cfg) {
c.settings = settings
}
}
// WithLogger returns option to set logger.
func WithLogger(log *zap.Logger) Option {
return func(c *cfg) {
c.log = log
}
}
// WithChainStorage returns option to set logger.
func WithChainStorage(chainStorage engine.LocalOverrideStorage) Option {
return func(c *cfg) {
c.chainStorage = chainStorage
}
}
// HealthCheck returns health status of the local node.
//
// If request is unsigned or signed by disallowed key, permission error returns.
func (s *Server) HealthCheck(_ context.Context, req *control.HealthCheckRequest) (*control.HealthCheckResponse, error) {
s.log.Info(logs.ControlAPIHealthcheck, zap.String("key", hex.EncodeToString(req.Signature.Key)))
// verify request
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
resp := &control.HealthCheckResponse{
Body: &control.HealthCheckResponse_Body{
HealthStatus: control.HealthStatus_READY,
},
}
return resp, nil
}
// PutPolicies replaces existing policies.
//
// If request is unsigned or signed by disallowed key, permission error returns.
func (s *Server) PutPolicies(_ context.Context, req *control.PutPoliciesRequest) (*control.PutPoliciesResponse, error) {
s.log.Info(logs.ControlAPIPutPolicies, zap.String("key", hex.EncodeToString(req.Signature.Key)))
// verify request
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
for _, data := range req.GetBody().GetChainDatas() {
if err := s.putPolicy(data); err != nil {
return nil, err
}
}
return &control.PutPoliciesResponse{}, nil
}
func (s *Server) putPolicy(data *control.PutPoliciesRequest_ChainData) error {
var overrideChain chain.Chain
if err := overrideChain.DecodeBytes(data.GetChain()); err != nil {
return status.Error(codes.InvalidArgument, fmt.Sprintf("failed to parse body: %s", err.Error()))
}
if len(overrideChain.ID) == 0 {
return status.Error(codes.InvalidArgument, "missing chain id")
}
t, err := s.parseTarget(data.Target, data.Name)
if err != nil {
return err
}
if _, err := s.chainStorage.AddOverride(chain.S3, t, &overrideChain); err != nil {
return status.Error(codes.Internal, err.Error())
}
return nil
}
func (s *Server) parseTarget(target control.PolicyTarget, name string) (engine.Target, error) {
var (
t engine.Target
err error
)
switch target {
case control.PolicyTarget_NAMESPACE:
ns := s.settings.ResolveNamespaceAlias(name)
t = engine.NamespaceTarget(ns)
case control.PolicyTarget_CONTAINER:
t = engine.ContainerTarget(name)
case control.PolicyTarget_USER:
t = engine.UserTarget(name)
case control.PolicyTarget_GROUP:
t = engine.GroupTarget(name)
default:
err = status.Error(codes.InvalidArgument, fmt.Sprintf("invalid target: %s", target.String()))
}
return t, err
}
// RemovePolicies removes existing policies.
//
// If request is unsigned or signed by disallowed key, permission error returns.
func (s *Server) RemovePolicies(_ context.Context, req *control.RemovePoliciesRequest) (*control.RemovePoliciesResponse, error) {
s.log.Info(logs.ControlAPIRemovePolicies, zap.String("key", hex.EncodeToString(req.Signature.Key)))
// verify request
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
for _, info := range req.GetBody().GetChainInfos() {
if err := s.removePolicy(info); err != nil {
return nil, err
}
}
return &control.RemovePoliciesResponse{}, nil
}
func (s *Server) removePolicy(info *control.RemovePoliciesRequest_ChainInfo) error {
t, err := s.parseTarget(info.Target, info.Name)
if err != nil {
return err
}
err = s.chainStorage.RemoveOverride(chain.S3, t, chain.ID(info.GetChainID()))
if err != nil {
if isNotFoundError(err) {
return status.Error(codes.NotFound, err.Error())
}
return status.Error(codes.InvalidArgument, err.Error())
}
return nil
}
// GetPolicy returns existing policy.
//
// If request is unsigned or signed by disallowed key, permission error returns.
func (s *Server) GetPolicy(_ context.Context, req *control.GetPolicyRequest) (*control.GetPolicyResponse, error) {
s.log.Info(logs.ControlAPIGetPolicy, zap.Stringer("target", req.GetBody().GetTarget()), zap.String("name", req.GetBody().GetName()),
zap.Binary("chainId", req.GetBody().GetChainID()), zap.String("key", hex.EncodeToString(req.Signature.Key)))
t, err := s.parseTarget(req.GetBody().GetTarget(), req.GetBody().GetName())
if err != nil {
return nil, err
}
// verify request
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
overrideChain, err := s.chainStorage.GetOverride(chain.S3, t, chain.ID(req.GetBody().GetChainID()))
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
return &control.GetPolicyResponse{Body: &control.GetPolicyResponse_Body{Chain: overrideChain.Bytes()}}, nil
}
// ListPolicies lists existing policies.
//
// If request is unsigned or signed by disallowed key, permission error returns.
func (s *Server) ListPolicies(_ context.Context, req *control.ListPoliciesRequest) (*control.ListPoliciesResponse, error) {
s.log.Info(logs.ControlAPIListPolicies, zap.Stringer("target", req.GetBody().GetTarget()), zap.String("name", req.GetBody().GetName()),
zap.String("key", hex.EncodeToString(req.Signature.Key)))
t, err := s.parseTarget(req.GetBody().GetTarget(), req.GetBody().GetName())
if err != nil {
return nil, err
}
// verify request
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
chains, err := s.chainStorage.ListOverrides(chain.S3, t)
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
res := make([][]byte, len(chains))
for i := range chains {
res[i] = []byte(chains[i].ID)
}
return &control.ListPoliciesResponse{Body: &control.ListPoliciesResponse_Body{ChainIDs: res}}, nil
}
// SignedMessage is an interface of Control service message.
type SignedMessage interface {
ReadSignedData([]byte) ([]byte, error)
GetSignature() *control.Signature
SetSignature(*control.Signature)
}
var errDisallowedKey = errors.New("key is not in the allowed list")
var errMissingSignature = errors.New("missing signature")
var errInvalidSignature = errors.New("invalid signature")
func (s *Server) isValidRequest(req SignedMessage) error {
sign := req.GetSignature()
if sign == nil {
return errMissingSignature
}
var (
key = sign.GetKey()
allowed = false
)
// check if key is allowed
for _, authKey := range s.settings.FetchRawKeys() {
if allowed = bytes.Equal(authKey, key); allowed {
break
}
}
if !allowed {
return errDisallowedKey
}
// verify signature
binBody, err := req.ReadSignedData(nil)
if err != nil {
return fmt.Errorf("marshal request body: %w", err)
}
var sigV2 refs.Signature
sigV2.SetKey(sign.GetKey())
sigV2.SetSign(sign.GetSign())
sigV2.SetScheme(refs.ECDSA_SHA512)
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(sigV2); err != nil {
return fmt.Errorf("can't read signature: %w", err)
}
if !sig.Verify(binBody) {
return errInvalidSignature
}
return nil
}
// SignMessage signs Control service message with private key.
func SignMessage(key *ecdsa.PrivateKey, msg SignedMessage) error {
binBody, err := msg.ReadSignedData(nil)
if err != nil {
return fmt.Errorf("marshal request body: %w", err)
}
var sig frostfscrypto.Signature
err = sig.Calculate(frostfsecdsa.Signer(*key), binBody)
if err != nil {
return fmt.Errorf("calculate signature: %w", err)
}
var sigV2 refs.Signature
sig.WriteToV2(&sigV2)
var sigControl control.Signature
sigControl.Key = sigV2.GetKey()
sigControl.Sign = sigV2.GetSign()
msg.SetSignature(&sigControl)
return nil
}
func isNotFoundError(err error) bool {
return errors.Is(err, engine.ErrChainNameNotFound) ||
errors.Is(err, engine.ErrChainNotFound) ||
errors.Is(err, engine.ErrResourceNotFound)
}