forked from TrueCloudLab/lego
159 lines
4.1 KiB
Go
159 lines
4.1 KiB
Go
|
package frostfs
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"crypto/ecdsa"
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client"
|
||
|
status "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/client/status"
|
||
|
containerid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
|
||
|
objectid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
|
||
|
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
|
||
|
)
|
||
|
|
||
|
// Barebones API client for a single FrostFS container
|
||
|
type Storage struct {
|
||
|
endpoint string
|
||
|
container containerid.ID
|
||
|
key *ecdsa.PrivateKey
|
||
|
user user.ID
|
||
|
}
|
||
|
|
||
|
// storageRequestTimeout is the deadline that Save and Delete apply, on top of
// the caller's context, to their requests to the storage node.
const storageRequestTimeout = 10 * time.Second
|
||
|
|
||
|
// Create FrostFS client for working with a single storage container
|
||
|
func Open(endpoint, containerID string, key *ecdsa.PrivateKey) (*Storage, error) {
|
||
|
var container containerid.ID
|
||
|
err := container.DecodeString(containerID)
|
||
|
if err != nil {
|
||
|
return nil, fmt.Errorf("invalid container ID: %w", err)
|
||
|
}
|
||
|
var owner user.ID
|
||
|
user.IDFromKey(&owner, key.PublicKey)
|
||
|
return &Storage{
|
||
|
endpoint: endpoint,
|
||
|
key: key,
|
||
|
user: owner,
|
||
|
container: container,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
// Save byte slice to FrostFS object
|
||
|
//
|
||
|
// Large objects must not be created this way. Depending on FrostFS network configuration,
|
||
|
// attempting to send byte slices larger than ~64MB will always return an error.
|
||
|
func (s *Storage) Save(ctx context.Context, data []byte, attr ...string) (oid string, err error) {
|
||
|
attributes, err := keyval2attrs(attr...)
|
||
|
if err != nil {
|
||
|
return "", fmt.Errorf("invalid attributes: %w", err)
|
||
|
}
|
||
|
|
||
|
obj := object.New()
|
||
|
object.InitCreation(obj, object.RequiredFields{
|
||
|
Container: s.container,
|
||
|
Owner: s.user,
|
||
|
})
|
||
|
obj.SetAttributes(attributes...)
|
||
|
obj.SetPayload(data)
|
||
|
obj.SetPayloadSize(uint64(len(data)))
|
||
|
object.CalculateAndSetPayloadChecksum(obj)
|
||
|
err = object.CalculateAndSetID(obj)
|
||
|
if err != nil {
|
||
|
return "", fmt.Errorf("object ID: %w", err)
|
||
|
}
|
||
|
err = object.CalculateAndSetSignature(*s.key, obj)
|
||
|
if err != nil {
|
||
|
return "", fmt.Errorf("signing object: %w", err)
|
||
|
}
|
||
|
|
||
|
ctx, cancel := context.WithTimeout(ctx, storageRequestTimeout)
|
||
|
defer cancel()
|
||
|
|
||
|
c, err := s.dial(ctx)
|
||
|
if err != nil {
|
||
|
return "", fmt.Errorf("connecting to storage node: %w", err)
|
||
|
}
|
||
|
put := client.PrmObjectPutSingle{
|
||
|
Key: s.key,
|
||
|
Object: obj,
|
||
|
}
|
||
|
res, err := c.ObjectPutSingle(ctx, put)
|
||
|
if err != nil {
|
||
|
return "", fmt.Errorf("sending object to storage: %w", err)
|
||
|
}
|
||
|
stat := res.Status()
|
||
|
if !status.IsSuccessful(stat) {
|
||
|
return "", fmt.Errorf("saving object to storage: %w", stat.(error))
|
||
|
}
|
||
|
id, _ := obj.ID()
|
||
|
return fmt.Sprint(id), nil
|
||
|
}
|
||
|
|
||
|
// Delete object from container
|
||
|
func (s *Storage) Delete(ctx context.Context, oid string) error {
|
||
|
var obj objectid.ID
|
||
|
err := obj.DecodeString(oid)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("invalid object id: %w", err)
|
||
|
}
|
||
|
c, err := s.dial(ctx)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("connecting to storage node: %w", err)
|
||
|
}
|
||
|
ctx, cancel := context.WithTimeout(ctx, storageRequestTimeout)
|
||
|
defer cancel()
|
||
|
res, err := c.ObjectDelete(ctx, client.PrmObjectDelete{
|
||
|
ContainerID: &s.container,
|
||
|
ObjectID: &obj,
|
||
|
Key: s.key,
|
||
|
})
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("delete request: %w", err)
|
||
|
}
|
||
|
stat := res.Status()
|
||
|
if !status.IsSuccessful(stat) {
|
||
|
return fmt.Errorf("delete object: %w", stat.(error))
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Open new connection to FrostFS storage node
|
||
|
func (s *Storage) dial(ctx context.Context) (*client.Client, error) {
|
||
|
var wallet client.PrmInit
|
||
|
wallet.Key = *s.key
|
||
|
|
||
|
var c client.Client
|
||
|
c.Init(wallet)
|
||
|
|
||
|
var endpoint client.PrmDial
|
||
|
endpoint.Endpoint = s.endpoint
|
||
|
|
||
|
err := c.Dial(ctx, endpoint)
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
return &c, nil
|
||
|
}
|
||
|
|
||
|
// Compose object attributes slice
|
||
|
func keyval2attrs(attr ...string) ([]object.Attribute, error) {
|
||
|
if len(attr)%2 != 0 {
|
||
|
return nil, fmt.Errorf("odd number of key-value strings: %d (must be even)", len(attr))
|
||
|
}
|
||
|
attributes := make([]object.Attribute, 0, len(attr)/2)
|
||
|
var current *object.Attribute
|
||
|
for index, text := range attr {
|
||
|
if index%2 == 0 {
|
||
|
current = object.NewAttribute()
|
||
|
current.SetKey(text)
|
||
|
} else {
|
||
|
current.SetValue(text)
|
||
|
attributes = append(attributes, *current)
|
||
|
}
|
||
|
}
|
||
|
return attributes, nil
|
||
|
}
|