Refactor getsvc #277
|
@ -336,17 +336,14 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
|
|||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||
|
||||
return getsvc.New(
|
||||
getsvc.WithLogger(c.log),
|
||||
getsvc.WithLocalStorageEngine(ls),
|
||||
getsvc.WithClientConstructor(coreConstructor),
|
||||
getsvc.WithTraverserGenerator(
|
||||
traverseGen.WithTraverseOptions(
|
||||
placement.SuccessAfter(1),
|
||||
),
|
||||
keyStorage,
|
||||
c.netMapSource,
|
||||
ls,
|
||||
traverseGen.WithTraverseOptions(
|
||||
placement.SuccessAfter(1),
|
||||
),
|
||||
getsvc.WithNetMapSource(c.netMapSource),
|
||||
getsvc.WithKeyStorage(keyStorage),
|
||||
)
|
||||
coreConstructor,
|
||||
getsvc.WithLogger(c.log))
|
||||
}
|
||||
|
||||
func createGetServiceV2(sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service {
|
||||
|
|
|
@ -11,9 +11,9 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (exec *execCtx) assemble(ctx context.Context) {
|
||||
if !exec.canAssemble() {
|
||||
exec.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||
func (r *request) assemble(ctx context.Context) {
|
||||
if !r.canAssemble() {
|
||||
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -28,35 +28,35 @@ func (exec *execCtx) assemble(ctx context.Context) {
|
|||
// - the assembly process is expected to be handled on a container node
|
||||
// only since the requests forwarding mechanism presentation; such the
|
||||
// node should have enough rights for getting any child object by design.
|
||||
exec.prm.common.ForgetTokens()
|
||||
r.prm.common.ForgetTokens()
|
||||
|
||||
// Do not use forwarding during assembly stage.
|
||||
// Request forwarding closure inherited in produced
|
||||
// `execCtx` so it should be disabled there.
|
||||
exec.disableForwarding()
|
||||
r.disableForwarding()
|
||||
|
||||
exec.log.Debug(logs.GetTryingToAssembleTheObject)
|
||||
r.log.Debug(logs.GetTryingToAssembleTheObject)
|
||||
|
||||
assembler := newAssembler(exec.address(), exec.splitInfo(), exec.ctxRange(), exec)
|
||||
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
|
||||
|
||||
exec.log.Debug(logs.GetAssemblingSplittedObject,
|
||||
zap.Stringer("address", exec.address()),
|
||||
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", exec.ctxRange().GetLength()),
|
||||
r.log.Debug(logs.GetAssemblingSplittedObject,
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
defer exec.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
|
||||
zap.Stringer("address", exec.address()),
|
||||
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", exec.ctxRange().GetLength()),
|
||||
defer r.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
|
||||
obj, err := assembler.Assemble(ctx, exec.prm.objWriter)
|
||||
obj, err := assembler.Assemble(ctx, r.prm.objWriter)
|
||||
if err != nil {
|
||||
exec.log.Warn(logs.GetFailedToAssembleSplittedObject,
|
||||
r.log.Warn(logs.GetFailedToAssembleSplittedObject,
|
||||
zap.Error(err),
|
||||
zap.Stringer("address", exec.address()),
|
||||
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", exec.ctxRange().GetLength()),
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||
)
|
||||
}
|
||||
|
||||
|
@ -68,27 +68,27 @@ func (exec *execCtx) assemble(ctx context.Context) {
|
|||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
r.status = statusUndefined
|
||||
r.err = err
|
||||
case err == nil:
|
||||
exec.status = statusOK
|
||||
exec.err = nil
|
||||
exec.collectedObject = obj
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
r.collectedObject = obj
|
||||
case errors.As(err, &errRemovedRemote):
|
||||
exec.status = statusINHUMED
|
||||
exec.err = errRemovedRemote
|
||||
r.status = statusINHUMED
|
||||
r.err = errRemovedRemote
|
||||
case errors.As(err, &errRemovedLocal):
|
||||
exec.status = statusINHUMED
|
||||
exec.err = errRemovedLocal
|
||||
r.status = statusINHUMED
|
||||
r.err = errRemovedLocal
|
||||
case errors.As(err, &errSplitInfo):
|
||||
exec.status = statusVIRTUAL
|
||||
exec.err = errSplitInfo
|
||||
r.status = statusVIRTUAL
|
||||
r.err = errSplitInfo
|
||||
case errors.As(err, &errOutOfRangeRemote):
|
||||
exec.status = statusOutOfRange
|
||||
exec.err = errOutOfRangeRemote
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRangeRemote
|
||||
case errors.As(err, &errOutOfRangeLocal):
|
||||
exec.status = statusOutOfRange
|
||||
exec.err = errOutOfRangeLocal
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRangeLocal
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -96,42 +96,54 @@ func equalAddresses(a, b oid.Address) bool {
|
|||
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
|
||||
}
|
||||
|
||||
func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
|
||||
p := exec.prm
|
||||
p.common = p.common.WithLocalOnly(false)
|
||||
p.addr.SetContainer(exec.containerID())
|
||||
p.addr.SetObject(id)
|
||||
|
||||
prm := HeadPrm{
|
||||
commonPrm: p.commonPrm,
|
||||
}
|
||||
|
||||
func (r *request) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
|
||||
w := NewSimpleObjectWriter()
|
||||
prm.SetHeaderWriter(w)
|
||||
err := exec.svc.Head(ctx, prm)
|
||||
|
||||
if err != nil {
|
||||
p := RequestParameters{}
|
||||
p.common = p.common.WithLocalOnly(false)
|
||||
p.addr.SetContainer(r.containerID())
|
||||
p.addr.SetObject(id)
|
||||
p.head = true
|
||||
p.SetHeaderWriter(w)
|
||||
|
||||
if err := r.getObjectWithIndependentRequest(ctx, p); err != nil {
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return w.Object(), nil
|
||||
}
|
||||
|
||||
func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) {
|
||||
func (r *request) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) {
|
||||
w := NewSimpleObjectWriter()
|
||||
|
||||
p := exec.prm
|
||||
p := r.prm
|
||||
p.common = p.common.WithLocalOnly(false)
|
||||
p.objWriter = w
|
||||
p.SetRange(rng)
|
||||
p.rng = rng
|
||||
|
||||
p.addr.SetContainer(exec.containerID())
|
||||
p.addr.SetContainer(r.containerID())
|
||||
p.addr.SetObject(id)
|
||||
|
||||
statusError := exec.svc.get(ctx, p.commonPrm, withPayloadRange(rng))
|
||||
|
||||
if statusError.err != nil {
|
||||
return nil, statusError.err
|
||||
if err := r.getObjectWithIndependentRequest(ctx, p); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return w.Object(), nil
|
||||
}
|
||||
|
||||
func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm RequestParameters) error {
|
||||
detachedExecutor := &request{
|
||||
keyStore: r.keyStore,
|
||||
traverserGenerator: r.traverserGenerator,
|
||||
remoteStorageConstructor: r.remoteStorageConstructor,
|
||||
epochSource: r.epochSource,
|
||||
localStorage: r.localStorage,
|
||||
|
||||
prm: prm,
|
||||
infoSplit: objectSDK.NewSplitInfo(),
|
||||
log: r.log,
|
||||
}
|
||||
|
||||
detachedExecutor.execute(ctx)
|
||||
|
||||
return detachedExecutor.statusError.err
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package getsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
|
@ -15,10 +14,6 @@ type objectGetter interface {
|
|||
HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error)
|
||||
}
|
||||
|
||||
var (
|
||||
errParentAddressDiffers = errors.New("parent address in child object differs")
|
||||
)
|
||||
|
||||
type assembler struct {
|
||||
addr oid.Address
|
||||
splitInfo *objectSDK.SplitInfo
|
||||
|
@ -89,7 +84,7 @@ func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID)
|
|||
|
||||
parentObject := sourceObject.Parent()
|
||||
if parentObject == nil {
|
||||
return nil, nil, errors.New("received child with empty parent")
|
||||
return nil, nil, errChildWithEmptyParent
|
||||
}
|
||||
|
||||
a.parentObject = parentObject
|
||||
|
|
|
@ -8,26 +8,26 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (exec *execCtx) executeOnContainer(ctx context.Context) {
|
||||
if exec.isLocal() {
|
||||
exec.log.Debug(logs.GetReturnResultDirectly)
|
||||
func (r *request) executeOnContainer(ctx context.Context) {
|
||||
if r.isLocal() {
|
||||
r.log.Debug(logs.GetReturnResultDirectly)
|
||||
return
|
||||
}
|
||||
|
||||
lookupDepth := exec.netmapLookupDepth()
|
||||
lookupDepth := r.netmapLookupDepth()
|
||||
|
||||
exec.log.Debug(logs.TryingToExecuteInContainer,
|
||||
r.log.Debug(logs.TryingToExecuteInContainer,
|
||||
zap.Uint64("netmap lookup depth", lookupDepth),
|
||||
)
|
||||
|
||||
// initialize epoch number
|
||||
ok := exec.initEpoch()
|
||||
ok := r.initEpoch()
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
for {
|
||||
if exec.processCurrentEpoch(ctx) {
|
||||
if r.processCurrentEpoch(ctx) {
|
||||
break
|
||||
}
|
||||
|
||||
|
@ -39,16 +39,16 @@ func (exec *execCtx) executeOnContainer(ctx context.Context) {
|
|||
lookupDepth--
|
||||
|
||||
// go to the previous epoch
|
||||
exec.curProcEpoch--
|
||||
r.curProcEpoch--
|
||||
}
|
||||
}
|
||||
|
||||
func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
||||
exec.log.Debug(logs.ProcessEpoch,
|
||||
zap.Uint64("number", exec.curProcEpoch),
|
||||
func (r *request) processCurrentEpoch(ctx context.Context) bool {
|
||||
r.log.Debug(logs.ProcessEpoch,
|
||||
zap.Uint64("number", r.curProcEpoch),
|
||||
)
|
||||
|
||||
traverser, ok := exec.generateTraverser(exec.address())
|
||||
traverser, ok := r.generateTraverser(r.address())
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
|
@ -56,12 +56,12 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
|||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
exec.status = statusUndefined
|
||||
r.status = statusUndefined
|
||||
|
||||
for {
|
||||
addrs := traverser.Next()
|
||||
if len(addrs) == 0 {
|
||||
exec.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
|
||||
r.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
|
||||
|
||||
return false
|
||||
}
|
||||
|
@ -69,8 +69,8 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
|||
for i := range addrs {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
exec.log.Debug(logs.InterruptPlacementIterationByContext,
|
||||
zap.String("error", ctx.Err().Error()),
|
||||
r.log.Debug(logs.InterruptPlacementIterationByContext,
|
||||
zap.Error(ctx.Err()),
|
||||
)
|
||||
|
||||
return true
|
||||
|
@ -84,8 +84,8 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
|||
|
||||
client.NodeInfoFromNetmapElement(&info, addrs[i])
|
||||
|
||||
if exec.processNode(ctx, info) {
|
||||
exec.log.Debug(logs.GetCompletingTheOperation)
|
||||
if r.processNode(ctx, info) {
|
||||
r.log.Debug(logs.GetCompletingTheOperation)
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
|
10
pkg/services/object/get/errors.go
Normal file
|
@ -0,0 +1,10 @@
|
|||
package getsvc
|
||||
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
errRangeZeroLength = errors.New("zero range length")
|
||||
errRangeOverflow = errors.New("range overflow")
|
||||
errChildWithEmptyParent = errors.New("received child with empty parent")
|
||||
errParentAddressDiffers = errors.New("parent address in child object differs")
|
||||
)
|
|
@ -1,279 +0,0 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type statusError struct {
|
||||
status int
|
||||
err error
|
||||
}
|
||||
|
||||
type execCtx struct {
|
||||
svc *Service
|
||||
|
||||
prm RangePrm
|
||||
|
||||
statusError
|
||||
|
||||
infoSplit *objectSDK.SplitInfo
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
collectedObject *objectSDK.Object
|
||||
|
||||
head bool
|
||||
|
||||
curProcEpoch uint64
|
||||
}
|
||||
|
||||
type execOption func(*execCtx)
|
||||
|
||||
const (
|
||||
statusUndefined int = iota
|
||||
statusOK
|
||||
statusINHUMED
|
||||
statusVIRTUAL
|
||||
statusOutOfRange
|
||||
)
|
||||
|
||||
func headOnly() execOption {
|
||||
return func(c *execCtx) {
|
||||
c.head = true
|
||||
}
|
||||
}
|
||||
|
||||
func withPayloadRange(r *objectSDK.Range) execOption {
|
||||
return func(c *execCtx) {
|
||||
c.prm.rng = r
|
||||
}
|
||||
}
|
||||
|
||||
func (exec *execCtx) setLogger(l *logger.Logger) {
|
||||
req := "GET"
|
||||
if exec.headOnly() {
|
||||
req = "HEAD"
|
||||
} else if exec.ctxRange() != nil {
|
||||
req = "GET_RANGE"
|
||||
}
|
||||
|
||||
exec.log = &logger.Logger{Logger: l.With(
|
||||
zap.String("request", req),
|
||||
zap.Stringer("address", exec.address()),
|
||||
zap.Bool("raw", exec.isRaw()),
|
||||
zap.Bool("local", exec.isLocal()),
|
||||
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
|
||||
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
|
||||
)}
|
||||
}
|
||||
|
||||
func (exec execCtx) isLocal() bool {
|
||||
return exec.prm.common.LocalOnly()
|
||||
}
|
||||
|
||||
func (exec execCtx) isRaw() bool {
|
||||
return exec.prm.raw
|
||||
}
|
||||
|
||||
func (exec execCtx) address() oid.Address {
|
||||
return exec.prm.addr
|
||||
}
|
||||
|
||||
func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
|
||||
if exec.prm.signerKey != nil {
|
||||
// the key has already been requested and
|
||||
// cached in the previous operations
|
||||
return exec.prm.signerKey, nil
|
||||
}
|
||||
|
||||
var sessionInfo *util.SessionInfo
|
||||
|
||||
if tok := exec.prm.common.SessionToken(); tok != nil {
|
||||
sessionInfo = &util.SessionInfo{
|
||||
ID: tok.ID(),
|
||||
Owner: tok.Issuer(),
|
||||
}
|
||||
}
|
||||
|
||||
return exec.svc.keyStore.GetKey(sessionInfo)
|
||||
}
|
||||
|
||||
func (exec *execCtx) canAssemble() bool {
|
||||
return !exec.isRaw() && !exec.headOnly()
|
||||
}
|
||||
|
||||
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
|
||||
return exec.infoSplit
|
||||
}
|
||||
|
||||
func (exec *execCtx) containerID() cid.ID {
|
||||
return exec.address().Container()
|
||||
}
|
||||
|
||||
func (exec *execCtx) ctxRange() *objectSDK.Range {
|
||||
return exec.prm.rng
|
||||
}
|
||||
|
||||
func (exec *execCtx) headOnly() bool {
|
||||
return exec.head
|
||||
}
|
||||
|
||||
func (exec *execCtx) netmapEpoch() uint64 {
|
||||
return exec.prm.common.NetmapEpoch()
|
||||
}
|
||||
|
||||
func (exec *execCtx) netmapLookupDepth() uint64 {
|
||||
return exec.prm.common.NetmapLookupDepth()
|
||||
}
|
||||
|
||||
func (exec *execCtx) initEpoch() bool {
|
||||
exec.curProcEpoch = exec.netmapEpoch()
|
||||
if exec.curProcEpoch > 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
e, err := exec.svc.currentEpochReceiver.currentEpoch()
|
||||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
exec.log.Debug(logs.CouldNotGetCurrentEpochNumber,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return false
|
||||
case err == nil:
|
||||
exec.curProcEpoch = e
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
|
||||
obj := addr.Object()
|
||||
|
||||
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, exec.curProcEpoch)
|
||||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
exec.log.Debug(logs.GetCouldNotGenerateContainerTraverser,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
|
||||
return nil, false
|
||||
case err == nil:
|
||||
return t, true
|
||||
}
|
||||
}
|
||||
|
||||
func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
|
||||
c, err := exec.svc.clientCache.get(info)
|
||||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
exec.log.Debug(logs.GetCouldNotConstructRemoteNodeClient)
|
||||
case err == nil:
|
||||
return c, true
|
||||
}
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
|
||||
if last, ok := src.LastPart(); ok {
|
||||
dst.SetLastPart(last)
|
||||
}
|
||||
|
||||
if link, ok := src.Link(); ok {
|
||||
dst.SetLink(link)
|
||||
}
|
||||
|
||||
if splitID := src.SplitID(); splitID != nil {
|
||||
dst.SetSplitID(splitID)
|
||||
}
|
||||
}
|
||||
|
||||
func (exec *execCtx) writeCollectedHeader(ctx context.Context) bool {
|
||||
if exec.ctxRange() != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
err := exec.prm.objWriter.WriteHeader(
|
||||
ctx,
|
||||
exec.collectedObject.CutPayload(),
|
||||
)
|
||||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
exec.log.Debug(logs.GetCouldNotWriteHeader,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
case err == nil:
|
||||
exec.status = statusOK
|
||||
exec.err = nil
|
||||
}
|
||||
|
||||
return exec.status == statusOK
|
||||
}
|
||||
|
||||
func (exec *execCtx) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
|
||||
if exec.headOnly() {
|
||||
return true
|
||||
}
|
||||
|
||||
err := exec.prm.objWriter.WriteChunk(ctx, obj.Payload())
|
||||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
|
||||
exec.log.Debug(logs.GetCouldNotWritePayloadChunk,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
case err == nil:
|
||||
exec.status = statusOK
|
||||
exec.err = nil
|
||||
}
|
||||
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (exec *execCtx) writeCollectedObject(ctx context.Context) {
|
||||
if ok := exec.writeCollectedHeader(ctx); ok {
|
||||
exec.writeObjectPayload(ctx, exec.collectedObject)
|
||||
}
|
||||
}
|
||||
|
||||
// isForwardingEnabled returns true if common execution
|
||||
// parameters has request forwarding closure set.
|
||||
func (exec execCtx) isForwardingEnabled() bool {
|
||||
return exec.prm.forwarder != nil
|
||||
}
|
||||
|
||||
// disableForwarding removes request forwarding closure from common
|
||||
// parameters, so it won't be inherited in new execution contexts.
|
||||
func (exec *execCtx) disableForwarding() {
|
||||
exec.prm.SetRequestForwarder(nil)
|
||||
}
|
|
@ -11,18 +11,18 @@ import (
|
|||
|
||||
// Get serves a request to get an object by address, and returns Streamer instance.
|
||||
func (s *Service) Get(ctx context.Context, prm Prm) error {
|
||||
return s.get(ctx, prm.commonPrm).err
|
||||
return s.get(ctx, RequestParameters{
|
||||
commonPrm: prm.commonPrm,
|
||||
})
|
||||
}
|
||||
|
||||
// GetRange serves a request to get an object by address, and returns Streamer instance.
|
||||
func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
|
||||
return s.getRange(ctx, prm)
|
||||
return s.get(ctx, RequestParameters{
|
||||
commonPrm: prm.commonPrm,
|
||||
rng: prm.rng,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) getRange(ctx context.Context, prm RangePrm, opts ...execOption) error {
|
||||
return s.get(ctx, prm.commonPrm, append(opts, withPayloadRange(prm.rng))...).err
|
||||
}
|
||||
|
||||
func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHashRes, error) {
|
||||
hashes := make([][]byte, 0, len(prm.rngs))
|
||||
|
||||
|
@ -34,16 +34,15 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
|
|||
// 1. Potential gains are insignificant when operating in the Internet given typical latencies and losses.
|
||||
// 2. Parallel solution is more complex in terms of code.
|
||||
// 3. TZ-hash is likely to be disabled in private installations.
|
||||
rngPrm := RangePrm{
|
||||
reqPrm := RequestParameters{
|
||||
commonPrm: prm.commonPrm,
|
||||
rng: &rng,
|
||||
}
|
||||
|
||||
rngPrm.SetRange(&rng)
|
||||
rngPrm.SetChunkWriter(&hasherWrapper{
|
||||
reqPrm.SetChunkWriter(&hasherWrapper{
|
||||
hash: util.NewSaltingWriter(h, prm.salt),
|
||||
})
|
||||
|
||||
if err := s.getRange(ctx, rngPrm); err != nil {
|
||||
if err := s.get(ctx, reqPrm); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
@ -60,30 +59,32 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
|
|||
// Returns ErrNotFound if the header was not received for the call.
|
||||
// Returns SplitInfoError if object is virtual and raw flag is set.
|
||||
func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
|
||||
return s.get(ctx, prm.commonPrm, headOnly()).err
|
||||
return s.get(ctx, RequestParameters{
|
||||
head: true,
|
||||
commonPrm: prm.commonPrm,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError {
|
||||
exec := &execCtx{
|
||||
svc: s,
|
||||
prm: RangePrm{
|
||||
commonPrm: prm,
|
||||
},
|
||||
infoSplit: object.NewSplitInfo(),
|
||||
}
|
||||
func (s *Service) get(ctx context.Context, prm RequestParameters) error {
|
||||
exec := &request{
|
||||
keyStore: s.keyStore,
|
||||
traverserGenerator: s.traverserGenerator,
|
||||
remoteStorageConstructor: s.remoteStorageConstructor,
|
||||
epochSource: s.epochSource,
|
||||
localStorage: s.localStorage,
|
||||
|
||||
for i := range opts {
|
||||
opts[i](exec)
|
||||
prm: prm,
|
||||
infoSplit: object.NewSplitInfo(),
|
||||
}
|
||||
|
||||
exec.setLogger(s.log)
|
||||
|
||||
exec.execute(ctx)
|
||||
|
||||
return exec.statusError
|
||||
return exec.statusError.err
|
||||
}
|
||||
|
||||
func (exec *execCtx) execute(ctx context.Context) {
|
||||
func (exec *request) execute(ctx context.Context) {
|
||||
exec.log.Debug(logs.ServingRequest)
|
||||
|
||||
// perform local operation
|
||||
|
@ -92,7 +93,7 @@ func (exec *execCtx) execute(ctx context.Context) {
|
|||
exec.analyzeStatus(ctx, true)
|
||||
}
|
||||
|
||||
func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
|
||||
func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
||||
// analyze local result
|
||||
switch exec.status {
|
||||
case statusOK:
|
||||
|
@ -106,7 +107,7 @@ func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
|
|||
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
||||
default:
|
||||
exec.log.Debug(logs.OperationFinishedWithError,
|
||||
zap.String("error", exec.err.Error()),
|
||||
zap.Error(exec.err),
|
||||
)
|
||||
|
||||
if execCnr {
|
||||
|
|
|
@ -2,6 +2,7 @@ package getsvc
|
|||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -56,7 +57,7 @@ type testClient struct {
|
|||
|
||||
type testEpochReceiver uint64
|
||||
|
||||
func (e testEpochReceiver) currentEpoch() (uint64, error) {
|
||||
func (e testEpochReceiver) Epoch() (uint64, error) {
|
||||
return uint64(e), nil
|
||||
}
|
||||
|
||||
|
@ -99,7 +100,7 @@ func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.
|
|||
return vs, nil
|
||||
}
|
||||
|
||||
func (c *testClientCache) get(info client.NodeInfo) (getClient, error) {
|
||||
func (c *testClientCache) Get(info client.NodeInfo) (remoteStorage, error) {
|
||||
v, ok := c.clients[network.StringifyGroup(info.AddressGroup())]
|
||||
if !ok {
|
||||
return nil, errors.New("could not construct client")
|
||||
|
@ -117,8 +118,15 @@ func newTestClient() *testClient {
|
|||
}
|
||||
}
|
||||
|
||||
func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
|
||||
v, ok := c.results[exec.address().EncodeToString()]
|
||||
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
|
||||
c.results[addr.EncodeToString()] = struct {
|
||||
obj *objectSDK.Object
|
||||
err error
|
||||
}{obj: obj, err: err}
|
||||
}
|
||||
|
||||
func (c *testClient) Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
|
||||
v, ok := c.results[address.EncodeToString()]
|
||||
if !ok {
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
||||
|
@ -129,21 +137,38 @@ func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.Node
|
|||
return nil, v.err
|
||||
}
|
||||
|
||||
return cutToRange(v.obj, exec.ctxRange()), nil
|
||||
return v.obj, nil
|
||||
}
|
||||
|
||||
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
|
||||
c.results[addr.EncodeToString()] = struct {
|
||||
obj *objectSDK.Object
|
||||
err error
|
||||
}{obj: obj, err: err}
|
||||
func (c *testClient) Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
|
||||
return c.Get(ctx, address, requestParams)
|
||||
}
|
||||
|
||||
func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object, error) {
|
||||
func (c *testClient) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
|
||||
obj, err := c.Get(ctx, address, requestParams)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return cutToRange(obj, rng), nil
|
||||
}
|
||||
|
||||
func (c *testClient) ForwardRequest(ctx context.Context, info client.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error) {
|
||||
return nil, fmt.Errorf("not implemented")
|
||||
}
|
||||
|
||||
func (s *testStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
|
||||
return s.Range(ctx, address, nil)
|
||||
}
|
||||
|
||||
func (s *testStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) {
|
||||
return s.Range(ctx, address, nil)
|
||||
}
|
||||
|
||||
func (s *testStorage) Range(_ context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) {
|
||||
var (
|
||||
ok bool
|
||||
obj *objectSDK.Object
|
||||
sAddr = exec.address().EncodeToString()
|
||||
sAddr = address.EncodeToString()
|
||||
)
|
||||
|
||||
if _, ok = s.inhumed[sAddr]; ok {
|
||||
|
@ -157,7 +182,7 @@ func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object,
|
|||
}
|
||||
|
||||
if obj, ok = s.phy[sAddr]; ok {
|
||||
return cutToRange(obj, exec.ctxRange()), nil
|
||||
return cutToRange(obj, rng), nil
|
||||
}
|
||||
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
@ -241,15 +266,21 @@ func (w *writePayloadErrorObjectWriter) WriteChunk(_ context.Context, _ []byte)
|
|||
return &writePayloadError{}
|
||||
}
|
||||
|
||||
type testKeyStorage struct {
|
||||
}
|
||||
|
||||
func (ks *testKeyStorage) GetKey(_ *util.SessionInfo) (*ecdsa.PrivateKey, error) {
|
||||
return &ecdsa.PrivateKey{}, nil
|
||||
}
|
||||
|
||||
func TestGetLocalOnly(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
newSvc := func(storage *testStorage) *Service {
|
||||
svc := &Service{cfg: new(cfg)}
|
||||
svc.log = test.NewLogger(t, false)
|
||||
svc.localStorage = storage
|
||||
|
||||
return svc
|
||||
return &Service{
|
||||
log: test.NewLogger(t, false),
|
||||
localStorage: storage,
|
||||
}
|
||||
}
|
||||
|
||||
newPrm := func(raw bool, w ObjectWriter) Prm {
|
||||
|
@ -506,22 +537,21 @@ func TestGetRemoteSmall(t *testing.T) {
|
|||
container.CalculateID(&idCnr, cnr)
|
||||
|
||||
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
|
||||
svc := &Service{cfg: new(cfg)}
|
||||
svc.log = test.NewLogger(t, false)
|
||||
svc.localStorage = newTestStorage()
|
||||
|
||||
const curEpoch = 13
|
||||
|
||||
svc.traverserGenerator = &testTraverserGenerator{
|
||||
c: cnr,
|
||||
b: map[uint64]placement.Builder{
|
||||
curEpoch: b,
|
||||
return &Service{
|
||||
log: test.NewLogger(t, false),
|
||||
localStorage: newTestStorage(),
|
||||
traverserGenerator: &testTraverserGenerator{
|
||||
c: cnr,
|
||||
b: map[uint64]placement.Builder{
|
||||
curEpoch: b,
|
||||
},
|
||||
},
|
||||
epochSource: testEpochReceiver(curEpoch),
|
||||
remoteStorageConstructor: c,
|
||||
keyStore: &testKeyStorage{},
|
||||
}
|
||||
svc.clientCache = c
|
||||
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
|
||||
|
||||
return svc
|
||||
}
|
||||
|
||||
newPrm := func(raw bool, w ObjectWriter) Prm {
|
||||
|
@ -1176,7 +1206,7 @@ func TestGetRemoteSmall(t *testing.T) {
|
|||
|
||||
err := svc.Get(ctx, p)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err.Error(), "received child with empty parent")
|
||||
require.ErrorIs(t, err, errChildWithEmptyParent)
|
||||
|
||||
w = NewSimpleObjectWriter()
|
||||
payloadSz := srcObj.PayloadSize()
|
||||
|
@ -1189,7 +1219,7 @@ func TestGetRemoteSmall(t *testing.T) {
|
|||
|
||||
err = svc.GetRange(ctx, rngPrm)
|
||||
require.Error(t, err)
|
||||
require.Equal(t, err.Error(), "received child with empty parent")
|
||||
require.ErrorIs(t, err, errChildWithEmptyParent)
|
||||
})
|
||||
|
||||
t.Run("out of range", func(t *testing.T) {
|
||||
|
@ -1639,39 +1669,38 @@ func TestGetFromPastEpoch(t *testing.T) {
|
|||
c22 := newTestClient()
|
||||
c22.addResult(addr, obj, nil)
|
||||
|
||||
svc := &Service{cfg: new(cfg)}
|
||||
svc.log = test.NewLogger(t, false)
|
||||
svc.localStorage = newTestStorage()
|
||||
|
||||
const curEpoch = 13
|
||||
|
||||
svc.traverserGenerator = &testTraverserGenerator{
|
||||
c: cnr,
|
||||
b: map[uint64]placement.Builder{
|
||||
curEpoch: &testPlacementBuilder{
|
||||
vectors: map[string][][]netmap.NodeInfo{
|
||||
addr.EncodeToString(): ns[:1],
|
||||
svc := &Service{
|
||||
log: test.NewLogger(t, false),
|
||||
localStorage: newTestStorage(),
|
||||
epochSource: testEpochReceiver(curEpoch),
|
||||
traverserGenerator: &testTraverserGenerator{
|
||||
c: cnr,
|
||||
b: map[uint64]placement.Builder{
|
||||
curEpoch: &testPlacementBuilder{
|
||||
vectors: map[string][][]netmap.NodeInfo{
|
||||
addr.EncodeToString(): ns[:1],
|
||||
},
|
||||
},
|
||||
},
|
||||
curEpoch - 1: &testPlacementBuilder{
|
||||
vectors: map[string][][]netmap.NodeInfo{
|
||||
addr.EncodeToString(): ns[1:],
|
||||
curEpoch - 1: &testPlacementBuilder{
|
||||
vectors: map[string][][]netmap.NodeInfo{
|
||||
addr.EncodeToString(): ns[1:],
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
svc.clientCache = &testClientCache{
|
||||
clients: map[string]*testClient{
|
||||
as[0][0]: c11,
|
||||
as[0][1]: c12,
|
||||
as[1][0]: c21,
|
||||
as[1][1]: c22,
|
||||
remoteStorageConstructor: &testClientCache{
|
||||
clients: map[string]*testClient{
|
||||
as[0][0]: c11,
|
||||
as[0][1]: c12,
|
||||
as[1][0]: c21,
|
||||
as[1][1]: c22,
|
||||
},
|
||||
},
|
||||
keyStore: &testKeyStorage{},
|
||||
}
|
||||
|
||||
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
|
||||
|
||||
w := NewSimpleObjectWriter()
|
||||
|
||||
commonPrm := new(util.CommonPrm)
|
||||
|
|
|
@ -11,7 +11,7 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (exec *execCtx) executeLocal(ctx context.Context) {
|
||||
func (r *request) executeLocal(ctx context.Context) {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "getService.executeLocal")
|
||||
defer func() {
|
||||
span.End()
|
||||
|
@ -19,7 +19,7 @@ func (exec *execCtx) executeLocal(ctx context.Context) {
|
|||
|
||||
var err error
|
||||
|
||||
exec.collectedObject, err = exec.svc.localStorage.get(ctx, exec)
|
||||
r.collectedObject, err = r.get(ctx)
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
var errRemoved apistatus.ObjectAlreadyRemoved
|
||||
|
@ -27,25 +27,33 @@ func (exec *execCtx) executeLocal(ctx context.Context) {
|
|||
|
||||
switch {
|
||||
default:
|
||||
exec.status = statusUndefined
|
||||
exec.err = err
|
||||
r.status = statusUndefined
|
||||
r.err = err
|
||||
|
||||
exec.log.Debug(logs.GetLocalGetFailed,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
r.log.Debug(logs.GetLocalGetFailed, zap.Error(err))
|
||||
case err == nil:
|
||||
exec.status = statusOK
|
||||
exec.err = nil
|
||||
exec.writeCollectedObject(ctx)
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
r.writeCollectedObject(ctx)
|
||||
case errors.As(err, &errRemoved):
|
||||
exec.status = statusINHUMED
|
||||
exec.err = errRemoved
|
||||
r.status = statusINHUMED
|
||||
r.err = errRemoved
|
||||
case errors.As(err, &errSplitInfo):
|
||||
exec.status = statusVIRTUAL
|
||||
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
|
||||
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
|
||||
r.status = statusVIRTUAL
|
||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
case errors.As(err, &errOutOfRange):
|
||||
exec.status = statusOutOfRange
|
||||
exec.err = errOutOfRange
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRange
|
||||
}
|
||||
}
|
||||
|
||||
func (r *request) get(ctx context.Context) (*objectSDK.Object, error) {
|
||||
if r.headOnly() {
|
||||
return r.localStorage.Head(ctx, r.address(), r.isRaw())
|
||||
}
|
||||
if rng := r.ctxRange(); rng != nil {
|
||||
return r.localStorage.Range(ctx, r.address(), rng)
|
||||
}
|
||||
return r.localStorage.Get(ctx, r.address())
|
||||
}
|
||||
|
|
|
@ -3,12 +3,11 @@ package getsvc
|
|||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"hash"
|
||||
|
||||
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
)
|
||||
|
||||
|
@ -21,14 +20,9 @@ type Prm struct {
|
|||
type RangePrm struct {
|
||||
commonPrm
|
||||
|
||||
rng *object.Range
|
||||
rng *objectSDK.Range
|
||||
}
|
||||
|
||||
var (
|
||||
errRangeZeroLength = errors.New("zero range length")
|
||||
errRangeOverflow = errors.New("range overflow")
|
||||
)
|
||||
|
||||
// Validate pre-validates `OBJECTRANGE` request's parameters content
|
||||
// without access to the requested object's payload.
|
||||
func (p RangePrm) Validate() error {
|
||||
|
@ -54,12 +48,18 @@ type RangeHashPrm struct {
|
|||
|
||||
hashGen func() hash.Hash
|
||||
|
||||
rngs []object.Range
|
||||
rngs []objectSDK.Range
|
||||
|
||||
salt []byte
|
||||
}
|
||||
|
||||
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
|
||||
type RequestParameters struct {
|
||||
commonPrm
|
||||
head bool
|
||||
rng *objectSDK.Range
|
||||
}
|
||||
|
||||
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*objectSDK.Object, error)
|
||||
|
||||
// HeadPrm groups parameters of Head service call.
|
||||
type HeadPrm struct {
|
||||
|
@ -83,43 +83,25 @@ type commonPrm struct {
|
|||
signerKey *ecdsa.PrivateKey
|
||||
}
|
||||
|
||||
// ChunkWriter is an interface of target component
|
||||
// to write payload chunk.
|
||||
type ChunkWriter interface {
|
||||
WriteChunk(context.Context, []byte) error
|
||||
}
|
||||
|
||||
// HeaderWriter is an interface of target component
|
||||
// to write object header.
|
||||
type HeaderWriter interface {
|
||||
WriteHeader(context.Context, *object.Object) error
|
||||
}
|
||||
|
||||
// ObjectWriter is an interface of target component to write object.
|
||||
type ObjectWriter interface {
|
||||
HeaderWriter
|
||||
ChunkWriter
|
||||
}
|
||||
|
||||
// SetObjectWriter sets target component to write the object.
|
||||
func (p *Prm) SetObjectWriter(w ObjectWriter) {
|
||||
p.objWriter = w
|
||||
}
|
||||
|
||||
// SetChunkWriter sets target component to write the object payload range.
|
||||
func (p *RangePrm) SetChunkWriter(w ChunkWriter) {
|
||||
func (p *commonPrm) SetChunkWriter(w ChunkWriter) {
|
||||
p.objWriter = &partWriter{
|
||||
chunkWriter: w,
|
||||
}
|
||||
}
|
||||
|
||||
// SetRange sets range of the requested payload data.
|
||||
func (p *RangePrm) SetRange(rng *object.Range) {
|
||||
func (p *RangePrm) SetRange(rng *objectSDK.Range) {
|
||||
p.rng = rng
|
||||
}
|
||||
|
||||
// SetRangeList sets a list of object payload ranges.
|
||||
func (p *RangeHashPrm) SetRangeList(rngs []object.Range) {
|
||||
func (p *RangeHashPrm) SetRangeList(rngs []objectSDK.Range) {
|
||||
p.rngs = rngs
|
||||
}
|
||||
|
||||
|
@ -158,7 +140,7 @@ func (p *commonPrm) WithCachedSignerKey(signerKey *ecdsa.PrivateKey) {
|
|||
}
|
||||
|
||||
// SetHeaderWriter sets target component to write the object header.
|
||||
func (p *HeadPrm) SetHeaderWriter(w HeaderWriter) {
|
||||
func (p *commonPrm) SetHeaderWriter(w HeaderWriter) {
|
||||
p.objWriter = &partWriter{
|
||||
headWriter: w,
|
||||
}
|
||||
|
|
|
@ -12,18 +12,18 @@ import (
|
|||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool {
|
||||
func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
||||
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode")
|
||||
defer span.End()
|
||||
|
||||
exec.log.Debug(logs.ProcessingNode)
|
||||
r.log.Debug(logs.ProcessingNode)
|
||||
|
||||
client, ok := exec.remoteClient(info)
|
||||
rs, ok := r.getRemoteStorage(info)
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
|
||||
obj, err := client.getObject(ctx, exec, info)
|
||||
obj, err := r.getRemote(ctx, rs, info)
|
||||
|
||||
var errSplitInfo *objectSDK.SplitInfoError
|
||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||
|
@ -33,34 +33,66 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
|
|||
default:
|
||||
var errNotFound apistatus.ObjectNotFound
|
||||
|
||||
exec.status = statusUndefined
|
||||
exec.err = errNotFound
|
||||
r.status = statusUndefined
|
||||
r.err = errNotFound
|
||||
|
||||
exec.log.Debug(logs.GetRemoteCallFailed,
|
||||
zap.String("error", err.Error()),
|
||||
)
|
||||
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||
case err == nil:
|
||||
exec.status = statusOK
|
||||
exec.err = nil
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
|
||||
// both object and err are nil only if the original
|
||||
// request was forwarded to another node and the object
|
||||
// has already been streamed to the requesting party
|
||||
if obj != nil {
|
||||
exec.collectedObject = obj
|
||||
exec.writeCollectedObject(ctx)
|
||||
r.collectedObject = obj
|
||||
r.writeCollectedObject(ctx)
|
||||
}
|
||||
case errors.As(err, &errRemoved):
|
||||
exec.status = statusINHUMED
|
||||
exec.err = errRemoved
|
||||
r.status = statusINHUMED
|
||||
r.err = errRemoved
|
||||
case errors.As(err, &errOutOfRange):
|
||||
exec.status = statusOutOfRange
|
||||
exec.err = errOutOfRange
|
||||
r.status = statusOutOfRange
|
||||
r.err = errOutOfRange
|
||||
case errors.As(err, &errSplitInfo):
|
||||
exec.status = statusVIRTUAL
|
||||
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
|
||||
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
|
||||
r.status = statusVIRTUAL
|
||||
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||
}
|
||||
|
||||
return exec.status != statusUndefined
|
||||
return r.status != statusUndefined
|
||||
}
|
||||
|
||||
func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {
|
||||
if r.isForwardingEnabled() {
|
||||
return rs.ForwardRequest(ctx, info, r.prm.forwarder)
|
||||
}
|
||||
|
||||
key, err := r.key()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
prm := RemoteRequestParams{
|
||||
Epoch: r.curProcEpoch,
|
||||
TTL: r.prm.common.TTL(),
|
||||
PrivateKey: key,
|
||||
SessionToken: r.prm.common.SessionToken(),
|
||||
BearerToken: r.prm.common.BearerToken(),
|
||||
XHeaders: r.prm.common.XHeaders(),
|
||||
IsRaw: r.isRaw(),
|
||||
}
|
||||
|
||||
if r.headOnly() {
|
||||
return rs.Head(ctx, r.address(), prm)
|
||||
}
|
||||
// we don't specify payload writer because we accumulate
|
||||
// the object locally (even huge).
|
||||
if rng := r.ctxRange(); rng != nil {
|
||||
// Current spec allows other storage node to deny access,
|
||||
// fallback to GET here.
|
||||
return rs.Range(ctx, r.address(), rng, prm)
|
||||
}
|
||||
|
||||
return rs.Get(ctx, r.address(), prm)
|
||||
}
|
||||
|
|
244
pkg/services/object/get/request.go
Normal file
|
@ -0,0 +1,244 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type request struct {
|
||||
prm RequestParameters
|
||||
|
||||
statusError
|
||||
|
||||
infoSplit *objectSDK.SplitInfo
|
||||
|
||||
log *logger.Logger
|
||||
|
||||
collectedObject *objectSDK.Object
|
||||
|
||||
curProcEpoch uint64
|
||||
|
||||
keyStore keyStorage
|
||||
epochSource epochSource
|
||||
traverserGenerator traverserGenerator
|
||||
remoteStorageConstructor remoteStorageConstructor
|
||||
localStorage localStorage
|
||||
}
|
||||
|
||||
func (r *request) setLogger(l *logger.Logger) {
|
||||
req := "GET"
|
||||
if r.headOnly() {
|
||||
req = "HEAD"
|
||||
} else if r.ctxRange() != nil {
|
||||
req = "GET_RANGE"
|
||||
}
|
||||
|
||||
r.log = &logger.Logger{Logger: l.With(
|
||||
zap.String("request", req),
|
||||
zap.Stringer("address", r.address()),
|
||||
zap.Bool("raw", r.isRaw()),
|
||||
zap.Bool("local", r.isLocal()),
|
||||
zap.Bool("with session", r.prm.common.SessionToken() != nil),
|
||||
zap.Bool("with bearer", r.prm.common.BearerToken() != nil),
|
||||
)}
|
||||
}
|
||||
|
||||
func (r *request) isLocal() bool {
|
||||
return r.prm.common.LocalOnly()
|
||||
}
|
||||
|
||||
func (r *request) isRaw() bool {
|
||||
return r.prm.raw
|
||||
}
|
||||
|
||||
func (r *request) address() oid.Address {
|
||||
return r.prm.addr
|
||||
}
|
||||
|
||||
func (r *request) key() (*ecdsa.PrivateKey, error) {
|
||||
if r.prm.signerKey != nil {
|
||||
// the key has already been requested and
|
||||
// cached in the previous operations
|
||||
return r.prm.signerKey, nil
|
||||
}
|
||||
|
||||
var sessionInfo *util.SessionInfo
|
||||
|
||||
if tok := r.prm.common.SessionToken(); tok != nil {
|
||||
sessionInfo = &util.SessionInfo{
|
||||
ID: tok.ID(),
|
||||
Owner: tok.Issuer(),
|
||||
}
|
||||
}
|
||||
|
||||
return r.keyStore.GetKey(sessionInfo)
|
||||
}
|
||||
|
||||
func (r *request) canAssemble() bool {
|
||||
return !r.isRaw() && !r.headOnly()
|
||||
}
|
||||
|
||||
func (r *request) splitInfo() *objectSDK.SplitInfo {
|
||||
return r.infoSplit
|
||||
}
|
||||
|
||||
func (r *request) containerID() cid.ID {
|
||||
return r.address().Container()
|
||||
}
|
||||
|
||||
func (r *request) ctxRange() *objectSDK.Range {
|
||||
return r.prm.rng
|
||||
}
|
||||
|
||||
func (r *request) headOnly() bool {
|
||||
return r.prm.head
|
||||
}
|
||||
|
||||
func (r *request) netmapEpoch() uint64 {
|
||||
return r.prm.common.NetmapEpoch()
|
||||
}
|
||||
|
||||
func (r *request) netmapLookupDepth() uint64 {
|
||||
return r.prm.common.NetmapLookupDepth()
|
||||
}
|
||||
|
||||
func (r *request) initEpoch() bool {
|
||||
r.curProcEpoch = r.netmapEpoch()
|
||||
if r.curProcEpoch > 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
e, err := r.epochSource.Epoch()
|
||||
|
||||
switch {
|
||||
default:
|
||||
r.status = statusUndefined
|
||||
r.err = err
|
||||
|
||||
r.log.Debug(logs.CouldNotGetCurrentEpochNumber, zap.Error(err))
|
||||
|
||||
return false
|
||||
case err == nil:
|
||||
r.curProcEpoch = e
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
|
||||
obj := addr.Object()
|
||||
|
||||
t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
|
||||
|
||||
switch {
|
||||
default:
|
||||
r.status = statusUndefined
|
||||
r.err = err
|
||||
|
||||
r.log.Debug(logs.GetCouldNotGenerateContainerTraverser, zap.Error(err))
|
||||
|
||||
return nil, false
|
||||
case err == nil:
|
||||
return t, true
|
||||
}
|
||||
}
|
||||
|
||||
func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) {
|
||||
rs, err := r.remoteStorageConstructor.Get(info)
|
||||
if err != nil {
|
||||
r.status = statusUndefined
|
||||
r.err = err
|
||||
|
||||
r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient)
|
||||
|
||||
return nil, false
|
||||
}
|
||||
|
||||
return rs, true
|
||||
}
|
||||
|
||||
func (r *request) writeCollectedHeader(ctx context.Context) bool {
|
||||
if r.ctxRange() != nil {
|
||||
return true
|
||||
}
|
||||
|
||||
err := r.prm.objWriter.WriteHeader(
|
||||
ctx,
|
||||
r.collectedObject.CutPayload(),
|
||||
)
|
||||
|
||||
switch {
|
||||
default:
|
||||
r.status = statusUndefined
|
||||
r.err = err
|
||||
|
||||
r.log.Debug(logs.GetCouldNotWriteHeader, zap.Error(err))
|
||||
case err == nil:
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
}
|
||||
|
||||
return r.status == statusOK
|
||||
}
|
||||
|
||||
func (r *request) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
|
||||
if r.headOnly() {
|
||||
return true
|
||||
}
|
||||
|
||||
err := r.prm.objWriter.WriteChunk(ctx, obj.Payload())
|
||||
|
||||
switch {
|
||||
default:
|
||||
r.status = statusUndefined
|
||||
r.err = err
|
||||
|
||||
r.log.Debug(logs.GetCouldNotWritePayloadChunk, zap.Error(err))
|
||||
case err == nil:
|
||||
r.status = statusOK
|
||||
r.err = nil
|
||||
}
|
||||
|
||||
return err == nil
|
||||
}
|
||||
|
||||
func (r *request) writeCollectedObject(ctx context.Context) {
|
||||
if ok := r.writeCollectedHeader(ctx); ok {
|
||||
r.writeObjectPayload(ctx, r.collectedObject)
|
||||
}
|
||||
}
|
||||
|
||||
// isForwardingEnabled returns true if common execution
|
||||
// parameters has request forwarding closure set.
|
||||
func (r request) isForwardingEnabled() bool {
|
||||
return r.prm.forwarder != nil
|
||||
}
|
||||
|
||||
// disableForwarding removes request forwarding closure from common
|
||||
// parameters, so it won't be inherited in new execution contexts.
|
||||
func (r *request) disableForwarding() {
|
||||
r.prm.SetRequestForwarder(nil)
|
||||
}
|
||||
|
||||
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
|
||||
if last, ok := src.LastPart(); ok {
|
||||
dst.SetLastPart(last)
|
||||
}
|
||||
|
||||
if link, ok := src.Link(); ok {
|
||||
dst.SetLink(link)
|
||||
}
|
||||
|
||||
if splitID := src.SplitID(); splitID != nil {
|
||||
dst.SetSplitID(splitID)
|
||||
}
|
||||
}
|
|
@ -1,9 +0,0 @@
|
|||
package getsvc
|
||||
|
||||
type RangeHashRes struct {
|
||||
hashes [][]byte
|
||||
}
|
||||
|
||||
func (r *RangeHashRes) Hashes() [][]byte {
|
||||
return r.hashes
|
||||
}
|
|
@ -1,124 +1,54 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// Option is a Service's constructor option.
|
||||
type Option func(*Service)
|
||||
|
||||
// Service utility serving requests of Object.Get service.
|
||||
type Service struct {
|
||||
*cfg
|
||||
}
|
||||
|
||||
// Option is a Service's constructor option.
|
||||
type Option func(*cfg)
|
||||
|
||||
type getClient interface {
|
||||
getObject(context.Context, *execCtx, client.NodeInfo) (*object.Object, error)
|
||||
}
|
||||
|
||||
type cfg struct {
|
||||
log *logger.Logger
|
||||
|
||||
localStorage interface {
|
||||
get(context.Context, *execCtx) (*object.Object, error)
|
||||
}
|
||||
|
||||
clientCache interface {
|
||||
get(client.NodeInfo) (getClient, error)
|
||||
}
|
||||
|
||||
traverserGenerator interface {
|
||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
||||
}
|
||||
|
||||
currentEpochReceiver interface {
|
||||
currentEpoch() (uint64, error)
|
||||
}
|
||||
|
||||
keyStore *util.KeyStorage
|
||||
}
|
||||
|
||||
func defaultCfg() *cfg {
|
||||
return &cfg{
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
localStorage: new(storageEngineWrapper),
|
||||
clientCache: new(clientCacheWrapper),
|
||||
}
|
||||
log *logger.Logger
|
||||
localStorage localStorage
|
||||
traverserGenerator traverserGenerator
|
||||
epochSource epochSource
|
||||
keyStore keyStorage
|
||||
remoteStorageConstructor remoteStorageConstructor
|
||||
}
|
||||
|
||||
// New creates, initializes and returns utility serving
|
||||
fyrchik
commented
Why this change? We use functional options in services and you can do everything with both? Why this change? We use functional options in services and you can do everything with both?
dstepanov-yadro
commented
For example, it was not obvious to me which of the dependencies and options were required and which were not. So For example, it was not obvious to me which of the dependencies and options were required and which were not.
So `New` requires mandatory deps, `With...` - optional.
dstepanov-yadro
commented
Fixed back to Option for logger.
From https://github.com/uber-go/guide/blob/master/style.md#functional-options Fixed back to Option for logger.
> Use this pattern for optional arguments in constructors ...
From https://github.com/uber-go/guide/blob/master/style.md#functional-options
fyrchik
commented
Yes, but 5 positional arguments don't look nice either. Not that I am against the change, but I if we were to commit to a new scheme, I would like to do this atomically across the whole repo. We can create an issue for the discussion. Yes, but 5 positional arguments don't look nice either.
Not that I am against the change, but I if we were to commit to a new scheme, I would like to do this atomically across the whole repo. We can create an issue for the discussion.
ale64bit
commented
I would prefer the positional arguments if they are required. They can be packed into a I would prefer the positional arguments if they are required. They can be packed into a `Options` struct if needed.
It seems relatively easy to build incorrect objects in other places as well, because of this.
fyrchik
commented
I like struct, but it seems we do not enforce anything with it either. I like struct, but it seems we do not enforce anything with it either.
However it may be easier to see what arguments are required.
fyrchik
commented
We can try adopting this linter https://github.com/GaijinEntertainment/go-exhaustruct We can try adopting this linter https://github.com/GaijinEntertainment/go-exhaustruct
ale64bit
commented
Ofc it's not enforced, especially when having pointer fields. But I guess it's a combination of it being conventional and easier to look at the struct and its doc when calling Ofc it's not enforced, especially when having pointer fields. But I guess it's a combination of it being conventional and easier to look at the struct and its doc when calling `New`, rather than at the multiple option builders which might be spread around.
carpawell
commented
how about a struct that shows a caller that it contains only the required params + validaion inside > we do not enforce anything with it either.
how about a struct that shows a caller that it contains only the required params + validaion inside `New` (pointers to be sure it is not a default value)? i do not like 5 positional args too
dstepanov-yadro
commented
What's the difference? Instead of 5 arguments, there will be a structure with 5 fields that will most likely be filled in when calling the For example:
But positional arguments have an advantage: if a new dependency is added to the > > we do not enforce anything with it either.
>
> how about a struct that shows a caller that it contains only the required params + validaion inside `New` (pointers to be sure it is not a default value)? i do not like 5 positional args too
What's the difference? Instead of 5 arguments, there will be a structure with 5 fields that will most likely be filled in when calling the `New` method.
For example:
- `New` declaration: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/innerring/processors/frostfs/processor.go#L88
- `New` call: https://git.frostfs.info/TrueCloudLab/frostfs-node/src/branch/master/pkg/innerring/initialization.go#L412
But positional arguments have an advantage: if a new dependency is added to the `New` method, then with a very high probability the program will not assemble until this dependency is added everywhere.
acid-ant
commented
For me, it looks unreadable, especially when we provide a bunch of nil vars or empty strings. > What's the difference? Instead of 5 arguments, there will be a structure with 5 fields that will most likely be filled in when calling the `New` method.
For me, it looks unreadable, especially when we provide a bunch of nil vars or empty strings.
dstepanov-yadro
commented
It's about required arguments only. If you can pass nil or empty string as an argument value, i think that this argument is not required. For not required arguments use Option. > For me, it looks unreadable, especially when we provide a bunch of nil vars or empty strings.
It's about required arguments only. If you can pass nil or empty string as an argument value, i think that this argument is not required.
For not required arguments use Option.
|
||||
// Object.Get service requests.
|
||||
func New(opts ...Option) *Service {
|
||||
c := defaultCfg()
|
||||
|
||||
for i := range opts {
|
||||
opts[i](c)
|
||||
func New(
|
||||
ks keyStorage,
|
||||
es epochSource,
|
||||
e localStorageEngine,
|
||||
tg traverserGenerator,
|
||||
cc clientConstructor,
|
||||
opts ...Option,
|
||||
) *Service {
|
||||
result := &Service{
|
||||
keyStore: ks,
|
||||
epochSource: es,
|
||||
log: &logger.Logger{Logger: zap.L()},
|
||||
localStorage: &engineLocalStorage{
|
||||
engine: e,
|
||||
},
|
||||
traverserGenerator: tg,
|
||||
remoteStorageConstructor: &multiclientRemoteStorageConstructor{
|
||||
clientConstructor: cc,
|
||||
},
|
||||
}
|
||||
|
||||
return &Service{
|
||||
cfg: c,
|
||||
for _, option := range opts {
|
||||
option(result)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// WithLogger returns option to specify Get service's logger.
|
||||
func WithLogger(l *logger.Logger) Option {
|
||||
return func(c *cfg) {
|
||||
c.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get service"))}
|
||||
}
|
||||
}
|
||||
|
||||
// WithLocalStorageEngine returns option to set local storage
|
||||
// instance.
|
||||
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
|
||||
return func(c *cfg) {
|
||||
c.localStorage.(*storageEngineWrapper).engine = e
|
||||
}
|
||||
}
|
||||
|
||||
type ClientConstructor interface {
|
||||
Get(client.NodeInfo) (client.MultiAddressClient, error)
|
||||
}
|
||||
|
||||
// WithClientConstructor returns option to set constructor of remote node clients.
|
||||
func WithClientConstructor(v ClientConstructor) Option {
|
||||
return func(c *cfg) {
|
||||
c.clientCache.(*clientCacheWrapper).cache = v
|
||||
}
|
||||
}
|
||||
|
||||
// WithTraverserGenerator returns option to set generator of
|
||||
// placement traverser to get the objects from containers.
|
||||
func WithTraverserGenerator(t *util.TraverserGenerator) Option {
|
||||
return func(c *cfg) {
|
||||
c.traverserGenerator = t
|
||||
}
|
||||
}
|
||||
|
||||
// WithNetMapSource returns option to set network
|
||||
// map storage to receive current network state.
|
||||
func WithNetMapSource(nmSrc netmap.Source) Option {
|
||||
return func(c *cfg) {
|
||||
c.currentEpochReceiver = &nmSrcWrapper{
|
||||
nmSrc: nmSrc,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// WithKeyStorage returns option to set private
|
||||
// key storage for session tokens and node key.
|
||||
func WithKeyStorage(store *util.KeyStorage) Option {
|
||||
return func(c *cfg) {
|
||||
c.keyStore = store
|
||||
return func(s *Service) {
|
||||
s.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get service"))}
|
||||
}
|
||||
}
|
||||
|
|
14
pkg/services/object/get/status.go
Normal file
|
@ -0,0 +1,14 @@
|
|||
package getsvc
|
||||
|
||||
const (
|
||||
statusUndefined int = iota
|
||||
statusOK
|
||||
statusINHUMED
|
||||
statusVIRTUAL
|
||||
statusOutOfRange
|
||||
)
|
||||
|
||||
type statusError struct {
|
||||
status int
|
||||
err error
|
||||
}
|
238
pkg/services/object/get/types.go
Normal file
|
@ -0,0 +1,238 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
|
||||
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||
)
|
||||
|
||||
type epochSource interface {
|
||||
Epoch() (uint64, error)
|
||||
}
|
||||
|
||||
type traverserGenerator interface {
|
||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
||||
}
|
||||
|
||||
type keyStorage interface {
|
||||
GetKey(info *util.SessionInfo) (*ecdsa.PrivateKey, error)
|
||||
}
|
||||
|
||||
type localStorageEngine interface {
|
||||
Head(ctx context.Context, p engine.HeadPrm) (engine.HeadRes, error)
|
||||
GetRange(ctx context.Context, p engine.RngPrm) (engine.RngRes, error)
|
||||
Get(ctx context.Context, p engine.GetPrm) (engine.GetRes, error)
|
||||
}
|
||||
|
||||
type clientConstructor interface {
|
||||
Get(coreclient.NodeInfo) (coreclient.MultiAddressClient, error)
|
||||
}
|
||||
|
||||
type remoteStorageConstructor interface {
|
||||
Get(coreclient.NodeInfo) (remoteStorage, error)
|
||||
}
|
||||
|
||||
type multiclientRemoteStorageConstructor struct {
|
||||
clientConstructor clientConstructor
|
||||
}
|
||||
|
||||
func (c *multiclientRemoteStorageConstructor) Get(info coreclient.NodeInfo) (remoteStorage, error) {
|
||||
clt, err := c.clientConstructor.Get(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &multiaddressRemoteStorage{
|
||||
client: clt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type localStorage interface {
|
||||
Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error)
|
||||
Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error)
|
||||
Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error)
|
||||
}
|
||||
|
||||
type engineLocalStorage struct {
|
||||
engine localStorageEngine
|
||||
}
|
||||
|
||||
func (s *engineLocalStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) {
|
||||
var headPrm engine.HeadPrm
|
||||
headPrm.WithAddress(address)
|
||||
headPrm.WithRaw(isRaw)
|
||||
|
||||
r, err := s.engine.Head(ctx, headPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Header(), nil
|
||||
}
|
||||
|
||||
func (s *engineLocalStorage) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) {
|
||||
var getRange engine.RngPrm
|
||||
getRange.WithAddress(address)
|
||||
getRange.WithPayloadRange(rng)
|
||||
|
||||
r, err := s.engine.GetRange(ctx, getRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Object(), nil
|
||||
}
|
||||
|
||||
func (s *engineLocalStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
|
||||
var getPrm engine.GetPrm
|
||||
getPrm.WithAddress(address)
|
||||
|
||||
r, err := s.engine.Get(ctx, getPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Object(), nil
|
||||
}
|
||||
|
||||
type RemoteRequestParams struct {
|
||||
Epoch uint64
|
||||
TTL uint32
|
||||
PrivateKey *ecdsa.PrivateKey
|
||||
SessionToken *session.Object
|
||||
BearerToken *bearer.Token
|
||||
XHeaders []string
|
||||
IsRaw bool
|
||||
}
|
||||
|
||||
type remoteStorage interface {
|
||||
Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error)
|
||||
Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error)
|
||||
Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error)
|
||||
|
||||
ForwardRequest(ctx context.Context, info coreclient.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error)
|
||||
}
|
||||
|
||||
type multiaddressRemoteStorage struct {
|
||||
client coreclient.MultiAddressClient
|
||||
}
|
||||
|
||||
func (s *multiaddressRemoteStorage) ForwardRequest(ctx context.Context, info coreclient.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error) {
|
||||
return forwarder(ctx, info, s.client)
|
||||
}
|
||||
|
||||
func (s *multiaddressRemoteStorage) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
|
||||
var prm internalclient.PayloadRangePrm
|
||||
|
||||
prm.SetClient(s.client)
|
||||
prm.SetTTL(requestParams.TTL)
|
||||
prm.SetNetmapEpoch(requestParams.Epoch)
|
||||
prm.SetAddress(address)
|
||||
prm.SetPrivateKey(requestParams.PrivateKey)
|
||||
prm.SetSessionToken(requestParams.SessionToken)
|
||||
prm.SetBearerToken(requestParams.BearerToken)
|
||||
prm.SetXHeaders(requestParams.XHeaders)
|
||||
prm.SetRange(rng)
|
||||
if requestParams.IsRaw {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.PayloadRange(ctx, prm)
|
||||
if err != nil {
|
||||
var errAccessDenied *apistatus.ObjectAccessDenied
|
||||
if errors.As(err, &errAccessDenied) {
|
||||
obj, err := s.Get(ctx, address, requestParams)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
payload := obj.Payload()
|
||||
from := rng.GetOffset()
|
||||
to := from + rng.GetLength()
|
||||
|
||||
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
|
||||
return nil, new(apistatus.ObjectOutOfRange)
|
||||
}
|
||||
|
||||
return s.payloadOnlyObject(payload[from:to]), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.payloadOnlyObject(res.PayloadRange()), nil
|
||||
}
|
||||
|
||||
func (s *multiaddressRemoteStorage) Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
|
||||
var prm internalclient.HeadObjectPrm
|
||||
|
||||
prm.SetClient(s.client)
|
||||
prm.SetTTL(requestParams.TTL)
|
||||
prm.SetNetmapEpoch(requestParams.Epoch)
|
||||
prm.SetAddress(address)
|
||||
prm.SetPrivateKey(requestParams.PrivateKey)
|
||||
prm.SetSessionToken(requestParams.SessionToken)
|
||||
prm.SetBearerToken(requestParams.BearerToken)
|
||||
prm.SetXHeaders(requestParams.XHeaders)
|
||||
|
||||
if requestParams.IsRaw {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.HeadObject(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Header(), nil
|
||||
}
|
||||
|
||||
func (s *multiaddressRemoteStorage) Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
|
||||
var prm internalclient.GetObjectPrm
|
||||
|
||||
prm.SetClient(s.client)
|
||||
prm.SetTTL(requestParams.TTL)
|
||||
prm.SetNetmapEpoch(requestParams.Epoch)
|
||||
prm.SetAddress(address)
|
||||
prm.SetPrivateKey(requestParams.PrivateKey)
|
||||
prm.SetSessionToken(requestParams.SessionToken)
|
||||
prm.SetBearerToken(requestParams.BearerToken)
|
||||
prm.SetXHeaders(requestParams.XHeaders)
|
||||
|
||||
if requestParams.IsRaw {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.GetObject(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Object(), nil
|
||||
}
|
||||
|
||||
func (s *multiaddressRemoteStorage) payloadOnlyObject(payload []byte) *objectSDK.Object {
|
||||
obj := objectSDK.New()
|
||||
obj.SetPayload(payload)
|
||||
|
||||
return obj
|
||||
}
|
||||
|
||||
type RangeHashRes struct {
|
||||
hashes [][]byte
|
||||
}
|
||||
|
||||
func (r *RangeHashRes) Hashes() [][]byte {
|
||||
return r.hashes
|
||||
}
|
|
@ -1,261 +0,0 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
)
|
||||
|
||||
type SimpleObjectWriter struct {
|
||||
obj *object.Object
|
||||
|
||||
pld []byte
|
||||
}
|
||||
|
||||
type clientCacheWrapper struct {
|
||||
cache ClientConstructor
|
||||
}
|
||||
|
||||
type clientWrapper struct {
|
||||
client coreclient.MultiAddressClient
|
||||
}
|
||||
|
||||
type storageEngineWrapper struct {
|
||||
engine *engine.StorageEngine
|
||||
}
|
||||
|
||||
type partWriter struct {
|
||||
ObjectWriter
|
||||
|
||||
headWriter HeaderWriter
|
||||
|
||||
chunkWriter ChunkWriter
|
||||
}
|
||||
|
||||
type hasherWrapper struct {
|
||||
hash io.Writer
|
||||
}
|
||||
|
||||
type nmSrcWrapper struct {
|
||||
nmSrc netmap.Source
|
||||
}
|
||||
|
||||
func NewSimpleObjectWriter() *SimpleObjectWriter {
|
||||
return &SimpleObjectWriter{
|
||||
obj: object.New(),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SimpleObjectWriter) WriteHeader(_ context.Context, obj *object.Object) error {
|
||||
s.obj = obj
|
||||
|
||||
s.pld = make([]byte, 0, obj.PayloadSize())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SimpleObjectWriter) WriteChunk(_ context.Context, p []byte) error {
|
||||
s.pld = append(s.pld, p...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SimpleObjectWriter) Object() *object.Object {
|
||||
if len(s.pld) > 0 {
|
||||
s.obj.SetPayload(s.pld)
|
||||
}
|
||||
|
||||
return s.obj
|
||||
}
|
||||
|
||||
func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
|
||||
clt, err := c.cache.Get(info)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &clientWrapper{
|
||||
client: clt,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
|
||||
if exec.isForwardingEnabled() {
|
||||
return exec.prm.forwarder(ctx, info, c.client)
|
||||
}
|
||||
|
||||
key, err := exec.key()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if exec.headOnly() {
|
||||
return c.getHeadOnly(ctx, exec, key)
|
||||
}
|
||||
// we don't specify payload writer because we accumulate
|
||||
// the object locally (even huge).
|
||||
if rng := exec.ctxRange(); rng != nil {
|
||||
// Current spec allows other storage node to deny access,
|
||||
// fallback to GET here.
|
||||
return c.getRange(ctx, exec, key, rng)
|
||||
}
|
||||
|
||||
return c.get(ctx, exec, key)
|
||||
}
|
||||
|
||||
func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
|
||||
var prm internalclient.PayloadRangePrm
|
||||
|
||||
prm.SetClient(c.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
prm.SetAddress(exec.address())
|
||||
prm.SetPrivateKey(key)
|
||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||
prm.SetRange(rng)
|
||||
|
||||
if exec.isRaw() {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.PayloadRange(ctx, prm)
|
||||
if err != nil {
|
||||
var errAccessDenied *apistatus.ObjectAccessDenied
|
||||
if errors.As(err, &errAccessDenied) {
|
||||
obj, err := c.get(ctx, exec, key)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
payload := obj.Payload()
|
||||
from := rng.GetOffset()
|
||||
to := from + rng.GetLength()
|
||||
|
||||
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
|
||||
return nil, new(apistatus.ObjectOutOfRange)
|
||||
}
|
||||
|
||||
return payloadOnlyObject(payload[from:to]), nil
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return payloadOnlyObject(res.PayloadRange()), nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
||||
var prm internalclient.HeadObjectPrm
|
||||
|
||||
prm.SetClient(c.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
prm.SetAddress(exec.address())
|
||||
prm.SetPrivateKey(key)
|
||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||
|
||||
if exec.isRaw() {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.HeadObject(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Header(), nil
|
||||
}
|
||||
|
||||
func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
||||
var prm internalclient.GetObjectPrm
|
||||
|
||||
prm.SetClient(c.client)
|
||||
prm.SetTTL(exec.prm.common.TTL())
|
||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
||||
prm.SetAddress(exec.address())
|
||||
prm.SetPrivateKey(key)
|
||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
||||
|
||||
if exec.isRaw() {
|
||||
prm.SetRawFlag()
|
||||
}
|
||||
|
||||
res, err := internalclient.GetObject(ctx, prm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return res.Object(), nil
|
||||
}
|
||||
|
||||
func (e *storageEngineWrapper) get(ctx context.Context, exec *execCtx) (*object.Object, error) {
|
||||
if exec.headOnly() {
|
||||
var headPrm engine.HeadPrm
|
||||
headPrm.WithAddress(exec.address())
|
||||
headPrm.WithRaw(exec.isRaw())
|
||||
|
||||
r, err := e.engine.Head(ctx, headPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Header(), nil
|
||||
} else if rng := exec.ctxRange(); rng != nil {
|
||||
var getRange engine.RngPrm
|
||||
getRange.WithAddress(exec.address())
|
||||
getRange.WithPayloadRange(rng)
|
||||
|
||||
r, err := e.engine.GetRange(ctx, getRange)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Object(), nil
|
||||
} else {
|
||||
var getPrm engine.GetPrm
|
||||
getPrm.WithAddress(exec.address())
|
||||
|
||||
r, err := e.engine.Get(ctx, getPrm)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return r.Object(), nil
|
||||
}
|
||||
}
|
||||
|
||||
func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error {
|
||||
return w.chunkWriter.WriteChunk(ctx, p)
|
||||
}
|
||||
|
||||
func (w *partWriter) WriteHeader(ctx context.Context, o *object.Object) error {
|
||||
return w.headWriter.WriteHeader(ctx, o)
|
||||
}
|
||||
|
||||
func payloadOnlyObject(payload []byte) *object.Object {
|
||||
obj := object.New()
|
||||
obj.SetPayload(payload)
|
||||
|
||||
return obj
|
||||
}
|
||||
|
||||
func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
|
||||
_, err := h.hash.Write(p)
|
||||
return err
|
||||
}
|
||||
|
||||
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
|
||||
return n.nmSrc.Epoch()
|
||||
}
|
92
pkg/services/object/get/v2/errors.go
Normal file
|
@ -0,0 +1,92 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
refs "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||
)
|
||||
|
||||
var (
|
||||
errMissingObjAddress = errors.New("missing object address")
|
||||
errWrongMessageSeq = errors.New("incorrect message sequence")
|
||||
errNilObjectPart = errors.New("nil object part")
|
||||
errMissingSignature = errors.New("missing signature")
|
||||
errInvalidObjectIDSign = errors.New("invalid object ID signature")
|
||||
|
||||
errWrongHeaderPartTypeExpShortRecvWithSignature = fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
|
||||
)
|
||||
errWrongHeaderPartTypeExpWithSignRecvShort = fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
|
||||
)
|
||||
)
|
||||
|
||||
func errInvalidObjAddress(err error) error {
|
||||
return fmt.Errorf("invalid object address: %w", err)
|
||||
}
|
||||
|
||||
func errRequestParamsValidation(err error) error {
|
||||
return fmt.Errorf("request params validation: %w", err)
|
||||
}
|
||||
|
||||
func errFetchingSessionKey(err error) error {
|
||||
return fmt.Errorf("fetching session key: %w", err)
|
||||
}
|
||||
|
||||
func errUnknownChechsumType(t refs.ChecksumType) error {
|
||||
return fmt.Errorf("unknown checksum type %v", t)
|
||||
}
|
||||
|
||||
func errResponseVerificationFailed(err error) error {
|
||||
return fmt.Errorf("response verification failed: %w", err)
|
||||
}
|
||||
|
||||
func errCouldNotWriteObjHeader(err error) error {
|
||||
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
|
||||
}
|
||||
|
||||
func errStreamOpenningFailed(err error) error {
|
||||
return fmt.Errorf("stream opening failed: %w", err)
|
||||
}
|
||||
|
||||
func errReadingResponseFailed(err error) error {
|
||||
return fmt.Errorf("reading the response failed: %w", err)
|
||||
}
|
||||
|
||||
func errUnexpectedObjectPart(v objectV2.GetObjectPart) error {
|
||||
return fmt.Errorf("unexpected object part %T", v)
|
||||
}
|
||||
|
||||
func errCouldNotWriteObjChunk(forwarder string, err error) error {
|
||||
return fmt.Errorf("could not write object chunk in %s forwarder: %w", forwarder, err)
|
||||
}
|
||||
|
||||
func errCouldNotVerifyRangeResponse(resp *objectV2.GetRangeResponse, err error) error {
|
||||
return fmt.Errorf("could not verify %T: %w", resp, err)
|
||||
}
|
||||
|
||||
func errCouldNotCreateGetRangeStream(err error) error {
|
||||
return fmt.Errorf("could not create Get payload range stream: %w", err)
|
||||
}
|
||||
|
||||
func errUnexpectedRangePart(v objectV2.GetRangePart) error {
|
||||
return fmt.Errorf("unexpected range type %T", v)
|
||||
}
|
||||
|
||||
func errUnexpectedHeaderPart(v objectV2.GetHeaderPart) error {
|
||||
return fmt.Errorf("unexpected header type %T", v)
|
||||
}
|
||||
|
||||
func errMarshalID(err error) error {
|
||||
return fmt.Errorf("marshal ID: %w", err)
|
||||
}
|
||||
|
||||
func errCantReadSignature(err error) error {
|
||||
return fmt.Errorf("can't read signature: %w", err)
|
||||
}
|
||||
|
||||
func errSendingRequestFailed(err error) error {
|
||||
return fmt.Errorf("sending the request failed: %w", err)
|
||||
}
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
|
@ -71,7 +70,7 @@ func (f *getRequestForwarder) verifyResponse(resp *objectV2.GetResponse, pubkey
|
|||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return fmt.Errorf("response verification failed: %w", err)
|
||||
return errResponseVerificationFailed(err)
|
||||
}
|
||||
|
||||
return checkStatus(resp.GetMetaHeader().GetStatus())
|
||||
|
@ -89,7 +88,7 @@ func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetOb
|
|||
err = f.Stream.WriteHeader(ctx, object.NewFromV2(obj))
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
|
||||
return errCouldNotWriteObjHeader(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -102,7 +101,7 @@ func (f *getRequestForwarder) openStream(ctx context.Context, addr network.Addre
|
|||
return e
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("stream opening failed: %w", err)
|
||||
return nil, errStreamOpenningFailed(err)
|
||||
}
|
||||
return getStream, nil
|
||||
}
|
||||
|
@ -127,7 +126,7 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
|
|||
}
|
||||
|
||||
internalclient.ReportError(c, err)
|
||||
return fmt.Errorf("reading the response failed: %w", err)
|
||||
return errReadingResponseFailed(err)
|
||||
}
|
||||
|
||||
if err := f.verifyResponse(resp, pubkey); err != nil {
|
||||
|
@ -136,7 +135,7 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
|
|||
|
||||
switch v := resp.GetBody().GetObjectPart().(type) {
|
||||
default:
|
||||
return fmt.Errorf("unexpected object part %T", v)
|
||||
return errUnexpectedObjectPart(v)
|
||||
case *objectV2.GetObjectPartInit:
|
||||
if headWas {
|
||||
return errWrongMessageSeq
|
||||
|
@ -159,7 +158,7 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
|
|||
}
|
||||
|
||||
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
|
||||
return fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
|
||||
return errCouldNotWriteObjChunk("Get", err)
|
||||
}
|
||||
|
||||
localProgress += len(origChunk)
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
|
@ -73,7 +72,7 @@ func (f *getRangeRequestForwarder) verifyResponse(resp *objectV2.GetRangeRespons
|
|||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||
return fmt.Errorf("could not verify %T: %w", resp, err)
|
||||
return errCouldNotVerifyRangeResponse(resp, err)
|
||||
}
|
||||
|
||||
return checkStatus(resp.GetMetaHeader().GetStatus())
|
||||
|
@ -88,7 +87,7 @@ func (f *getRangeRequestForwarder) openStream(ctx context.Context, addr network.
|
|||
return e
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
|
||||
return nil, errCouldNotCreateGetRangeStream(err)
|
||||
}
|
||||
return rangeStream, nil
|
||||
}
|
||||
|
@ -105,7 +104,7 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
|
|||
break
|
||||
}
|
||||
internalclient.ReportError(c, err)
|
||||
return fmt.Errorf("reading the response failed: %w", err)
|
||||
return errReadingResponseFailed(err)
|
||||
}
|
||||
|
||||
if err := f.verifyResponse(resp, pubkey); err != nil {
|
||||
|
@ -114,7 +113,7 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
|
|||
|
||||
switch v := resp.GetBody().GetRangePart().(type) {
|
||||
case nil:
|
||||
return fmt.Errorf("unexpected range type %T", v)
|
||||
return errUnexpectedRangePart(v)
|
||||
case *objectV2.GetRangePartChunk:
|
||||
origChunk := v.GetChunk()
|
||||
|
||||
|
@ -125,7 +124,7 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
|
|||
}
|
||||
|
||||
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
|
||||
return fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
|
||||
return errCouldNotWriteObjChunk("GetRange", err)
|
||||
}
|
||||
|
||||
localProgress += len(origChunk)
|
||||
|
|
|
@ -3,8 +3,6 @@ package getsvc
|
|||
import (
|
||||
"context"
|
||||
"crypto/ecdsa"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||
|
@ -74,7 +72,7 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
|
|||
|
||||
switch v := headResp.GetBody().GetHeaderPart().(type) {
|
||||
case nil:
|
||||
return nil, fmt.Errorf("unexpected header type %T", v)
|
||||
return nil, errUnexpectedHeaderPart(v)
|
||||
case *objectV2.ShortHeader:
|
||||
if hdr, err = f.getHeaderFromShortHeader(v); err != nil {
|
||||
return nil, err
|
||||
|
@ -100,9 +98,7 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
|
|||
|
||||
func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader) (*objectV2.Header, error) {
|
||||
if !f.Request.GetBody().GetMainOnly() {
|
||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
|
||||
)
|
||||
return nil, errWrongHeaderPartTypeExpShortRecvWithSignature
|
||||
}
|
||||
|
||||
hdr := new(objectV2.Header)
|
||||
|
@ -118,35 +114,32 @@ func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader
|
|||
|
||||
func (f *headRequestForwarder) getHeaderAndSignature(hdrWithSig *objectV2.HeaderWithSignature) (*objectV2.Header, *refs.Signature, error) {
|
||||
if f.Request.GetBody().GetMainOnly() {
|
||||
return nil, nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
|
||||
)
|
||||
return nil, nil, errWrongHeaderPartTypeExpWithSignRecvShort
|
||||
}
|
||||
|
||||
if hdrWithSig == nil {
|
||||
return nil, nil, errors.New("nil object part")
|
||||
return nil, nil, errNilObjectPart
|
||||
}
|
||||
|
||||
hdr := hdrWithSig.GetHeader()
|
||||
idSig := hdrWithSig.GetSignature()
|
||||
|
||||
if idSig == nil {
|
||||
// TODO(@cthulhu-rider): #1387 use "const" error
|
||||
return nil, nil, errors.New("missing signature")
|
||||
return nil, nil, errMissingSignature
|
||||
}
|
||||
|
||||
binID, err := f.ObjectAddr.Object().Marshal()
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("marshal ID: %w", err)
|
||||
return nil, nil, errMarshalID(err)
|
||||
}
|
||||
|
||||
var sig frostfscrypto.Signature
|
||||
if err := sig.ReadFromV2(*idSig); err != nil {
|
||||
return nil, nil, fmt.Errorf("can't read signature: %w", err)
|
||||
return nil, nil, errCantReadSignature(err)
|
||||
}
|
||||
|
||||
if !sig.Verify(binID) {
|
||||
return nil, nil, errors.New("invalid object ID signature")
|
||||
return nil, nil, errInvalidObjectIDSign
|
||||
}
|
||||
|
||||
return hdr, idSig, nil
|
||||
|
@ -160,7 +153,7 @@ func (f *headRequestForwarder) sendHeadRequest(ctx context.Context, addr network
|
|||
return e
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
||||
return nil, errSendingRequestFailed(err)
|
||||
}
|
||||
return headResp, nil
|
||||
}
|
||||
|
@ -173,7 +166,7 @@ func (f *headRequestForwarder) verifyResponse(headResp *objectV2.HeadResponse, p
|
|||
|
||||
// verify response structure
|
||||
if err := signature.VerifyServiceMessage(headResp); err != nil {
|
||||
return fmt.Errorf("response verification failed: %w", err)
|
||||
return errResponseVerificationFailed(err)
|
||||
}
|
||||
|
||||
return checkStatus(f.Response.GetMetaHeader().GetStatus())
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"crypto/sha256"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash"
|
||||
"sync"
|
||||
|
||||
|
@ -24,21 +23,19 @@ import (
|
|||
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||
)
|
||||
|
||||
var errWrongMessageSeq = errors.New("incorrect message sequence")
|
||||
|
||||
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
|
||||
body := req.GetBody()
|
||||
|
||||
addrV2 := body.GetAddress()
|
||||
if addrV2 == nil {
|
||||
return nil, errors.New("missing object address")
|
||||
return nil, errMissingObjAddress
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
|
||||
err := addr.ReadFromV2(*addrV2)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid object address: %w", err)
|
||||
return nil, errInvalidObjAddress(err)
|
||||
}
|
||||
|
||||
commonPrm, err := util.CommonPrmFromV2(req)
|
||||
|
@ -81,14 +78,14 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
|
|||
|
||||
addrV2 := body.GetAddress()
|
||||
if addrV2 == nil {
|
||||
return nil, errors.New("missing object address")
|
||||
return nil, errMissingObjAddress
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
|
||||
err := addr.ReadFromV2(*addrV2)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid object address: %w", err)
|
||||
return nil, errInvalidObjAddress(err)
|
||||
}
|
||||
|
||||
commonPrm, err := util.CommonPrmFromV2(req)
|
||||
|
@ -108,7 +105,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
|
|||
|
||||
err = p.Validate()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request params validation: %w", err)
|
||||
return nil, errRequestParamsValidation(err)
|
||||
}
|
||||
|
||||
if !commonPrm.LocalOnly() {
|
||||
|
@ -136,14 +133,14 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
|
|||
|
||||
addrV2 := body.GetAddress()
|
||||
if addrV2 == nil {
|
||||
return nil, errors.New("missing object address")
|
||||
return nil, errMissingObjAddress
|
||||
}
|
||||
|
||||
var addr oid.Address
|
||||
|
||||
err := addr.ReadFromV2(*addrV2)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid object address: %w", err)
|
||||
return nil, errInvalidObjAddress(err)
|
||||
}
|
||||
|
||||
commonPrm, err := util.CommonPrmFromV2(req)
|
||||
|
@ -167,7 +164,7 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
|
|||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("fetching session key: %w", err)
|
||||
return nil, errFetchingSessionKey(err)
|
||||
}
|
||||
|
||||
p.WithCachedSignerKey(signerKey)
|
||||
|
@ -185,7 +182,7 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
|
|||
|
||||
switch t := body.GetType(); t {
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown checksum type %v", t)
|
||||
return nil, errUnknownChechsumType(t)
|
||||
case refs.SHA256:
|
||||
p.SetHashGenerator(func() hash.Hash {
|
||||
return sha256.New()
|
||||
|
@ -220,14 +217,14 @@ func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadRespon
|
|||
|
||||
addrV2 := body.GetAddress()
|
||||
if addrV2 == nil {
|
||||
return nil, errors.New("missing object address")
|
||||
return nil, errMissingObjAddress
|
||||
}
|
||||
|
||||
var objAddr oid.Address
|
||||
|
||||
err := objAddr.ReadFromV2(*addrV2)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid object address: %w", err)
|
||||
return nil, errInvalidObjAddress(err)
|
||||
}
|
||||
|
||||
commonPrm, err := util.CommonPrmFromV2(req)
|
||||
|
|
84
pkg/services/object/get/writer.go
Normal file
|
@ -0,0 +1,84 @@
|
|||
package getsvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||
)
|
||||
|
||||
// ChunkWriter is an interface of target component
|
||||
// to write payload chunk.
|
||||
type ChunkWriter interface {
|
||||
WriteChunk(context.Context, []byte) error
|
||||
}
|
||||
|
||||
// HeaderWriter is an interface of target component
|
||||
// to write object header.
|
||||
type HeaderWriter interface {
|
||||
WriteHeader(context.Context, *object.Object) error
|
||||
}
|
||||
|
||||
// ObjectWriter is an interface of target component to write object.
|
||||
type ObjectWriter interface {
|
||||
HeaderWriter
|
||||
ChunkWriter
|
||||
}
|
||||
|
||||
type SimpleObjectWriter struct {
|
||||
obj *object.Object
|
||||
|
||||
pld []byte
|
||||
}
|
||||
|
||||
type partWriter struct {
|
||||
ObjectWriter
|
||||
|
||||
headWriter HeaderWriter
|
||||
|
||||
chunkWriter ChunkWriter
|
||||
}
|
||||
|
||||
type hasherWrapper struct {
|
||||
hash io.Writer
|
||||
}
|
||||
|
||||
func NewSimpleObjectWriter() *SimpleObjectWriter {
|
||||
return &SimpleObjectWriter{
|
||||
obj: object.New(),
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SimpleObjectWriter) WriteHeader(_ context.Context, obj *object.Object) error {
|
||||
s.obj = obj
|
||||
|
||||
s.pld = make([]byte, 0, obj.PayloadSize())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SimpleObjectWriter) WriteChunk(_ context.Context, p []byte) error {
|
||||
s.pld = append(s.pld, p...)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SimpleObjectWriter) Object() *object.Object {
|
||||
if len(s.pld) > 0 {
|
||||
s.obj.SetPayload(s.pld)
|
||||
}
|
||||
|
||||
return s.obj
|
||||
}
|
||||
|
||||
func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error {
|
||||
return w.chunkWriter.WriteChunk(ctx, p)
|
||||
}
|
||||
|
||||
func (w *partWriter) WriteHeader(ctx context.Context, o *object.Object) error {
|
||||
return w.headWriter.WriteHeader(ctx, o)
|
||||
}
|
||||
|
||||
func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
|
||||
_, err := h.hash.Write(p)
|
||||
return err
|
||||
}
|
what does it mean? 1m was not enough for me to understand "detached" here
renamed