frostfs-node/pkg/services/object/ape/service.go

405 lines
10 KiB
Go
Raw Normal View History

package ape
import (
"context"
"encoding/hex"
"fmt"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
)
// Service is an object service middleware: its handlers run checks through
// apeChecker and forward the request to the next ServiceServer.
type Service struct {
// log is the logger supplied to NewService.
log *logger.Logger
// apeChecker evaluates the Prm built for each request.
apeChecker Checker
// next is the wrapped service that receives forwarded requests.
next objectSvc.ServiceServer
}
// Compile-time assertion that *Service implements objectSvc.ServiceServer.
var _ objectSvc.ServiceServer = (*Service)(nil)
// HeaderProvider looks up an object by its container ID and object ID.
type HeaderProvider interface {
	// GetHeader returns the object identified by objID in container cnr.
	GetHeader(ctx context.Context, cnr cid.ID, objID oid.ID) (*objectSDK.Object, error)
}
// storageEngineHeaderProvider is the HeaderProvider implementation backed by
// a storage engine.
type storageEngineHeaderProvider struct {
// storageEngine is the engine passed to engine.Head by GetHeader.
storageEngine *engine.StorageEngine
}
// GetHeader assembles an address from cnr and objID and returns whatever
// engine.Head yields for it on the wrapped storage engine.
func (p storageEngineHeaderProvider) GetHeader(ctx context.Context, cnr cid.ID, objID oid.ID) (*objectSDK.Object, error) {
	var target oid.Address
	target.SetObject(objID)
	target.SetContainer(cnr)

	return engine.Head(ctx, p.storageEngine, target)
}
// NewStorageEngineHeaderProvider returns a HeaderProvider that serves
// lookups from the storage engine e.
func NewStorageEngineHeaderProvider(e *engine.StorageEngine) HeaderProvider {
	var provider storageEngineHeaderProvider
	provider.storageEngine = e

	return provider
}
// NewService builds a Service that uses apeChecker for its checks and
// forwards requests to next. The logger is stored on the service.
func NewService(log *logger.Logger, apeChecker Checker, next objectSvc.ServiceServer) *Service {
	svc := new(Service)
	svc.log = log
	svc.apeChecker = apeChecker
	svc.next = next

	return svc
}
// getStreamBasicChecker wraps a GetObjectStream and runs a check on the init
// part of a Get response before the response is sent on.
type getStreamBasicChecker struct {
// Embedded stream; its Send receives the responses that pass the check.
objectSvc.GetObjectStream
// apeChecker evaluates the Prm built from the response's init part.
apeChecker Checker
// senderKey is hex-encoded into Prm.SenderKey by Send.
senderKey []byte
// role is copied into Prm.Role by Send.
role string
}
// Send forwards resp to the wrapped stream. When resp carries the init part
// of the object, the header from that part is checked first (method
// MethodGetObject, with the wrapper's sender key and role); a failed check is
// returned through toStatusErr and the response is not forwarded.
//
// NOTE(review): Prm.Namespace is not populated here because the wrapper has
// no field for it — confirm whether the header check should see the
// namespace as well.
func (g *getStreamBasicChecker) Send(resp *objectV2.GetResponse) error {
	initPart, isInit := resp.GetBody().GetObjectPart().(*objectV2.GetObjectPartInit)
	if !isInit {
		// Non-init parts carry no header to check.
		return g.GetObjectStream.Send(resp)
	}

	hdr := initPart.GetHeader()
	cnrID, objID, err := getAddressParamsSDK(hdr.GetContainerID(), initPart.GetObjectID())
	if err != nil {
		return toStatusErr(err)
	}

	err = g.apeChecker.CheckAPE(g.Context(), Prm{
		Container: cnrID,
		Object:    objID,
		Header:    hdr,
		Method:    nativeschema.MethodGetObject,
		SenderKey: hex.EncodeToString(g.senderKey),
		Role:      g.role,
	})
	if err != nil {
		return toStatusErr(err)
	}

	return g.GetObjectStream.Send(resp)
}
// requestContext extracts the *objectSvc.RequestContext stored in ctx under
// objectSvc.RequestContextKey. It returns an error when the key is absent or
// when the stored value has a different type.
func requestContext(ctx context.Context) (*objectSvc.RequestContext, error) {
	switch value := ctx.Value(objectSvc.RequestContextKey).(type) {
	case nil:
		return nil, fmt.Errorf("no key %s in context", objectSvc.RequestContextKey)
	case *objectSvc.RequestContext:
		return value, nil
	default:
		return nil, fmt.Errorf("failed cast to RequestContext")
	}
}
// Get checks the request against the APE checker (method MethodGetObject)
// using the namespace, role and sender key from the request context, and on
// success forwards the request to the next service.
//
// The response stream is wrapped in a getStreamBasicChecker so that the
// header in the response's init part is checked as well. The wrapper is given
// the sender key and role of the request; without them the header check
// would run with an empty sender key and role.
//
// Errors from address parsing, context lookup and the check are returned
// through toStatusErr.
func (c *Service) Get(request *objectV2.GetRequest, stream objectSvc.GetObjectStream) error {
	addr := request.GetBody().GetAddress()
	cnrID, objID, err := getAddressParamsSDK(addr.GetContainerID(), addr.GetObjectID())
	if err != nil {
		return toStatusErr(err)
	}

	reqCtx, err := requestContext(stream.Context())
	if err != nil {
		return toStatusErr(err)
	}

	role := nativeSchemaRole(reqCtx.Role)

	err = c.apeChecker.CheckAPE(stream.Context(), Prm{
		Namespace: reqCtx.Namespace,
		Container: cnrID,
		Object:    objID,
		Method:    nativeschema.MethodGetObject,
		Role:      role,
		SenderKey: hex.EncodeToString(reqCtx.SenderKey),
	})
	if err != nil {
		return toStatusErr(err)
	}

	return c.next.Get(request, &getStreamBasicChecker{
		GetObjectStream: stream,
		apeChecker:      c.apeChecker,
		// Propagate the requester identity so the response-header check is
		// evaluated for the same sender as the request check above.
		senderKey: reqCtx.SenderKey,
		role:      role,
	})
}
// putStreamBasicChecker wraps a PutObjectStream and runs a check on the init
// part of a Put request before forwarding it.
type putStreamBasicChecker struct {
// apeChecker evaluates the Prm built from the request's init part.
apeChecker Checker
// next is the wrapped stream that receives forwarded calls.
next objectSvc.PutObjectStream
}
// Send forwards request to the wrapped stream. When request carries the init
// part of the object, its header is checked first (method MethodPutObject)
// using the namespace, role and sender key from the request context in ctx.
// Any failure before forwarding is returned through toStatusErr.
func (p *putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRequest) error {
	initPart, isInit := request.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit)
	if !isInit {
		// Only the init part is checked; other parts pass straight through.
		return p.next.Send(ctx, request)
	}

	reqCtx, err := requestContext(ctx)
	if err != nil {
		return toStatusErr(err)
	}

	hdr := initPart.GetHeader()
	cnrID, objID, err := getAddressParamsSDK(hdr.GetContainerID(), initPart.GetObjectID())
	if err != nil {
		return toStatusErr(err)
	}

	err = p.apeChecker.CheckAPE(ctx, Prm{
		Namespace: reqCtx.Namespace,
		Container: cnrID,
		Object:    objID,
		Header:    hdr,
		Method:    nativeschema.MethodPutObject,
		SenderKey: hex.EncodeToString(reqCtx.SenderKey),
		Role:      nativeSchemaRole(reqCtx.Role),
	})
	if err != nil {
		return toStatusErr(err)
	}

	return p.next.Send(ctx, request)
}
// CloseAndRecv delegates to the wrapped stream without any additional check.
//
// It uses a pointer receiver so that every method of putStreamBasicChecker
// has the same receiver kind as Send.
func (p *putStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PutResponse, error) {
	return p.next.CloseAndRecv(ctx)
}
// Put opens a Put stream on the next service and wraps it in a
// putStreamBasicChecker so that the init part of the request is checked
// before it is forwarded.
//
// If the next service fails to open a stream, the error is returned together
// with a nil stream rather than a wrapper around an unusable stream.
func (c *Service) Put() (objectSvc.PutObjectStream, error) {
	streamer, err := c.next.Put()
	if err != nil {
		return nil, err
	}

	return &putStreamBasicChecker{
		apeChecker: c.apeChecker,
		next:       streamer,
	}, nil
}
// Head checks the request (method MethodHeadObject) before forwarding it to
// the next service, then checks again with the header taken from the
// response.
//
// For a short header the full header used in the second check is rebuilt
// from the short header's fields plus the requested container ID. For a
// header with signature the embedded header is used directly. Any other
// response body is returned without a second check.
//
// Errors are returned as-is; this method does not pass them through
// toStatusErr.
func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
	addr := request.GetBody().GetAddress()
	cnrID, objID, err := getAddressParamsSDK(addr.GetContainerID(), addr.GetObjectID())
	if err != nil {
		return nil, err
	}

	reqCtx, err := requestContext(ctx)
	if err != nil {
		return nil, err
	}

	prm := Prm{
		Namespace: reqCtx.Namespace,
		Container: cnrID,
		Object:    objID,
		Method:    nativeschema.MethodHeadObject,
		Role:      nativeSchemaRole(reqCtx.Role),
		SenderKey: hex.EncodeToString(reqCtx.SenderKey),
	}

	// First pass: no header is known yet.
	if err = c.apeChecker.CheckAPE(ctx, prm); err != nil {
		return nil, err
	}

	resp, err := c.next.Head(ctx, request)
	if err != nil {
		return nil, err
	}

	header := new(objectV2.Header)
	switch headerPart := resp.GetBody().GetHeaderPart().(type) {
	case *objectV2.ShortHeader:
		cidV2 := new(refs.ContainerID)
		cnrID.WriteToV2(cidV2)
		header.SetContainerID(cidV2)
		header.SetVersion(headerPart.GetVersion())
		header.SetCreationEpoch(headerPart.GetCreationEpoch())
		header.SetOwnerID(headerPart.GetOwnerID())
		header.SetObjectType(headerPart.GetObjectType())
		// Take the hash from the response part; reading it back from the
		// freshly allocated header would always yield an empty value.
		header.SetHomomorphicHash(headerPart.GetHomomorphicHash())
		header.SetPayloadLength(headerPart.GetPayloadLength())
		header.SetPayloadHash(headerPart.GetPayloadHash())
	case *objectV2.HeaderWithSignature:
		header = headerPart.GetHeader()
	default:
		return resp, nil
	}

	// Second pass: same parameters, now including the object header.
	prm.Header = header
	if err = c.apeChecker.CheckAPE(ctx, prm); err != nil {
		return nil, err
	}

	return resp, nil
}
// Search checks the request (method MethodSearchObject) for the container
// named in the request body and, on success, forwards it to the next
// service. When the body has no container ID the zero cid.ID is used.
// Failures before forwarding are returned through toStatusErr.
func (c *Service) Search(request *objectV2.SearchRequest, stream objectSvc.SearchStream) error {
	var cnrID cid.ID

	cnrV2 := request.GetBody().GetContainerID()
	if cnrV2 != nil {
		if readErr := cnrID.ReadFromV2(*cnrV2); readErr != nil {
			return toStatusErr(readErr)
		}
	}

	reqCtx, err := requestContext(stream.Context())
	if err != nil {
		return toStatusErr(err)
	}

	prm := Prm{
		Namespace: reqCtx.Namespace,
		Container: cnrID,
		Method:    nativeschema.MethodSearchObject,
		Role:      nativeSchemaRole(reqCtx.Role),
		SenderKey: hex.EncodeToString(reqCtx.SenderKey),
	}
	if err = c.apeChecker.CheckAPE(stream.Context(), prm); err != nil {
		return toStatusErr(err)
	}

	return c.next.Search(request, stream)
}
// Delete checks the request (method MethodDeleteObject) and, on success,
// forwards it to the next service. A nil response is returned with every
// error; errors are returned as-is.
func (c *Service) Delete(ctx context.Context, request *objectV2.DeleteRequest) (*objectV2.DeleteResponse, error) {
	addr := request.GetBody().GetAddress()
	cnrID, objID, err := getAddressParamsSDK(addr.GetContainerID(), addr.GetObjectID())
	if err != nil {
		return nil, err
	}

	reqCtx, err := requestContext(ctx)
	if err != nil {
		return nil, err
	}

	prm := Prm{
		Namespace: reqCtx.Namespace,
		Container: cnrID,
		Object:    objID,
		Method:    nativeschema.MethodDeleteObject,
		Role:      nativeSchemaRole(reqCtx.Role),
		SenderKey: hex.EncodeToString(reqCtx.SenderKey),
	}
	if err = c.apeChecker.CheckAPE(ctx, prm); err != nil {
		return nil, err
	}

	resp, delErr := c.next.Delete(ctx, request)
	if delErr != nil {
		return nil, delErr
	}

	return resp, nil
}
// GetRange checks the request (method MethodRangeObject) and, on success,
// forwards it to the next service with the caller's stream unchanged.
// Failures before forwarding are returned through toStatusErr.
func (c *Service) GetRange(request *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) error {
	addr := request.GetBody().GetAddress()
	cnrID, objID, err := getAddressParamsSDK(addr.GetContainerID(), addr.GetObjectID())
	if err != nil {
		return toStatusErr(err)
	}

	reqCtx, err := requestContext(stream.Context())
	if err != nil {
		return toStatusErr(err)
	}

	prm := Prm{
		Namespace: reqCtx.Namespace,
		Container: cnrID,
		Object:    objID,
		Method:    nativeschema.MethodRangeObject,
		Role:      nativeSchemaRole(reqCtx.Role),
		SenderKey: hex.EncodeToString(reqCtx.SenderKey),
	}
	if err = c.apeChecker.CheckAPE(stream.Context(), prm); err != nil {
		return toStatusErr(err)
	}

	return c.next.GetRange(request, stream)
}
// GetRangeHash checks the request (method MethodHashObject), forwards it to
// the next service, and runs the same check once more after the response has
// been obtained. A nil response is returned with every error; errors are
// returned as-is.
//
// NOTE(review): the second check uses exactly the same parameters as the
// first — confirm whether it is intentional or a leftover.
func (c *Service) GetRangeHash(ctx context.Context, request *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) {
	addr := request.GetBody().GetAddress()
	cnrID, objID, err := getAddressParamsSDK(addr.GetContainerID(), addr.GetObjectID())
	if err != nil {
		return nil, err
	}

	reqCtx, err := requestContext(ctx)
	if err != nil {
		return nil, err
	}

	prm := Prm{
		Namespace: reqCtx.Namespace,
		Container: cnrID,
		Object:    objID,
		Method:    nativeschema.MethodHashObject,
		Role:      nativeSchemaRole(reqCtx.Role),
		SenderKey: hex.EncodeToString(reqCtx.SenderKey),
	}

	err = c.apeChecker.CheckAPE(ctx, prm)
	if err != nil {
		return nil, err
	}

	resp, err := c.next.GetRangeHash(ctx, request)
	if err != nil {
		return nil, err
	}

	err = c.apeChecker.CheckAPE(ctx, prm)
	if err != nil {
		return nil, err
	}

	return resp, nil
}
// PutSingle checks the object carried by the request (method
// MethodPutObject, including its header) and, on success, forwards the
// request to the next service. Address-parsing and context errors are
// returned as-is; a failed check is returned through toStatusErr.
func (c *Service) PutSingle(ctx context.Context, request *objectV2.PutSingleRequest) (*objectV2.PutSingleResponse, error) {
	obj := request.GetBody().GetObject()
	hdr := obj.GetHeader()

	cnrID, objID, err := getAddressParamsSDK(hdr.GetContainerID(), obj.GetObjectID())
	if err != nil {
		return nil, err
	}

	reqCtx, err := requestContext(ctx)
	if err != nil {
		return nil, err
	}

	err = c.apeChecker.CheckAPE(ctx, Prm{
		Namespace: reqCtx.Namespace,
		Container: cnrID,
		Object:    objID,
		Header:    hdr,
		Method:    nativeschema.MethodPutObject,
		Role:      nativeSchemaRole(reqCtx.Role),
		SenderKey: hex.EncodeToString(reqCtx.SenderKey),
	})
	if err != nil {
		return nil, toStatusErr(err)
	}

	return c.next.PutSingle(ctx, request)
}
// getAddressParamsSDK converts the V2 container and object identifiers into
// their SDK counterparts.
//
// A nil cidV2 leaves cnrID at its zero value; a nil objV2 yields a nil objID.
// The first conversion error stops processing and is returned.
func getAddressParamsSDK(cidV2 *refs.ContainerID, objV2 *refs.ObjectID) (cnrID cid.ID, objID *oid.ID, err error) {
	if cidV2 != nil {
		err = cnrID.ReadFromV2(*cidV2)
		if err != nil {
			return cnrID, nil, err
		}
	}

	if objV2 == nil {
		return cnrID, nil, nil
	}

	objID = new(oid.ID)
	err = objID.ReadFromV2(*objV2)

	return cnrID, objID, err
}