Implemented neofs-adm lib

Signed-off-by: Vladimir Avdeev <v.avdeev@yadro.com>
This commit is contained in:
Vladimir Avdeev 2022-08-29 17:36:27 +03:00 committed by Vladimir Avdeev
parent b6a451dc8d
commit f40111dc4a
24 changed files with 662 additions and 28 deletions

View file

@ -0,0 +1,2 @@
from .adm import CompletionType, NeofsAdm
from .cli import NeofsCli

View file

@ -0,0 +1,2 @@
from .adm import NeofsAdm
from .completion_type import CompletionType

View file

@ -0,0 +1,36 @@
from typing import Optional
from common import NEOFS_ADM_EXEC
from .completion import NeofsAdmCompletion
from .config import NeofsAdmConfig
from .gendoc import NeofsAdmGenDoc
from .morph import NeofsAdmMorph
from .subnet import NeofsAdmMorphSubnet
from .storage_config import NeofsAdmStorageConfig
from .version import NeofsAdmVersion
class NeofsAdm:
neofs_adm_exec_path: Optional[str] = None
config_file: Optional[str] = None
completion: Optional[NeofsAdmCompletion] = None
config: Optional[NeofsAdmConfig] = None
gendoc: Optional[NeofsAdmGenDoc] = None
morph: Optional[NeofsAdmMorph] = None
subnet: Optional[NeofsAdmMorphSubnet] = None
storage_config: Optional[NeofsAdmStorageConfig] = None
version: Optional[NeofsAdmVersion] = None
def __init__(self, neofs_adm_exec_path: Optional[str] = None, config_file: Optional[str] = None, timeout: int = 30):
self.config_file = config_file
self.neofs_adm_exec_path = neofs_adm_exec_path or NEOFS_ADM_EXEC
self.completion = NeofsAdmCompletion(self.neofs_adm_exec_path, timeout=timeout, config=config_file)
self.config = NeofsAdmConfig(self.neofs_adm_exec_path, timeout=timeout, config=config_file)
self.gendoc = NeofsAdmGenDoc(self.neofs_adm_exec_path, timeout=timeout, config=config_file)
self.morph = NeofsAdmMorph(self.neofs_adm_exec_path, timeout=timeout, config=config_file)
self.subnet = NeofsAdmMorphSubnet(self.neofs_adm_exec_path, timeout=timeout, config=config_file)
self.storage_config = NeofsAdmStorageConfig(self.neofs_adm_exec_path, timeout=timeout, config=config_file)
self.version = NeofsAdmVersion(self.neofs_adm_exec_path, timeout=timeout, config=config_file)

View file

@ -0,0 +1,30 @@
from cli_utils.cli_command import NeofsCliCommand
from .completion_type import CompletionType
class NeofsAdmCompletion(NeofsCliCommand):
def get(self, completion_type: CompletionType = CompletionType.FISH) -> str:
"""To load completions:
Bash:
$ source <(neofs-adm completion bash)
Zsh:
If shell completion is not already enabled in your environment you will need
to enable it. You can execute the following once:
$ echo "autoload -U compinit; compinit" >> ~/.zshrc
You will need to start a new shell for this setup to take effect.
Fish:
$ neofs-adm completion fish | source
Args:
completion_type (CompletionType): Select completion type (default: Fish)
Returns:
str: Command string
"""
return self._execute('completion ' + completion_type.value)

View file

@ -0,0 +1,8 @@
from enum import Enum
class CompletionType(Enum):
BASH = 'bash'
ZHS = 'zsh'
FISH = 'fish'
POWERSHELL = 'powershell'

View file

@ -0,0 +1,19 @@
from cli_utils.cli_command import NeofsCliCommand
class NeofsAdmConfig(NeofsCliCommand):
def init(self, path: str = '~/.neofs/adm/config.yml') -> str:
"""Initialize basic neofs-adm configuration file.
Args:
path (str): path to config (default ~/.neofs/adm/config.yml)
Returns:
str: Command string
"""
return self._execute(
'config init',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)

