[#1] Add cryptography methods
Signed-off-by: ilyas585 <niyazov2023@gmail.com>
This commit is contained in:
parent
480d288e2a
commit
19282f13cc
14 changed files with 268 additions and 0 deletions
3
.idea/.gitignore
generated
vendored
Normal file
3
.idea/.gitignore
generated
vendored
Normal file
|
@ -0,0 +1,3 @@
|
|||
# Default ignored files
|
||||
/shelf/
|
||||
/workspace.xml
|
0
frostfs_api/__init__.py
Normal file
0
frostfs_api/__init__.py
Normal file
1
frostfs_api/cryptography/__init__.py
Normal file
1
frostfs_api/cryptography/__init__.py
Normal file
|
@ -0,0 +1 @@
|
|||
# Cryptography directory package
|
44
frostfs_api/cryptography/key_extension.py
Normal file
44
frostfs_api/cryptography/key_extension.py
Normal file
|
@ -0,0 +1,44 @@
|
|||
import base58
|
||||
import ecdsa
|
||||
|
||||
|
||||
class KeyExtension:
|
||||
def get_private_key_from_wif(self, wif: str) -> bytes:
|
||||
"""
|
||||
Converts a WIF private key to a byte array.
|
||||
|
||||
:param wif: WIF private key in string format.
|
||||
:return: Private key in byte format (32 bytes).
|
||||
:raises ValueError: If the WIF key is incorrect.
|
||||
"""
|
||||
assert not self.is_empty(wif)
|
||||
|
||||
decoded = base58.b58decode_check(wif)
|
||||
if len(decoded) != 34 or decoded[0] != 0x80 or decoded[-1] != 0x01:
|
||||
raise ValueError("Incorrect WIF private key")
|
||||
|
||||
private_key = decoded[1:-1]
|
||||
return private_key
|
||||
|
||||
def get_public_key(self, private_key: bytes) -> bytes:
|
||||
"""
|
||||
Extract public key from Private key
|
||||
|
||||
:param private_key: Private key in byte format (32 bytes).
|
||||
:return: compressed public key in byte format (33 bytes).
|
||||
:raises ValueError: If the private_key key is empty or null.
|
||||
"""
|
||||
assert not self.is_empty(private_key)
|
||||
|
||||
if len(private_key) != 32:
|
||||
raise ValueError(f"Incorrect len of private key, Expected: 32, Actual: {len(private_key)}")
|
||||
|
||||
public_key = ecdsa.SigningKey.from_string(private_key, curve=ecdsa.NIST256p).get_verifying_key()
|
||||
compressed_public_key = public_key.to_string("compressed")
|
||||
return compressed_public_key
|
||||
|
||||
@staticmethod
|
||||
def is_empty(sequence_symbols: bytes | str):
|
||||
if len(sequence_symbols) == 0 or sequence_symbols is None:
|
||||
raise ValueError(f"Empty sequence symbols of key: {sequence_symbols}")
|
||||
return False
|
26
frostfs_api/cryptography/signer.py
Normal file
26
frostfs_api/cryptography/signer.py
Normal file
|
@ -0,0 +1,26 @@
|
|||
import ecdsa
|
||||
from hashlib import sha256, sha512
|
||||
|
||||
|
||||
class Signer:
|
||||
def sign_rfc6979(self, private_key: bytes, message: bytes) -> bytes:
|
||||
if len(private_key) == 0 or private_key is None:
|
||||
raise ValueError(f"Incorrect private_key: {private_key}")
|
||||
|
||||
sk = ecdsa.SigningKey.from_string(private_key, curve=ecdsa.NIST256p, hashfunc=sha256)
|
||||
|
||||
signature = sk.sign_deterministic(message)
|
||||
|
||||
return signature
|
||||
|
||||
def sign(self, private_key: bytes, message: bytes) -> bytes:
|
||||
if len(private_key) == 0 or private_key is None:
|
||||
raise ValueError(f"Incorrect private key: {private_key}")
|
||||
|
||||
sk = ecdsa.SigningKey.from_string(private_key, curve=ecdsa.NIST256p, hashfunc=sha512)
|
||||
signature = sk.sign(message)
|
||||
|
||||
# the first byte indicates the node version marker
|
||||
signature_with_marker = bytes([0x04]) + signature
|
||||
|
||||
return signature_with_marker
|
50
frostfs_api/cryptography/verifier.py
Normal file
50
frostfs_api/cryptography/verifier.py
Normal file
|
@ -0,0 +1,50 @@
|
|||
import ecdsa
|
||||
from hashlib import sha256, sha512
|
||||
|
||||
|
||||
class Verifier:
|
||||
def verify_rfc6979(self, public_key: bytes, message: bytes, signature: bytes) -> bool:
|
||||
"""
|
||||
Verify a signature using the public key.
|
||||
|
||||
:param public_key: Public key in byte format.
|
||||
:param message: Signature verification message in byte format.
|
||||
:param signature: Signature in byte format.
|
||||
:return: True if the signature is correct, otherwise False.
|
||||
:raises: ValueError: If the public_key key is incorrect.
|
||||
"""
|
||||
if len(public_key) == 0 or public_key is None:
|
||||
raise ValueError(f"Incorrect public key: {public_key}")
|
||||
|
||||
if message is None or signature is None:
|
||||
return False
|
||||
|
||||
vk = ecdsa.VerifyingKey.from_string(public_key, curve=ecdsa.NIST256p, hashfunc=sha256)
|
||||
|
||||
try:
|
||||
return vk.verify(signature, message)
|
||||
except ecdsa.BadSignatureError:
|
||||
return False
|
||||
|
||||
def verify(self, public_key: bytes, message: bytes, signature: bytes) -> bool:
|
||||
"""
|
||||
Verify a signature using the public key.
|
||||
|
||||
:param public_key: Public key in byte format.
|
||||
:param message: Signature verification message in byte format.
|
||||
:param signature: Signature in byte format.
|
||||
:return: True if the signature is correct, otherwise False.
|
||||
:raises: ValueError: If the public_key key is incorrect.
|
||||
"""
|
||||
if len(public_key) == 0 or public_key is None:
|
||||
raise ValueError(f"Incorrect public key: {public_key}")
|
||||
|
||||
if message is None or signature is None:
|
||||
return False
|
||||
|
||||
vk = ecdsa.VerifyingKey.from_string(public_key, curve=ecdsa.NIST256p, hashfunc=sha512)
|
||||
|
||||
try:
|
||||
return vk.verify(signature, message)
|
||||
except ecdsa.BadSignatureError:
|
||||
return False
|
4
pytest.ini
Normal file
4
pytest.ini
Normal file
|
@ -0,0 +1,4 @@
|
|||
[pytest]
|
||||
|
||||
# tests markers
|
||||
crypto: cryptographic functions tests
|
4
requirements.txt
Normal file
4
requirements.txt
Normal file
|
@ -0,0 +1,4 @@
|
|||
base58==2.1.1
|
||||
ecdsa==0.19.0
|
||||
|
||||
pytest==8.3.4
|
8
tests/conftest.py
Normal file
8
tests/conftest.py
Normal file
|
@ -0,0 +1,8 @@
|
|||
import pytest
|
||||
|
||||
from tests.helpers.models import ClientCryptograpy
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def client_cryptography():
|
||||
return ClientCryptograpy()
|
39
tests/cryptography/test_key_extension.py
Normal file
39
tests/cryptography/test_key_extension.py
Normal file
|
@ -0,0 +1,39 @@
|
|||
import pytest
|
||||
|
||||
from tests.helpers.models import ClientCryptograpy
|
||||
from tests.helpers.convert import convert_bytes_format_127_to_256
|
||||
from tests.helpers.resources import WIF, PRIVATE_KEY
|
||||
|
||||
|
||||
@pytest.mark.crypto
|
||||
class TestKeyExtension:
|
||||
def test_get_private_key_from_wif_success(self, client_cryptography: ClientCryptograpy):
|
||||
private_key = client_cryptography.key_extension.get_private_key_from_wif(WIF)
|
||||
|
||||
assert len(private_key) == 32, f"Not correct len of private key, expected: 32 Actual: {len(private_key)} "
|
||||
assert isinstance(private_key, bytes), f"Not correct type of private key, Expected bytes, Actual: {type(private_key)}"
|
||||
assert private_key == bytes(convert_bytes_format_127_to_256(PRIVATE_KEY)), \
|
||||
f"Not match private key, Expected: {PRIVATE_KEY}, Actual: {private_key}"
|
||||
|
||||
def test_get_private_key_empty_wif(self, client_cryptography: ClientCryptograpy):
|
||||
with pytest.raises(ValueError, match="Empty"):
|
||||
client_cryptography.key_extension.get_private_key_from_wif("")
|
||||
|
||||
def test_get_private_key_incorrect_wif(self, client_cryptography: ClientCryptograpy):
|
||||
with pytest.raises(ValueError, match="Invalid checksum"):
|
||||
client_cryptography.key_extension.get_private_key_from_wif("AAAAAAABBBBBBBBBBBCCCCCCCDDDDDDDDDDDDDDDDD")
|
||||
|
||||
def test_get_public_key(self, client_cryptography: ClientCryptograpy):
|
||||
private_key = client_cryptography.key_extension.get_private_key_from_wif(WIF)
|
||||
public_key = client_cryptography.key_extension.get_public_key(private_key)
|
||||
|
||||
assert len(public_key) == 33, f"Not correct len of public key, expected: 33 Actual: {len(private_key)} "
|
||||
assert isinstance(public_key, bytes), f"Not correct type of public key, Expected bytes, Actual: {type(private_key)}"
|
||||
|
||||
def test_get_public_key_empty(self, client_cryptography: ClientCryptograpy):
|
||||
with pytest.raises(ValueError, match="Empty"):
|
||||
client_cryptography.key_extension.get_public_key(b"")
|
||||
|
||||
def test_get_public_key_invalid_private_key(self, client_cryptography: ClientCryptograpy):
|
||||
with pytest.raises(ValueError):
|
||||
client_cryptography.key_extension.get_public_key(b"invalid_private_key")
|
55
tests/cryptography/test_sign_verify.py
Normal file
55
tests/cryptography/test_sign_verify.py
Normal file
|
@ -0,0 +1,55 @@
|
|||
import pytest
|
||||
|
||||
from tests.helpers.models import ClientCryptograpy
|
||||
from tests.helpers.resources import WIF, MESSAGE_IN_BYTES
|
||||
|
||||
|
||||
@pytest.mark.crypto
|
||||
class TestSignAndVerify:
|
||||
def test_sign_rfc6979_success(self, client_cryptography: ClientCryptograpy):
|
||||
private_key = client_cryptography.key_extension.get_private_key_from_wif(WIF)
|
||||
signature = client_cryptography.signer.sign_rfc6979(private_key, MESSAGE_IN_BYTES)
|
||||
assert len(signature) == 64, f"Incorrect len of signature, Expected: 64, Actual: {len(signature)}"
|
||||
assert isinstance(signature, bytes), f"Not correct type of signature, Expected bytes, Actual: {type(signature)}"
|
||||
|
||||
def test_sign_success(self, client_cryptography: ClientCryptograpy):
|
||||
private_key = client_cryptography.key_extension.get_private_key_from_wif(WIF)
|
||||
signature = client_cryptography.signer.sign(private_key, MESSAGE_IN_BYTES)
|
||||
|
||||
assert 0x04 == signature[0], "First byte is not expected node marker"
|
||||
assert isinstance(signature, bytes), f"Not correct type of signature, Expected bytes, Actual: {type(signature)}"
|
||||
|
||||
@pytest.mark.parametrize("message", [MESSAGE_IN_BYTES, b""])
|
||||
def test_verify_rfc6979_success(self, client_cryptography: ClientCryptograpy, message: bytes):
|
||||
private_key = client_cryptography.key_extension.get_private_key_from_wif(WIF)
|
||||
public_key = client_cryptography.key_extension.get_public_key(private_key)
|
||||
|
||||
signature = client_cryptography.signer.sign_rfc6979(private_key, message)
|
||||
assert client_cryptography.verifier.verify_rfc6979(public_key, message, signature) is True
|
||||
|
||||
@pytest.mark.parametrize("message", [MESSAGE_IN_BYTES, b""])
|
||||
def test_verify_success(self, client_cryptography: ClientCryptograpy, message: bytes):
|
||||
private_key = client_cryptography.key_extension.get_private_key_from_wif(WIF)
|
||||
public_key = client_cryptography.key_extension.get_public_key(private_key)
|
||||
|
||||
signature = client_cryptography.signer.sign(private_key, message)
|
||||
|
||||
# skip first byte, because it marker of version for node
|
||||
assert client_cryptography.verifier.verify(public_key, message, signature[1:]) is True
|
||||
|
||||
# = = = = = = = = = = = = = = = = = = = = = NEGATIVE cases = = = = = = = = = = = = = = = = = = = = =
|
||||
|
||||
def test_verify_empty_public_key(self, client_cryptography: ClientCryptograpy):
|
||||
with pytest.raises(ValueError, match="Incorrect public key"):
|
||||
client_cryptography.verifier.verify(b'', b"Test message", b"signature")
|
||||
|
||||
def test_verify_invalid_signature(self, client_cryptography: ClientCryptograpy):
|
||||
invalid_signature = b"invalid_signature"
|
||||
|
||||
private_key = client_cryptography.key_extension.get_private_key_from_wif(WIF)
|
||||
public_key = client_cryptography.key_extension.get_public_key(private_key)
|
||||
assert not client_cryptography.verifier.verify(public_key, MESSAGE_IN_BYTES, invalid_signature), "Verified invalid signature"
|
||||
|
||||
def test_sign_empty_key(self, client_cryptography: ClientCryptograpy):
|
||||
with pytest.raises(ValueError, match="Incorrect private key"):
|
||||
client_cryptography.signer.sign(b'', MESSAGE_IN_BYTES)
|
18
tests/helpers/convert.py
Normal file
18
tests/helpers/convert.py
Normal file
|
@ -0,0 +1,18 @@
|
|||
def convert_bytes_format_127_to_256(numbers: list[int]):
|
||||
new_list = []
|
||||
for num in numbers:
|
||||
if num > 0:
|
||||
new_list.append(num)
|
||||
else:
|
||||
new_list.append(num + 256)
|
||||
return new_list
|
||||
|
||||
|
||||
def convert_bytes_format_256_to_127(numbers: list[int]):
|
||||
new_list = []
|
||||
for num in numbers:
|
||||
if num < 128:
|
||||
new_list.append(num)
|
||||
else:
|
||||
new_list.append(num - 256)
|
||||
return new_list
|
10
tests/helpers/models.py
Normal file
10
tests/helpers/models.py
Normal file
|
@ -0,0 +1,10 @@
|
|||
from frostfs_api.cryptography.key_extension import KeyExtension
|
||||
from frostfs_api.cryptography.verifier import Verifier
|
||||
from frostfs_api.cryptography.signer import Signer
|
||||
|
||||
|
||||
class ClientCryptograpy:
|
||||
def __init__(self):
|
||||
self.key_extension = KeyExtension()
|
||||
self.signer = Signer()
|
||||
self.verifier = Verifier()
|
6
tests/helpers/resources.py
Normal file
6
tests/helpers/resources.py
Normal file
|
@ -0,0 +1,6 @@
|
|||
WIF = "L1YS4myg3xHPvi3FHeLaEt7G8upwJaWL5YLV7huviuUtXFpzBMqZ"
|
||||
PRIVATE_KEY = [
|
||||
-128, -5, 30, -36, -118, 85, -67, -6, 81, 43, 93, -38, 106, 21, -88, 127, 15, 125, -79, -17, -40, 77, -15,
|
||||
122, -88, 72, 109, -47, 125, -80, -40, -38
|
||||
]
|
||||
MESSAGE_IN_BYTES = b"any message in bytes format"
|
Loading…
Add table
Reference in a new issue