[#486] node: Add PutSingle wrappers
Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
parent
9be5d44a46
commit
7b76527759
11 changed files with 148 additions and 3 deletions
|
@ -70,6 +70,10 @@ func (s *objectSvc) Put() (objectService.PutObjectStream, error) {
|
||||||
return s.put.Put()
|
return s.put.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||||
|
return s.put.PutSingle(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||||
return s.get.Head(ctx, req)
|
return s.get.Head(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
|
@ -110,3 +110,17 @@ func (s *Server) GetRangeHash(ctx context.Context, req *objectGRPC.GetRangeHashR
|
||||||
|
|
||||||
return resp.ToGRPCMessage().(*objectGRPC.GetRangeHashResponse), nil
|
return resp.ToGRPCMessage().(*objectGRPC.GetRangeHashResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) PutSingle(ctx context.Context, req *objectGRPC.PutSingleRequest) (*objectGRPC.PutSingleResponse, error) {
|
||||||
|
putSingleReq := &object.PutSingleRequest{}
|
||||||
|
if err := putSingleReq.FromGRPCMessage(req); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := s.srv.PutSingle(ctx, putSingleReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.ToGRPCMessage().(*objectGRPC.PutSingleResponse), nil
|
||||||
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||||
|
@ -443,6 +444,65 @@ func (b Service) GetRangeHash(
|
||||||
return b.next.GetRangeHash(ctx, request)
|
return b.next.GetRangeHash(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b Service) PutSingle(ctx context.Context, request *objectV2.PutSingleRequest) (*objectV2.PutSingleResponse, error) {
|
||||||
|
cnr, err := getContainerIDFromRequest(request)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
idV2 := request.GetBody().GetObject().GetHeader().GetOwnerID()
|
||||||
|
if idV2 == nil {
|
||||||
|
return nil, errors.New("missing object owner")
|
||||||
|
}
|
||||||
|
|
||||||
|
var idOwner user.ID
|
||||||
|
|
||||||
|
err = idOwner.ReadFromV2(*idV2)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid object owner: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
obj, err := getObjectIDFromRefObjectID(request.GetBody().GetObject().GetObjectID())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var sTok *sessionSDK.Object
|
||||||
|
sTok, err = readSessionToken(cnr, obj, request.GetMetaHeader().GetSessionToken())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
bTok, err := originalBearerToken(request.GetMetaHeader())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
req := MetaWithToken{
|
||||||
|
vheader: request.GetVerificationHeader(),
|
||||||
|
token: sTok,
|
||||||
|
bearer: bTok,
|
||||||
|
src: request,
|
||||||
|
}
|
||||||
|
|
||||||
|
reqInfo, err := b.findRequestInfo(req, cnr, acl.OpObjectPut)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
reqInfo.obj = obj
|
||||||
|
|
||||||
|
if !b.checker.CheckBasicACL(reqInfo) || !b.checker.StickyBitCheck(reqInfo, idOwner) {
|
||||||
|
return nil, basicACLErr(reqInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := b.checker.CheckEACL(request, reqInfo); err != nil {
|
||||||
|
return nil, eACLErr(reqInfo, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return b.next.PutSingle(ctx, request)
|
||||||
|
}
|
||||||
|
|
||||||
func (p putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRequest) error {
|
func (p putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRequest) error {
|
||||||
body := request.GetBody()
|
body := request.GetBody()
|
||||||
if body == nil {
|
if body == nil {
|
||||||
|
@ -481,7 +541,7 @@ func (p putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRe
|
||||||
}
|
}
|
||||||
|
|
||||||
var sTok *sessionSDK.Object
|
var sTok *sessionSDK.Object
|
||||||
sTok, err = p.readSessionToken(cnr, obj, request)
|
sTok, err = readSessionToken(cnr, obj, request.GetMetaHeader().GetSessionToken())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -515,10 +575,10 @@ func (p putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRe
|
||||||
return p.next.Send(ctx, request)
|
return p.next.Send(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p putStreamBasicChecker) readSessionToken(cnr cid.ID, obj *oid.ID, request *objectV2.PutRequest) (*sessionSDK.Object, error) {
|
func readSessionToken(cnr cid.ID, obj *oid.ID, tokV2 *session.Token) (*sessionSDK.Object, error) {
|
||||||
var sTok *sessionSDK.Object
|
var sTok *sessionSDK.Object
|
||||||
|
|
||||||
if tokV2 := request.GetMetaHeader().GetSessionToken(); tokV2 != nil {
|
if tokV2 != nil {
|
||||||
sTok = new(sessionSDK.Object)
|
sTok = new(sessionSDK.Object)
|
||||||
|
|
||||||
err := sTok.ReadFromV2(*tokV2)
|
err := sTok.ReadFromV2(*tokV2)
|
||||||
|
|
|
@ -44,6 +44,8 @@ func getContainerIDFromRequest(req any) (cid.ID, error) {
|
||||||
idV2 = v.GetBody().GetAddress().GetContainerID()
|
idV2 = v.GetBody().GetAddress().GetContainerID()
|
||||||
case *objectV2.GetRangeHashRequest:
|
case *objectV2.GetRangeHashRequest:
|
||||||
idV2 = v.GetBody().GetAddress().GetContainerID()
|
idV2 = v.GetBody().GetAddress().GetContainerID()
|
||||||
|
case *objectV2.PutSingleRequest:
|
||||||
|
idV2 = v.GetBody().GetObject().GetHeader().GetContainerID()
|
||||||
default:
|
default:
|
||||||
return cid.ID{}, errors.New("unknown request type")
|
return cid.ID{}, errors.New("unknown request type")
|
||||||
}
|
}
|
||||||
|
@ -97,6 +99,10 @@ func originalSessionToken(header *sessionV2.RequestMetaHeader) (*sessionSDK.Obje
|
||||||
// object reference's holders. Returns an error if object ID is missing in the request.
|
// object reference's holders. Returns an error if object ID is missing in the request.
|
||||||
func getObjectIDFromRequestBody(body interface{ GetAddress() *refsV2.Address }) (*oid.ID, error) {
|
func getObjectIDFromRequestBody(body interface{ GetAddress() *refsV2.Address }) (*oid.ID, error) {
|
||||||
idV2 := body.GetAddress().GetObjectID()
|
idV2 := body.GetAddress().GetObjectID()
|
||||||
|
return getObjectIDFromRefObjectID(idV2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func getObjectIDFromRefObjectID(idV2 *refsV2.ObjectID) (*oid.ID, error) {
|
||||||
if idV2 == nil {
|
if idV2 == nil {
|
||||||
return nil, errors.New("missing object ID")
|
return nil, errors.New("missing object ID")
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,3 +89,11 @@ func (x *Common) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashReq
|
||||||
|
|
||||||
return x.nextHandler.GetRangeHash(ctx, req)
|
return x.nextHandler.GetRangeHash(ctx, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (x *Common) PutSingle(ctx context.Context, req *objectV2.PutSingleRequest) (*objectV2.PutSingleResponse, error) {
|
||||||
|
if x.state.IsMaintenance() {
|
||||||
|
return nil, errMaintenance
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.nextHandler.PutSingle(ctx, req)
|
||||||
|
}
|
||||||
|
|
|
@ -76,6 +76,22 @@ func (m MetricCollector) Put() (PutObjectStream, error) {
|
||||||
return m.next.Put()
|
return m.next.Put()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||||
|
if m.enabled {
|
||||||
|
t := time.Now()
|
||||||
|
|
||||||
|
res, err := m.next.PutSingle(ctx, request)
|
||||||
|
|
||||||
|
m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil)
|
||||||
|
if err == nil {
|
||||||
|
m.metrics.AddPayloadSize("PutSingle", len(request.GetBody().GetObject().GetPayload()))
|
||||||
|
}
|
||||||
|
|
||||||
|
return res, err
|
||||||
|
}
|
||||||
|
return m.next.PutSingle(ctx, request)
|
||||||
|
}
|
||||||
|
|
||||||
func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
|
func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
|
||||||
if m.enabled {
|
if m.enabled {
|
||||||
t := time.Now()
|
t := time.Now()
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
package putsvc
|
package putsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
objectAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
|
||||||
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
@ -47,6 +49,10 @@ func (s *Service) Put() (object.PutObjectStream, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Service) PutSingle(context.Context, *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) {
|
||||||
|
return nil, fmt.Errorf("unimplemented") //TODO
|
||||||
|
}
|
||||||
|
|
||||||
func WithInternalService(v *putsvc.Service) Option {
|
func WithInternalService(v *putsvc.Service) Option {
|
||||||
return func(c *cfg) {
|
return func(c *cfg) {
|
||||||
c.svc = v
|
c.svc = v
|
||||||
|
|
|
@ -87,6 +87,16 @@ func (s *ResponseService) Put() (PutObjectStream, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *ResponseService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||||
|
resp, err := s.svc.PutSingle(ctx, req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.respSvc.SetMeta(resp)
|
||||||
|
return resp, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {
|
||||||
resp, err := s.svc.Head(ctx, req)
|
resp, err := s.svc.Head(ctx, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -41,4 +41,5 @@ type ServiceServer interface {
|
||||||
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)
|
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)
|
||||||
GetRange(*object.GetRangeRequest, GetObjectRangeStream) error
|
GetRange(*object.GetRangeRequest, GetObjectRangeStream) error
|
||||||
GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error)
|
GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error)
|
||||||
|
PutSingle(context.Context, *object.PutSingleRequest) (*object.PutSingleResponse, error)
|
||||||
}
|
}
|
||||||
|
|
|
@ -120,6 +120,22 @@ func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*objec
|
||||||
return resp.(*object.HeadResponse), nil
|
return resp.(*object.HeadResponse), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *SignService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||||
|
resp, err := s.sigSvc.HandleUnaryRequest(ctx, req,
|
||||||
|
func(ctx context.Context, req any) (util.ResponseMessage, error) {
|
||||||
|
return s.svc.PutSingle(ctx, req.(*object.PutSingleRequest))
|
||||||
|
},
|
||||||
|
func() util.ResponseMessage {
|
||||||
|
return new(object.PutSingleResponse)
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.(*object.PutSingleResponse), nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *searchStreamSigner) Send(resp *object.SearchResponse) error {
|
func (s *searchStreamSigner) Send(resp *object.SearchResponse) error {
|
||||||
s.nonEmptyResp = true
|
s.nonEmptyResp = true
|
||||||
return s.respWriter(resp)
|
return s.respWriter(resp)
|
||||||
|
|
|
@ -107,6 +107,10 @@ func (c TransportSplitter) Delete(ctx context.Context, request *object.DeleteReq
|
||||||
return c.next.Delete(ctx, request)
|
return c.next.Delete(ctx, request)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c TransportSplitter) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) {
|
||||||
|
return c.next.PutSingle(ctx, req)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *rangeStreamMsgSizeCtrl) Send(resp *object.GetRangeResponse) error {
|
func (s *rangeStreamMsgSizeCtrl) Send(resp *object.GetRangeResponse) error {
|
||||||
body := resp.GetBody()
|
body := resp.GetBody()
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue