lego/providers/http/frostfs/client.go

276 lines
7.6 KiB
Go
Raw Normal View History

package frostfs
import (
"context"
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"errors"
"fmt"
"math"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
status "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
containerid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
objectid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"github.com/nspcc-dev/neo-go/pkg/encoding/base58"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neo-go/pkg/wallet"
)
// Storage provides barebones API client for a single FrostFS container.
//
// A Storage is created with Open. No connection is kept in the struct;
// methods that talk to the storage node obtain a client through dial.
type Storage struct {
	// endpoint is the storage node address handed to the SDK client on dial.
	endpoint string
	// container is the ID of the container that Save and Delete operate on.
	container containerid.ID
	// key signs created objects and is passed to the SDK client with requests.
	key *ecdsa.PrivateKey
	// user is the owner ID derived from the public part of key in Open.
	user user.ID
	// epoch holds the network timing snapshot that Epoch fills in lazily.
	epoch epochCalc
}
// storageRequestTimeout is the deadline applied with context.WithTimeout to
// storage node requests made by Save, Delete and Epoch.
const storageRequestTimeout = 10 * time.Second
// Open creates a FrostFS client for working with a single storage container.
//
// endpoint is the storage node address used for every request, containerID
// is the string form of the container ID, and key is used to sign objects
// and requests. The owner ID is derived from the public part of key.
//
// Open itself does not contact the storage node. It returns an error when
// containerID cannot be decoded or when key is nil.
func Open(endpoint, containerID string, key *ecdsa.PrivateKey) (*Storage, error) {
	var container containerid.ID
	if err := container.DecodeString(containerID); err != nil {
		return nil, fmt.Errorf("invalid container ID: %w", err)
	}

	// Report a missing key as an error instead of panicking on key.PublicKey.
	if key == nil {
		return nil, errors.New("private key is required")
	}

	var owner user.ID
	user.IDFromKey(&owner, key.PublicKey)

	return &Storage{
		endpoint:  endpoint,
		key:       key,
		user:      owner,
		container: container,
	}, nil
}
// Save stores a byte slice as a single FrostFS object and returns its ID.
//
// attr is a flat list of alternating attribute keys and values; an odd
// number of strings is rejected. The object is built, checksummed and signed
// locally, then sent to the storage node in one request. Dialing and sending
// share a single storageRequestTimeout deadline derived from ctx.
//
// Large objects must not be created this way. Depending on FrostFS network configuration,
// attempting to send byte slices larger than ~64MB will always return an error.
func (s *Storage) Save(ctx context.Context, data []byte, attr ...string) (oid string, err error) {
	attributes, err := keyval2attrs(attr...)
	if err != nil {
		return "", fmt.Errorf("invalid attributes: %w", err)
	}

	obj := object.New()
	object.InitCreation(obj, object.RequiredFields{
		Container: s.container,
		Owner:     s.user,
	})
	obj.SetAttributes(attributes...)
	obj.SetPayload(data)
	obj.SetPayloadSize(uint64(len(data)))
	object.CalculateAndSetPayloadChecksum(obj)

	err = object.CalculateAndSetID(obj)
	if err != nil {
		return "", fmt.Errorf("object ID: %w", err)
	}
	err = object.CalculateAndSetSignature(*s.key, obj)
	if err != nil {
		return "", fmt.Errorf("signing object: %w", err)
	}
	// Sanity check of the signature produced just above.
	if !obj.VerifyIDSignature() {
		return "", errors.New("signing object: invalid signature was generated")
	}

	ctx, cancel := context.WithTimeout(ctx, storageRequestTimeout)
	defer cancel()

	c, err := s.dial(ctx)
	if err != nil {
		return "", fmt.Errorf("connecting to storage node: %w", err)
	}
	// A new connection is dialed for every call, so it must be released here;
	// a close failure cannot change the outcome of the request and is dropped.
	defer func() { _ = c.Close() }()

	put := client.PrmObjectPutSingle{
		Key:    s.key,
		Object: obj,
	}
	res, err := c.ObjectPutSingle(ctx, put)
	if err != nil {
		return "", fmt.Errorf("sending object to storage: %w", err)
	}
	stat := res.Status()
	if !status.IsSuccessful(stat) {
		return "", fmt.Errorf("saving object to storage: %w", stat.(error))
	}

	// The ID was set by CalculateAndSetID above, so the presence flag is not checked.
	id, _ := obj.ID()
	return id.String(), nil
}
// Delete removes the object with the given ID from the container.
//
// oid is the string form of the object ID. Dialing the storage node and the
// delete request share a single storageRequestTimeout deadline derived from
// ctx. An error is returned when oid cannot be decoded, the node cannot be
// reached, the request fails, or the node answers with a non-successful
// status.
func (s *Storage) Delete(ctx context.Context, oid string) error {
	var obj objectid.ID
	if err := obj.DecodeString(oid); err != nil {
		return fmt.Errorf("invalid object id: %w", err)
	}

	// Apply the deadline before dialing so that connecting is bounded too.
	ctx, cancel := context.WithTimeout(ctx, storageRequestTimeout)
	defer cancel()

	c, err := s.dial(ctx)
	if err != nil {
		return fmt.Errorf("connecting to storage node: %w", err)
	}
	// A new connection is dialed for every call, so it must be released here;
	// a close failure cannot change the outcome of the request and is dropped.
	defer func() { _ = c.Close() }()

	res, err := c.ObjectDelete(ctx, client.PrmObjectDelete{
		ContainerID: &s.container,
		ObjectID:    &obj,
		Key:         s.key,
	})
	if err != nil {
		return fmt.Errorf("delete request (%s): %w", oid, err)
	}
	stat := res.Status()
	if !status.IsSuccessful(stat) {
		return fmt.Errorf("delete object (%s): %w", oid, stat.(error))
	}
	return nil
}
// dial opens a new connection to the configured FrostFS storage node.
//
// Every call initialises a fresh SDK client with the storage key and dials
// s.endpoint using ctx. A dial failure is returned to the caller unwrapped.
func (s *Storage) dial(ctx context.Context) (*client.Client, error) {
	conn := new(client.Client)
	conn.Init(client.PrmInit{Key: *s.key})

	if err := conn.Dial(ctx, client.PrmDial{Endpoint: s.endpoint}); err != nil {
		return nil, err
	}
	return conn, nil
}
// keyval2attrs turns a flat list of alternating keys and values into object
// attributes.
//
// The strings are consumed pairwise: attr[0] is the key of the first
// attribute and attr[1] its value, and so on. An odd number of strings is
// rejected with an error. An empty list yields an empty slice.
func keyval2attrs(attr ...string) ([]object.Attribute, error) {
	count := len(attr)
	if count%2 != 0 {
		return nil, fmt.Errorf("odd number of key-value strings: %d (must be even)", count)
	}

	pairs := make([]object.Attribute, 0, count/2)
	for i := 0; i+1 < count; i += 2 {
		a := object.NewAttribute()
		a.SetKey(attr[i])
		a.SetValue(attr[i+1])
		pairs = append(pairs, *a)
	}
	return pairs, nil
}
// Load private key from wallet file.
func getKey(walletPath, walletAccount, walletPassword string) (*ecdsa.PrivateKey, error) { //nolint:gocyclo
if walletPath == "" {
key, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
return nil, fmt.Errorf("generating ephemeral key: %w", err)
}
return key, nil
}
// This function intentionally omits calls to w.Close() and account.Close()
// because that would destroy the underlying ecdsa.PrivateKey.
w, err := wallet.NewWalletFromFile(walletPath)
if err != nil {
return nil, err
}
if len(w.Accounts) == 0 {
return nil, fmt.Errorf("no accounts in wallet: %s", walletPath)
}
account := w.Accounts[0]
if walletAccount != "" {
decode, err := base58.CheckDecode(walletAccount) //nolint:govet // err shadow declaration
if err != nil {
return nil, fmt.Errorf("invalid account address: %w", err)
}
if len(decode) != 21 {
return nil, fmt.Errorf("invalid account address length: %d bytes", len(decode))
}
if decode[0] != 0x35 {
return nil, fmt.Errorf("invalid account address first byte: %s -> %#x", walletAccount, decode[0])
}
hash, err := util.Uint160DecodeBytesBE(decode[1:])
if err != nil {
return nil, fmt.Errorf("invalid account hash: %w", err)
}
account = w.GetAccount(hash)
if account == nil {
return nil, fmt.Errorf("account not found: %s", walletAccount)
}
}
if account.PrivateKey() == nil {
err = account.Decrypt(walletPassword, w.Scrypt)
if err != nil {
return nil, fmt.Errorf("failed to decrypt wallet: %w", err)
}
}
key := account.PrivateKey().PrivateKey
return &key, nil
}
// Epoch converts human time value into FrostFS epoch that is expected to be
// current at that time.
//
// Due to nonlinear nature of FrostFS time these calculations are approximate
// for the future and are likely wrong for the past.
//
// On first use the current epoch, epoch duration and block time are fetched
// from the storage node and remembered together with the local time of the
// request; later calls reuse that snapshot without contacting the node. The
// fetch is bounded by storageRequestTimeout. An error is returned when the
// node cannot be reached, the request fails, or the returned network
// parameters are not usable.
func (s *Storage) Epoch(ctx context.Context, t time.Time) (epoch uint64, err error) {
	if s.epoch.Ready() {
		return s.epoch.At(t), nil
	}

	ctx, cancel := context.WithTimeout(ctx, storageRequestTimeout)
	defer cancel()

	c, err := s.dial(ctx)
	if err != nil {
		return 0, fmt.Errorf("connecting to storage node: %w", err)
	}
	// A new connection is dialed for every call, so it must be released here;
	// a close failure cannot change the outcome of the request and is dropped.
	defer func() { _ = c.Close() }()

	res, err := c.NetworkInfo(ctx, client.PrmNetworkInfo{})
	if err != nil {
		return 0, fmt.Errorf("network info request: %w", err)
	}
	stat := res.Status()
	if !status.IsSuccessful(stat) {
		return 0, fmt.Errorf("network info: %w", stat.(error))
	}

	info := res.Info()
	s.epoch = epochCalc{
		timestamp:     time.Now(),
		epoch:         info.CurrentEpoch(),
		blockPerEpoch: info.EpochDuration(),
		msPerBlock:    info.MsPerBlock(),
	}

	// The node may report values the calculator cannot work with (any of
	// them zero); in that case the next call will query the node again.
	if !s.epoch.Ready() {
		return 0, errors.New("failed to initialize epoch calculator")
	}
	return s.epoch.At(t), nil
}
// epochCalc extrapolates epoch numbers from a single snapshot of network
// timing parameters.
type epochCalc struct {
	// timestamp is the local time at which epoch was recorded.
	timestamp time.Time
	// epoch is the epoch number recorded at timestamp.
	epoch uint64
	// msPerBlock is the duration of one block in milliseconds.
	msPerBlock int64
	// blockPerEpoch is the number of blocks that make up one epoch.
	blockPerEpoch uint64
}

