frostfs-node/pkg/services/object/get/assemblerec.go
Anton Nikiforov 00b2b77b26 [#1112] node: Implement Range\RangeHash requests for EC object
Signed-off-by: Anton Nikiforov <an.nikiforov@yadro.com>
2024-05-07 14:47:21 +03:00

159 lines
4.3 KiB
Go

package getsvc
import (
"context"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger"
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
// assemblerec holds everything needed to assemble an erasure-coded (EC)
// object from its chunks.
type assemblerec struct {
	// addr is the address of the object being assembled; its container ID is
	// used to look up the container.
	addr oid.Address
	// ecInfo lists the known EC chunks of the object.
	ecInfo *objectSDK.ECInfo
	// rng is the requested payload range; nil when no range was requested.
	rng *objectSDK.Range
	// objGetter is used to head or get individual chunk objects.
	objGetter objectGetter
	// cs provides the container whose placement policy supplies the EC
	// data/parity counts.
	cs container.Source
	// log receives debug messages about chunks that could not be retrieved.
	log *logger.Logger
}
// newAssemblerEC builds an assemblerec from the given object address, EC
// info, optional payload range, chunk getter, container source and logger.
// The arguments are stored as is; no validation is performed here.
func newAssemblerEC(
	addr oid.Address,
	ecInfo *objectSDK.ECInfo,
	rng *objectSDK.Range,
	objGetter objectGetter,
	cs container.Source,
	log *logger.Logger,
) *assemblerec {
	return &assemblerec{
		addr:      addr,
		ecInfo:    ecInfo,
		rng:       rng,
		objGetter: objGetter,
		cs:        cs,
		log:       log,
	}
}
// Assemble assembles erasure-coded object and writes its content to ObjectWriter.
// It returns parent object.
//
// The mode is selected in this order: headOnly reconstructs and writes only
// the header; otherwise, if a range was set on the assembler, only that
// payload range is written; otherwise the header and the full payload are
// written.
func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) {
	switch {
	case headOnly:
		return a.reconstructHeader(ctx, writer)
	case a.rng != nil:
		return a.reconstructRange(ctx, writer)
	default:
		return a.reconstructObject(ctx, writer)
	}
}
// getConstructor creates an erasure-code constructor configured with the data
// and parity counts taken from the placement policy of the object's
// container. It fails if the container cannot be obtained from the container
// source or if the constructor cannot be created.
func (a *assemblerec) getConstructor() (*erasurecode.Constructor, error) {
	cnr, err := a.cs.Get(a.addr.Container())
	if err != nil {
		return nil, err
	}

	placement := cnr.Value.PlacementPolicy()

	return erasurecode.NewConstructor(
		policy.ECDataCount(placement),
		policy.ECParityCount(placement),
	)
}
// reconstructHeader heads the EC chunks, reconstructs the parent object
// header from them and writes it to writer.
//
// If the header is reconstructed, the object is returned together with the
// result of writer.WriteHeader, so the returned object may be non-nil even
// when the error is non-nil.
func (a *assemblerec) reconstructHeader(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
	// Chunks are retrieved before the constructor is created.
	chunks := a.retrieveParts(ctx, true)

	constructor, err := a.getConstructor()
	if err != nil {
		return nil, err
	}

	hdr, err := constructor.ReconstructHeader(chunks)
	if err != nil {
		return nil, err
	}

	return hdr, writer.WriteHeader(ctx, hdr)
}
// reconstructRange fetches the EC chunks with payload, reconstructs the whole
// parent object and writes only the requested payload range to writer.
//
// It returns apistatus.ObjectOutOfRange if the requested range does not fit
// into the reconstructed payload. No header is written.
func (a *assemblerec) reconstructRange(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
	// Chunks are retrieved before the constructor is created.
	chunks := a.retrieveParts(ctx, false)

	constructor, err := a.getConstructor()
	if err != nil {
		return nil, err
	}

	obj, err := constructor.Reconstruct(chunks)
	if err != nil {
		return nil, err
	}

	payload := obj.Payload()
	size := uint64(len(payload))
	start := a.rng.GetOffset()
	end := start + a.rng.GetLength()

	// end < start means offset+length wrapped around.
	if end < start || size < start || size < end {
		return nil, &apistatus.ObjectOutOfRange{}
	}

	if err = writer.WriteChunk(ctx, payload[start:end]); err != nil {
		return nil, err
	}

	return obj, nil
}
// reconstructObject fetches the EC chunks with payload, reconstructs the
// parent object and writes its header (payload cut off) followed by the full
// payload to writer.
func (a *assemblerec) reconstructObject(ctx context.Context, writer ObjectWriter) (*objectSDK.Object, error) {
	// Chunks are retrieved before the constructor is created.
	chunks := a.retrieveParts(ctx, false)

	constructor, err := a.getConstructor()
	if err != nil {
		return nil, err
	}

	obj, err := constructor.Reconstruct(chunks)
	if err != nil {
		// Whatever the constructor returned is passed through unchanged.
		return obj, err
	}

	if err = writer.WriteHeader(ctx, obj.CutPayload()); err != nil {
		// NOTE(review): a header write failure returns the reconstructed
		// object along with the error, while a payload write failure below
		// returns nil. This asymmetry exists in the original code and is
		// kept as is.
		return obj, err
	}

	if err = writer.WriteChunk(ctx, obj.Payload()); err != nil {
		return nil, err
	}

	return obj, nil
}
// retrieveParts fetches the EC chunks listed in a.ecInfo concurrently and
// returns them in a slice where each chunk is stored at its own chunk index.
// The slice length is the Total value of the first listed chunk; positions of
// chunks that could not be retrieved stay nil.
//
// When headOnly is true, chunks are headed; otherwise each chunk is fetched
// together with its payload.
//
// Retrieval is best-effort: a failure to head/get a chunk is logged at debug
// level and skipped. A malformed chunk descriptor (invalid object ID or an
// index outside of [0, Total)) is reported as an error to the error group,
// which cancels the context shared by the remaining fetches; the error is
// logged and the parts collected so far are returned.
//
// If there is no EC info or it lists no chunks, nil is returned.
func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object {
	// Nothing to size the result by and nothing to fetch. Indexing Chunks[0]
	// below would panic in this case.
	if a.ecInfo == nil || len(a.ecInfo.Chunks) == 0 {
		return nil
	}

	parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total))
	errGroup, ctx := errgroup.WithContext(mainCtx)

	for i := range a.ecInfo.Chunks {
		chunk := a.ecInfo.Chunks[i]

		errGroup.Go(func() error {
			// Reject an out-of-range index before fetching anything: writing
			// to parts[chunk.Index] would otherwise panic inside this
			// goroutine and crash the whole process.
			if uint64(chunk.Index) >= uint64(len(parts)) {
				return fmt.Errorf("invalid chunk index %d: total chunks %d", chunk.Index, len(parts))
			}

			objID := new(oid.ID)
			if err := objID.ReadFromV2(chunk.ID); err != nil {
				return fmt.Errorf("invalid object ID: %w", err)
			}

			var (
				obj *objectSDK.Object
				err error
			)

			if headOnly {
				obj, err = a.objGetter.HeadObject(ctx, *objID)
				if err != nil {
					// A missing part is tolerated; leave its slot nil.
					a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
					return nil
				}
			} else {
				sow := NewSimpleObjectWriter()
				obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow)
				if err != nil {
					// A missing part is tolerated; leave its slot nil.
					a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err))
					return nil
				}
				// The payload was collected by the writer; attach it to the
				// returned object.
				obj.SetPayload(sow.pld)
			}

			parts[chunk.Index] = obj
			return nil
		})
	}

	if err := errGroup.Wait(); err != nil {
		a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err))
	}

	return parts
}