[#199] putsvc: Refactor put object

Resolve containedctx linter for streamer and remote target

Signed-off-by: Dmitrii Stepanov <d.stepanov@yadro.com>
This commit is contained in:
Dmitrii Stepanov 2023-04-03 14:23:53 +03:00 committed by Gitea
parent cecea8053a
commit 27bdddc48f
24 changed files with 171 additions and 125 deletions

View file

@ -70,8 +70,8 @@ func (c *cfg) MaxObjectSize() uint64 {
return sz return sz
} }
func (s *objectSvc) Put(ctx context.Context) (objectService.PutObjectStream, error) { func (s *objectSvc) Put() (objectService.PutObjectStream, error) {
return s.put.Put(ctx) return s.put.Put()
} }
func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) { func (s *objectSvc) Head(ctx context.Context, req *object.HeadRequest) (*object.HeadResponse, error) {

View file

@ -26,7 +26,7 @@ func New(c objectSvc.ServiceServer) *Server {
// Put opens internal Object service Put stream and overtakes data from gRPC stream to it. // Put opens internal Object service Put stream and overtakes data from gRPC stream to it.
func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error { func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
stream, err := s.srv.Put(gStream.Context()) stream, err := s.srv.Put()
if err != nil { if err != nil {
return err return err
} }
@ -35,7 +35,7 @@ func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
req, err := gStream.Recv() req, err := gStream.Recv()
if err != nil { if err != nil {
if errors.Is(err, io.EOF) { if errors.Is(err, io.EOF) {
resp, err := stream.CloseAndRecv() resp, err := stream.CloseAndRecv(gStream.Context())
if err != nil { if err != nil {
return err return err
} }
@ -51,9 +51,9 @@ func (s *Server) Put(gStream objectGRPC.ObjectService_PutServer) error {
return err return err
} }
if err := stream.Send(putReq); err != nil { if err := stream.Send(gStream.Context(), putReq); err != nil {
if errors.Is(err, util.ErrAbortStream) { if errors.Is(err, util.ErrAbortStream) {
resp, err := stream.CloseAndRecv() resp, err := stream.CloseAndRecv(gStream.Context())
if err != nil { if err != nil {
return err return err
} }

View file

@ -165,8 +165,8 @@ func (b Service) Get(request *objectV2.GetRequest, stream object.GetObjectStream
}) })
} }
func (b Service) Put(ctx context.Context) (object.PutObjectStream, error) { func (b Service) Put() (object.PutObjectStream, error) {
streamer, err := b.next.Put(ctx) streamer, err := b.next.Put()
return putStreamBasicChecker{ return putStreamBasicChecker{
source: &b, source: &b,
@ -444,7 +444,7 @@ func (b Service) GetRangeHash(
} }
// nolint: funlen // nolint: funlen
func (p putStreamBasicChecker) Send(request *objectV2.PutRequest) error { func (p putStreamBasicChecker) Send(ctx context.Context, request *objectV2.PutRequest) error {
body := request.GetBody() body := request.GetBody()
if body == nil { if body == nil {
return errEmptyBody return errEmptyBody
@ -531,11 +531,11 @@ func (p putStreamBasicChecker) Send(request *objectV2.PutRequest) error {
} }
} }
return p.next.Send(request) return p.next.Send(ctx, request)
} }
func (p putStreamBasicChecker) CloseAndRecv() (*objectV2.PutResponse, error) { func (p putStreamBasicChecker) CloseAndRecv(ctx context.Context) (*objectV2.PutResponse, error) {
return p.next.CloseAndRecv() return p.next.CloseAndRecv(ctx)
} }
func (g *getStreamBasicChecker) Send(resp *objectV2.GetResponse) error { func (g *getStreamBasicChecker) Send(resp *objectV2.GetResponse) error {

View file

@ -42,12 +42,12 @@ func (x *Common) Get(req *objectV2.GetRequest, stream GetObjectStream) error {
return x.nextHandler.Get(req, stream) return x.nextHandler.Get(req, stream)
} }
func (x *Common) Put(ctx context.Context) (PutObjectStream, error) { func (x *Common) Put() (PutObjectStream, error) {
if x.state.IsMaintenance() { if x.state.IsMaintenance() {
return nil, errMaintenance return nil, errMaintenance
} }
return x.nextHandler.Put(ctx) return x.nextHandler.Put()
} }
func (x *Common) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) { func (x *Common) Head(ctx context.Context, req *objectV2.HeadRequest) (*objectV2.HeadResponse, error) {

View file

@ -108,7 +108,7 @@ func (s *simpleIDWriter) WriteIDs(ids []oid.ID) error {
} }
func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) { func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) {
streamer, err := (*putsvc.Service)(w).Put(exec.context()) streamer, err := (*putsvc.Service)(w).Put()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -124,12 +124,12 @@ func (w *putSvcWrapper) put(exec *execCtx) (*oid.ID, error) {
return nil, err return nil, err
} }
err = streamer.SendChunk(new(putsvc.PutChunkPrm).WithChunk(payload)) err = streamer.SendChunk(exec.context(), new(putsvc.PutChunkPrm).WithChunk(payload))
if err != nil { if err != nil {
return nil, err return nil, err
} }
r, err := streamer.Close() r, err := streamer.Close(exec.context())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View file

@ -75,11 +75,11 @@ func (m MetricCollector) Get(req *object.GetRequest, stream GetObjectStream) (er
return return
} }
func (m MetricCollector) Put(ctx context.Context) (PutObjectStream, error) { func (m MetricCollector) Put() (PutObjectStream, error) {
if m.enabled { if m.enabled {
t := time.Now() t := time.Now()
stream, err := m.next.Put(ctx) stream, err := m.next.Put()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -90,7 +90,7 @@ func (m MetricCollector) Put(ctx context.Context) (PutObjectStream, error) {
start: t, start: t,
}, nil }, nil
} }
return m.next.Put(ctx) return m.next.Put()
} }
func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { func (m MetricCollector) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {
@ -179,17 +179,17 @@ func (s getStreamMetric) Send(resp *object.GetResponse) error {
return s.stream.Send(resp) return s.stream.Send(resp)
} }
func (s putStreamMetric) Send(req *object.PutRequest) error { func (s putStreamMetric) Send(ctx context.Context, req *object.PutRequest) error {
chunk, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartChunk) chunk, ok := req.GetBody().GetObjectPart().(*object.PutObjectPartChunk)
if ok { if ok {
s.metrics.AddPutPayload(len(chunk.GetChunk())) s.metrics.AddPutPayload(len(chunk.GetChunk()))
} }
return s.stream.Send(req) return s.stream.Send(ctx, req)
} }
func (s putStreamMetric) CloseAndRecv() (*object.PutResponse, error) { func (s putStreamMetric) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
res, err := s.stream.CloseAndRecv() res, err := s.stream.CloseAndRecv(ctx)
s.metrics.IncPutReqCounter(err == nil) s.metrics.IncPutReqCounter(err == nil)
s.metrics.AddPutReqDuration(time.Since(s.start)) s.metrics.AddPutReqDuration(time.Since(s.start))

View file

@ -1,6 +1,7 @@
package putsvc package putsvc
import ( import (
"context"
"fmt" "fmt"
"sync" "sync"
"sync/atomic" "sync/atomic"
@ -17,7 +18,7 @@ import (
type preparedObjectTarget interface { type preparedObjectTarget interface {
WriteObject(*objectSDK.Object, object.ContentMeta) error WriteObject(*objectSDK.Object, object.ContentMeta) error
Close() (*transformer.AccessIdentifiers, error) Close(ctx context.Context) (*transformer.AccessIdentifiers, error)
} }
type distributedTarget struct { type distributedTarget struct {
@ -121,13 +122,13 @@ func (t *distributedTarget) WriteHeader(obj *objectSDK.Object) error {
return nil return nil
} }
func (t *distributedTarget) Write(p []byte) (n int, err error) { func (t *distributedTarget) Write(_ context.Context, p []byte) (n int, err error) {
t.payload.Data = append(t.payload.Data, p...) t.payload.Data = append(t.payload.Data, p...)
return len(p), nil return len(p), nil
} }
func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) { func (t *distributedTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
defer func() { defer func() {
putPayload(t.payload) putPayload(t.payload)
t.payload = nil t.payload = nil
@ -146,10 +147,10 @@ func (t *distributedTarget) Close() (*transformer.AccessIdentifiers, error) {
t.traversal.extraBroadcastEnabled = true t.traversal.extraBroadcastEnabled = true
} }
return t.iteratePlacement(t.sendObject) return t.iteratePlacement(ctx)
} }
func (t *distributedTarget) sendObject(node nodeDesc) error { func (t *distributedTarget) sendObject(ctx context.Context, node nodeDesc) error {
if !node.local && t.relay != nil { if !node.local && t.relay != nil {
return t.relay(node) return t.relay(node)
} }
@ -158,13 +159,13 @@ func (t *distributedTarget) sendObject(node nodeDesc) error {
if err := target.WriteObject(t.obj, t.objMeta); err != nil { if err := target.WriteObject(t.obj, t.objMeta); err != nil {
return fmt.Errorf("could not write header: %w", err) return fmt.Errorf("could not write header: %w", err)
} else if _, err := target.Close(); err != nil { } else if _, err := target.Close(ctx); err != nil {
return fmt.Errorf("could not close object stream: %w", err) return fmt.Errorf("could not close object stream: %w", err)
} }
return nil return nil
} }
func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transformer.AccessIdentifiers, error) { func (t *distributedTarget) iteratePlacement(ctx context.Context) (*transformer.AccessIdentifiers, error) {
id, _ := t.obj.ID() id, _ := t.obj.ID()
traverser, err := placement.NewTraverser( traverser, err := placement.NewTraverser(
@ -182,7 +183,7 @@ func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transform
break break
} }
if t.iterateAddresses(traverser, addrs, f, resErr) { if t.iterateAddresses(ctx, traverser, addrs, resErr) {
break break
} }
} }
@ -195,7 +196,7 @@ func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transform
// perform additional container broadcast if needed // perform additional container broadcast if needed
if t.traversal.submitPrimaryPlacementFinish() { if t.traversal.submitPrimaryPlacementFinish() {
_, err = t.iteratePlacement(f) _, err = t.iteratePlacement(ctx)
if err != nil { if err != nil {
t.log.Error("additional container broadcast failure", zap.Error(err)) t.log.Error("additional container broadcast failure", zap.Error(err))
// we don't fail primary operation because of broadcast failure // we don't fail primary operation because of broadcast failure
@ -208,7 +209,7 @@ func (t *distributedTarget) iteratePlacement(f func(nodeDesc) error) (*transform
WithSelfID(id), nil WithSelfID(id), nil
} }
func (t *distributedTarget) iterateAddresses(traverser *placement.Traverser, addrs []placement.Node, f func(nodeDesc) error, resErr *atomic.Value) bool { func (t *distributedTarget) iterateAddresses(ctx context.Context, traverser *placement.Traverser, addrs []placement.Node, resErr *atomic.Value) bool {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
for i := range addrs { for i := range addrs {
@ -230,7 +231,7 @@ func (t *distributedTarget) iterateAddresses(traverser *placement.Traverser, add
if err := workerPool.Submit(func() { if err := workerPool.Submit(func() {
defer wg.Done() defer wg.Done()
err := f(nodeDesc{local: isLocal, info: addr}) err := t.sendObject(ctx, nodeDesc{local: isLocal, info: addr})
// mark the container node as processed in order to exclude it // mark the container node as processed in order to exclude it
// in subsequent container broadcast. Note that we don't // in subsequent container broadcast. Note that we don't

View file

@ -1,6 +1,7 @@
package putsvc package putsvc
import ( import (
"context"
"fmt" "fmt"
objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object" objectCore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
@ -38,7 +39,7 @@ func (t *localTarget) WriteObject(obj *object.Object, meta objectCore.ContentMet
return nil return nil
} }
func (t *localTarget) Close() (*transformer.AccessIdentifiers, error) { func (t *localTarget) Close(_ context.Context) (*transformer.AccessIdentifiers, error) {
switch t.meta.Type() { switch t.meta.Type() {
case object.TypeTombstone: case object.TypeTombstone:
err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects()) err := t.storage.Delete(objectCore.AddressOf(t.obj), t.meta.Objects())

View file

@ -15,10 +15,7 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
) )
// nolint: containedctx
type remoteTarget struct { type remoteTarget struct {
ctx context.Context
privateKey *ecdsa.PrivateKey privateKey *ecdsa.PrivateKey
commonPrm *util.CommonPrm commonPrm *util.CommonPrm
@ -51,7 +48,7 @@ func (t *remoteTarget) WriteObject(obj *object.Object, _ objectcore.ContentMeta)
return nil return nil
} }
func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) { func (t *remoteTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
c, err := t.clientConstructor.Get(t.nodeInfo) c, err := t.clientConstructor.Get(t.nodeInfo)
if err != nil { if err != nil {
return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err) return nil, fmt.Errorf("(%T) could not create SDK client %s: %w", t, t.nodeInfo, err)
@ -59,7 +56,7 @@ func (t *remoteTarget) Close() (*transformer.AccessIdentifiers, error) {
var prm internalclient.PutObjectPrm var prm internalclient.PutObjectPrm
prm.SetContext(t.ctx) prm.SetContext(ctx)
prm.SetClient(c) prm.SetClient(c)
prm.SetPrivateKey(t.privateKey) prm.SetPrivateKey(t.privateKey)
prm.SetSessionToken(t.commonPrm.SessionToken()) prm.SetSessionToken(t.commonPrm.SessionToken())
@ -110,7 +107,6 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
} }
t := &remoteTarget{ t := &remoteTarget{
ctx: ctx,
privateKey: key, privateKey: key,
clientConstructor: s.clientConstructor, clientConstructor: s.clientConstructor,
} }
@ -122,7 +118,7 @@ func (s *RemoteSender) PutObject(ctx context.Context, p *RemotePutPrm) error {
if err := t.WriteObject(p.obj, objectcore.ContentMeta{}); err != nil { if err := t.WriteObject(p.obj, objectcore.ContentMeta{}); err != nil {
return fmt.Errorf("(%T) could not send object header: %w", s, err) return fmt.Errorf("(%T) could not send object header: %w", s, err)
} else if _, err := t.Close(); err != nil { } else if _, err := t.Close(ctx); err != nil {
return fmt.Errorf("(%T) could not send object: %w", s, err) return fmt.Errorf("(%T) could not send object: %w", s, err)
} }

View file

@ -1,8 +1,6 @@
package putsvc package putsvc
import ( import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
@ -79,10 +77,9 @@ func NewService(opts ...Option) *Service {
} }
} }
func (p *Service) Put(ctx context.Context) (*Streamer, error) { func (p *Service) Put() (*Streamer, error) {
return &Streamer{ return &Streamer{
cfg: p.cfg, cfg: p.cfg,
ctx: ctx,
}, nil }, nil
} }

View file

@ -16,12 +16,9 @@ import (
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
) )
// nolint: containedctx
type Streamer struct { type Streamer struct {
*cfg *cfg
ctx context.Context
sessionKey *ecdsa.PrivateKey sessionKey *ecdsa.PrivateKey
target transformer.ObjectTarget target transformer.ObjectTarget
@ -232,7 +229,6 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
} }
rt := &remoteTarget{ rt := &remoteTarget{
ctx: p.ctx,
privateKey: p.sessionKey, privateKey: p.sessionKey,
commonPrm: prm.common, commonPrm: prm.common,
clientConstructor: p.clientConstructor, clientConstructor: p.clientConstructor,
@ -250,24 +246,24 @@ func (p *Streamer) newCommonTarget(prm *PutInitPrm) transformer.ObjectTarget {
} }
} }
func (p *Streamer) SendChunk(prm *PutChunkPrm) error { func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
if p.target == nil { if p.target == nil {
return errNotInit return errNotInit
} }
if _, err := p.target.Write(prm.chunk); err != nil { if _, err := p.target.Write(ctx, prm.chunk); err != nil {
return fmt.Errorf("(%T) could not write payload chunk to target: %w", p, err) return fmt.Errorf("(%T) could not write payload chunk to target: %w", p, err)
} }
return nil return nil
} }
func (p *Streamer) Close() (*PutResponse, error) { func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) {
if p.target == nil { if p.target == nil {
return nil, errNotInit return nil, errNotInit
} }
ids, err := p.target.Close() ids, err := p.target.Close(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("(%T) could not close object target: %w", p, err) return nil, fmt.Errorf("(%T) could not close object target: %w", p, err)
} }

View file

@ -1,7 +1,6 @@
package putsvc package putsvc
import ( import (
"context"
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object"
@ -36,8 +35,8 @@ func NewService(opts ...Option) *Service {
} }
// Put calls internal service and returns v2 object streamer. // Put calls internal service and returns v2 object streamer.
func (s *Service) Put(ctx context.Context) (object.PutObjectStream, error) { func (s *Service) Put() (object.PutObjectStream, error) {
stream, err := s.svc.Put(ctx) stream, err := s.svc.Put()
if err != nil { if err != nil {
return nil, fmt.Errorf("(%T) could not open object put stream: %w", s, err) return nil, fmt.Errorf("(%T) could not open object put stream: %w", s, err)
} }

View file

@ -1,6 +1,7 @@
package putsvc package putsvc
import ( import (
"context"
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object" "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
@ -32,7 +33,7 @@ type sizes struct {
writtenPayload uint64 // sum size of already cached chunks writtenPayload uint64 // sum size of already cached chunks
} }
func (s *streamer) Send(req *object.PutRequest) (err error) { func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error) {
switch v := req.GetBody().GetObjectPart().(type) { switch v := req.GetBody().GetObjectPart().(type) {
case *object.PutObjectPartInit: case *object.PutObjectPartInit:
var initPrm *putsvc.PutInitPrm var initPrm *putsvc.PutInitPrm
@ -71,7 +72,7 @@ func (s *streamer) Send(req *object.PutRequest) (err error) {
} }
} }
if err = s.stream.SendChunk(toChunkPrm(v)); err != nil { if err = s.stream.SendChunk(ctx, toChunkPrm(v)); err != nil {
err = fmt.Errorf("(%T) could not send payload chunk: %w", s, err) err = fmt.Errorf("(%T) could not send payload chunk: %w", s, err)
} }
@ -103,7 +104,7 @@ func (s *streamer) Send(req *object.PutRequest) (err error) {
return signature.SignServiceMessage(key, req) return signature.SignServiceMessage(key, req)
} }
func (s *streamer) CloseAndRecv() (*object.PutResponse, error) { func (s *streamer) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
if s.saveChunks { if s.saveChunks {
// check payload size correctness // check payload size correctness
if s.writtenPayload != s.payloadSz { if s.writtenPayload != s.payloadSz {
@ -111,7 +112,7 @@ func (s *streamer) CloseAndRecv() (*object.PutResponse, error) {
} }
} }
resp, err := s.stream.Close() resp, err := s.stream.Close(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("(%T) could not object put stream: %w", s, err) return nil, fmt.Errorf("(%T) could not object put stream: %w", s, err)
} }

View file

@ -2,6 +2,7 @@ package putsvc
import ( import (
"bytes" "bytes"
"context"
"crypto/sha256" "crypto/sha256"
"errors" "errors"
"fmt" "fmt"
@ -92,7 +93,7 @@ func (t *validatingTarget) WriteHeader(obj *objectSDK.Object) error {
return nil return nil
} }
func (t *validatingTarget) Write(p []byte) (n int, err error) { func (t *validatingTarget) Write(ctx context.Context, p []byte) (n int, err error) {
chunkLn := uint64(len(p)) chunkLn := uint64(len(p))
if !t.unpreparedObject { if !t.unpreparedObject {
@ -107,7 +108,7 @@ func (t *validatingTarget) Write(p []byte) (n int, err error) {
} }
} }
n, err = t.nextTarget.Write(p) n, err = t.nextTarget.Write(ctx, p)
if err == nil { if err == nil {
t.writtenPayload += uint64(n) t.writtenPayload += uint64(n)
} }
@ -115,7 +116,7 @@ func (t *validatingTarget) Write(p []byte) (n int, err error) {
return return
} }
func (t *validatingTarget) Close() (*transformer.AccessIdentifiers, error) { func (t *validatingTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
if !t.unpreparedObject { if !t.unpreparedObject {
// check payload size correctness // check payload size correctness
if t.payloadSz != t.writtenPayload { if t.payloadSz != t.writtenPayload {
@ -127,5 +128,5 @@ func (t *validatingTarget) Close() (*transformer.AccessIdentifiers, error) {
} }
} }
return t.nextTarget.Close() return t.nextTarget.Close(ctx)
} }

View file

@ -59,12 +59,12 @@ func (s *ResponseService) Get(req *object.GetRequest, stream GetObjectStream) er
}) })
} }
func (s *putStreamResponser) Send(req *object.PutRequest) error { func (s *putStreamResponser) Send(ctx context.Context, req *object.PutRequest) error {
return s.stream.Send(req) return s.stream.Send(ctx, req)
} }
func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) { func (s *putStreamResponser) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
r, err := s.stream.CloseAndRecv() r, err := s.stream.CloseAndRecv(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("(%T) could not receive response: %w", s, err) return nil, fmt.Errorf("(%T) could not receive response: %w", s, err)
} }
@ -72,19 +72,19 @@ func (s *putStreamResponser) CloseAndRecv() (*object.PutResponse, error) {
return r.(*object.PutResponse), nil return r.(*object.PutResponse), nil
} }
func (s *ResponseService) Put(ctx context.Context) (PutObjectStream, error) { func (s *ResponseService) Put() (PutObjectStream, error) {
stream, err := s.svc.Put(ctx) stream, err := s.svc.Put()
if err != nil { if err != nil {
return nil, fmt.Errorf("could not create Put object streamer: %w", err) return nil, fmt.Errorf("could not create Put object streamer: %w", err)
} }
return &putStreamResponser{ return &putStreamResponser{
stream: s.respSvc.CreateRequestStreamer( stream: s.respSvc.CreateRequestStreamer(
func(req any) error { func(ctx context.Context, req any) error {
return stream.Send(req.(*object.PutRequest)) return stream.Send(ctx, req.(*object.PutRequest))
}, },
func() (util.ResponseMessage, error) { func(ctx context.Context) (util.ResponseMessage, error) {
return stream.CloseAndRecv() return stream.CloseAndRecv(ctx)
}, },
), ),
}, nil }, nil

View file

@ -27,15 +27,15 @@ type SearchStream interface {
// PutObjectStream is an interface of FrostFS API v2 compatible client's object streamer. // PutObjectStream is an interface of FrostFS API v2 compatible client's object streamer.
type PutObjectStream interface { type PutObjectStream interface {
Send(*object.PutRequest) error Send(context.Context, *object.PutRequest) error
CloseAndRecv() (*object.PutResponse, error) CloseAndRecv(context.Context) (*object.PutResponse, error)
} }
// ServiceServer is an interface of utility // ServiceServer is an interface of utility
// serving v2 Object service. // serving v2 Object service.
type ServiceServer interface { type ServiceServer interface {
Get(*object.GetRequest, GetObjectStream) error Get(*object.GetRequest, GetObjectStream) error
Put(context.Context) (PutObjectStream, error) Put() (PutObjectStream, error)
Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error) Head(context.Context, *object.HeadRequest) (*object.HeadResponse, error)
Search(*object.SearchRequest, SearchStream) error Search(*object.SearchRequest, SearchStream) error
Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error) Delete(context.Context, *object.DeleteRequest) (*object.DeleteResponse, error)

View file

@ -70,12 +70,12 @@ func (s *SignService) Get(req *object.GetRequest, stream GetObjectStream) error
) )
} }
func (s *putStreamSigner) Send(req *object.PutRequest) error { func (s *putStreamSigner) Send(ctx context.Context, req *object.PutRequest) error {
return s.stream.Send(req) return s.stream.Send(ctx, req)
} }
func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) { func (s *putStreamSigner) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
r, err := s.stream.CloseAndRecv() r, err := s.stream.CloseAndRecv(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not receive response: %w", err) return nil, fmt.Errorf("could not receive response: %w", err)
} }
@ -83,19 +83,19 @@ func (s *putStreamSigner) CloseAndRecv() (*object.PutResponse, error) {
return r.(*object.PutResponse), nil return r.(*object.PutResponse), nil
} }
func (s *SignService) Put(ctx context.Context) (PutObjectStream, error) { func (s *SignService) Put() (PutObjectStream, error) {
stream, err := s.svc.Put(ctx) stream, err := s.svc.Put()
if err != nil { if err != nil {
return nil, fmt.Errorf("could not create Put object streamer: %w", err) return nil, fmt.Errorf("could not create Put object streamer: %w", err)
} }
return &putStreamSigner{ return &putStreamSigner{
stream: s.sigSvc.CreateRequestStreamer( stream: s.sigSvc.CreateRequestStreamer(
func(req any) error { func(ctx context.Context, req any) error {
return stream.Send(req.(*object.PutRequest)) return stream.Send(ctx, req.(*object.PutRequest))
}, },
func() (util.ResponseMessage, error) { func(ctx context.Context) (util.ResponseMessage, error) {
return stream.CloseAndRecv() return stream.CloseAndRecv(ctx)
}, },
func() util.ResponseMessage { func() util.ResponseMessage {
return new(object.PutResponse) return new(object.PutResponse)

View file

@ -87,8 +87,8 @@ func (c *TransportSplitter) Get(req *object.GetRequest, stream GetObjectStream)
}) })
} }
func (c TransportSplitter) Put(ctx context.Context) (PutObjectStream, error) { func (c TransportSplitter) Put() (PutObjectStream, error) {
return c.next.Put(ctx) return c.next.Put()
} }
func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) { func (c TransportSplitter) Head(ctx context.Context, request *object.HeadRequest) (*object.HeadResponse, error) {

View file

@ -1,6 +1,7 @@
package transformer package transformer
import ( import (
"context"
"crypto/ecdsa" "crypto/ecdsa"
"fmt" "fmt"
@ -53,15 +54,15 @@ func (f *formatter) WriteHeader(obj *object.Object) error {
return nil return nil
} }
func (f *formatter) Write(p []byte) (n int, err error) { func (f *formatter) Write(ctx context.Context, p []byte) (n int, err error) {
n, err = f.prm.NextTarget.Write(p) n, err = f.prm.NextTarget.Write(ctx, p)
f.sz += uint64(n) f.sz += uint64(n)
return return
} }
func (f *formatter) Close() (*AccessIdentifiers, error) { func (f *formatter) Close(ctx context.Context) (*AccessIdentifiers, error) {
curEpoch := f.prm.NetworkState.CurrentEpoch() curEpoch := f.prm.NetworkState.CurrentEpoch()
ver := version.Current() ver := version.Current()
@ -100,7 +101,7 @@ func (f *formatter) Close() (*AccessIdentifiers, error) {
return nil, fmt.Errorf("could not write header to next target: %w", err) return nil, fmt.Errorf("could not write header to next target: %w", err)
} }
if _, err := f.prm.NextTarget.Close(); err != nil { if _, err := f.prm.NextTarget.Close(ctx); err != nil {
return nil, fmt.Errorf("could not close next target: %w", err) return nil, fmt.Errorf("could not close next target: %w", err)
} }

View file

@ -1,10 +1,10 @@
package transformer package transformer
import ( import (
"context"
"crypto/sha256" "crypto/sha256"
"fmt" "fmt"
"hash" "hash"
"io"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/checksum"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
@ -27,7 +27,7 @@ type payloadSizeLimiter struct {
previous []oid.ID previous []oid.ID
chunkWriter io.Writer chunkWriter writer
splitID *object.SplitID splitID *object.SplitID
@ -64,16 +64,16 @@ func (s *payloadSizeLimiter) WriteHeader(hdr *object.Object) error {
return nil return nil
} }
func (s *payloadSizeLimiter) Write(p []byte) (int, error) { func (s *payloadSizeLimiter) Write(ctx context.Context, p []byte) (int, error) {
if err := s.writeChunk(p); err != nil { if err := s.writeChunk(ctx, p); err != nil {
return 0, err return 0, err
} }
return len(p), nil return len(p), nil
} }
func (s *payloadSizeLimiter) Close() (*AccessIdentifiers, error) { func (s *payloadSizeLimiter) Close(ctx context.Context) (*AccessIdentifiers, error) {
return s.release(true) return s.release(ctx, true)
} }
func (s *payloadSizeLimiter) initialize() { func (s *payloadSizeLimiter) initialize() {
@ -117,19 +117,19 @@ func (s *payloadSizeLimiter) initializeCurrent() {
s.currentHashers = payloadHashersForObject(s.current, s.withoutHomomorphicHash) s.currentHashers = payloadHashersForObject(s.current, s.withoutHomomorphicHash)
// compose multi-writer from target and all payload hashers // compose multi-writer from target and all payload hashers
ws := make([]io.Writer, 0, 1+len(s.currentHashers)+len(s.parentHashers)) ws := make([]writer, 0, 1+len(s.currentHashers)+len(s.parentHashers))
ws = append(ws, s.target) ws = append(ws, s.target)
for i := range s.currentHashers { for i := range s.currentHashers {
ws = append(ws, s.currentHashers[i].hasher) ws = append(ws, newWriter(s.currentHashers[i].hasher))
} }
for i := range s.parentHashers { for i := range s.parentHashers {
ws = append(ws, s.parentHashers[i].hasher) ws = append(ws, newWriter(s.parentHashers[i].hasher))
} }
s.chunkWriter = io.MultiWriter(ws...) s.chunkWriter = newMultiWriter(ws...)
} }
func payloadHashersForObject(obj *object.Object, withoutHomomorphicHash bool) []*payloadChecksumHasher { func payloadHashersForObject(obj *object.Object, withoutHomomorphicHash bool) []*payloadChecksumHasher {
@ -174,7 +174,7 @@ func payloadHashersForObject(obj *object.Object, withoutHomomorphicHash bool) []
return hashers return hashers
} }
func (s *payloadSizeLimiter) release(finalize bool) (*AccessIdentifiers, error) { func (s *payloadSizeLimiter) release(ctx context.Context, finalize bool) (*AccessIdentifiers, error) {
// Arg finalize is true only when called from Close method. // Arg finalize is true only when called from Close method.
// We finalize parent and generate linking objects only if it is more // We finalize parent and generate linking objects only if it is more
// than 1 object in split-chain. // than 1 object in split-chain.
@ -194,7 +194,7 @@ func (s *payloadSizeLimiter) release(finalize bool) (*AccessIdentifiers, error)
return nil, fmt.Errorf("could not write header: %w", err) return nil, fmt.Errorf("could not write header: %w", err)
} }
ids, err := s.target.Close() ids, err := s.target.Close(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("could not close target: %w", err) return nil, fmt.Errorf("could not close target: %w", err)
} }
@ -207,7 +207,7 @@ func (s *payloadSizeLimiter) release(finalize bool) (*AccessIdentifiers, error)
s.initializeLinking(ids.Parent()) s.initializeLinking(ids.Parent())
s.initializeCurrent() s.initializeCurrent()
if _, err := s.release(false); err != nil { if _, err := s.release(ctx, false); err != nil {
return nil, fmt.Errorf("could not release linking object: %w", err) return nil, fmt.Errorf("could not release linking object: %w", err)
} }
} }
@ -228,7 +228,7 @@ func (s *payloadSizeLimiter) initializeLinking(parHdr *object.Object) {
s.current.SetSplitID(s.splitID) s.current.SetSplitID(s.splitID)
} }
func (s *payloadSizeLimiter) writeChunk(chunk []byte) error { func (s *payloadSizeLimiter) writeChunk(ctx context.Context, chunk []byte) error {
// statement is true if the previous write of bytes reached exactly the boundary. // statement is true if the previous write of bytes reached exactly the boundary.
if s.written > 0 && s.written%s.maxSize == 0 { if s.written > 0 && s.written%s.maxSize == 0 {
if s.written == s.maxSize { if s.written == s.maxSize {
@ -236,7 +236,7 @@ func (s *payloadSizeLimiter) writeChunk(chunk []byte) error {
} }
// we need to release current object // we need to release current object
if _, err := s.release(false); err != nil { if _, err := s.release(ctx, false); err != nil {
return fmt.Errorf("could not release object: %w", err) return fmt.Errorf("could not release object: %w", err)
} }
@ -255,7 +255,7 @@ func (s *payloadSizeLimiter) writeChunk(chunk []byte) error {
cut = leftToEdge cut = leftToEdge
} }
if _, err := s.chunkWriter.Write(chunk[:cut]); err != nil { if _, err := s.chunkWriter.Write(ctx, chunk[:cut]); err != nil {
return fmt.Errorf("could not write chunk to target: %w", err) return fmt.Errorf("could not write chunk to target: %w", err)
} }
@ -264,7 +264,7 @@ func (s *payloadSizeLimiter) writeChunk(chunk []byte) error {
// if there are more bytes in buffer we call method again to start filling another object // if there are more bytes in buffer we call method again to start filling another object
if ln > leftToEdge { if ln > leftToEdge {
return s.writeChunk(chunk[cut:]) return s.writeChunk(ctx, chunk[cut:])
} }
return nil return nil

View file

@ -1,7 +1,7 @@
package transformer package transformer
import ( import (
"io" "context"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
@ -35,7 +35,7 @@ type ObjectTarget interface {
// Can be called multiple times. // Can be called multiple times.
// //
// Must not be called after Close call. // Must not be called after Close call.
io.Writer Write(ctx context.Context, p []byte) (n int, err error)
// Close is used to finish object writing. // Close is used to finish object writing.
// //
@ -45,7 +45,7 @@ type ObjectTarget interface {
// Must be called no more than once. Control remains with the caller. // Must be called no more than once. Control remains with the caller.
// Re-calling can lead to undefined behavior // Re-calling can lead to undefined behavior
// that depends on the implementation. // that depends on the implementation.
Close() (*AccessIdentifiers, error) Close(ctx context.Context) (*AccessIdentifiers, error)
} }
// TargetInitializer represents ObjectTarget constructor. // TargetInitializer represents ObjectTarget constructor.

View file

@ -0,0 +1,52 @@
package transformer
import (
"context"
"io"
)
type writer interface {
Write(ctx context.Context, p []byte) (n int, err error)
}
type multiWriter struct {
writers []writer
}
func (t *multiWriter) Write(ctx context.Context, p []byte) (n int, err error) {
for _, w := range t.writers {
n, err = w.Write(ctx, p)
if err != nil {
return
}
if n != len(p) {
err = io.ErrShortWrite
return
}
}
return len(p), nil
}
func newMultiWriter(writers ...writer) writer {
allWriters := make([]writer, 0, len(writers))
for _, w := range writers {
if mw, ok := w.(*multiWriter); ok {
allWriters = append(allWriters, mw.writers...)
} else {
allWriters = append(allWriters, w)
}
}
return &multiWriter{allWriters}
}
type writerWrapper struct {
Writer io.Writer
}
func (w *writerWrapper) Write(_ context.Context, p []byte) (n int, err error) {
return w.Writer.Write(p)
}
func newWriter(w io.Writer) writer {
return &writerWrapper{Writer: w}
}

View file

@ -1,6 +1,7 @@
package response package response
import ( import (
"context"
"fmt" "fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/util"
@ -17,8 +18,8 @@ type ClientMessageStreamer struct {
} }
// Send calls send method of internal streamer. // Send calls send method of internal streamer.
func (s *ClientMessageStreamer) Send(req any) error { func (s *ClientMessageStreamer) Send(ctx context.Context, req any) error {
if err := s.send(req); err != nil { if err := s.send(ctx, req); err != nil {
return fmt.Errorf("(%T) could not send the request: %w", s, err) return fmt.Errorf("(%T) could not send the request: %w", s, err)
} }
return nil return nil
@ -26,8 +27,8 @@ func (s *ClientMessageStreamer) Send(req any) error {
// CloseAndRecv closes internal stream, receivers the response, // CloseAndRecv closes internal stream, receivers the response,
// sets meta values and returns the result. // sets meta values and returns the result.
func (s *ClientMessageStreamer) CloseAndRecv() (util.ResponseMessage, error) { func (s *ClientMessageStreamer) CloseAndRecv(ctx context.Context) (util.ResponseMessage, error) {
resp, err := s.close() resp, err := s.close(ctx)
if err != nil { if err != nil {
return nil, fmt.Errorf("(%T) could not close stream and receive response: %w", s, err) return nil, fmt.Errorf("(%T) could not close stream and receive response: %w", s, err)
} }

View file

@ -37,9 +37,9 @@ var ErrAbortStream = errors.New("abort message stream")
type ResponseConstructor func() ResponseMessage type ResponseConstructor func() ResponseMessage
type RequestMessageWriter func(any) error type RequestMessageWriter func(context.Context, any) error
type ClientStreamCloser func() (ResponseMessage, error) type ClientStreamCloser func(context.Context) (ResponseMessage, error)
type RequestMessageStreamer struct { type RequestMessageStreamer struct {
key *ecdsa.PrivateKey key *ecdsa.PrivateKey
@ -61,7 +61,7 @@ func NewUnarySignService(key *ecdsa.PrivateKey) *SignService {
} }
} }
func (s *RequestMessageStreamer) Send(req any) error { func (s *RequestMessageStreamer) Send(ctx context.Context, req any) error {
// req argument should be strengthen with type RequestMessage // req argument should be strengthen with type RequestMessage
s.statusSupported = isStatusSupported(req.(RequestMessage)) // panic is OK here for now s.statusSupported = isStatusSupported(req.(RequestMessage)) // panic is OK here for now
@ -71,7 +71,7 @@ func (s *RequestMessageStreamer) Send(req any) error {
if err = signature.VerifyServiceMessage(req); err != nil { if err = signature.VerifyServiceMessage(req); err != nil {
err = fmt.Errorf("could not verify request: %w", err) err = fmt.Errorf("could not verify request: %w", err)
} else { } else {
err = s.send(req) err = s.send(ctx, req)
} }
if err != nil { if err != nil {
@ -87,7 +87,7 @@ func (s *RequestMessageStreamer) Send(req any) error {
return nil return nil
} }
func (s *RequestMessageStreamer) CloseAndRecv() (ResponseMessage, error) { func (s *RequestMessageStreamer) CloseAndRecv(ctx context.Context) (ResponseMessage, error) {
var ( var (
resp ResponseMessage resp ResponseMessage
err error err error
@ -96,7 +96,7 @@ func (s *RequestMessageStreamer) CloseAndRecv() (ResponseMessage, error) {
if s.sendErr != nil { if s.sendErr != nil {
err = s.sendErr err = s.sendErr
} else { } else {
resp, err = s.close() resp, err = s.close(ctx)
if err != nil { if err != nil {
err = fmt.Errorf("could not close stream and receive response: %w", err) err = fmt.Errorf("could not close stream and receive response: %w", err)
} }