distribution/registry/storage/driver/frostfs/frostfs.go
Roman Loginov ca2da5e23d [#3] Integration with the tree service
Signed-off-by: Roman Loginov <r.loginov@yadro.com>
2024-04-03 16:24:33 +03:00

963 lines
25 KiB
Go

package frostfs
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"io"
"net/http"
"path/filepath"
"regexp"
"sort"
"strconv"
"strings"
"time"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container"
cid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/container/id"
resolver "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/ns"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object"
oid "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/id"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/object/transformer"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool"
treepool "git.frostfs.info/TrueCloudLab/frostfs-sdk-go/pool/tree"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/user"
"git.frostfs.info/TrueCloudLab/frostfs-sdk-go/version"
"github.com/distribution/distribution/v3/internal/dcontext"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
"github.com/distribution/distribution/v3/registry/storage/driver/base"
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
"github.com/distribution/distribution/v3/registry/storage/driver/frostfs/tree"
"github.com/distribution/distribution/v3/registry/storage/driver/frostfs/wrapper"
"github.com/nspcc-dev/neo-go/cli/flags"
"github.com/nspcc-dev/neo-go/pkg/wallet"
)
// Driver identity and names of object attributes used by the driver.
const (
// driverName is the name the driver is registered under and reports from Name.
driverName = "frostfs"
// attributeFilePath is the key of the object attribute that stores the full storage path.
attributeFilePath = "FilePath"
// attributeSHAState is an attribute key; it is not referenced in the visible part of this file.
attributeSHAState = "sha256state"
)
// Names of the configuration parameters read by FromParameters and the
// defaults used when an optional parameter is absent.
const (
// paramPeers is the top-level section with peer entries.
paramPeers = "peers"
// paramAddress is read both from a peer entry and from the wallet section.
paramAddress = "address"
// paramWeight and paramPriority are optional keys of a peer entry.
paramWeight = "weight"
paramPriority = "priority"
// paramWallet is the top-level wallet section; paramPath and paramPassword are its required keys.
paramWallet = "wallet"
paramPath = "path"
paramPassword = "password"
// paramContainer is the top-level container ID or name.
paramContainer = "container"
// Optional top-level tuning parameters.
paramConnectionTimeout = "connection_timeout"
paramRequestTimeout = "request_timeout"
paramRebalanceInterval = "rebalance_interval"
paramSessionExpirationDuration = "session_expiration_duration"
paramRPCEndpoint = "rpc_endpoint"
// Defaults applied when the corresponding optional parameter is not set.
defaultConnectionTimeout = 4 * time.Second
defaultRequestTimeout = 4 * time.Second
defaultRebalanceInterval = 20 * time.Second
defaultSessionExpirationDuration = 100 // in epoch
)
var (
// filePathValidateRegex matches absolute file paths: a leading slash followed by
// one or more non-empty segments separated by single slashes, with no trailing slash.
filePathValidateRegex = regexp.MustCompile(`^/([^/]+/)*[^/]+$`)
// prefixPathValidateRegex matches absolute prefixes: the bare root "/" or non-empty
// segments separated by single slashes, optionally ending with a slash.
prefixPathValidateRegex = regexp.MustCompile(`^/([^/]+/?)*$`)
)
// DriverParameters is a struct that encapsulates all of the driver parameters after all values have been set.
type DriverParameters struct {
// ContainerID is either an encoded container ID or a container name; a value
// that does not decode as an ID is resolved through NNS using RPCEndpoint.
ContainerID string
// Peers are the nodes added to both the object pool and the tree pool.
Peers []*PeerInfo
// Wallet describes where to load the signing key from.
Wallet *Wallet
// ConnectionTimeout is used as the node dial timeout of the pools.
ConnectionTimeout time.Duration
// RequestTimeout is used as the healthcheck timeout of the pools.
RequestTimeout time.Duration
// RebalanceInterval is used as the client rebalance interval of the pools.
RebalanceInterval time.Duration
// SessionExpirationDuration is the session lifetime passed to the object pool, in epochs.
SessionExpirationDuration uint64
// RPCEndpoint is dialed by the NNS resolver; it is only needed when ContainerID is a name.
RPCEndpoint string
}
// Wallet contains params to get key from wallet.
type Wallet struct {
// Path is the wallet file location.
Path string
// Password is used to decrypt the selected account.
Password string
// Address optionally selects an account; when empty the wallet's change address is used.
Address string
}
// PeerInfo contains node params.
type PeerInfo struct {
// Address is the node endpoint passed to the pool.
Address string
// Weight is the node weight passed to the pool.
Weight float64
// Priority is the node priority passed to the pool.
Priority int
}
// init registers a frostfsDriverFactory with the storage driver factory under driverName.
func init() {
factory.Register(driverName, &frostfsDriverFactory{})
}
// frostfsDriverFactory is the factory registered for the FrostFS driver.
type frostfsDriverFactory struct{}

