forked from TrueCloudLab/frostfs-node
Airat Arifullin
9b13a18aac
* Update version within go.mod; * Fix deprecated frostfs-api-go/v2 package and use frostfs-sdk-go/api instead. Signed-off-by: Airat Arifullin <a.arifullin@yadro.com>
490 lines
14 KiB
Go
490 lines
14 KiB
Go
package ape
|
|
|
|
import (
|
|
"context"
|
|
"encoding/hex"
|
|
"errors"
|
|
"fmt"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
objectSvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
|
getsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/get"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/api/refs"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
|
nativeschema "git.frostfs.info/TrueCloudLab/policy-engine/schema/native"
|
|
)
|
|
|
|
var errFailedToCastToRequestContext = errors.New("failed cast to RequestContext")
|
|
|
|
type Service struct {
|
|
apeChecker Checker
|
|
|
|
next objectSvc.ServiceServer
|
|
}
|
|
|
|
var _ objectSvc.ServiceServer = (*Service)(nil)
|
|
|
|
type HeaderProvider interface {
|
|
GetHeader(ctx context.Context, cnr cid.ID, oid oid.ID, local bool) (*objectSDK.Object, error)
|
|
}
|
|
|
|
type storageEngineHeaderProvider struct {
|
|
storageEngine *engine.StorageEngine
|
|
getSvc *getsvc.Service
|
|
}
|
|
|
|
func (p storageEngineHeaderProvider) GetHeader(ctx context.Context, cnr cid.ID, objID oid.ID, local bool) (*objectSDK.Object, error) {
|
|
var addr oid.Address
|
|
addr.SetContainer(cnr)
|
|
addr.SetObject(objID)
|
|
if local {
|
|
return engine.Head(ctx, p.storageEngine, addr)
|
|
}
|
|
w := getsvc.NewSimpleObjectWriter()
|
|
var headPrm getsvc.HeadPrm
|
|
headPrm.WithAddress(addr)
|
|
headPrm.SetHeaderWriter(w)
|
|
headPrm.SetCommonParameters(&util.CommonPrm{}) // default values are ok
|
|
if err := p.getSvc.Head(ctx, headPrm); err != nil {
|
|
return nil, err
|
|
}
|
|
return w.Object(), nil
|
|
}
|
|
|
|
func NewStorageEngineHeaderProvider(e *engine.StorageEngine, s *getsvc.Service) HeaderProvider {
|
|
return storageEngineHeaderProvider{
|
|
storageEngine: e,
|
|
getSvc: s,
|
|
}
|
|
}
|
|
|
|
func NewService(apeChecker Checker, next objectSvc.ServiceServer) *Service {
|
|
return &Service{
|
|
apeChecker: apeChecker,
|
|
next: next,
|
|
}
|
|
}
|
|
|
|
type getStreamBasicChecker struct {
|
|
objectSvc.GetObjectStream
|
|
|
|
apeChecker Checker
|
|
|
|
namespace string
|
|
|
|
senderKey []byte
|
|
|
|
containerOwner user.ID
|
|
|
|
role string
|
|
|
|
softAPECheck bool
|
|
|
|
bearerToken *bearer.Token
|
|
}
|
|
|
|
func (g *getStreamBasicChecker) Send(resp *objectV2.GetResponse) error {
|
|
if partInit, ok := resp.GetBody().GetObjectPart().(*objectV2.GetObjectPartInit); ok {
|
|
cnrID, objID, err := getAddressParamsSDK(partInit.GetHeader().GetContainerID(), partInit.GetObjectID())
|
|
if err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
|
|
prm := Prm{
|
|
Namespace: g.namespace,
|
|
Container: cnrID,
|
|
Object: objID,
|
|
Header: partInit.GetHeader(),
|
|
Method: nativeschema.MethodGetObject,
|
|
SenderKey: hex.EncodeToString(g.senderKey),
|
|
ContainerOwner: g.containerOwner,
|
|
Role: g.role,
|
|
SoftAPECheck: g.softAPECheck,
|
|
BearerToken: g.bearerToken,
|
|
XHeaders: resp.GetMetaHeader().GetXHeaders(),
|
|
}
|
|
|
|
if err := g.apeChecker.CheckAPE(g.Context(), prm); err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
}
|
|
return g.GetObjectStream.Send(resp)
|
|
}
|
|
|
|
func requestContext(ctx context.Context) (*objectSvc.RequestContext, error) {
|
|
untyped := ctx.Value(objectSvc.RequestContextKey)
|
|
if untyped == nil {
|
|
return nil, fmt.Errorf("no key %s in context", objectSvc.RequestContextKey)
|
|
}
|
|
rc, ok := untyped.(*objectSvc.RequestContext)
|
|
if !ok {
|
|
return nil, errFailedToCastToRequestContext
|
|
}
|
|
return rc, nil
|
|
}
|
|
|
|
func (c *Service) Get(request *objectV2.GetRequest, stream objectSvc.GetObjectStream) error {
|
|
reqCtx, err := requestContext(stream.Context())
|
|
if err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
|
|
return c.next.Get(request, &getStreamBasicChecker{
|
|
GetObjectStream: stream,
|
|
apeChecker: c.apeChecker,
|
|
namespace: reqCtx.Namespace,
|
|
senderKey: reqCtx.SenderKey,
|
|
containerOwner: reqCtx.ContainerOwner,
|
|
role: nativeSchemaRole(reqCtx.Role),
|
|
softAPECheck: reqCtx.SoftAPECheck,
|
|
bearerToken: reqCtx.BearerToken,
|
|
})
|
|
}
|
|
|
|
type putStreamBasicChecker struct {
|
|
apeChecker Checker
|
|
|
|
next objectSvc.PutObjectStream
|
|
}
|
|
|
|
func (p *putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRequest) error {
|
|
if partInit, ok := request.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit); ok {
|
|
reqCtx, err := requestContext(ctx)
|
|
if err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
|
|
cnrID, objID, err := getAddressParamsSDK(partInit.GetHeader().GetContainerID(), partInit.GetObjectID())
|
|
if err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
|
|
prm := Prm{
|
|
Namespace: reqCtx.Namespace,
|
|
Container: cnrID,
|
|
Object: objID,
|
|
Header: partInit.GetHeader(),
|
|
Method: nativeschema.MethodPutObject,
|
|
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
|
ContainerOwner: reqCtx.ContainerOwner,
|
|
Role: nativeSchemaRole(reqCtx.Role),
|
|
SoftAPECheck: reqCtx.SoftAPECheck,
|
|
BearerToken: reqCtx.BearerToken,
|
|
XHeaders: request.GetMetaHeader().GetXHeaders(),
|
|
}
|
|
|
|
if err := p.apeChecker.CheckAPE(ctx, prm); err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
}
|
|
|
|
return p.next.Send(ctx, request)
|
|
}
|
|
|
|
func (p putStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PutResponse, error) {
|
|
return p.next.CloseAndRecv(ctx)
|
|
}
|
|
|
|
func (c *Service) Put() (objectSvc.PutObjectStream, error) {
|
|
streamer, err := c.next.Put()
|
|
|
|
return &putStreamBasicChecker{
|
|
apeChecker: c.apeChecker,
|
|
next: streamer,
|
|
}, err
|
|
}
|
|
|
|
type patchStreamBasicChecker struct {
|
|
apeChecker Checker
|
|
|
|
next objectSvc.PatchObjectStream
|
|
|
|
nonFirstSend bool
|
|
}
|
|
|
|
func (p *patchStreamBasicChecker) Send(ctx context.Context, request *objectV2.PatchRequest) error {
|
|
if !p.nonFirstSend {
|
|
p.nonFirstSend = true
|
|
|
|
reqCtx, err := requestContext(ctx)
|
|
if err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
|
|
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
|
if err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
|
|
prm := Prm{
|
|
Namespace: reqCtx.Namespace,
|
|
Container: cnrID,
|
|
Object: objID,
|
|
Method: nativeschema.MethodPatchObject,
|
|
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
|
ContainerOwner: reqCtx.ContainerOwner,
|
|
Role: nativeSchemaRole(reqCtx.Role),
|
|
SoftAPECheck: reqCtx.SoftAPECheck,
|
|
BearerToken: reqCtx.BearerToken,
|
|
XHeaders: request.GetMetaHeader().GetXHeaders(),
|
|
}
|
|
|
|
if err := p.apeChecker.CheckAPE(ctx, prm); err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
}
|
|
|
|
return p.next.Send(ctx, request)
|
|
}
|
|
|
|
func (p patchStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PatchResponse, error) {
|
|
return p.next.CloseAndRecv(ctx)
|
|
}
|
|
|
|
func (c *Service) Patch() (objectSvc.PatchObjectStream, error) {
|
|
streamer, err := c.next.Patch()
|
|
|
|
return &patchStreamBasicChecker{
|
|
apeChecker: c.apeChecker,
|
|
next: streamer,
|
|
}, err
|
|
}
|
|
|
|
func (c *Service) Head(ctx context.Context, request *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {
|
|
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
reqCtx, err := requestContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := c.next.Head(ctx, request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
header := new(objectV2.Header)
|
|
switch headerPart := resp.GetBody().GetHeaderPart().(type) {
|
|
case *objectV2.ShortHeader:
|
|
cidV2 := new(refs.ContainerID)
|
|
cnrID.WriteToV2(cidV2)
|
|
header.SetContainerID(cidV2)
|
|
header.SetVersion(headerPart.GetVersion())
|
|
header.SetCreationEpoch(headerPart.GetCreationEpoch())
|
|
header.SetOwnerID(headerPart.GetOwnerID())
|
|
header.SetObjectType(headerPart.GetObjectType())
|
|
header.SetHomomorphicHash(header.GetHomomorphicHash())
|
|
header.SetPayloadLength(headerPart.GetPayloadLength())
|
|
header.SetPayloadHash(headerPart.GetPayloadHash())
|
|
case *objectV2.HeaderWithSignature:
|
|
header = headerPart.GetHeader()
|
|
default:
|
|
return resp, nil
|
|
}
|
|
|
|
err = c.apeChecker.CheckAPE(ctx, Prm{
|
|
Namespace: reqCtx.Namespace,
|
|
Container: cnrID,
|
|
Object: objID,
|
|
Header: header,
|
|
Method: nativeschema.MethodHeadObject,
|
|
Role: nativeSchemaRole(reqCtx.Role),
|
|
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
|
ContainerOwner: reqCtx.ContainerOwner,
|
|
SoftAPECheck: reqCtx.SoftAPECheck,
|
|
BearerToken: reqCtx.BearerToken,
|
|
XHeaders: request.GetMetaHeader().GetXHeaders(),
|
|
})
|
|
if err != nil {
|
|
return nil, toStatusErr(err)
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (c *Service) Search(request *objectV2.SearchRequest, stream objectSvc.SearchStream) error {
|
|
var cnrID cid.ID
|
|
if cnrV2 := request.GetBody().GetContainerID(); cnrV2 != nil {
|
|
if err := cnrID.ReadFromV2(*cnrV2); err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
}
|
|
|
|
reqCtx, err := requestContext(stream.Context())
|
|
if err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
|
|
err = c.apeChecker.CheckAPE(stream.Context(), Prm{
|
|
Namespace: reqCtx.Namespace,
|
|
Container: cnrID,
|
|
Method: nativeschema.MethodSearchObject,
|
|
Role: nativeSchemaRole(reqCtx.Role),
|
|
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
|
ContainerOwner: reqCtx.ContainerOwner,
|
|
SoftAPECheck: reqCtx.SoftAPECheck,
|
|
BearerToken: reqCtx.BearerToken,
|
|
XHeaders: request.GetMetaHeader().GetXHeaders(),
|
|
})
|
|
if err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
|
|
return c.next.Search(request, stream)
|
|
}
|
|
|
|
func (c *Service) Delete(ctx context.Context, request *objectV2.DeleteRequest) (*objectV2.DeleteResponse, error) {
|
|
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
reqCtx, err := requestContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = c.apeChecker.CheckAPE(ctx, Prm{
|
|
Namespace: reqCtx.Namespace,
|
|
Container: cnrID,
|
|
Object: objID,
|
|
Method: nativeschema.MethodDeleteObject,
|
|
Role: nativeSchemaRole(reqCtx.Role),
|
|
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
|
ContainerOwner: reqCtx.ContainerOwner,
|
|
SoftAPECheck: reqCtx.SoftAPECheck,
|
|
BearerToken: reqCtx.BearerToken,
|
|
XHeaders: request.GetMetaHeader().GetXHeaders(),
|
|
})
|
|
if err != nil {
|
|
return nil, toStatusErr(err)
|
|
}
|
|
|
|
resp, err := c.next.Delete(ctx, request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return resp, nil
|
|
}
|
|
|
|
func (c *Service) GetRange(request *objectV2.GetRangeRequest, stream objectSvc.GetObjectRangeStream) error {
|
|
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
|
if err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
|
|
reqCtx, err := requestContext(stream.Context())
|
|
if err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
|
|
err = c.apeChecker.CheckAPE(stream.Context(), Prm{
|
|
Namespace: reqCtx.Namespace,
|
|
Container: cnrID,
|
|
Object: objID,
|
|
Method: nativeschema.MethodRangeObject,
|
|
Role: nativeSchemaRole(reqCtx.Role),
|
|
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
|
ContainerOwner: reqCtx.ContainerOwner,
|
|
SoftAPECheck: reqCtx.SoftAPECheck,
|
|
BearerToken: reqCtx.BearerToken,
|
|
XHeaders: request.GetMetaHeader().GetXHeaders(),
|
|
})
|
|
if err != nil {
|
|
return toStatusErr(err)
|
|
}
|
|
|
|
return c.next.GetRange(request, stream)
|
|
}
|
|
|
|
func (c *Service) GetRangeHash(ctx context.Context, request *objectV2.GetRangeHashRequest) (*objectV2.GetRangeHashResponse, error) {
|
|
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetAddress().GetContainerID(), request.GetBody().GetAddress().GetObjectID())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
reqCtx, err := requestContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
prm := Prm{
|
|
Namespace: reqCtx.Namespace,
|
|
Container: cnrID,
|
|
Object: objID,
|
|
Method: nativeschema.MethodHashObject,
|
|
Role: nativeSchemaRole(reqCtx.Role),
|
|
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
|
ContainerOwner: reqCtx.ContainerOwner,
|
|
SoftAPECheck: reqCtx.SoftAPECheck,
|
|
BearerToken: reqCtx.BearerToken,
|
|
XHeaders: request.GetMetaHeader().GetXHeaders(),
|
|
}
|
|
|
|
resp, err := c.next.GetRangeHash(ctx, request)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err = c.apeChecker.CheckAPE(ctx, prm); err != nil {
|
|
return nil, toStatusErr(err)
|
|
}
|
|
return resp, nil
|
|
}
|
|
|
|
func (c *Service) PutSingle(ctx context.Context, request *objectV2.PutSingleRequest) (*objectV2.PutSingleResponse, error) {
|
|
cnrID, objID, err := getAddressParamsSDK(request.GetBody().GetObject().GetHeader().GetContainerID(), request.GetBody().GetObject().GetObjectID())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
reqCtx, err := requestContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
prm := Prm{
|
|
Namespace: reqCtx.Namespace,
|
|
Container: cnrID,
|
|
Object: objID,
|
|
Header: request.GetBody().GetObject().GetHeader(),
|
|
Method: nativeschema.MethodPutObject,
|
|
Role: nativeSchemaRole(reqCtx.Role),
|
|
SenderKey: hex.EncodeToString(reqCtx.SenderKey),
|
|
ContainerOwner: reqCtx.ContainerOwner,
|
|
SoftAPECheck: reqCtx.SoftAPECheck,
|
|
BearerToken: reqCtx.BearerToken,
|
|
XHeaders: request.GetMetaHeader().GetXHeaders(),
|
|
}
|
|
|
|
if err = c.apeChecker.CheckAPE(ctx, prm); err != nil {
|
|
return nil, toStatusErr(err)
|
|
}
|
|
|
|
return c.next.PutSingle(ctx, request)
|
|
}
|
|
|
|
func getAddressParamsSDK(cidV2 *refs.ContainerID, objV2 *refs.ObjectID) (cnrID cid.ID, objID *oid.ID, err error) {
|
|
if cidV2 != nil {
|
|
if err = cnrID.ReadFromV2(*cidV2); err != nil {
|
|
return
|
|
}
|
|
}
|
|
|
|
if objV2 != nil {
|
|
objID = new(oid.ID)
|
|
if err = objID.ReadFromV2(*objV2); err != nil {
|
|
return
|
|
}
|
|
}
|
|
return
|
|
}
|