frostfs-s3-gw/pkg/service/control/server/server.go
Denis Kirillov 42862fd69e
All checks were successful
/ Vulncheck (pull_request) Successful in 3m20s
/ DCO (pull_request) Successful in 3m56s
/ Builds (1.20) (pull_request) Successful in 4m13s
/ Builds (1.21) (pull_request) Successful in 4m9s
/ Lint (pull_request) Successful in 3m9s
/ Tests (1.20) (pull_request) Successful in 4m14s
/ Tests (1.21) (pull_request) Successful in 3m59s
[#258] Support policy management in control svc
Add PutPolicies, RemovePolicies, GetPolicy, ListPolicies methods

Signed-off-by: Denis Kirillov <d.kirillov@yadro.com>
2023-12-01 15:47:12 +03:00

316 lines
8.8 KiB
Go

package server
import (
"bytes"
"context"
"crypto/ecdsa"
"encoding/hex"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-s3-gw/pkg/service/control"
frostfscrypto "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto"
frostfsecdsa "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/chain"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine"
"git.frostfs.info/TrueCloudLab/policy-engine/pkg/engine/inmemory"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
type Server struct {
*cfg
}
type AuthorizedKeysFetcher interface {
FetchRawKeys() [][]byte
}
type emptyKeysFetcher struct{}
func (f emptyKeysFetcher) FetchRawKeys() [][]byte { return nil }
// Option of the Server's constructor.
type Option func(*cfg)
type cfg struct {
log *zap.Logger
keysFetcher AuthorizedKeysFetcher
chainStorage engine.LocalOverrideEngine
}
func defaultCfg() *cfg {
return &cfg{
log: zap.NewNop(),
keysFetcher: emptyKeysFetcher{},
chainStorage: inmemory.NewInMemoryLocalOverrides(),
}
}
// New creates, initializes and returns new Server instance.
func New(opts ...Option) *Server {
c := defaultCfg()
for _, opt := range opts {
opt(c)
}
c.log = c.log.With(zap.String("service", "control API"))
return &Server{
cfg: c,
}
}
// WithAuthorizedKeysFetcher returns option to add list of public
// keys that have rights to use Control service.
func WithAuthorizedKeysFetcher(fetcher AuthorizedKeysFetcher) Option {
return func(c *cfg) {
c.keysFetcher = fetcher
}
}
// WithLogger returns option to set logger.
func WithLogger(log *zap.Logger) Option {
return func(c *cfg) {
c.log = log
}
}
// WithChainStorage returns option to set logger.
func WithChainStorage(chainStorage engine.LocalOverrideEngine) Option {
return func(c *cfg) {
c.chainStorage = chainStorage
}
}
// HealthCheck returns health status of the local node.
//
// If request is unsigned or signed by disallowed key, permission error returns.
func (s *Server) HealthCheck(_ context.Context, req *control.HealthCheckRequest) (*control.HealthCheckResponse, error) {
s.log.Info(logs.ControlAPIHealthcheck, zap.String("key", hex.EncodeToString(req.Signature.Key)))
// verify request
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
resp := &control.HealthCheckResponse{
Body: &control.HealthCheckResponse_Body{
HealthStatus: control.HealthStatus_READY,
},
}
return resp, nil
}
// PutPolicies replaces existing policies.
//
// If request is unsigned or signed by disallowed key, permission error returns.
func (s *Server) PutPolicies(_ context.Context, req *control.PutPoliciesRequest) (*control.PutPoliciesResponse, error) {
s.log.Info(logs.ControlAPIPutPolicies, zap.String("key", hex.EncodeToString(req.Signature.Key)))
// verify request
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
for _, data := range req.GetBody().GetChainDatas() {
if err := s.putPolicy(data); err != nil {
return nil, err
}
}
return &control.PutPoliciesResponse{}, nil
}
func (s *Server) putPolicy(data *control.PutPoliciesRequest_ChainData) error {
var overrideChain chain.Chain
if err := overrideChain.DecodeBytes(data.GetChain()); err != nil {
return status.Error(codes.InvalidArgument, fmt.Sprintf("failed to parse body: %s", err.Error()))
}
if overrideChain.ID == "" {
return status.Error(codes.InvalidArgument, "missing chain id")
}
err := s.chainStorage.LocalStorage().RemoveOverride(chain.Ingress, data.GetNamespace(), overrideChain.ID)
if err != nil && !isNotFoundError(err) {
return status.Error(codes.Internal, err.Error())
}
if _, err = s.chainStorage.LocalStorage().AddOverride(chain.Ingress, data.GetNamespace(), &overrideChain); err != nil {
return status.Error(codes.Internal, err.Error())
}
return nil
}
// RemovePolicies removes existing policies.
//
// If request is unsigned or signed by disallowed key, permission error returns.
func (s *Server) RemovePolicies(_ context.Context, req *control.RemovePoliciesRequest) (*control.RemovePoliciesResponse, error) {
s.log.Info(logs.ControlAPIRemovePolicies, zap.String("key", hex.EncodeToString(req.Signature.Key)))
// verify request
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
for _, info := range req.GetBody().GetChainInfos() {
if err := s.removePolicy(info); err != nil {
return nil, err
}
}
return &control.RemovePoliciesResponse{}, nil
}
func (s *Server) removePolicy(info *control.RemovePoliciesRequest_ChainInfo) error {
err := s.chainStorage.LocalStorage().RemoveOverride(chain.Ingress, info.GetNamespace(), chain.ID(info.GetChainID()))
if err != nil {
if isNotFoundError(err) {
return status.Error(codes.NotFound, err.Error())
}
return status.Error(codes.InvalidArgument, err.Error())
}
return nil
}
// GetPolicy returns existing policy.
//
// If request is unsigned or signed by disallowed key, permission error returns.
func (s *Server) GetPolicy(_ context.Context, req *control.GetPolicyRequest) (*control.GetPolicyResponse, error) {
s.log.Info(logs.ControlAPIGetPolicy, zap.String("namespace", req.GetBody().GetNamespace()),
zap.String("chainId", req.GetBody().GetChainID()), zap.String("key", hex.EncodeToString(req.Signature.Key)))
// verify request
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
overrideChain, err := s.chainStorage.LocalStorage().GetOverride(chain.Ingress, req.GetBody().GetNamespace(), chain.ID(req.GetBody().GetChainID()))
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
return &control.GetPolicyResponse{Body: &control.GetPolicyResponse_Body{Chain: overrideChain.Bytes()}}, nil
}
// ListPolicies lists existing policies.
//
// If request is unsigned or signed by disallowed key, permission error returns.
func (s *Server) ListPolicies(_ context.Context, req *control.ListPoliciesRequest) (*control.ListPoliciesResponse, error) {
s.log.Info(logs.ControlAPIListPolicies, zap.String("namespace", req.GetBody().GetNamespace()),
zap.String("key", hex.EncodeToString(req.Signature.Key)))
// verify request
if err := s.isValidRequest(req); err != nil {
return nil, status.Error(codes.PermissionDenied, err.Error())
}
chains, err := s.chainStorage.LocalStorage().ListOverrides(chain.Ingress, req.GetBody().GetNamespace())
if err != nil {
return nil, status.Error(codes.InvalidArgument, err.Error())
}
res := make([]string, len(chains))
for i := range chains {
res[i] = string(chains[i].ID)
}
return &control.ListPoliciesResponse{Body: &control.ListPoliciesResponse_Body{ChainIDs: res}}, nil
}
// SignedMessage is an interface of Control service message.
type SignedMessage interface {
ReadSignedData([]byte) ([]byte, error)
GetSignature() *control.Signature
SetSignature(*control.Signature)
}
var errDisallowedKey = errors.New("key is not in the allowed list")
var errMissingSignature = errors.New("missing signature")
var errInvalidSignature = errors.New("invalid signature")
func (s *Server) isValidRequest(req SignedMessage) error {
sign := req.GetSignature()
if sign == nil {
return errMissingSignature
}
var (
key = sign.GetKey()
allowed = false
)
// check if key is allowed
for _, authKey := range s.keysFetcher.FetchRawKeys() {
if allowed = bytes.Equal(authKey, key); allowed {
break
}
}
if !allowed {
return errDisallowedKey
}
// verify signature
binBody, err := req.ReadSignedData(nil)
if err != nil {
return fmt.Errorf("marshal request body: %w", err)
}
var sigV2 refs.Signature
sigV2.SetKey(sign.GetKey())
sigV2.SetSign(sign.GetSign())
sigV2.SetScheme(refs.ECDSA_SHA512)
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(sigV2); err != nil {
return fmt.Errorf("can't read signature: %w", err)
}
if !sig.Verify(binBody) {
return errInvalidSignature
}
return nil
}
// SignMessage signs Control service message with private key.
func SignMessage(key *ecdsa.PrivateKey, msg SignedMessage) error {
binBody, err := msg.ReadSignedData(nil)
if err != nil {
return fmt.Errorf("marshal request body: %w", err)
}
var sig frostfscrypto.Signature
err = sig.Calculate(frostfsecdsa.Signer(*key), binBody)
if err != nil {
return fmt.Errorf("calculate signature: %w", err)
}
var sigV2 refs.Signature
sig.WriteToV2(&sigV2)
var sigControl control.Signature
sigControl.Key = sigV2.GetKey()
sigControl.Sign = sigV2.GetSign()
msg.SetSignature(&sigControl)
return nil
}
func isNotFoundError(err error) bool {
return errors.Is(err, engine.ErrChainNameNotFound) ||
errors.Is(err, engine.ErrChainNotFound) ||
errors.Is(err, engine.ErrResourceNotFound)
}