Refactor get service #193

Merged
fyrchik merged 7 commits from dstepanov-yadro/frostfs-node:refactoring/OBJECT-3610_getsvc into master 2023-04-05 14:38:49 +00:00
Showing only changes of commit e7e63f27e8 - Show all commits

View file

@ -87,7 +87,6 @@ func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
}, nil
}
// nolint: funlen
func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
if exec.isForwardingEnabled() {
return exec.prm.forwarder(info, c.client)
@ -99,6 +98,63 @@ func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info corec
}
if exec.headOnly() {
return c.getHeadOnly(ctx, exec, key)
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.ctxRange(); rng != nil {
// Current spec allows other storage node to deny access,
// fallback to GET here.
return c.getRange(ctx, exec, key, rng)
}
return c.get(ctx, exec, key)
}
func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
var prm internalclient.PayloadRangePrm
prm.SetContext(ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetRange(rng)
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.PayloadRange(prm)
if err != nil {
var errAccessDenied *apistatus.ObjectAccessDenied
if errors.As(err, &errAccessDenied) {
obj, err := c.get(ctx, exec, key)
if err != nil {
return nil, err
}
payload := obj.Payload()
from := rng.GetOffset()
to := from + rng.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return nil, new(apistatus.ObjectOutOfRange)
}
return payloadOnlyObject(payload[from:to]), nil
}
return nil, err
}
return payloadOnlyObject(res.PayloadRange()), nil
}
func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.HeadObjectPrm
prm.SetContext(ctx)
@ -122,55 +178,6 @@ func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info corec
return res.Header(), nil
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.ctxRange(); rng != nil {
var prm internalclient.PayloadRangePrm
prm.SetContext(ctx)
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetRange(rng)
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.PayloadRange(prm)
if err != nil {
var errAccessDenied *apistatus.ObjectAccessDenied
if errors.As(err, &errAccessDenied) {
// Current spec allows other storage node to deny access,
// fallback to GET here.
obj, err := c.get(ctx, exec, key)
if err != nil {
return nil, err
}
payload := obj.Payload()
from := rng.GetOffset()
to := from + rng.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return nil, new(apistatus.ObjectOutOfRange)
}
return payloadOnlyObject(payload[from:to]), nil
}
return nil, err
}
return payloadOnlyObject(res.PayloadRange()), nil
}
return c.get(ctx, exec, key)
}
func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.GetObjectPrm