Updated script that can be controled by Nodejs web app

This commit is contained in:
mac OS
2024-11-25 12:24:18 +07:00
parent c440eda1f4
commit 8b0ab2bd3a
8662 changed files with 1803808 additions and 34 deletions

View File

@ -0,0 +1,121 @@
from typing import Dict, Optional, Tuple, Type, Union
import dns.name
from dns.dnssecalgs.base import GenericPrivateKey
from dns.dnssectypes import Algorithm
from dns.exception import UnsupportedAlgorithm
from dns.rdtypes.ANY.DNSKEY import DNSKEY
if dns._features.have("dnssec"):
from dns.dnssecalgs.dsa import PrivateDSA, PrivateDSANSEC3SHA1
from dns.dnssecalgs.ecdsa import PrivateECDSAP256SHA256, PrivateECDSAP384SHA384
from dns.dnssecalgs.eddsa import PrivateED448, PrivateED25519
from dns.dnssecalgs.rsa import (
PrivateRSAMD5,
PrivateRSASHA1,
PrivateRSASHA1NSEC3SHA1,
PrivateRSASHA256,
PrivateRSASHA512,
)
_have_cryptography = True
else:
_have_cryptography = False
AlgorithmPrefix = Optional[Union[bytes, dns.name.Name]]
algorithms: Dict[Tuple[Algorithm, AlgorithmPrefix], Type[GenericPrivateKey]] = {}
if _have_cryptography:
# pylint: disable=possibly-used-before-assignment
algorithms.update(
{
(Algorithm.RSAMD5, None): PrivateRSAMD5,
(Algorithm.DSA, None): PrivateDSA,
(Algorithm.RSASHA1, None): PrivateRSASHA1,
(Algorithm.DSANSEC3SHA1, None): PrivateDSANSEC3SHA1,
(Algorithm.RSASHA1NSEC3SHA1, None): PrivateRSASHA1NSEC3SHA1,
(Algorithm.RSASHA256, None): PrivateRSASHA256,
(Algorithm.RSASHA512, None): PrivateRSASHA512,
(Algorithm.ECDSAP256SHA256, None): PrivateECDSAP256SHA256,
(Algorithm.ECDSAP384SHA384, None): PrivateECDSAP384SHA384,
(Algorithm.ED25519, None): PrivateED25519,
(Algorithm.ED448, None): PrivateED448,
}
)
def get_algorithm_cls(
algorithm: Union[int, str], prefix: AlgorithmPrefix = None
) -> Type[GenericPrivateKey]:
"""Get Private Key class from Algorithm.
*algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm.
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown.
Returns a ``dns.dnssecalgs.GenericPrivateKey``
"""
algorithm = Algorithm.make(algorithm)
cls = algorithms.get((algorithm, prefix))
if cls:
return cls
raise UnsupportedAlgorithm(
f'algorithm "{Algorithm.to_text(algorithm)}" not supported by dnspython'
)
def get_algorithm_cls_from_dnskey(dnskey: DNSKEY) -> Type[GenericPrivateKey]:
"""Get Private Key class from DNSKEY.
*dnskey*, a ``DNSKEY`` to get Algorithm class for.
Raises ``UnsupportedAlgorithm`` if the algorithm is unknown.
Returns a ``dns.dnssecalgs.GenericPrivateKey``
"""
prefix: AlgorithmPrefix = None
if dnskey.algorithm == Algorithm.PRIVATEDNS:
prefix, _ = dns.name.from_wire(dnskey.key, 0)
elif dnskey.algorithm == Algorithm.PRIVATEOID:
length = int(dnskey.key[0])
prefix = dnskey.key[0 : length + 1]
return get_algorithm_cls(dnskey.algorithm, prefix)
def register_algorithm_cls(
algorithm: Union[int, str],
algorithm_cls: Type[GenericPrivateKey],
name: Optional[Union[dns.name.Name, str]] = None,
oid: Optional[bytes] = None,
) -> None:
"""Register Algorithm Private Key class.
*algorithm*, a ``str`` or ``int`` specifying the DNSKEY algorithm.
*algorithm_cls*: A `GenericPrivateKey` class.
*name*, an optional ``dns.name.Name`` or ``str``, for for PRIVATEDNS algorithms.
*oid*: an optional BER-encoded `bytes` for PRIVATEOID algorithms.
Raises ``ValueError`` if a name or oid is specified incorrectly.
"""
if not issubclass(algorithm_cls, GenericPrivateKey):
raise TypeError("Invalid algorithm class")
algorithm = Algorithm.make(algorithm)
prefix: AlgorithmPrefix = None
if algorithm == Algorithm.PRIVATEDNS:
if name is None:
raise ValueError("Name required for PRIVATEDNS algorithms")
if isinstance(name, str):
name = dns.name.from_text(name)
prefix = name
elif algorithm == Algorithm.PRIVATEOID:
if oid is None:
raise ValueError("OID required for PRIVATEOID algorithms")
prefix = bytes([len(oid)]) + oid
elif name:
raise ValueError("Name only supported for PRIVATEDNS algorithm")
elif oid:
raise ValueError("OID only supported for PRIVATEOID algorithm")
algorithms[(algorithm, prefix)] = algorithm_cls