// At estimates the epoch that is current at time t.
//
// The time elapsed since the snapshot is converted to a whole number of
// epochs, rounding up, and added to the recorded epoch. For t before the
// snapshot the estimate moves backwards and is clamped at epoch 0 rather
// than wrapping around the unsigned range. When the calculator has no
// duration data (msPerBlock or blockPerEpoch is zero) the recorded epoch is
// returned unchanged.
func (e epochCalc) At(t time.Time) uint64 {
	// Without durations the division below would be by zero.
	if e.msPerBlock == 0 || e.blockPerEpoch == 0 {
		return e.epoch
	}

	delta := int64(math.Ceil(
		float64(t.Sub(e.timestamp).Milliseconds()) /
			float64(e.msPerBlock) /
			float64(e.blockPerEpoch)))

	if delta >= 0 {
		return e.epoch + uint64(delta)
	}
	// Going back in time: never step below epoch 0.
	if back := uint64(-delta); back < e.epoch {
		return e.epoch - back
	}
	return 0
}

// Ready reports whether the calculator holds a usable snapshot: a non-zero
// timestamp and non-zero epoch, block duration and epoch length.
func (e epochCalc) Ready() bool {
	return !e.timestamp.IsZero() && e.epoch != 0 && e.msPerBlock != 0 && e.blockPerEpoch != 0
}