frostfs-node/pkg/services/object/get/util.go

266 lines
6 KiB
Go
Raw Normal View History

package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"io"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internal "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
type SimpleObjectWriter struct {
obj *object.Object
pld []byte
}
type clientCacheWrapper struct {
cache ClientConstructor
}
type clientWrapper struct {
client coreclient.MultiAddressClient
}
type storageEngineWrapper struct {
engine *engine.StorageEngine
}
type partWriter struct {
ObjectWriter
headWriter HeaderWriter
chunkWriter ChunkWriter
}
type hasherWrapper struct {
hash io.Writer
}
type nmSrcWrapper struct {
nmSrc netmap.Source
}
func NewSimpleObjectWriter() *SimpleObjectWriter {
return &SimpleObjectWriter{
obj: object.New(),
}
}
func (s *SimpleObjectWriter) WriteHeader(_ context.Context, obj *object.Object) error {
s.obj = obj
s.pld = make([]byte, 0, obj.PayloadSize())
return nil
}
func (s *SimpleObjectWriter) WriteChunk(_ context.Context, p []byte) error {
s.pld = append(s.pld, p...)
return nil
}
func (s *SimpleObjectWriter) Object() *object.Object {
if len(s.pld) > 0 {
s.obj.SetPayload(s.pld)
}
return s.obj
}
func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
clt, err := c.cache.Get(info)
if err != nil {
return nil, err
}
return &clientWrapper{
client: clt,
}, nil
}
func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
if exec.isForwardingEnabled() {
return exec.prm.forwarder(ctx, info, c.client)
}
key, err := exec.key()
if err != nil {
return nil, err
}
if exec.headOnly() {
return c.getHeadOnly(ctx, exec, key)
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.ctxRange(); rng != nil {
// Current spec allows other storage node to deny access,
// fallback to GET here.
return c.getRange(ctx, exec, key, rng)
}
return c.get(ctx, exec, key)
}
func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
var prm internalclient.PayloadRangePrm
prm.SetContext(ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetRange(rng)
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.PayloadRange(prm)
if err != nil {
var errAccessDenied *apistatus.ObjectAccessDenied
if errors.As(err, &errAccessDenied) {
obj, err := c.get(ctx, exec, key)
if err != nil {
return nil, err
}
payload := obj.Payload()
from := rng.GetOffset()
to := from + rng.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return nil, new(apistatus.ObjectOutOfRange)
}
return payloadOnlyObject(payload[from:to]), nil
}
return nil, err
}
return payloadOnlyObject(res.PayloadRange()), nil
}
func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.HeadObjectPrm
prm.SetContext(ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.HeadObject(prm)
if err != nil {
return nil, err
}
return res.Header(), nil
}
func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.GetObjectPrm
prm.SetContext(ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internal.GetObject(prm)
if err != nil {
return nil, err
}
return res.Object(), nil
}
func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) {
if exec.headOnly() {
var headPrm engine.HeadPrm
headPrm.WithAddress(exec.address())
headPrm.WithRaw(exec.isRaw())
r, err := e.engine.Head(headPrm)
if err != nil {
return nil, err
}
return r.Header(), nil
} else if rng := exec.ctxRange(); rng != nil {
var getRange engine.RngPrm
getRange.WithAddress(exec.address())
getRange.WithPayloadRange(rng)
r, err := e.engine.GetRange(getRange)
if err != nil {
return nil, err
}
return r.Object(), nil
} else {
var getPrm engine.GetPrm
getPrm.WithAddress(exec.address())
r, err := e.engine.Get(getPrm)
if err != nil {
return nil, err
}
return r.Object(), nil
}
}
func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error {
return w.chunkWriter.WriteChunk(ctx, p)
}
func (w *partWriter) WriteHeader(ctx context.Context, o *object.Object) error {
return w.headWriter.WriteHeader(ctx, o)
}
func payloadOnlyObject(payload []byte) *object.Object {
obj := object.New()
obj.SetPayload(payload)
return obj
}
func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
_, err := h.hash.Write(p)
return err
}
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
return n.nmSrc.Epoch()
}