frostfs-node/pkg/services/object/audit.go
Airat Arifullin 9b13a18aac [#1479] go.mod: Bump frostfs-sdk-go version
* Update version within go.mod;
* Fix deprecated frostfs-api-go/v2 package and use frostfs-sdk-go/api
  instead.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2024-11-08 10:43:19 +03:00

233 lines
7 KiB
Go

package object
import (
"context"
"errors"
"sync/atomic"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/audit"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
objectGRPC "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object/grpc"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
// Compile-time check that *auditService satisfies ServiceServer.
var _ ServiceServer = (*auditService)(nil)
// auditService is a ServiceServer decorator: each method forwards the call to
// the wrapped service and, when auditing is enabled, writes an audit record
// describing the request and whether it succeeded.
type auditService struct {
// next is the wrapped service every call is forwarded to.
next ServiceServer
// log is the logger handed to the audit helpers.
log *logger.Logger
// enabled is read after each forwarded call; while it holds false no audit
// record is written.
enabled *atomic.Bool
}
// NewAuditService wraps next in an auditing decorator.
//
// Audit records are written to log. The enabled flag is consulted on every
// call, so auditing can be toggled at runtime through the shared atomic.Bool.
func NewAuditService(next ServiceServer, log *logger.Logger, enabled *atomic.Bool) ServiceServer {
	svc := new(auditService)
	svc.next = next
	svc.log = log
	svc.enabled = enabled
	return svc
}
// Delete implements ServiceServer.
//
// The request is forwarded to the wrapped service first. If auditing is
// enabled once the call has returned, an audit record is written for the
// address carried in the request body, marked successful when no error was
// returned.
func (a *auditService) Delete(ctx context.Context, req *object.DeleteRequest) (*object.DeleteResponse, error) {
	resp, err := a.next.Delete(ctx, req)
	if a.enabled.Load() {
		target := audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{})
		audit.LogRequest(a.log, objectGRPC.ObjectService_Delete_FullMethodName, req, target, err == nil)
	}
	return resp, err
}
// Get implements ServiceServer.
//
// The request is forwarded to the wrapped service first. If auditing is
// enabled once the call has returned, an audit record is written for the
// address carried in the request body, marked successful when no error was
// returned.
func (a *auditService) Get(req *object.GetRequest, stream GetObjectStream) error {
	err := a.next.Get(req, stream)
	if a.enabled.Load() {
		target := audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{})
		audit.LogRequest(a.log, objectGRPC.ObjectService_Get_FullMethodName, req, target, err == nil)
	}
	return err
}
// GetRange implements ServiceServer.
//
// The request is forwarded to the wrapped service first. If auditing is
// enabled once the call has returned, an audit record is written for the
// address carried in the request body, marked successful when no error was
// returned.
func (a *auditService) GetRange(req *object.GetRangeRequest, stream GetObjectRangeStream) error {
	err := a.next.GetRange(req, stream)
	if a.enabled.Load() {
		target := audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{})
		audit.LogRequest(a.log, objectGRPC.ObjectService_GetRange_FullMethodName, req, target, err == nil)
	}
	return err
}
// GetRangeHash implements ServiceServer.
//
// The request is forwarded to the wrapped service first. If auditing is
// enabled once the call has returned, an audit record is written for the
// address carried in the request body, marked successful when no error was
// returned.
func (a *auditService) GetRangeHash(ctx context.Context, req *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) {
	hashes, err := a.next.GetRangeHash(ctx, req)
	if a.enabled.Load() {
		target := audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{})
		audit.LogRequest(a.log, objectGRPC.ObjectService_GetRangeHash_FullMethodName, req, target, err == nil)
	}
	return hashes, err
}
// Head implements ServiceServer.
//
// The request is forwarded to the wrapped service first. If auditing is
// enabled once the call has returned, an audit record is written for the
// address carried in the request body, marked successful when no error was
// returned.
func (a *auditService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
	header, err := a.next.Head(ctx, req)
	if a.enabled.Load() {
		target := audit.TargetFromRef(req.GetBody().GetAddress(), &oid.Address{})
		audit.LogRequest(a.log, objectGRPC.ObjectService_Head_FullMethodName, req, target, err == nil)
	}
	return header, err
}
// Put implements ServiceServer.
//
// The stream is opened on the wrapped service. With auditing disabled the
// result is returned untouched. With auditing enabled, a failure to open the
// stream is logged right away (no request and no target are available yet);
// otherwise the stream is wrapped in an auditPutStream, which writes the audit
// record itself.
func (a *auditService) Put() (PutObjectStream, error) {
	inner, err := a.next.Put()
	switch {
	case !a.enabled.Load():
		return inner, err
	case err != nil:
		audit.LogRequest(a.log, objectGRPC.ObjectService_Put_FullMethodName, nil, nil, false)
		return inner, err
	}

	wrapped := &auditPutStream{stream: inner, log: a.log}
	return wrapped, nil
}
// PutSingle implements ServiceServer.
//
// The request is forwarded to the wrapped service first. If auditing is
// enabled once the call has returned, an audit record is written whose target
// is built from the container ID in the object's header and the object ID,
// both taken from the request body.
func (a *auditService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
	resp, err := a.next.PutSingle(ctx, req)
	if a.enabled.Load() {
		obj := req.GetBody().GetObject()
		target := audit.TargetFromContainerIDObjectID(obj.GetHeader().GetContainerID(), obj.GetObjectID())
		audit.LogRequest(a.log, objectGRPC.ObjectService_PutSingle_FullMethodName, req, target, err == nil)
	}
	return resp, err
}
// Search implements ServiceServer.
//
// The request is forwarded to the wrapped service first. If auditing is
// enabled once the call has returned, an audit record is written for the
// container ID carried in the request body, marked successful when no error
// was returned.
func (a *auditService) Search(req *object.SearchRequest, stream SearchStream) error {
	err := a.next.Search(req, stream)
	if a.enabled.Load() {
		target := audit.TargetFromRef(req.GetBody().GetContainerID(), &cid.ID{})
		audit.LogRequest(a.log, objectGRPC.ObjectService_Search_FullMethodName, req, target, err == nil)
	}
	return err
}
// Compile-time check that *auditPutStream satisfies PutObjectStream.
var _ PutObjectStream = (*auditPutStream)(nil)
// auditPutStream wraps a PutObjectStream and collects, across Send calls, the
// data needed to write an audit record for the Put operation.
type auditPutStream struct {
// stream is the wrapped stream every call is forwarded to.
stream PutObjectStream
// log is the logger handed to the audit helpers.
log *logger.Logger
// failed is set once Send or CloseAndRecv of the wrapped stream returns an
// error; it is never reset.
failed bool
// key is the body-signature key from the verification header of the init
// request.
key []byte
// containerID is taken from the header of the init part.
containerID *refs.ContainerID
// objectID is taken from the init part and later overwritten in CloseAndRecv
// with the object ID from the response body.
objectID *refs.ObjectID
}
// CloseAndRecv implements PutObjectStream.
//
// It closes the wrapped stream, marks the operation failed if that returned
// an error, replaces the stored object ID with the one from the response body
// and writes the audit record for the whole Put operation.
func (a *auditPutStream) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
	resp, err := a.stream.CloseAndRecv(ctx)
	a.failed = a.failed || err != nil

	// NOTE(review): resp may be nil when err != nil; this relies on the
	// generated getters tolerating a nil receiver — confirm.
	a.objectID = resp.GetBody().GetObjectID()

	target := audit.TargetFromContainerIDObjectID(a.containerID, a.objectID)
	audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Put_FullMethodName, a.key, target, !a.failed)
	return resp, err
}
// Send implements PutObjectStream.
//
// When req carries the init part, the container ID, object ID and signing key
// are remembered for the audit record. The request is then forwarded to the
// wrapped stream.
//
// A successful Send writes no audit record: the single record for the
// operation is written by CloseAndRecv. A record is written here only when
// Send fails with an error other than util.ErrAbortStream, because in that
// case CloseAndRecv will not be called.
func (a *auditPutStream) Send(ctx context.Context, req *object.PutRequest) error {
	if partInit, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartInit); ok {
		a.containerID = partInit.GetHeader().GetContainerID()
		a.objectID = partInit.GetObjectID()
		a.key = req.GetVerificationHeader().GetBodySignature().GetKey()
	}

	err := a.stream.Send(ctx, req)
	if err != nil {
		a.failed = true
	}
	// errors.Is(nil, target) is false, so the nil check is required: without
	// it every successful chunk would be logged as a separate successful Put.
	if err != nil && !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
		audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Put_FullMethodName, a.key,
			audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
			!a.failed)
	}
	return err
}
// auditPatchStream wraps a PatchObjectStream and collects, across Send calls,
// the data needed to write an audit record for the Patch operation.
type auditPatchStream struct {
// stream is the wrapped stream every call is forwarded to.
stream PatchObjectStream
// log is the logger handed to the audit helpers.
log *logger.Logger
// failed is set once Send or CloseAndRecv of the wrapped stream returns an
// error; it is never reset.
failed bool
// key is the body-signature key from the verification header of the first
// request.
key []byte
// containerID is taken from the address in the body of the first request.
containerID *refs.ContainerID
// objectID is taken from the address in the body of the first request and
// later overwritten in CloseAndRecv with the object ID from the response body.
objectID *refs.ObjectID
// nonFirstSend becomes true after the first Send, so that the fields above
// are captured only once.
nonFirstSend bool
}
// Patch implements ServiceServer.
//
// The stream is opened on the wrapped service. With auditing disabled the
// result is returned untouched. With auditing enabled, a failure to open the
// stream is logged right away (no request and no target are available yet);
// otherwise the stream is wrapped in an auditPatchStream, which writes the
// audit record itself.
func (a *auditService) Patch() (PatchObjectStream, error) {
	inner, err := a.next.Patch()
	switch {
	case !a.enabled.Load():
		return inner, err
	case err != nil:
		audit.LogRequest(a.log, objectGRPC.ObjectService_Patch_FullMethodName, nil, nil, false)
		return inner, err
	}

	wrapped := &auditPatchStream{stream: inner, log: a.log}
	return wrapped, nil
}
// CloseAndRecv implements PatchObjectStream.
//
// It closes the wrapped stream, marks the operation failed if that returned
// an error, replaces the stored object ID with the one from the response body
// and writes the audit record for the whole Patch operation.
func (a *auditPatchStream) CloseAndRecv(ctx context.Context) (*object.PatchResponse, error) {
	resp, err := a.stream.CloseAndRecv(ctx)
	a.failed = a.failed || err != nil

	// NOTE(review): resp may be nil when err != nil; this relies on the
	// generated getters tolerating a nil receiver — confirm.
	a.objectID = resp.GetBody().GetObjectID()

	target := audit.TargetFromContainerIDObjectID(a.containerID, a.objectID)
	audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key, target, !a.failed)
	return resp, err
}
// Send implements PatchObjectStream.
//
// On the first call the container ID, object ID and signing key are taken from
// req and remembered for the audit record. The request is then forwarded to
// the wrapped stream.
//
// A successful Send writes no audit record: the single record for the
// operation is written by CloseAndRecv. A record is written here only when
// Send fails with an error other than util.ErrAbortStream, because in that
// case CloseAndRecv will not be called.
func (a *auditPatchStream) Send(ctx context.Context, req *object.PatchRequest) error {
	if !a.nonFirstSend {
		a.containerID = req.GetBody().GetAddress().GetContainerID()
		a.objectID = req.GetBody().GetAddress().GetObjectID()
		a.key = req.GetVerificationHeader().GetBodySignature().GetKey()
		a.nonFirstSend = true
	}

	err := a.stream.Send(ctx, req)
	if err != nil {
		a.failed = true
	}
	// errors.Is(nil, target) is false, so the nil check is required: without
	// it every successful chunk would be logged as a separate successful Patch.
	if err != nil && !errors.Is(err, util.ErrAbortStream) { // CloseAndRecv will not be called, so log here
		audit.LogRequestWithKey(a.log, objectGRPC.ObjectService_Patch_FullMethodName, a.key,
			audit.TargetFromContainerIDObjectID(a.containerID, a.objectID),
			!a.failed)
	}
	return err
}