forked from TrueCloudLab/frostfs-node
c30aa20b04
`CommonPrm` structure has a private key for remote operations. It is obtained at the beginning of request processing. However, not every operation triggers remote calls. Therefore, the key might not be used. It is important to avoid early key fetching because `TokenStore` now returns an error if the session token does not exist. This is a valid case when container nodes receive a request with a session token (for ACL pass) and they should process the request locally. Signed-off-by: Alex Vanin <alexey@nspcc.ru>
182 lines
3.6 KiB
Go
182 lines
3.6 KiB
Go
package getsvc
|
|
|
|
import (
|
|
"io"
|
|
|
|
"github.com/nspcc-dev/neofs-api-go/pkg/client"
|
|
objectSDK "github.com/nspcc-dev/neofs-api-go/pkg/object"
|
|
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
|
|
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
|
|
"github.com/nspcc-dev/neofs-node/pkg/core/object"
|
|
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
|
|
)
|
|
|
|
type SimpleObjectWriter struct {
|
|
obj *object.RawObject
|
|
|
|
pld []byte
|
|
}
|
|
|
|
type clientCacheWrapper struct {
|
|
cache ClientConstructor
|
|
}
|
|
|
|
type clientWrapper struct {
|
|
client coreclient.Client
|
|
}
|
|
|
|
type storageEngineWrapper struct {
|
|
engine *engine.StorageEngine
|
|
}
|
|
|
|
type partWriter struct {
|
|
ObjectWriter
|
|
|
|
headWriter HeaderWriter
|
|
|
|
chunkWriter ChunkWriter
|
|
}
|
|
|
|
// hasherWrapper exposes an io.Writer through a WriteChunk method, so that
// payload chunks can be written straight into it.
type hasherWrapper struct {
	// hash is the destination of every written chunk.
	hash io.Writer
}
|
|
|
|
type nmSrcWrapper struct {
|
|
nmSrc netmap.Source
|
|
}
|
|
|
|
func NewSimpleObjectWriter() *SimpleObjectWriter {
|
|
return &SimpleObjectWriter{
|
|
obj: object.NewRaw(),
|
|
}
|
|
}
|
|
|
|
func (s *SimpleObjectWriter) WriteHeader(obj *object.Object) error {
|
|
s.obj = object.NewRawFromObject(obj)
|
|
|
|
s.pld = make([]byte, 0, obj.PayloadSize())
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *SimpleObjectWriter) WriteChunk(p []byte) error {
|
|
s.pld = append(s.pld, p...)
|
|
return nil
|
|
}
|
|
|
|
func (s *SimpleObjectWriter) Object() *object.Object {
|
|
if len(s.pld) > 0 {
|
|
s.obj.SetPayload(s.pld)
|
|
}
|
|
|
|
return s.obj.Object()
|
|
}
|
|
|
|
func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
|
|
clt, err := c.cache.Get(info)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &clientWrapper{
|
|
client: clt,
|
|
}, nil
|
|
}
|
|
|
|
func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*objectSDK.Object, error) {
|
|
if exec.isForwardingEnabled() {
|
|
return exec.prm.forwarder(info, c.client)
|
|
}
|
|
|
|
opts, err := exec.callOptions()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if exec.headOnly() {
|
|
return c.client.GetObjectHeader(exec.context(),
|
|
new(client.ObjectHeaderParams).
|
|
WithAddress(exec.address()).
|
|
WithRawFlag(exec.isRaw()),
|
|
opts...,
|
|
)
|
|
}
|
|
// we don't specify payload writer because we accumulate
|
|
// the object locally (even huge).
|
|
if rng := exec.ctxRange(); rng != nil {
|
|
data, err := c.client.ObjectPayloadRangeData(exec.context(),
|
|
new(client.RangeDataParams).
|
|
WithAddress(exec.address()).
|
|
WithRange(rng).
|
|
WithRaw(exec.isRaw()),
|
|
opts...,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return payloadOnlyObject(data), nil
|
|
}
|
|
|
|
return c.client.GetObject(exec.context(),
|
|
exec.remotePrm(),
|
|
opts...,
|
|
)
|
|
}
|
|
|
|
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
|
|
if exec.headOnly() {
|
|
r, err := e.engine.Head(new(engine.HeadPrm).
|
|
WithAddress(exec.address()).
|
|
WithRaw(exec.isRaw()),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return r.Header(), nil
|
|
} else if rng := exec.ctxRange(); rng != nil {
|
|
r, err := e.engine.GetRange(new(engine.RngPrm).
|
|
WithAddress(exec.address()).
|
|
WithPayloadRange(rng),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return r.Object(), nil
|
|
} else {
|
|
r, err := e.engine.Get(new(engine.GetPrm).
|
|
WithAddress(exec.address()),
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return r.Object(), nil
|
|
}
|
|
}
|
|
|
|
func (w *partWriter) WriteChunk(p []byte) error {
|
|
return w.chunkWriter.WriteChunk(p)
|
|
}
|
|
|
|
func (w *partWriter) WriteHeader(o *object.Object) error {
|
|
return w.headWriter.WriteHeader(o)
|
|
}
|
|
|
|
func payloadOnlyObject(payload []byte) *objectSDK.Object {
|
|
rawObj := object.NewRaw()
|
|
rawObj.SetPayload(payload)
|
|
|
|
return rawObj.Object().SDK()
|
|
}
|
|
|
|
func (h *hasherWrapper) WriteChunk(p []byte) error {
|
|
_, err := h.hash.Write(p)
|
|
return err
|
|
}
|
|
|
|
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
|
|
return n.nmSrc.Epoch()
|
|
}
|