View file

@ -0,0 +1,34 @@
from typing import Optional
from cli_utils.cli_command import NeofsCliCommand
class NeofsAdmGenDoc(NeofsCliCommand):
def get(self, doc_file: str, depth: int = 1, doc_type: str = 'md', extension: Optional[str] = None) -> str:
"""Generate documentation for this command. If the template is not provided,
builtin cobra generator is used and each subcommand is placed in
a separate file in the same directory.
The last optional argument specifies the template to use with text/template.
In this case there is a number of helper functions which can be used:
replace STR FROM TO -- same as strings.ReplaceAll
join ARRAY SEPARATOR -- same as strings.Join
split STR SEPARATOR -- same as strings.Split
fullUse CMD -- slice of all command names starting from the parent
listFlags CMD -- list of command flags
Args:
depth (int): if template is specified, unify all commands starting from depth in a single file.
Default = 1.
doc_file (str): file where to save generated documentation
extension (str): if the template is specified, string to append to the output file names
doc_type (str): type for the documentation ('md' or 'man') (default "md")
Returns:
str: Command string
"""
return self._execute(
f'gendoc {doc_file}',
**{param: param_value for param, param_value in locals().items() if param not in ['self', 'doc_file']}
)

View file

@ -0,0 +1,272 @@
from typing import Optional
from cli_utils.cli_command import NeofsCliCommand
class NeofsAdmMorph(NeofsCliCommand):
def deposit_notary(self, rpc_endpoint: str, account: str, gas: str, storage_wallet: Optional[str] = None,
till: Optional[str] = None) -> str:
"""Deposit GAS for notary service.
Args:
account (str): wallet account address
gas (str): amount of GAS to deposit
rpc_endpoint (str): N3 RPC node endpoint
storage_wallet (str): path to storage node wallet
till (str): notary deposit duration in blocks
Returns:
str: Command string
"""
return self._execute(
'morph deposit-notary',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def dump_balances(self, rpc_endpoint: str, alphabet: Optional[str] = None, proxy: Optional[str] = None,
script_hash: Optional[str] = None, storage: Optional[str] = None) -> str:
"""Dump GAS balances
Args:
alphabet (str): dump balances of alphabet contracts
proxy (str): dump balances of the proxy contract
rpc_endpoint (str): N3 RPC node endpoint
script_hash (str): use script-hash format for addresses
storage (str): dump balances of storage nodes from the current netmap
Returns:
str: Command string
"""
return self._execute(
'morph dump-balances',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def dump_config(self, rpc_endpoint: str) -> str:
"""Section for morph network configuration commands.
Args:
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
'morph dump-config',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def dump_containers(self, rpc_endpoint: str, cid: Optional[str] = None, container_contract: Optional[str] = None,
dump: Optional[str] = None) -> str:
"""Dump NeoFS containers to file.
Args:
cid (str): containers to dump
container_contract (str): container contract hash (for networks without NNS)
dump (str): file where to save dumped containers
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
'morph dump-containers',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def dump_hashes(self, rpc_endpoint: str) -> str:
"""Dump deployed contract hashes.
Args:
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
'morph dump-hashes',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def force_new_epoch(self, rpc_endpoint: Optional[str] = None, alphabet: Optional[str] = None) -> str:
"""Create new NeoFS epoch event in the side chain
Args:
alphabet (str): path to alphabet wallets dir
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
'morph force-new-epoch',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def generate_alphabet(self, rpc_endpoint: str, alphabet_wallets: str, size: int = 7) -> str:
"""Generate alphabet wallets for consensus nodes of the morph network
Args:
alphabet_wallets (str): path to alphabet wallets dir
size (int): amount of alphabet wallets to generate (default 7)
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
'morph generate-alphabet',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def generate_storage_wallet(self, rpc_endpoint: str, alphabet_wallets: str, storage_wallet: str,
initial_gas: Optional[str] = None) -> str:
"""Generate storage node wallet for the morph network
Args:
alphabet_wallets (str): path to alphabet wallets dir
initial_gas (str): initial amount of GAS to transfer
rpc_endpoint (str): N3 RPC node endpoint
storage_wallet (str): path to new storage node wallet
Returns:
str: Command string
"""
return self._execute(
'morph generate-storage-wallet',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def init(self, rpc_endpoint: str, alphabet_wallets: str, contracts: str, protocol: str,
container_alias_fee: int = 500, container_fee: int = 1000, epoch_duration: int = 240,
homomorphic_disabled: bool = False, local_dump: Optional[str] = None, max_object_size: int = 67108864
) -> str:
"""Section for morph network configuration commands.
Args:
alphabet_wallets (str): path to alphabet wallets dir
container_alias_fee (int): container alias fee (default 500)
container_fee (int): container registration fee (default 1000)
contracts (str): path to archive with compiled NeoFS contracts
(default fetched from latest github release)
epoch_duration (int): amount of side chain blocks in one NeoFS epoch (default 240)
homomorphic_disabled: (bool): disable object homomorphic hashing
local_dump (str): path to the blocks dump file
max_object_size (int): max single object size in bytes (default 67108864)
protocol (str): path to the consensus node configuration
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
'morph init',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def refill_gas(self, rpc_endpoint: str, alphabet_wallets: str, storage_wallet: str, gas: Optional[str] = None
) -> str:
"""Refill GAS of storage node's wallet in the morph network
Args:
alphabet_wallets (str): path to alphabet wallets dir
gas (str): additional amount of GAS to transfer
rpc_endpoint (str): N3 RPC node endpoint
storage_wallet (str): path to new storage node wallet
Returns:
str: Command string
"""
return self._execute(
'morph refill-gas',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def restore_containers(self, rpc_endpoint: str, alphabet_wallets: str, cid: str, dump: str) -> str:
"""Restore NeoFS containers from file.
Args:
alphabet_wallets (str): path to alphabet wallets dir
cid (str): containers to restore
dump (str): file to restore containers from
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
'morph restore-containers',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def set_policy(self, rpc_endpoint: str, alphabet_wallets: str, exec_fee_factor: Optional[int] = None,
storage_price: Optional[int] = None, fee_per_byte: Optional[int] = None) -> str:
"""Set global policy values
Args:
alphabet_wallets (str): path to alphabet wallets dir
exec_fee_factor (int): ExecFeeFactor=<n1>
storage_price (int): StoragePrice=<n2>
fee_per_byte (int): FeePerByte=<n3>
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
non_param_attribute = ''
if exec_fee_factor:
non_param_attribute += f'ExecFeeFactor={exec_fee_factor} '
if storage_price:
non_param_attribute += f'StoragePrice={storage_price} '
if fee_per_byte:
non_param_attribute += f'FeePerByte={fee_per_byte} '
return self._execute(
f'morph restore-containers {non_param_attribute}',
**{param: param_value for param, param_value in locals().items() if param not in [
'self', 'exec_fee_factor', 'storage_price', 'fee_per_byte'
]}
)
def update_contracts(self, rpc_endpoint: str, alphabet_wallets: str, contracts: Optional[str] = None
) -> str:
"""Update NeoFS contracts.
Args:
alphabet_wallets (str): path to alphabet wallets dir
contracts (str): path to archive with compiled NeoFS contracts
(default fetched from latest github release)
rpc_endpoint (str): N3 RPC node endpoint
Returns:
str: Command string
"""
return self._execute(
'morph update-contracts',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)

View file

@ -0,0 +1,20 @@
from cli_utils.cli_command import NeofsCliCommand
class NeofsAdmStorageConfig(NeofsCliCommand):
def set(self, account: str, wallet: str) -> str:
"""Initialize basic neofs-adm configuration file.
Args:
account (str): wallet account
wallet (str): path to wallet
Returns:
str: Command string
"""
return self._execute(
'storage-config',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)

View file

@ -0,0 +1,187 @@
from typing import Optional
from cli_utils.cli_command import NeofsCliCommand
class NeofsAdmMorphSubnet(NeofsCliCommand):
def create(self, rpc_endpoint: str, address: str, wallet: str, notary: bool = False) -> str:
"""Create NeoFS subnet.
Args:
address (str): Address in the wallet, optional
notary (bool): Flag to create subnet in notary environment
rpc_endpoint (str): N3 RPC node endpoint
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
'morph subnet create',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def get(self, rpc_endpoint: str, subnet: str) -> str:
"""Read information about the NeoFS subnet.
Args:
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
Returns:
str: Command string
"""
return self._execute(
'morph subnet get',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def remove(self, rpc_endpoint: str, wallet: str, subnet: str, address: Optional[str] = None) -> str:
"""Remove NeoFS subnet.
Args:
address (str): Address in the wallet, optional
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
'morph subnet remove',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def admin_add(self, rpc_endpoint: str, wallet: str, admin: str, subnet: str, client: Optional[str] = None,
group: Optional[str] = None, address: Optional[str] = None) -> str:
"""Add admin to the NeoFS subnet.
Args:
address (str): Address in the wallet, optional
admin (str): Hex-encoded public key of the admin
client (str): Add client admin instead of node one
group (str): Client group ID in text format (needed with --client only)
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
'morph subnet admin add',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def admin_remove(self, rpc_endpoint: str, wallet: str, admin: str, subnet: str, client: Optional[str] = None,
address: Optional[str] = None) -> str:
"""Remove admin of the NeoFS subnet.
Args:
address (str): Address in the wallet, optional
admin (str): Hex-encoded public key of the admin
client (str): Remove client admin instead of node one
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
'morph subnet admin remove',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def client_add(self, rpc_endpoint: str, wallet: str, subnet: str, client: Optional[str] = None,
group: Optional[str] = None, address: Optional[str] = None) -> str:
"""Add client to the NeoFS subnet.
Args:
address (str): Address in the wallet, optional
client (str): Add client admin instead of node one
group (str): Client group ID in text format (needed with --client only)
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
'morph subnet client add',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def client_remove(self, rpc_endpoint: str, wallet: str, client: str, group: str, subnet: str,
address: Optional[str] = None) -> str:
"""Remove client of the NeoFS subnet.
Args:
address (str): Address in the wallet, optional
client (str): Remove client admin instead of node one
group (str): ID of the client group to work with
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
'morph subnet client remove',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def node_add(self, rpc_endpoint: str, wallet: str, node: str, subnet: str) -> str:
"""Add node to the NeoFS subnet.
Args:
node (str): Hex-encoded public key of the node
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
'morph subnet node add',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def node_remove(self, rpc_endpoint: str, wallet: str, node: str, subnet: str) -> str:
"""Remove node from the NeoFS subnet.
Args:
node (str): Hex-encoded public key of the node
rpc_endpoint (str): N3 RPC node endpoint
subnet (str): ID of the subnet to read
wallet (str): Path to file with wallet
Returns:
str: Command string
"""
return self._execute(
'morph subnet node remove',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)

View file

@ -0,0 +1,12 @@
from cli_utils.cli_command import NeofsCliCommand
class NeofsAdmVersion(NeofsCliCommand):
def get(self) -> str:
"""Application version
Returns:
str: Command string
"""
return self._execute('', version=True)

View file

@ -0,0 +1 @@
from .cli import NeofsCli

View file

@ -0,0 +1,25 @@
from typing import Optional
from cli_utils.cli_command import NeofsCliCommand
class NeofsCliAccounting(NeofsCliCommand):
def balance(self, wallet: str, rpc_endpoint: str, address: Optional[str] = None,
owner: Optional[str] = None) -> str:
"""Get internal balance of NeoFS account
Args:
address: address of wallet account
owner: owner of balance account (omit to use owner from private key)
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
wallet: WIF (NEP-2) string or path to the wallet or binary key
Returns:
str: Command string
"""
return self._execute(
'accounting balance',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)

View file

@ -0,0 +1,47 @@
from typing import Optional
from cli_utils.cli_command import NeofsCliCommand
class NeofsCliACL(NeofsCliCommand):
def extended_create(self, cid: str, out: str, file: Optional[str] = None, rule: Optional[list] = None) -> str:
"""Create extended ACL from the text representation.
Rule consist of these blocks: <action> <operation> [<filter1> ...] [<target1> ...]
Action is 'allow' or 'deny'.
Operation is an object service verb: 'get', 'head', 'put', 'search', 'delete', 'getrange', or 'getrangehash'.
Filter consists of <typ>:<key><match><value>
Typ is 'obj' for object applied filter or 'req' for request applied filter.
Key is a valid unicode string corresponding to object or request header key.
Well-known system object headers start with '$Object:' prefix.
User defined headers start without prefix.
Read more about filter keys at:
http://github.com/nspcc-dev/neofs-api/blob/master/proto-docs/acl.md#message-eaclrecordfilter
Match is '=' for matching and '!=' for non-matching filter.
Value is a valid unicode string corresponding to object or request header value.
Target is
'user' for container owner,
'system' for Storage nodes in container and Inner Ring nodes,
'others' for all other request senders,
'pubkey:<key1>,<key2>,...' for exact request sender, where <key> is a hex-encoded 33-byte public key.
When both '--rule' and '--file' arguments are used, '--rule' records will be placed higher in resulting
extended ACL table.
Args:
cid: Container ID
file: Read list of extended ACL table records from from text file
out: Save JSON formatted extended ACL table in file
rule: Extended ACL table record to apply
Returns:
str: Command string
"""
return self._execute(
'acl extended create',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)

View file

@ -0,0 +1,28 @@
from typing import Optional
from common import NEOFS_CLI_EXEC
from .accounting import NeofsCliAccounting
from .acl import NeofsCliACL
from .container import NeofsCliContainer
from .object import NeofsCliObject
from .version import NeofsCliVersion
class NeofsCli:
neofs_cli_exec_path: Optional[str] = None
config: Optional[str] = None
accounting: Optional[NeofsCliAccounting] = None
acl: Optional[NeofsCliACL] = None
container: Optional[NeofsCliContainer] = None
object: Optional[NeofsCliObject] = None
version: Optional[NeofsCliVersion] = None
def __init__(self, neofs_cli_exec_path: Optional[str] = None, config: Optional[str] = None, timeout: int = 30):
self.config = config # config(str): config file (default is $HOME/.config/neofs-cli/config.yaml)
self.neofs_cli_exec_path = neofs_cli_exec_path or NEOFS_CLI_EXEC
self.accounting = NeofsCliAccounting(self.neofs_cli_exec_path, timeout=timeout, config=config)
self.acl = NeofsCliACL(self.neofs_cli_exec_path, timeout=timeout, config=config)
self.container = NeofsCliContainer(self.neofs_cli_exec_path, timeout=timeout, config=config)
self.object = NeofsCliObject(self.neofs_cli_exec_path, timeout=timeout, config=config)
self.version = NeofsCliVersion(self.neofs_cli_exec_path, timeout=timeout, config=config)

View file

@ -0,0 +1,186 @@
from typing import Optional
from cli_utils.cli_command import NeofsCliCommand
class NeofsCliContainer(NeofsCliCommand):
def create(self, rpc_endpoint: str, wallet: str, address: Optional[str] = None, attributes: Optional[dict] = None,
basic_acl: Optional[str] = None, await_mode: bool = False, disable_timestamp: bool = False,
name: Optional[str] = None, nonce: Optional[str] = None, policy: Optional[str] = None,
session: Optional[str] = None, subnet: Optional[str] = None, ttl: Optional[int] = None,
xhdr: Optional[dict] = None) -> str:
"""Create a new container and register it in the NeoFS.
It will be stored in the sidechain when the Inner Ring accepts it.
Args:
address: address of wallet account
attributes: comma separated pairs of container attributes in form of Key1=Value1,Key2=Value2
await_mode: block execution until container is persisted
basic_acl: hex encoded basic ACL value or keywords like 'public-read-write', 'private',
'eacl-public-read' (default "private")
disable_timestamp: disable timestamp container attribute
name: container name attribute
nonce: UUIDv4 nonce value for container
policy: QL-encoded or JSON-encoded placement policy or path to file with it
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
session: path to a JSON-encoded container session token
subnet: string representation of container subnetwork
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'container create',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def delete(self, rpc_endpoint: str, wallet: str, cid: str, address: Optional[str] = None, await_mode: bool = False,
session: Optional[str] = None, ttl: Optional[int] = None, xhdr: Optional[dict] = None,
force: bool = False) -> str:
"""Delete an existing container.
Only the owner of the container has permission to remove the container.
Args:
address: address of wallet account
await_mode: block execution until container is removed
cid: container ID
force: do not check whether container contains locks and remove immediately
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
session: path to a JSON-encoded container session token
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'container delete',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def get(self, rpc_endpoint: str, wallet: str, cid: str, address: Optional[str] = None, await_mode: bool = False,
to: Optional[str] = None, json_mode: bool = False, ttl: Optional[int] = None,
xhdr: Optional[dict] = None) -> str:
"""Get container field info
Args:
address: address of wallet account
await_mode: block execution until container is removed
cid: container ID
json_mode: print or dump container in JSON format
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
to: path to dump encoded container
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'container get',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def get_eacl(self, rpc_endpoint: str, wallet: str, cid: str, address: Optional[str] = None,
await_mode: bool = False, to: Optional[str] = None, session: Optional[str] = None,
ttl: Optional[int] = None, xhdr: Optional[dict] = None) -> str:
"""Get extended ACL talbe of container
Args:
address: address of wallet account
await_mode: block execution until container is removed
cid: container ID
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
to: path to dump encoded container
session: path to a JSON-encoded container session token
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'container get-eacl',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def list(self, rpc_endpoint: str, wallet: str, address: Optional[str] = None,
owner: Optional[str] = None, ttl: Optional[int] = None, xhdr: Optional[dict] = None, **params) -> str:
"""List all created containers
Args:
address: address of wallet account
owner: owner of containers (omit to use owner from private key)
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'container list',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def list_objects(self, rpc_endpoint: str, wallet: str, cid: str, address: Optional[str] = None,
ttl: Optional[int] = None, xhdr: Optional[dict] = None) -> str:
"""List existing objects in container
Args:
address: address of wallet account
cid: container ID
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'container list-objects',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def set_eacl(self, rpc_endpoint: str, wallet: str, cid: str, address: Optional[str] = None,
await_mode: bool = False, table: Optional[str] = None, session: Optional[str] = None,
ttl: Optional[int] = None, xhdr: Optional[dict] = None) -> str:
"""Set a new extended ACL table for the container.
Container ID in the EACL table will be substituted with the ID from the CLI.
Args:
address: address of wallet account
await_mode: block execution until container is removed
cid: container ID
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
session: path to a JSON-encoded container session token
table: path to file with JSON or binary encoded EACL table
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'container set-eacl',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)

View file

@ -0,0 +1,239 @@
from typing import Optional
from cli_utils.cli_command import NeofsCliCommand
class NeofsCliObject(NeofsCliCommand):
def delete(self, rpc_endpoint: str, wallet: str, cid: str, oid: str, address: Optional[str] = None,
bearer: Optional[str] = None, session: Optional[str] = None, ttl: Optional[int] = None,
xhdr: Optional[dict] = None) -> str:
"""Delete object from NeoFS
Args:
address: address of wallet account
bearer: File with signed JSON or binary encoded bearer token
cid: Container ID
oid: Object ID
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
session: path to a JSON-encoded container session token
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'object delete',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def get(self, rpc_endpoint: str, wallet: str, cid: str, oid: str, address: Optional[str] = None,
bearer: Optional[str] = None, file: Optional[str] = None,
header: Optional[str] = None, no_progress: bool = False, raw: bool = False,
session: Optional[str] = None, ttl: Optional[int] = None, xhdr: Optional[dict] = None) -> str:
"""Get object from NeoFS
Args:
address: address of wallet account
bearer: File with signed JSON or binary encoded bearer token
cid: Container ID
file: File to write object payload to. Default: stdout.
header: File to write header to. Default: stdout.
no_progress: Do not show progress bar
oid: Object ID
raw: Set raw request option
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
session: path to a JSON-encoded container session token
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'object get',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def hash(self, rpc_endpoint: str, wallet: str, cid: str, oid: str, address: Optional[str] = None,
bearer: Optional[str] = None, range: Optional[str] = None, salt: Optional[str] = None,
ttl: Optional[int] = None, hash_type: Optional[str] = None, xhdr: Optional[dict] = None) -> str:
"""Get object hash
Args:
address: address of wallet account
bearer: File with signed JSON or binary encoded bearer token
cid: Container ID
oid: Object ID
range: Range to take hash from in the form offset1:length1,...
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
salt: Salt in hex format
ttl: TTL value in request meta header (default 2)
hash_type: Hash type. Either 'sha256' or 'tz' (default "sha256")
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'object hash',
**{param: param_value for param, param_value in locals().items() if param not in ['self', 'params']}
)
def head(self, rpc_endpoint: str, wallet: str, cid: str, oid: str, address: Optional[str] = None,
bearer: Optional[str] = None, file: Optional[str] = None,
json_mode: bool = False, main_only: bool = False, proto: bool = False, raw: bool = False,
session: Optional[str] = None, ttl: Optional[int] = None, xhdr: Optional[dict] = None) -> str:
"""Get object header
Args:
address: address of wallet account
bearer: File with signed JSON or binary encoded bearer token
cid: Container ID
file: File to write object payload to. Default: stdout.
json_mode: Marshal output in JSON
main_only: Return only main fields
oid: Object ID
proto: Marshal output in Protobuf
raw: Set raw request option
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
session: path to a JSON-encoded container session token
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'object head',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def lock(self, rpc_endpoint: str, wallet: str, cid: str, oid: str, lifetime: int, address: Optional[str] = None,
bearer: Optional[str] = None, session: Optional[str] = None, ttl: Optional[int] = None,
xhdr: Optional[dict] = None) -> str:
"""Lock object in container
Args:
address: address of wallet account
bearer: File with signed JSON or binary encoded bearer token
cid: Container ID
oid: Object ID
lifetime: Object lifetime
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
session: path to a JSON-encoded container session token
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'object lock',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def put(self, rpc_endpoint: str, wallet: str, cid: str, file: str, address: Optional[str] = None,
attributes: Optional[dict] = None, bearer: Optional[str] = None, disable_filename: bool = False,
disable_timestamp: bool = False, expire_at: Optional[int] = None, no_progress: bool = False,
notify: Optional[str] = None, session: Optional[str] = None, ttl: Optional[int] = None,
xhdr: Optional[dict] = None) -> str:
"""Put object to NeoFS
Args:
address: address of wallet account
attributes: User attributes in form of Key1=Value1,Key2=Value2
bearer: File with signed JSON or binary encoded bearer token
cid: Container ID
disable_filename: Do not set well-known filename attribute
disable_timestamp: Do not set well-known timestamp attribute
expire_at: Last epoch in the life of the object
file: File with object payload
no_progress: Do not show progress bar
notify: Object notification in the form of *epoch*:*topic*; '-' topic means using default
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
session: path to a JSON-encoded container session token
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'object put',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def range(self, rpc_endpoint: str, wallet: str, cid: str, oid: str, range: str, address: Optional[str] = None,
bearer: Optional[str] = None, file: Optional[str] = None, json_mode: bool = False, raw: bool = False,
session: Optional[str] = None, ttl: Optional[int] = None, xhdr: Optional[dict] = None) -> str:
"""Get payload range data of an object
Args:
address: address of wallet account
bearer: File with signed JSON or binary encoded bearer token
cid: Container ID
file: File to write object payload to. Default: stdout.
json_mode: Marshal output in JSON
oid: Object ID
range: Range to take data from in the form offset:length
raw: Set raw request option
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
session: path to a JSON-encoded container session token
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'object range',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)
def search(self, rpc_endpoint: str, wallet: str, cid: str, address: Optional[str] = None,
bearer: Optional[str] = None, filters: Optional[list] = None, oid: Optional[str] = None,
phy: bool = False, root: bool = False, session: Optional[str] = None, ttl: Optional[int] = None,
xhdr: Optional[dict] = None) -> str:
"""Search object
Args:
address: address of wallet account
bearer: File with signed JSON or binary encoded bearer token
cid: Container ID
filters: Repeated filter expressions or files with protobuf JSON
oid: Object ID
phy: Search physically stored objects
root: Search for user objects
rpc_endpoint: remote node address (as 'multiaddr' or '<host>:<port>')
session: path to a JSON-encoded container session token
ttl: TTL value in request meta header (default 2)
wallet: WIF (NEP-2) string or path to the wallet or binary key
xhdr: Request X-Headers in form of Key=Value
Returns:
str: Command string
"""
return self._execute(
'object search',
**{param: param_value for param, param_value in locals().items() if param not in ['self']}
)

View file

@ -0,0 +1,12 @@
from cli_utils.cli_command import NeofsCliCommand
class NeofsCliVersion(NeofsCliCommand):
def get(self) -> str:
"""Application version and NeoFS API compatibility
Returns:
str: Command string
"""
return self._execute('', version=True)

View file

@ -0,0 +1,47 @@
from typing import Optional
from cli_helpers import _cmd_run
class NeofsCliCommand:
neofs_cli_exec: Optional[str] = None
timeout: Optional[int] = None
__base_params: Optional[str] = None
map_params = {'json_mode': 'json', 'await_mode': 'await', 'hash_type': 'hash', 'doc_type': 'type'}
def __init__(self, neofs_cli_exec: str, timeout: int, **base_params):
self.neofs_cli_exec = neofs_cli_exec
self.timeout = timeout
self.__base_params = ' '.join([f'--{param} {value}' for param, value in base_params.items() if value])
def _format_command(self, command: str, **params) -> str:
param_str = []
for param, value in params.items():
if param in self.map_params.keys():
param = self.map_params[param]
param = param.replace('_', '-')
if not value:
continue
if isinstance(value, bool):
param_str.append(f'--{param}')
elif isinstance(value, int):
param_str.append(f'--{param} {value}')
elif isinstance(value, list):
for value_item in value:
val_str = str(value_item).replace("'", "\\'")
param_str.append(f"--{param} '{val_str}'")
elif isinstance(value, dict):
param_str.append(f'--{param} \'{",".join(f"{key}={val}" for key, val in value.items())}\'')
else:
if "'" in str(value):
value_str = str(value).replace('"', '\\"')
param_str.append(f'--{param} "{value_str}"')
else:
param_str.append(f"--{param} '{value}'")
param_str = ' '.join(param_str)
return f'{self.neofs_cli_exec} {self.__base_params} {command or ""} {param_str}'
def _execute(self, command: Optional[str], **params) -> str:
return _cmd_run(self._format_command(command, **params), timeout=self.timeout)