View File

@ -0,0 +1,89 @@
from abc import ABC, abstractmethod # pylint: disable=no-name-in-module
from typing import Any, Optional, Type
import dns.rdataclass
import dns.rdatatype
from dns.dnssectypes import Algorithm
from dns.exception import AlgorithmKeyMismatch
from dns.rdtypes.ANY.DNSKEY import DNSKEY
from dns.rdtypes.dnskeybase import Flag
class GenericPublicKey(ABC):
algorithm: Algorithm
@abstractmethod
def __init__(self, key: Any) -> None:
pass
@abstractmethod
def verify(self, signature: bytes, data: bytes) -> None:
"""Verify signed DNSSEC data"""
@abstractmethod
def encode_key_bytes(self) -> bytes:
"""Encode key as bytes for DNSKEY"""
@classmethod
def _ensure_algorithm_key_combination(cls, key: DNSKEY) -> None:
if key.algorithm != cls.algorithm:
raise AlgorithmKeyMismatch
def to_dnskey(self, flags: int = Flag.ZONE, protocol: int = 3) -> DNSKEY:
"""Return public key as DNSKEY"""
return DNSKEY(
rdclass=dns.rdataclass.IN,
rdtype=dns.rdatatype.DNSKEY,
flags=flags,
protocol=protocol,
algorithm=self.algorithm,
key=self.encode_key_bytes(),
)
@classmethod
@abstractmethod
def from_dnskey(cls, key: DNSKEY) -> "GenericPublicKey":
"""Create public key from DNSKEY"""
@classmethod
@abstractmethod
def from_pem(cls, public_pem: bytes) -> "GenericPublicKey":
"""Create public key from PEM-encoded SubjectPublicKeyInfo as specified
in RFC 5280"""
@abstractmethod
def to_pem(self) -> bytes:
"""Return public-key as PEM-encoded SubjectPublicKeyInfo as specified
in RFC 5280"""
class GenericPrivateKey(ABC):
public_cls: Type[GenericPublicKey]
@abstractmethod
def __init__(self, key: Any) -> None:
pass
@abstractmethod
def sign(
self,
data: bytes,
verify: bool = False,
deterministic: bool = True,
) -> bytes:
"""Sign DNSSEC data"""
@abstractmethod
def public_key(self) -> "GenericPublicKey":
"""Return public key instance"""
@classmethod
@abstractmethod
def from_pem(
cls, private_pem: bytes, password: Optional[bytes] = None
) -> "GenericPrivateKey":
"""Create private key from PEM-encoded PKCS#8"""
@abstractmethod
def to_pem(self, password: Optional[bytes] = None) -> bytes:
"""Return private key as PEM-encoded PKCS#8"""

View File

@ -0,0 +1,68 @@
from typing import Any, Optional, Type
from cryptography.hazmat.primitives import serialization
from dns.dnssecalgs.base import GenericPrivateKey, GenericPublicKey
from dns.exception import AlgorithmKeyMismatch
class CryptographyPublicKey(GenericPublicKey):
key: Any = None
key_cls: Any = None
def __init__(self, key: Any) -> None: # pylint: disable=super-init-not-called
if self.key_cls is None:
raise TypeError("Undefined private key class")
if not isinstance( # pylint: disable=isinstance-second-argument-not-valid-type
key, self.key_cls
):
raise AlgorithmKeyMismatch
self.key = key
@classmethod
def from_pem(cls, public_pem: bytes) -> "GenericPublicKey":
key = serialization.load_pem_public_key(public_pem)
return cls(key=key)
def to_pem(self) -> bytes:
return self.key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo,
)
class CryptographyPrivateKey(GenericPrivateKey):
key: Any = None
key_cls: Any = None
public_cls: Type[CryptographyPublicKey]
def __init__(self, key: Any) -> None: # pylint: disable=super-init-not-called
if self.key_cls is None:
raise TypeError("Undefined private key class")
if not isinstance( # pylint: disable=isinstance-second-argument-not-valid-type
key, self.key_cls
):
raise AlgorithmKeyMismatch
self.key = key
def public_key(self) -> "CryptographyPublicKey":
return self.public_cls(key=self.key.public_key())
@classmethod
def from_pem(
cls, private_pem: bytes, password: Optional[bytes] = None
) -> "GenericPrivateKey":
key = serialization.load_pem_private_key(private_pem, password=password)
return cls(key=key)
def to_pem(self, password: Optional[bytes] = None) -> bytes:
encryption_algorithm: serialization.KeySerializationEncryption
if password:
encryption_algorithm = serialization.BestAvailableEncryption(password)
else:
encryption_algorithm = serialization.NoEncryption()
return self.key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=encryption_algorithm,
)