// Create builds a storage driver from the raw parameters map by delegating to
// FromParameters. The context argument is ignored.
func (*frostfsDriverFactory) Create(_ context.Context, parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
	drv, err := FromParameters(parameters)
	return drv, err
}
// driver is the unexported storage driver implementation that Driver wraps through base.Base.
type driver struct {
// sdkPool performs object put/get/head/range/delete and network info requests.
sdkPool *pool.Pool
// treeService resolves storage paths to objects and records new ones.
treeService *tree.Tree
// owner is the user ID derived from the wallet key; it is set as owner of created objects.
owner *user.ID
// key is the wallet private key handed to object targets for signing.
key *ecdsa.PrivateKey
// containerID is the container all requests are addressed to.
containerID cid.ID
// maxSize is the max object size reported by the network when the driver was created.
maxSize uint64
}
// baseEmbed wraps base.Base so that it can be embedded into Driver.
type baseEmbed struct {
base.Base
}
// Driver is a storagedriver.StorageDriver implementation backed by FrostFS.
// Objects are stored at absolute keys in the provided container.
type Driver struct {
baseEmbed
}
// FromParameters constructs a new Driver from a generic parameters map.
//
// Required parameters:
//   - peers
//   - wallet
//   - container
//
// Optional parameters:
//   - connection_timeout
//   - request_timeout
//   - rebalance_interval
//   - session_expiration_duration
//   - rpc_endpoint
//
// On failure the returned driver is a nil interface value.
func FromParameters(parameters map[string]interface{}) (storagedriver.StorageDriver, error) {
	peers, err := parsePeers(parameters)
	if err != nil {
		return nil, err
	}

	walletInfo, err := parseWallet(parameters)
	if err != nil {
		return nil, err
	}

	containerID, ok := parameters[paramContainer].(string)
	if !ok {
		return nil, fmt.Errorf("no container provided")
	}

	var rpcEndpoint string
	if rpcEndpointParam := parameters[paramRPCEndpoint]; rpcEndpointParam != nil {
		if rpcEndpoint, ok = rpcEndpointParam.(string); !ok {
			return nil, fmt.Errorf("invalid rpc_endpoint param")
		}
	}

	connectionTimeout, err := parseTimeout(parameters, paramConnectionTimeout, defaultConnectionTimeout)
	if err != nil {
		return nil, err
	}

	requestTimeout, err := parseTimeout(parameters, paramRequestTimeout, defaultRequestTimeout)
	if err != nil {
		return nil, err
	}

	rebalanceInterval, err := parseTimeout(parameters, paramRebalanceInterval, defaultRebalanceInterval)
	if err != nil {
		return nil, err
	}

	expiration, err := parseUInt64(parameters, paramSessionExpirationDuration, defaultSessionExpirationDuration)
	if err != nil {
		return nil, err
	}

	drv, err := New(DriverParameters{
		Peers:                     peers,
		ContainerID:               containerID,
		Wallet:                    walletInfo,
		ConnectionTimeout:         connectionTimeout,
		RequestTimeout:            requestTimeout,
		RebalanceInterval:         rebalanceInterval,
		SessionExpirationDuration: expiration,
		RPCEndpoint:               rpcEndpoint,
	})
	if err != nil {
		// Return an untyped nil: passing New's nil *Driver straight through would
		// produce a non-nil interface value holding a nil pointer.
		return nil, err
	}

	return drv, nil
}
// parseWallet reads the "wallet" section of the parameters map. The section
// must be a map with string "path" and "password" entries; "address" is
// optional but must be a string when present.
func parseWallet(parameters map[string]interface{}) (*Wallet, error) {
	section, found := parameters[paramWallet].(map[interface{}]interface{})
	if !found {
		return nil, fmt.Errorf("no wallet params provided")
	}

	walletPath, isString := section[paramPath].(string)
	if !isString {
		return nil, fmt.Errorf("no path provided")
	}

	password, isString := section[paramPassword].(string)
	if !isString {
		return nil, fmt.Errorf("no password provided")
	}

	result := &Wallet{Path: walletPath, Password: password}

	if rawAddress := section[paramAddress]; rawAddress != nil {
		address, isString := rawAddress.(string)
		if !isString {
			return nil, fmt.Errorf("invalid address param")
		}
		result.Address = address
	}

	return result, nil
}
// parsePeers reads the "peers" section of the parameters map. Every entry
// must be a map with a string "address"; "weight" (int or float64) and
// "priority" (int) are optional.
//
// A weight or priority that is omitted or not positive is replaced with 1, so
// that a peer without explicit settings is treated the same as one with
// invalid settings instead of being left with a zero weight and priority.
//
// The peers section is a map, so the order of the returned peers is not
// deterministic.
func parsePeers(parameters map[string]interface{}) ([]*PeerInfo, error) {
	poolParams, ok := parameters[paramPeers].(map[interface{}]interface{})
	if !ok {
		return nil, fmt.Errorf("no peers params provided")
	}

	var peers []*PeerInfo
	for _, val := range poolParams {
		peerInfo, ok := val.(map[interface{}]interface{})
		if !ok {
			return nil, fmt.Errorf("invalid peers params")
		}

		peer := new(PeerInfo)

		peer.Address, ok = peerInfo[paramAddress].(string)
		if !ok {
			return nil, fmt.Errorf("invalid peer address")
		}

		if weightParam := peerInfo[paramWeight]; weightParam != nil {
			switch weight := weightParam.(type) {
			case int:
				peer.Weight = float64(weight)
			case float64:
				peer.Weight = weight
			default:
				return nil, fmt.Errorf("invalid weight param")
			}
		}
		// Applies to both unspecified and non-positive weights.
		if peer.Weight <= 0 {
			peer.Weight = 1
		}

		if priorityParam := peerInfo[paramPriority]; priorityParam != nil {
			if peer.Priority, ok = priorityParam.(int); !ok {
				return nil, fmt.Errorf("invalid priority param")
			}
		}
		// Applies to both unspecified and non-positive priorities.
		if peer.Priority <= 0 {
			peer.Priority = 1
		}

		peers = append(peers, peer)
	}

	return peers, nil
}
// parseTimeout reads the duration stored under name. A missing or nil entry
// yields defaultValue. Strings are parsed with time.ParseDuration; int and
// int64 values are converted directly to time.Duration (i.e. nanoseconds).
// Any other type is rejected.
func parseTimeout(parameters map[string]interface{}, name string, defaultValue time.Duration) (time.Duration, error) {
	raw := parameters[name]
	if raw == nil {
		return defaultValue, nil
	}

	if text, isString := raw.(string); isString {
		parsed, err := time.ParseDuration(text)
		if err != nil {
			return 0, fmt.Errorf("couldn't parse duration '%s': %w", text, err)
		}
		return parsed, nil
	}

	if number, isInt := raw.(int); isInt {
		return time.Duration(number), nil
	}

	if number, isInt64 := raw.(int64); isInt64 {
		return time.Duration(number), nil
	}

	return 0, fmt.Errorf("invalid %s", name)
}
// parseUInt64 reads the unsigned integer stored under name. A missing or nil
// entry yields defaultValue. Signed values must be non-negative: converting a
// negative number to uint64 would silently wrap around to a huge value, so it
// is reported as an error instead. Unsigned values are accepted as is. Any
// other type is rejected.
func parseUInt64(parameters map[string]interface{}, name string, defaultValue uint64) (uint64, error) {
	raw := parameters[name]
	if raw == nil {
		return defaultValue, nil
	}

	switch val := raw.(type) {
	case int:
		if val < 0 {
			return 0, fmt.Errorf("invalid %s: negative value %d", name, val)
		}
		return uint64(val), nil
	case int64:
		if val < 0 {
			return 0, fmt.Errorf("invalid %s: negative value %d", name, val)
		}
		return uint64(val), nil
	case uint:
		return uint64(val), nil
	case uint64:
		return val, nil
	}

	return 0, fmt.Errorf("invalid %s", name)
}
// New constructs a new Driver with the given FrostFS params.
//
// It loads and decrypts the wallet account, dials the object and tree pools,
// queries the network's max object size and resolves the container ID. When a
// step after pool creation fails, both pools are closed before the error is
// returned so their connections are not leaked.
func New(params DriverParameters) (*Driver, error) {
	ctx := context.Background()

	acc, err := getAccount(params.Wallet)
	if err != nil {
		return nil, err
	}

	var owner user.ID
	user.IDFromKey(&owner, acc.PrivateKey().PrivateKey.PublicKey)

	sdkPool, treePool, err := createPool(ctx, acc, params)
	if err != nil {
		return nil, fmt.Errorf("couldn't create sdk pool: %w", err)
	}

	// Both pools were dialed successfully; release them unless the driver is
	// actually handed to the caller.
	created := false
	defer func() {
		if created {
			return
		}
		sdkPool.Close()
		// Best-effort cleanup on an already failing path: a close error
		// would only mask the original one.
		treePool.Close()
	}()

	maxObjectSize, err := getMaxObjectSize(ctx, sdkPool)
	if err != nil {
		return nil, fmt.Errorf("couldn't get max object size: %w", err)
	}

	cnrID, err := getContainerID(params)
	if err != nil {
		return nil, fmt.Errorf("couldn't get container id: %w", err)
	}

	treeService := wrapper.NewPoolWrapper(treePool)

	d := &driver{
		sdkPool:     sdkPool,
		treeService: tree.NewTree(treeService, nil),
		owner:       &owner,
		key:         &acc.PrivateKey().PrivateKey,
		containerID: cnrID,
		maxSize:     maxObjectSize,
	}

	created = true

	return &Driver{
		baseEmbed: baseEmbed{
			Base: base.Base{
				StorageDriver: d,
			},
		},
	}, nil
}
// getMaxObjectSize asks the pool for the network info and returns the max
// object size it reports. A zero size is treated as an error.
func getMaxObjectSize(ctx context.Context, sdkPool *pool.Pool) (uint64, error) {
	info, err := sdkPool.NetworkInfo(ctx)
	if err != nil {
		return 0, fmt.Errorf("couldn't get connection: %w", err)
	}

	if size := info.MaxObjectSize(); size != 0 {
		return size, nil
	}

	return 0, fmt.Errorf("max object size must not be zero")
}
// getContainerID turns params.ContainerID into a container ID. The value is
// first decoded as an encoded ID; when that fails it is treated as a container
// name and resolved through the NNS resolver.
func getContainerID(params DriverParameters) (cid.ID, error) {
	var decoded cid.ID
	if decodeErr := decoded.DecodeString(params.ContainerID); decodeErr == nil {
		return decoded, nil
	}

	nns, err := createNnsResolver(params)
	if err != nil {
		return cid.ID{}, fmt.Errorf("couldn't create nns resolver: %w", err)
	}

	var domain container.Domain
	domain.SetName(params.ContainerID)

	resolved, err := nns.ResolveContainerDomain(domain)
	if err != nil {
		return cid.ID{}, fmt.Errorf("couldn't resolve container name '%s': %w", params.ContainerID, err)
	}

	return resolved, nil
}
// createPool builds and dials the object pool and the tree pool, configuring
// both with the account key, the timeouts and the peers from param.
//
// The object pool is dialed first. If creating or dialing the tree pool then
// fails, the already dialed object pool is closed before returning so that
// its connections are not leaked.
func createPool(ctx context.Context, acc *wallet.Account, param DriverParameters) (*pool.Pool, *treepool.Pool, error) {
	var prm pool.InitParameters
	var prmTree treepool.InitParameters

	prm.SetKey(&acc.PrivateKey().PrivateKey)
	prmTree.SetKey(acc.PrivateKey())

	prm.SetNodeDialTimeout(param.ConnectionTimeout)
	prmTree.SetNodeDialTimeout(param.ConnectionTimeout)

	prm.SetHealthcheckTimeout(param.RequestTimeout)
	prmTree.SetHealthcheckTimeout(param.RequestTimeout)

	prm.SetClientRebalanceInterval(param.RebalanceInterval)
	prmTree.SetClientRebalanceInterval(param.RebalanceInterval)

	// Only the object pool takes a session expiration setting here.
	prm.SetSessionExpirationDuration(param.SessionExpirationDuration)

	for _, peer := range param.Peers {
		nodeParam := pool.NewNodeParam(peer.Priority, peer.Address, peer.Weight)
		prm.AddNode(nodeParam)
		prmTree.AddNode(nodeParam)
	}

	p, err := pool.NewPool(prm)
	if err != nil {
		return nil, nil, fmt.Errorf("create pool: %w", err)
	}

	if err = p.Dial(ctx); err != nil {
		return nil, nil, fmt.Errorf("dial pool: %w", err)
	}

	treePool, err := treepool.NewPool(prmTree)
	if err != nil {
		p.Close()
		return nil, nil, fmt.Errorf("create tree pool: %w", err)
	}

	if err = treePool.Dial(ctx); err != nil {
		// Only the successfully dialed object pool is closed here.
		p.Close()
		return nil, nil, fmt.Errorf("dial tree pool: %w", err)
	}

	return p, treePool, nil
}
// createNnsResolver dials an NNS resolver at params.RPCEndpoint. An empty
// endpoint is rejected before dialing.
func createNnsResolver(params DriverParameters) (*resolver.NNS, error) {
	if params.RPCEndpoint == "" {
		return nil, fmt.Errorf("empty rpc endpoint")
	}

	var nns resolver.NNS
	if err := nns.Dial(params.RPCEndpoint); err != nil {
		return nil, fmt.Errorf("dial nns resolver: %w", err)
	}

	return &nns, nil
}
// getAccount loads the wallet file described by walletInfo and returns the
// decrypted account. The account is selected by walletInfo.Address when it is
// set, otherwise by the wallet's change address.
//
// An error is returned when walletInfo is nil, the wallet cannot be read, the
// address cannot be parsed, the wallet has no account for the address, or the
// account cannot be decrypted with the password.
func getAccount(walletInfo *Wallet) (*wallet.Account, error) {
	if walletInfo == nil {
		return nil, fmt.Errorf("no wallet params provided")
	}

	w, err := wallet.NewWalletFromFile(walletInfo.Path)
	if err != nil {
		return nil, err
	}

	addr := w.GetChangeAddress()
	if walletInfo.Address != "" {
		addr, err = flags.ParseAddress(walletInfo.Address)
		if err != nil {
			return nil, fmt.Errorf("invalid address: %w", err)
		}
	}

	// GetAccount yields nil when the wallet has no matching account;
	// decrypting a nil account would panic.
	acc := w.GetAccount(addr)
	if acc == nil {
		return nil, fmt.Errorf("couldn't find account '%s' in wallet '%s'", addr.StringLE(), walletInfo.Path)
	}

	if err = acc.Decrypt(walletInfo.Password, w.Scrypt); err != nil {
		return nil, err
	}

	return acc, nil
}
// objectAddress combines the driver's container with objID into a full object address.
func (d *driver) objectAddress(objID oid.ID) oid.Address {
	address := oid.Address{}
	address.SetObject(objID)
	address.SetContainer(d.containerID)

	return address
}
// formObject prepares a new object header for the given storage path. The
// object is owned by the driver's user, placed in the driver's container and
// carries three attributes: the full path, the base file name and the current
// UTC Unix timestamp.
func (d *driver) formObject(path string) *object.Object {
	attribute := func(key, value string) object.Attribute {
		a := object.NewAttribute()
		a.SetKey(key)
		a.SetValue(value)
		return *a
	}

	timestamp := strconv.FormatInt(time.Now().UTC().Unix(), 10)

	result := object.New()
	result.SetOwnerID(d.owner)
	result.SetContainerID(d.containerID)
	result.SetAttributes(
		attribute(attributeFilePath, path),
		attribute(object.AttributeFileName, filepath.Base(path)),
		attribute(object.AttributeTimestamp, timestamp),
	)

	return result
}
// Name returns driverName, the name this driver is registered under.
func (d *driver) Name() string {
return driverName
}
// GetContent returns the whole payload of the object stored at path.
//
// The path is resolved to an object ID through the tree service and the
// object is then fetched from the pool. An invalid path yields
// storagedriver.InvalidPathError, a path unknown to the tree yields
// storagedriver.PathNotFoundError; other failures are wrapped.
func (d *driver) GetContent(ctx context.Context, path string) ([]byte, error) {
	if ok := validateFilePath(path); !ok {
		return nil, storagedriver.InvalidPathError{Path: path}
	}

	treeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
	if err != nil {
		if errors.Is(err, tree.ErrNodeNotFound) {
			return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
		}
		return nil, fmt.Errorf("couldn't get object from tree: %w", err)
	}

	var prm pool.PrmObjectGet
	prm.SetAddress(d.objectAddress(treeObj.ObjID))

	obj, err := d.sdkPool.GetObject(ctx, prm)
	if err != nil {
		return nil, fmt.Errorf("couldn't get object '%s': %w", treeObj.ObjID, err)
	}
	// Release the payload stream once it has been drained (or reading failed);
	// a close error after the read adds nothing the caller could act on.
	defer obj.Payload.Close()

	return io.ReadAll(obj.Payload)
}
// PutContent stores content as a single object at path.
//
// Whatever Delete finds for path is removed first, then the object is
// uploaded and registered in the tree service together with its size.
// An invalid path yields storagedriver.InvalidPathError; other failures are
// wrapped so that the underlying error stays reachable via errors.Is/As.
func (d *driver) PutContent(ctx context.Context, path string, content []byte) error {
	if ok := validateFilePath(path); !ok {
		return storagedriver.InvalidPathError{Path: path}
	}

	if err := d.Delete(ctx, path); err != nil {
		return fmt.Errorf("couldn't delete '%s': %w", path, err)
	}

	obj := d.formObject(path)
	obj.SetPayload(content)

	var prm pool.PrmObjectPut
	prm.SetHeader(*obj)

	id, err := d.sdkPool.PutObject(ctx, prm)
	if err != nil {
		return fmt.Errorf("couldn't put object '%s': %w", path, err)
	}

	_, err = d.treeService.AddObject(ctx, d.containerID, path, id, uint64(len(content)))
	if err != nil {
		return fmt.Errorf("can't add object to tree: %w", err)
	}

	return nil
}
// Reader returns a stream over the payload of the object at path, starting at
// the given byte offset.
//
// Errors:
//   - storagedriver.InvalidPathError when path is not a valid file path;
//   - storagedriver.InvalidOffsetError when offset is negative;
//   - storagedriver.PathNotFoundError when the tree has no node for path;
//   - a wrapped error for any other tree or pool failure.
//
// An offset at or beyond the payload size (which includes offset 0 for an
// empty object) is not an error: an empty reader is returned, so reading it
// yields io.EOF immediately.
func (d *driver) Reader(ctx context.Context, path string, offset int64) (io.ReadCloser, error) {
	if ok := validateFilePath(path); !ok {
		return nil, storagedriver.InvalidPathError{Path: path}
	}

	// Reject negative offsets up front: converting them to uint64 below would
	// turn them into huge positive values.
	if offset < 0 {
		return nil, storagedriver.InvalidOffsetError{Path: path, Offset: offset, DriverName: driverName}
	}

	treeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
	if err != nil {
		if errors.Is(err, tree.ErrNodeNotFound) {
			return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
		}
		return nil, fmt.Errorf("couldn't get object from tree: %w", err)
	}

	if uint64(offset) >= treeObj.PayloadSize {
		// Nothing left to read; a zero-length range request is avoided.
		return io.NopCloser(strings.NewReader("")), nil
	}

	length := treeObj.PayloadSize - uint64(offset)

	var prmRange pool.PrmObjectRange
	prmRange.SetAddress(d.objectAddress(treeObj.ObjID))
	prmRange.SetOffset(uint64(offset))
	prmRange.SetLength(length)

	res, err := d.sdkPool.ObjectRange(ctx, prmRange)
	if err != nil {
		return nil, fmt.Errorf("couldn't get payload range of object '%s', offset %d, length %d, id '%s': %w",
			path, offset, length, treeObj.ObjID, err)
	}

	return &res, nil
}
// getUploadUUID extracts the string stored in ctx under the "vars.uuid" key.
func getUploadUUID(ctx context.Context) string {
	const uuidKey = "vars.uuid"

	return dcontext.GetStringValue(ctx, uuidKey)
}
// Writer returns a FileWriter for path.
//
// The upload UUID taken from ctx is parsed as the split ID of the upload.
// Already uploaded parts with that split ID are looked up via headSplitParts.
// If any part already carries a parent header the object is considered
// finished and an error is returned. The last part is the one that no other
// part names as its predecessor; more than one such candidate is an error.
func (d *driver) Writer(ctx context.Context, path string, append bool) (storagedriver.FileWriter, error) {
	if !validateFilePath(path) {
		return nil, storagedriver.InvalidPathError{Path: path}
	}

	uploadID := getUploadUUID(ctx)
	sid := object.NewSplitID()
	if parseErr := sid.Parse(uploadID); parseErr != nil {
		return nil, fmt.Errorf("couldn't parse split id as upload uuid '%s': %w", uploadID, parseErr)
	}

	uploaded, tails, err := d.headSplitParts(ctx, sid, path, append)
	if err != nil {
		return nil, fmt.Errorf("couldn't search split parts '%s': %w", path, err)
	}

	for i := range uploaded {
		if uploaded[i].Parent() != nil {
			return nil, fmt.Errorf("object already exist '%s'", path)
		}

		// A part that is referenced as "previous" has a child and cannot be the tail.
		previous, _ := uploaded[i].PreviousID()
		delete(tails, previous)
	}

	if len(tails) > 1 {
		return nil, fmt.Errorf("couldn't find last part '%s'", path)
	}

	info := object.NewSplitInfo()
	info.SetSplitID(sid)
	// At most one entry remains at this point.
	for last := range tails {
		info.SetLastPart(last)
		break
	}

	fileWriter, err := newSizeLimiterWriter(ctx, d, path, info, uploaded)
	if err != nil {
		return nil, fmt.Errorf("couldn't init size limiter writer: %w", err)
	}

	return fileWriter, nil
}
// Stat returns file info for path.
//
// The root path "/" is answered as a directory after a successful network
// info request (used as a healthcheck). For other paths the tree service is
// consulted: a missing node yields storagedriver.PathNotFoundError, a node
// that exists only as a directory yields directory info, and a found object
// yields its file info.
func (d *driver) Stat(ctx context.Context, path string) (storagedriver.FileInfo, error) {
	// healthcheck
	if path == "/" {
		_, err := d.sdkPool.NetworkInfo(ctx)
		if err != nil {
			return nil, fmt.Errorf("healthcheck failed: %w", err)
		}
		return newFileInfoDir(path), nil
	}

	if !validatePrefixPath(path) {
		return nil, storagedriver.InvalidPathError{Path: path}
	}
	path = preparePrefixPath(path)

	node, err := d.treeService.GetObjectByPathDir(ctx, d.containerID, path)
	switch {
	case err == nil:
		return newFileInfo(*node, ""), nil
	case errors.Is(err, tree.ErrNodeNotFound):
		return nil, storagedriver.PathNotFoundError{Path: path, DriverName: driverName}
	case errors.Is(err, tree.ErrOnlyDirFound):
		return newFileInfoDir(path), nil
	default:
		return nil, fmt.Errorf("couldn't get object from tree: %w", err)
	}
}
// List returns the sorted, de-duplicated paths that newFileInfo derives from
// the tree nodes found under the given prefix.
func (d *driver) List(ctx context.Context, path string) ([]string, error) {
	if !validatePrefixPath(path) {
		return nil, storagedriver.InvalidPathError{Path: path}
	}
	path = preparePrefixPath(path)

	nodes, err := d.treeService.GetListObjectByPrefix(ctx, d.containerID, path)
	if err != nil {
		return nil, fmt.Errorf("couldn't get object from tree by prefix '%s': %w", path, err)
	}

	seen := make(map[string]struct{})
	names := make([]string, 0, len(nodes))
	for i := range nodes {
		entry := newFileInfo(nodes[i], path).Path()
		if _, duplicate := seen[entry]; duplicate {
			continue
		}
		seen[entry] = struct{}{}
		names = append(names, entry)
	}

	sort.Strings(names)

	return names, nil
}
// Move re-registers the object stored at sourcePath under destPath.
//
// Only the tree is changed: a node for destPath pointing to the same object
// is added and the source node is then removed; the object itself is not
// copied. An invalid path yields storagedriver.InvalidPathError, a source
// path unknown to the tree yields storagedriver.PathNotFoundError; other
// failures are wrapped.
func (d *driver) Move(ctx context.Context, sourcePath string, destPath string) error {
	if ok := validateFilePath(sourcePath); !ok {
		return storagedriver.InvalidPathError{Path: sourcePath}
	}

	if ok := validateFilePath(destPath); !ok {
		return storagedriver.InvalidPathError{Path: destPath}
	}

	sourceTreeObj, err := d.treeService.GetObjectByPath(ctx, d.containerID, sourcePath)
	if err != nil {
		if errors.Is(err, tree.ErrNodeNotFound) {
			return storagedriver.PathNotFoundError{Path: sourcePath, DriverName: driverName}
		}
		return fmt.Errorf("couldn't get object from tree: %w", err)
	}

	_, err = d.treeService.AddObject(ctx, d.containerID, destPath, sourceTreeObj.ObjID, sourceTreeObj.PayloadSize)
	if err != nil {
		return fmt.Errorf("can't add object to tree: %w", err)
	}

	if err = d.treeService.DeleteObject(ctx, d.containerID, sourceTreeObj.ID); err != nil {
		return fmt.Errorf("couldn't delete object from tree: %w", err)
	}

	return nil
}
// Delete removes everything the tree service lists under the given prefix,
// plus the object stored exactly at path if there is one. For each entry the
// object is deleted from the pool first and its tree node afterwards. A path
// for which nothing is found is not an error.
func (d *driver) Delete(ctx context.Context, path string) error {
	if !validatePrefixPath(path) {
		return storagedriver.InvalidPathError{Path: path}
	}
	path = preparePrefixPath(path)

	nodes, err := d.treeService.GetListObjectByPrefix(ctx, d.containerID, path)
	if err != nil {
		return fmt.Errorf("couldn't get object from tree by prefix '%s': %w", path, err)
	}

	root, err := d.treeService.GetObjectByPath(ctx, d.containerID, path)
	switch {
	case err == nil:
		nodes = append(nodes, *root)
	case !errors.Is(err, tree.ErrNodeNotFound):
		return fmt.Errorf("couldn't get object from tree: %w", err)
	}

	for i := range nodes {
		node := nodes[i]

		if delErr := d.delete(ctx, node.ObjID); delErr != nil {
			return fmt.Errorf("error in delete object oid: %s : %w", node.ObjID, delErr)
		}

		if delErr := d.treeService.DeleteObject(ctx, d.containerID, node.ID); delErr != nil {
			return fmt.Errorf("couldn't delete object from tree: %w", delErr)
		}
	}

	return nil
}
// delete removes the object with the given ID from the driver's container via the pool.
func (d *driver) delete(ctx context.Context, id oid.ID) error {
	var request pool.PrmObjectDelete
	request.SetAddress(d.objectAddress(id))

	err := d.sdkPool.DeleteObject(ctx, request)
	if err == nil {
		return nil
	}

	return fmt.Errorf("couldn't delete object '%s': %w", id, err)
}
// RedirectURL always returns an empty URL and a nil error; both arguments are ignored.
// NOTE(review): presumably an empty URL tells the caller that redirects are not
// supported by this driver — confirm against the storagedriver contract.
func (d *driver) RedirectURL(_ *http.Request, _ string) (string, error) {
return "", nil
}
// Walk delegates to storagedriver.WalkFallback, passing this driver together
// with path, f and options through unchanged.
func (d *driver) Walk(ctx context.Context, path string, f storagedriver.WalkFn, options ...func(*storagedriver.WalkOptions)) error {
return storagedriver.WalkFallback(ctx, d, path, f, options...)
}
// headSplitParts fetches the headers of all parts registered in the tree for
// path and splitID.
//
// It returns the headers together with a set containing the ID of every part.
// When isAppend is false, finding any part is an error, because a fresh
// upload must not have parts yet; the part is still headed before that error
// is reported.
func (d *driver) headSplitParts(ctx context.Context, splitID *object.SplitID, path string, isAppend bool) ([]*object.Object, map[oid.ID]struct{}, error) {
	partIDs, err := d.treeService.GetListOIDBySplitID(ctx, d.containerID, path, splitID)
	if err != nil {
		return nil, nil, fmt.Errorf("couldn't get object from tree by split id '%s': %w", path, err)
	}

	var headers []*object.Object
	partSet := make(map[oid.ID]struct{})

	for _, partID := range partIDs {
		var request pool.PrmObjectHead
		request.SetAddress(d.objectAddress(partID))

		header, err := d.sdkPool.HeadObject(ctx, request)
		if err != nil {
			return nil, nil, fmt.Errorf("couldn't head object part '%s', id '%s', splitID '%s': %w", path, partID, splitID, err)
		}

		if !isAppend {
			return nil, nil, fmt.Errorf("init upload part '%s' already exist, splitID '%s'", path, splitID)
		}

		headers = append(headers, &header)
		partSet[partID] = struct{}{}
	}

	return headers, partSet, nil
}
// newFileInfo converts a tree node into file info.
//
// With an empty prefix the node's full path, size and modification time are
// used as is. With a non-empty prefix, a node lying more than one level below
// the prefix is reported as a directory whose path is the prefix plus the
// first path segment after it.
func newFileInfo(treeObj tree.TreeNode, prefix string) storagedriver.FileInfo {
	fields := storagedriver.FileInfoFields{
		Path:    treeObj.FullPath,
		Size:    int64(treeObj.PayloadSize),
		ModTime: treeObj.ModificationTime,
	}

	if prefix == "" {
		return storagedriver.FileInfoInternal{FileInfoFields: fields}
	}

	rest := strings.TrimPrefix(fields.Path, prefix)
	if rest == "" {
		return storagedriver.FileInfoInternal{FileInfoFields: fields}
	}

	// Skip the first character of the remainder and look for a further
	// separator: its presence means the node sits in a sub-directory.
	if slash := strings.Index(rest[1:], "/"); slash >= 0 {
		fields.IsDir = true
		fields.Path = prefix + rest[:slash+1]
	}

	return storagedriver.FileInfoInternal{FileInfoFields: fields}
}
// newFileInfoDir builds file info describing a directory at path, stamped with the current time.
func newFileInfoDir(path string) storagedriver.FileInfo {
	fields := storagedriver.FileInfoFields{
		Path:    path,
		ModTime: time.Now(),
		IsDir:   true,
	}

	return storagedriver.FileInfoInternal{FileInfoFields: fields}
}
// validatePrefixPath reports whether path matches prefixPathValidateRegex.
func validatePrefixPath(path string) bool {
return prefixPathValidateRegex.MatchString(path)
}
// validateFilePath reports whether path matches filePathValidateRegex.
func validateFilePath(path string) bool {
return filePathValidateRegex.MatchString(path)
}
// preparePrefixPath strips a single trailing slash from path, if present.
// The root path "/" therefore becomes the empty string.
func preparePrefixPath(path string) string {
	return strings.TrimSuffix(path, "/")
}
// newObjTarget creates an object target for path and splitID that shares the
// driver's pool, tree service and signing key.
func (d *driver) newObjTarget(path string, splitID *object.SplitID) transformer.ChunkedObjectWriter {
	target := new(objTarget)
	target.path = path
	target.splitID = splitID
	target.key = d.key
	target.sdkPool = d.sdkPool
	target.treeService = d.treeService

	return target
}
// objTarget is the transformer.ChunkedObjectWriter produced by newObjTarget.
// It keeps one object's header and payload chunks in memory and uploads them on Close.
type objTarget struct {
// sdkPool is used for network info and for uploading the object.
sdkPool *pool.Pool
// treeService records uploaded objects under path.
treeService *tree.Tree
// path is the storage path passed to the tree service.
path string
// splitID is passed to the tree service when the uploaded part is recorded.
splitID *object.SplitID
// key signs the object, and an unsigned parent header, in Close.
key *ecdsa.PrivateKey
// obj is the header received by WriteHeader.
obj *object.Object
// chunks are the payload slices received by Write, in call order.
chunks [][]byte
}
// WriteHeader stores obj as the header of the object to upload. The pointer is
// kept as is (no copy) and the object is modified later by Close. The context
// is ignored and the returned error is always nil.
func (t *objTarget) WriteHeader(_ context.Context, obj *object.Object) error {
t.obj = obj
return nil
}
// Write buffers a copy of p as the next payload chunk and reports len(p)
// bytes written. The data is copied because the chunk is kept until Close,
// while the caller is free to reuse or overwrite p as soon as Write returns.
// The context is ignored and the returned error is always nil.
func (t *objTarget) Write(_ context.Context, p []byte) (n int, err error) {
	chunk := make([]byte, len(p))
	copy(chunk, p)
	t.chunks = append(t.chunks, chunk)

	return len(p), nil
}
// Close finalizes the buffered object, uploads it and records it in the tree.
//
// Steps:
//  1. fill in payload size, version and creation epoch of the header;
//  2. if the header carries a parent without a signature, finalize (ID and
//     signature) a copy of that parent and attach it — this marks the part
//     that commits the whole upload;
//  3. finalize the object itself, attach the concatenated payload and put it
//     into the pool;
//  4. on commit, register the parent object under the path with its payload
//     size; in every case register the uploaded part with the split ID.
//
// The header is validated (present, container ID set) before anything is
// uploaded, so that an object which cannot be registered in the tree is not
// stored at all.
func (t *objTarget) Close(ctx context.Context) (*transformer.AccessIdentifiers, error) {
	if t.obj == nil {
		return nil, fmt.Errorf("object header was not written")
	}

	containerID, ok := t.obj.ContainerID()
	if !ok {
		return nil, fmt.Errorf("error in get container id from object")
	}

	networkInfo, err := t.sdkPool.NetworkInfo(ctx)
	if err != nil {
		return nil, fmt.Errorf("couldn't get network info: %w", err)
	}

	sz := 0
	for i := range t.chunks {
		sz += len(t.chunks[i])
	}

	ver := version.Current()
	currEpoch := networkInfo.CurrentEpoch()

	t.obj.SetPayloadSize(uint64(sz))
	t.obj.SetVersion(&ver)
	t.obj.SetCreationEpoch(currEpoch)

	var (
		parID oid.ID
		// NOTE(review): parHdr is never assigned, so ParentHeader in the result
		// is always nil — confirm whether callers expect the finalized parent here.
		parHdr          *object.Object
		isCommit        bool
		sizeFinalObject uint64
	)

	if par := t.obj.Parent(); par != nil && par.Signature() == nil {
		isCommit = true

		objPar := object.NewFromV2(par.ToV2())
		objPar.SetCreationEpoch(currEpoch)

		if err := object.SetIDWithSignature(*t.key, objPar); err != nil {
			return nil, fmt.Errorf("could not finalize parent object: %w", err)
		}

		parID, _ = objPar.ID()
		sizeFinalObject = objPar.PayloadSize()

		t.obj.SetParent(objPar)
	}

	if err = object.SetIDWithSignature(*t.key, t.obj); err != nil {
		return nil, fmt.Errorf("could not finalize object: %w", err)
	}

	payload := make([]byte, 0, sz)
	for i := range t.chunks {
		payload = append(payload, t.chunks[i]...)
	}
	t.obj.SetPayload(payload)

	var prm pool.PrmObjectPut
	prm.SetHeader(*t.obj)

	id, err := t.sdkPool.PutObject(ctx, prm)
	if err != nil {
		return nil, fmt.Errorf("couldn't put part: %w", err)
	}

	if isCommit {
		_, err := t.treeService.AddObject(ctx, containerID, t.path, parID, sizeFinalObject)
		if err != nil {
			return nil, fmt.Errorf("can't add object to tree: %w", err)
		}
	}

	_, err = t.treeService.AddPHYObject(ctx, containerID, t.path, id, t.splitID)
	if err != nil {
		return nil, fmt.Errorf("can't add phy object to tree: %w", err)
	}

	objID, _ := t.obj.ID()

	return &transformer.AccessIdentifiers{
		ParentID:     &parID,
		SelfID:       objID,
		ParentHeader: parHdr,
	}, nil
}