211 lines
5.3 KiB
Go
211 lines
5.3 KiB
Go
package putsvc
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/pkg/tracing"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc"
|
|
rawclient "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/rpc/client"
|
|
sessionV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/session"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/signature"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/network"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal"
|
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
|
putsvc "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/put"
|
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/trace"
|
|
)
|
|
|
|
type streamer struct {
|
|
stream *putsvc.Streamer
|
|
keyStorage *util.KeyStorage
|
|
saveChunks bool
|
|
init *object.PutRequest
|
|
chunks []*object.PutRequest
|
|
|
|
*sizes // only for relay streams
|
|
}
|
|
|
|
type sizes struct {
|
|
payloadSz uint64 // value from the header
|
|
|
|
writtenPayload uint64 // sum size of already cached chunks
|
|
}
|
|
|
|
func (s *streamer) Send(ctx context.Context, req *object.PutRequest) (err error) {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "putv2.streamer.Send")
|
|
defer span.End()
|
|
|
|
switch v := req.GetBody().GetObjectPart().(type) {
|
|
case *object.PutObjectPartInit:
|
|
var initPrm *putsvc.PutInitPrm
|
|
|
|
initPrm, err = s.toInitPrm(v, req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = s.stream.Init(ctx, initPrm); err != nil {
|
|
err = fmt.Errorf("(%T) could not init object put stream: %w", s, err)
|
|
}
|
|
|
|
s.saveChunks = v.GetSignature() != nil
|
|
if s.saveChunks {
|
|
maxSz := s.stream.MaxObjectSize()
|
|
|
|
s.sizes = &sizes{
|
|
payloadSz: uint64(v.GetHeader().GetPayloadLength()),
|
|
}
|
|
|
|
// check payload size limit overflow
|
|
if s.payloadSz > maxSz {
|
|
return putsvc.ErrExceedingMaxSize
|
|
}
|
|
|
|
s.init = req
|
|
}
|
|
case *object.PutObjectPartChunk:
|
|
if s.saveChunks {
|
|
s.writtenPayload += uint64(len(v.GetChunk()))
|
|
|
|
// check payload size overflow
|
|
if s.writtenPayload > s.payloadSz {
|
|
return putsvc.ErrWrongPayloadSize
|
|
}
|
|
}
|
|
|
|
if err = s.stream.SendChunk(ctx, toChunkPrm(v)); err != nil {
|
|
err = fmt.Errorf("(%T) could not send payload chunk: %w", s, err)
|
|
}
|
|
|
|
if s.saveChunks {
|
|
s.chunks = append(s.chunks, req)
|
|
}
|
|
default:
|
|
err = fmt.Errorf("(%T) invalid object put stream part type %T", s, v)
|
|
}
|
|
|
|
if err != nil || !s.saveChunks {
|
|
return
|
|
}
|
|
|
|
metaHdr := new(sessionV2.RequestMetaHeader)
|
|
meta := req.GetMetaHeader()
|
|
|
|
metaHdr.SetTTL(meta.GetTTL() - 1)
|
|
metaHdr.SetOrigin(meta)
|
|
req.SetMetaHeader(metaHdr)
|
|
|
|
// session token should not be used there
|
|
// otherwise remote nodes won't be able to
|
|
// process received part of split object
|
|
key, err := s.keyStorage.GetKey(nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return signature.SignServiceMessage(key, req)
|
|
}
|
|
|
|
func (s *streamer) CloseAndRecv(ctx context.Context) (*object.PutResponse, error) {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "putv2.streamer.CloseAndRecv")
|
|
defer span.End()
|
|
|
|
if s.saveChunks {
|
|
// check payload size correctness
|
|
if s.writtenPayload != s.payloadSz {
|
|
return nil, putsvc.ErrWrongPayloadSize
|
|
}
|
|
}
|
|
|
|
resp, err := s.stream.Close(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("(%T) could not object put stream: %w", s, err)
|
|
}
|
|
|
|
return fromPutResponse(resp), nil
|
|
}
|
|
|
|
func (s *streamer) relayRequest(ctx context.Context, info client.NodeInfo, c client.MultiAddressClient) error {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "putv2.streamer.relayRequest")
|
|
defer span.End()
|
|
|
|
// open stream
|
|
resp := new(object.PutResponse)
|
|
|
|
key := info.PublicKey()
|
|
|
|
var firstErr error
|
|
|
|
info.AddressGroup().IterateAddresses(func(addr network.Address) (stop bool) {
|
|
ctx, span := tracing.StartSpanFromContext(ctx, "putv2.streamer.iterateAddress",
|
|
trace.WithAttributes(
|
|
attribute.String("address", addr.String()),
|
|
))
|
|
defer span.End()
|
|
|
|
var err error
|
|
|
|
defer func() {
|
|
stop = err == nil
|
|
|
|
if stop || firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
|
|
// would be nice to log otherwise
|
|
}()
|
|
|
|
var stream *rpc.PutRequestWriter
|
|
|
|
err = c.RawForAddress(ctx, addr, func(cli *rawclient.Client) error {
|
|
stream, err = rpc.PutObject(cli, resp, rawclient.WithContext(ctx))
|
|
return err
|
|
})
|
|
if err != nil {
|
|
err = fmt.Errorf("stream opening failed: %w", err)
|
|
return
|
|
}
|
|
|
|
// send init part
|
|
err = stream.Write(s.init)
|
|
if err != nil {
|
|
internalclient.ReportError(c, err)
|
|
err = fmt.Errorf("sending the initial message to stream failed: %w", err)
|
|
return
|
|
}
|
|
|
|
for i := range s.chunks {
|
|
if err = stream.Write(s.chunks[i]); err != nil {
|
|
internalclient.ReportError(c, err)
|
|
err = fmt.Errorf("sending the chunk %d failed: %w", i, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
// close object stream and receive response from remote node
|
|
err = stream.Close()
|
|
if err != nil {
|
|
err = fmt.Errorf("closing the stream failed: %w", err)
|
|
return
|
|
}
|
|
|
|
// verify response key
|
|
if err = internal.VerifyResponseKeyV2(key, resp); err != nil {
|
|
return
|
|
}
|
|
|
|
// verify response structure
|
|
err = signature.VerifyServiceMessage(resp)
|
|
if err != nil {
|
|
err = fmt.Errorf("response verification failed: %w", err)
|
|
}
|
|
|
|
return
|
|
})
|
|
|
|
return firstErr
|
|
}
|