From 7b76527759924bd82c5849c8c8bb451244117d98 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 3 Jul 2023 11:36:20 +0300 Subject: [PATCH] [#486] node: Add PutSingle wrappers Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/object.go | 4 ++ pkg/network/transport/object/grpc/service.go | 14 +++++ pkg/services/object/acl/v2/service.go | 66 +++++++++++++++++++- pkg/services/object/acl/v2/util.go | 6 ++ pkg/services/object/common.go | 8 +++ pkg/services/object/metrics.go | 16 +++++ pkg/services/object/put/v2/service.go | 6 ++ pkg/services/object/response.go | 10 +++ pkg/services/object/server.go | 1 + pkg/services/object/sign.go | 16 +++++ pkg/services/object/transport_splitter.go | 4 ++ 11 files changed, 148 insertions(+), 3 deletions(-) diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 4106f5dc1..1d99d958b 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -70,6 +70,10 @@ func (s *objectSvc) Put() (objectService.PutObjectStream, error) { return s.put.Put() } +func (s *objectSvc) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) { + return s.put.PutSingle(ctx, req) +} + func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { return s.get.Head(ctx, req) } diff --git a/pkg/network/transport/object/grpc/service.go b/pkg/network/transport/object/grpc/service.go index 7fa60f99c..7c6b395d5 100644 --- a/pkg/network/transport/object/grpc/service.go +++ b/pkg/network/transport/object/grpc/service.go @@ -110,3 +110,17 @@ func (s *Server) GetRangeHash(ctx context.Context, req *objectGRPC.GetRangeHashR return resp.ToGRPCMessage().(*objectGRPC.GetRangeHashResponse), nil } + +func (s *Server) PutSingle(ctx context.Context, req *objectGRPC.PutSingleRequest) (*objectGRPC.PutSingleResponse, error) { + putSingleReq := &object.PutSingleRequest{} + if err := putSingleReq.FromGRPCMessage(req); err != nil { + return nil, err + } + + resp, err := s.srv.PutSingle(ctx, putSingleReq) + if err != nil { + return nil, err + } + + return resp.ToGRPCMessage().(*objectGRPC.PutSingleResponse), nil +} diff --git a/pkg/services/object/acl/v2/service.go b/pkg/services/object/acl/v2/service.go index 6544d78d7..c75bd326f 100644 --- a/pkg/services/object/acl/v2/service.go +++ b/pkg/services/object/acl/v2/service.go @@ -6,6 +6,7 @@ import ( "fmt" objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" + "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" @@ -443,6 +444,65 @@ func (b Service) GetRangeHash( return b.next.GetRangeHash(ctx, request) } +func (b Service) PutSingle(ctx context.Context, request *objectV2.PutSingleRequest) (*objectV2.PutSingleResponse, error) { + cnr, err := getContainerIDFromRequest(request) + if err != nil { + return nil, err + } + + idV2 := request.GetBody().GetObject().GetHeader().GetOwnerID() + if idV2 == nil { + return nil, errors.New("missing object owner") + } + + var idOwner user.ID + + err = idOwner.ReadFromV2(*idV2) + if err != nil { + return nil, fmt.Errorf("invalid object owner: %w", err) + } + + obj, err := getObjectIDFromRefObjectID(request.GetBody().GetObject().GetObjectID()) + if err != nil { + return nil, err + } + + var sTok *sessionSDK.Object + sTok, err = readSessionToken(cnr, obj, request.GetMetaHeader().GetSessionToken()) + if err != nil { + return nil, err + } + + bTok, err := originalBearerToken(request.GetMetaHeader()) + if err != nil { + return nil, err + } + + req := MetaWithToken{ + vheader: request.GetVerificationHeader(), + token: sTok, + bearer: bTok, + src: request, + } + + reqInfo, err := b.findRequestInfo(req, cnr, acl.OpObjectPut) + if err != nil { + return nil, err + } + + reqInfo.obj = obj + + if !b.checker.CheckBasicACL(reqInfo) || !b.checker.StickyBitCheck(reqInfo, idOwner) { + return nil, basicACLErr(reqInfo) + } + + if err := b.checker.CheckEACL(request, reqInfo); err != nil { + return nil, eACLErr(reqInfo, err) + } + + return b.next.PutSingle(ctx, request) +} + func (p putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRequest) error { body := request.GetBody() if body == nil { @@ -481,7 +541,7 @@ func (p putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRe } var sTok *sessionSDK.Object - sTok, err = p.readSessionToken(cnr, obj, request) + sTok, err = readSessionToken(cnr, obj, request.GetMetaHeader().GetSessionToken()) if err != nil { return err } @@ -515,10 +575,10 @@ func (p putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRe return p.next.Send(ctx, request) } -func (p putStreamBasicChecker) readSessionToken(cnr cid.ID, obj *oid.ID, request *objectV2.PutRequest) (*sessionSDK.Object, error) { +func readSessionToken(cnr cid.ID, obj *oid.ID, tokV2 *session.Token) (*sessionSDK.Object, error) { var sTok *sessionSDK.Object - if tokV2 := request.GetMetaHeader().GetSessionToken(); tokV2 != nil { + if tokV2 != nil { sTok = new(sessionSDK.Object) err := sTok.ReadFromV2(*tokV2) diff --git a/pkg/services/object/acl/v2/util.go b/pkg/services/object/acl/v2/util.go index cd45b63fc..feda6a3cf 100644 --- a/pkg/services/object/acl/v2/util.go +++ b/pkg/services/object/acl/v2/util.go @@ -44,6 +44,8 @@ func getContainerIDFromRequest(req any) (cid.ID, error) { idV2 = v.GetBody().GetAddress().GetContainerID() case *objectV2.GetRangeHashRequest: idV2 = v.GetBody().GetAddress().GetContainerID() + case *objectV2.PutSingleRequest: + idV2 = v.GetBody().GetObject().GetHeader().GetContainerID() default: return cid.ID{}, errors.New("unknown request type") } @@ -97,6 +99,10 @@ func originalSessionToken(header *sessionV2.RequestMetaHeader) (*sessionSDK.Obje // object reference's holders. Returns an error if object ID is missing in the request. func getObjectIDFromRequestBody(body interface{ GetAddress() *refsV2.Address }) (*oid.ID, error) { idV2 := body.GetAddress().GetObjectID() + return getObjectIDFromRefObjectID(idV2) +} + +func getObjectIDFromRefObjectID(idV2 *refsV2.ObjectID) (*oid.ID, error) { if idV2 == nil { return nil, errors.New("missing object ID") } diff --git a/pkg/services/object/common.go b/pkg/services/object/common.go index 5b139d8eb..0d39dce0b 100644 --- a/pkg/services/object/common.go +++ b/pkg/services/object/common.go @@ -89,3 +89,11 @@ func (x *Common) GetRangeHash(ctx context.Context, req *objectV2.GetRangeHashReq return x.nextHandler.GetRangeHash(ctx, req) } + +func (x *Common) PutSingle(ctx context.Context, req *objectV2.PutSingleRequest) (*objectV2.PutSingleResponse, error) { + if x.state.IsMaintenance() { + return nil, errMaintenance + } + + return x.nextHandler.PutSingle(ctx, req) +} diff --git a/pkg/services/object/metrics.go b/pkg/services/object/metrics.go index 487374940..f972f43ae 100644 --- a/pkg/services/object/metrics.go +++ b/pkg/services/object/metrics.go @@ -76,6 +76,22 @@ func (m MetricCollector) Put() (PutObjectStream, error) { return m.next.Put() } +func (m MetricCollector) PutSingle(ctx context.Context, request *object.PutSingleRequest) (*object.PutSingleResponse, error) { + if m.enabled { + t := time.Now() + + res, err := m.next.PutSingle(ctx, request) + + m.metrics.AddRequestDuration("PutSingle", time.Since(t), err == nil) + if err == nil { + m.metrics.AddPayloadSize("PutSingle", len(request.GetBody().GetObject().GetPayload())) + } + + return res, err + } + return m.next.PutSingle(ctx, request) +} + func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { if m.enabled { t := time.Now() diff --git a/pkg/services/object/put/v2/service.go b/pkg/services/object/put/v2/service.go index 656f8df9c..5af62cd40 100644 --- a/pkg/services/object/put/v2/service.go +++ b/pkg/services/object/put/v2/service.go @@ -1,8 +1,10 @@ package putsvc import ( + "context" "fmt" + objectAPI "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util" @@ -47,6 +49,10 @@ func (s *Service) Put() (object.PutObjectStream, error) { }, nil } +func (s *Service) PutSingle(context.Context, *objectAPI.PutSingleRequest) (*objectAPI.PutSingleResponse, error) { + return nil, fmt.Errorf("unimplemented") //TODO +} + func WithInternalService(v *putsvc.Service) Option { return func(c *cfg) { c.svc = v diff --git a/pkg/services/object/response.go b/pkg/services/object/response.go index 81d6aaaaf..a10f26a34 100644 --- a/pkg/services/object/response.go +++ b/pkg/services/object/response.go @@ -87,6 +87,16 @@ func (s *ResponseService) Put() (PutObjectStream, error) { }, nil } +func (s *ResponseService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) { + resp, err := s.svc.PutSingle(ctx, req) + if err != nil { + return nil, err + } + + s.respSvc.SetMeta(resp) + return resp, nil +} + func (s *ResponseService) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { resp, err := s.svc.Head(ctx, req) if err != nil { diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index ccce9c4f4..73b88f233 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -41,4 +41,5 @@ type ServiceServer interface { Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error) GetRange(*object.GetRangeRequest, GetObjectRangeStream) error GetRangeHash(context.Context, *object.GetRangeHashRequest) (*object.GetRangeHashResponse, error) + PutSingle(context.Context, *object.PutSingleRequest) (*object.PutSingleResponse, error) } diff --git a/pkg/services/object/sign.go b/pkg/services/object/sign.go index 9d66c76ba..5b3578e29 100644 --- a/pkg/services/object/sign.go +++ b/pkg/services/object/sign.go @@ -120,6 +120,22 @@ func (s *SignService) Head(ctx context.Context, req *object.HeadRequest) (*objec return resp.(*object.HeadResponse), nil } +func (s *SignService) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) { + resp, err := s.sigSvc.HandleUnaryRequest(ctx, req, + func(ctx context.Context, req any) (util.ResponseMessage, error) { + return s.svc.PutSingle(ctx, req.(*object.PutSingleRequest)) + }, + func() util.ResponseMessage { + return new(object.PutSingleResponse) + }, + ) + if err != nil { + return nil, err + } + + return resp.(*object.PutSingleResponse), nil +} + func (s *searchStreamSigner) Send(resp *object.SearchResponse) error { s.nonEmptyResp = true return s.respWriter(resp) diff --git a/pkg/services/object/transport_splitter.go b/pkg/services/object/transport_splitter.go index a7d1c486a..2d9810cd3 100644 --- a/pkg/services/object/transport_splitter.go +++ b/pkg/services/object/transport_splitter.go @@ -107,6 +107,10 @@ func (c TransportSplitter) Delete(ctx context.Context, request *object.DeleteReq return c.next.Delete(ctx, request) } +func (c TransportSplitter) PutSingle(ctx context.Context, req *object.PutSingleRequest) (*object.PutSingleResponse, error) { + return c.next.PutSingle(ctx, req) +} + func (s *rangeStreamMsgSizeCtrl) Send(resp *object.GetRangeResponse) error { body := resp.GetBody()