View File

@ -0,0 +1,106 @@
import struct
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import dsa, utils
from dns.dnssecalgs.cryptography import CryptographyPrivateKey, CryptographyPublicKey
from dns.dnssectypes import Algorithm
from dns.rdtypes.ANY.DNSKEY import DNSKEY
class PublicDSA(CryptographyPublicKey):
key: dsa.DSAPublicKey
key_cls = dsa.DSAPublicKey
algorithm = Algorithm.DSA
chosen_hash = hashes.SHA1()
def verify(self, signature: bytes, data: bytes) -> None:
sig_r = signature[1:21]
sig_s = signature[21:]
sig = utils.encode_dss_signature(
int.from_bytes(sig_r, "big"), int.from_bytes(sig_s, "big")
)
self.key.verify(sig, data, self.chosen_hash)
def encode_key_bytes(self) -> bytes:
"""Encode a public key per RFC 2536, section 2."""
pn = self.key.public_numbers()
dsa_t = (self.key.key_size // 8 - 64) // 8
if dsa_t > 8:
raise ValueError("unsupported DSA key size")
octets = 64 + dsa_t * 8
res = struct.pack("!B", dsa_t)
res += pn.parameter_numbers.q.to_bytes(20, "big")
res += pn.parameter_numbers.p.to_bytes(octets, "big")
res += pn.parameter_numbers.g.to_bytes(octets, "big")
res += pn.y.to_bytes(octets, "big")
return res
@classmethod
def from_dnskey(cls, key: DNSKEY) -> "PublicDSA":
cls._ensure_algorithm_key_combination(key)
keyptr = key.key
(t,) = struct.unpack("!B", keyptr[0:1])
keyptr = keyptr[1:]
octets = 64 + t * 8
dsa_q = keyptr[0:20]
keyptr = keyptr[20:]
dsa_p = keyptr[0:octets]
keyptr = keyptr[octets:]
dsa_g = keyptr[0:octets]
keyptr = keyptr[octets:]
dsa_y = keyptr[0:octets]
return cls(
key=dsa.DSAPublicNumbers( # type: ignore
int.from_bytes(dsa_y, "big"),
dsa.DSAParameterNumbers(
int.from_bytes(dsa_p, "big"),
int.from_bytes(dsa_q, "big"),
int.from_bytes(dsa_g, "big"),
),
).public_key(default_backend()),
)
class PrivateDSA(CryptographyPrivateKey):
key: dsa.DSAPrivateKey
key_cls = dsa.DSAPrivateKey
public_cls = PublicDSA
def sign(
self,
data: bytes,
verify: bool = False,
deterministic: bool = True,
) -> bytes:
"""Sign using a private key per RFC 2536, section 3."""
public_dsa_key = self.key.public_key()
if public_dsa_key.key_size > 1024:
raise ValueError("DSA key size overflow")
der_signature = self.key.sign(data, self.public_cls.chosen_hash)
dsa_r, dsa_s = utils.decode_dss_signature(der_signature)
dsa_t = (public_dsa_key.key_size // 8 - 64) // 8
octets = 20
signature = (
struct.pack("!B", dsa_t)
+ int.to_bytes(dsa_r, length=octets, byteorder="big")
+ int.to_bytes(dsa_s, length=octets, byteorder="big")
)
if verify:
self.public_key().verify(signature, data)
return signature
@classmethod
def generate(cls, key_size: int) -> "PrivateDSA":
return cls(
key=dsa.generate_private_key(key_size=key_size),
)
class PublicDSANSEC3SHA1(PublicDSA):
algorithm = Algorithm.DSANSEC3SHA1
class PrivateDSANSEC3SHA1(PrivateDSA):
public_cls = PublicDSANSEC3SHA1

View File

@ -0,0 +1,97 @@
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, utils
from dns.dnssecalgs.cryptography import CryptographyPrivateKey, CryptographyPublicKey
from dns.dnssectypes import Algorithm
from dns.rdtypes.ANY.DNSKEY import DNSKEY
class PublicECDSA(CryptographyPublicKey):
key: ec.EllipticCurvePublicKey
key_cls = ec.EllipticCurvePublicKey
algorithm: Algorithm
chosen_hash: hashes.HashAlgorithm
curve: ec.EllipticCurve
octets: int
def verify(self, signature: bytes, data: bytes) -> None:
sig_r = signature[0 : self.octets]
sig_s = signature[self.octets :]
sig = utils.encode_dss_signature(
int.from_bytes(sig_r, "big"), int.from_bytes(sig_s, "big")
)
self.key.verify(sig, data, ec.ECDSA(self.chosen_hash))
def encode_key_bytes(self) -> bytes:
"""Encode a public key per RFC 6605, section 4."""
pn = self.key.public_numbers()
return pn.x.to_bytes(self.octets, "big") + pn.y.to_bytes(self.octets, "big")
@classmethod
def from_dnskey(cls, key: DNSKEY) -> "PublicECDSA":
cls._ensure_algorithm_key_combination(key)
ecdsa_x = key.key[0 : cls.octets]
ecdsa_y = key.key[cls.octets : cls.octets * 2]
return cls(
key=ec.EllipticCurvePublicNumbers(
curve=cls.curve,
x=int.from_bytes(ecdsa_x, "big"),
y=int.from_bytes(ecdsa_y, "big"),
).public_key(default_backend()),
)
class PrivateECDSA(CryptographyPrivateKey):
key: ec.EllipticCurvePrivateKey
key_cls = ec.EllipticCurvePrivateKey
public_cls = PublicECDSA
def sign(
self,
data: bytes,
verify: bool = False,
deterministic: bool = True,
) -> bytes:
"""Sign using a private key per RFC 6605, section 4."""
algorithm = ec.ECDSA(
self.public_cls.chosen_hash, deterministic_signing=deterministic
)
der_signature = self.key.sign(data, algorithm)
dsa_r, dsa_s = utils.decode_dss_signature(der_signature)
signature = int.to_bytes(
dsa_r, length=self.public_cls.octets, byteorder="big"
) + int.to_bytes(dsa_s, length=self.public_cls.octets, byteorder="big")
if verify:
self.public_key().verify(signature, data)
return signature
@classmethod
def generate(cls) -> "PrivateECDSA":
return cls(
key=ec.generate_private_key(
curve=cls.public_cls.curve, backend=default_backend()
),
)
class PublicECDSAP256SHA256(PublicECDSA):
algorithm = Algorithm.ECDSAP256SHA256
chosen_hash = hashes.SHA256()
curve = ec.SECP256R1()
octets = 32
class PrivateECDSAP256SHA256(PrivateECDSA):
public_cls = PublicECDSAP256SHA256
class PublicECDSAP384SHA384(PublicECDSA):
algorithm = Algorithm.ECDSAP384SHA384
chosen_hash = hashes.SHA384()
curve = ec.SECP384R1()
octets = 48
class PrivateECDSAP384SHA384(PrivateECDSA):
public_cls = PublicECDSAP384SHA384

View File

@ -0,0 +1,70 @@
from typing import Type
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ed448, ed25519
from dns.dnssecalgs.cryptography import CryptographyPrivateKey, CryptographyPublicKey
from dns.dnssectypes import Algorithm
from dns.rdtypes.ANY.DNSKEY import DNSKEY
class PublicEDDSA(CryptographyPublicKey):
def verify(self, signature: bytes, data: bytes) -> None:
self.key.verify(signature, data)
def encode_key_bytes(self) -> bytes:
"""Encode a public key per RFC 8080, section 3."""
return self.key.public_bytes(
encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw
)
@classmethod
def from_dnskey(cls, key: DNSKEY) -> "PublicEDDSA":
cls._ensure_algorithm_key_combination(key)
return cls(
key=cls.key_cls.from_public_bytes(key.key),
)
class PrivateEDDSA(CryptographyPrivateKey):
public_cls: Type[PublicEDDSA]
def sign(
self,
data: bytes,
verify: bool = False,
deterministic: bool = True,
) -> bytes:
"""Sign using a private key per RFC 8080, section 4."""
signature = self.key.sign(data)
if verify:
self.public_key().verify(signature, data)
return signature
@classmethod
def generate(cls) -> "PrivateEDDSA":
return cls(key=cls.key_cls.generate())
class PublicED25519(PublicEDDSA):
key: ed25519.Ed25519PublicKey
key_cls = ed25519.Ed25519PublicKey
algorithm = Algorithm.ED25519
class PrivateED25519(PrivateEDDSA):
key: ed25519.Ed25519PrivateKey
key_cls = ed25519.Ed25519PrivateKey
public_cls = PublicED25519
class PublicED448(PublicEDDSA):
key: ed448.Ed448PublicKey
key_cls = ed448.Ed448PublicKey
algorithm = Algorithm.ED448
class PrivateED448(PrivateEDDSA):
key: ed448.Ed448PrivateKey
key_cls = ed448.Ed448PrivateKey
public_cls = PublicED448

View File

@ -0,0 +1,124 @@
import math
import struct
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from dns.dnssecalgs.cryptography import CryptographyPrivateKey, CryptographyPublicKey
from dns.dnssectypes import Algorithm
from dns.rdtypes.ANY.DNSKEY import DNSKEY
class PublicRSA(CryptographyPublicKey):
key: rsa.RSAPublicKey
key_cls = rsa.RSAPublicKey
algorithm: Algorithm
chosen_hash: hashes.HashAlgorithm
def verify(self, signature: bytes, data: bytes) -> None:
self.key.verify(signature, data, padding.PKCS1v15(), self.chosen_hash)
def encode_key_bytes(self) -> bytes:
"""Encode a public key per RFC 3110, section 2."""
pn = self.key.public_numbers()
_exp_len = math.ceil(int.bit_length(pn.e) / 8)
exp = int.to_bytes(pn.e, length=_exp_len, byteorder="big")
if _exp_len > 255:
exp_header = b"\0" + struct.pack("!H", _exp_len)
else:
exp_header = struct.pack("!B", _exp_len)
if pn.n.bit_length() < 512 or pn.n.bit_length() > 4096:
raise ValueError("unsupported RSA key length")
return exp_header + exp + pn.n.to_bytes((pn.n.bit_length() + 7) // 8, "big")
@classmethod
def from_dnskey(cls, key: DNSKEY) -> "PublicRSA":
cls._ensure_algorithm_key_combination(key)
keyptr = key.key
(bytes_,) = struct.unpack("!B", keyptr[0:1])
keyptr = keyptr[1:]
if bytes_ == 0:
(bytes_,) = struct.unpack("!H", keyptr[0:2])
keyptr = keyptr[2:]
rsa_e = keyptr[0:bytes_]
rsa_n = keyptr[bytes_:]
return cls(
key=rsa.RSAPublicNumbers(
int.from_bytes(rsa_e, "big"), int.from_bytes(rsa_n, "big")
).public_key(default_backend())
)
class PrivateRSA(CryptographyPrivateKey):
key: rsa.RSAPrivateKey
key_cls = rsa.RSAPrivateKey
public_cls = PublicRSA
default_public_exponent = 65537
def sign(
self,
data: bytes,
verify: bool = False,
deterministic: bool = True,
) -> bytes:
"""Sign using a private key per RFC 3110, section 3."""
signature = self.key.sign(data, padding.PKCS1v15(), self.public_cls.chosen_hash)
if verify:
self.public_key().verify(signature, data)
return signature
@classmethod
def generate(cls, key_size: int) -> "PrivateRSA":
return cls(
key=rsa.generate_private_key(
public_exponent=cls.default_public_exponent,
key_size=key_size,
backend=default_backend(),
)
)
class PublicRSAMD5(PublicRSA):
algorithm = Algorithm.RSAMD5
chosen_hash = hashes.MD5()
class PrivateRSAMD5(PrivateRSA):
public_cls = PublicRSAMD5
class PublicRSASHA1(PublicRSA):
algorithm = Algorithm.RSASHA1
chosen_hash = hashes.SHA1()
class PrivateRSASHA1(PrivateRSA):
public_cls = PublicRSASHA1
class PublicRSASHA1NSEC3SHA1(PublicRSA):
algorithm = Algorithm.RSASHA1NSEC3SHA1
chosen_hash = hashes.SHA1()
class PrivateRSASHA1NSEC3SHA1(PrivateRSA):
public_cls = PublicRSASHA1NSEC3SHA1
class PublicRSASHA256(PublicRSA):
algorithm = Algorithm.RSASHA256
chosen_hash = hashes.SHA256()
class PrivateRSASHA256(PrivateRSA):
public_cls = PublicRSASHA256
class PublicRSASHA512(PublicRSA):
algorithm = Algorithm.RSASHA512
chosen_hash = hashes.SHA512()
class PrivateRSASHA512(PrivateRSA):
public_cls = PublicRSASHA512