frostfs-node/pkg/services/object/get/get.go

138 lines
3.8 KiB
Go
Raw Permalink Normal View History

package getsvc
import (
"context"
"errors"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"go.uber.org/zap"
)
// Get serves a request to get an object by address.
//
// The common parameters of prm are forwarded to the shared get routine as is;
// the outcome of the operation is reported through the returned error.
func (s *Service) Get(ctx context.Context, prm Prm) error {
	reqPrm := RequestParameters{
		commonPrm: prm.commonPrm,
	}

	return s.get(ctx, reqPrm)
}
// GetRange serves a request to get a range of an object by address.
//
// Both the common parameters and the requested range of prm are forwarded to
// the shared get routine; the outcome is reported through the returned error.
func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
	reqPrm := RequestParameters{
		commonPrm: prm.commonPrm,
		rng:       prm.rng,
	}

	return s.get(ctx, reqPrm)
}
// GetRangeHash calculates one hash per range listed in prm.
//
// For every range a new hasher is obtained from prm.hashGen and a separate get
// request is executed whose chunk writer feeds the hasher through
// util.NewSaltingWriter with prm.salt. The resulting sums are returned in the
// same order as the ranges. The first failed request aborts the whole
// operation: the error is returned as is together with a nil result.
func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHashRes, error) {
	sums := make([][]byte, 0, len(prm.rngs))

	for _, rng := range prm.rngs {
		hasher := prm.hashGen()

		// Range-hashes of a big range could be fetched from different nodes and
		// concatenated locally. This is intentionally not done because:
		//  1. Potential gains are insignificant when operating in the Internet
		//     given typical latencies and losses.
		//  2. Parallel solution is more complex in terms of code.
		//  3. TZ-hash is likely to be disabled in private installations.
		reqPrm := RequestParameters{
			commonPrm: prm.commonPrm,
			rng:       &rng,
		}

		sink := &hasherWrapper{
			hash: util.NewSaltingWriter(hasher, prm.salt),
		}
		reqPrm.SetChunkWriter(sink)

		err := s.get(ctx, reqPrm)
		if err != nil {
			return nil, err
		}

		sums = append(sums, hasher.Sum(nil))
	}

	res := &RangeHashRes{
		hashes: sums,
	}

	return res, nil
}
// Head reads object header from container.
//
// The request is executed by the shared get routine with the head flag set.
//
// Returns ErrNotFound if the header was not received for the call.
// Returns SplitInfoError if object is virtual and raw flag is set.
func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
	reqPrm := RequestParameters{
		commonPrm: prm.commonPrm,
		head:      true,
	}

	return s.get(ctx, reqPrm)
}
// get is the common entry point behind Get, GetRange, GetRangeHash and Head.
//
// It builds a request that borrows the service dependencies, attaches the
// given parameters and fresh split/EC info holders, executes it and returns
// the error stored in the request status once execution is finished.
func (s *Service) get(ctx context.Context, prm RequestParameters) error {
	req := &request{
		// Request-specific state.
		prm:       prm,
		infoSplit: objectSDK.NewSplitInfo(),
		infoEC:    newECInfo(),

		// Dependencies shared with the service.
		log:                      s.log,
		keyStore:                 s.keyStore,
		epochSource:              s.epochSource,
		localStorage:             s.localStorage,
		containerSource:          s.containerSource,
		traverserGenerator:       s.traverserGenerator,
		remoteStorageConstructor: s.remoteStorageConstructor,
	}

	req.setLogger(s.log)
	req.execute(ctx)

	return req.statusError.err
}
// execute runs the request: the local operation is performed first and its
// status is then analyzed with the container fallback enabled.
func (exec *request) execute(ctx context.Context) {
	// The first status analysis is allowed to continue on the container.
	const withContainer = true

	exec.log.Debug(logs.ServingRequest)

	// Perform local operation.
	exec.executeLocal(ctx)

	exec.analyzeStatus(ctx, withContainer)
}
// analyzeStatus inspects the current request status and decides how to
// proceed.
//
// execCnr tells whether the operation may be continued on the container: when
// it is set, the EC (raw requests only) and error branches call
// executeOnContainer and then analyze the new status once more with execCnr
// unset, so the container is tried at most once per branch.
func (exec *request) analyzeStatus(ctx context.Context, execCnr bool) {
	switch exec.status {
	case statusOK:
		exec.log.Debug(logs.OperationFinishedSuccessfully)

	case statusINHUMED:
		exec.log.Debug(logs.GetRequestedObjectWasMarkedAsRemoved)

	case statusVIRTUAL:
		exec.log.Debug(logs.GetRequestedObjectIsVirtual)
		exec.assemble(ctx)

	case statusOutOfRange:
		exec.log.Debug(logs.GetRequestedRangeIsOutOfObjectBounds)

	case statusEC:
		exec.log.Debug(logs.GetRequestedObjectIsEC)

		if exec.isRaw() && execCnr {
			exec.executeOnContainer(ctx)
			exec.analyzeStatus(ctx, false)
		}

		// EC assembling is invoked regardless of the branch above.
		exec.assembleEC(ctx)

	default:
		exec.log.Debug(logs.OperationFinishedWithError, zap.Error(exec.err))

		if !execCnr {
			// Container has already been tried, nothing more to do.
			return
		}

		var accessDenied *apistatus.ObjectAccessDenied
		if errors.As(exec.err, &accessDenied) {
			// Local get can't return access denied error, so this error was
			// returned by write to the output stream. So there is no need to
			// try to find object on other nodes.
			return
		}

		exec.executeOnContainer(ctx)
		exec.analyzeStatus(ctx, false)
	}
}