From ecd1ed7a5e461c2b48e67949b4b2ef2a353e5307 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Tue, 18 Jun 2024 12:40:03 +0300 Subject: [PATCH] [#1184] node: Add audit middleware for grpc services Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/accounting.go | 15 +- cmd/frostfs-node/apemanager.go | 3 +- cmd/frostfs-node/container.go | 14 +- cmd/frostfs-node/control.go | 5 +- cmd/frostfs-node/netmap.go | 28 +-- cmd/frostfs-node/object.go | 5 +- cmd/frostfs-node/session.go | 5 +- cmd/frostfs-node/tree.go | 2 +- pkg/services/accounting/audit.go | 42 ++++ pkg/services/apemanager/audit.go | 75 +++++++ pkg/services/container/audit.go | 116 +++++++++++ pkg/services/control/server/audit.go | 287 +++++++++++++++++++++++++++ pkg/services/netmap/audit.go | 60 ++++++ pkg/services/object/audit.go | 172 ++++++++++++++++ pkg/services/session/audit.go | 39 ++++ pkg/services/tree/audit.go | 135 +++++++++++++ 16 files changed, 967 insertions(+), 36 deletions(-) create mode 100644 pkg/services/accounting/audit.go create mode 100644 pkg/services/apemanager/audit.go create mode 100644 pkg/services/container/audit.go create mode 100644 pkg/services/control/server/audit.go create mode 100644 pkg/services/netmap/audit.go create mode 100644 pkg/services/object/audit.go create mode 100644 pkg/services/session/audit.go create mode 100644 pkg/services/tree/audit.go diff --git a/cmd/frostfs-node/accounting.go b/cmd/frostfs-node/accounting.go index ec737f8a..f0e2abf3 100644 --- a/cmd/frostfs-node/accounting.go +++ b/cmd/frostfs-node/accounting.go @@ -20,15 +20,16 @@ func initAccountingService(ctx context.Context, c *cfg) { balanceMorphWrapper, err := balance.NewFromMorph(c.cfgMorph.client, c.cfgAccounting.scriptHash, 0) fatalOnErr(err) - server := accountingTransportGRPC.New( - accountingService.NewSignService( - &c.key.PrivateKey, - accountingService.NewExecutionService( - accounting.NewExecutor(balanceMorphWrapper), - c.respSvc, - ), + service := accountingService.NewSignService( + &c.key.PrivateKey, + accountingService.NewExecutionService( + accounting.NewExecutor(balanceMorphWrapper), + c.respSvc, ), ) + service = accountingService.NewAuditService(service, c.log, c.audit) + + server := accountingTransportGRPC.New(service) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { accountingGRPC.RegisterAccountingServiceServer(s, server) diff --git a/cmd/frostfs-node/apemanager.go b/cmd/frostfs-node/apemanager.go index b9928faa..79c45c25 100644 --- a/cmd/frostfs-node/apemanager.go +++ b/cmd/frostfs-node/apemanager.go @@ -21,7 +21,8 @@ func initAPEManagerService(c *cfg) { execsvc := apemanager.New(c.cfgObject.cnrSource, contractStorage, apemanager.WithLogger(c.log)) sigsvc := apemanager.NewSignService(&c.key.PrivateKey, execsvc) - server := apemanager_transport.New(sigsvc) + auditSvc := apemanager.NewAuditService(sigsvc, c.log, c.audit) + server := apemanager_transport.New(auditSvc) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { apemanager_grpc.RegisterAPEManagerServiceServer(s, server) diff --git a/cmd/frostfs-node/container.go b/cmd/frostfs-node/container.go index 61600376..b14e1916 100644 --- a/cmd/frostfs-node/container.go +++ b/cmd/frostfs-node/container.go @@ -46,15 +46,15 @@ func initContainerService(_ context.Context, c *cfg) { c.shared.frostfsidClient = frostfsIDSubjectProvider - server := containerTransportGRPC.New( - containerService.NewSignService( - &c.key.PrivateKey, - containerService.NewAPEServer(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine, cnrRdr, - newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient, - containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc), - ), + service := containerService.NewSignService( + &c.key.PrivateKey, + containerService.NewAPEServer(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine, cnrRdr, + newCachedIRFetcher(createInnerRingFetcher(c)), c.netMapSource, c.shared.frostfsidClient, + containerService.NewExecutionService(containerMorph.NewExecutor(cnrRdr, cnrWrt), c.respSvc), ), ) + service = containerService.NewAuditService(service, c.log, c.audit) + server := containerTransportGRPC.New(service) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { containerGRPC.RegisterContainerServiceServer(s, server) diff --git a/cmd/frostfs-node/control.go b/cmd/frostfs-node/control.go index e1e6e3ac..8ee1ab69 100644 --- a/cmd/frostfs-node/control.go +++ b/cmd/frostfs-node/control.go @@ -30,8 +30,8 @@ func initControlService(c *cfg) { for i := range pubs { rawPubs = append(rawPubs, pubs[i].Bytes()) } - - ctlSvc := controlSvc.New( + var ctlSvc control.ControlServiceServer + ctlSvc = controlSvc.New( controlSvc.WithKey(&c.key.PrivateKey), controlSvc.WithAuthorizedKeys(rawPubs), controlSvc.WithHealthChecker(c), @@ -43,6 +43,7 @@ func initControlService(c *cfg) { controlSvc.WithTreeService(c.treeService), controlSvc.WithLocalOverrideStorage(c.cfgObject.cfgAccessPolicyEngine.accessPolicyEngine), ) + ctlSvc = controlSvc.NewAuditService(ctlSvc, c.log, c.audit) lis, err := net.Listen("tcp", endpoint) if err != nil { diff --git a/cmd/frostfs-node/netmap.go b/cmd/frostfs-node/netmap.go index 56f2ca98..6aff8ddf 100644 --- a/cmd/frostfs-node/netmap.go +++ b/cmd/frostfs-node/netmap.go @@ -147,22 +147,22 @@ func initNetmapService(ctx context.Context, c *cfg) { initNetmapState(c) - server := netmapTransportGRPC.New( - netmapService.NewSignService( - &c.key.PrivateKey, - netmapService.NewExecutionService( - c, - c.apiVersion, - &netInfo{ - netState: c.cfgNetmap.state, - magic: c.cfgMorph.client, - morphClientNetMap: c.cfgNetmap.wrapper, - msPerBlockRdr: c.cfgMorph.client.MsPerBlock, - }, - c.respSvc, - ), + svc := netmapService.NewSignService( + &c.key.PrivateKey, + netmapService.NewExecutionService( + c, + c.apiVersion, + &netInfo{ + netState: c.cfgNetmap.state, + magic: c.cfgMorph.client, + morphClientNetMap: c.cfgNetmap.wrapper, + msPerBlockRdr: c.cfgMorph.client.MsPerBlock, + }, + c.respSvc, ), ) + svc = netmapService.NewAuditService(svc, c.log, c.audit) + server := netmapTransportGRPC.New(svc) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { netmapGRPC.RegisterNetmapServiceServer(s, server) diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 907e517a..62183d31 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -182,7 +182,7 @@ func initObjectService(c *cfg) { sDeleteV2 := createDeleteServiceV2(sDelete) // build service pipeline - // grpc | | signature | response | acl | ape | split + // grpc | audit | | signature | response | acl | ape | split splitSvc := createSplitService(c, sPutV2, sGetV2, sSearchV2, sDeleteV2) @@ -205,7 +205,8 @@ func initObjectService(c *cfg) { c.shared.metricsSvc = objectService.NewMetricCollector( signSvc, c.metricsCollector.ObjectService(), metricsconfig.Enabled(c.appCfg)) - server := objectTransportGRPC.New(c.shared.metricsSvc) + auditSvc := objectService.NewAuditService(c.shared.metricsSvc, c.log, c.audit) + server := objectTransportGRPC.New(auditSvc) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { objectGRPC.RegisterObjectServiceServer(s, server) diff --git a/cmd/frostfs-node/session.go b/cmd/frostfs-node/session.go index ee21ec23..d286fc63 100644 --- a/cmd/frostfs-node/session.go +++ b/cmd/frostfs-node/session.go @@ -52,12 +52,13 @@ func initSessionService(c *cfg) { c.privateTokenStore.RemoveOld(ev.(netmap.NewEpoch).EpochNumber()) }) - server := sessionTransportGRPC.New( + svc := sessionSvc.NewAuditService( sessionSvc.NewSignService( &c.key.PrivateKey, sessionSvc.NewExecutionService(c.privateTokenStore, c.respSvc, c.log), ), - ) + c.log, c.audit) + server := sessionTransportGRPC.New(svc) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { sessionGRPC.RegisterSessionServiceServer(s, server) diff --git a/cmd/frostfs-node/tree.go b/cmd/frostfs-node/tree.go index daaaa64a..9f5c89ef 100644 --- a/cmd/frostfs-node/tree.go +++ b/cmd/frostfs-node/tree.go @@ -70,7 +70,7 @@ func initTreeService(c *cfg) { ) c.cfgGRPC.performAndSave(func(_ string, _ net.Listener, s *grpc.Server) { - tree.RegisterTreeServiceServer(s, c.treeService) + tree.RegisterTreeServiceServer(s, tree.NewAuditService(c.treeService, c.log, c.audit)) }) c.workers = append(c.workers, newWorkerFromFunc(func(ctx context.Context) { diff --git a/pkg/services/accounting/audit.go b/pkg/services/accounting/audit.go new file mode 100644 index 00000000..1d8f8836 --- /dev/null +++ b/pkg/services/accounting/audit.go @@ -0,0 +1,42 @@ +package accounting + +import ( + "context" + "sync/atomic" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting" + acc_grpc "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/accounting/grpc" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" +) + +var _ Server = (*auditService)(nil) + +type auditService struct { + next Server + log *logger.Logger + enabled *atomic.Bool +} + +func NewAuditService(next Server, log *logger.Logger, enabled *atomic.Bool) Server { + return &auditService{ + next: next, + log: log, + enabled: enabled, + } +} + +// Balance implements Server. +func (l *auditService) Balance(ctx context.Context, req *accounting.BalanceRequest) (*accounting.BalanceResponse, error) { + res, err := l.next.Balance(ctx, req) + + if !l.enabled.Load() { + return res, err + } + + audit.LogRequest(l.log, acc_grpc.AccountingService_Balance_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil) + + return res, err +} diff --git a/pkg/services/apemanager/audit.go b/pkg/services/apemanager/audit.go new file mode 100644 index 00000000..d132ae7d --- /dev/null +++ b/pkg/services/apemanager/audit.go @@ -0,0 +1,75 @@ +package apemanager + +import ( + "context" + "sync/atomic" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/apemanager" + ape_grpc "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/apemanager/grpc" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" +) + +var _ Server = (*auditService)(nil) + +type auditService struct { + next Server + log *logger.Logger + enabled *atomic.Bool +} + +func NewAuditService(next Server, log *logger.Logger, enabled *atomic.Bool) Server { + return &auditService{ + next: next, + log: log, + enabled: enabled, + } +} + +// AddChain implements Server. +func (a *auditService) AddChain(ctx context.Context, req *apemanager.AddChainRequest) (*apemanager.AddChainResponse, error) { + res, err := a.next.AddChain(ctx, req) + if !a.enabled.Load() { + return res, err + } + + audit.LogRequest(a.log, ape_grpc.APEManagerService_AddChain_FullMethodName, req, + audit.TargetFromChainID(req.GetBody().GetTarget().GetTargetType().String(), + req.GetBody().GetTarget().GetName(), + res.GetBody().GetChainID()), + err == nil) + + return res, err +} + +// ListChains implements Server. +func (a *auditService) ListChains(ctx context.Context, req *apemanager.ListChainsRequest) (*apemanager.ListChainsResponse, error) { + res, err := a.next.ListChains(ctx, req) + if !a.enabled.Load() { + return res, err + } + + audit.LogRequest(a.log, ape_grpc.APEManagerService_ListChains_FullMethodName, req, + audit.TargetFromChainID(req.GetBody().GetTarget().GetTargetType().String(), + req.GetBody().GetTarget().GetName(), + nil), + err == nil) + + return res, err +} + +// RemoveChain implements Server. +func (a *auditService) RemoveChain(ctx context.Context, req *apemanager.RemoveChainRequest) (*apemanager.RemoveChainResponse, error) { + res, err := a.next.RemoveChain(ctx, req) + if !a.enabled.Load() { + return res, err + } + + audit.LogRequest(a.log, ape_grpc.APEManagerService_RemoveChain_FullMethodName, req, + audit.TargetFromChainID(req.GetBody().GetTarget().GetTargetType().String(), + req.GetBody().GetTarget().GetName(), + req.GetBody().GetChainID()), + err == nil) + + return res, err +} diff --git a/pkg/services/container/audit.go b/pkg/services/container/audit.go new file mode 100644 index 00000000..7ef432bb --- /dev/null +++ b/pkg/services/container/audit.go @@ -0,0 +1,116 @@ +package container + +import ( + "context" + "sync/atomic" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container" + container_grpc "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/container/grpc" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" +) + +var _ Server = (*auditService)(nil) + +type auditService struct { + next Server + log *logger.Logger + enabled *atomic.Bool +} + +func NewAuditService(next Server, log *logger.Logger, enabled *atomic.Bool) Server { + return &auditService{ + next: next, + log: log, + enabled: enabled, + } +} + +// AnnounceUsedSpace implements Server. +func (a *auditService) AnnounceUsedSpace(ctx context.Context, req *container.AnnounceUsedSpaceRequest) (*container.AnnounceUsedSpaceResponse, error) { + res, err := a.next.AnnounceUsedSpace(ctx, req) + if !a.enabled.Load() { + return res, err + } + + var ids []*refs.ContainerID + for _, v := range req.GetBody().GetAnnouncements() { + ids = append(ids, v.GetContainerID()) + } + + audit.LogRequest(a.log, container_grpc.ContainerService_AnnounceUsedSpace_FullMethodName, req, + audit.TargetFromRefs(ids, &cid.ID{}), err == nil) + + return res, err +} + +// Delete implements Server. +func (a *auditService) Delete(ctx context.Context, req *container.DeleteRequest) (*container.DeleteResponse, error) { + res, err := a.next.Delete(ctx, req) + if !a.enabled.Load() { + return res, err + } + + audit.LogRequest(a.log, container_grpc.ContainerService_Delete_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetContainerID(), &cid.ID{}), err == nil) + + return res, err +} + +// Get implements Server. +func (a *auditService) Get(ctx context.Context, req *container.GetRequest) (*container.GetResponse, error) { + res, err := a.next.Get(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequest(a.log, container_grpc.ContainerService_Get_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetContainerID(), &cid.ID{}), err == nil) + return res, err +} + +// GetExtendedACL implements Server. +func (a *auditService) GetExtendedACL(ctx context.Context, req *container.GetExtendedACLRequest) (*container.GetExtendedACLResponse, error) { + res, err := a.next.GetExtendedACL(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequest(a.log, container_grpc.ContainerService_GetExtendedACL_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetContainerID(), &cid.ID{}), err == nil) + return res, err +} + +// List implements Server. +func (a *auditService) List(ctx context.Context, req *container.ListRequest) (*container.ListResponse, error) { + res, err := a.next.List(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequest(a.log, container_grpc.ContainerService_List_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil) + return res, err +} + +// Put implements Server. +func (a *auditService) Put(ctx context.Context, req *container.PutRequest) (*container.PutResponse, error) { + res, err := a.next.Put(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequest(a.log, container_grpc.ContainerService_Put_FullMethodName, req, + audit.TargetFromRef(res.GetBody().GetContainerID(), &cid.ID{}), err == nil) + return res, err +} + +// SetExtendedACL implements Server. +func (a *auditService) SetExtendedACL(ctx context.Context, req *container.SetExtendedACLRequest) (*container.SetExtendedACLResponse, error) { + res, err := a.next.SetExtendedACL(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequest(a.log, container_grpc.ContainerService_SetExtendedACL_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetEACL().GetContainerID(), &cid.ID{}), err == nil) + return res, err +} diff --git a/pkg/services/control/server/audit.go b/pkg/services/control/server/audit.go new file mode 100644 index 00000000..16c04a8c --- /dev/null +++ b/pkg/services/control/server/audit.go @@ -0,0 +1,287 @@ +package control + +import ( + "context" + "sync/atomic" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit" + ctl "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/control" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +var _ ctl.ControlServiceServer = (*auditService)(nil) + +type auditService struct { + next ctl.ControlServiceServer + log *logger.Logger + enabled *atomic.Bool +} + +func NewAuditService(next ctl.ControlServiceServer, log *logger.Logger, enabled *atomic.Bool) ctl.ControlServiceServer { + return &auditService{ + next: next, + log: log, + enabled: enabled, + } +} + +// AddChainLocalOverride implements control.ControlServiceServer. +func (a *auditService) AddChainLocalOverride(ctx context.Context, req *ctl.AddChainLocalOverrideRequest) (*ctl.AddChainLocalOverrideResponse, error) { + res, err := a.next.AddChainLocalOverride(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_AddChainLocalOverride_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromChainID(req.GetBody().GetTarget().GetType().String(), + req.GetBody().GetTarget().GetName(), + res.GetBody().GetChainId()), + err == nil) + return res, err +} + +// DetachShards implements control.ControlServiceServer. +func (a *auditService) DetachShards(ctx context.Context, req *ctl.DetachShardsRequest) (*ctl.DetachShardsResponse, error) { + res, err := a.next.DetachShards(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_DetachShards_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromShardIDs(req.GetBody().GetShard_ID()), err == nil) + return res, err +} + +// Doctor implements control.ControlServiceServer. +func (a *auditService) Doctor(ctx context.Context, req *ctl.DoctorRequest) (*ctl.DoctorResponse, error) { + res, err := a.next.Doctor(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_Doctor_FullMethodName, req.GetSignature().GetKey(), nil, err == nil) + return res, err +} + +// DropObjects implements control.ControlServiceServer. +func (a *auditService) DropObjects(ctx context.Context, req *ctl.DropObjectsRequest) (*ctl.DropObjectsResponse, error) { + res, err := a.next.DropObjects(ctx, req) + if !a.enabled.Load() { + return res, err + } + var list []string + for _, v := range req.GetBody().GetAddressList() { + if len(v) == 0 { + list = append(list, audit.Empty) + continue + } + var a oid.Address + if e := a.DecodeString(string(v)); e != nil { + list = append(list, audit.InvalidValue) + } else { + list = append(list, a.EncodeToString()) + } + } + + audit.LogRequestWithKey(a.log, ctl.ControlService_DropObjects_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromStringSlice(list), err == nil) + return res, err +} + +// EvacuateShard implements control.ControlServiceServer. +func (a *auditService) EvacuateShard(ctx context.Context, req *ctl.EvacuateShardRequest) (*ctl.EvacuateShardResponse, error) { + res, err := a.next.EvacuateShard(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_EvacuateShard_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromShardIDs(req.GetBody().GetShard_ID()), err == nil) + return res, err +} + +// FlushCache implements control.ControlServiceServer. +func (a *auditService) FlushCache(ctx context.Context, req *ctl.FlushCacheRequest) (*ctl.FlushCacheResponse, error) { + res, err := a.next.FlushCache(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_FlushCache_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromShardIDs(req.GetBody().GetShard_ID()), err == nil) + return res, err +} + +// GetChainLocalOverride implements control.ControlServiceServer. +func (a *auditService) GetChainLocalOverride(ctx context.Context, req *ctl.GetChainLocalOverrideRequest) (*ctl.GetChainLocalOverrideResponse, error) { + res, err := a.next.GetChainLocalOverride(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_GetChainLocalOverride_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromChainID( + req.GetBody().GetTarget().GetType().String(), + req.GetBody().GetTarget().GetName(), + req.GetBody().GetChainId()), + err == nil) + return res, err +} + +// GetShardEvacuationStatus implements control.ControlServiceServer. +func (a *auditService) GetShardEvacuationStatus(ctx context.Context, req *ctl.GetShardEvacuationStatusRequest) (*ctl.GetShardEvacuationStatusResponse, error) { + res, err := a.next.GetShardEvacuationStatus(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_GetShardEvacuationStatus_FullMethodName, req.GetSignature().GetKey(), + nil, err == nil) + return res, err +} + +// HealthCheck implements control.ControlServiceServer. +func (a *auditService) HealthCheck(ctx context.Context, req *ctl.HealthCheckRequest) (*ctl.HealthCheckResponse, error) { + res, err := a.next.HealthCheck(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_HealthCheck_FullMethodName, req.GetSignature().GetKey(), + nil, err == nil) + return res, err +} + +// ListChainLocalOverrides implements control.ControlServiceServer. +func (a *auditService) ListChainLocalOverrides(ctx context.Context, req *ctl.ListChainLocalOverridesRequest) (*ctl.ListChainLocalOverridesResponse, error) { + res, err := a.next.ListChainLocalOverrides(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_ListChainLocalOverrides_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromChainID(req.GetBody().GetTarget().GetType().String(), + req.GetBody().GetTarget().GetName(), + nil), + err == nil) + return res, err +} + +// ListShards implements control.ControlServiceServer. +func (a *auditService) ListShards(ctx context.Context, req *ctl.ListShardsRequest) (*ctl.ListShardsResponse, error) { + res, err := a.next.ListShards(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_ListShards_FullMethodName, req.GetSignature().GetKey(), + nil, err == nil) + return res, err +} + +// ListTargetsLocalOverrides implements control.ControlServiceServer. +func (a *auditService) ListTargetsLocalOverrides(ctx context.Context, req *ctl.ListTargetsLocalOverridesRequest) (*ctl.ListTargetsLocalOverridesResponse, error) { + res, err := a.next.ListTargetsLocalOverrides(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_ListTargetsLocalOverrides_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromString(req.GetBody().GetChainName()), err == nil) + return res, err +} + +// RemoveChainLocalOverride implements control.ControlServiceServer. +func (a *auditService) RemoveChainLocalOverride(ctx context.Context, req *ctl.RemoveChainLocalOverrideRequest) (*ctl.RemoveChainLocalOverrideResponse, error) { + res, err := a.next.RemoveChainLocalOverride(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_RemoveChainLocalOverride_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromChainID(req.GetBody().GetTarget().GetType().String(), + req.GetBody().GetTarget().GetName(), + req.GetBody().GetChainId()), + err == nil) + return res, err +} + +// RemoveChainLocalOverridesByTarget implements control.ControlServiceServer. +func (a *auditService) RemoveChainLocalOverridesByTarget(ctx context.Context, req *ctl.RemoveChainLocalOverridesByTargetRequest) (*ctl.RemoveChainLocalOverridesByTargetResponse, error) { + res, err := a.next.RemoveChainLocalOverridesByTarget(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_RemoveChainLocalOverridesByTarget_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromChainID(req.GetBody().GetTarget().GetType().String(), + req.GetBody().GetTarget().GetName(), + nil), + err == nil) + return res, err +} + +// ResetShardEvacuationStatus implements control.ControlServiceServer. +func (a *auditService) ResetShardEvacuationStatus(ctx context.Context, req *ctl.ResetShardEvacuationStatusRequest) (*ctl.ResetShardEvacuationStatusResponse, error) { + res, err := a.next.ResetShardEvacuationStatus(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_ResetShardEvacuationStatus_FullMethodName, req.GetSignature().GetKey(), + nil, err == nil) + return res, err +} + +// SealWriteCache implements control.ControlServiceServer. +func (a *auditService) SealWriteCache(ctx context.Context, req *ctl.SealWriteCacheRequest) (*ctl.SealWriteCacheResponse, error) { + res, err := a.next.SealWriteCache(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_SealWriteCache_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromShardIDs(req.GetBody().GetShard_ID()), err == nil) + return res, err +} + +// SetNetmapStatus implements control.ControlServiceServer. +func (a *auditService) SetNetmapStatus(ctx context.Context, req *ctl.SetNetmapStatusRequest) (*ctl.SetNetmapStatusResponse, error) { + res, err := a.next.SetNetmapStatus(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_SetNetmapStatus_FullMethodName, req.GetSignature().GetKey(), + nil, err == nil) + return res, err +} + +// SetShardMode implements control.ControlServiceServer. +func (a *auditService) SetShardMode(ctx context.Context, req *ctl.SetShardModeRequest) (*ctl.SetShardModeResponse, error) { + res, err := a.next.SetShardMode(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_SetShardMode_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromShardIDs(req.GetBody().GetShard_ID()), err == nil) + return res, err +} + +// StartShardEvacuation implements control.ControlServiceServer. +func (a *auditService) StartShardEvacuation(ctx context.Context, req *ctl.StartShardEvacuationRequest) (*ctl.StartShardEvacuationResponse, error) { + res, err := a.next.StartShardEvacuation(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_StartShardEvacuation_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromShardIDs(req.GetBody().GetShard_ID()), err == nil) + return res, err +} + +// StopShardEvacuation implements control.ControlServiceServer. +func (a *auditService) StopShardEvacuation(ctx context.Context, req *ctl.StopShardEvacuationRequest) (*ctl.StopShardEvacuationResponse, error) { + res, err := a.next.StopShardEvacuation(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_StopShardEvacuation_FullMethodName, req.GetSignature().GetKey(), + nil, err == nil) + return res, err +} + +// SynchronizeTree implements control.ControlServiceServer. +func (a *auditService) SynchronizeTree(ctx context.Context, req *ctl.SynchronizeTreeRequest) (*ctl.SynchronizeTreeResponse, error) { + res, err := a.next.SynchronizeTree(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, ctl.ControlService_SynchronizeTree_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromTreeID(req.GetBody().GetContainerId(), req.GetBody().GetTreeId()), err == nil) + return res, err +} diff --git a/pkg/services/netmap/audit.go b/pkg/services/netmap/audit.go new file mode 100644 index 00000000..906fd398 --- /dev/null +++ b/pkg/services/netmap/audit.go @@ -0,0 +1,60 @@ +package netmap + +import ( + "context" + "sync/atomic" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap" + netmapGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/netmap/grpc" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" +) + +var _ Server = (*auditService)(nil) + +type auditService struct { + next Server + log *logger.Logger + enabled *atomic.Bool +} + +func NewAuditService(next Server, log *logger.Logger, enabled *atomic.Bool) Server { + return &auditService{ + next: next, + log: log, + enabled: enabled, + } +} + +// LocalNodeInfo implements Server. +func (a *auditService) LocalNodeInfo(ctx context.Context, req *netmap.LocalNodeInfoRequest) (*netmap.LocalNodeInfoResponse, error) { + res, err := a.next.LocalNodeInfo(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequest(a.log, netmapGRPC.NetmapService_LocalNodeInfo_FullMethodName, req, + nil, err == nil) + return res, err +} + +// NetworkInfo implements Server. +func (a *auditService) NetworkInfo(ctx context.Context, req *netmap.NetworkInfoRequest) (*netmap.NetworkInfoResponse, error) { + res, err := a.next.NetworkInfo(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequest(a.log, netmapGRPC.NetmapService_NetworkInfo_FullMethodName, req, + nil, err == nil) + return res, err +} + +// Snapshot implements Server. +func (a *auditService) Snapshot(ctx context.Context, req *netmap.SnapshotRequest) (*netmap.SnapshotResponse, error) { + res, err := a.next.Snapshot(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequest(a.log, netmapGRPC.NetmapService_NetmapSnapshot_FullMethodName, req, + nil, err == nil) + return res, err +} diff --git a/pkg/services/object/audit.go b/pkg/services/object/audit.go new file mode 100644 index 00000000..1305fa00 --- /dev/null +++ b/pkg/services/object/audit.go @@ -0,0 +1,172 @@ +package object + +import ( + "context" + "errors" + "sync/atomic" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id" + oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" +) + +var _ ServiceServer = (*auditService)(nil) + +type auditService struct { + next ServiceServer + log *logger.Logger + enabled *atomic.Bool +} + +func NewAuditService(next ServiceServer, log *logger.Logger, enabled *atomic.Bool) ServiceServer { + return &auditService{ + next: next, + log: log, + enabled: enabled, + } +} + +// Delete implements ServiceServer. +func (a *auditService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) { + res, err := a.next.Delete(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequest(a.log, objectGRPC.ObjectService_Delete_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{}), err == nil) + return res, err +} + +// Get implements ServiceServer. +func (a *auditService) Get(req *object.GetRequest, stream GetObjectStream) error { + err := a.next.Get(req, stream) + if !a.enabled.Load() { + return err + } + audit.LogRequest(a.log, objectGRPC.ObjectService_Get_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{}), err == nil) + return err +} + +// GetRange implements ServiceServer. +func (a *auditService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error { + err := a.next.GetRange(req, stream) + if !a.enabled.Load() { + return err + } + audit.LogRequest(a.log, objectGRPC.ObjectService_GetRange_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{}), err == nil) + return err +} + +// GetRangeHash implements ServiceServer. +func (a *auditService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) { + resp, err := a.next.GetRangeHash(ctx, req) + if !a.enabled.Load() { + return resp, err + } + audit.LogRequest(a.log, objectGRPC.ObjectService_GetRangeHash_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{}), err == nil) + return resp, err +} + +// Head implements ServiceServer. +func (a *auditService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { + resp, err := a.next.Head(ctx, req) + if !a.enabled.Load() { + return resp, err + } + audit.LogRequest(a.log, objectGRPC.ObjectService_Head_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{}), err == nil) + return resp, err +} + +// Put implements ServiceServer. +func (a *auditService) Put() (PutObjectStream, error) { + res, err := a.next.Put() + if !a.enabled.Load() { + return res, err + } + if err != nil { + audit.LogRequest(a.log, objectGRPC.ObjectService_Put_FullMethodName, nil, nil, false) + return res, err + } + return &auditPutStream{ + stream: res, + log: a.log, + }, nil +} + +// PutSingle implements ServiceServer. +func (a *auditService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) { + resp, err := a.next.PutSingle(ctx, req) + if !a.enabled.Load() { + return resp, err + } + audit.LogRequest(a.log, objectGRPC.ObjectService_PutSingle_FullMethodName, req, + audit.TargetFromContainerIDObjectID(req.GetBody().GetObject().GetHeader().GetContainerID(), + req.GetBody().GetObject().GetObjectID()), + err == nil) + return resp, err +} + +// Search implements ServiceServer. +func (a *auditService) Search(req *object.SearchRequest, stream SearchStream) error { + err := a.next.Search(req, stream) + if !a.enabled.Load() { + return err + } + audit.LogRequest(a.log, objectGRPC.ObjectService_Search_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetContainerID(), &cid.ID{}), err == nil) + return err +} + +var _ PutObjectStream = (*auditPutStream)(nil) + +type auditPutStream struct { + stream PutObjectStream + log *logger.Logger + + failed bool + key []byte + containerID *refs.ContainerID + objectID *refs.ObjectID +} + +// CloseAndRecv implements PutObjectStream. +func (a *auditPutStream) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) { + resp, err := a.stream.CloseAndRecv(ctx) + if err != nil { + a.failed = true + } + a.objectID = resp.GetBody().GetObjectID() + audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Put_FullMethodName, a.key, + audit.TargetFromContainerIDObjectID(a.containerID, a.objectID), + !a.failed) + return resp, err +} + +// Send implements PutObjectStream. +func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error { + if partInit, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartInit); ok { + a.containerID = partInit.GetHeader().GetContainerID() + a.objectID = partInit.GetObjectID() + a.key = req.GetVerificationHeader().GetBodySignature().GetKey() + } + + err := a.stream.Send(ctx, req) + if err != nil { + a.failed = true + } + if !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here + audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Put_FullMethodName, a.key, + audit.TargetFromContainerIDObjectID(a.containerID, a.objectID), + !a.failed) + } + return err +} diff --git a/pkg/services/session/audit.go b/pkg/services/session/audit.go new file mode 100644 index 00000000..19d3383d --- /dev/null +++ b/pkg/services/session/audit.go @@ -0,0 +1,39 @@ +package session + +import ( + "context" + "sync/atomic" + + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" + sessionGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session/grpc" + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" + "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" +) + +var _ Server = (*auditService)(nil) + +type auditService struct { + next Server + log *logger.Logger + enabled *atomic.Bool +} + +func NewAuditService(next Server, log *logger.Logger, enabled *atomic.Bool) Server { + return &auditService{ + next: next, + log: log, + enabled: enabled, + } +} + +// Create implements Server. +func (a *auditService) Create(ctx context.Context, req *session.CreateRequest) (*session.CreateResponse, error) { + res, err := a.next.Create(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequest(a.log, sessionGRPC.SessionService_Create_FullMethodName, req, + audit.TargetFromRef(req.GetBody().GetOwnerID(), &user.ID{}), err == nil) + return res, err +} diff --git a/pkg/services/tree/audit.go b/pkg/services/tree/audit.go new file mode 100644 index 00000000..bec71f5d --- /dev/null +++ b/pkg/services/tree/audit.go @@ -0,0 +1,135 @@ +package tree + +import ( + "context" + "sync/atomic" + + "git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit" + "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" +) + +var _ TreeServiceServer = (*auditService)(nil) + +type auditService struct { + next TreeServiceServer + log *logger.Logger + enabled *atomic.Bool +} + +func NewAuditService(next TreeServiceServer, log *logger.Logger, enabled *atomic.Bool) TreeServiceServer { + return &auditService{ + next: next, + log: log, + enabled: enabled, + } +} + +// Add implements TreeServiceServer. +func (a *auditService) Add(ctx context.Context, req *AddRequest) (*AddResponse, error) { + res, err := a.next.Add(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, TreeService_Add_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromTreeID(req.GetBody().GetContainerId(), req.GetBody().GetTreeId()), err == nil) + return res, err +} + +// AddByPath implements TreeServiceServer. +func (a *auditService) AddByPath(ctx context.Context, req *AddByPathRequest) (*AddByPathResponse, error) { + res, err := a.next.AddByPath(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, TreeService_AddByPath_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromTreeID(req.GetBody().GetContainerId(), req.GetBody().GetTreeId()), err == nil) + return res, err +} + +// Apply implements TreeServiceServer. +func (a *auditService) Apply(ctx context.Context, req *ApplyRequest) (*ApplyResponse, error) { + res, err := a.next.Apply(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, TreeService_Apply_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromTreeID(req.GetBody().GetContainerId(), req.GetBody().GetTreeId()), err == nil) + return res, err +} + +// GetNodeByPath implements TreeServiceServer. +func (a *auditService) GetNodeByPath(ctx context.Context, req *GetNodeByPathRequest) (*GetNodeByPathResponse, error) { + res, err := a.next.GetNodeByPath(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, TreeService_GetNodeByPath_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromTreeID(req.GetBody().GetContainerId(), req.GetBody().GetTreeId()), err == nil) + return res, err +} + +// GetOpLog implements TreeServiceServer. +func (a *auditService) GetOpLog(req *GetOpLogRequest, srv TreeService_GetOpLogServer) error { + err := a.next.GetOpLog(req, srv) + if !a.enabled.Load() { + return err + } + audit.LogRequestWithKey(a.log, TreeService_GetOpLog_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromTreeID(req.GetBody().GetContainerId(), req.GetBody().GetTreeId()), err == nil) + return err +} + +// GetSubTree implements TreeServiceServer. +func (a *auditService) GetSubTree(req *GetSubTreeRequest, srv TreeService_GetSubTreeServer) error { + err := a.next.GetSubTree(req, srv) + if !a.enabled.Load() { + return err + } + audit.LogRequestWithKey(a.log, TreeService_GetSubTree_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromTreeID(req.GetBody().GetContainerId(), req.GetBody().GetTreeId()), err == nil) + return err +} + +// Healthcheck implements TreeServiceServer. +func (a *auditService) Healthcheck(ctx context.Context, req *HealthcheckRequest) (*HealthcheckResponse, error) { + res, err := a.next.Healthcheck(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, TreeService_Healthcheck_FullMethodName, req.GetSignature().GetKey(), + nil, err == nil) + return res, err +} + +// Move implements TreeServiceServer. +func (a *auditService) Move(ctx context.Context, req *MoveRequest) (*MoveResponse, error) { + res, err := a.next.Move(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, TreeService_Move_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromTreeID(req.GetBody().GetContainerId(), req.GetBody().GetTreeId()), err == nil) + return res, err +} + +// Remove implements TreeServiceServer. +func (a *auditService) Remove(ctx context.Context, req *RemoveRequest) (*RemoveResponse, error) { + res, err := a.next.Remove(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, TreeService_Remove_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromTreeID(req.GetBody().GetContainerId(), req.GetBody().GetTreeId()), err == nil) + return res, err +} + +// TreeList implements TreeServiceServer. +func (a *auditService) TreeList(ctx context.Context, req *TreeListRequest) (*TreeListResponse, error) { + res, err := a.next.TreeList(ctx, req) + if !a.enabled.Load() { + return res, err + } + audit.LogRequestWithKey(a.log, TreeService_TreeList_FullMethodName, req.GetSignature().GetKey(), + audit.TargetFromTreeID(req.GetBody().GetContainerId(), ""), err == nil) + return res, err +}