forked from TrueCloudLab/frostfs-node
096acb2a44
NeoFS network dictates the limitation of the maximum size of the "finished" objects. The payload size of an object is recorded in its header. When writing finished objects, the nodes must check: * satisfying the constraint; * matching the value in the header to the number of payload bytes. Provide value returned by `MaxSizeSource` component to `validatingTarget`. Check max size value during the stream of the "finished" objects. Check header value during the streaming and on-close. Check payload size in v2 relay scenario. Signed-off-by: Leonard Lyubich <leonard@nspcc.ru>
159 lines
3.8 KiB
Go
159 lines
3.8 KiB
Go
package putsvc
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/session"
|
|
"github.com/nspcc-dev/neofs-api-go/v2/object"
|
|
"github.com/nspcc-dev/neofs-api-go/v2/rpc"
|
|
sessionV2 "github.com/nspcc-dev/neofs-api-go/v2/session"
|
|
"github.com/nspcc-dev/neofs-api-go/v2/signature"
|
|
putsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/put"
|
|
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
|
|
)
|
|
|
|
type streamer struct {
|
|
stream *putsvc.Streamer
|
|
keyStorage *util.KeyStorage
|
|
saveChunks bool
|
|
init *object.PutRequest
|
|
chunks []*object.PutRequest
|
|
|
|
*sizes // only for relay streams
|
|
}
|
|
|
|
// sizes keeps the payload size accounting of a relay stream.
type sizes struct {
	// payloadSz is the payload length declared in the object header.
	payloadSz uint64

	// writtenPayload is the total size of the chunks received so far.
	writtenPayload uint64
}
|
|
|
|
// TODO: these errors are copy-pasted from the putsvc package;
// consider moving them to the core library.
|
|
|
|
// Errors reported when the payload size of a streamed object is invalid.
var (
	// errExceedingMaxSize is returned when the payload length declared in
	// the header is greater than the maximum allowed object size.
	errExceedingMaxSize = errors.New("payload size is greater than the limit")
	// errWrongPayloadSize is returned when the number of received payload
	// bytes does not agree with the payload length declared in the header.
	errWrongPayloadSize = errors.New("wrong payload size")
)
|
|
|
|
func (s *streamer) Send(req *object.PutRequest) (err error) {
|
|
switch v := req.GetBody().GetObjectPart().(type) {
|
|
case *object.PutObjectPartInit:
|
|
var initPrm *putsvc.PutInitPrm
|
|
|
|
initPrm, err = s.toInitPrm(v, req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if err = s.stream.Init(initPrm); err != nil {
|
|
err = fmt.Errorf("(%T) could not init object put stream: %w", s, err)
|
|
}
|
|
|
|
s.saveChunks = v.GetSignature() != nil
|
|
if s.saveChunks {
|
|
maxSz := s.stream.MaxObjectSize()
|
|
|
|
s.sizes = &sizes{
|
|
payloadSz: uint64(v.GetHeader().GetPayloadLength()),
|
|
}
|
|
|
|
// check payload size limit overflow
|
|
if s.payloadSz > maxSz {
|
|
return errExceedingMaxSize
|
|
}
|
|
|
|
s.init = req
|
|
}
|
|
case *object.PutObjectPartChunk:
|
|
if s.saveChunks {
|
|
s.writtenPayload += uint64(len(v.GetChunk()))
|
|
|
|
// check payload size overflow
|
|
if s.writtenPayload > s.payloadSz {
|
|
return errWrongPayloadSize
|
|
}
|
|
}
|
|
|
|
if err = s.stream.SendChunk(toChunkPrm(v)); err != nil {
|
|
err = fmt.Errorf("(%T) could not send payload chunk: %w", s, err)
|
|
}
|
|
|
|
if s.saveChunks {
|
|
s.chunks = append(s.chunks, req)
|
|
}
|
|
default:
|
|
err = fmt.Errorf("(%T) invalid object put stream part type %T", s, v)
|
|
}
|
|
|
|
if err != nil || !s.saveChunks {
|
|
return
|
|
}
|
|
|
|
metaHdr := new(sessionV2.RequestMetaHeader)
|
|
meta := req.GetMetaHeader()
|
|
st := session.NewTokenFromV2(meta.GetSessionToken())
|
|
|
|
metaHdr.SetTTL(meta.GetTTL() - 1)
|
|
metaHdr.SetOrigin(meta)
|
|
req.SetMetaHeader(metaHdr)
|
|
|
|
key, err := s.keyStorage.GetKey(st)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return signature.SignServiceMessage(key, req)
|
|
}
|
|
|
|
func (s *streamer) CloseAndRecv() (*object.PutResponse, error) {
|
|
if s.saveChunks {
|
|
// check payload size correctness
|
|
if s.writtenPayload != s.payloadSz {
|
|
return nil, errWrongPayloadSize
|
|
}
|
|
}
|
|
|
|
resp, err := s.stream.Close()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("(%T) could not object put stream: %w", s, err)
|
|
}
|
|
|
|
return fromPutResponse(resp), nil
|
|
}
|
|
|
|
func (s *streamer) relayRequest(c client.Client) error {
|
|
// open stream
|
|
resp := new(object.PutResponse)
|
|
|
|
stream, err := rpc.PutObject(c.Raw(), resp)
|
|
if err != nil {
|
|
return fmt.Errorf("stream opening failed: %w", err)
|
|
}
|
|
|
|
// send init part
|
|
err = stream.Write(s.init)
|
|
if err != nil {
|
|
return fmt.Errorf("sending the initial message to stream failed: %w", err)
|
|
}
|
|
|
|
for i := range s.chunks {
|
|
if err := stream.Write(s.chunks[i]); err != nil {
|
|
return fmt.Errorf("sending the chunk %d failed: %w", i, err)
|
|
}
|
|
}
|
|
|
|
// close object stream and receive response from remote node
|
|
err = stream.Close()
|
|
if err != nil {
|
|
return fmt.Errorf("closing the stream failed: %w", err)
|
|
}
|
|
|
|
// verify response structure
|
|
if err := signature.VerifyServiceMessage(resp); err != nil {
|
|
return fmt.Errorf("response verification failed: %w", err)
|
|
}
|
|
return nil
|
|
}
|