frostfs-node/pkg/services/object/put/v2/streamer.go
Leonard Lyubich 096acb2a44 [#580] v2/object/put: Check payload size of the relayed objects
NeoFS network dictates the limitation of the maximum size of the "finished"
objects. The payload size of an object is recorded in its header. When
writing finished objects, the nodes must check:

  * satisfying the constraint;
  * matching the value in the header to the number of payload bytes.

Provide value returned by `MaxSizeSource` component to `validatingTarget`.
Check max size value during the stream of the "finished" objects. Check
header value during the streaming and on-close. Check payload size in v2
relay scenario.

Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
2021-06-09 12:08:37 +03:00

159 lines
3.8 KiB
Go

package putsvc
import (
"errors"
"fmt"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/session"
"github.com/nspcc-dev/neofs-api-go/v2/object"
"github.com/nspcc-dev/neofs-api-go/v2/rpc"
sessionV2 "github.com/nspcc-dev/neofs-api-go/v2/session"
"github.com/nspcc-dev/neofs-api-go/v2/signature"
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
)
// streamer serves a single v2 object put stream on top of putsvc.Streamer.
//
// When the init part of the stream carries a signature, the streamer also
// caches every received request and tracks payload sizes so that the
// stream can be replayed by relayRequest.
type streamer struct {
// underlying put stream that receives Init, SendChunk and Close calls
stream *putsvc.Streamer
// used to get the key for re-signing requests by their session token
keyStorage *util.KeyStorage
// set when the init part carries a signature; enables request caching
// and payload size checks
saveChunks bool
// cached init request (stored only when saveChunks is set)
init *object.PutRequest
// cached chunk requests in the order they were received
// (stored only when saveChunks is set)
chunks []*object.PutRequest
// allocated on init only when saveChunks is set, nil otherwise
*sizes // only for relay streams
}
// sizes holds payload size accounting of a stream whose requests are cached.
type sizes struct {
// payload length declared in the object header of the init part
payloadSz uint64 // value from the header
// incremented by the chunk length on every received chunk part
writtenPayload uint64 // sum size of already cached chunks
}
// TODO: these errors are copy-pasted from the putsvc package;
// consider moving them to a core library.

// Errors related to invalid payload size.
var (
// returned when the payload length from the header exceeds the limit
// reported by the underlying put stream
errExceedingMaxSize = errors.New("payload size is greater than the limit")
// returned when the received payload does not match the payload length
// from the header (either overflows it or differs from it on close)
errWrongPayloadSize = errors.New("wrong payload size")
)
// Send processes the next message of the object put stream.
//
// An init part initializes the underlying put stream. If the init part
// carries a signature, the streamer switches to the caching mode: the payload
// length from the header is compared with the limit reported by the
// underlying stream (errExceedingMaxSize is returned on overflow) and the
// request is remembered for relayRequest.
//
// A chunk part is forwarded to the underlying put stream. In the caching
// mode the total size of received chunks must not exceed the payload length
// from the header (errWrongPayloadSize is returned otherwise), and each
// successfully sent chunk request is remembered for relayRequest.
//
// Any other part type results in an error.
//
// In the caching mode every successfully processed request additionally gets
// a fresh meta header (TTL decremented by one, the previous meta header set
// as origin) and is re-signed with the key obtained from the key storage by
// the session token. The cached request is the same pointer, so the cache
// holds the re-signed message.
//
// Processing stops at the first error: a failed init does not touch the
// caching state, and a chunk that failed to be sent is not cached.
func (s *streamer) Send(req *object.PutRequest) (err error) {
	switch v := req.GetBody().GetObjectPart().(type) {
	case *object.PutObjectPartInit:
		var initPrm *putsvc.PutInitPrm

		initPrm, err = s.toInitPrm(v, req)
		if err != nil {
			return err
		}

		if err = s.stream.Init(initPrm); err != nil {
			// return right away: continuing with a failed stream could
			// replace this error with a misleading size limit error
			return fmt.Errorf("(%T) could not init object put stream: %w", s, err)
		}

		s.saveChunks = v.GetSignature() != nil
		if s.saveChunks {
			maxSz := s.stream.MaxObjectSize()

			s.sizes = &sizes{
				payloadSz: uint64(v.GetHeader().GetPayloadLength()),
			}

			// check payload size limit overflow
			if s.payloadSz > maxSz {
				return errExceedingMaxSize
			}

			s.init = req
		}
	case *object.PutObjectPartChunk:
		if s.saveChunks {
			s.writtenPayload += uint64(len(v.GetChunk()))

			// check payload size overflow
			if s.writtenPayload > s.payloadSz {
				return errWrongPayloadSize
			}
		}

		if err = s.stream.SendChunk(toChunkPrm(v)); err != nil {
			// the chunk was not accepted, so it must not be cached
			return fmt.Errorf("(%T) could not send payload chunk: %w", s, err)
		}

		if s.saveChunks {
			s.chunks = append(s.chunks, req)
		}
	default:
		return fmt.Errorf("(%T) invalid object put stream part type %T", s, v)
	}

	if !s.saveChunks {
		return nil
	}

	// prepare the request for relaying: wrap the meta header and re-sign
	metaHdr := new(sessionV2.RequestMetaHeader)
	meta := req.GetMetaHeader()
	st := session.NewTokenFromV2(meta.GetSessionToken())

	// NOTE(review): TTL is decremented without a zero check; presumably
	// requests with zero TTL never reach this point - confirm.
	metaHdr.SetTTL(meta.GetTTL() - 1)
	metaHdr.SetOrigin(meta)
	req.SetMetaHeader(metaHdr)

	key, err := s.keyStorage.GetKey(st)
	if err != nil {
		return err
	}

	return signature.SignServiceMessage(key, req)
}
// CloseAndRecv finishes the object put stream and returns the response.
//
// In the caching mode (the init part carried a signature) the total size of
// received chunks must be equal to the payload length from the object header,
// otherwise errWrongPayloadSize is returned and the underlying stream is not
// closed.
//
// An error of the underlying stream closing is returned wrapped.
func (s *streamer) CloseAndRecv() (*object.PutResponse, error) {
	if s.saveChunks {
		// check payload size correctness
		if s.writtenPayload != s.payloadSz {
			return nil, errWrongPayloadSize
		}
	}

	resp, err := s.stream.Close()
	if err != nil {
		return nil, fmt.Errorf("(%T) could not close object put stream: %w", s, err)
	}

	return fromPutResponse(resp), nil
}
// relayRequest replays the cached put stream through the given client:
// it opens a put stream, writes the cached init request followed by all
// cached chunk requests in order, closes the stream and verifies the
// received response. The first failure is returned wrapped with a
// description of the failed step.
func (s *streamer) relayRequest(c client.Client) error {
	// the response is received on stream closing
	putResp := new(object.PutResponse)

	wr, err := rpc.PutObject(c.Raw(), putResp)
	if err != nil {
		return fmt.Errorf("stream opening failed: %w", err)
	}

	// the init message always goes first
	if err = wr.Write(s.init); err != nil {
		return fmt.Errorf("sending the initial message to stream failed: %w", err)
	}

	// then the payload chunks in the order of their arrival
	for idx, chunkReq := range s.chunks {
		if err = wr.Write(chunkReq); err != nil {
			return fmt.Errorf("sending the chunk %d failed: %w", idx, err)
		}
	}

	// finish the stream and get the remote node's response
	if err = wr.Close(); err != nil {
		return fmt.Errorf("closing the stream failed: %w", err)
	}

	// make sure the response structure is valid
	if err = signature.VerifyServiceMessage(putResp); err != nil {
		return fmt.Errorf("response verification failed: %w", err)
	}

	return nil
}