lego/providers/http/frostfs/client.go

205 lines
5.5 KiB
Go
Raw Normal View History

package frostfs
import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"fmt"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
status "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
containerid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
objectid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/encoding/base58"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
)
// Storage provides barebones API client for a single FrostFS container.
type Storage struct {
endpoint string
container containerid.ID
key *ecdsa.PrivateKey
user user.ID
}
const storageRequestTimeout = 10 * time.Second
// Open FrostFS client for working with a single storage container.
func Open(endpoint, containerID string, key *ecdsa.PrivateKey) (*Storage, error) {
var container containerid.ID
err := container.DecodeString(containerID)
if err != nil {
return nil, fmt.Errorf("invalid container ID: %w", err)
}
var owner user.ID
user.IDFromKey(&owner, key.PublicKey)
return &Storage{
endpoint: endpoint,
key: key,
user: owner,
container: container,
}, nil
}
// Save byte slice to FrostFS object
//
// Large objects must not be created this way. Depending on FrostFS network configuration,
// attempting to send byte slices larger than ~64MB will always return an error.
func (s *Storage) Save(ctx context.Context, data []byte, attr ...string) (oid string, err error) {
attributes, err := keyval2attrs(attr...)
if err != nil {
return "", fmt.Errorf("invalid attributes: %w", err)
}
obj := object.New()
object.InitCreation(obj, object.RequiredFields{
Container: s.container,
Owner: s.user,
})
obj.SetAttributes(attributes...)
obj.SetPayload(data)
obj.SetPayloadSize(uint64(len(data)))
object.CalculateAndSetPayloadChecksum(obj)
err = object.CalculateAndSetID(obj)
if err != nil {
return "", fmt.Errorf("object ID: %w", err)
}
err = object.CalculateAndSetSignature(*s.key, obj)
if err != nil {
return "", fmt.Errorf("signing object: %w", err)
}
ctx, cancel := context.WithTimeout(ctx, storageRequestTimeout)
defer cancel()
c, err := s.dial(ctx)
if err != nil {
return "", fmt.Errorf("connecting to storage node: %w", err)
}
put := client.PrmObjectPutSingle{
Key: s.key,
Object: obj,
}
res, err := c.ObjectPutSingle(ctx, put)
if err != nil {
return "", fmt.Errorf("sending object to storage: %w", err)
}
stat := res.Status()
if !status.IsSuccessful(stat) {
return "", fmt.Errorf("saving object to storage: %w", stat.(error))
}
id, _ := obj.ID()
return id.String(), nil
}
// Delete object from container.
func (s *Storage) Delete(ctx context.Context, oid string) error {
var obj objectid.ID
err := obj.DecodeString(oid)
if err != nil {
return fmt.Errorf("invalid object id: %w", err)
}
c, err := s.dial(ctx)
if err != nil {
return fmt.Errorf("connecting to storage node: %w", err)
}
ctx, cancel := context.WithTimeout(ctx, storageRequestTimeout)
defer cancel()
res, err := c.ObjectDelete(ctx, client.PrmObjectDelete{
ContainerID: &s.container,
ObjectID: &obj,
Key: s.key,
})
if err != nil {
return fmt.Errorf("delete request: %w", err)
}
stat := res.Status()
if !status.IsSuccessful(stat) {
return fmt.Errorf("delete object: %w", stat.(error))
}
return nil
}
// Open new connection to FrostFS storage node.
func (s *Storage) dial(ctx context.Context) (*client.Client, error) {
var w client.PrmInit
w.Key = *s.key
var c client.Client
c.Init(w)
var endpoint client.PrmDial
endpoint.Endpoint = s.endpoint
err := c.Dial(ctx, endpoint)
if err != nil {
return nil, err
}
return &c, nil
}
// Compose object attributes slice.
func keyval2attrs(attr ...string) ([]object.Attribute, error) {
if len(attr)%2 != 0 {
return nil, fmt.Errorf("odd number of key-value strings: %d (must be even)", len(attr))
}
attributes := make([]object.Attribute, 0, len(attr)/2)
var current *object.Attribute
for index, text := range attr {
if index%2 == 0 {
current = object.NewAttribute()
current.SetKey(text)
} else {
current.SetValue(text)
attributes = append(attributes, *current)
}
}
return attributes, nil
}
// Load private key from wallet file.
func getKey(walletPath, walletAccount, walletPassword string) (*ecdsa.PrivateKey, error) {
if walletPath == "" {
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) // TODO: using ephemeral keys for now, later read from env vars
if err != nil {
return nil, fmt.Errorf("generating ephemeral key: %w", err)
}
return key, nil
}
w, err := wallet.NewWalletFromFile(walletPath)
if err != nil {
return nil, err
}
defer w.Close()
if len(w.Accounts) == 0 {
return nil, fmt.Errorf("no accounts in wallet: %s", walletPath)
}
account := w.Accounts[0]
if walletAccount != "" {
decode, err := base58.CheckDecode(walletAccount) //nolint:govet // err shadow declaration
if err != nil {
return nil, fmt.Errorf("invalid account address: %w", err)
}
hash, err := util.Uint160DecodeBytesBE(decode[1:])
if err != nil {
return nil, fmt.Errorf("invalid account hash: %w", err)
}
account = w.GetAccount(hash)
if account == nil {
return nil, fmt.Errorf("account not found: %s", walletAccount)
}
}
defer account.Close()
err = account.Decrypt(walletPassword, w.Scrypt)
if err != nil {
return nil, fmt.Errorf("failed to decrypt wallet: %w", err)
}
key := account.PrivateKey().PrivateKey
return &key, nil
}