forked from TrueCloudLab/frostfs-node
[#1184] node: Add audit middleware for grpc services
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
ac1f014747
commit
ecd1ed7a5e
16 changed files with 967 additions and 36 deletions
172
pkg/services/object/audit.go
Normal file
172
pkg/services/object/audit.go
Normal file
|
@ -0,0 +1,172 @@
|
|||
package object
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object/grpc"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
var _ ServiceServer = (*auditService)(nil)
|
||||
|
||||
type auditService struct {
|
||||
next ServiceServer
|
||||
log *logger.Logger
|
||||
enabled *atomic.Bool
|
||||
}
|
||||
|
||||
func NewAuditService(next ServiceServer, log *logger.Logger, enabled *atomic.Bool) ServiceServer {
|
||||
return &auditService{
|
||||
next: next,
|
||||
log: log,
|
||||
enabled: enabled,
|
||||
}
|
||||
}
|
||||
|
||||
// Delete implements ServiceServer.
|
||||
func (a *auditService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
|
||||
res, err := a.next.Delete(ctx, req)
|
||||
if !a.enabled.Load() {
|
||||
return res, err
|
||||
}
|
||||
audit.LogRequest(a.log, objectGRPC.ObjectService_Delete_FullMethodName, req,
|
||||
audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{}), err == nil)
|
||||
return res, err
|
||||
}
|
||||
|
||||
// Get implements ServiceServer.
|
||||
func (a *auditService) Get(req *object.GetRequest, stream GetObjectStream) error {
|
||||
err := a.next.Get(req, stream)
|
||||
if !a.enabled.Load() {
|
||||
return err
|
||||
}
|
||||
audit.LogRequest(a.log, objectGRPC.ObjectService_Get_FullMethodName, req,
|
||||
audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{}), err == nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetRange implements ServiceServer.
|
||||
func (a *auditService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
|
||||
err := a.next.GetRange(req, stream)
|
||||
if !a.enabled.Load() {
|
||||
return err
|
||||
}
|
||||
audit.LogRequest(a.log, objectGRPC.ObjectService_GetRange_FullMethodName, req,
|
||||
audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{}), err == nil)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetRangeHash implements ServiceServer.
|
||||
func (a *auditService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
|
||||
resp, err := a.next.GetRangeHash(ctx, req)
|
||||
if !a.enabled.Load() {
|
||||
return resp, err
|
||||
}
|
||||
audit.LogRequest(a.log, objectGRPC.ObjectService_GetRangeHash_FullMethodName, req,
|
||||
audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{}), err == nil)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Head implements ServiceServer.
|
||||
func (a *auditService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||
resp, err := a.next.Head(ctx, req)
|
||||
if !a.enabled.Load() {
|
||||
return resp, err
|
||||
}
|
||||
audit.LogRequest(a.log, objectGRPC.ObjectService_Head_FullMethodName, req,
|
||||
audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{}), err == nil)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Put implements ServiceServer.
|
||||
func (a *auditService) Put() (PutObjectStream, error) {
|
||||
res, err := a.next.Put()
|
||||
if !a.enabled.Load() {
|
||||
return res, err
|
||||
}
|
||||
if err != nil {
|
||||
audit.LogRequest(a.log, objectGRPC.ObjectService_Put_FullMethodName, nil, nil, false)
|
||||
return res, err
|
||||
}
|
||||
return &auditPutStream{
|
||||
stream: res,
|
||||
log: a.log,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// PutSingle implements ServiceServer.
|
||||
func (a *auditService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||
resp, err := a.next.PutSingle(ctx, req)
|
||||
if !a.enabled.Load() {
|
||||
return resp, err
|
||||
}
|
||||
audit.LogRequest(a.log, objectGRPC.ObjectService_PutSingle_FullMethodName, req,
|
||||
audit.TargetFromContainerIDObjectID(req.GetBody().GetObject().GetHeader().GetContainerID(),
|
||||
req.GetBody().GetObject().GetObjectID()),
|
||||
err == nil)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Search implements ServiceServer.
|
||||
func (a *auditService) Search(req *object.SearchRequest, stream SearchStream) error {
|
||||
err := a.next.Search(req, stream)
|
||||
if !a.enabled.Load() {
|
||||
return err
|
||||
}
|
||||
audit.LogRequest(a.log, objectGRPC.ObjectService_Search_FullMethodName, req,
|
||||
audit.TargetFromRef(req.GetBody().GetContainerID(), &cid.ID{}), err == nil)
|
||||
return err
|
||||
}
|
||||
|
||||
var _ PutObjectStream = (*auditPutStream)(nil)
|
||||
|
||||
type auditPutStream struct {
|
||||
stream PutObjectStream
|
||||
log *logger.Logger
|
||||
|
||||
failed bool
|
||||
key []byte
|
||||
containerID *refs.ContainerID
|
||||
objectID *refs.ObjectID
|
||||
}
|
||||
|
||||
// CloseAndRecv implements PutObjectStream.
|
||||
func (a *auditPutStream) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
|
||||
resp, err := a.stream.CloseAndRecv(ctx)
|
||||
if err != nil {
|
||||
a.failed = true
|
||||
}
|
||||
a.objectID = resp.GetBody().GetObjectID()
|
||||
audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Put_FullMethodName, a.key,
|
||||
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
|
||||
!a.failed)
|
||||
return resp, err
|
||||
}
|
||||
|
||||
// Send implements PutObjectStream.
|
||||
func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error {
|
||||
if partInit, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartInit); ok {
|
||||
a.containerID = partInit.GetHeader().GetContainerID()
|
||||
a.objectID = partInit.GetObjectID()
|
||||
a.key = req.GetVerificationHeader().GetBodySignature().GetKey()
|
||||
}
|
||||
|
||||
err := a.stream.Send(ctx, req)
|
||||
if err != nil {
|
||||
a.failed = true
|
||||
}
|
||||
if !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
|
||||
audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Put_FullMethodName, a.key,
|
||||
audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
|
||||
!a.failed)
|
||||
}
|
||||
return err
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue