forked from TrueCloudLab/frostfs-sdk-go
108 lines
2.8 KiB
Go
108 lines
2.8 KiB
Go
|
package pool
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"crypto/ecdsa"
|
||
|
|
||
|
sdkClient "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||
|
apistatus "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||
|
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/session"
|
||
|
)
|
||
|
|
||
|
type PrmObjectPutClientCutInit struct {
|
||
|
sdkClient.PrmObjectPutInit
|
||
|
key *ecdsa.PrivateKey
|
||
|
maxSize uint64
|
||
|
epochSource transformer.EpochSource
|
||
|
withoutHomomorphicHash bool
|
||
|
stoken *session.Object
|
||
|
}
|
||
|
|
||
|
func (c *clientWrapper) objectPutInitTransformer(prm PrmObjectPutClientCutInit) (*objectWriterTransformer, error) {
|
||
|
var w objectWriterTransformer
|
||
|
w.it = internalTarget{
|
||
|
client: c.client,
|
||
|
prm: prm,
|
||
|
}
|
||
|
key := &c.prm.key
|
||
|
if prm.key != nil {
|
||
|
key = prm.key
|
||
|
}
|
||
|
|
||
|
w.ot = transformer.NewPayloadSizeLimiter(transformer.Params{
|
||
|
Key: key,
|
||
|
NextTargetInit: func() transformer.ObjectWriter { return &w.it },
|
||
|
MaxSize: prm.maxSize,
|
||
|
WithoutHomomorphicHash: prm.withoutHomomorphicHash,
|
||
|
NetworkState: prm.epochSource,
|
||
|
SessionToken: prm.stoken,
|
||
|
})
|
||
|
return &w, nil
|
||
|
}
|
||
|
|
||
|
type objectWriterTransformer struct {
|
||
|
ot transformer.ChunkedObjectWriter
|
||
|
it internalTarget
|
||
|
err error
|
||
|
}
|
||
|
|
||
|
func (x *objectWriterTransformer) WriteHeader(ctx context.Context, hdr object.Object) bool {
|
||
|
x.err = x.ot.WriteHeader(ctx, &hdr)
|
||
|
return x.err == nil
|
||
|
}
|
||
|
|
||
|
func (x *objectWriterTransformer) WritePayloadChunk(ctx context.Context, chunk []byte) bool {
|
||
|
_, x.err = x.ot.Write(ctx, chunk)
|
||
|
return x.err == nil
|
||
|
}
|
||
|
|
||
|
// ResObjectPut groups the final result values of ObjectPutInit operation.
|
||
|
type ResObjectPut struct {
|
||
|
Status apistatus.Status
|
||
|
OID oid.ID
|
||
|
}
|
||
|
|
||
|
func (x *objectWriterTransformer) Close(ctx context.Context) (*ResObjectPut, error) {
|
||
|
ai, err := x.ot.Close(ctx)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
if ai != nil && ai.ParentID != nil {
|
||
|
x.it.res.OID = *ai.ParentID
|
||
|
}
|
||
|
return &x.it.res, nil
|
||
|
}
|
||
|
|
||
|
type internalTarget struct {
|
||
|
client *sdkClient.Client
|
||
|
res ResObjectPut
|
||
|
prm PrmObjectPutClientCutInit
|
||
|
useStream bool
|
||
|
}
|
||
|
|
||
|
func (it *internalTarget) WriteObject(ctx context.Context, o *object.Object) error {
|
||
|
// todo support PutSingle
|
||
|
it.useStream = true
|
||
|
return it.putAsStream(ctx, o)
|
||
|
}
|
||
|
|
||
|
func (it *internalTarget) putAsStream(ctx context.Context, o *object.Object) error {
|
||
|
wrt, err := it.client.ObjectPutInit(ctx, it.prm.PrmObjectPutInit)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
if wrt.WriteHeader(ctx, *o) {
|
||
|
wrt.WritePayloadChunk(ctx, o.Payload())
|
||
|
}
|
||
|
res, err := wrt.Close(ctx)
|
||
|
if res != nil {
|
||
|
it.res.Status = res.Status()
|
||
|
it.res.OID = res.StoredObjectID()
|
||
|
}
|
||
|
return err
|
||
|
}
|