frostfs-node/pkg/services/object/put/streamer.go

86 lines
1.9 KiB
Go
Raw Normal View History

package putsvc
import (
"context"
"errors"
"fmt"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/core/client"
"git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/target"
objectwriter "git.frostfs.info/TrueCloudLab/frostfs-node/pkg/services/object/common/writer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
)
type Streamer struct {
*objectwriter.Config
target transformer.ChunkedObjectWriter
relay func(context.Context, client.NodeInfo, client.MultiAddressClient) error
}
var errNotInit = errors.New("stream not initialized")
var errInitRecall = errors.New("init recall")
func (p *Streamer) Init(ctx context.Context, prm *PutInitPrm) error {
if p.target != nil {
return errInitRecall
}
// initialize destination target
prmTarget := &objectwriter.Params{
Config: p.Config,
Common: prm.common,
Header: prm.hdr,
Container: prm.cnr,
TraverseOpts: prm.traverseOpts,
Relay: p.relay,
}
var err error
p.target, err = target.New(prmTarget)
if err != nil {
return fmt.Errorf("(%T) could not initialize object target: %w", p, err)
}
if err := p.target.WriteHeader(ctx, prm.hdr); err != nil {
return fmt.Errorf("(%T) could not write header to target: %w", p, err)
}
return nil
}
func (p *Streamer) SendChunk(ctx context.Context, prm *PutChunkPrm) error {
if p.target == nil {
return errNotInit
}
if _, err := p.target.Write(ctx, prm.chunk); err != nil {
return fmt.Errorf("(%T) could not write payload chunk to target: %w", p, err)
}
return nil
}
func (p *Streamer) Close(ctx context.Context) (*PutResponse, error) {
if p.target == nil {
return nil, errNotInit
}
ids, err := p.target.Close(ctx)
if err != nil {
return nil, fmt.Errorf("(%T) could not close object target: %w", p, err)
}
id := ids.ParentID
if id != nil {
return &PutResponse{
id: *id,
}, nil
}
return &PutResponse{
id: ids.SelfID,
}, nil
}