forked from TrueCloudLab/frostfs-node
[#872] object: Introduce APE middlewar for object service
Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
This commit is contained in:
parent
e43609c616
commit
c8baf76fae
13 changed files with 1456 additions and 175 deletions
404
pkg/services/object/ape/service.go
Normal file
404
pkg/services/object/ape/service.go
Normal file
|
@ -0,0 +1,404 @@
|
|||
package ape
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/hex"
|
||||
"fmt"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
|
||||
)
|
||||
|
||||
type Service struct {
|
||||
log *logger.Logger
|
||||
|
||||
apeChecker Checker
|
||||
|
||||
next objectSvc.ServiceServer
|
||||
}
|
||||
|
||||
var _ objectSvc.ServiceServer = (*Service)(nil)
|
||||
|
||||
type HeaderProvider interface {
|
||||
GetHeader(ctx context.Context, cnr cid.ID, oid oid.ID) (*objectSDK.Object, error)
|
||||
}
|
||||
|
||||
type storageEngineHeaderProvider struct {
|
||||
storageEngine *engine.StorageEngine
|
||||
}
|
||||
|
||||
func (p storageEngineHeaderProvider) GetHeader(ctx context.Context, cnr cid.ID, objID oid.ID) (*objectSDK.Object, error) {
|
||||
var addr oid.Address
|
||||
addr.SetContainer(cnr)
|
||||
addr.SetObject(objID)
|
||||
return engine.Head(ctx, p.storageEngine, addr)
|
||||
}
|
||||
|
||||
func NewStorageEngineHeaderProvider(e *engine.StorageEngine) HeaderProvider {
|
||||
return storageEngineHeaderProvider{
|
||||
storageEngine: e,
|
||||
}
|
||||
}
|
||||
|
||||
func NewService(log *logger.Logger, apeChecker Checker, next objectSvc.ServiceServer) *Service {
|
||||
return &Service{
|
||||
log: log,
|
||||
apeChecker: apeChecker,
|
||||
next: next,
|
||||
}
|
||||
}
|
||||
|
||||
type getStreamBasicChecker struct {
|
||||
objectSvc.GetObjectStream
|
||||
|
||||
apeChecker Checker
|
||||
|
||||
senderKey []byte
|
||||
|
||||
role string
|
||||
}
|
||||
|
||||
func (g *getStreamBasicChecker) Send(resp *objectV2.GetResponse) error {
|
||||
if partInit, ok := resp.GetBody().GetObjectPart().(*objectV2.GetObjectPartInit); ok {
|
||||
cnrID, objID, err := getAddressParamsSDK(partInit.GetHeader().GetContainerID(), partInit.GetObjectID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
prm := Prm{
|
||||
Container: cnrID,
|
||||
Object: objID,
|
||||
Header: partInit.GetHeader(),
|
||||
Method: nativeschema.MethodGetContainer,
|
||||
SenderKey: hex.EncodeToString(g.senderKey),
|
||||
Role: g.role,
|
||||
}
|
||||
|
||||
if err := g.apeChecker.CheckAPE(g.Context(), prm); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return g.GetObjectStream.Send(resp)
|
||||
}
|
||||
|
||||
func requestContext(ctx context.Context) (*objectSvc.RequestContext, error) {
|
||||
untyped := ctx.Value(objectSvc.RequestContextKey)
|
||||
if untyped == nil {
|
||||
return nil, fmt.Errorf("no key %s in context", objectSvc.RequestContextKey)
|
||||
}
|
||||
rc, ok := untyped.(*objectSvc.RequestContext)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed cast to RequestContext")
|
||||
}
|
||||
return rc, nil
|
||||
}
|
||||
|
||||
func (c *Service) Get(request *objectV2.GetRequest, stream objectSvc.GetObjectStream) error {
|
||||
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reqCtx, err := requestContext(stream.Context())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.apeChecker.CheckAPE(stream.Context(), Prm{
|
||||
Namespace: reqCtx.Namespace,
|
||||
Container: cnrID,
|
||||
Object: objID,
|
||||
Method: nativeschema.MethodGetObject,
|
||||
Role: nativeSchemaRole(reqCtx.Role),
|
||||
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.next.Get(request, &getStreamBasicChecker{
|
||||
GetObjectStream: stream,
|
||||
apeChecker: c.apeChecker,
|
||||
})
|
||||
}
|
||||
|
||||
type putStreamBasicChecker struct {
|
||||
apeChecker Checker
|
||||
|
||||
next objectSvc.PutObjectStream
|
||||
}
|
||||
|
||||
func (p *putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRequest) error {
|
||||
if partInit, ok := request.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit); ok {
|
||||
reqCtx, err := requestContext(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cnrID, objID, err := getAddressParamsSDK(partInit.GetHeader().GetContainerID(), partInit.GetObjectID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
prm := Prm{
|
||||
Namespace: reqCtx.Namespace,
|
||||
Container: cnrID,
|
||||
Object: objID,
|
||||
Header: partInit.GetHeader(),
|
||||
Method: nativeschema.MethodPutObject,
|
||||
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||
Role: nativeSchemaRole(reqCtx.Role),
|
||||
}
|
||||
|
||||
if err := p.apeChecker.CheckAPE(ctx, prm); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return p.next.Send(ctx, request)
|
||||
}
|
||||
|
||||
func (p putStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PutResponse, error) {
|
||||
return p.next.CloseAndRecv(ctx)
|
||||
}
|
||||
|
||||
func (c *Service) Put() (objectSvc.PutObjectStream, error) {
|
||||
streamer, err := c.next.Put()
|
||||
|
||||
return &putStreamBasicChecker{
|
||||
apeChecker: c.apeChecker,
|
||||
next: streamer,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
||||
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reqCtx, err := requestContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = c.apeChecker.CheckAPE(ctx, Prm{
|
||||
Namespace: reqCtx.Namespace,
|
||||
Container: cnrID,
|
||||
Object: objID,
|
||||
Method: nativeschema.MethodHeadObject,
|
||||
Role: nativeSchemaRole(reqCtx.Role),
|
||||
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := c.next.Head(ctx, request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
header := new(objectV2.Header)
|
||||
switch headerPart := resp.GetBody().GetHeaderPart().(type) {
|
||||
case *objectV2.ShortHeader:
|
||||
cidV2 := new(refs.ContainerID)
|
||||
cnrID.WriteToV2(cidV2)
|
||||
header.SetContainerID(cidV2)
|
||||
header.SetVersion(headerPart.GetVersion())
|
||||
header.SetCreationEpoch(headerPart.GetCreationEpoch())
|
||||
header.SetOwnerID(headerPart.GetOwnerID())
|
||||
header.SetObjectType(headerPart.GetObjectType())
|
||||
header.SetHomomorphicHash(header.GetHomomorphicHash())
|
||||
header.SetPayloadLength(headerPart.GetPayloadLength())
|
||||
header.SetPayloadHash(headerPart.GetPayloadHash())
|
||||
case *objectV2.HeaderWithSignature:
|
||||
header = headerPart.GetHeader()
|
||||
default:
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
err = c.apeChecker.CheckAPE(ctx, Prm{
|
||||
Namespace: reqCtx.Namespace,
|
||||
Container: cnrID,
|
||||
Object: objID,
|
||||
Header: header,
|
||||
Method: nativeschema.MethodHeadObject,
|
||||
Role: nativeSchemaRole(reqCtx.Role),
|
||||
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *Service) Search(request *objectV2.SearchRequest, stream objectSvc.SearchStream) error {
|
||||
var cnrID cid.ID
|
||||
if cnrV2 := request.GetBody().GetContainerID(); cnrV2 != nil {
|
||||
if err := cnrID.ReadFromV2(*cnrV2); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
reqCtx, err := requestContext(stream.Context())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.apeChecker.CheckAPE(stream.Context(), Prm{
|
||||
Namespace: reqCtx.Namespace,
|
||||
Container: cnrID,
|
||||
Method: nativeschema.MethodSearchObject,
|
||||
Role: nativeSchemaRole(reqCtx.Role),
|
||||
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.next.Search(request, stream)
|
||||
}
|
||||
|
||||
func (c *Service) Delete(ctx context.Context, request *objectV2.DeleteRequest) (*objectV2.DeleteResponse, error) {
|
||||
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reqCtx, err := requestContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = c.apeChecker.CheckAPE(ctx, Prm{
|
||||
Namespace: reqCtx.Namespace,
|
||||
Container: cnrID,
|
||||
Object: objID,
|
||||
Method: nativeschema.MethodDeleteObject,
|
||||
Role: nativeSchemaRole(reqCtx.Role),
|
||||
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := c.next.Delete(ctx, request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *Service) GetRange(request *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) error {
|
||||
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
reqCtx, err := requestContext(stream.Context())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.apeChecker.CheckAPE(stream.Context(), Prm{
|
||||
Namespace: reqCtx.Namespace,
|
||||
Container: cnrID,
|
||||
Object: objID,
|
||||
Method: nativeschema.MethodRangeObject,
|
||||
Role: nativeSchemaRole(reqCtx.Role),
|
||||
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return c.next.GetRange(request, stream)
|
||||
}
|
||||
|
||||
func (c *Service) GetRangeHash(ctx context.Context, request *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) {
|
||||
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reqCtx, err := requestContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
prm := Prm{
|
||||
Namespace: reqCtx.Namespace,
|
||||
Container: cnrID,
|
||||
Object: objID,
|
||||
Method: nativeschema.MethodHashObject,
|
||||
Role: nativeSchemaRole(reqCtx.Role),
|
||||
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||
}
|
||||
|
||||
if err = c.apeChecker.CheckAPE(ctx, prm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := c.next.GetRangeHash(ctx, request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err = c.apeChecker.CheckAPE(ctx, prm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *Service) PutSingle(ctx context.Context, request *objectV2.PutSingleRequest) (*objectV2.PutSingleResponse, error) {
|
||||
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetObject().GetHeader().GetContainerID(), request.GetBody().GetObject().GetObjectID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
reqCtx, err := requestContext(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
prm := Prm{
|
||||
Namespace: reqCtx.Namespace,
|
||||
Container: cnrID,
|
||||
Object: objID,
|
||||
Header: request.GetBody().GetObject().GetHeader(),
|
||||
Method: nativeschema.MethodPutObject,
|
||||
Role: nativeSchemaRole(reqCtx.Role),
|
||||
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
||||
}
|
||||
|
||||
if err = c.apeChecker.CheckAPE(ctx, prm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return c.next.PutSingle(ctx, request)
|
||||
}
|
||||
|
||||
func getAddressParamsSDK(cidV2 *refs.ContainerID, objV2 *refs.ObjectID) (cnrID cid.ID, objID *oid.ID, err error) {
|
||||
if cidV2 != nil {
|
||||
if err = cnrID.ReadFromV2(*cidV2); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if objV2 != nil {
|
||||
objID = new(oid.ID)
|
||||
if err = objID.ReadFromV2(*objV2); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue