Refactor getsvc #277

Merged
fyrchik merged 7 commits from dstepanov-yadro/frostfs-node:object-3606 into master 2023-04-28 14:03:13 +00:00
23 changed files with 1046 additions and 939 deletions

View file

@ -336,17 +336,14 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
ls := c.cfgObject.cfgLocalStorage.localStorage
return getsvc.New(
getsvc.WithLogger(c.log),
getsvc.WithLocalStorageEngine(ls),
getsvc.WithClientConstructor(coreConstructor),
getsvc.WithTraverserGenerator(
keyStorage,
c.netMapSource,
ls,
traverseGen.WithTraverseOptions(
placement.SuccessAfter(1),
),
),
getsvc.WithNetMapSource(c.netMapSource),
getsvc.WithKeyStorage(keyStorage),
)
coreConstructor,
getsvc.WithLogger(c.log))
}
func createGetServiceV2(sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service {

View file

@ -11,9 +11,9 @@ import (
"go.uber.org/zap"
)
func (exec *execCtx) assemble(ctx context.Context) {
if !exec.canAssemble() {
exec.log.Debug(logs.GetCanNotAssembleTheObject)
func (r *request) assemble(ctx context.Context) {
if !r.canAssemble() {
r.log.Debug(logs.GetCanNotAssembleTheObject)
return
}
@ -28,35 +28,35 @@ func (exec *execCtx) assemble(ctx context.Context) {
// - the assembly process is expected to be handled on a container node
// only since the requests forwarding mechanism presentation; such the
// node should have enough rights for getting any child object by design.
exec.prm.common.ForgetTokens()
r.prm.common.ForgetTokens()
// Do not use forwarding during assembly stage.
// Request forwarding closure inherited in produced
// `execCtx` so it should be disabled there.
exec.disableForwarding()
r.disableForwarding()
exec.log.Debug(logs.GetTryingToAssembleTheObject)
r.log.Debug(logs.GetTryingToAssembleTheObject)
assembler := newAssembler(exec.address(), exec.splitInfo(), exec.ctxRange(), exec)
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
exec.log.Debug(logs.GetAssemblingSplittedObject,
zap.Stringer("address", exec.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()),
r.log.Debug(logs.GetAssemblingSplittedObject,
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
defer exec.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
zap.Stringer("address", exec.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()),
defer r.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
obj, err := assembler.Assemble(ctx, exec.prm.objWriter)
obj, err := assembler.Assemble(ctx, r.prm.objWriter)
if err != nil {
exec.log.Warn(logs.GetFailedToAssembleSplittedObject,
r.log.Warn(logs.GetFailedToAssembleSplittedObject,
zap.Error(err),
zap.Stringer("address", exec.address()),
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
zap.Uint64("range_length", exec.ctxRange().GetLength()),
zap.Stringer("address", r.address()),
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
zap.Uint64("range_length", r.ctxRange().GetLength()),
)
}
@ -68,27 +68,27 @@ func (exec *execCtx) assemble(ctx context.Context) {
switch {
default:
exec.status = statusUndefined
exec.err = err
r.status = statusUndefined
r.err = err
case err == nil:
exec.status = statusOK
exec.err = nil
exec.collectedObject = obj
r.status = statusOK
r.err = nil
r.collectedObject = obj
case errors.As(err, &errRemovedRemote):
exec.status = statusINHUMED
exec.err = errRemovedRemote
r.status = statusINHUMED
r.err = errRemovedRemote
case errors.As(err, &errRemovedLocal):
exec.status = statusINHUMED
exec.err = errRemovedLocal
r.status = statusINHUMED
r.err = errRemovedLocal
case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL
exec.err = errSplitInfo
r.status = statusVIRTUAL
r.err = errSplitInfo
case errors.As(err, &errOutOfRangeRemote):
exec.status = statusOutOfRange
exec.err = errOutOfRangeRemote
r.status = statusOutOfRange
r.err = errOutOfRangeRemote
case errors.As(err, &errOutOfRangeLocal):
exec.status = statusOutOfRange
exec.err = errOutOfRangeLocal
r.status = statusOutOfRange
r.err = errOutOfRangeLocal
}
}
@ -96,42 +96,54 @@ func equalAddresses(a, b oid.Address) bool {
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
}
func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
p := exec.prm
p.common = p.common.WithLocalOnly(false)
p.addr.SetContainer(exec.containerID())
p.addr.SetObject(id)
prm := HeadPrm{
commonPrm: p.commonPrm,
}
func (r *request) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
w := NewSimpleObjectWriter()
prm.SetHeaderWriter(w)
err := exec.svc.Head(ctx, prm)
if err != nil {
p := RequestParameters{}
p.common = p.common.WithLocalOnly(false)
p.addr.SetContainer(r.containerID())
p.addr.SetObject(id)
p.head = true
p.SetHeaderWriter(w)
if err := r.getObjectWithIndependentRequest(ctx, p); err != nil {

what does it mean? 1m was not enough for me to understand "detached" here

what does it mean? 1m was not enough for me to understand "detached" here

renamed

renamed
return nil, err
}
return w.Object(), nil
}
func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) {
func (r *request) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) {
w := NewSimpleObjectWriter()
p := exec.prm
p := r.prm
p.common = p.common.WithLocalOnly(false)
p.objWriter = w
p.SetRange(rng)
p.rng = rng
p.addr.SetContainer(exec.containerID())
p.addr.SetContainer(r.containerID())
p.addr.SetObject(id)
statusError := exec.svc.get(ctx, p.commonPrm, withPayloadRange(rng))
if statusError.err != nil {
return nil, statusError.err
if err := r.getObjectWithIndependentRequest(ctx, p); err != nil {
return nil, err
}
return w.Object(), nil
}
func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm RequestParameters) error {
detachedExecutor := &request{
keyStore: r.keyStore,
traverserGenerator: r.traverserGenerator,
remoteStorageConstructor: r.remoteStorageConstructor,
epochSource: r.epochSource,
localStorage: r.localStorage,
prm: prm,
infoSplit: objectSDK.NewSplitInfo(),
log: r.log,
}
detachedExecutor.execute(ctx)
return detachedExecutor.statusError.err
}

View file

@ -2,7 +2,6 @@ package getsvc
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
@ -15,10 +14,6 @@ type objectGetter interface {
HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error)
}
var (
errParentAddressDiffers = errors.New("parent address in child object differs")
)
type assembler struct {
addr oid.Address
splitInfo *objectSDK.SplitInfo
@ -89,7 +84,7 @@ func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID)
parentObject := sourceObject.Parent()
if parentObject == nil {
return nil, nil, errors.New("received child with empty parent")
return nil, nil, errChildWithEmptyParent
}
a.parentObject = parentObject

View file

@ -8,26 +8,26 @@ import (
"go.uber.org/zap"
)
func (exec *execCtx) executeOnContainer(ctx context.Context) {
if exec.isLocal() {
exec.log.Debug(logs.GetReturnResultDirectly)
func (r *request) executeOnContainer(ctx context.Context) {
if r.isLocal() {
r.log.Debug(logs.GetReturnResultDirectly)
return
}
lookupDepth := exec.netmapLookupDepth()
lookupDepth := r.netmapLookupDepth()
exec.log.Debug(logs.TryingToExecuteInContainer,
r.log.Debug(logs.TryingToExecuteInContainer,
zap.Uint64("netmap lookup depth", lookupDepth),
)
// initialize epoch number
ok := exec.initEpoch()
ok := r.initEpoch()
if !ok {
return
}
for {
if exec.processCurrentEpoch(ctx) {
if r.processCurrentEpoch(ctx) {
break
}
@ -39,16 +39,16 @@ func (exec *execCtx) executeOnContainer(ctx context.Context) {
lookupDepth--
// go to the previous epoch
exec.curProcEpoch--
r.curProcEpoch--
}
}
func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
exec.log.Debug(logs.ProcessEpoch,
zap.Uint64("number", exec.curProcEpoch),
func (r *request) processCurrentEpoch(ctx context.Context) bool {
r.log.Debug(logs.ProcessEpoch,
zap.Uint64("number", r.curProcEpoch),
)
traverser, ok := exec.generateTraverser(exec.address())
traverser, ok := r.generateTraverser(r.address())
if !ok {
return true
}
@ -56,12 +56,12 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
exec.status = statusUndefined
r.status = statusUndefined
for {
addrs := traverser.Next()
if len(addrs) == 0 {
exec.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
r.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
return false
}
@ -69,8 +69,8 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
for i := range addrs {
select {
case <-ctx.Done():
exec.log.Debug(logs.InterruptPlacementIterationByContext,
zap.String("error", ctx.Err().Error()),
r.log.Debug(logs.InterruptPlacementIterationByContext,
zap.Error(ctx.Err()),
)
return true
@ -84,8 +84,8 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
client.NodeInfoFromNetmapElement(&info, addrs[i])
if exec.processNode(ctx, info) {
exec.log.Debug(logs.GetCompletingTheOperation)
if r.processNode(ctx, info) {
r.log.Debug(logs.GetCompletingTheOperation)
return true
}
}

View file

@ -0,0 +1,10 @@
package getsvc
import "errors"
var (
errRangeZeroLength = errors.New("zero range length")
errRangeOverflow = errors.New("range overflow")
errChildWithEmptyParent = errors.New("received child with empty parent")
errParentAddressDiffers = errors.New("parent address in child object differs")
)

View file

@ -1,279 +0,0 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type statusError struct {
status int
err error
}
type execCtx struct {
svc *Service
prm RangePrm
statusError
infoSplit *objectSDK.SplitInfo
log *logger.Logger
collectedObject *objectSDK.Object
head bool
curProcEpoch uint64
}
type execOption func(*execCtx)
const (
statusUndefined int = iota
statusOK
statusINHUMED
statusVIRTUAL
statusOutOfRange
)
func headOnly() execOption {
return func(c *execCtx) {
c.head = true
}
}
func withPayloadRange(r *objectSDK.Range) execOption {
return func(c *execCtx) {
c.prm.rng = r
}
}
func (exec *execCtx) setLogger(l *logger.Logger) {
req := "GET"
if exec.headOnly() {
req = "HEAD"
} else if exec.ctxRange() != nil {
req = "GET_RANGE"
}
exec.log = &logger.Logger{Logger: l.With(
zap.String("request", req),
zap.Stringer("address", exec.address()),
zap.Bool("raw", exec.isRaw()),
zap.Bool("local", exec.isLocal()),
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
)}
}
func (exec execCtx) isLocal() bool {
return exec.prm.common.LocalOnly()
}
func (exec execCtx) isRaw() bool {
return exec.prm.raw
}
func (exec execCtx) address() oid.Address {
return exec.prm.addr
}
func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
if exec.prm.signerKey != nil {
// the key has already been requested and
// cached in the previous operations
return exec.prm.signerKey, nil
}
var sessionInfo *util.SessionInfo
if tok := exec.prm.common.SessionToken(); tok != nil {
sessionInfo = &util.SessionInfo{
ID: tok.ID(),
Owner: tok.Issuer(),
}
}
return exec.svc.keyStore.GetKey(sessionInfo)
}
func (exec *execCtx) canAssemble() bool {
return !exec.isRaw() && !exec.headOnly()
}
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
return exec.infoSplit
}
func (exec *execCtx) containerID() cid.ID {
return exec.address().Container()
}
func (exec *execCtx) ctxRange() *objectSDK.Range {
return exec.prm.rng
}
func (exec *execCtx) headOnly() bool {
return exec.head
}
func (exec *execCtx) netmapEpoch() uint64 {
return exec.prm.common.NetmapEpoch()
}
func (exec *execCtx) netmapLookupDepth() uint64 {
return exec.prm.common.NetmapLookupDepth()
}
func (exec *execCtx) initEpoch() bool {
exec.curProcEpoch = exec.netmapEpoch()
if exec.curProcEpoch > 0 {
return true
}
e, err := exec.svc.currentEpochReceiver.currentEpoch()
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.CouldNotGetCurrentEpochNumber,
zap.String("error", err.Error()),
)
return false
case err == nil:
exec.curProcEpoch = e
return true
}
}
func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
obj := addr.Object()
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, exec.curProcEpoch)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.GetCouldNotGenerateContainerTraverser,
zap.String("error", err.Error()),
)
return nil, false
case err == nil:
return t, true
}
}
func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
c, err := exec.svc.clientCache.get(info)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.GetCouldNotConstructRemoteNodeClient)
case err == nil:
return c, true
}
return nil, false
}
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
if last, ok := src.LastPart(); ok {
dst.SetLastPart(last)
}
if link, ok := src.Link(); ok {
dst.SetLink(link)
}
if splitID := src.SplitID(); splitID != nil {
dst.SetSplitID(splitID)
}
}
func (exec *execCtx) writeCollectedHeader(ctx context.Context) bool {
if exec.ctxRange() != nil {
return true
}
err := exec.prm.objWriter.WriteHeader(
ctx,
exec.collectedObject.CutPayload(),
)
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.GetCouldNotWriteHeader,
zap.String("error", err.Error()),
)
case err == nil:
exec.status = statusOK
exec.err = nil
}
return exec.status == statusOK
}
func (exec *execCtx) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
if exec.headOnly() {
return true
}
err := exec.prm.objWriter.WriteChunk(ctx, obj.Payload())
switch {
default:
exec.status = statusUndefined
exec.err = err
exec.log.Debug(logs.GetCouldNotWritePayloadChunk,
zap.String("error", err.Error()),
)
case err == nil:
exec.status = statusOK
exec.err = nil
}
return err == nil
}
func (exec *execCtx) writeCollectedObject(ctx context.Context) {
if ok := exec.writeCollectedHeader(ctx); ok {
exec.writeObjectPayload(ctx, exec.collectedObject)
}
}
// isForwardingEnabled returns true if common execution
// parameters has request forwarding closure set.
func (exec execCtx) isForwardingEnabled() bool {
return exec.prm.forwarder != nil
}
// disableForwarding removes request forwarding closure from common
// parameters, so it won't be inherited in new execution contexts.
func (exec *execCtx) disableForwarding() {
exec.prm.SetRequestForwarder(nil)
}

View file

@ -11,18 +11,18 @@ import (
// Get serves a request to get an object by address, and returns Streamer instance.
func (s *Service) Get(ctx context.Context, prm Prm) error {
return s.get(ctx, prm.commonPrm).err
return s.get(ctx, RequestParameters{
commonPrm: prm.commonPrm,
})
}
// GetRange serves a request to get an object by address, and returns Streamer instance.
func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
return s.getRange(ctx, prm)
return s.get(ctx, RequestParameters{
commonPrm: prm.commonPrm,
rng: prm.rng,
})
}
func (s *Service) getRange(ctx context.Context, prm RangePrm, opts ...execOption) error {
return s.get(ctx, prm.commonPrm, append(opts, withPayloadRange(prm.rng))...).err
}
func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHashRes, error) {
hashes := make([][]byte, 0, len(prm.rngs))
@ -34,16 +34,15 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
// 1. Potential gains are insignificant when operating in the Internet given typical latencies and losses.
// 2. Parallel solution is more complex in terms of code.
// 3. TZ-hash is likely to be disabled in private installations.
rngPrm := RangePrm{
reqPrm := RequestParameters{
commonPrm: prm.commonPrm,
rng: &rng,
}
rngPrm.SetRange(&rng)
rngPrm.SetChunkWriter(&hasherWrapper{
reqPrm.SetChunkWriter(&hasherWrapper{
hash: util.NewSaltingWriter(h, prm.salt),
})
if err := s.getRange(ctx, rngPrm); err != nil {
if err := s.get(ctx, reqPrm); err != nil {
return nil, err
}
@ -60,30 +59,32 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
// Returns ErrNotFound if the header was not received for the call.
// Returns SplitInfoError if object is virtual and raw flag is set.
func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
return s.get(ctx, prm.commonPrm, headOnly()).err
return s.get(ctx, RequestParameters{
head: true,
commonPrm: prm.commonPrm,
})
}
func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError {
exec := &execCtx{
svc: s,
prm: RangePrm{
commonPrm: prm,
},
infoSplit: object.NewSplitInfo(),
}
func (s *Service) get(ctx context.Context, prm RequestParameters) error {
exec := &request{
keyStore: s.keyStore,
traverserGenerator: s.traverserGenerator,
remoteStorageConstructor: s.remoteStorageConstructor,
epochSource: s.epochSource,
localStorage: s.localStorage,
for i := range opts {
opts[i](exec)
prm: prm,
infoSplit: object.NewSplitInfo(),
}
exec.setLogger(s.log)
exec.execute(ctx)
return exec.statusError
return exec.statusError.err
}
func (exec *execCtx) execute(ctx context.Context) {
func (exec *request) execute(ctx context.Context) {
exec.log.Debug(logs.ServingRequest)
// perform local operation
@ -92,7 +93,7 @@ func (exec *execCtx) execute(ctx context.Context) {
exec.analyzeStatus(ctx, true)
}
func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
// analyze local result
switch exec.status {
case statusOK:
@ -106,7 +107,7 @@ func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
default:
exec.log.Debug(logs.OperationFinishedWithError,
zap.String("error", exec.err.Error()),
zap.Error(exec.err),
)
if execCnr {

View file

@ -2,6 +2,7 @@ package getsvc
import (
"context"
"crypto/ecdsa"
"crypto/rand"
"errors"
"fmt"
@ -56,7 +57,7 @@ type testClient struct {
type testEpochReceiver uint64
func (e testEpochReceiver) currentEpoch() (uint64, error) {
func (e testEpochReceiver) Epoch() (uint64, error) {
return uint64(e), nil
}
@ -99,7 +100,7 @@ func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.
return vs, nil
}
func (c *testClientCache) get(info client.NodeInfo) (getClient, error) {
func (c *testClientCache) Get(info client.NodeInfo) (remoteStorage, error) {
v, ok := c.clients[network.StringifyGroup(info.AddressGroup())]
if !ok {
return nil, errors.New("could not construct client")
@ -117,8 +118,15 @@ func newTestClient() *testClient {
}
}
func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
v, ok := c.results[exec.address().EncodeToString()]
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
c.results[addr.EncodeToString()] = struct {
obj *objectSDK.Object
err error
}{obj: obj, err: err}
}
func (c *testClient) Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
v, ok := c.results[address.EncodeToString()]
if !ok {
var errNotFound apistatus.ObjectNotFound
@ -129,21 +137,38 @@ func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.Node
return nil, v.err
}
return cutToRange(v.obj, exec.ctxRange()), nil
return v.obj, nil
}
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
c.results[addr.EncodeToString()] = struct {
obj *objectSDK.Object
err error
}{obj: obj, err: err}
func (c *testClient) Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
return c.Get(ctx, address, requestParams)
}
func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object, error) {
func (c *testClient) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
obj, err := c.Get(ctx, address, requestParams)
if err != nil {
return nil, err
}
return cutToRange(obj, rng), nil
}
func (c *testClient) ForwardRequest(ctx context.Context, info client.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error) {
return nil, fmt.Errorf("not implemented")
}
func (s *testStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
return s.Range(ctx, address, nil)
}
func (s *testStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) {
return s.Range(ctx, address, nil)
}
func (s *testStorage) Range(_ context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) {
var (
ok bool
obj *objectSDK.Object
sAddr = exec.address().EncodeToString()
sAddr = address.EncodeToString()
)
if _, ok = s.inhumed[sAddr]; ok {
@ -157,7 +182,7 @@ func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object,
}
if obj, ok = s.phy[sAddr]; ok {
return cutToRange(obj, exec.ctxRange()), nil
return cutToRange(obj, rng), nil
}
var errNotFound apistatus.ObjectNotFound
@ -241,15 +266,21 @@ func (w *writePayloadErrorObjectWriter) WriteChunk(_ context.Context, _ []byte)
return &writePayloadError{}
}
type testKeyStorage struct {
}
func (ks *testKeyStorage) GetKey(_ *util.SessionInfo) (*ecdsa.PrivateKey, error) {
return &ecdsa.PrivateKey{}, nil
}
func TestGetLocalOnly(t *testing.T) {
ctx := context.Background()
newSvc := func(storage *testStorage) *Service {
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(t, false)
svc.localStorage = storage
return svc
return &Service{
log: test.NewLogger(t, false),
localStorage: storage,
}
}
newPrm := func(raw bool, w ObjectWriter) Prm {
@ -506,22 +537,21 @@ func TestGetRemoteSmall(t *testing.T) {
container.CalculateID(&idCnr, cnr)
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(t, false)
svc.localStorage = newTestStorage()
const curEpoch = 13
svc.traverserGenerator = &testTraverserGenerator{
return &Service{
log: test.NewLogger(t, false),
localStorage: newTestStorage(),
traverserGenerator: &testTraverserGenerator{
c: cnr,
b: map[uint64]placement.Builder{
curEpoch: b,
},
},
epochSource: testEpochReceiver(curEpoch),
remoteStorageConstructor: c,
keyStore: &testKeyStorage{},
}
svc.clientCache = c
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
return svc
}
newPrm := func(raw bool, w ObjectWriter) Prm {
@ -1176,7 +1206,7 @@ func TestGetRemoteSmall(t *testing.T) {
err := svc.Get(ctx, p)
require.Error(t, err)
require.Equal(t, err.Error(), "received child with empty parent")
require.ErrorIs(t, err, errChildWithEmptyParent)
w = NewSimpleObjectWriter()
payloadSz := srcObj.PayloadSize()
@ -1189,7 +1219,7 @@ func TestGetRemoteSmall(t *testing.T) {
err = svc.GetRange(ctx, rngPrm)
require.Error(t, err)
require.Equal(t, err.Error(), "received child with empty parent")
require.ErrorIs(t, err, errChildWithEmptyParent)
})
t.Run("out of range", func(t *testing.T) {
@ -1639,13 +1669,13 @@ func TestGetFromPastEpoch(t *testing.T) {
c22 := newTestClient()
c22.addResult(addr, obj, nil)
svc := &Service{cfg: new(cfg)}
svc.log = test.NewLogger(t, false)
svc.localStorage = newTestStorage()
const curEpoch = 13
svc.traverserGenerator = &testTraverserGenerator{
svc := &Service{
log: test.NewLogger(t, false),
localStorage: newTestStorage(),
epochSource: testEpochReceiver(curEpoch),
traverserGenerator: &testTraverserGenerator{
c: cnr,
b: map[uint64]placement.Builder{
curEpoch: &testPlacementBuilder{
@ -1659,19 +1689,18 @@ func TestGetFromPastEpoch(t *testing.T) {
},
},
},
}
svc.clientCache = &testClientCache{
},
remoteStorageConstructor: &testClientCache{
clients: map[string]*testClient{
as[0][0]: c11,
as[0][1]: c12,
as[1][0]: c21,
as[1][1]: c22,
},
},
keyStore: &testKeyStorage{},
}
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
w := NewSimpleObjectWriter()
commonPrm := new(util.CommonPrm)

View file

@ -11,7 +11,7 @@ import (
"go.uber.org/zap"
)
func (exec *execCtx) executeLocal(ctx context.Context) {
func (r *request) executeLocal(ctx context.Context) {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.executeLocal")
defer func() {
span.End()
@ -19,7 +19,7 @@ func (exec *execCtx) executeLocal(ctx context.Context) {
var err error
exec.collectedObject, err = exec.svc.localStorage.get(ctx, exec)
r.collectedObject, err = r.get(ctx)
var errSplitInfo *objectSDK.SplitInfoError
var errRemoved apistatus.ObjectAlreadyRemoved
@ -27,25 +27,33 @@ func (exec *execCtx) executeLocal(ctx context.Context) {
switch {
default:
exec.status = statusUndefined
exec.err = err
r.status = statusUndefined
r.err = err
exec.log.Debug(logs.GetLocalGetFailed,
zap.String("error", err.Error()),
)
r.log.Debug(logs.GetLocalGetFailed, zap.Error(err))
case err == nil:
exec.status = statusOK
exec.err = nil
exec.writeCollectedObject(ctx)
r.status = statusOK
r.err = nil
r.writeCollectedObject(ctx)
case errors.As(err, &errRemoved):
exec.status = statusINHUMED
exec.err = errRemoved
r.status = statusINHUMED
r.err = errRemoved
case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
case errors.As(err, &errOutOfRange):
exec.status = statusOutOfRange
exec.err = errOutOfRange
r.status = statusOutOfRange
r.err = errOutOfRange
}
}
func (r *request) get(ctx context.Context) (*objectSDK.Object, error) {
if r.headOnly() {
return r.localStorage.Head(ctx, r.address(), r.isRaw())
}
if rng := r.ctxRange(); rng != nil {
return r.localStorage.Range(ctx, r.address(), rng)
}
return r.localStorage.Get(ctx, r.address())
}

View file

@ -3,12 +3,11 @@ package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"hash"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
)
@ -21,14 +20,9 @@ type Prm struct {
type RangePrm struct {
commonPrm
rng *object.Range
rng *objectSDK.Range
}
var (
errRangeZeroLength = errors.New("zero range length")
errRangeOverflow = errors.New("range overflow")
)
// Validate pre-validates `OBJECTRANGE` request's parameters content
// without access to the requested object's payload.
func (p RangePrm) Validate() error {
@ -54,12 +48,18 @@ type RangeHashPrm struct {
hashGen func() hash.Hash
rngs []object.Range
rngs []objectSDK.Range
salt []byte
}
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
type RequestParameters struct {
commonPrm
head bool
rng *objectSDK.Range
}
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*objectSDK.Object, error)
// HeadPrm groups parameters of Head service call.
type HeadPrm struct {
@ -83,43 +83,25 @@ type commonPrm struct {
signerKey *ecdsa.PrivateKey
}
// ChunkWriter is an interface of target component
// to write payload chunk.
type ChunkWriter interface {
WriteChunk(context.Context, []byte) error
}
// HeaderWriter is an interface of target component
// to write object header.
type HeaderWriter interface {
WriteHeader(context.Context, *object.Object) error
}
// ObjectWriter is an interface of target component to write object.
type ObjectWriter interface {
HeaderWriter
ChunkWriter
}
// SetObjectWriter sets target component to write the object.
func (p *Prm) SetObjectWriter(w ObjectWriter) {
p.objWriter = w
}
// SetChunkWriter sets target component to write the object payload range.
func (p *RangePrm) SetChunkWriter(w ChunkWriter) {
func (p *commonPrm) SetChunkWriter(w ChunkWriter) {
p.objWriter = &partWriter{
chunkWriter: w,
}
}
// SetRange sets range of the requested payload data.
func (p *RangePrm) SetRange(rng *object.Range) {
func (p *RangePrm) SetRange(rng *objectSDK.Range) {
p.rng = rng
}
// SetRangeList sets a list of object payload ranges.
func (p *RangeHashPrm) SetRangeList(rngs []object.Range) {
func (p *RangeHashPrm) SetRangeList(rngs []objectSDK.Range) {
p.rngs = rngs
}
@ -158,7 +140,7 @@ func (p *commonPrm) WithCachedSignerKey(signerKey *ecdsa.PrivateKey) {
}
// SetHeaderWriter sets target component to write the object header.
func (p *HeadPrm) SetHeaderWriter(w HeaderWriter) {
func (p *commonPrm) SetHeaderWriter(w HeaderWriter) {
p.objWriter = &partWriter{
headWriter: w,
}

View file

@ -12,18 +12,18 @@ import (
"go.uber.org/zap"
)
func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool {
func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode")
defer span.End()
exec.log.Debug(logs.ProcessingNode)
r.log.Debug(logs.ProcessingNode)
client, ok := exec.remoteClient(info)
rs, ok := r.getRemoteStorage(info)
if !ok {
return true
}
obj, err := client.getObject(ctx, exec, info)
obj, err := r.getRemote(ctx, rs, info)
var errSplitInfo *objectSDK.SplitInfoError
var errRemoved *apistatus.ObjectAlreadyRemoved
@ -33,34 +33,66 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
default:
var errNotFound apistatus.ObjectNotFound
exec.status = statusUndefined
exec.err = errNotFound
r.status = statusUndefined
r.err = errNotFound
exec.log.Debug(logs.GetRemoteCallFailed,
zap.String("error", err.Error()),
)
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
case err == nil:
exec.status = statusOK
exec.err = nil
r.status = statusOK
r.err = nil
// both object and err are nil only if the original
// request was forwarded to another node and the object
// has already been streamed to the requesting party
if obj != nil {
exec.collectedObject = obj
exec.writeCollectedObject(ctx)
r.collectedObject = obj
r.writeCollectedObject(ctx)
}
case errors.As(err, &errRemoved):
exec.status = statusINHUMED
exec.err = errRemoved
r.status = statusINHUMED
r.err = errRemoved
case errors.As(err, &errOutOfRange):
exec.status = statusOutOfRange
exec.err = errOutOfRange
r.status = statusOutOfRange
r.err = errOutOfRange
case errors.As(err, &errSplitInfo):
exec.status = statusVIRTUAL
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
r.status = statusVIRTUAL
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
}
return exec.status != statusUndefined
return r.status != statusUndefined
}
func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {
if r.isForwardingEnabled() {
return rs.ForwardRequest(ctx, info, r.prm.forwarder)
}
key, err := r.key()
if err != nil {
return nil, err
}
prm := RemoteRequestParams{
Epoch: r.curProcEpoch,
TTL: r.prm.common.TTL(),
PrivateKey: key,
SessionToken: r.prm.common.SessionToken(),
BearerToken: r.prm.common.BearerToken(),
XHeaders: r.prm.common.XHeaders(),
IsRaw: r.isRaw(),
}
if r.headOnly() {
return rs.Head(ctx, r.address(), prm)
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := r.ctxRange(); rng != nil {
// Current spec allows other storage node to deny access,
// fallback to GET here.
return rs.Range(ctx, r.address(), rng, prm)
}
return rs.Get(ctx, r.address(), prm)
}

View file

@ -0,0 +1,244 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
type request struct {
prm RequestParameters
statusError
infoSplit *objectSDK.SplitInfo
log *logger.Logger
collectedObject *objectSDK.Object
curProcEpoch uint64
keyStore keyStorage
epochSource epochSource
traverserGenerator traverserGenerator
remoteStorageConstructor remoteStorageConstructor
localStorage localStorage
}
func (r *request) setLogger(l *logger.Logger) {
req := "GET"
if r.headOnly() {
req = "HEAD"
} else if r.ctxRange() != nil {
req = "GET_RANGE"
}
r.log = &logger.Logger{Logger: l.With(
zap.String("request", req),
zap.Stringer("address", r.address()),
zap.Bool("raw", r.isRaw()),
zap.Bool("local", r.isLocal()),
zap.Bool("with session", r.prm.common.SessionToken() != nil),
zap.Bool("with bearer", r.prm.common.BearerToken() != nil),
)}
}
func (r *request) isLocal() bool {
return r.prm.common.LocalOnly()
}
func (r *request) isRaw() bool {
return r.prm.raw
}
func (r *request) address() oid.Address {
return r.prm.addr
}
func (r *request) key() (*ecdsa.PrivateKey, error) {
if r.prm.signerKey != nil {
// the key has already been requested and
// cached in the previous operations
return r.prm.signerKey, nil
}
var sessionInfo *util.SessionInfo
if tok := r.prm.common.SessionToken(); tok != nil {
sessionInfo = &util.SessionInfo{
ID: tok.ID(),
Owner: tok.Issuer(),
}
}
return r.keyStore.GetKey(sessionInfo)
}
func (r *request) canAssemble() bool {
return !r.isRaw() && !r.headOnly()
}
func (r *request) splitInfo() *objectSDK.SplitInfo {
return r.infoSplit
}
func (r *request) containerID() cid.ID {
return r.address().Container()
}
func (r *request) ctxRange() *objectSDK.Range {
return r.prm.rng
}
func (r *request) headOnly() bool {
return r.prm.head
}
func (r *request) netmapEpoch() uint64 {
return r.prm.common.NetmapEpoch()
}
func (r *request) netmapLookupDepth() uint64 {
return r.prm.common.NetmapLookupDepth()
}
func (r *request) initEpoch() bool {
r.curProcEpoch = r.netmapEpoch()
if r.curProcEpoch > 0 {
return true
}
e, err := r.epochSource.Epoch()
switch {
default:
r.status = statusUndefined
r.err = err
r.log.Debug(logs.CouldNotGetCurrentEpochNumber, zap.Error(err))
return false
case err == nil:
r.curProcEpoch = e
return true
}
}
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
obj := addr.Object()
t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
switch {
default:
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetCouldNotGenerateContainerTraverser, zap.Error(err))
return nil, false
case err == nil:
return t, true
}
}
func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) {
rs, err := r.remoteStorageConstructor.Get(info)
if err != nil {
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient)
return nil, false
}
return rs, true
}
func (r *request) writeCollectedHeader(ctx context.Context) bool {
if r.ctxRange() != nil {
return true
}
err := r.prm.objWriter.WriteHeader(
ctx,
r.collectedObject.CutPayload(),
)
switch {
default:
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetCouldNotWriteHeader, zap.Error(err))
case err == nil:
r.status = statusOK
r.err = nil
}
return r.status == statusOK
}
func (r *request) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
if r.headOnly() {
return true
}
err := r.prm.objWriter.WriteChunk(ctx, obj.Payload())
switch {
default:
r.status = statusUndefined
r.err = err
r.log.Debug(logs.GetCouldNotWritePayloadChunk, zap.Error(err))
case err == nil:
r.status = statusOK
r.err = nil
}
return err == nil
}
func (r *request) writeCollectedObject(ctx context.Context) {
if ok := r.writeCollectedHeader(ctx); ok {
r.writeObjectPayload(ctx, r.collectedObject)
}
}
// isForwardingEnabled returns true if common execution
// parameters has request forwarding closure set.
func (r request) isForwardingEnabled() bool {
return r.prm.forwarder != nil
}
// disableForwarding removes request forwarding closure from common
// parameters, so it won't be inherited in new execution contexts.
func (r *request) disableForwarding() {
r.prm.SetRequestForwarder(nil)
}
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
if last, ok := src.LastPart(); ok {
dst.SetLastPart(last)
}
if link, ok := src.Link(); ok {
dst.SetLink(link)
}
if splitID := src.SplitID(); splitID != nil {
dst.SetSplitID(splitID)
}
}

View file

@ -1,9 +0,0 @@
package getsvc
type RangeHashRes struct {
hashes [][]byte
}
func (r *RangeHashRes) Hashes() [][]byte {
return r.hashes
}

View file

@ -1,124 +1,54 @@
package getsvc
import (
"context"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
)
// Option is a Service's constructor option.
type Option func(*Service)
// Service utility serving requests of Object.Get service.
type Service struct {
*cfg
}
// Option is a Service's constructor option.
type Option func(*cfg)
type getClient interface {
getObject(context.Context, *execCtx, client.NodeInfo) (*object.Object, error)
}
type cfg struct {
log *logger.Logger
localStorage interface {
get(context.Context, *execCtx) (*object.Object, error)
}
clientCache interface {
get(client.NodeInfo) (getClient, error)
}
traverserGenerator interface {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
}
currentEpochReceiver interface {
currentEpoch() (uint64, error)
}
keyStore *util.KeyStorage
}
func defaultCfg() *cfg {
return &cfg{
log: &logger.Logger{Logger: zap.L()},
localStorage: new(storageEngineWrapper),
clientCache: new(clientCacheWrapper),
}
localStorage localStorage
traverserGenerator traverserGenerator
epochSource epochSource
keyStore keyStorage
remoteStorageConstructor remoteStorageConstructor
}
// New creates, initializes and returns utility serving

Why this change? We use functional options in services and you can do everything with both?

Why this change? We use functional options in services and you can do everything with both?

For example, it was not obvious to me which of the dependencies and options were required and which were not.

So New requires mandatory deps, With... - optional.

For example, it was not obvious to me which of the dependencies and options were required and which were not. So `New` requires mandatory deps, `With...` - optional.

Fixed back to Option for logger.

Use this pattern for optional arguments in constructors ...

From https://github.com/uber-go/guide/blob/master/style.md#functional-options

Fixed back to Option for logger. > Use this pattern for optional arguments in constructors ... From https://github.com/uber-go/guide/blob/master/style.md#functional-options

Yes, but 5 positional arguments don't look nice either.

Not that I am against the change, but I if we were to commit to a new scheme, I would like to do this atomically across the whole repo. We can create an issue for the discussion.

Yes, but 5 positional arguments don't look nice either. Not that I am against the change, but I if we were to commit to a new scheme, I would like to do this atomically across the whole repo. We can create an issue for the discussion.

I would prefer the positional arguments if they are required. They can be packed into a Options struct if needed.
It seems relatively easy to build incorrect objects in other places as well, because of this.

I would prefer the positional arguments if they are required. They can be packed into a `Options` struct if needed. It seems relatively easy to build incorrect objects in other places as well, because of this.

I like struct, but it seems we do not enforce anything with it either.
However it may be easier to see what arguments are required.

I like struct, but it seems we do not enforce anything with it either. However it may be easier to see what arguments are required.
We can try adopting this linter https://github.com/GaijinEntertainment/go-exhaustruct

Ofc it's not enforced, especially when having pointer fields. But I guess it's a combination of it being conventional and easier to look at the struct and its doc when calling New, rather than at the multiple option builders which might be spread around.

Ofc it's not enforced, especially when having pointer fields. But I guess it's a combination of it being conventional and easier to look at the struct and its doc when calling `New`, rather than at the multiple option builders which might be spread around.

we do not enforce anything with it either.

how about a struct that shows a caller that it contains only the required params + validaion inside New (pointers to be sure it is not a default value)? i do not like 5 positional args too

> we do not enforce anything with it either. how about a struct that shows a caller that it contains only the required params + validaion inside `New` (pointers to be sure it is not a default value)? i do not like 5 positional args too

we do not enforce anything with it either.

how about a struct that shows a caller that it contains only the required params + validaion inside New (pointers to be sure it is not a default value)? i do not like 5 positional args too

What's the difference? Instead of 5 arguments, there will be a structure with 5 fields that will most likely be filled in when calling the New method.

For example:

But positional arguments have an advantage: if a new dependency is added to the New method, then with a very high probability the program will not assemble until this dependency is added everywhere.

> > we do not enforce anything with it either. > > how about a struct that shows a caller that it contains only the required params + validaion inside `New` (pointers to be sure it is not a default value)? i do not like 5 positional args too What's the difference? Instead of 5 arguments, there will be a structure with 5 fields that will most likely be filled in when calling the `New` method. For example: - `New` declaration: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/innerring/processors/frostfs/processor.go#L88 - `New` call: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/innerring/initialization.go#L412 But positional arguments have an advantage: if a new dependency is added to the `New` method, then with a very high probability the program will not assemble until this dependency is added everywhere.

What's the difference? Instead of 5 arguments, there will be a structure with 5 fields that will most likely be filled in when calling the New method.

For me, it looks unreadable, especially when we provide a bunch of nil vars or empty strings.

> What's the difference? Instead of 5 arguments, there will be a structure with 5 fields that will most likely be filled in when calling the `New` method. For me, it looks unreadable, especially when we provide a bunch of nil vars or empty strings.

For me, it looks unreadable, especially when we provide a bunch of nil vars or empty strings.

It's about required arguments only. If you can pass nil or empty string as an argument value, i think that this argument is not required.

For not required arguments use Option.

> For me, it looks unreadable, especially when we provide a bunch of nil vars or empty strings. It's about required arguments only. If you can pass nil or empty string as an argument value, i think that this argument is not required. For not required arguments use Option.
// Object.Get service requests.
func New(opts ...Option) *Service {
c := defaultCfg()
for i := range opts {
opts[i](c)
func New(
ks keyStorage,
es epochSource,
e localStorageEngine,
tg traverserGenerator,
cc clientConstructor,
opts ...Option,
) *Service {
result := &Service{
keyStore: ks,
epochSource: es,
log: &logger.Logger{Logger: zap.L()},
localStorage: &engineLocalStorage{
engine: e,
},
traverserGenerator: tg,
remoteStorageConstructor: &multiclientRemoteStorageConstructor{
clientConstructor: cc,
},
}
return &Service{
cfg: c,
for _, option := range opts {
option(result)
}
return result
}
// WithLogger returns option to specify Get service's logger.
func WithLogger(l *logger.Logger) Option {
return func(c *cfg) {
c.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get service"))}
}
}
// WithLocalStorageEngine returns option to set local storage
// instance.
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
return func(c *cfg) {
c.localStorage.(*storageEngineWrapper).engine = e
}
}
type ClientConstructor interface {
Get(client.NodeInfo) (client.MultiAddressClient, error)
}
// WithClientConstructor returns option to set constructor of remote node clients.
func WithClientConstructor(v ClientConstructor) Option {
return func(c *cfg) {
c.clientCache.(*clientCacheWrapper).cache = v
}
}
// WithTraverserGenerator returns option to set generator of
// placement traverser to get the objects from containers.
func WithTraverserGenerator(t *util.TraverserGenerator) Option {
return func(c *cfg) {
c.traverserGenerator = t
}
}
// WithNetMapSource returns option to set network
// map storage to receive current network state.
func WithNetMapSource(nmSrc netmap.Source) Option {
return func(c *cfg) {
c.currentEpochReceiver = &nmSrcWrapper{
nmSrc: nmSrc,
}
}
}
// WithKeyStorage returns option to set private
// key storage for session tokens and node key.
func WithKeyStorage(store *util.KeyStorage) Option {
return func(c *cfg) {
c.keyStore = store
return func(s *Service) {
s.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get service"))}
}
}

View file

@ -0,0 +1,14 @@
package getsvc
const (
statusUndefined int = iota
statusOK
statusINHUMED
statusVIRTUAL
statusOutOfRange
)
type statusError struct {
status int
err error
}

View file

@ -0,0 +1,238 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
)
type epochSource interface {
Epoch() (uint64, error)
}
type traverserGenerator interface {
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
}
type keyStorage interface {
GetKey(info *util.SessionInfo) (*ecdsa.PrivateKey, error)
}
type localStorageEngine interface {
Head(ctx context.Context, p engine.HeadPrm) (engine.HeadRes, error)
GetRange(ctx context.Context, p engine.RngPrm) (engine.RngRes, error)
Get(ctx context.Context, p engine.GetPrm) (engine.GetRes, error)
}
type clientConstructor interface {
Get(coreclient.NodeInfo) (coreclient.MultiAddressClient, error)
}
type remoteStorageConstructor interface {
Get(coreclient.NodeInfo) (remoteStorage, error)
}
type multiclientRemoteStorageConstructor struct {
clientConstructor clientConstructor
}
func (c *multiclientRemoteStorageConstructor) Get(info coreclient.NodeInfo) (remoteStorage, error) {
clt, err := c.clientConstructor.Get(info)
if err != nil {
return nil, err
}
return &multiaddressRemoteStorage{
client: clt,
}, nil
}
type localStorage interface {
Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error)
Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error)
Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error)
}
type engineLocalStorage struct {
engine localStorageEngine
}
func (s *engineLocalStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) {
var headPrm engine.HeadPrm
headPrm.WithAddress(address)
headPrm.WithRaw(isRaw)
r, err := s.engine.Head(ctx, headPrm)
if err != nil {
return nil, err
}
return r.Header(), nil
}
func (s *engineLocalStorage) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) {
var getRange engine.RngPrm
getRange.WithAddress(address)
getRange.WithPayloadRange(rng)
r, err := s.engine.GetRange(ctx, getRange)
if err != nil {
return nil, err
}
return r.Object(), nil
}
func (s *engineLocalStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
var getPrm engine.GetPrm
getPrm.WithAddress(address)
r, err := s.engine.Get(ctx, getPrm)
if err != nil {
return nil, err
}
return r.Object(), nil
}
type RemoteRequestParams struct {
Epoch uint64
TTL uint32
PrivateKey *ecdsa.PrivateKey
SessionToken *session.Object
BearerToken *bearer.Token
XHeaders []string
IsRaw bool
}
type remoteStorage interface {
Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error)
Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error)
Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error)
ForwardRequest(ctx context.Context, info coreclient.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error)
}
type multiaddressRemoteStorage struct {
client coreclient.MultiAddressClient
}
func (s *multiaddressRemoteStorage) ForwardRequest(ctx context.Context, info coreclient.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error) {
return forwarder(ctx, info, s.client)
}
func (s *multiaddressRemoteStorage) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
var prm internalclient.PayloadRangePrm
prm.SetClient(s.client)
prm.SetTTL(requestParams.TTL)
prm.SetNetmapEpoch(requestParams.Epoch)
prm.SetAddress(address)
prm.SetPrivateKey(requestParams.PrivateKey)
prm.SetSessionToken(requestParams.SessionToken)
prm.SetBearerToken(requestParams.BearerToken)
prm.SetXHeaders(requestParams.XHeaders)
prm.SetRange(rng)
if requestParams.IsRaw {
prm.SetRawFlag()
}
res, err := internalclient.PayloadRange(ctx, prm)
if err != nil {
var errAccessDenied *apistatus.ObjectAccessDenied
if errors.As(err, &errAccessDenied) {
obj, err := s.Get(ctx, address, requestParams)
if err != nil {
return nil, err
}
payload := obj.Payload()
from := rng.GetOffset()
to := from + rng.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return nil, new(apistatus.ObjectOutOfRange)
}
return s.payloadOnlyObject(payload[from:to]), nil
}
return nil, err
}
return s.payloadOnlyObject(res.PayloadRange()), nil
}
func (s *multiaddressRemoteStorage) Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
var prm internalclient.HeadObjectPrm
prm.SetClient(s.client)
prm.SetTTL(requestParams.TTL)
prm.SetNetmapEpoch(requestParams.Epoch)
prm.SetAddress(address)
prm.SetPrivateKey(requestParams.PrivateKey)
prm.SetSessionToken(requestParams.SessionToken)
prm.SetBearerToken(requestParams.BearerToken)
prm.SetXHeaders(requestParams.XHeaders)
if requestParams.IsRaw {
prm.SetRawFlag()
}
res, err := internalclient.HeadObject(ctx, prm)
if err != nil {
return nil, err
}
return res.Header(), nil
}
func (s *multiaddressRemoteStorage) Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
var prm internalclient.GetObjectPrm
prm.SetClient(s.client)
prm.SetTTL(requestParams.TTL)
prm.SetNetmapEpoch(requestParams.Epoch)
prm.SetAddress(address)
prm.SetPrivateKey(requestParams.PrivateKey)
prm.SetSessionToken(requestParams.SessionToken)
prm.SetBearerToken(requestParams.BearerToken)
prm.SetXHeaders(requestParams.XHeaders)
if requestParams.IsRaw {
prm.SetRawFlag()
}
res, err := internalclient.GetObject(ctx, prm)
if err != nil {
return nil, err
}
return res.Object(), nil
}
func (s *multiaddressRemoteStorage) payloadOnlyObject(payload []byte) *objectSDK.Object {
obj := objectSDK.New()
obj.SetPayload(payload)
return obj
}
type RangeHashRes struct {
hashes [][]byte
}
func (r *RangeHashRes) Hashes() [][]byte {
return r.hashes
}

View file

@ -1,261 +0,0 @@
package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"io"
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
type SimpleObjectWriter struct {
obj *object.Object
pld []byte
}
type clientCacheWrapper struct {
cache ClientConstructor
}
type clientWrapper struct {
client coreclient.MultiAddressClient
}
type storageEngineWrapper struct {
engine *engine.StorageEngine
}
type partWriter struct {
ObjectWriter
headWriter HeaderWriter
chunkWriter ChunkWriter
}
type hasherWrapper struct {
hash io.Writer
}
type nmSrcWrapper struct {
nmSrc netmap.Source
}
func NewSimpleObjectWriter() *SimpleObjectWriter {
return &SimpleObjectWriter{
obj: object.New(),
}
}
func (s *SimpleObjectWriter) WriteHeader(_ context.Context, obj *object.Object) error {
s.obj = obj
s.pld = make([]byte, 0, obj.PayloadSize())
return nil
}
func (s *SimpleObjectWriter) WriteChunk(_ context.Context, p []byte) error {
s.pld = append(s.pld, p...)
return nil
}
func (s *SimpleObjectWriter) Object() *object.Object {
if len(s.pld) > 0 {
s.obj.SetPayload(s.pld)
}
return s.obj
}
func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
clt, err := c.cache.Get(info)
if err != nil {
return nil, err
}
return &clientWrapper{
client: clt,
}, nil
}
func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
if exec.isForwardingEnabled() {
return exec.prm.forwarder(ctx, info, c.client)
}
key, err := exec.key()
if err != nil {
return nil, err
}
if exec.headOnly() {
return c.getHeadOnly(ctx, exec, key)
}
// we don't specify payload writer because we accumulate
// the object locally (even huge).
if rng := exec.ctxRange(); rng != nil {
// Current spec allows other storage node to deny access,
// fallback to GET here.
return c.getRange(ctx, exec, key, rng)
}
return c.get(ctx, exec, key)
}
func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
var prm internalclient.PayloadRangePrm
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
prm.SetRange(rng)
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.PayloadRange(ctx, prm)
if err != nil {
var errAccessDenied *apistatus.ObjectAccessDenied
if errors.As(err, &errAccessDenied) {
obj, err := c.get(ctx, exec, key)
if err != nil {
return nil, err
}
payload := obj.Payload()
from := rng.GetOffset()
to := from + rng.GetLength()
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
return nil, new(apistatus.ObjectOutOfRange)
}
return payloadOnlyObject(payload[from:to]), nil
}
return nil, err
}
return payloadOnlyObject(res.PayloadRange()), nil
}
func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.HeadObjectPrm
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.HeadObject(ctx, prm)
if err != nil {
return nil, err
}
return res.Header(), nil
}
func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
var prm internalclient.GetObjectPrm
prm.SetClient(c.client)
prm.SetTTL(exec.prm.common.TTL())
prm.SetNetmapEpoch(exec.curProcEpoch)
prm.SetAddress(exec.address())
prm.SetPrivateKey(key)
prm.SetSessionToken(exec.prm.common.SessionToken())
prm.SetBearerToken(exec.prm.common.BearerToken())
prm.SetXHeaders(exec.prm.common.XHeaders())
if exec.isRaw() {
prm.SetRawFlag()
}
res, err := internalclient.GetObject(ctx, prm)
if err != nil {
return nil, err
}
return res.Object(), nil
}
func (e *storageEngineWrapper) get(ctx context.Context, exec *execCtx) (*object.Object, error) {
if exec.headOnly() {
var headPrm engine.HeadPrm
headPrm.WithAddress(exec.address())
headPrm.WithRaw(exec.isRaw())
r, err := e.engine.Head(ctx, headPrm)
if err != nil {
return nil, err
}
return r.Header(), nil
} else if rng := exec.ctxRange(); rng != nil {
var getRange engine.RngPrm
getRange.WithAddress(exec.address())
getRange.WithPayloadRange(rng)
r, err := e.engine.GetRange(ctx, getRange)
if err != nil {
return nil, err
}
return r.Object(), nil
} else {
var getPrm engine.GetPrm
getPrm.WithAddress(exec.address())
r, err := e.engine.Get(ctx, getPrm)
if err != nil {
return nil, err
}
return r.Object(), nil
}
}
func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error {
return w.chunkWriter.WriteChunk(ctx, p)
}
func (w *partWriter) WriteHeader(ctx context.Context, o *object.Object) error {
return w.headWriter.WriteHeader(ctx, o)
}
func payloadOnlyObject(payload []byte) *object.Object {
obj := object.New()
obj.SetPayload(payload)
return obj
}
func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
_, err := h.hash.Write(p)
return err
}
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
return n.nmSrc.Epoch()
}

View file

@ -0,0 +1,92 @@
package getsvc
import (
"errors"
"fmt"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
refs "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
)
var (
errMissingObjAddress = errors.New("missing object address")
errWrongMessageSeq = errors.New("incorrect message sequence")
errNilObjectPart = errors.New("nil object part")
errMissingSignature = errors.New("missing signature")
errInvalidObjectIDSign = errors.New("invalid object ID signature")
errWrongHeaderPartTypeExpShortRecvWithSignature = fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
)
errWrongHeaderPartTypeExpWithSignRecvShort = fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
)
)
func errInvalidObjAddress(err error) error {
return fmt.Errorf("invalid object address: %w", err)
}
func errRequestParamsValidation(err error) error {
return fmt.Errorf("request params validation: %w", err)
}
func errFetchingSessionKey(err error) error {
return fmt.Errorf("fetching session key: %w", err)
}
func errUnknownChechsumType(t refs.ChecksumType) error {
return fmt.Errorf("unknown checksum type %v", t)
}
func errResponseVerificationFailed(err error) error {
return fmt.Errorf("response verification failed: %w", err)
}
func errCouldNotWriteObjHeader(err error) error {
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
}
func errStreamOpenningFailed(err error) error {
return fmt.Errorf("stream opening failed: %w", err)
}
func errReadingResponseFailed(err error) error {
return fmt.Errorf("reading the response failed: %w", err)
}
func errUnexpectedObjectPart(v objectV2.GetObjectPart) error {
return fmt.Errorf("unexpected object part %T", v)
}
func errCouldNotWriteObjChunk(forwarder string, err error) error {
return fmt.Errorf("could not write object chunk in %s forwarder: %w", forwarder, err)
}
func errCouldNotVerifyRangeResponse(resp *objectV2.GetRangeResponse, err error) error {
return fmt.Errorf("could not verify %T: %w", resp, err)
}
func errCouldNotCreateGetRangeStream(err error) error {
return fmt.Errorf("could not create Get payload range stream: %w", err)
}
func errUnexpectedRangePart(v objectV2.GetRangePart) error {
return fmt.Errorf("unexpected range type %T", v)
}
func errUnexpectedHeaderPart(v objectV2.GetHeaderPart) error {
return fmt.Errorf("unexpected header type %T", v)
}
func errMarshalID(err error) error {
return fmt.Errorf("marshal ID: %w", err)
}
func errCantReadSignature(err error) error {
return fmt.Errorf("can't read signature: %w", err)
}
func errSendingRequestFailed(err error) error {
return fmt.Errorf("sending the request failed: %w", err)
}

View file

@ -4,7 +4,6 @@ import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"sync"
@ -71,7 +70,7 @@ func (f *getRequestForwarder) verifyResponse(resp *objectV2.GetResponse, pubkey
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return fmt.Errorf("response verification failed: %w", err)
return errResponseVerificationFailed(err)
}
return checkStatus(resp.GetMetaHeader().GetStatus())
@ -89,7 +88,7 @@ func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetOb
err = f.Stream.WriteHeader(ctx, object.NewFromV2(obj))
})
if err != nil {
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
return errCouldNotWriteObjHeader(err)
}
return nil
}
@ -102,7 +101,7 @@ func (f *getRequestForwarder) openStream(ctx context.Context, addr network.Addre
return e
})
if err != nil {
return nil, fmt.Errorf("stream opening failed: %w", err)
return nil, errStreamOpenningFailed(err)
}
return getStream, nil
}
@ -127,7 +126,7 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
}
internalclient.ReportError(c, err)
return fmt.Errorf("reading the response failed: %w", err)
return errReadingResponseFailed(err)
}
if err := f.verifyResponse(resp, pubkey); err != nil {
@ -136,7 +135,7 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
switch v := resp.GetBody().GetObjectPart().(type) {
default:
return fmt.Errorf("unexpected object part %T", v)
return errUnexpectedObjectPart(v)
case *objectV2.GetObjectPartInit:
if headWas {
return errWrongMessageSeq
@ -159,7 +158,7 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
}
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
return fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
return errCouldNotWriteObjChunk("Get", err)
}
localProgress += len(origChunk)

View file

@ -4,7 +4,6 @@ import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"sync"
@ -73,7 +72,7 @@ func (f *getRangeRequestForwarder) verifyResponse(resp *objectV2.GetRangeRespons
// verify response structure
if err := signature.VerifyServiceMessage(resp); err != nil {
return fmt.Errorf("could not verify %T: %w", resp, err)
return errCouldNotVerifyRangeResponse(resp, err)
}
return checkStatus(resp.GetMetaHeader().GetStatus())
@ -88,7 +87,7 @@ func (f *getRangeRequestForwarder) openStream(ctx context.Context, addr network.
return e
})
if err != nil {
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
return nil, errCouldNotCreateGetRangeStream(err)
}
return rangeStream, nil
}
@ -105,7 +104,7 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
break
}
internalclient.ReportError(c, err)
return fmt.Errorf("reading the response failed: %w", err)
return errReadingResponseFailed(err)
}
if err := f.verifyResponse(resp, pubkey); err != nil {
@ -114,7 +113,7 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
switch v := resp.GetBody().GetRangePart().(type) {
case nil:
return fmt.Errorf("unexpected range type %T", v)
return errUnexpectedRangePart(v)
case *objectV2.GetRangePartChunk:
origChunk := v.GetChunk()
@ -125,7 +124,7 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
}
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
return fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
return errCouldNotWriteObjChunk("GetRange", err)
}
localProgress += len(origChunk)

View file

@ -3,8 +3,6 @@ package getsvc
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"sync"
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
@ -74,7 +72,7 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
switch v := headResp.GetBody().GetHeaderPart().(type) {
case nil:
return nil, fmt.Errorf("unexpected header type %T", v)
return nil, errUnexpectedHeaderPart(v)
case *objectV2.ShortHeader:
if hdr, err = f.getHeaderFromShortHeader(v); err != nil {
return nil, err
@ -100,9 +98,7 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader) (*objectV2.Header, error) {
if !f.Request.GetBody().GetMainOnly() {
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
)
return nil, errWrongHeaderPartTypeExpShortRecvWithSignature
}
hdr := new(objectV2.Header)
@ -118,35 +114,32 @@ func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader
func (f *headRequestForwarder) getHeaderAndSignature(hdrWithSig *objectV2.HeaderWithSignature) (*objectV2.Header, *refs.Signature, error) {
if f.Request.GetBody().GetMainOnly() {
return nil, nil, fmt.Errorf("wrong header part type: expected %T, received %T",
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
)
return nil, nil, errWrongHeaderPartTypeExpWithSignRecvShort
}
if hdrWithSig == nil {
return nil, nil, errors.New("nil object part")
return nil, nil, errNilObjectPart
}
hdr := hdrWithSig.GetHeader()
idSig := hdrWithSig.GetSignature()
if idSig == nil {
// TODO(@cthulhu-rider): #1387 use "const" error
return nil, nil, errors.New("missing signature")
return nil, nil, errMissingSignature
}
binID, err := f.ObjectAddr.Object().Marshal()
if err != nil {
return nil, nil, fmt.Errorf("marshal ID: %w", err)
return nil, nil, errMarshalID(err)
}
var sig frostfscrypto.Signature
if err := sig.ReadFromV2(*idSig); err != nil {
return nil, nil, fmt.Errorf("can't read signature: %w", err)
return nil, nil, errCantReadSignature(err)
}
if !sig.Verify(binID) {
return nil, nil, errors.New("invalid object ID signature")
return nil, nil, errInvalidObjectIDSign
}
return hdr, idSig, nil
@ -160,7 +153,7 @@ func (f *headRequestForwarder) sendHeadRequest(ctx context.Context, addr network
return e
})
if err != nil {
return nil, fmt.Errorf("sending the request failed: %w", err)
return nil, errSendingRequestFailed(err)
}
return headResp, nil
}
@ -173,7 +166,7 @@ func (f *headRequestForwarder) verifyResponse(headResp *objectV2.HeadResponse, p
// verify response structure
if err := signature.VerifyServiceMessage(headResp); err != nil {
return fmt.Errorf("response verification failed: %w", err)
return errResponseVerificationFailed(err)
}
return checkStatus(f.Response.GetMetaHeader().GetStatus())

View file

@ -4,7 +4,6 @@ import (
"context"
"crypto/sha256"
"errors"
"fmt"
"hash"
"sync"
@ -24,21 +23,19 @@ import (
"git.frostfs.info/TrueCloudLab/tzhash/tz"
)
var errWrongMessageSeq = errors.New("incorrect message sequence")
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
body := req.GetBody()
addrV2 := body.GetAddress()
if addrV2 == nil {
return nil, errors.New("missing object address")
return nil, errMissingObjAddress
}
var addr oid.Address
err := addr.ReadFromV2(*addrV2)
if err != nil {
return nil, fmt.Errorf("invalid object address: %w", err)
return nil, errInvalidObjAddress(err)
}
commonPrm, err := util.CommonPrmFromV2(req)
@ -81,14 +78,14 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
addrV2 := body.GetAddress()
if addrV2 == nil {
return nil, errors.New("missing object address")
return nil, errMissingObjAddress
}
var addr oid.Address
err := addr.ReadFromV2(*addrV2)
if err != nil {
return nil, fmt.Errorf("invalid object address: %w", err)
return nil, errInvalidObjAddress(err)
}
commonPrm, err := util.CommonPrmFromV2(req)
@ -108,7 +105,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
err = p.Validate()
if err != nil {
return nil, fmt.Errorf("request params validation: %w", err)
return nil, errRequestParamsValidation(err)
}
if !commonPrm.LocalOnly() {
@ -136,14 +133,14 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
addrV2 := body.GetAddress()
if addrV2 == nil {
return nil, errors.New("missing object address")
return nil, errMissingObjAddress
}
var addr oid.Address
err := addr.ReadFromV2(*addrV2)
if err != nil {
return nil, fmt.Errorf("invalid object address: %w", err)
return nil, errInvalidObjAddress(err)
}
commonPrm, err := util.CommonPrmFromV2(req)
@ -167,7 +164,7 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
}
if err != nil {
return nil, fmt.Errorf("fetching session key: %w", err)
return nil, errFetchingSessionKey(err)
}
p.WithCachedSignerKey(signerKey)
@ -185,7 +182,7 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
switch t := body.GetType(); t {
default:
return nil, fmt.Errorf("unknown checksum type %v", t)
return nil, errUnknownChechsumType(t)
case refs.SHA256:
p.SetHashGenerator(func() hash.Hash {
return sha256.New()
@ -220,14 +217,14 @@ func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadRespon
addrV2 := body.GetAddress()
if addrV2 == nil {
return nil, errors.New("missing object address")
return nil, errMissingObjAddress
}
var objAddr oid.Address
err := objAddr.ReadFromV2(*addrV2)
if err != nil {
return nil, fmt.Errorf("invalid object address: %w", err)
return nil, errInvalidObjAddress(err)
}
commonPrm, err := util.CommonPrmFromV2(req)

View file

@ -0,0 +1,84 @@
package getsvc
import (
"context"
"io"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
)
// ChunkWriter is an interface of target component
// to write payload chunk.
type ChunkWriter interface {
WriteChunk(context.Context, []byte) error
}
// HeaderWriter is an interface of target component
// to write object header.
type HeaderWriter interface {
WriteHeader(context.Context, *object.Object) error
}
// ObjectWriter is an interface of target component to write object.
type ObjectWriter interface {
HeaderWriter
ChunkWriter
}
type SimpleObjectWriter struct {
obj *object.Object
pld []byte
}
type partWriter struct {
ObjectWriter
headWriter HeaderWriter
chunkWriter ChunkWriter
}
type hasherWrapper struct {
hash io.Writer
}
func NewSimpleObjectWriter() *SimpleObjectWriter {
return &SimpleObjectWriter{
obj: object.New(),
}
}
func (s *SimpleObjectWriter) WriteHeader(_ context.Context, obj *object.Object) error {
s.obj = obj
s.pld = make([]byte, 0, obj.PayloadSize())
return nil
}
func (s *SimpleObjectWriter) WriteChunk(_ context.Context, p []byte) error {
s.pld = append(s.pld, p...)
return nil
}
func (s *SimpleObjectWriter) Object() *object.Object {
if len(s.pld) > 0 {
s.obj.SetPayload(s.pld)
}
return s.obj
}
func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error {
return w.chunkWriter.WriteChunk(ctx, p)
}
func (w *partWriter) WriteHeader(ctx context.Context, o *object.Object) error {
return w.headWriter.WriteHeader(ctx, o)
}
func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
_, err := h.hash.Write(p)
return err
}