frostfs-node/pkg/services/object/ape/service.go
Airat Arifullin 73e35bc885 [#1052] object: Make ape middleware form request info
* Move some helpers from `acl/v2` package to `ape`. Also move errors;
* Introduce `Metadata`, `RequestInfo` types;
* Introduce `RequestInfoExtractor` interface and its implementation.
  The extractor's purpose is to extract request info based on request
  metadata. It also validates session token;
* Refactor ape service - each handler forms request info and pass
  necessary fields to checker.

Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
2025-03-23 06:39:32 +00:00

471 lines
13 KiB
Go

package ape
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/session"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
)
type Service struct {
apeChecker Checker
extractor RequestInfoExtractor
next objectSvc.ServiceServer
}
var _ objectSvc.ServiceServer = (*Service)(nil)
type HeaderProvider interface {
GetHeader(ctx context.Context, cnr cid.ID, oid oid.ID, local bool) (*objectSDK.Object, error)
}
type storageEngineHeaderProvider struct {
storageEngine *engine.StorageEngine
getSvc *getsvc.Service
}
func (p storageEngineHeaderProvider) GetHeader(ctx context.Context, cnr cid.ID, objID oid.ID, local bool) (*objectSDK.Object, error) {
var addr oid.Address
addr.SetContainer(cnr)
addr.SetObject(objID)
if local {
return engine.Head(ctx, p.storageEngine, addr)
}
w := getsvc.NewSimpleObjectWriter()
var headPrm getsvc.HeadPrm
headPrm.WithAddress(addr)
headPrm.SetHeaderWriter(w)
headPrm.SetCommonParameters(&util.CommonPrm{}) // default values are ok
if err := p.getSvc.Head(ctx, headPrm); err != nil {
return nil, err
}
return w.Object(), nil
}
func NewStorageEngineHeaderProvider(e *engine.StorageEngine, s *getsvc.Service) HeaderProvider {
return storageEngineHeaderProvider{
storageEngine: e,
getSvc: s,
}
}
func NewService(apeChecker Checker, extractor RequestInfoExtractor, next objectSvc.ServiceServer) *Service {
return &Service{
apeChecker: apeChecker,
extractor: extractor,
next: next,
}
}
type getStreamBasicChecker struct {
objectSvc.GetObjectStream
apeChecker Checker
metadata Metadata
reqInfo RequestInfo
}
func (g *getStreamBasicChecker) Send(resp *objectV2.GetResponse) error {
if partInit, ok := resp.GetBody().GetObjectPart().(*objectV2.GetObjectPartInit); ok {
cnrID, objID, err := getAddressParamsSDK(partInit.GetHeader().GetContainerID(), partInit.GetObjectID())
if err != nil {
return toStatusErr(err)
}
prm := Prm{
Namespace: g.reqInfo.Namespace,
Container: cnrID,
Object: objID,
Header: partInit.GetHeader(),
Method: nativeschema.MethodGetObject,
SenderKey: g.reqInfo.SenderKey,
ContainerOwner: g.reqInfo.ContainerOwner,
Role: g.reqInfo.Role,
BearerToken: g.metadata.BearerToken,
XHeaders: resp.GetMetaHeader().GetXHeaders(),
}
if err := g.apeChecker.CheckAPE(g.Context(), prm); err != nil {
return toStatusErr(err)
}
}
return g.GetObjectStream.Send(resp)
}
func (c *Service) Get(request *objectV2.GetRequest, stream objectSvc.GetObjectStream) error {
md, err := newMetadata(request, request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
if err != nil {
return err
}
reqInfo, err := c.extractor.GetRequestInfo(stream.Context(), md, nativeschema.MethodGetObject)
if err != nil {
return err
}
return c.next.Get(request, &getStreamBasicChecker{
GetObjectStream: stream,
apeChecker: c.apeChecker,
metadata: md,
reqInfo: reqInfo,
})
}
type putStreamBasicChecker struct {
apeChecker Checker
extractor RequestInfoExtractor
next objectSvc.PutObjectStream
}
func (p *putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRequest) error {
if partInit, ok := request.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit); ok {
md, err := newMetadata(request, partInit.GetHeader().GetContainerID(), partInit.GetObjectID())
if err != nil {
return err
}
reqInfo, err := p.extractor.GetRequestInfo(ctx, md, nativeschema.MethodPutObject)
if err != nil {
return err
}
prm := Prm{
Namespace: reqInfo.Namespace,
Container: md.Container,
Object: md.Object,
Header: partInit.GetHeader(),
Method: nativeschema.MethodPutObject,
SenderKey: reqInfo.SenderKey,
ContainerOwner: reqInfo.ContainerOwner,
Role: reqInfo.Role,
BearerToken: md.BearerToken,
XHeaders: md.MetaHeader.GetXHeaders(),
}
if err := p.apeChecker.CheckAPE(ctx, prm); err != nil {
return toStatusErr(err)
}
}
return p.next.Send(ctx, request)
}
func (p putStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PutResponse, error) {
return p.next.CloseAndRecv(ctx)
}
func (c *Service) Put(ctx context.Context) (objectSvc.PutObjectStream, error) {
streamer, err := c.next.Put(ctx)
return &putStreamBasicChecker{
apeChecker: c.apeChecker,
extractor: c.extractor,
next: streamer,
}, err
}
type patchStreamBasicChecker struct {
apeChecker Checker
extractor RequestInfoExtractor
next objectSvc.PatchObjectStream
nonFirstSend bool
}
func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error {
if !p.nonFirstSend {
p.nonFirstSend = true
md, err := newMetadata(request, request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
if err != nil {
return err
}
reqInfo, err := p.extractor.GetRequestInfo(ctx, md, nativeschema.MethodPatchObject)
if err != nil {
return err
}
prm := Prm{
Namespace: reqInfo.Namespace,
Container: md.Container,
Object: md.Object,
Method: nativeschema.MethodPatchObject,
SenderKey: reqInfo.SenderKey,
ContainerOwner: reqInfo.ContainerOwner,
Role: reqInfo.Role,
BearerToken: md.BearerToken,
XHeaders: md.MetaHeader.GetXHeaders(),
}
if err := p.apeChecker.CheckAPE(ctx, prm); err != nil {
return toStatusErr(err)
}
}
return p.next.Send(ctx, request)
}
func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
return p.next.CloseAndRecv(ctx)
}
func (c *Service) Patch(ctx context.Context) (objectSvc.PatchObjectStream, error) {
streamer, err := c.next.Patch(ctx)
return &patchStreamBasicChecker{
apeChecker: c.apeChecker,
extractor: c.extractor,
next: streamer,
}, err
}
func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
md, err := newMetadata(request, request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
if err != nil {
return nil, err
}
reqInfo, err := c.extractor.GetRequestInfo(ctx, md, nativeschema.MethodHeadObject)
if err != nil {
return nil, err
}
resp, err := c.next.Head(ctx, request)
if err != nil {
return nil, err
}
header := new(objectV2.Header)
switch headerPart := resp.GetBody().GetHeaderPart().(type) {
case *objectV2.ShortHeader:
cidV2 := new(refs.ContainerID)
md.Container.WriteToV2(cidV2)
header.SetContainerID(cidV2)
header.SetVersion(headerPart.GetVersion())
header.SetCreationEpoch(headerPart.GetCreationEpoch())
header.SetOwnerID(headerPart.GetOwnerID())
header.SetObjectType(headerPart.GetObjectType())
header.SetHomomorphicHash(header.GetHomomorphicHash())
header.SetPayloadLength(headerPart.GetPayloadLength())
header.SetPayloadHash(headerPart.GetPayloadHash())
case *objectV2.HeaderWithSignature:
header = headerPart.GetHeader()
default:
return resp, nil
}
err = c.apeChecker.CheckAPE(ctx, Prm{
Namespace: reqInfo.Namespace,
Container: md.Container,
Object: md.Object,
Header: header,
Method: nativeschema.MethodHeadObject,
Role: reqInfo.Role,
SenderKey: reqInfo.SenderKey,
ContainerOwner: reqInfo.ContainerOwner,
BearerToken: md.BearerToken,
XHeaders: md.MetaHeader.GetXHeaders(),
})
if err != nil {
return nil, toStatusErr(err)
}
return resp, nil
}
func (c *Service) Search(request *objectV2.SearchRequest, stream objectSvc.SearchStream) error {
md, err := newMetadata(request, request.GetBody().GetContainerID(), nil)
if err != nil {
return err
}
reqInfo, err := c.extractor.GetRequestInfo(stream.Context(), md, nativeschema.MethodSearchObject)
if err != nil {
return err
}
err = c.apeChecker.CheckAPE(stream.Context(), Prm{
Namespace: reqInfo.Namespace,
Container: md.Container,
Method: nativeschema.MethodSearchObject,
Role: reqInfo.Role,
SenderKey: reqInfo.SenderKey,
ContainerOwner: reqInfo.ContainerOwner,
BearerToken: md.BearerToken,
XHeaders: md.MetaHeader.GetXHeaders(),
})
if err != nil {
return toStatusErr(err)
}
return c.next.Search(request, stream)
}
func (c *Service) Delete(ctx context.Context, request *objectV2.DeleteRequest) (*objectV2.DeleteResponse, error) {
md, err := newMetadata(request, request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
if err != nil {
return nil, err
}
reqInfo, err := c.extractor.GetRequestInfo(ctx, md, nativeschema.MethodDeleteObject)
if err != nil {
return nil, err
}
err = c.apeChecker.CheckAPE(ctx, Prm{
Namespace: reqInfo.Namespace,
Container: md.Container,
Object: md.Object,
Method: nativeschema.MethodDeleteObject,
Role: reqInfo.Role,
SenderKey: reqInfo.SenderKey,
ContainerOwner: reqInfo.ContainerOwner,
BearerToken: md.BearerToken,
XHeaders: md.MetaHeader.GetXHeaders(),
})
if err != nil {
return nil, toStatusErr(err)
}
resp, err := c.next.Delete(ctx, request)
if err != nil {
return nil, err
}
return resp, nil
}
func (c *Service) GetRange(request *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) error {
md, err := newMetadata(request, request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
if err != nil {
return err
}
reqInfo, err := c.extractor.GetRequestInfo(stream.Context(), md, nativeschema.MethodRangeObject)
if err != nil {
return err
}
err = c.apeChecker.CheckAPE(stream.Context(), Prm{
Namespace: reqInfo.Namespace,
Container: md.Container,
Object: md.Object,
Method: nativeschema.MethodRangeObject,
Role: reqInfo.Role,
SenderKey: reqInfo.SenderKey,
ContainerOwner: reqInfo.ContainerOwner,
BearerToken: md.BearerToken,
XHeaders: md.MetaHeader.GetXHeaders(),
})
if err != nil {
return toStatusErr(err)
}
return c.next.GetRange(request, stream)
}
func (c *Service) GetRangeHash(ctx context.Context, request *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) {
md, err := newMetadata(request, request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
if err != nil {
return nil, err
}
reqInfo, err := c.extractor.GetRequestInfo(ctx, md, nativeschema.MethodHashObject)
if err != nil {
return nil, err
}
prm := Prm{
Namespace: reqInfo.Namespace,
Container: md.Container,
Object: md.Object,
Method: nativeschema.MethodHashObject,
Role: reqInfo.Role,
SenderKey: reqInfo.SenderKey,
ContainerOwner: reqInfo.ContainerOwner,
BearerToken: md.BearerToken,
XHeaders: md.MetaHeader.GetXHeaders(),
}
resp, err := c.next.GetRangeHash(ctx, request)
if err != nil {
return nil, err
}
if err = c.apeChecker.CheckAPE(ctx, prm); err != nil {
return nil, toStatusErr(err)
}
return resp, nil
}
func (c *Service) PutSingle(ctx context.Context, request *objectV2.PutSingleRequest) (*objectV2.PutSingleResponse, error) {
md, err := newMetadata(request, request.GetBody().GetObject().GetHeader().GetContainerID(), request.GetBody().GetObject().GetObjectID())
if err != nil {
return nil, err
}
reqInfo, err := c.extractor.GetRequestInfo(ctx, md, nativeschema.MethodPutObject)
if err != nil {
return nil, err
}
prm := Prm{
Namespace: reqInfo.Namespace,
Container: md.Container,
Object: md.Object,
Header: request.GetBody().GetObject().GetHeader(),
Method: nativeschema.MethodPutObject,
Role: reqInfo.Role,
SenderKey: reqInfo.SenderKey,
ContainerOwner: reqInfo.ContainerOwner,
BearerToken: md.BearerToken,
XHeaders: md.MetaHeader.GetXHeaders(),
}
if err = c.apeChecker.CheckAPE(ctx, prm); err != nil {
return nil, toStatusErr(err)
}
return c.next.PutSingle(ctx, request)
}
type request interface {
GetMetaHeader() *session.RequestMetaHeader
GetVerificationHeader() *session.RequestVerificationHeader
}
func newMetadata(request request, cnrV2 *refs.ContainerID, objV2 *refs.ObjectID) (md Metadata, err error) {
meta := request.GetMetaHeader()
for origin := meta.GetOrigin(); origin != nil; origin = meta.GetOrigin() {
meta = origin
}
cnrID, objID, err := getAddressParamsSDK(cnrV2, objV2)
if err != nil {
return
}
session, err := readSessionToken(cnrID, objID, meta.GetSessionToken())
if err != nil {
return
}
bearer, err := originalBearerToken(request.GetMetaHeader())
if err != nil {
return
}
md = Metadata{
Container: cnrID,
Object: objID,
VerificationHeader: request.GetVerificationHeader(),
SessionToken: session,
BearerToken: bearer,
}
return
}