frostfs-http-gw/neofs/client-plant.go
Pavel Korotkov a44551d42b Add connection pool implementation (part 2)
Signed-off-by: Pavel Korotkov <pavel@nspcc.ru>
2021-04-06 12:19:21 +03:00

266 lines
6.6 KiB
Go

package neofs
import (
"bytes"
"context"
"crypto/ecdsa"
"io"
"math"
"github.com/nspcc-dev/neofs-api-go/pkg/client"
"github.com/nspcc-dev/neofs-api-go/pkg/container"
"github.com/nspcc-dev/neofs-api-go/pkg/object"
"github.com/nspcc-dev/neofs-api-go/pkg/owner"
"github.com/nspcc-dev/neofs-api-go/pkg/token"
"github.com/nspcc-dev/neofs-http-gate/connections"
objectCore "github.com/nspcc-dev/neofs-node/pkg/core/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/transformer"
"github.com/pkg/errors"
)
const maxObjectSize = uint64(1 << 28) // Limit objects to 256 MiB.
type BaseOptions struct {
Client client.Client
SessionToken *token.SessionToken
BearerToken *token.BearerToken
}
type PutOptions struct {
BaseOptions
ContainerID *container.ID
OwnerID *owner.ID
PrepareObjectOnsite bool
Reader io.Reader
}
type GetOptions struct {
BaseOptions
ObjectAddress *object.Address
Writer io.Writer
}
type SearchOptions struct {
BaseOptions
ContainerID *container.ID
Attribute struct {
Key string
Value string
}
}
type DeleteOptions struct {
BaseOptions
ObjectAddress *object.Address
}
type ObjectClient interface {
Put(context.Context, *PutOptions) (*object.Address, error)
Get(context.Context, *GetOptions) (*object.Object, error)
Search(context.Context, *SearchOptions) ([]*object.ID, error)
Delete(context.Context, *DeleteOptions) error
}
type ClientPlant interface {
GetReusableArtifacts(ctx context.Context) (client.Client, *token.SessionToken, error)
Object() ObjectClient
OwnerID() *owner.ID
}
type neofsObjectClient struct {
key *ecdsa.PrivateKey
pool connections.Pool
}
type neofsClientPlant struct {
key *ecdsa.PrivateKey
ownerID *owner.ID
pool connections.Pool
}
func (cp *neofsClientPlant) GetReusableArtifacts(ctx context.Context) (client.Client, *token.SessionToken, error) {
c := cp.pool.Client()
if c == nil {
return nil, nil, errors.New("failed to peek a healthy node to connect to")
}
st, err := c.CreateSession(ctx, math.MaxUint64)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to create reusable neofs session token")
}
return c, st, nil
}
func (cc *neofsClientPlant) Object() ObjectClient {
return &neofsObjectClient{
key: cc.key,
pool: cc.pool,
}
}
func (cc *neofsClientPlant) OwnerID() *owner.ID {
return cc.ownerID
}
func NewClientPlant(ctx context.Context, pool connections.Pool, creds Credentials) (ClientPlant, error) {
return &neofsClientPlant{key: creds.PrivateKey(), ownerID: creds.Owner(), pool: pool}, nil
}
func (oc *neofsObjectClient) Put(ctx context.Context, options *PutOptions) (*object.Address, error) {
var (
err error
objectID *object.ID
)
address := object.NewAddress()
if options.PrepareObjectOnsite {
rawObject := objectCore.NewRaw()
rawObject.SetContainerID(options.ContainerID)
rawObject.SetOwnerID(options.OwnerID)
ns := newNetworkState(ctx, options.Client)
objectTarget := transformer.NewPayloadSizeLimiter(maxObjectSize, func() transformer.ObjectTarget {
return transformer.NewFormatTarget(&transformer.FormatterParams{
Key: oc.key,
NextTarget: &remoteClientTarget{
ctx: ctx,
client: options.Client,
},
NetworkState: ns,
})
})
if err = ns.LastError(); err != nil {
return nil, err
}
err = objectTarget.WriteHeader(rawObject)
if err != nil {
return nil, err
}
_, err = io.Copy(objectTarget, options.Reader)
if err != nil {
return nil, err
}
var ids *transformer.AccessIdentifiers
ids, err = objectTarget.Close()
if err != nil {
return nil, err
}
address.SetObjectID(ids.SelfID())
} else {
rawObject := object.NewRaw()
rawObject.SetContainerID(options.ContainerID)
rawObject.SetOwnerID(options.OwnerID)
ops := new(client.PutObjectParams).
WithObject(rawObject.Object()).
WithPayloadReader(options.Reader)
objectID, err = options.Client.PutObject(
ctx,
ops,
client.WithSession(options.SessionToken),
client.WithBearer(options.BearerToken),
)
if err != nil {
return nil, err
}
address.SetObjectID(objectID)
}
address.SetContainerID(options.ContainerID)
return address, nil
}
func (oc *neofsObjectClient) Get(ctx context.Context, options *GetOptions) (*object.Object, error) {
var (
err error
obj *object.Object
)
ops := new(client.GetObjectParams).
WithAddress(options.ObjectAddress).
WithPayloadWriter(options.Writer)
obj, err = options.Client.GetObject(
ctx,
ops,
client.WithSession(options.SessionToken),
client.WithBearer(options.BearerToken),
)
return obj, err
}
func (oc *neofsObjectClient) Search(ctx context.Context, options *SearchOptions) ([]*object.ID, error) {
sfs := object.NewSearchFilters()
sfs.AddRootFilter()
sfs.AddFilter(options.Attribute.Key, options.Attribute.Value, object.MatchStringEqual)
sops := new(client.SearchObjectParams)
sops.WithContainerID(options.ContainerID)
sops.WithSearchFilters(sfs)
return options.Client.SearchObject(
ctx,
sops,
client.WithSession(options.SessionToken),
client.WithBearer(options.BearerToken),
)
}
func (oc *neofsObjectClient) Delete(ctx context.Context, options *DeleteOptions) error {
ops := new(client.DeleteObjectParams).WithAddress(options.ObjectAddress)
err := options.Client.DeleteObject(
ctx,
ops,
client.WithSession(options.SessionToken),
client.WithBearer(options.BearerToken),
)
return err
}
type remoteClientTarget struct {
ctx context.Context
client client.Client
object *object.Object
payload []byte
}
func (rct *remoteClientTarget) WriteHeader(raw *objectCore.RawObject) error {
rct.object = raw.Object().SDK()
return nil
}
func (rct *remoteClientTarget) Write(p []byte) (n int, err error) {
rct.payload = append(rct.payload, p...)
return len(p), nil
}
func (rct *remoteClientTarget) Close() (*transformer.AccessIdentifiers, error) {
id, err := rct.client.PutObject(
rct.ctx, new(client.PutObjectParams).
WithObject(rct.object).
WithPayloadReader(bytes.NewReader(rct.payload)),
)
if err != nil {
return nil, err
}
return new(transformer.AccessIdentifiers).WithSelfID(id), nil
}
type networkState struct {
ctx context.Context
client client.Client
lastError error
onError func(error)
}
func newNetworkState(ctx context.Context, client client.Client) *networkState {
ns := &networkState{
ctx: ctx,
client: client,
}
ns.onError = func(err error) { ns.lastError = err }
return ns
}
func (ns *networkState) LastError() error {
return ns.lastError
}
func (ns *networkState) CurrentEpoch() uint64 {
ce, err := ns.client.NetworkInfo(ns.ctx)
if err != nil {
ns.onError(err)
}
return ce.CurrentEpoch()
}