Refactor getsvc #277
23 changed files with 1046 additions and 939 deletions
|
@ -336,17 +336,14 @@ func createGetService(c *cfg, keyStorage *util.KeyStorage, traverseGen *util.Tra
|
||||||
ls := c.cfgObject.cfgLocalStorage.localStorage
|
ls := c.cfgObject.cfgLocalStorage.localStorage
|
||||||
|
|
||||||
return getsvc.New(
|
return getsvc.New(
|
||||||
getsvc.WithLogger(c.log),
|
keyStorage,
|
||||||
getsvc.WithLocalStorageEngine(ls),
|
c.netMapSource,
|
||||||
getsvc.WithClientConstructor(coreConstructor),
|
ls,
|
||||||
getsvc.WithTraverserGenerator(
|
|
||||||
traverseGen.WithTraverseOptions(
|
traverseGen.WithTraverseOptions(
|
||||||
placement.SuccessAfter(1),
|
placement.SuccessAfter(1),
|
||||||
),
|
),
|
||||||
),
|
coreConstructor,
|
||||||
getsvc.WithNetMapSource(c.netMapSource),
|
getsvc.WithLogger(c.log))
|
||||||
getsvc.WithKeyStorage(keyStorage),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func createGetServiceV2(sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service {
|
func createGetServiceV2(sGet *getsvc.Service, keyStorage *util.KeyStorage) *getsvcV2.Service {
|
||||||
|
|
|
@ -11,9 +11,9 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (exec *execCtx) assemble(ctx context.Context) {
|
func (r *request) assemble(ctx context.Context) {
|
||||||
if !exec.canAssemble() {
|
if !r.canAssemble() {
|
||||||
exec.log.Debug(logs.GetCanNotAssembleTheObject)
|
r.log.Debug(logs.GetCanNotAssembleTheObject)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,35 +28,35 @@ func (exec *execCtx) assemble(ctx context.Context) {
|
||||||
// - the assembly process is expected to be handled on a container node
|
// - the assembly process is expected to be handled on a container node
|
||||||
// only since the requests forwarding mechanism presentation; such the
|
// only since the requests forwarding mechanism presentation; such the
|
||||||
// node should have enough rights for getting any child object by design.
|
// node should have enough rights for getting any child object by design.
|
||||||
exec.prm.common.ForgetTokens()
|
r.prm.common.ForgetTokens()
|
||||||
|
|
||||||
// Do not use forwarding during assembly stage.
|
// Do not use forwarding during assembly stage.
|
||||||
// Request forwarding closure inherited in produced
|
// Request forwarding closure inherited in produced
|
||||||
// `execCtx` so it should be disabled there.
|
// `execCtx` so it should be disabled there.
|
||||||
exec.disableForwarding()
|
r.disableForwarding()
|
||||||
|
|
||||||
exec.log.Debug(logs.GetTryingToAssembleTheObject)
|
r.log.Debug(logs.GetTryingToAssembleTheObject)
|
||||||
|
|
||||||
assembler := newAssembler(exec.address(), exec.splitInfo(), exec.ctxRange(), exec)
|
assembler := newAssembler(r.address(), r.splitInfo(), r.ctxRange(), r)
|
||||||
|
|
||||||
exec.log.Debug(logs.GetAssemblingSplittedObject,
|
r.log.Debug(logs.GetAssemblingSplittedObject,
|
||||||
zap.Stringer("address", exec.address()),
|
zap.Stringer("address", r.address()),
|
||||||
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
zap.Uint64("range_length", exec.ctxRange().GetLength()),
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
defer exec.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
|
defer r.log.Debug(logs.GetAssemblingSplittedObjectCompleted,
|
||||||
zap.Stringer("address", exec.address()),
|
zap.Stringer("address", r.address()),
|
||||||
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
zap.Uint64("range_length", exec.ctxRange().GetLength()),
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
|
|
||||||
obj, err := assembler.Assemble(ctx, exec.prm.objWriter)
|
obj, err := assembler.Assemble(ctx, r.prm.objWriter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
exec.log.Warn(logs.GetFailedToAssembleSplittedObject,
|
r.log.Warn(logs.GetFailedToAssembleSplittedObject,
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
zap.Stringer("address", exec.address()),
|
zap.Stringer("address", r.address()),
|
||||||
zap.Uint64("range_offset", exec.ctxRange().GetOffset()),
|
zap.Uint64("range_offset", r.ctxRange().GetOffset()),
|
||||||
zap.Uint64("range_length", exec.ctxRange().GetLength()),
|
zap.Uint64("range_length", r.ctxRange().GetLength()),
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,27 +68,27 @@ func (exec *execCtx) assemble(ctx context.Context) {
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
exec.status = statusUndefined
|
r.status = statusUndefined
|
||||||
exec.err = err
|
r.err = err
|
||||||
case err == nil:
|
case err == nil:
|
||||||
exec.status = statusOK
|
r.status = statusOK
|
||||||
exec.err = nil
|
r.err = nil
|
||||||
exec.collectedObject = obj
|
r.collectedObject = obj
|
||||||
case errors.As(err, &errRemovedRemote):
|
case errors.As(err, &errRemovedRemote):
|
||||||
exec.status = statusINHUMED
|
r.status = statusINHUMED
|
||||||
exec.err = errRemovedRemote
|
r.err = errRemovedRemote
|
||||||
case errors.As(err, &errRemovedLocal):
|
case errors.As(err, &errRemovedLocal):
|
||||||
exec.status = statusINHUMED
|
r.status = statusINHUMED
|
||||||
exec.err = errRemovedLocal
|
r.err = errRemovedLocal
|
||||||
case errors.As(err, &errSplitInfo):
|
case errors.As(err, &errSplitInfo):
|
||||||
exec.status = statusVIRTUAL
|
r.status = statusVIRTUAL
|
||||||
exec.err = errSplitInfo
|
r.err = errSplitInfo
|
||||||
case errors.As(err, &errOutOfRangeRemote):
|
case errors.As(err, &errOutOfRangeRemote):
|
||||||
exec.status = statusOutOfRange
|
r.status = statusOutOfRange
|
||||||
exec.err = errOutOfRangeRemote
|
r.err = errOutOfRangeRemote
|
||||||
case errors.As(err, &errOutOfRangeLocal):
|
case errors.As(err, &errOutOfRangeLocal):
|
||||||
exec.status = statusOutOfRange
|
r.status = statusOutOfRange
|
||||||
exec.err = errOutOfRangeLocal
|
r.err = errOutOfRangeLocal
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,42 +96,54 @@ func equalAddresses(a, b oid.Address) bool {
|
||||||
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
|
return a.Container().Equals(b.Container()) && a.Object().Equals(b.Object())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
|
func (r *request) HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error) {
|
||||||
p := exec.prm
|
|
||||||
p.common = p.common.WithLocalOnly(false)
|
|
||||||
p.addr.SetContainer(exec.containerID())
|
|
||||||
p.addr.SetObject(id)
|
|
||||||
|
|
||||||
prm := HeadPrm{
|
|
||||||
commonPrm: p.commonPrm,
|
|
||||||
}
|
|
||||||
|
|
||||||
w := NewSimpleObjectWriter()
|
w := NewSimpleObjectWriter()
|
||||||
prm.SetHeaderWriter(w)
|
|
||||||
err := exec.svc.Head(ctx, prm)
|
|
||||||
|
|
||||||
if err != nil {
|
p := RequestParameters{}
|
||||||
|
p.common = p.common.WithLocalOnly(false)
|
||||||
|
p.addr.SetContainer(r.containerID())
|
||||||
|
p.addr.SetObject(id)
|
||||||
|
p.head = true
|
||||||
|
p.SetHeaderWriter(w)
|
||||||
|
|
||||||
|
if err := r.getObjectWithIndependentRequest(ctx, p); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return w.Object(), nil
|
return w.Object(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) {
|
func (r *request) GetObject(ctx context.Context, id oid.ID, rng *objectSDK.Range) (*objectSDK.Object, error) {
|
||||||
w := NewSimpleObjectWriter()
|
w := NewSimpleObjectWriter()
|
||||||
|
|
||||||
p := exec.prm
|
p := r.prm
|
||||||
p.common = p.common.WithLocalOnly(false)
|
p.common = p.common.WithLocalOnly(false)
|
||||||
p.objWriter = w
|
p.objWriter = w
|
||||||
p.SetRange(rng)
|
p.rng = rng
|
||||||
|
|
||||||
p.addr.SetContainer(exec.containerID())
|
p.addr.SetContainer(r.containerID())
|
||||||
p.addr.SetObject(id)
|
p.addr.SetObject(id)
|
||||||
|
|
||||||
statusError := exec.svc.get(ctx, p.commonPrm, withPayloadRange(rng))
|
if err := r.getObjectWithIndependentRequest(ctx, p); err != nil {
|
||||||
|
return nil, err
|
||||||
if statusError.err != nil {
|
|
||||||
return nil, statusError.err
|
|
||||||
}
|
}
|
||||||
return w.Object(), nil
|
return w.Object(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *request) getObjectWithIndependentRequest(ctx context.Context, prm RequestParameters) error {
|
||||||
|
detachedExecutor := &request{
|
||||||
|
keyStore: r.keyStore,
|
||||||
|
traverserGenerator: r.traverserGenerator,
|
||||||
|
remoteStorageConstructor: r.remoteStorageConstructor,
|
||||||
|
epochSource: r.epochSource,
|
||||||
|
localStorage: r.localStorage,
|
||||||
|
|
||||||
|
prm: prm,
|
||||||
|
infoSplit: objectSDK.NewSplitInfo(),
|
||||||
|
log: r.log,
|
||||||
|
}
|
||||||
|
|
||||||
|
detachedExecutor.execute(ctx)
|
||||||
|
|
||||||
|
return detachedExecutor.statusError.err
|
||||||
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package getsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/object"
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
@ -15,10 +14,6 @@ type objectGetter interface {
|
||||||
HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error)
|
HeadObject(ctx context.Context, id oid.ID) (*objectSDK.Object, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
errParentAddressDiffers = errors.New("parent address in child object differs")
|
|
||||||
)
|
|
||||||
|
|
||||||
type assembler struct {
|
type assembler struct {
|
||||||
addr oid.Address
|
addr oid.Address
|
||||||
splitInfo *objectSDK.SplitInfo
|
splitInfo *objectSDK.SplitInfo
|
||||||
|
@ -89,7 +84,7 @@ func (a *assembler) initializeFromSourceObjectID(ctx context.Context, id oid.ID)
|
||||||
|
|
||||||
parentObject := sourceObject.Parent()
|
parentObject := sourceObject.Parent()
|
||||||
if parentObject == nil {
|
if parentObject == nil {
|
||||||
return nil, nil, errors.New("received child with empty parent")
|
return nil, nil, errChildWithEmptyParent
|
||||||
}
|
}
|
||||||
|
|
||||||
a.parentObject = parentObject
|
a.parentObject = parentObject
|
||||||
|
|
|
@ -8,26 +8,26 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (exec *execCtx) executeOnContainer(ctx context.Context) {
|
func (r *request) executeOnContainer(ctx context.Context) {
|
||||||
if exec.isLocal() {
|
if r.isLocal() {
|
||||||
exec.log.Debug(logs.GetReturnResultDirectly)
|
r.log.Debug(logs.GetReturnResultDirectly)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
lookupDepth := exec.netmapLookupDepth()
|
lookupDepth := r.netmapLookupDepth()
|
||||||
|
|
||||||
exec.log.Debug(logs.TryingToExecuteInContainer,
|
r.log.Debug(logs.TryingToExecuteInContainer,
|
||||||
zap.Uint64("netmap lookup depth", lookupDepth),
|
zap.Uint64("netmap lookup depth", lookupDepth),
|
||||||
)
|
)
|
||||||
|
|
||||||
// initialize epoch number
|
// initialize epoch number
|
||||||
ok := exec.initEpoch()
|
ok := r.initEpoch()
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if exec.processCurrentEpoch(ctx) {
|
if r.processCurrentEpoch(ctx) {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -39,16 +39,16 @@ func (exec *execCtx) executeOnContainer(ctx context.Context) {
|
||||||
lookupDepth--
|
lookupDepth--
|
||||||
|
|
||||||
// go to the previous epoch
|
// go to the previous epoch
|
||||||
exec.curProcEpoch--
|
r.curProcEpoch--
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
func (r *request) processCurrentEpoch(ctx context.Context) bool {
|
||||||
exec.log.Debug(logs.ProcessEpoch,
|
r.log.Debug(logs.ProcessEpoch,
|
||||||
zap.Uint64("number", exec.curProcEpoch),
|
zap.Uint64("number", r.curProcEpoch),
|
||||||
)
|
)
|
||||||
|
|
||||||
traverser, ok := exec.generateTraverser(exec.address())
|
traverser, ok := r.generateTraverser(r.address())
|
||||||
if !ok {
|
if !ok {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -56,12 +56,12 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
||||||
ctx, cancel := context.WithCancel(ctx)
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
exec.status = statusUndefined
|
r.status = statusUndefined
|
||||||
|
|
||||||
for {
|
for {
|
||||||
addrs := traverser.Next()
|
addrs := traverser.Next()
|
||||||
if len(addrs) == 0 {
|
if len(addrs) == 0 {
|
||||||
exec.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
|
r.log.Debug(logs.NoMoreNodesAbortPlacementIteration)
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
@ -69,8 +69,8 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
||||||
for i := range addrs {
|
for i := range addrs {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
exec.log.Debug(logs.InterruptPlacementIterationByContext,
|
r.log.Debug(logs.InterruptPlacementIterationByContext,
|
||||||
zap.String("error", ctx.Err().Error()),
|
zap.Error(ctx.Err()),
|
||||||
)
|
)
|
||||||
|
|
||||||
return true
|
return true
|
||||||
|
@ -84,8 +84,8 @@ func (exec *execCtx) processCurrentEpoch(ctx context.Context) bool {
|
||||||
|
|
||||||
client.NodeInfoFromNetmapElement(&info, addrs[i])
|
client.NodeInfoFromNetmapElement(&info, addrs[i])
|
||||||
|
|
||||||
if exec.processNode(ctx, info) {
|
if r.processNode(ctx, info) {
|
||||||
exec.log.Debug(logs.GetCompletingTheOperation)
|
r.log.Debug(logs.GetCompletingTheOperation)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
10
pkg/services/object/get/errors.go
Normal file
10
pkg/services/object/get/errors.go
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
package getsvc
|
||||||
|
|
||||||
|
import "errors"
|
||||||
|
|
||||||
|
var (
|
||||||
|
errRangeZeroLength = errors.New("zero range length")
|
||||||
|
errRangeOverflow = errors.New("range overflow")
|
||||||
|
errChildWithEmptyParent = errors.New("received child with empty parent")
|
||||||
|
errParentAddressDiffers = errors.New("parent address in child object differs")
|
||||||
|
)
|
|
@ -1,279 +0,0 @@
|
||||||
package getsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/ecdsa"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
|
||||||
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
||||||
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
"go.uber.org/zap"
|
|
||||||
)
|
|
||||||
|
|
||||||
type statusError struct {
|
|
||||||
status int
|
|
||||||
err error
|
|
||||||
}
|
|
||||||
|
|
||||||
type execCtx struct {
|
|
||||||
svc *Service
|
|
||||||
|
|
||||||
prm RangePrm
|
|
||||||
|
|
||||||
statusError
|
|
||||||
|
|
||||||
infoSplit *objectSDK.SplitInfo
|
|
||||||
|
|
||||||
log *logger.Logger
|
|
||||||
|
|
||||||
collectedObject *objectSDK.Object
|
|
||||||
|
|
||||||
head bool
|
|
||||||
|
|
||||||
curProcEpoch uint64
|
|
||||||
}
|
|
||||||
|
|
||||||
type execOption func(*execCtx)
|
|
||||||
|
|
||||||
const (
|
|
||||||
statusUndefined int = iota
|
|
||||||
statusOK
|
|
||||||
statusINHUMED
|
|
||||||
statusVIRTUAL
|
|
||||||
statusOutOfRange
|
|
||||||
)
|
|
||||||
|
|
||||||
func headOnly() execOption {
|
|
||||||
return func(c *execCtx) {
|
|
||||||
c.head = true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func withPayloadRange(r *objectSDK.Range) execOption {
|
|
||||||
return func(c *execCtx) {
|
|
||||||
c.prm.rng = r
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) setLogger(l *logger.Logger) {
|
|
||||||
req := "GET"
|
|
||||||
if exec.headOnly() {
|
|
||||||
req = "HEAD"
|
|
||||||
} else if exec.ctxRange() != nil {
|
|
||||||
req = "GET_RANGE"
|
|
||||||
}
|
|
||||||
|
|
||||||
exec.log = &logger.Logger{Logger: l.With(
|
|
||||||
zap.String("request", req),
|
|
||||||
zap.Stringer("address", exec.address()),
|
|
||||||
zap.Bool("raw", exec.isRaw()),
|
|
||||||
zap.Bool("local", exec.isLocal()),
|
|
||||||
zap.Bool("with session", exec.prm.common.SessionToken() != nil),
|
|
||||||
zap.Bool("with bearer", exec.prm.common.BearerToken() != nil),
|
|
||||||
)}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec execCtx) isLocal() bool {
|
|
||||||
return exec.prm.common.LocalOnly()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec execCtx) isRaw() bool {
|
|
||||||
return exec.prm.raw
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec execCtx) address() oid.Address {
|
|
||||||
return exec.prm.addr
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec execCtx) key() (*ecdsa.PrivateKey, error) {
|
|
||||||
if exec.prm.signerKey != nil {
|
|
||||||
// the key has already been requested and
|
|
||||||
// cached in the previous operations
|
|
||||||
return exec.prm.signerKey, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
var sessionInfo *util.SessionInfo
|
|
||||||
|
|
||||||
if tok := exec.prm.common.SessionToken(); tok != nil {
|
|
||||||
sessionInfo = &util.SessionInfo{
|
|
||||||
ID: tok.ID(),
|
|
||||||
Owner: tok.Issuer(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return exec.svc.keyStore.GetKey(sessionInfo)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) canAssemble() bool {
|
|
||||||
return !exec.isRaw() && !exec.headOnly()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) splitInfo() *objectSDK.SplitInfo {
|
|
||||||
return exec.infoSplit
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) containerID() cid.ID {
|
|
||||||
return exec.address().Container()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) ctxRange() *objectSDK.Range {
|
|
||||||
return exec.prm.rng
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) headOnly() bool {
|
|
||||||
return exec.head
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) netmapEpoch() uint64 {
|
|
||||||
return exec.prm.common.NetmapEpoch()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) netmapLookupDepth() uint64 {
|
|
||||||
return exec.prm.common.NetmapLookupDepth()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) initEpoch() bool {
|
|
||||||
exec.curProcEpoch = exec.netmapEpoch()
|
|
||||||
if exec.curProcEpoch > 0 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
e, err := exec.svc.currentEpochReceiver.currentEpoch()
|
|
||||||
|
|
||||||
switch {
|
|
||||||
default:
|
|
||||||
exec.status = statusUndefined
|
|
||||||
exec.err = err
|
|
||||||
|
|
||||||
exec.log.Debug(logs.CouldNotGetCurrentEpochNumber,
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
return false
|
|
||||||
case err == nil:
|
|
||||||
exec.curProcEpoch = e
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
|
|
||||||
obj := addr.Object()
|
|
||||||
|
|
||||||
t, err := exec.svc.traverserGenerator.GenerateTraverser(addr.Container(), &obj, exec.curProcEpoch)
|
|
||||||
|
|
||||||
switch {
|
|
||||||
default:
|
|
||||||
exec.status = statusUndefined
|
|
||||||
exec.err = err
|
|
||||||
|
|
||||||
exec.log.Debug(logs.GetCouldNotGenerateContainerTraverser,
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
|
|
||||||
return nil, false
|
|
||||||
case err == nil:
|
|
||||||
return t, true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec execCtx) remoteClient(info clientcore.NodeInfo) (getClient, bool) {
|
|
||||||
c, err := exec.svc.clientCache.get(info)
|
|
||||||
|
|
||||||
switch {
|
|
||||||
default:
|
|
||||||
exec.status = statusUndefined
|
|
||||||
exec.err = err
|
|
||||||
|
|
||||||
exec.log.Debug(logs.GetCouldNotConstructRemoteNodeClient)
|
|
||||||
case err == nil:
|
|
||||||
return c, true
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, false
|
|
||||||
}
|
|
||||||
|
|
||||||
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
|
|
||||||
if last, ok := src.LastPart(); ok {
|
|
||||||
dst.SetLastPart(last)
|
|
||||||
}
|
|
||||||
|
|
||||||
if link, ok := src.Link(); ok {
|
|
||||||
dst.SetLink(link)
|
|
||||||
}
|
|
||||||
|
|
||||||
if splitID := src.SplitID(); splitID != nil {
|
|
||||||
dst.SetSplitID(splitID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) writeCollectedHeader(ctx context.Context) bool {
|
|
||||||
if exec.ctxRange() != nil {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
err := exec.prm.objWriter.WriteHeader(
|
|
||||||
ctx,
|
|
||||||
exec.collectedObject.CutPayload(),
|
|
||||||
)
|
|
||||||
|
|
||||||
switch {
|
|
||||||
default:
|
|
||||||
exec.status = statusUndefined
|
|
||||||
exec.err = err
|
|
||||||
|
|
||||||
exec.log.Debug(logs.GetCouldNotWriteHeader,
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
case err == nil:
|
|
||||||
exec.status = statusOK
|
|
||||||
exec.err = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return exec.status == statusOK
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
|
|
||||||
if exec.headOnly() {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
err := exec.prm.objWriter.WriteChunk(ctx, obj.Payload())
|
|
||||||
|
|
||||||
switch {
|
|
||||||
default:
|
|
||||||
exec.status = statusUndefined
|
|
||||||
exec.err = err
|
|
||||||
|
|
||||||
exec.log.Debug(logs.GetCouldNotWritePayloadChunk,
|
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
case err == nil:
|
|
||||||
exec.status = statusOK
|
|
||||||
exec.err = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return err == nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (exec *execCtx) writeCollectedObject(ctx context.Context) {
|
|
||||||
if ok := exec.writeCollectedHeader(ctx); ok {
|
|
||||||
exec.writeObjectPayload(ctx, exec.collectedObject)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// isForwardingEnabled returns true if common execution
|
|
||||||
// parameters has request forwarding closure set.
|
|
||||||
func (exec execCtx) isForwardingEnabled() bool {
|
|
||||||
return exec.prm.forwarder != nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// disableForwarding removes request forwarding closure from common
|
|
||||||
// parameters, so it won't be inherited in new execution contexts.
|
|
||||||
func (exec *execCtx) disableForwarding() {
|
|
||||||
exec.prm.SetRequestForwarder(nil)
|
|
||||||
}
|
|
|
@ -11,18 +11,18 @@ import (
|
||||||
|
|
||||||
// Get serves a request to get an object by address, and returns Streamer instance.
|
// Get serves a request to get an object by address, and returns Streamer instance.
|
||||||
func (s *Service) Get(ctx context.Context, prm Prm) error {
|
func (s *Service) Get(ctx context.Context, prm Prm) error {
|
||||||
return s.get(ctx, prm.commonPrm).err
|
return s.get(ctx, RequestParameters{
|
||||||
|
commonPrm: prm.commonPrm,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRange serves a request to get an object by address, and returns Streamer instance.
|
// GetRange serves a request to get an object by address, and returns Streamer instance.
|
||||||
func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
|
func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
|
||||||
return s.getRange(ctx, prm)
|
return s.get(ctx, RequestParameters{
|
||||||
|
commonPrm: prm.commonPrm,
|
||||||
|
rng: prm.rng,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) getRange(ctx context.Context, prm RangePrm, opts ...execOption) error {
|
|
||||||
return s.get(ctx, prm.commonPrm, append(opts, withPayloadRange(prm.rng))...).err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHashRes, error) {
|
func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHashRes, error) {
|
||||||
hashes := make([][]byte, 0, len(prm.rngs))
|
hashes := make([][]byte, 0, len(prm.rngs))
|
||||||
|
|
||||||
|
@ -34,16 +34,15 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
|
||||||
// 1. Potential gains are insignificant when operating in the Internet given typical latencies and losses.
|
// 1. Potential gains are insignificant when operating in the Internet given typical latencies and losses.
|
||||||
// 2. Parallel solution is more complex in terms of code.
|
// 2. Parallel solution is more complex in terms of code.
|
||||||
// 3. TZ-hash is likely to be disabled in private installations.
|
// 3. TZ-hash is likely to be disabled in private installations.
|
||||||
rngPrm := RangePrm{
|
reqPrm := RequestParameters{
|
||||||
commonPrm: prm.commonPrm,
|
commonPrm: prm.commonPrm,
|
||||||
|
rng: &rng,
|
||||||
}
|
}
|
||||||
|
reqPrm.SetChunkWriter(&hasherWrapper{
|
||||||
rngPrm.SetRange(&rng)
|
|
||||||
rngPrm.SetChunkWriter(&hasherWrapper{
|
|
||||||
hash: util.NewSaltingWriter(h, prm.salt),
|
hash: util.NewSaltingWriter(h, prm.salt),
|
||||||
})
|
})
|
||||||
|
|
||||||
if err := s.getRange(ctx, rngPrm); err != nil {
|
if err := s.get(ctx, reqPrm); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,30 +59,32 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
|
||||||
// Returns ErrNotFound if the header was not received for the call.
|
// Returns ErrNotFound if the header was not received for the call.
|
||||||
// Returns SplitInfoError if object is virtual and raw flag is set.
|
// Returns SplitInfoError if object is virtual and raw flag is set.
|
||||||
func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
|
func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
|
||||||
return s.get(ctx, prm.commonPrm, headOnly()).err
|
return s.get(ctx, RequestParameters{
|
||||||
|
head: true,
|
||||||
|
commonPrm: prm.commonPrm,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError {
|
func (s *Service) get(ctx context.Context, prm RequestParameters) error {
|
||||||
exec := &execCtx{
|
exec := &request{
|
||||||
svc: s,
|
keyStore: s.keyStore,
|
||||||
prm: RangePrm{
|
traverserGenerator: s.traverserGenerator,
|
||||||
commonPrm: prm,
|
remoteStorageConstructor: s.remoteStorageConstructor,
|
||||||
},
|
epochSource: s.epochSource,
|
||||||
|
localStorage: s.localStorage,
|
||||||
|
|
||||||
|
prm: prm,
|
||||||
infoSplit: object.NewSplitInfo(),
|
infoSplit: object.NewSplitInfo(),
|
||||||
}
|
}
|
||||||
|
|
||||||
for i := range opts {
|
|
||||||
opts[i](exec)
|
|
||||||
}
|
|
||||||
|
|
||||||
exec.setLogger(s.log)
|
exec.setLogger(s.log)
|
||||||
|
|
||||||
exec.execute(ctx)
|
exec.execute(ctx)
|
||||||
|
|
||||||
return exec.statusError
|
return exec.statusError.err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) execute(ctx context.Context) {
|
func (exec *request) execute(ctx context.Context) {
|
||||||
exec.log.Debug(logs.ServingRequest)
|
exec.log.Debug(logs.ServingRequest)
|
||||||
|
|
||||||
// perform local operation
|
// perform local operation
|
||||||
|
@ -92,7 +93,7 @@ func (exec *execCtx) execute(ctx context.Context) {
|
||||||
exec.analyzeStatus(ctx, true)
|
exec.analyzeStatus(ctx, true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
|
func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
|
||||||
// analyze local result
|
// analyze local result
|
||||||
switch exec.status {
|
switch exec.status {
|
||||||
case statusOK:
|
case statusOK:
|
||||||
|
@ -106,7 +107,7 @@ func (exec *execCtx) analyzeStatus(ctx context.Context, execCnr bool) {
|
||||||
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)
|
||||||
default:
|
default:
|
||||||
exec.log.Debug(logs.OperationFinishedWithError,
|
exec.log.Debug(logs.OperationFinishedWithError,
|
||||||
zap.String("error", exec.err.Error()),
|
zap.Error(exec.err),
|
||||||
)
|
)
|
||||||
|
|
||||||
if execCnr {
|
if execCnr {
|
||||||
|
|
|
@ -2,6 +2,7 @@ package getsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
"crypto/rand"
|
"crypto/rand"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
@ -56,7 +57,7 @@ type testClient struct {
|
||||||
|
|
||||||
type testEpochReceiver uint64
|
type testEpochReceiver uint64
|
||||||
|
|
||||||
func (e testEpochReceiver) currentEpoch() (uint64, error) {
|
func (e testEpochReceiver) Epoch() (uint64, error) {
|
||||||
return uint64(e), nil
|
return uint64(e), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -99,7 +100,7 @@ func (p *testPlacementBuilder) BuildPlacement(cnr cid.ID, obj *oid.ID, _ netmap.
|
||||||
return vs, nil
|
return vs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testClientCache) get(info client.NodeInfo) (getClient, error) {
|
func (c *testClientCache) Get(info client.NodeInfo) (remoteStorage, error) {
|
||||||
v, ok := c.clients[network.StringifyGroup(info.AddressGroup())]
|
v, ok := c.clients[network.StringifyGroup(info.AddressGroup())]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, errors.New("could not construct client")
|
return nil, errors.New("could not construct client")
|
||||||
|
@ -117,8 +118,15 @@ func newTestClient() *testClient {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.NodeInfo) (*objectSDK.Object, error) {
|
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
|
||||||
v, ok := c.results[exec.address().EncodeToString()]
|
c.results[addr.EncodeToString()] = struct {
|
||||||
|
obj *objectSDK.Object
|
||||||
|
err error
|
||||||
|
}{obj: obj, err: err}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *testClient) Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
|
||||||
|
v, ok := c.results[address.EncodeToString()]
|
||||||
if !ok {
|
if !ok {
|
||||||
var errNotFound apistatus.ObjectNotFound
|
var errNotFound apistatus.ObjectNotFound
|
||||||
|
|
||||||
|
@ -129,21 +137,38 @@ func (c *testClient) getObject(ctx context.Context, exec *execCtx, _ client.Node
|
||||||
return nil, v.err
|
return nil, v.err
|
||||||
}
|
}
|
||||||
|
|
||||||
return cutToRange(v.obj, exec.ctxRange()), nil
|
return v.obj, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *testClient) addResult(addr oid.Address, obj *objectSDK.Object, err error) {
|
func (c *testClient) Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
|
||||||
c.results[addr.EncodeToString()] = struct {
|
return c.Get(ctx, address, requestParams)
|
||||||
obj *objectSDK.Object
|
|
||||||
err error
|
|
||||||
}{obj: obj, err: err}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object, error) {
|
func (c *testClient) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
|
||||||
|
obj, err := c.Get(ctx, address, requestParams)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return cutToRange(obj, rng), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *testClient) ForwardRequest(ctx context.Context, info client.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error) {
|
||||||
|
return nil, fmt.Errorf("not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
|
||||||
|
return s.Range(ctx, address, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) {
|
||||||
|
return s.Range(ctx, address, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *testStorage) Range(_ context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) {
|
||||||
var (
|
var (
|
||||||
ok bool
|
ok bool
|
||||||
obj *objectSDK.Object
|
obj *objectSDK.Object
|
||||||
sAddr = exec.address().EncodeToString()
|
sAddr = address.EncodeToString()
|
||||||
)
|
)
|
||||||
|
|
||||||
if _, ok = s.inhumed[sAddr]; ok {
|
if _, ok = s.inhumed[sAddr]; ok {
|
||||||
|
@ -157,7 +182,7 @@ func (s *testStorage) get(_ context.Context, exec *execCtx) (*objectSDK.Object,
|
||||||
}
|
}
|
||||||
|
|
||||||
if obj, ok = s.phy[sAddr]; ok {
|
if obj, ok = s.phy[sAddr]; ok {
|
||||||
return cutToRange(obj, exec.ctxRange()), nil
|
return cutToRange(obj, rng), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var errNotFound apistatus.ObjectNotFound
|
var errNotFound apistatus.ObjectNotFound
|
||||||
|
@ -241,15 +266,21 @@ func (w *writePayloadErrorObjectWriter) WriteChunk(_ context.Context, _ []byte)
|
||||||
return &writePayloadError{}
|
return &writePayloadError{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type testKeyStorage struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ks *testKeyStorage) GetKey(_ *util.SessionInfo) (*ecdsa.PrivateKey, error) {
|
||||||
|
return &ecdsa.PrivateKey{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func TestGetLocalOnly(t *testing.T) {
|
func TestGetLocalOnly(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
newSvc := func(storage *testStorage) *Service {
|
newSvc := func(storage *testStorage) *Service {
|
||||||
svc := &Service{cfg: new(cfg)}
|
return &Service{
|
||||||
svc.log = test.NewLogger(t, false)
|
log: test.NewLogger(t, false),
|
||||||
svc.localStorage = storage
|
localStorage: storage,
|
||||||
|
}
|
||||||
return svc
|
|
||||||
}
|
}
|
||||||
|
|
||||||
newPrm := func(raw bool, w ObjectWriter) Prm {
|
newPrm := func(raw bool, w ObjectWriter) Prm {
|
||||||
|
@ -506,22 +537,21 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
container.CalculateID(&idCnr, cnr)
|
container.CalculateID(&idCnr, cnr)
|
||||||
|
|
||||||
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
|
newSvc := func(b *testPlacementBuilder, c *testClientCache) *Service {
|
||||||
svc := &Service{cfg: new(cfg)}
|
|
||||||
svc.log = test.NewLogger(t, false)
|
|
||||||
svc.localStorage = newTestStorage()
|
|
||||||
|
|
||||||
const curEpoch = 13
|
const curEpoch = 13
|
||||||
|
|
||||||
svc.traverserGenerator = &testTraverserGenerator{
|
return &Service{
|
||||||
|
log: test.NewLogger(t, false),
|
||||||
|
localStorage: newTestStorage(),
|
||||||
|
traverserGenerator: &testTraverserGenerator{
|
||||||
c: cnr,
|
c: cnr,
|
||||||
b: map[uint64]placement.Builder{
|
b: map[uint64]placement.Builder{
|
||||||
curEpoch: b,
|
curEpoch: b,
|
||||||
},
|
},
|
||||||
|
},
|
||||||
|
epochSource: testEpochReceiver(curEpoch),
|
||||||
|
remoteStorageConstructor: c,
|
||||||
|
keyStore: &testKeyStorage{},
|
||||||
}
|
}
|
||||||
svc.clientCache = c
|
|
||||||
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
|
|
||||||
|
|
||||||
return svc
|
|
||||||
}
|
}
|
||||||
|
|
||||||
newPrm := func(raw bool, w ObjectWriter) Prm {
|
newPrm := func(raw bool, w ObjectWriter) Prm {
|
||||||
|
@ -1176,7 +1206,7 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
|
|
||||||
err := svc.Get(ctx, p)
|
err := svc.Get(ctx, p)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Equal(t, err.Error(), "received child with empty parent")
|
require.ErrorIs(t, err, errChildWithEmptyParent)
|
||||||
|
|
||||||
w = NewSimpleObjectWriter()
|
w = NewSimpleObjectWriter()
|
||||||
payloadSz := srcObj.PayloadSize()
|
payloadSz := srcObj.PayloadSize()
|
||||||
|
@ -1189,7 +1219,7 @@ func TestGetRemoteSmall(t *testing.T) {
|
||||||
|
|
||||||
err = svc.GetRange(ctx, rngPrm)
|
err = svc.GetRange(ctx, rngPrm)
|
||||||
require.Error(t, err)
|
require.Error(t, err)
|
||||||
require.Equal(t, err.Error(), "received child with empty parent")
|
require.ErrorIs(t, err, errChildWithEmptyParent)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("out of range", func(t *testing.T) {
|
t.Run("out of range", func(t *testing.T) {
|
||||||
|
@ -1639,13 +1669,13 @@ func TestGetFromPastEpoch(t *testing.T) {
|
||||||
c22 := newTestClient()
|
c22 := newTestClient()
|
||||||
c22.addResult(addr, obj, nil)
|
c22.addResult(addr, obj, nil)
|
||||||
|
|
||||||
svc := &Service{cfg: new(cfg)}
|
|
||||||
svc.log = test.NewLogger(t, false)
|
|
||||||
svc.localStorage = newTestStorage()
|
|
||||||
|
|
||||||
const curEpoch = 13
|
const curEpoch = 13
|
||||||
|
|
||||||
svc.traverserGenerator = &testTraverserGenerator{
|
svc := &Service{
|
||||||
|
log: test.NewLogger(t, false),
|
||||||
|
localStorage: newTestStorage(),
|
||||||
|
epochSource: testEpochReceiver(curEpoch),
|
||||||
|
traverserGenerator: &testTraverserGenerator{
|
||||||
c: cnr,
|
c: cnr,
|
||||||
b: map[uint64]placement.Builder{
|
b: map[uint64]placement.Builder{
|
||||||
curEpoch: &testPlacementBuilder{
|
curEpoch: &testPlacementBuilder{
|
||||||
|
@ -1659,19 +1689,18 @@ func TestGetFromPastEpoch(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
}
|
},
|
||||||
|
remoteStorageConstructor: &testClientCache{
|
||||||
svc.clientCache = &testClientCache{
|
|
||||||
clients: map[string]*testClient{
|
clients: map[string]*testClient{
|
||||||
as[0][0]: c11,
|
as[0][0]: c11,
|
||||||
as[0][1]: c12,
|
as[0][1]: c12,
|
||||||
as[1][0]: c21,
|
as[1][0]: c21,
|
||||||
as[1][1]: c22,
|
as[1][1]: c22,
|
||||||
},
|
},
|
||||||
|
},
|
||||||
|
keyStore: &testKeyStorage{},
|
||||||
}
|
}
|
||||||
|
|
||||||
svc.currentEpochReceiver = testEpochReceiver(curEpoch)
|
|
||||||
|
|
||||||
w := NewSimpleObjectWriter()
|
w := NewSimpleObjectWriter()
|
||||||
|
|
||||||
commonPrm := new(util.CommonPrm)
|
commonPrm := new(util.CommonPrm)
|
||||||
|
|
|
@ -11,7 +11,7 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (exec *execCtx) executeLocal(ctx context.Context) {
|
func (r *request) executeLocal(ctx context.Context) {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "getService.executeLocal")
|
ctx, span := tracing.StartSpanFromContext(ctx, "getService.executeLocal")
|
||||||
defer func() {
|
defer func() {
|
||||||
span.End()
|
span.End()
|
||||||
|
@ -19,7 +19,7 @@ func (exec *execCtx) executeLocal(ctx context.Context) {
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
exec.collectedObject, err = exec.svc.localStorage.get(ctx, exec)
|
r.collectedObject, err = r.get(ctx)
|
||||||
|
|
||||||
var errSplitInfo *objectSDK.SplitInfoError
|
var errSplitInfo *objectSDK.SplitInfoError
|
||||||
var errRemoved apistatus.ObjectAlreadyRemoved
|
var errRemoved apistatus.ObjectAlreadyRemoved
|
||||||
|
@ -27,25 +27,33 @@ func (exec *execCtx) executeLocal(ctx context.Context) {
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
default:
|
default:
|
||||||
exec.status = statusUndefined
|
r.status = statusUndefined
|
||||||
exec.err = err
|
r.err = err
|
||||||
|
|
||||||
exec.log.Debug(logs.GetLocalGetFailed,
|
r.log.Debug(logs.GetLocalGetFailed, zap.Error(err))
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
case err == nil:
|
case err == nil:
|
||||||
exec.status = statusOK
|
r.status = statusOK
|
||||||
exec.err = nil
|
r.err = nil
|
||||||
exec.writeCollectedObject(ctx)
|
r.writeCollectedObject(ctx)
|
||||||
case errors.As(err, &errRemoved):
|
case errors.As(err, &errRemoved):
|
||||||
exec.status = statusINHUMED
|
r.status = statusINHUMED
|
||||||
exec.err = errRemoved
|
r.err = errRemoved
|
||||||
case errors.As(err, &errSplitInfo):
|
case errors.As(err, &errSplitInfo):
|
||||||
exec.status = statusVIRTUAL
|
r.status = statusVIRTUAL
|
||||||
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
|
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||||
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
|
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||||
case errors.As(err, &errOutOfRange):
|
case errors.As(err, &errOutOfRange):
|
||||||
exec.status = statusOutOfRange
|
r.status = statusOutOfRange
|
||||||
exec.err = errOutOfRange
|
r.err = errOutOfRange
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *request) get(ctx context.Context) (*objectSDK.Object, error) {
|
||||||
|
if r.headOnly() {
|
||||||
|
return r.localStorage.Head(ctx, r.address(), r.isRaw())
|
||||||
|
}
|
||||||
|
if rng := r.ctxRange(); rng != nil {
|
||||||
|
return r.localStorage.Range(ctx, r.address(), rng)
|
||||||
|
}
|
||||||
|
return r.localStorage.Get(ctx, r.address())
|
||||||
|
}
|
||||||
|
|
|
@ -3,12 +3,11 @@ package getsvc
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"errors"
|
|
||||||
"hash"
|
"hash"
|
||||||
|
|
||||||
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -21,14 +20,9 @@ type Prm struct {
|
||||||
type RangePrm struct {
|
type RangePrm struct {
|
||||||
commonPrm
|
commonPrm
|
||||||
|
|
||||||
rng *object.Range
|
rng *objectSDK.Range
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
|
||||||
errRangeZeroLength = errors.New("zero range length")
|
|
||||||
errRangeOverflow = errors.New("range overflow")
|
|
||||||
)
|
|
||||||
|
|
||||||
// Validate pre-validates `OBJECTRANGE` request's parameters content
|
// Validate pre-validates `OBJECTRANGE` request's parameters content
|
||||||
// without access to the requested object's payload.
|
// without access to the requested object's payload.
|
||||||
func (p RangePrm) Validate() error {
|
func (p RangePrm) Validate() error {
|
||||||
|
@ -54,12 +48,18 @@ type RangeHashPrm struct {
|
||||||
|
|
||||||
hashGen func() hash.Hash
|
hashGen func() hash.Hash
|
||||||
|
|
||||||
rngs []object.Range
|
rngs []objectSDK.Range
|
||||||
|
|
||||||
salt []byte
|
salt []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)
|
type RequestParameters struct {
|
||||||
|
commonPrm
|
||||||
|
head bool
|
||||||
|
rng *objectSDK.Range
|
||||||
|
}
|
||||||
|
|
||||||
|
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*objectSDK.Object, error)
|
||||||
|
|
||||||
// HeadPrm groups parameters of Head service call.
|
// HeadPrm groups parameters of Head service call.
|
||||||
type HeadPrm struct {
|
type HeadPrm struct {
|
||||||
|
@ -83,43 +83,25 @@ type commonPrm struct {
|
||||||
signerKey *ecdsa.PrivateKey
|
signerKey *ecdsa.PrivateKey
|
||||||
}
|
}
|
||||||
|
|
||||||
// ChunkWriter is an interface of target component
|
|
||||||
// to write payload chunk.
|
|
||||||
type ChunkWriter interface {
|
|
||||||
WriteChunk(context.Context, []byte) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// HeaderWriter is an interface of target component
|
|
||||||
// to write object header.
|
|
||||||
type HeaderWriter interface {
|
|
||||||
WriteHeader(context.Context, *object.Object) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// ObjectWriter is an interface of target component to write object.
|
|
||||||
type ObjectWriter interface {
|
|
||||||
HeaderWriter
|
|
||||||
ChunkWriter
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetObjectWriter sets target component to write the object.
|
// SetObjectWriter sets target component to write the object.
|
||||||
func (p *Prm) SetObjectWriter(w ObjectWriter) {
|
func (p *Prm) SetObjectWriter(w ObjectWriter) {
|
||||||
p.objWriter = w
|
p.objWriter = w
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetChunkWriter sets target component to write the object payload range.
|
// SetChunkWriter sets target component to write the object payload range.
|
||||||
func (p *RangePrm) SetChunkWriter(w ChunkWriter) {
|
func (p *commonPrm) SetChunkWriter(w ChunkWriter) {
|
||||||
p.objWriter = &partWriter{
|
p.objWriter = &partWriter{
|
||||||
chunkWriter: w,
|
chunkWriter: w,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetRange sets range of the requested payload data.
|
// SetRange sets range of the requested payload data.
|
||||||
func (p *RangePrm) SetRange(rng *object.Range) {
|
func (p *RangePrm) SetRange(rng *objectSDK.Range) {
|
||||||
p.rng = rng
|
p.rng = rng
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetRangeList sets a list of object payload ranges.
|
// SetRangeList sets a list of object payload ranges.
|
||||||
func (p *RangeHashPrm) SetRangeList(rngs []object.Range) {
|
func (p *RangeHashPrm) SetRangeList(rngs []objectSDK.Range) {
|
||||||
p.rngs = rngs
|
p.rngs = rngs
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -158,7 +140,7 @@ func (p *commonPrm) WithCachedSignerKey(signerKey *ecdsa.PrivateKey) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetHeaderWriter sets target component to write the object header.
|
// SetHeaderWriter sets target component to write the object header.
|
||||||
func (p *HeadPrm) SetHeaderWriter(w HeaderWriter) {
|
func (p *commonPrm) SetHeaderWriter(w HeaderWriter) {
|
||||||
p.objWriter = &partWriter{
|
p.objWriter = &partWriter{
|
||||||
headWriter: w,
|
headWriter: w,
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,18 +12,18 @@ import (
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool {
|
func (r *request) processNode(ctx context.Context, info client.NodeInfo) bool {
|
||||||
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode")
|
ctx, span := tracing.StartSpanFromContext(ctx, "getService.processNode")
|
||||||
defer span.End()
|
defer span.End()
|
||||||
|
|
||||||
exec.log.Debug(logs.ProcessingNode)
|
r.log.Debug(logs.ProcessingNode)
|
||||||
|
|
||||||
client, ok := exec.remoteClient(info)
|
rs, ok := r.getRemoteStorage(info)
|
||||||
if !ok {
|
if !ok {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
obj, err := client.getObject(ctx, exec, info)
|
obj, err := r.getRemote(ctx, rs, info)
|
||||||
|
|
||||||
var errSplitInfo *objectSDK.SplitInfoError
|
var errSplitInfo *objectSDK.SplitInfoError
|
||||||
var errRemoved *apistatus.ObjectAlreadyRemoved
|
var errRemoved *apistatus.ObjectAlreadyRemoved
|
||||||
|
@ -33,34 +33,66 @@ func (exec *execCtx) processNode(ctx context.Context, info client.NodeInfo) bool
|
||||||
default:
|
default:
|
||||||
var errNotFound apistatus.ObjectNotFound
|
var errNotFound apistatus.ObjectNotFound
|
||||||
|
|
||||||
exec.status = statusUndefined
|
r.status = statusUndefined
|
||||||
exec.err = errNotFound
|
r.err = errNotFound
|
||||||
|
|
||||||
exec.log.Debug(logs.GetRemoteCallFailed,
|
r.log.Debug(logs.GetRemoteCallFailed, zap.Error(err))
|
||||||
zap.String("error", err.Error()),
|
|
||||||
)
|
|
||||||
case err == nil:
|
case err == nil:
|
||||||
exec.status = statusOK
|
r.status = statusOK
|
||||||
exec.err = nil
|
r.err = nil
|
||||||
|
|
||||||
// both object and err are nil only if the original
|
// both object and err are nil only if the original
|
||||||
// request was forwarded to another node and the object
|
// request was forwarded to another node and the object
|
||||||
// has already been streamed to the requesting party
|
// has already been streamed to the requesting party
|
||||||
if obj != nil {
|
if obj != nil {
|
||||||
exec.collectedObject = obj
|
r.collectedObject = obj
|
||||||
exec.writeCollectedObject(ctx)
|
r.writeCollectedObject(ctx)
|
||||||
}
|
}
|
||||||
case errors.As(err, &errRemoved):
|
case errors.As(err, &errRemoved):
|
||||||
exec.status = statusINHUMED
|
r.status = statusINHUMED
|
||||||
exec.err = errRemoved
|
r.err = errRemoved
|
||||||
case errors.As(err, &errOutOfRange):
|
case errors.As(err, &errOutOfRange):
|
||||||
exec.status = statusOutOfRange
|
r.status = statusOutOfRange
|
||||||
exec.err = errOutOfRange
|
r.err = errOutOfRange
|
||||||
case errors.As(err, &errSplitInfo):
|
case errors.As(err, &errSplitInfo):
|
||||||
exec.status = statusVIRTUAL
|
r.status = statusVIRTUAL
|
||||||
mergeSplitInfo(exec.splitInfo(), errSplitInfo.SplitInfo())
|
mergeSplitInfo(r.splitInfo(), errSplitInfo.SplitInfo())
|
||||||
exec.err = objectSDK.NewSplitInfoError(exec.infoSplit)
|
r.err = objectSDK.NewSplitInfoError(r.infoSplit)
|
||||||
}
|
}
|
||||||
|
|
||||||
return exec.status != statusUndefined
|
return r.status != statusUndefined
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) getRemote(ctx context.Context, rs remoteStorage, info client.NodeInfo) (*objectSDK.Object, error) {
|
||||||
|
if r.isForwardingEnabled() {
|
||||||
|
return rs.ForwardRequest(ctx, info, r.prm.forwarder)
|
||||||
|
}
|
||||||
|
|
||||||
|
key, err := r.key()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
prm := RemoteRequestParams{
|
||||||
|
Epoch: r.curProcEpoch,
|
||||||
|
TTL: r.prm.common.TTL(),
|
||||||
|
PrivateKey: key,
|
||||||
|
SessionToken: r.prm.common.SessionToken(),
|
||||||
|
BearerToken: r.prm.common.BearerToken(),
|
||||||
|
XHeaders: r.prm.common.XHeaders(),
|
||||||
|
IsRaw: r.isRaw(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if r.headOnly() {
|
||||||
|
return rs.Head(ctx, r.address(), prm)
|
||||||
|
}
|
||||||
|
// we don't specify payload writer because we accumulate
|
||||||
|
// the object locally (even huge).
|
||||||
|
if rng := r.ctxRange(); rng != nil {
|
||||||
|
// Current spec allows other storage node to deny access,
|
||||||
|
// fallback to GET here.
|
||||||
|
return rs.Range(ctx, r.address(), rng, prm)
|
||||||
|
}
|
||||||
|
|
||||||
|
return rs.Get(ctx, r.address(), prm)
|
||||||
}
|
}
|
||||||
|
|
244
pkg/services/object/get/request.go
Normal file
244
pkg/services/object/get/request.go
Normal file
|
@ -0,0 +1,244 @@
|
||||||
|
package getsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
|
||||||
|
clientcore "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type request struct {
|
||||||
|
prm RequestParameters
|
||||||
|
|
||||||
|
statusError
|
||||||
|
|
||||||
|
infoSplit *objectSDK.SplitInfo
|
||||||
|
|
||||||
|
log *logger.Logger
|
||||||
|
|
||||||
|
collectedObject *objectSDK.Object
|
||||||
|
|
||||||
|
curProcEpoch uint64
|
||||||
|
|
||||||
|
keyStore keyStorage
|
||||||
|
epochSource epochSource
|
||||||
|
traverserGenerator traverserGenerator
|
||||||
|
remoteStorageConstructor remoteStorageConstructor
|
||||||
|
localStorage localStorage
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) setLogger(l *logger.Logger) {
|
||||||
|
req := "GET"
|
||||||
|
if r.headOnly() {
|
||||||
|
req = "HEAD"
|
||||||
|
} else if r.ctxRange() != nil {
|
||||||
|
req = "GET_RANGE"
|
||||||
|
}
|
||||||
|
|
||||||
|
r.log = &logger.Logger{Logger: l.With(
|
||||||
|
zap.String("request", req),
|
||||||
|
zap.Stringer("address", r.address()),
|
||||||
|
zap.Bool("raw", r.isRaw()),
|
||||||
|
zap.Bool("local", r.isLocal()),
|
||||||
|
zap.Bool("with session", r.prm.common.SessionToken() != nil),
|
||||||
|
zap.Bool("with bearer", r.prm.common.BearerToken() != nil),
|
||||||
|
)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) isLocal() bool {
|
||||||
|
return r.prm.common.LocalOnly()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) isRaw() bool {
|
||||||
|
return r.prm.raw
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) address() oid.Address {
|
||||||
|
return r.prm.addr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) key() (*ecdsa.PrivateKey, error) {
|
||||||
|
if r.prm.signerKey != nil {
|
||||||
|
// the key has already been requested and
|
||||||
|
// cached in the previous operations
|
||||||
|
return r.prm.signerKey, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var sessionInfo *util.SessionInfo
|
||||||
|
|
||||||
|
if tok := r.prm.common.SessionToken(); tok != nil {
|
||||||
|
sessionInfo = &util.SessionInfo{
|
||||||
|
ID: tok.ID(),
|
||||||
|
Owner: tok.Issuer(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.keyStore.GetKey(sessionInfo)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) canAssemble() bool {
|
||||||
|
return !r.isRaw() && !r.headOnly()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) splitInfo() *objectSDK.SplitInfo {
|
||||||
|
return r.infoSplit
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) containerID() cid.ID {
|
||||||
|
return r.address().Container()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) ctxRange() *objectSDK.Range {
|
||||||
|
return r.prm.rng
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) headOnly() bool {
|
||||||
|
return r.prm.head
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) netmapEpoch() uint64 {
|
||||||
|
return r.prm.common.NetmapEpoch()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) netmapLookupDepth() uint64 {
|
||||||
|
return r.prm.common.NetmapLookupDepth()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) initEpoch() bool {
|
||||||
|
r.curProcEpoch = r.netmapEpoch()
|
||||||
|
if r.curProcEpoch > 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
e, err := r.epochSource.Epoch()
|
||||||
|
|
||||||
|
switch {
|
||||||
|
default:
|
||||||
|
r.status = statusUndefined
|
||||||
|
r.err = err
|
||||||
|
|
||||||
|
r.log.Debug(logs.CouldNotGetCurrentEpochNumber, zap.Error(err))
|
||||||
|
|
||||||
|
return false
|
||||||
|
case err == nil:
|
||||||
|
r.curProcEpoch = e
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) generateTraverser(addr oid.Address) (*placement.Traverser, bool) {
|
||||||
|
obj := addr.Object()
|
||||||
|
|
||||||
|
t, err := r.traverserGenerator.GenerateTraverser(addr.Container(), &obj, r.curProcEpoch)
|
||||||
|
|
||||||
|
switch {
|
||||||
|
default:
|
||||||
|
r.status = statusUndefined
|
||||||
|
r.err = err
|
||||||
|
|
||||||
|
r.log.Debug(logs.GetCouldNotGenerateContainerTraverser, zap.Error(err))
|
||||||
|
|
||||||
|
return nil, false
|
||||||
|
case err == nil:
|
||||||
|
return t, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) getRemoteStorage(info clientcore.NodeInfo) (remoteStorage, bool) {
|
||||||
|
rs, err := r.remoteStorageConstructor.Get(info)
|
||||||
|
if err != nil {
|
||||||
|
r.status = statusUndefined
|
||||||
|
r.err = err
|
||||||
|
|
||||||
|
r.log.Debug(logs.GetCouldNotConstructRemoteNodeClient)
|
||||||
|
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
return rs, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) writeCollectedHeader(ctx context.Context) bool {
|
||||||
|
if r.ctxRange() != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.prm.objWriter.WriteHeader(
|
||||||
|
ctx,
|
||||||
|
r.collectedObject.CutPayload(),
|
||||||
|
)
|
||||||
|
|
||||||
|
switch {
|
||||||
|
default:
|
||||||
|
r.status = statusUndefined
|
||||||
|
r.err = err
|
||||||
|
|
||||||
|
r.log.Debug(logs.GetCouldNotWriteHeader, zap.Error(err))
|
||||||
|
case err == nil:
|
||||||
|
r.status = statusOK
|
||||||
|
r.err = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.status == statusOK
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) writeObjectPayload(ctx context.Context, obj *objectSDK.Object) bool {
|
||||||
|
if r.headOnly() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
err := r.prm.objWriter.WriteChunk(ctx, obj.Payload())
|
||||||
|
|
||||||
|
switch {
|
||||||
|
default:
|
||||||
|
r.status = statusUndefined
|
||||||
|
r.err = err
|
||||||
|
|
||||||
|
r.log.Debug(logs.GetCouldNotWritePayloadChunk, zap.Error(err))
|
||||||
|
case err == nil:
|
||||||
|
r.status = statusOK
|
||||||
|
r.err = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return err == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *request) writeCollectedObject(ctx context.Context) {
|
||||||
|
if ok := r.writeCollectedHeader(ctx); ok {
|
||||||
|
r.writeObjectPayload(ctx, r.collectedObject)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// isForwardingEnabled returns true if common execution
|
||||||
|
// parameters has request forwarding closure set.
|
||||||
|
func (r request) isForwardingEnabled() bool {
|
||||||
|
return r.prm.forwarder != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// disableForwarding removes request forwarding closure from common
|
||||||
|
// parameters, so it won't be inherited in new execution contexts.
|
||||||
|
func (r *request) disableForwarding() {
|
||||||
|
r.prm.SetRequestForwarder(nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
func mergeSplitInfo(dst, src *objectSDK.SplitInfo) {
|
||||||
|
if last, ok := src.LastPart(); ok {
|
||||||
|
dst.SetLastPart(last)
|
||||||
|
}
|
||||||
|
|
||||||
|
if link, ok := src.Link(); ok {
|
||||||
|
dst.SetLink(link)
|
||||||
|
}
|
||||||
|
|
||||||
|
if splitID := src.SplitID(); splitID != nil {
|
||||||
|
dst.SetSplitID(splitID)
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,9 +0,0 @@
|
||||||
package getsvc
|
|
||||||
|
|
||||||
type RangeHashRes struct {
|
|
||||||
hashes [][]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *RangeHashRes) Hashes() [][]byte {
|
|
||||||
return r.hashes
|
|
||||||
}
|
|
|
@ -1,124 +1,54 @@
|
||||||
package getsvc
|
package getsvc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
|
||||||
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Option is a Service's constructor option.
|
||||||
|
type Option func(*Service)
|
||||||
|
|
||||||
// Service utility serving requests of Object.Get service.
|
// Service utility serving requests of Object.Get service.
|
||||||
type Service struct {
|
type Service struct {
|
||||||
*cfg
|
|
||||||
}
|
|
||||||
|
|
||||||
// Option is a Service's constructor option.
|
|
||||||
type Option func(*cfg)
|
|
||||||
|
|
||||||
type getClient interface {
|
|
||||||
getObject(context.Context, *execCtx, client.NodeInfo) (*object.Object, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
type cfg struct {
|
|
||||||
log *logger.Logger
|
log *logger.Logger
|
||||||
|
localStorage localStorage
|
||||||
localStorage interface {
|
traverserGenerator traverserGenerator
|
||||||
get(context.Context, *execCtx) (*object.Object, error)
|
epochSource epochSource
|
||||||
}
|
keyStore keyStorage
|
||||||
|
remoteStorageConstructor remoteStorageConstructor
|
||||||
clientCache interface {
|
|
||||||
get(client.NodeInfo) (getClient, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
traverserGenerator interface {
|
|
||||||
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
currentEpochReceiver interface {
|
|
||||||
currentEpoch() (uint64, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
keyStore *util.KeyStorage
|
|
||||||
}
|
|
||||||
|
|
||||||
func defaultCfg() *cfg {
|
|
||||||
return &cfg{
|
|
||||||
log: &logger.Logger{Logger: zap.L()},
|
|
||||||
localStorage: new(storageEngineWrapper),
|
|
||||||
clientCache: new(clientCacheWrapper),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// New creates, initializes and returns utility serving
|
// New creates, initializes and returns utility serving
|
||||||
// Object.Get service requests.
|
// Object.Get service requests.
|
||||||
func New(opts ...Option) *Service {
|
func New(
|
||||||
c := defaultCfg()
|
ks keyStorage,
|
||||||
|
es epochSource,
|
||||||
for i := range opts {
|
e localStorageEngine,
|
||||||
opts[i](c)
|
tg traverserGenerator,
|
||||||
|
cc clientConstructor,
|
||||||
|
opts ...Option,
|
||||||
|
) *Service {
|
||||||
|
result := &Service{
|
||||||
|
keyStore: ks,
|
||||||
|
epochSource: es,
|
||||||
|
log: &logger.Logger{Logger: zap.L()},
|
||||||
|
localStorage: &engineLocalStorage{
|
||||||
|
engine: e,
|
||||||
|
},
|
||||||
|
traverserGenerator: tg,
|
||||||
|
remoteStorageConstructor: &multiclientRemoteStorageConstructor{
|
||||||
|
clientConstructor: cc,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
for _, option := range opts {
|
||||||
return &Service{
|
option(result)
|
||||||
cfg: c,
|
|
||||||
}
|
}
|
||||||
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
// WithLogger returns option to specify Get service's logger.
|
// WithLogger returns option to specify Get service's logger.
|
||||||
func WithLogger(l *logger.Logger) Option {
|
func WithLogger(l *logger.Logger) Option {
|
||||||
return func(c *cfg) {
|
return func(s *Service) {
|
||||||
c.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get service"))}
|
s.log = &logger.Logger{Logger: l.With(zap.String("component", "Object.Get service"))}
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithLocalStorageEngine returns option to set local storage
|
|
||||||
// instance.
|
|
||||||
func WithLocalStorageEngine(e *engine.StorageEngine) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.localStorage.(*storageEngineWrapper).engine = e
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type ClientConstructor interface {
|
|
||||||
Get(client.NodeInfo) (client.MultiAddressClient, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithClientConstructor returns option to set constructor of remote node clients.
|
|
||||||
func WithClientConstructor(v ClientConstructor) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.clientCache.(*clientCacheWrapper).cache = v
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithTraverserGenerator returns option to set generator of
|
|
||||||
// placement traverser to get the objects from containers.
|
|
||||||
func WithTraverserGenerator(t *util.TraverserGenerator) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.traverserGenerator = t
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithNetMapSource returns option to set network
|
|
||||||
// map storage to receive current network state.
|
|
||||||
func WithNetMapSource(nmSrc netmap.Source) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.currentEpochReceiver = &nmSrcWrapper{
|
|
||||||
nmSrc: nmSrc,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// WithKeyStorage returns option to set private
|
|
||||||
// key storage for session tokens and node key.
|
|
||||||
func WithKeyStorage(store *util.KeyStorage) Option {
|
|
||||||
return func(c *cfg) {
|
|
||||||
c.keyStore = store
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
14
pkg/services/object/get/status.go
Normal file
14
pkg/services/object/get/status.go
Normal file
|
@ -0,0 +1,14 @@
|
||||||
|
package getsvc
|
||||||
|
|
||||||
|
const (
|
||||||
|
statusUndefined int = iota
|
||||||
|
statusOK
|
||||||
|
statusINHUMED
|
||||||
|
statusVIRTUAL
|
||||||
|
statusOutOfRange
|
||||||
|
)
|
||||||
|
|
||||||
|
type statusError struct {
|
||||||
|
status int
|
||||||
|
err error
|
||||||
|
}
|
238
pkg/services/object/get/types.go
Normal file
238
pkg/services/object/get/types.go
Normal file
|
@ -0,0 +1,238 @@
|
||||||
|
package getsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/ecdsa"
|
||||||
|
"errors"
|
||||||
|
|
||||||
|
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
||||||
|
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/util"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object_manager/placement"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/bearer"
|
||||||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||||||
|
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||||||
|
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||||||
|
)
|
||||||
|
|
||||||
|
type epochSource interface {
|
||||||
|
Epoch() (uint64, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type traverserGenerator interface {
|
||||||
|
GenerateTraverser(cid.ID, *oid.ID, uint64) (*placement.Traverser, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type keyStorage interface {
|
||||||
|
GetKey(info *util.SessionInfo) (*ecdsa.PrivateKey, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type localStorageEngine interface {
|
||||||
|
Head(ctx context.Context, p engine.HeadPrm) (engine.HeadRes, error)
|
||||||
|
GetRange(ctx context.Context, p engine.RngPrm) (engine.RngRes, error)
|
||||||
|
Get(ctx context.Context, p engine.GetPrm) (engine.GetRes, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type clientConstructor interface {
|
||||||
|
Get(coreclient.NodeInfo) (coreclient.MultiAddressClient, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type remoteStorageConstructor interface {
|
||||||
|
Get(coreclient.NodeInfo) (remoteStorage, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type multiclientRemoteStorageConstructor struct {
|
||||||
|
clientConstructor clientConstructor
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *multiclientRemoteStorageConstructor) Get(info coreclient.NodeInfo) (remoteStorage, error) {
|
||||||
|
clt, err := c.clientConstructor.Get(info)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &multiaddressRemoteStorage{
|
||||||
|
client: clt,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type localStorage interface {
|
||||||
|
Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error)
|
||||||
|
Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error)
|
||||||
|
Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type engineLocalStorage struct {
|
||||||
|
engine localStorageEngine
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *engineLocalStorage) Head(ctx context.Context, address oid.Address, isRaw bool) (*objectSDK.Object, error) {
|
||||||
|
var headPrm engine.HeadPrm
|
||||||
|
headPrm.WithAddress(address)
|
||||||
|
headPrm.WithRaw(isRaw)
|
||||||
|
|
||||||
|
r, err := s.engine.Head(ctx, headPrm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.Header(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *engineLocalStorage) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range) (*objectSDK.Object, error) {
|
||||||
|
var getRange engine.RngPrm
|
||||||
|
getRange.WithAddress(address)
|
||||||
|
getRange.WithPayloadRange(rng)
|
||||||
|
|
||||||
|
r, err := s.engine.GetRange(ctx, getRange)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.Object(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *engineLocalStorage) Get(ctx context.Context, address oid.Address) (*objectSDK.Object, error) {
|
||||||
|
var getPrm engine.GetPrm
|
||||||
|
getPrm.WithAddress(address)
|
||||||
|
|
||||||
|
r, err := s.engine.Get(ctx, getPrm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return r.Object(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type RemoteRequestParams struct {
|
||||||
|
Epoch uint64
|
||||||
|
TTL uint32
|
||||||
|
PrivateKey *ecdsa.PrivateKey
|
||||||
|
SessionToken *session.Object
|
||||||
|
BearerToken *bearer.Token
|
||||||
|
XHeaders []string
|
||||||
|
IsRaw bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type remoteStorage interface {
|
||||||
|
Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error)
|
||||||
|
Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error)
|
||||||
|
Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error)
|
||||||
|
|
||||||
|
ForwardRequest(ctx context.Context, info coreclient.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type multiaddressRemoteStorage struct {
|
||||||
|
client coreclient.MultiAddressClient
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *multiaddressRemoteStorage) ForwardRequest(ctx context.Context, info coreclient.NodeInfo, forwarder RequestForwarder) (*objectSDK.Object, error) {
|
||||||
|
return forwarder(ctx, info, s.client)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *multiaddressRemoteStorage) Range(ctx context.Context, address oid.Address, rng *objectSDK.Range, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
|
||||||
|
var prm internalclient.PayloadRangePrm
|
||||||
|
|
||||||
|
prm.SetClient(s.client)
|
||||||
|
prm.SetTTL(requestParams.TTL)
|
||||||
|
prm.SetNetmapEpoch(requestParams.Epoch)
|
||||||
|
prm.SetAddress(address)
|
||||||
|
prm.SetPrivateKey(requestParams.PrivateKey)
|
||||||
|
prm.SetSessionToken(requestParams.SessionToken)
|
||||||
|
prm.SetBearerToken(requestParams.BearerToken)
|
||||||
|
prm.SetXHeaders(requestParams.XHeaders)
|
||||||
|
prm.SetRange(rng)
|
||||||
|
if requestParams.IsRaw {
|
||||||
|
prm.SetRawFlag()
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := internalclient.PayloadRange(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
var errAccessDenied *apistatus.ObjectAccessDenied
|
||||||
|
if errors.As(err, &errAccessDenied) {
|
||||||
|
obj, err := s.Get(ctx, address, requestParams)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
payload := obj.Payload()
|
||||||
|
from := rng.GetOffset()
|
||||||
|
to := from + rng.GetLength()
|
||||||
|
|
||||||
|
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
|
||||||
|
return nil, new(apistatus.ObjectOutOfRange)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.payloadOnlyObject(payload[from:to]), nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.payloadOnlyObject(res.PayloadRange()), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *multiaddressRemoteStorage) Head(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
|
||||||
|
var prm internalclient.HeadObjectPrm
|
||||||
|
|
||||||
|
prm.SetClient(s.client)
|
||||||
|
prm.SetTTL(requestParams.TTL)
|
||||||
|
prm.SetNetmapEpoch(requestParams.Epoch)
|
||||||
|
prm.SetAddress(address)
|
||||||
|
prm.SetPrivateKey(requestParams.PrivateKey)
|
||||||
|
prm.SetSessionToken(requestParams.SessionToken)
|
||||||
|
prm.SetBearerToken(requestParams.BearerToken)
|
||||||
|
prm.SetXHeaders(requestParams.XHeaders)
|
||||||
|
|
||||||
|
if requestParams.IsRaw {
|
||||||
|
prm.SetRawFlag()
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := internalclient.HeadObject(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Header(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *multiaddressRemoteStorage) Get(ctx context.Context, address oid.Address, requestParams RemoteRequestParams) (*objectSDK.Object, error) {
|
||||||
|
var prm internalclient.GetObjectPrm
|
||||||
|
|
||||||
|
prm.SetClient(s.client)
|
||||||
|
prm.SetTTL(requestParams.TTL)
|
||||||
|
prm.SetNetmapEpoch(requestParams.Epoch)
|
||||||
|
prm.SetAddress(address)
|
||||||
|
prm.SetPrivateKey(requestParams.PrivateKey)
|
||||||
|
prm.SetSessionToken(requestParams.SessionToken)
|
||||||
|
prm.SetBearerToken(requestParams.BearerToken)
|
||||||
|
prm.SetXHeaders(requestParams.XHeaders)
|
||||||
|
|
||||||
|
if requestParams.IsRaw {
|
||||||
|
prm.SetRawFlag()
|
||||||
|
}
|
||||||
|
|
||||||
|
res, err := internalclient.GetObject(ctx, prm)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return res.Object(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *multiaddressRemoteStorage) payloadOnlyObject(payload []byte) *objectSDK.Object {
|
||||||
|
obj := objectSDK.New()
|
||||||
|
obj.SetPayload(payload)
|
||||||
|
|
||||||
|
return obj
|
||||||
|
}
|
||||||
|
|
||||||
|
type RangeHashRes struct {
|
||||||
|
hashes [][]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *RangeHashRes) Hashes() [][]byte {
|
||||||
|
return r.hashes
|
||||||
|
}
|
|
@ -1,261 +0,0 @@
|
||||||
package getsvc
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"crypto/ecdsa"
|
|
||||||
"errors"
|
|
||||||
"io"
|
|
||||||
|
|
||||||
coreclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/netmap"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/local_object_storage/engine"
|
|
||||||
internalclient "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/internal/client"
|
|
||||||
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
|
||||||
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
|
||||||
)
|
|
||||||
|
|
||||||
type SimpleObjectWriter struct {
|
|
||||||
obj *object.Object
|
|
||||||
|
|
||||||
pld []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
type clientCacheWrapper struct {
|
|
||||||
cache ClientConstructor
|
|
||||||
}
|
|
||||||
|
|
||||||
type clientWrapper struct {
|
|
||||||
client coreclient.MultiAddressClient
|
|
||||||
}
|
|
||||||
|
|
||||||
type storageEngineWrapper struct {
|
|
||||||
engine *engine.StorageEngine
|
|
||||||
}
|
|
||||||
|
|
||||||
type partWriter struct {
|
|
||||||
ObjectWriter
|
|
||||||
|
|
||||||
headWriter HeaderWriter
|
|
||||||
|
|
||||||
chunkWriter ChunkWriter
|
|
||||||
}
|
|
||||||
|
|
||||||
type hasherWrapper struct {
|
|
||||||
hash io.Writer
|
|
||||||
}
|
|
||||||
|
|
||||||
type nmSrcWrapper struct {
|
|
||||||
nmSrc netmap.Source
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewSimpleObjectWriter() *SimpleObjectWriter {
|
|
||||||
return &SimpleObjectWriter{
|
|
||||||
obj: object.New(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SimpleObjectWriter) WriteHeader(_ context.Context, obj *object.Object) error {
|
|
||||||
s.obj = obj
|
|
||||||
|
|
||||||
s.pld = make([]byte, 0, obj.PayloadSize())
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SimpleObjectWriter) WriteChunk(_ context.Context, p []byte) error {
|
|
||||||
s.pld = append(s.pld, p...)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (s *SimpleObjectWriter) Object() *object.Object {
|
|
||||||
if len(s.pld) > 0 {
|
|
||||||
s.obj.SetPayload(s.pld)
|
|
||||||
}
|
|
||||||
|
|
||||||
return s.obj
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *clientCacheWrapper) get(info coreclient.NodeInfo) (getClient, error) {
|
|
||||||
clt, err := c.cache.Get(info)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return &clientWrapper{
|
|
||||||
client: clt,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *clientWrapper) getObject(ctx context.Context, exec *execCtx, info coreclient.NodeInfo) (*object.Object, error) {
|
|
||||||
if exec.isForwardingEnabled() {
|
|
||||||
return exec.prm.forwarder(ctx, info, c.client)
|
|
||||||
}
|
|
||||||
|
|
||||||
key, err := exec.key()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if exec.headOnly() {
|
|
||||||
return c.getHeadOnly(ctx, exec, key)
|
|
||||||
}
|
|
||||||
// we don't specify payload writer because we accumulate
|
|
||||||
// the object locally (even huge).
|
|
||||||
if rng := exec.ctxRange(); rng != nil {
|
|
||||||
// Current spec allows other storage node to deny access,
|
|
||||||
// fallback to GET here.
|
|
||||||
return c.getRange(ctx, exec, key, rng)
|
|
||||||
}
|
|
||||||
|
|
||||||
return c.get(ctx, exec, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *clientWrapper) getRange(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey, rng *object.Range) (*object.Object, error) {
|
|
||||||
var prm internalclient.PayloadRangePrm
|
|
||||||
|
|
||||||
prm.SetClient(c.client)
|
|
||||||
prm.SetTTL(exec.prm.common.TTL())
|
|
||||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
|
||||||
prm.SetAddress(exec.address())
|
|
||||||
prm.SetPrivateKey(key)
|
|
||||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
|
||||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
|
||||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
|
||||||
prm.SetRange(rng)
|
|
||||||
|
|
||||||
if exec.isRaw() {
|
|
||||||
prm.SetRawFlag()
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := internalclient.PayloadRange(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
var errAccessDenied *apistatus.ObjectAccessDenied
|
|
||||||
if errors.As(err, &errAccessDenied) {
|
|
||||||
obj, err := c.get(ctx, exec, key)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
payload := obj.Payload()
|
|
||||||
from := rng.GetOffset()
|
|
||||||
to := from + rng.GetLength()
|
|
||||||
|
|
||||||
if pLen := uint64(len(payload)); to < from || pLen < from || pLen < to {
|
|
||||||
return nil, new(apistatus.ObjectOutOfRange)
|
|
||||||
}
|
|
||||||
|
|
||||||
return payloadOnlyObject(payload[from:to]), nil
|
|
||||||
}
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return payloadOnlyObject(res.PayloadRange()), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *clientWrapper) getHeadOnly(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
|
||||||
var prm internalclient.HeadObjectPrm
|
|
||||||
|
|
||||||
prm.SetClient(c.client)
|
|
||||||
prm.SetTTL(exec.prm.common.TTL())
|
|
||||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
|
||||||
prm.SetAddress(exec.address())
|
|
||||||
prm.SetPrivateKey(key)
|
|
||||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
|
||||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
|
||||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
|
||||||
|
|
||||||
if exec.isRaw() {
|
|
||||||
prm.SetRawFlag()
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := internalclient.HeadObject(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return res.Header(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *clientWrapper) get(ctx context.Context, exec *execCtx, key *ecdsa.PrivateKey) (*object.Object, error) {
|
|
||||||
var prm internalclient.GetObjectPrm
|
|
||||||
|
|
||||||
prm.SetClient(c.client)
|
|
||||||
prm.SetTTL(exec.prm.common.TTL())
|
|
||||||
prm.SetNetmapEpoch(exec.curProcEpoch)
|
|
||||||
prm.SetAddress(exec.address())
|
|
||||||
prm.SetPrivateKey(key)
|
|
||||||
prm.SetSessionToken(exec.prm.common.SessionToken())
|
|
||||||
prm.SetBearerToken(exec.prm.common.BearerToken())
|
|
||||||
prm.SetXHeaders(exec.prm.common.XHeaders())
|
|
||||||
|
|
||||||
if exec.isRaw() {
|
|
||||||
prm.SetRawFlag()
|
|
||||||
}
|
|
||||||
|
|
||||||
res, err := internalclient.GetObject(ctx, prm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return res.Object(), nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *storageEngineWrapper) get(ctx context.Context, exec *execCtx) (*object.Object, error) {
|
|
||||||
if exec.headOnly() {
|
|
||||||
var headPrm engine.HeadPrm
|
|
||||||
headPrm.WithAddress(exec.address())
|
|
||||||
headPrm.WithRaw(exec.isRaw())
|
|
||||||
|
|
||||||
r, err := e.engine.Head(ctx, headPrm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.Header(), nil
|
|
||||||
} else if rng := exec.ctxRange(); rng != nil {
|
|
||||||
var getRange engine.RngPrm
|
|
||||||
getRange.WithAddress(exec.address())
|
|
||||||
getRange.WithPayloadRange(rng)
|
|
||||||
|
|
||||||
r, err := e.engine.GetRange(ctx, getRange)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.Object(), nil
|
|
||||||
} else {
|
|
||||||
var getPrm engine.GetPrm
|
|
||||||
getPrm.WithAddress(exec.address())
|
|
||||||
|
|
||||||
r, err := e.engine.Get(ctx, getPrm)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.Object(), nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error {
|
|
||||||
return w.chunkWriter.WriteChunk(ctx, p)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *partWriter) WriteHeader(ctx context.Context, o *object.Object) error {
|
|
||||||
return w.headWriter.WriteHeader(ctx, o)
|
|
||||||
}
|
|
||||||
|
|
||||||
func payloadOnlyObject(payload []byte) *object.Object {
|
|
||||||
obj := object.New()
|
|
||||||
obj.SetPayload(payload)
|
|
||||||
|
|
||||||
return obj
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
|
|
||||||
_, err := h.hash.Write(p)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *nmSrcWrapper) currentEpoch() (uint64, error) {
|
|
||||||
return n.nmSrc.Epoch()
|
|
||||||
}
|
|
92
pkg/services/object/get/v2/errors.go
Normal file
92
pkg/services/object/get/v2/errors.go
Normal file
|
@ -0,0 +1,92 @@
|
||||||
|
package getsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
refs "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/refs"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
errMissingObjAddress = errors.New("missing object address")
|
||||||
|
errWrongMessageSeq = errors.New("incorrect message sequence")
|
||||||
|
errNilObjectPart = errors.New("nil object part")
|
||||||
|
errMissingSignature = errors.New("missing signature")
|
||||||
|
errInvalidObjectIDSign = errors.New("invalid object ID signature")
|
||||||
|
|
||||||
|
errWrongHeaderPartTypeExpShortRecvWithSignature = fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||||
|
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
|
||||||
|
)
|
||||||
|
errWrongHeaderPartTypeExpWithSignRecvShort = fmt.Errorf("wrong header part type: expected %T, received %T",
|
||||||
|
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
func errInvalidObjAddress(err error) error {
|
||||||
|
return fmt.Errorf("invalid object address: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errRequestParamsValidation(err error) error {
|
||||||
|
return fmt.Errorf("request params validation: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errFetchingSessionKey(err error) error {
|
||||||
|
return fmt.Errorf("fetching session key: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errUnknownChechsumType(t refs.ChecksumType) error {
|
||||||
|
return fmt.Errorf("unknown checksum type %v", t)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errResponseVerificationFailed(err error) error {
|
||||||
|
return fmt.Errorf("response verification failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errCouldNotWriteObjHeader(err error) error {
|
||||||
|
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errStreamOpenningFailed(err error) error {
|
||||||
|
return fmt.Errorf("stream opening failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errReadingResponseFailed(err error) error {
|
||||||
|
return fmt.Errorf("reading the response failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errUnexpectedObjectPart(v objectV2.GetObjectPart) error {
|
||||||
|
return fmt.Errorf("unexpected object part %T", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errCouldNotWriteObjChunk(forwarder string, err error) error {
|
||||||
|
return fmt.Errorf("could not write object chunk in %s forwarder: %w", forwarder, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errCouldNotVerifyRangeResponse(resp *objectV2.GetRangeResponse, err error) error {
|
||||||
|
return fmt.Errorf("could not verify %T: %w", resp, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errCouldNotCreateGetRangeStream(err error) error {
|
||||||
|
return fmt.Errorf("could not create Get payload range stream: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errUnexpectedRangePart(v objectV2.GetRangePart) error {
|
||||||
|
return fmt.Errorf("unexpected range type %T", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errUnexpectedHeaderPart(v objectV2.GetHeaderPart) error {
|
||||||
|
return fmt.Errorf("unexpected header type %T", v)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errMarshalID(err error) error {
|
||||||
|
return fmt.Errorf("marshal ID: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errCantReadSignature(err error) error {
|
||||||
|
return fmt.Errorf("can't read signature: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func errSendingRequestFailed(err error) error {
|
||||||
|
return fmt.Errorf("sending the request failed: %w", err)
|
||||||
|
}
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
@ -71,7 +70,7 @@ func (f *getRequestForwarder) verifyResponse(resp *objectV2.GetResponse, pubkey
|
||||||
|
|
||||||
// verify response structure
|
// verify response structure
|
||||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||||
return fmt.Errorf("response verification failed: %w", err)
|
return errResponseVerificationFailed(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return checkStatus(resp.GetMetaHeader().GetStatus())
|
return checkStatus(resp.GetMetaHeader().GetStatus())
|
||||||
|
@ -89,7 +88,7 @@ func (f *getRequestForwarder) writeHeader(ctx context.Context, v *objectV2.GetOb
|
||||||
err = f.Stream.WriteHeader(ctx, object.NewFromV2(obj))
|
err = f.Stream.WriteHeader(ctx, object.NewFromV2(obj))
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("could not write object header in Get forwarder: %w", err)
|
return errCouldNotWriteObjHeader(err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -102,7 +101,7 @@ func (f *getRequestForwarder) openStream(ctx context.Context, addr network.Addre
|
||||||
return e
|
return e
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("stream opening failed: %w", err)
|
return nil, errStreamOpenningFailed(err)
|
||||||
}
|
}
|
||||||
return getStream, nil
|
return getStream, nil
|
||||||
}
|
}
|
||||||
|
@ -127,7 +126,7 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
internalclient.ReportError(c, err)
|
internalclient.ReportError(c, err)
|
||||||
return fmt.Errorf("reading the response failed: %w", err)
|
return errReadingResponseFailed(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := f.verifyResponse(resp, pubkey); err != nil {
|
if err := f.verifyResponse(resp, pubkey); err != nil {
|
||||||
|
@ -136,7 +135,7 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
|
||||||
|
|
||||||
switch v := resp.GetBody().GetObjectPart().(type) {
|
switch v := resp.GetBody().GetObjectPart().(type) {
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("unexpected object part %T", v)
|
return errUnexpectedObjectPart(v)
|
||||||
case *objectV2.GetObjectPartInit:
|
case *objectV2.GetObjectPartInit:
|
||||||
if headWas {
|
if headWas {
|
||||||
return errWrongMessageSeq
|
return errWrongMessageSeq
|
||||||
|
@ -159,7 +158,7 @@ func (f *getRequestForwarder) readStream(ctx context.Context, c client.MultiAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
|
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
|
||||||
return fmt.Errorf("could not write object chunk in Get forwarder: %w", err)
|
return errCouldNotWriteObjChunk("Get", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
localProgress += len(origChunk)
|
localProgress += len(origChunk)
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"io"
|
"io"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
@ -73,7 +72,7 @@ func (f *getRangeRequestForwarder) verifyResponse(resp *objectV2.GetRangeRespons
|
||||||
|
|
||||||
// verify response structure
|
// verify response structure
|
||||||
if err := signature.VerifyServiceMessage(resp); err != nil {
|
if err := signature.VerifyServiceMessage(resp); err != nil {
|
||||||
return fmt.Errorf("could not verify %T: %w", resp, err)
|
return errCouldNotVerifyRangeResponse(resp, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return checkStatus(resp.GetMetaHeader().GetStatus())
|
return checkStatus(resp.GetMetaHeader().GetStatus())
|
||||||
|
@ -88,7 +87,7 @@ func (f *getRangeRequestForwarder) openStream(ctx context.Context, addr network.
|
||||||
return e
|
return e
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("could not create Get payload range stream: %w", err)
|
return nil, errCouldNotCreateGetRangeStream(err)
|
||||||
}
|
}
|
||||||
return rangeStream, nil
|
return rangeStream, nil
|
||||||
}
|
}
|
||||||
|
@ -105,7 +104,7 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
internalclient.ReportError(c, err)
|
internalclient.ReportError(c, err)
|
||||||
return fmt.Errorf("reading the response failed: %w", err)
|
return errReadingResponseFailed(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := f.verifyResponse(resp, pubkey); err != nil {
|
if err := f.verifyResponse(resp, pubkey); err != nil {
|
||||||
|
@ -114,7 +113,7 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
|
||||||
|
|
||||||
switch v := resp.GetBody().GetRangePart().(type) {
|
switch v := resp.GetBody().GetRangePart().(type) {
|
||||||
case nil:
|
case nil:
|
||||||
return fmt.Errorf("unexpected range type %T", v)
|
return errUnexpectedRangePart(v)
|
||||||
case *objectV2.GetRangePartChunk:
|
case *objectV2.GetRangePartChunk:
|
||||||
origChunk := v.GetChunk()
|
origChunk := v.GetChunk()
|
||||||
|
|
||||||
|
@ -125,7 +124,7 @@ func (f *getRangeRequestForwarder) readStream(ctx context.Context, rangeStream *
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
|
if err = f.Stream.WriteChunk(ctx, chunk); err != nil {
|
||||||
return fmt.Errorf("could not write object chunk in GetRange forwarder: %w", err)
|
return errCouldNotWriteObjChunk("GetRange", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
localProgress += len(origChunk)
|
localProgress += len(origChunk)
|
||||||
|
|
|
@ -3,8 +3,6 @@ package getsvc
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
objectV2 "git.frostfs.info/TrueCloudLab/frostfs-api-go/v2/object"
|
||||||
|
@ -74,7 +72,7 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
|
||||||
|
|
||||||
switch v := headResp.GetBody().GetHeaderPart().(type) {
|
switch v := headResp.GetBody().GetHeaderPart().(type) {
|
||||||
case nil:
|
case nil:
|
||||||
return nil, fmt.Errorf("unexpected header type %T", v)
|
return nil, errUnexpectedHeaderPart(v)
|
||||||
case *objectV2.ShortHeader:
|
case *objectV2.ShortHeader:
|
||||||
if hdr, err = f.getHeaderFromShortHeader(v); err != nil {
|
if hdr, err = f.getHeaderFromShortHeader(v); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -100,9 +98,7 @@ func (f *headRequestForwarder) forwardRequestToNode(ctx context.Context, addr ne
|
||||||
|
|
||||||
func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader) (*objectV2.Header, error) {
|
func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader) (*objectV2.Header, error) {
|
||||||
if !f.Request.GetBody().GetMainOnly() {
|
if !f.Request.GetBody().GetMainOnly() {
|
||||||
return nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
return nil, errWrongHeaderPartTypeExpShortRecvWithSignature
|
||||||
(*objectV2.ShortHeader)(nil), (*objectV2.HeaderWithSignature)(nil),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
hdr := new(objectV2.Header)
|
hdr := new(objectV2.Header)
|
||||||
|
@ -118,35 +114,32 @@ func (f *headRequestForwarder) getHeaderFromShortHeader(sh *objectV2.ShortHeader
|
||||||
|
|
||||||
func (f *headRequestForwarder) getHeaderAndSignature(hdrWithSig *objectV2.HeaderWithSignature) (*objectV2.Header, *refs.Signature, error) {
|
func (f *headRequestForwarder) getHeaderAndSignature(hdrWithSig *objectV2.HeaderWithSignature) (*objectV2.Header, *refs.Signature, error) {
|
||||||
if f.Request.GetBody().GetMainOnly() {
|
if f.Request.GetBody().GetMainOnly() {
|
||||||
return nil, nil, fmt.Errorf("wrong header part type: expected %T, received %T",
|
return nil, nil, errWrongHeaderPartTypeExpWithSignRecvShort
|
||||||
(*objectV2.HeaderWithSignature)(nil), (*objectV2.ShortHeader)(nil),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if hdrWithSig == nil {
|
if hdrWithSig == nil {
|
||||||
return nil, nil, errors.New("nil object part")
|
return nil, nil, errNilObjectPart
|
||||||
}
|
}
|
||||||
|
|
||||||
hdr := hdrWithSig.GetHeader()
|
hdr := hdrWithSig.GetHeader()
|
||||||
idSig := hdrWithSig.GetSignature()
|
idSig := hdrWithSig.GetSignature()
|
||||||
|
|
||||||
if idSig == nil {
|
if idSig == nil {
|
||||||
// TODO(@cthulhu-rider): #1387 use "const" error
|
return nil, nil, errMissingSignature
|
||||||
return nil, nil, errors.New("missing signature")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
binID, err := f.ObjectAddr.Object().Marshal()
|
binID, err := f.ObjectAddr.Object().Marshal()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("marshal ID: %w", err)
|
return nil, nil, errMarshalID(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
var sig frostfscrypto.Signature
|
var sig frostfscrypto.Signature
|
||||||
if err := sig.ReadFromV2(*idSig); err != nil {
|
if err := sig.ReadFromV2(*idSig); err != nil {
|
||||||
return nil, nil, fmt.Errorf("can't read signature: %w", err)
|
return nil, nil, errCantReadSignature(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !sig.Verify(binID) {
|
if !sig.Verify(binID) {
|
||||||
return nil, nil, errors.New("invalid object ID signature")
|
return nil, nil, errInvalidObjectIDSign
|
||||||
}
|
}
|
||||||
|
|
||||||
return hdr, idSig, nil
|
return hdr, idSig, nil
|
||||||
|
@ -160,7 +153,7 @@ func (f *headRequestForwarder) sendHeadRequest(ctx context.Context, addr network
|
||||||
return e
|
return e
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("sending the request failed: %w", err)
|
return nil, errSendingRequestFailed(err)
|
||||||
}
|
}
|
||||||
return headResp, nil
|
return headResp, nil
|
||||||
}
|
}
|
||||||
|
@ -173,7 +166,7 @@ func (f *headRequestForwarder) verifyResponse(headResp *objectV2.HeadResponse, p
|
||||||
|
|
||||||
// verify response structure
|
// verify response structure
|
||||||
if err := signature.VerifyServiceMessage(headResp); err != nil {
|
if err := signature.VerifyServiceMessage(headResp); err != nil {
|
||||||
return fmt.Errorf("response verification failed: %w", err)
|
return errResponseVerificationFailed(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return checkStatus(f.Response.GetMetaHeader().GetStatus())
|
return checkStatus(f.Response.GetMetaHeader().GetStatus())
|
||||||
|
|
|
@ -4,7 +4,6 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
|
||||||
"hash"
|
"hash"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
|
@ -24,21 +23,19 @@ import (
|
||||||
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
"git.frostfs.info/TrueCloudLab/tzhash/tz"
|
||||||
)
|
)
|
||||||
|
|
||||||
var errWrongMessageSeq = errors.New("incorrect message sequence")
|
|
||||||
|
|
||||||
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
|
func (s *Service) toPrm(req *objectV2.GetRequest, stream objectSvc.GetObjectStream) (*getsvc.Prm, error) {
|
||||||
body := req.GetBody()
|
body := req.GetBody()
|
||||||
|
|
||||||
addrV2 := body.GetAddress()
|
addrV2 := body.GetAddress()
|
||||||
if addrV2 == nil {
|
if addrV2 == nil {
|
||||||
return nil, errors.New("missing object address")
|
return nil, errMissingObjAddress
|
||||||
}
|
}
|
||||||
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
|
|
||||||
err := addr.ReadFromV2(*addrV2)
|
err := addr.ReadFromV2(*addrV2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("invalid object address: %w", err)
|
return nil, errInvalidObjAddress(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
commonPrm, err := util.CommonPrmFromV2(req)
|
commonPrm, err := util.CommonPrmFromV2(req)
|
||||||
|
@ -81,14 +78,14 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
|
||||||
|
|
||||||
addrV2 := body.GetAddress()
|
addrV2 := body.GetAddress()
|
||||||
if addrV2 == nil {
|
if addrV2 == nil {
|
||||||
return nil, errors.New("missing object address")
|
return nil, errMissingObjAddress
|
||||||
}
|
}
|
||||||
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
|
|
||||||
err := addr.ReadFromV2(*addrV2)
|
err := addr.ReadFromV2(*addrV2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("invalid object address: %w", err)
|
return nil, errInvalidObjAddress(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
commonPrm, err := util.CommonPrmFromV2(req)
|
commonPrm, err := util.CommonPrmFromV2(req)
|
||||||
|
@ -108,7 +105,7 @@ func (s *Service) toRangePrm(req *objectV2.GetRangeRequest, stream objectSvc.Get
|
||||||
|
|
||||||
err = p.Validate()
|
err = p.Validate()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("request params validation: %w", err)
|
return nil, errRequestParamsValidation(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !commonPrm.LocalOnly() {
|
if !commonPrm.LocalOnly() {
|
||||||
|
@ -136,14 +133,14 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
|
||||||
|
|
||||||
addrV2 := body.GetAddress()
|
addrV2 := body.GetAddress()
|
||||||
if addrV2 == nil {
|
if addrV2 == nil {
|
||||||
return nil, errors.New("missing object address")
|
return nil, errMissingObjAddress
|
||||||
}
|
}
|
||||||
|
|
||||||
var addr oid.Address
|
var addr oid.Address
|
||||||
|
|
||||||
err := addr.ReadFromV2(*addrV2)
|
err := addr.ReadFromV2(*addrV2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("invalid object address: %w", err)
|
return nil, errInvalidObjAddress(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
commonPrm, err := util.CommonPrmFromV2(req)
|
commonPrm, err := util.CommonPrmFromV2(req)
|
||||||
|
@ -167,7 +164,7 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
|
||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("fetching session key: %w", err)
|
return nil, errFetchingSessionKey(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
p.WithCachedSignerKey(signerKey)
|
p.WithCachedSignerKey(signerKey)
|
||||||
|
@ -185,7 +182,7 @@ func (s *Service) toHashRangePrm(req *objectV2.GetRangeHashRequest) (*getsvc.Ran
|
||||||
|
|
||||||
switch t := body.GetType(); t {
|
switch t := body.GetType(); t {
|
||||||
default:
|
default:
|
||||||
return nil, fmt.Errorf("unknown checksum type %v", t)
|
return nil, errUnknownChechsumType(t)
|
||||||
case refs.SHA256:
|
case refs.SHA256:
|
||||||
p.SetHashGenerator(func() hash.Hash {
|
p.SetHashGenerator(func() hash.Hash {
|
||||||
return sha256.New()
|
return sha256.New()
|
||||||
|
@ -220,14 +217,14 @@ func (s *Service) toHeadPrm(req *objectV2.HeadRequest, resp *objectV2.HeadRespon
|
||||||
|
|
||||||
addrV2 := body.GetAddress()
|
addrV2 := body.GetAddress()
|
||||||
if addrV2 == nil {
|
if addrV2 == nil {
|
||||||
return nil, errors.New("missing object address")
|
return nil, errMissingObjAddress
|
||||||
}
|
}
|
||||||
|
|
||||||
var objAddr oid.Address
|
var objAddr oid.Address
|
||||||
|
|
||||||
err := objAddr.ReadFromV2(*addrV2)
|
err := objAddr.ReadFromV2(*addrV2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("invalid object address: %w", err)
|
return nil, errInvalidObjAddress(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
commonPrm, err := util.CommonPrmFromV2(req)
|
commonPrm, err := util.CommonPrmFromV2(req)
|
||||||
|
|
84
pkg/services/object/get/writer.go
Normal file
84
pkg/services/object/get/writer.go
Normal file
|
@ -0,0 +1,84 @@
|
||||||
|
package getsvc
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ChunkWriter is an interface of target component
|
||||||
|
// to write payload chunk.
|
||||||
|
type ChunkWriter interface {
|
||||||
|
WriteChunk(context.Context, []byte) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// HeaderWriter is an interface of target component
|
||||||
|
// to write object header.
|
||||||
|
type HeaderWriter interface {
|
||||||
|
WriteHeader(context.Context, *object.Object) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// ObjectWriter is an interface of target component to write object.
|
||||||
|
type ObjectWriter interface {
|
||||||
|
HeaderWriter
|
||||||
|
ChunkWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
type SimpleObjectWriter struct {
|
||||||
|
obj *object.Object
|
||||||
|
|
||||||
|
pld []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type partWriter struct {
|
||||||
|
ObjectWriter
|
||||||
|
|
||||||
|
headWriter HeaderWriter
|
||||||
|
|
||||||
|
chunkWriter ChunkWriter
|
||||||
|
}
|
||||||
|
|
||||||
|
type hasherWrapper struct {
|
||||||
|
hash io.Writer
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewSimpleObjectWriter() *SimpleObjectWriter {
|
||||||
|
return &SimpleObjectWriter{
|
||||||
|
obj: object.New(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SimpleObjectWriter) WriteHeader(_ context.Context, obj *object.Object) error {
|
||||||
|
s.obj = obj
|
||||||
|
|
||||||
|
s.pld = make([]byte, 0, obj.PayloadSize())
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SimpleObjectWriter) WriteChunk(_ context.Context, p []byte) error {
|
||||||
|
s.pld = append(s.pld, p...)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SimpleObjectWriter) Object() *object.Object {
|
||||||
|
if len(s.pld) > 0 {
|
||||||
|
s.obj.SetPayload(s.pld)
|
||||||
|
}
|
||||||
|
|
||||||
|
return s.obj
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *partWriter) WriteChunk(ctx context.Context, p []byte) error {
|
||||||
|
return w.chunkWriter.WriteChunk(ctx, p)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *partWriter) WriteHeader(ctx context.Context, o *object.Object) error {
|
||||||
|
return w.headWriter.WriteHeader(ctx, o)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *hasherWrapper) WriteChunk(_ context.Context, p []byte) error {
|
||||||
|
_, err := h.hash.Write(p)
|
||||||
|
return err
|
||||||
|
}
|
Loading…
Reference in a new issue