package getsvc import ( "crypto/ecdsa" "errors" "io" coreclient "github.com/TrueCloudLab/frostfs-node/pkg/core/client" "github.com/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" internal "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" internalclient "github.com/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" apistatus "github.com/TrueCloudLab/frostfs-sdk-go/client/status" "github.com/TrueCloudLab/frostfs-sdk-go/object" ) type SimpleObjectWriter struct { obj *object.Object pld []byte } type clientCacheWrapper struct { cache ClientConstructor } type clientWrapper struct { client coreclient.MultiAddressClient } type storageEngineWrapper engine.StorageEngine type partWriter struct { ObjectWriter headWriter HeaderWriter chunkWriter ChunkWriter } type hasherWrapper struct { hash io.Writer } func NewSimpleObjectWriter() *SimpleObjectWriter { return &SimpleObjectWriter{ obj: object.New(), } } func (s *SimpleObjectWriter) WriteHeader(obj *object.Object) error { s.obj = obj s.pld = make([]byte, 0, obj.PayloadSize()) return nil } func (s *SimpleObjectWriter) WriteChunk(p []byte) error { s.pld = append(s.pld, p...) return nil } func (s *SimpleObjectWriter) Object() *object.Object { if len(s.pld) > 0 { s.obj.SetPayload(s.pld) } return s.obj } func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) { clt, err := c.cache.Get(info) if err != nil { return nil, err } return &clientWrapper{ client: clt, }, nil } func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) { if exec.prm.forwarder != nil { return exec.prm.forwarder(info, c.client) } key, err := exec.key() if err != nil { return nil, err } if exec.headOnly { var prm internalclient.HeadObjectPrm prm.SetContext(exec.ctx) prm.SetClient(c.client) prm.SetTTL(exec.prm.common.TTL()) prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetAddress(exec.prm.addr) prm.SetPrivateKey(key) prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetBearerToken(exec.prm.common.BearerToken()) prm.SetXHeaders(exec.prm.common.XHeaders()) if exec.prm.raw { prm.SetRawFlag() } res, err := internalclient.HeadObject(prm) if err != nil { return nil, err } return res.Header(), nil } // we don't specify payload writer because we accumulate // the object locally (even huge). if rng := exec.prm.rng; rng != nil { var prm internalclient.PayloadRangePrm prm.SetContext(exec.ctx) prm.SetClient(c.client) prm.SetTTL(exec.prm.common.TTL()) prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetAddress(exec.prm.addr) prm.SetPrivateKey(key) prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetBearerToken(exec.prm.common.BearerToken()) prm.SetXHeaders(exec.prm.common.XHeaders()) prm.SetRange(rng) if exec.prm.raw { prm.SetRawFlag() } res, err := internalclient.PayloadRange(prm) if err != nil { var errAccessDenied *apistatus.ObjectAccessDenied if errors.As(err, &errAccessDenied) { // Current spec allows other storage node to deny access, // fallback to GET here. obj, err := c.get(exec, key) if err != nil { return nil, err } payload := obj.Payload() from := rng.GetOffset() to := from + rng.GetLength() if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { return nil, new(apistatus.ObjectOutOfRange) } return payloadOnlyObject(payload[from:to]), nil } return nil, err } return payloadOnlyObject(res.PayloadRange()), nil } return c.get(exec, key) } func (c *clientWrapper) get(exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { var prm internalclient.GetObjectPrm prm.SetContext(exec.ctx) prm.SetClient(c.client) prm.SetTTL(exec.prm.common.TTL()) prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetAddress(exec.prm.addr) prm.SetPrivateKey(key) prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetBearerToken(exec.prm.common.BearerToken()) prm.SetXHeaders(exec.prm.common.XHeaders()) if exec.prm.raw { prm.SetRawFlag() } res, err := internal.GetObject(prm) if err != nil { return nil, err } return res.Object(), nil } func (w *storageEngineWrapper) get(exec *execCtx) (*object.Object, error) { e := (*engine.StorageEngine)(w) if exec.headOnly { var headPrm engine.HeadPrm headPrm.WithAddress(exec.prm.addr) headPrm.WithRaw(exec.prm.raw) r, err := e.Head(headPrm) if err != nil { return nil, err } return r.Header(), nil } else if rng := exec.prm.rng; rng != nil { var getRange engine.RngPrm getRange.WithAddress(exec.prm.addr) getRange.WithPayloadRange(rng) r, err := e.GetRange(getRange) if err != nil { return nil, err } return r.Object(), nil } else { var getPrm engine.GetPrm getPrm.WithAddress(exec.prm.addr) r, err := e.Get(getPrm) if err != nil { return nil, err } return r.Object(), nil } } func (w *partWriter) WriteChunk(p []byte) error { return w.chunkWriter.WriteChunk(p) } func (w *partWriter) WriteHeader(o *object.Object) error { return w.headWriter.WriteHeader(o) } func payloadOnlyObject(payload []byte) *object.Object { obj := object.New() obj.SetPayload(payload) return obj } func (h *hasherWrapper) WriteChunk(p []byte) error { _, err := h.hash.Write(p) return err }