package getsvc import ( "context" "fmt" "git.frostfs.info/TrueCloudLab/frostfs-node/internal/logs" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/container" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/policy" "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/util/logger" objectSDK "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object" "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/erasurecode" oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id" "go.uber.org/zap" "golang.org/x/sync/errgroup" ) type assemblerec struct { addr oid.Address ecInfo *objectSDK.ECInfo rng *objectSDK.Range objGetter objectGetter cs container.Source log *logger.Logger } func newAssemblerEC( addr oid.Address, ecInfo *objectSDK.ECInfo, rng *objectSDK.Range, objGetter objectGetter, cs container.Source, log *logger.Logger, ) *assemblerec { return &assemblerec{ addr: addr, rng: rng, ecInfo: ecInfo, objGetter: objGetter, cs: cs, log: log, } } // Assemble assembles erasure-coded object and writes it's content to ObjectWriter. // It returns parent object. func (a *assemblerec) Assemble(ctx context.Context, writer ObjectWriter, headOnly bool) (*objectSDK.Object, error) { parts := a.retrieveParts(ctx, headOnly) cnt, err := a.cs.Get(a.addr.Container()) if err != nil { return nil, err } c, err := erasurecode.NewConstructor( policy.ECDataCount(cnt.Value.PlacementPolicy()), policy.ECParityCount(cnt.Value.PlacementPolicy()), ) if err != nil { return nil, err } if headOnly { obj, err := c.ReconstructHeader(parts) if err == nil { return obj, writer.WriteHeader(ctx, obj) } return nil, err } obj, err := c.Reconstruct(parts) if err == nil { err = writer.WriteHeader(ctx, obj.CutPayload()) if err == nil { err = writer.WriteChunk(ctx, obj.Payload()) if err != nil { return nil, err } } } return obj, err } func (a *assemblerec) retrieveParts(mainCtx context.Context, headOnly bool) []*objectSDK.Object { parts := make([]*objectSDK.Object, int(a.ecInfo.Chunks[0].Total)) errGroup, ctx := errgroup.WithContext(mainCtx) for i := range a.ecInfo.Chunks { chunk := a.ecInfo.Chunks[i] errGroup.Go(func() error { objID := new(oid.ID) err := objID.ReadFromV2(chunk.ID) if err != nil { return fmt.Errorf("invalid object ID: %w", err) } var obj *objectSDK.Object if headOnly { obj, err = a.objGetter.HeadObject(ctx, *objID) if err != nil { a.log.Debug(logs.GetUnableToHeadPartECObject, zap.Stringer("part_id", objID), zap.Error(err)) return nil } } else { sow := NewSimpleObjectWriter() obj, err = a.objGetter.GetObjectAndWritePayload(ctx, *objID, nil, sow) if err != nil { a.log.Debug(logs.GetUnableToGetPartECObject, zap.Stringer("part_id", objID), zap.Error(err)) return nil } obj.SetPayload(sow.pld) } parts[chunk.Index] = obj return nil }) } if err := errGroup.Wait(); err != nil { a.log.Debug(logs.GetUnableToGetAllPartsECObject, zap.Error(err)) } return parts }