package getsvc import ( "context" "crypto/ecdsa" "errors" "io" coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine" internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client" apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" ) type SimpleObjectWriter struct { obj *object.Object pld []byte } type clientCacheWrapper struct { cache ClientConstructor } type clientWrapper struct { client coreclient.MultiAddressClient } type storageEngineWrapper struct { engine *engine.StorageEngine } type partWriter struct { ObjectWriter headWriter HeaderWriter chunkWriter ChunkWriter } type hasherWrapper struct { hash io.Writer } type nmSrcWrapper struct { nmSrc netmap.Source } func NewSimpleObjectWriter() *SimpleObjectWriter { return &SimpleObjectWriter{ obj: object.New(), } } func (s *SimpleObjectWriter) WriteHeader(_ context.Context, obj *object.Object) error { s.obj = obj s.pld = make([]byte, 0, obj.PayloadSize()) return nil } func (s *SimpleObjectWriter) WriteChunk(_ context.Context, p []byte) error { s.pld = append(s.pld, p...) return nil } func (s *SimpleObjectWriter) Object() *object.Object { if len(s.pld) > 0 { s.obj.SetPayload(s.pld) } return s.obj } func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) { clt, err := c.cache.Get(info) if err != nil { return nil, err } return &clientWrapper{ client: clt, }, nil } func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) { if exec.isForwardingEnabled() { return exec.prm.forwarder(ctx, info, c.client) } key, err := exec.key() if err != nil { return nil, err } if exec.headOnly() { return c.getHeadOnly(ctx, exec, key) } // we don't specify payload writer because we accumulate // the object locally (even huge). if rng := exec.ctxRange(); rng != nil { // Current spec allows other storage node to deny access, // fallback to GET here. return c.getRange(ctx, exec, key, rng) } return c.get(ctx, exec, key) } func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) { var prm internalclient.PayloadRangePrm prm.SetClient(c.client) prm.SetTTL(exec.prm.common.TTL()) prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetAddress(exec.address()) prm.SetPrivateKey(key) prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetBearerToken(exec.prm.common.BearerToken()) prm.SetXHeaders(exec.prm.common.XHeaders()) prm.SetRange(rng) if exec.isRaw() { prm.SetRawFlag() } res, err := internalclient.PayloadRange(ctx, prm) if err != nil { var errAccessDenied *apistatus.ObjectAccessDenied if errors.As(err, &errAccessDenied) { obj, err := c.get(ctx, exec, key) if err != nil { return nil, err } payload := obj.Payload() from := rng.GetOffset() to := from + rng.GetLength() if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to { return nil, new(apistatus.ObjectOutOfRange) } return payloadOnlyObject(payload[from:to]), nil } return nil, err } return payloadOnlyObject(res.PayloadRange()), nil } func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { var prm internalclient.HeadObjectPrm prm.SetClient(c.client) prm.SetTTL(exec.prm.common.TTL()) prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetAddress(exec.address()) prm.SetPrivateKey(key) prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetBearerToken(exec.prm.common.BearerToken()) prm.SetXHeaders(exec.prm.common.XHeaders()) if exec.isRaw() { prm.SetRawFlag() } res, err := internalclient.HeadObject(ctx, prm) if err != nil { return nil, err } return res.Header(), nil } func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) { var prm internalclient.GetObjectPrm prm.SetClient(c.client) prm.SetTTL(exec.prm.common.TTL()) prm.SetNetmapEpoch(exec.curProcEpoch) prm.SetAddress(exec.address()) prm.SetPrivateKey(key) prm.SetSessionToken(exec.prm.common.SessionToken()) prm.SetBearerToken(exec.prm.common.BearerToken()) prm.SetXHeaders(exec.prm.common.XHeaders()) if exec.isRaw() { prm.SetRawFlag() } res, err := internalclient.GetObject(ctx, prm) if err != nil { return nil, err } return res.Object(), nil } func (e *storageEngineWrapper) get(ctx context.Context, exec *execCtx) (*object.Object, error) { if exec.headOnly() { var headPrm engine.HeadPrm headPrm.WithAddress(exec.address()) headPrm.WithRaw(exec.isRaw()) r, err := e.engine.Head(ctx, headPrm) if err != nil { return nil, err } return r.Header(), nil } else if rng := exec.ctxRange(); rng != nil { var getRange engine.RngPrm getRange.WithAddress(exec.address()) getRange.WithPayloadRange(rng) r, err := e.engine.GetRange(ctx, getRange) if err != nil { return nil, err } return r.Object(), nil } else { var getPrm engine.GetPrm getPrm.WithAddress(exec.address()) r, err := e.engine.Get(ctx, getPrm) if err != nil { return nil, err } return r.Object(), nil } } func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error { return w.chunkWriter.WriteChunk(ctx, p) } func (w *partWriter) WriteHeader(ctx context.Context, o *object.Object) error { return w.headWriter.WriteHeader(ctx, o) } func payloadOnlyObject(payload []byte) *object.Object { obj := object.New() obj.SetPayload(payload) return obj } func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error { _, err := h.hash.Write(p) return err } func (n *nmSrcWrapper) currentEpoch() (uint64, error) { return n.nmSrc.Epoch() }