From 27bdddc48f1ae68d9eee638b28906fab1ee22938 Mon Sep 17 00:00:00 2001 From: Dmitrii Stepanov Date: Mon, 3 Apr 2023 14:23:53 +0300 Subject: [PATCH] [#199] putsvc: Refactor put object Resolve containedctx linter for streamer and remote target Signed-off-by: Dmitrii Stepanov --- cmd/frostfs-node/object.go | 4 +- pkg/network/transport/object/grpc/service.go | 8 +-- pkg/services/object/acl/v2/service.go | 12 ++--- pkg/services/object/common.go | 4 +- pkg/services/object/delete/util.go | 6 +-- pkg/services/object/metrics.go | 14 ++--- pkg/services/object/put/distributed.go | 23 ++++---- pkg/services/object/put/local.go | 3 +- pkg/services/object/put/remote.go | 10 ++-- pkg/services/object/put/service.go | 5 +- pkg/services/object/put/streamer.go | 12 ++--- pkg/services/object/put/v2/service.go | 5 +- pkg/services/object/put/v2/streamer.go | 9 ++-- pkg/services/object/put/validation.go | 9 ++-- pkg/services/object/response.go | 20 +++---- pkg/services/object/server.go | 6 +-- pkg/services/object/sign.go | 20 +++---- pkg/services/object/transport_splitter.go | 4 +- .../object_manager/transformer/fmt.go | 9 ++-- .../object_manager/transformer/transformer.go | 34 ++++++------ .../object_manager/transformer/types.go | 6 +-- .../object_manager/transformer/writer.go | 52 +++++++++++++++++++ pkg/services/util/response/client_stream.go | 9 ++-- pkg/services/util/sign.go | 12 ++--- 24 files changed, 171 insertions(+), 125 deletions(-) create mode 100644 pkg/services/object_manager/transformer/writer.go diff --git a/cmd/frostfs-node/object.go b/cmd/frostfs-node/object.go index 8680aac28..3b6bdcc7d 100644 --- a/cmd/frostfs-node/object.go +++ b/cmd/frostfs-node/object.go @@ -70,8 +70,8 @@ func (c *cfg) MaxObjectSize() uint64 { return sz } -func (s *objectSvc) Put(ctx context.Context) (objectService.PutObjectStream, error) { - return s.put.Put(ctx) +func (s *objectSvc) Put() (objectService.PutObjectStream, error) { + return s.put.Put() } func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { diff --git a/pkg/network/transport/object/grpc/service.go b/pkg/network/transport/object/grpc/service.go index 82e323a3c..7fa60f99c 100644 --- a/pkg/network/transport/object/grpc/service.go +++ b/pkg/network/transport/object/grpc/service.go @@ -26,7 +26,7 @@ func New(c objectSvc.ServiceServer) *Server { // Put opens internal Object service Put stream and overtakes data from gRPC stream to it. func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error { - stream, err := s.srv.Put(gStream.Context()) + stream, err := s.srv.Put() if err != nil { return err } @@ -35,7 +35,7 @@ func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error { req, err := gStream.Recv() if err != nil { if errors.Is(err, io.EOF) { - resp, err := stream.CloseAndRecv() + resp, err := stream.CloseAndRecv(gStream.Context()) if err != nil { return err } @@ -51,9 +51,9 @@ func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error { return err } - if err := stream.Send(putReq); err != nil { + if err := stream.Send(gStream.Context(), putReq); err != nil { if errors.Is(err, util.ErrAbortStream) { - resp, err := stream.CloseAndRecv() + resp, err := stream.CloseAndRecv(gStream.Context()) if err != nil { return err } diff --git a/pkg/services/object/acl/v2/service.go b/pkg/services/object/acl/v2/service.go index 6bf8c4405..1e451a99f 100644 --- a/pkg/services/object/acl/v2/service.go +++ b/pkg/services/object/acl/v2/service.go @@ -165,8 +165,8 @@ func (b Service) Get(request *objectV2.GetRequest, stream object.GetObjectStream }) } -func (b Service) Put(ctx context.Context) (object.PutObjectStream, error) { - streamer, err := b.next.Put(ctx) +func (b Service) Put() (object.PutObjectStream, error) { + streamer, err := b.next.Put() return putStreamBasicChecker{ source: &b, @@ -444,7 +444,7 @@ func (b Service) GetRangeHash( } // nolint: funlen -func (p putStreamBasicChecker) Send(request *objectV2.PutRequest) error { +func (p putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRequest) error { body := request.GetBody() if body == nil { return errEmptyBody @@ -531,11 +531,11 @@ func (p putStreamBasicChecker) Send(request *objectV2.PutRequest) error { } } - return p.next.Send(request) + return p.next.Send(ctx, request) } -func (p putStreamBasicChecker) CloseAndRecv() (*objectV2.PutResponse, error) { - return p.next.CloseAndRecv() +func (p putStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PutResponse, error) { + return p.next.CloseAndRecv(ctx) } func (g *getStreamBasicChecker) Send(resp *objectV2.GetResponse) error { diff --git a/pkg/services/object/common.go b/pkg/services/object/common.go index e797f1a64..5b139d8eb 100644 --- a/pkg/services/object/common.go +++ b/pkg/services/object/common.go @@ -42,12 +42,12 @@ func (x *Common) Get(req *objectV2.GetRequest, stream GetObjectStream) error { return x.nextHandler.Get(req, stream) } -func (x *Common) Put(ctx context.Context) (PutObjectStream, error) { +func (x *Common) Put() (PutObjectStream, error) { if x.state.IsMaintenance() { return nil, errMaintenance } - return x.nextHandler.Put(ctx) + return x.nextHandler.Put() } func (x *Common) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { diff --git a/pkg/services/object/delete/util.go b/pkg/services/object/delete/util.go index a8ebb3065..cc5433740 100644 --- a/pkg/services/object/delete/util.go +++ b/pkg/services/object/delete/util.go @@ -108,7 +108,7 @@ func (s *simpleIDWriter) WriteIDs(ids []oid.ID) error { } func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) { - streamer, err := (*putsvc.Service)(w).Put(exec.context()) + streamer, err := (*putsvc.Service)(w).Put() if err != nil { return nil, err } @@ -124,12 +124,12 @@ func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) { return nil, err } - err = streamer.SendChunk(new(putsvc.PutChunkPrm).WithChunk(payload)) + err = streamer.SendChunk(exec.context(), new(putsvc.PutChunkPrm).WithChunk(payload)) if err != nil { return nil, err } - r, err := streamer.Close() + r, err := streamer.Close(exec.context()) if err != nil { return nil, err } diff --git a/pkg/services/object/metrics.go b/pkg/services/object/metrics.go index 9f15e834a..3ea16dafd 100644 --- a/pkg/services/object/metrics.go +++ b/pkg/services/object/metrics.go @@ -75,11 +75,11 @@ func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (er return } -func (m MetricCollector) Put(ctx context.Context) (PutObjectStream, error) { +func (m MetricCollector) Put() (PutObjectStream, error) { if m.enabled { t := time.Now() - stream, err := m.next.Put(ctx) + stream, err := m.next.Put() if err != nil { return nil, err } @@ -90,7 +90,7 @@ func (m MetricCollector) Put(ctx context.Context) (PutObjectStream, error) { start: t, }, nil } - return m.next.Put(ctx) + return m.next.Put() } func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { @@ -179,17 +179,17 @@ func (s getStreamMetric) Send(resp *object.GetResponse) error { return s.stream.Send(resp) } -func (s putStreamMetric) Send(req *object.PutRequest) error { +func (s putStreamMetric) Send(ctx context.Context, req *object.PutRequest) error { chunk, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartChunk) if ok { s.metrics.AddPutPayload(len(chunk.GetChunk())) } - return s.stream.Send(req) + return s.stream.Send(ctx, req) } -func (s putStreamMetric) CloseAndRecv() (*object.PutResponse, error) { - res, err := s.stream.CloseAndRecv() +func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) { + res, err := s.stream.CloseAndRecv(ctx) s.metrics.IncPutReqCounter(err == nil) s.metrics.AddPutReqDuration(time.Since(s.start)) diff --git a/pkg/services/object/put/distributed.go b/pkg/services/object/put/distributed.go index e4566157e..d8b59487e 100644 --- a/pkg/services/object/put/distributed.go +++ b/pkg/services/object/put/distributed.go @@ -1,6 +1,7 @@ package putsvc import ( + "context" "fmt" "sync" "sync/atomic" @@ -17,7 +18,7 @@ import ( type preparedObjectTarget interface { WriteObject(*objectSDK.Object, object.ContentMeta) error - Close() (*transformer.AccessIdentifiers, error) + Close(ctx context.Context) (*transformer.AccessIdentifiers, error) } type distributedTarget struct { @@ -121,13 +122,13 @@ func (t *distributedTarget) WriteHeader(obj *objectSDK.Object) error { return nil } -func (t *distributedTarget) Write(p []byte) (n int, err error) { +func (t *distributedTarget) Write(_ context.Context, p []byte) (n int, err error) { t.payload.Data = append(t.payload.Data, p...) return len(p), nil } -func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { +func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) { defer func() { putPayload(t.payload) t.payload = nil @@ -146,10 +147,10 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { t.traversal.extraBroadcastEnabled = true } - return t.iteratePlacement(t.sendObject) + return t.iteratePlacement(ctx) } -func (t *distributedTarget) sendObject(node nodeDesc) error { +func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error { if !node.local && t.relay != nil { return t.relay(node) } @@ -158,13 +159,13 @@ func (t *distributedTarget) sendObject(node nodeDesc) error { if err := target.WriteObject(t.obj, t.objMeta); err != nil { return fmt.Errorf("could not write header: %w", err) - } else if _, err := target.Close(); err != nil { + } else if _, err := target.Close(ctx); err != nil { return fmt.Errorf("could not close object stream: %w", err) } return nil } -func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transformer.AccessIdentifiers, error) { +func (t *distributedTarget) iteratePlacement(ctx context.Context) (*transformer.AccessIdentifiers, error) { id, _ := t.obj.ID() traverser, err := placement.NewTraverser( @@ -182,7 +183,7 @@ func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transform break } - if t.iterateAddresses(traverser, addrs, f, resErr) { + if t.iterateAddresses(ctx, traverser, addrs, resErr) { break } } @@ -195,7 +196,7 @@ func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transform // perform additional container broadcast if needed if t.traversal.submitPrimaryPlacementFinish() { - _, err = t.iteratePlacement(f) + _, err = t.iteratePlacement(ctx) if err != nil { t.log.Error("additional container broadcast failure", zap.Error(err)) // we don't fail primary operation because of broadcast failure @@ -208,7 +209,7 @@ func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transform WithSelfID(id), nil } -func (t *distributedTarget) iterateAddresses(traverser *placement.Traverser, addrs []placement.Node, f func(nodeDesc) error, resErr *atomic.Value) bool { +func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, resErr *atomic.Value) bool { wg := &sync.WaitGroup{} for i := range addrs { @@ -230,7 +231,7 @@ func (t *distributedTarget) iterateAddresses(traverser *placement.Traverser, add if err := workerPool.Submit(func() { defer wg.Done() - err := f(nodeDesc{local: isLocal, info: addr}) + err := t.sendObject(ctx, nodeDesc{local: isLocal, info: addr}) // mark the container node as processed in order to exclude it // in subsequent container broadcast. Note that we don't diff --git a/pkg/services/object/put/local.go b/pkg/services/object/put/local.go index f344f77e9..12e3a2eee 100644 --- a/pkg/services/object/put/local.go +++ b/pkg/services/object/put/local.go @@ -1,6 +1,7 @@ package putsvc import ( + "context" "fmt" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" @@ -38,7 +39,7 @@ func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMet return nil } -func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) { +func (t *localTarget) Close(_ context.Context) (*transformer.AccessIdentifiers, error) { switch t.meta.Type() { case object.TypeTombstone: err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects()) diff --git a/pkg/services/object/put/remote.go b/pkg/services/object/put/remote.go index 760de7508..6933abca6 100644 --- a/pkg/services/object/put/remote.go +++ b/pkg/services/object/put/remote.go @@ -15,10 +15,7 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" ) -// nolint: containedctx type remoteTarget struct { - ctx context.Context - privateKey *ecdsa.PrivateKey commonPrm *util.CommonPrm @@ -51,7 +48,7 @@ func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta) return nil } -func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { +func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) { c, err := t.clientConstructor.Get(t.nodeInfo) if err != nil { return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err) @@ -59,7 +56,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { var prm internalclient.PutObjectPrm - prm.SetContext(t.ctx) + prm.SetContext(ctx) prm.SetClient(c) prm.SetPrivateKey(t.privateKey) prm.SetSessionToken(t.commonPrm.SessionToken()) @@ -110,7 +107,6 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error { } t := &remoteTarget{ - ctx: ctx, privateKey: key, clientConstructor: s.clientConstructor, } @@ -122,7 +118,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error { if err := t.WriteObject(p.obj, objectcore.ContentMeta{}); err != nil { return fmt.Errorf("(%T) could not send object header: %w", s, err) - } else if _, err := t.Close(); err != nil { + } else if _, err := t.Close(ctx); err != nil { return fmt.Errorf("(%T) could not send object: %w", s, err) } diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index b74c97d49..567a3fea1 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -1,8 +1,6 @@ package putsvc import ( - "context" - "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" @@ -79,10 +77,9 @@ func NewService(opts ...Option) *Service { } } -func (p *Service) Put(ctx context.Context) (*Streamer, error) { +func (p *Service) Put() (*Streamer, error) { return &Streamer{ cfg: p.cfg, - ctx: ctx, }, nil } diff --git a/pkg/services/object/put/streamer.go b/pkg/services/object/put/streamer.go index 915b718a3..678cff572 100644 --- a/pkg/services/object/put/streamer.go +++ b/pkg/services/object/put/streamer.go @@ -16,12 +16,9 @@ import ( "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" ) -// nolint: containedctx type Streamer struct { *cfg - ctx context.Context - sessionKey *ecdsa.PrivateKey target transformer.ObjectTarget @@ -232,7 +229,6 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { } rt := &remoteTarget{ - ctx: p.ctx, privateKey: p.sessionKey, commonPrm: prm.common, clientConstructor: p.clientConstructor, @@ -250,24 +246,24 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget { } } -func (p *Streamer) SendChunk(prm *PutChunkPrm) error { +func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error { if p.target == nil { return errNotInit } - if _, err := p.target.Write(prm.chunk); err != nil { + if _, err := p.target.Write(ctx, prm.chunk); err != nil { return fmt.Errorf("(%T) could not write payload chunk to target: %w", p, err) } return nil } -func (p *Streamer) Close() (*PutResponse, error) { +func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) { if p.target == nil { return nil, errNotInit } - ids, err := p.target.Close() + ids, err := p.target.Close(ctx) if err != nil { return nil, fmt.Errorf("(%T) could not close object target: %w", p, err) } diff --git a/pkg/services/object/put/v2/service.go b/pkg/services/object/put/v2/service.go index 7d0dfc613..656f8df9c 100644 --- a/pkg/services/object/put/v2/service.go +++ b/pkg/services/object/put/v2/service.go @@ -1,7 +1,6 @@ package putsvc import ( - "context" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" @@ -36,8 +35,8 @@ func NewService(opts ...Option) *Service { } // Put calls internal service and returns v2 object streamer. -func (s *Service) Put(ctx context.Context) (object.PutObjectStream, error) { - stream, err := s.svc.Put(ctx) +func (s *Service) Put() (object.PutObjectStream, error) { + stream, err := s.svc.Put() if err != nil { return nil, fmt.Errorf("(%T) could not open object put stream: %w", s, err) } diff --git a/pkg/services/object/put/v2/streamer.go b/pkg/services/object/put/v2/streamer.go index 85827cd4c..65846ea9f 100644 --- a/pkg/services/object/put/v2/streamer.go +++ b/pkg/services/object/put/v2/streamer.go @@ -1,6 +1,7 @@ package putsvc import ( + "context" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" @@ -32,7 +33,7 @@ type sizes struct { writtenPayload uint64 // sum size of already cached chunks } -func (s *streamer) Send(req *object.PutRequest) (err error) { +func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error) { switch v := req.GetBody().GetObjectPart().(type) { case *object.PutObjectPartInit: var initPrm *putsvc.PutInitPrm @@ -71,7 +72,7 @@ func (s *streamer) Send(req *object.PutRequest) (err error) { } } - if err = s.stream.SendChunk(toChunkPrm(v)); err != nil { + if err = s.stream.SendChunk(ctx, toChunkPrm(v)); err != nil { err = fmt.Errorf("(%T) could not send payload chunk: %w", s, err) } @@ -103,7 +104,7 @@ func (s *streamer) Send(req *object.PutRequest) (err error) { return signature.SignServiceMessage(key, req) } -func (s *streamer) CloseAndRecv() (*object.PutResponse, error) { +func (s *streamer) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) { if s.saveChunks { // check payload size correctness if s.writtenPayload != s.payloadSz { @@ -111,7 +112,7 @@ func (s *streamer) CloseAndRecv() (*object.PutResponse, error) { } } - resp, err := s.stream.Close() + resp, err := s.stream.Close(ctx) if err != nil { return nil, fmt.Errorf("(%T) could not object put stream: %w", s, err) } diff --git a/pkg/services/object/put/validation.go b/pkg/services/object/put/validation.go index 2d6ada5a1..70c6974d3 100644 --- a/pkg/services/object/put/validation.go +++ b/pkg/services/object/put/validation.go @@ -2,6 +2,7 @@ package putsvc import ( "bytes" + "context" "crypto/sha256" "errors" "fmt" @@ -92,7 +93,7 @@ func (t *validatingTarget) WriteHeader(obj *objectSDK.Object) error { return nil } -func (t *validatingTarget) Write(p []byte) (n int, err error) { +func (t *validatingTarget) Write(ctx context.Context, p []byte) (n int, err error) { chunkLn := uint64(len(p)) if !t.unpreparedObject { @@ -107,7 +108,7 @@ func (t *validatingTarget) Write(p []byte) (n int, err error) { } } - n, err = t.nextTarget.Write(p) + n, err = t.nextTarget.Write(ctx, p) if err == nil { t.writtenPayload += uint64(n) } @@ -115,7 +116,7 @@ func (t *validatingTarget) Write(p []byte) (n int, err error) { return } -func (t *validatingTarget) Close() (*transformer.AccessIdentifiers, error) { +func (t *validatingTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) { if !t.unpreparedObject { // check payload size correctness if t.payloadSz != t.writtenPayload { @@ -127,5 +128,5 @@ func (t *validatingTarget) Close() (*transformer.AccessIdentifiers, error) { } } - return t.nextTarget.Close() + return t.nextTarget.Close(ctx) } diff --git a/pkg/services/object/response.go b/pkg/services/object/response.go index 4da2b23a7..def934ea6 100644 --- a/pkg/services/object/response.go +++ b/pkg/services/object/response.go @@ -59,12 +59,12 @@ func (s *ResponseService) Get(req *object.GetRequest, stream GetObjectStream) er }) } -func (s *putStreamResponser) Send(req *object.PutRequest) error { - return s.stream.Send(req) +func (s *putStreamResponser) Send(ctx context.Context, req *object.PutRequest) error { + return s.stream.Send(ctx, req) } -func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) { - r, err := s.stream.CloseAndRecv() +func (s *putStreamResponser) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) { + r, err := s.stream.CloseAndRecv(ctx) if err != nil { return nil, fmt.Errorf("(%T) could not receive response: %w", s, err) } @@ -72,19 +72,19 @@ func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) { return r.(*object.PutResponse), nil } -func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) { - stream, err := s.svc.Put(ctx) +func (s *ResponseService) Put() (PutObjectStream, error) { + stream, err := s.svc.Put() if err != nil { return nil, fmt.Errorf("could not create Put object streamer: %w", err) } return &putStreamResponser{ stream: s.respSvc.CreateRequestStreamer( - func(req any) error { - return stream.Send(req.(*object.PutRequest)) + func(ctx context.Context, req any) error { + return stream.Send(ctx, req.(*object.PutRequest)) }, - func() (util.ResponseMessage, error) { - return stream.CloseAndRecv() + func(ctx context.Context) (util.ResponseMessage, error) { + return stream.CloseAndRecv(ctx) }, ), }, nil diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index d95c6c906..ccce9c4f4 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -27,15 +27,15 @@ type SearchStream interface { // PutObjectStream is an interface of FrostFS API v2 compatible client's object streamer. type PutObjectStream interface { - Send(*object.PutRequest) error - CloseAndRecv() (*object.PutResponse, error) + Send(context.Context, *object.PutRequest) error + CloseAndRecv(context.Context) (*object.PutResponse, error) } // ServiceServer is an interface of utility // serving v2 Object service. type ServiceServer interface { Get(*object.GetRequest, GetObjectStream) error - Put(context.Context) (PutObjectStream, error) + Put() (PutObjectStream, error) Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) Search(*object.SearchRequest, SearchStream) error Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error) diff --git a/pkg/services/object/sign.go b/pkg/services/object/sign.go index 585fc659a..9d66c76ba 100644 --- a/pkg/services/object/sign.go +++ b/pkg/services/object/sign.go @@ -70,12 +70,12 @@ func (s *SignService) Get(req *object.GetRequest, stream GetObjectStream) error ) } -func (s *putStreamSigner) Send(req *object.PutRequest) error { - return s.stream.Send(req) +func (s *putStreamSigner) Send(ctx context.Context, req *object.PutRequest) error { + return s.stream.Send(ctx, req) } -func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) { - r, err := s.stream.CloseAndRecv() +func (s *putStreamSigner) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) { + r, err := s.stream.CloseAndRecv(ctx) if err != nil { return nil, fmt.Errorf("could not receive response: %w", err) } @@ -83,19 +83,19 @@ func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) { return r.(*object.PutResponse), nil } -func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) { - stream, err := s.svc.Put(ctx) +func (s *SignService) Put() (PutObjectStream, error) { + stream, err := s.svc.Put() if err != nil { return nil, fmt.Errorf("could not create Put object streamer: %w", err) } return &putStreamSigner{ stream: s.sigSvc.CreateRequestStreamer( - func(req any) error { - return stream.Send(req.(*object.PutRequest)) + func(ctx context.Context, req any) error { + return stream.Send(ctx, req.(*object.PutRequest)) }, - func() (util.ResponseMessage, error) { - return stream.CloseAndRecv() + func(ctx context.Context) (util.ResponseMessage, error) { + return stream.CloseAndRecv(ctx) }, func() util.ResponseMessage { return new(object.PutResponse) diff --git a/pkg/services/object/transport_splitter.go b/pkg/services/object/transport_splitter.go index 3836103de..a7d1c486a 100644 --- a/pkg/services/object/transport_splitter.go +++ b/pkg/services/object/transport_splitter.go @@ -87,8 +87,8 @@ func (c *TransportSplitter) Get(req *object.GetRequest, stream GetObjectStream) }) } -func (c TransportSplitter) Put(ctx context.Context) (PutObjectStream, error) { - return c.next.Put(ctx) +func (c TransportSplitter) Put() (PutObjectStream, error) { + return c.next.Put() } func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { diff --git a/pkg/services/object_manager/transformer/fmt.go b/pkg/services/object_manager/transformer/fmt.go index c9b5dc967..462cc7474 100644 --- a/pkg/services/object_manager/transformer/fmt.go +++ b/pkg/services/object_manager/transformer/fmt.go @@ -1,6 +1,7 @@ package transformer import ( + "context" "crypto/ecdsa" "fmt" @@ -53,15 +54,15 @@ func (f *formatter) WriteHeader(obj *object.Object) error { return nil } -func (f *formatter) Write(p []byte) (n int, err error) { - n, err = f.prm.NextTarget.Write(p) +func (f *formatter) Write(ctx context.Context, p []byte) (n int, err error) { + n, err = f.prm.NextTarget.Write(ctx, p) f.sz += uint64(n) return } -func (f *formatter) Close() (*AccessIdentifiers, error) { +func (f *formatter) Close(ctx context.Context) (*AccessIdentifiers, error) { curEpoch := f.prm.NetworkState.CurrentEpoch() ver := version.Current() @@ -100,7 +101,7 @@ func (f *formatter) Close() (*AccessIdentifiers, error) { return nil, fmt.Errorf("could not write header to next target: %w", err) } - if _, err := f.prm.NextTarget.Close(); err != nil { + if _, err := f.prm.NextTarget.Close(ctx); err != nil { return nil, fmt.Errorf("could not close next target: %w", err) } diff --git a/pkg/services/object_manager/transformer/transformer.go b/pkg/services/object_manager/transformer/transformer.go index 7b717d3df..199f5d0c1 100644 --- a/pkg/services/object_manager/transformer/transformer.go +++ b/pkg/services/object_manager/transformer/transformer.go @@ -1,10 +1,10 @@ package transformer import ( + "context" "crypto/sha256" "fmt" "hash" - "io" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" @@ -27,7 +27,7 @@ type payloadSizeLimiter struct { previous []oid.ID - chunkWriter io.Writer + chunkWriter writer splitID *object.SplitID @@ -64,16 +64,16 @@ func (s *payloadSizeLimiter) WriteHeader(hdr *object.Object) error { return nil } -func (s *payloadSizeLimiter) Write(p []byte) (int, error) { - if err := s.writeChunk(p); err != nil { +func (s *payloadSizeLimiter) Write(ctx context.Context, p []byte) (int, error) { + if err := s.writeChunk(ctx, p); err != nil { return 0, err } return len(p), nil } -func (s *payloadSizeLimiter) Close() (*AccessIdentifiers, error) { - return s.release(true) +func (s *payloadSizeLimiter) Close(ctx context.Context) (*AccessIdentifiers, error) { + return s.release(ctx, true) } func (s *payloadSizeLimiter) initialize() { @@ -117,19 +117,19 @@ func (s *payloadSizeLimiter) initializeCurrent() { s.currentHashers = payloadHashersForObject(s.current, s.withoutHomomorphicHash) // compose multi-writer from target and all payload hashers - ws := make([]io.Writer, 0, 1+len(s.currentHashers)+len(s.parentHashers)) + ws := make([]writer, 0, 1+len(s.currentHashers)+len(s.parentHashers)) ws = append(ws, s.target) for i := range s.currentHashers { - ws = append(ws, s.currentHashers[i].hasher) + ws = append(ws, newWriter(s.currentHashers[i].hasher)) } for i := range s.parentHashers { - ws = append(ws, s.parentHashers[i].hasher) + ws = append(ws, newWriter(s.parentHashers[i].hasher)) } - s.chunkWriter = io.MultiWriter(ws...) + s.chunkWriter = newMultiWriter(ws...) } func payloadHashersForObject(obj *object.Object, withoutHomomorphicHash bool) []*payloadChecksumHasher { @@ -174,7 +174,7 @@ func payloadHashersForObject(obj *object.Object, withoutHomomorphicHash bool) [] return hashers } -func (s *payloadSizeLimiter) release(finalize bool) (*AccessIdentifiers, error) { +func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*AccessIdentifiers, error) { // Arg finalize is true only when called from Close method. // We finalize parent and generate linking objects only if it is more // than 1 object in split-chain. @@ -194,7 +194,7 @@ func (s *payloadSizeLimiter) release(finalize bool) (*AccessIdentifiers, error) return nil, fmt.Errorf("could not write header: %w", err) } - ids, err := s.target.Close() + ids, err := s.target.Close(ctx) if err != nil { return nil, fmt.Errorf("could not close target: %w", err) } @@ -207,7 +207,7 @@ func (s *payloadSizeLimiter) release(finalize bool) (*AccessIdentifiers, error) s.initializeLinking(ids.Parent()) s.initializeCurrent() - if _, err := s.release(false); err != nil { + if _, err := s.release(ctx, false); err != nil { return nil, fmt.Errorf("could not release linking object: %w", err) } } @@ -228,7 +228,7 @@ func (s *payloadSizeLimiter) initializeLinking(parHdr *object.Object) { s.current.SetSplitID(s.splitID) } -func (s *payloadSizeLimiter) writeChunk(chunk []byte) error { +func (s *payloadSizeLimiter) writeChunk(ctx context.Context, chunk []byte) error { // statement is true if the previous write of bytes reached exactly the boundary. if s.written > 0 && s.written%s.maxSize == 0 { if s.written == s.maxSize { @@ -236,7 +236,7 @@ func (s *payloadSizeLimiter) writeChunk(chunk []byte) error { } // we need to release current object - if _, err := s.release(false); err != nil { + if _, err := s.release(ctx, false); err != nil { return fmt.Errorf("could not release object: %w", err) } @@ -255,7 +255,7 @@ func (s *payloadSizeLimiter) writeChunk(chunk []byte) error { cut = leftToEdge } - if _, err := s.chunkWriter.Write(chunk[:cut]); err != nil { + if _, err := s.chunkWriter.Write(ctx, chunk[:cut]); err != nil { return fmt.Errorf("could not write chunk to target: %w", err) } @@ -264,7 +264,7 @@ func (s *payloadSizeLimiter) writeChunk(chunk []byte) error { // if there are more bytes in buffer we call method again to start filling another object if ln > leftToEdge { - return s.writeChunk(chunk[cut:]) + return s.writeChunk(ctx, chunk[cut:]) } return nil diff --git a/pkg/services/object_manager/transformer/types.go b/pkg/services/object_manager/transformer/types.go index 0fa3b6436..3e6e2feff 100644 --- a/pkg/services/object_manager/transformer/types.go +++ b/pkg/services/object_manager/transformer/types.go @@ -1,7 +1,7 @@ package transformer import ( - "io" + "context" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" @@ -35,7 +35,7 @@ type ObjectTarget interface { // Can be called multiple times. // // Must not be called after Close call. - io.Writer + Write(ctx context.Context, p []byte) (n int, err error) // Close is used to finish object writing. // @@ -45,7 +45,7 @@ type ObjectTarget interface { // Must be called no more than once. Control remains with the caller. // Re-calling can lead to undefined behavior // that depends on the implementation. - Close() (*AccessIdentifiers, error) + Close(ctx context.Context) (*AccessIdentifiers, error) } // TargetInitializer represents ObjectTarget constructor. diff --git a/pkg/services/object_manager/transformer/writer.go b/pkg/services/object_manager/transformer/writer.go new file mode 100644 index 000000000..27aed16ff --- /dev/null +++ b/pkg/services/object_manager/transformer/writer.go @@ -0,0 +1,52 @@ +package transformer + +import ( + "context" + "io" +) + +type writer interface { + Write(ctx context.Context, p []byte) (n int, err error) +} + +type multiWriter struct { + writers []writer +} + +func (t *multiWriter) Write(ctx context.Context, p []byte) (n int, err error) { + for _, w := range t.writers { + n, err = w.Write(ctx, p) + if err != nil { + return + } + if n != len(p) { + err = io.ErrShortWrite + return + } + } + return len(p), nil +} + +func newMultiWriter(writers ...writer) writer { + allWriters := make([]writer, 0, len(writers)) + for _, w := range writers { + if mw, ok := w.(*multiWriter); ok { + allWriters = append(allWriters, mw.writers...) + } else { + allWriters = append(allWriters, w) + } + } + return &multiWriter{allWriters} +} + +type writerWrapper struct { + Writer io.Writer +} + +func (w *writerWrapper) Write(_ context.Context, p []byte) (n int, err error) { + return w.Writer.Write(p) +} + +func newWriter(w io.Writer) writer { + return &writerWrapper{Writer: w} +} diff --git a/pkg/services/util/response/client_stream.go b/pkg/services/util/response/client_stream.go index f167f005a..b541c73db 100644 --- a/pkg/services/util/response/client_stream.go +++ b/pkg/services/util/response/client_stream.go @@ -1,6 +1,7 @@ package response import ( + "context" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" @@ -17,8 +18,8 @@ type ClientMessageStreamer struct { } // Send calls send method of internal streamer. -func (s *ClientMessageStreamer) Send(req any) error { - if err := s.send(req); err != nil { +func (s *ClientMessageStreamer) Send(ctx context.Context, req any) error { + if err := s.send(ctx, req); err != nil { return fmt.Errorf("(%T) could not send the request: %w", s, err) } return nil @@ -26,8 +27,8 @@ func (s *ClientMessageStreamer) Send(req any) error { // CloseAndRecv closes internal stream, receivers the response, // sets meta values and returns the result. -func (s *ClientMessageStreamer) CloseAndRecv() (util.ResponseMessage, error) { - resp, err := s.close() +func (s *ClientMessageStreamer) CloseAndRecv(ctx context.Context) (util.ResponseMessage, error) { + resp, err := s.close(ctx) if err != nil { return nil, fmt.Errorf("(%T) could not close stream and receive response: %w", s, err) } diff --git a/pkg/services/util/sign.go b/pkg/services/util/sign.go index 2478e6256..cb4be3084 100644 --- a/pkg/services/util/sign.go +++ b/pkg/services/util/sign.go @@ -37,9 +37,9 @@ var ErrAbortStream = errors.New("abort message stream") type ResponseConstructor func() ResponseMessage -type RequestMessageWriter func(any) error +type RequestMessageWriter func(context.Context, any) error -type ClientStreamCloser func() (ResponseMessage, error) +type ClientStreamCloser func(context.Context) (ResponseMessage, error) type RequestMessageStreamer struct { key *ecdsa.PrivateKey @@ -61,7 +61,7 @@ func NewUnarySignService(key *ecdsa.PrivateKey) *SignService { } } -func (s *RequestMessageStreamer) Send(req any) error { +func (s *RequestMessageStreamer) Send(ctx context.Context, req any) error { // req argument should be strengthen with type RequestMessage s.statusSupported = isStatusSupported(req.(RequestMessage)) // panic is OK here for now @@ -71,7 +71,7 @@ func (s *RequestMessageStreamer) Send(req any) error { if err = signature.VerifyServiceMessage(req); err != nil { err = fmt.Errorf("could not verify request: %w", err) } else { - err = s.send(req) + err = s.send(ctx, req) } if err != nil { @@ -87,7 +87,7 @@ func (s *RequestMessageStreamer) Send(req any) error { return nil } -func (s *RequestMessageStreamer) CloseAndRecv() (ResponseMessage, error) { +func (s *RequestMessageStreamer) CloseAndRecv(ctx context.Context) (ResponseMessage, error) { var ( resp ResponseMessage err error @@ -96,7 +96,7 @@ func (s *RequestMessageStreamer) CloseAndRecv() (ResponseMessage, error) { if s.sendErr != nil { err = s.sendErr } else { - resp, err = s.close() + resp, err = s.close(ctx) if err != nil { err = fmt.Errorf("could not close stream and receive response: %w", err) }