Source code for cert_hero.cert_hero

"""Main module."""
from __future__ import annotations

import ssl
import socket

from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, date
from itertools import repeat
from json import dumps
from logging import getLogger
from typing import Iterable

from asn1crypto.x509 import Certificate
from asn1crypto.keys import PublicKeyInfo


### Utilities ###

# Module-level logger; `cert_please()` reports connection / TLS failures
# through it.
LOG = getLogger('cert_hero')

# Maps the snake_case name-attribute keys of a certificate's subject / issuer
# to the human-readable labels used under ``Subject Name`` and
# ``Issuer Name`` in the output. Lookups are done with ``KEY_MAP.get(k, k)``,
# so a key that is not listed here is passed through unchanged.
KEY_MAP = {
    'country_name': 'Country',
    'locality_name': 'Locality',
    'organization_name': 'Organization',
    'organizational_unit_name': 'Organization Unit',
    'state_or_province_name': 'State/Province',
    'common_name': 'Common Name',
    # NOTE(review): the short / camelCase aliases below are disabled --
    # presumably these spellings are never produced by the parser in use;
    # confirm before deleting them.
    # 'C': 'Country',
    # 'countryName': 'Country',
    # 'ST': 'State/Province',
    # 'stateOrProvinceName': 'State/Province',
    # 'L': 'Locality',
    # 'localityName': 'Locality',
    # 'O': 'Organization',
    # 'organizationName': 'Organization',
    # 'OU': 'Organization Unit',
    # 'organizationalUnitName': 'Organization Unit',
    # 'CN': 'Common Name',
    # 'commonName': 'Common Name',
}

# Default value for the ``user_agent`` parameter of the public functions.
# It is a fixed string when ``fake_useragent`` is unavailable, and ``None``
# (meaning "ask `get_user_agent()` at call time") when it is installed.
_DEFAULT_USER_AGENT: str | None

try:
    from fake_useragent import FakeUserAgent
except ImportError:
    # Optional dependency `fake_useragent` is not installed:
    # every request uses the same, fixed user agent.
    _DEFAULT_USER_AGENT = 'python-requests/2.31.0'

    def get_user_agent() -> str:
        """Return the fixed, default *user agent* string."""
        return _DEFAULT_USER_AGENT
else:
    # Optional dependency `fake_useragent` is installed:
    # leave the default unset so a random user agent is picked per call.
    _DEFAULT_USER_AGENT = None

    # Shared generator instance, created once at import time.
    _FAKE_UA = FakeUserAgent()

    def get_user_agent() -> str:
        """Return a random *user agent* produced by ``fake_useragent``."""
        return _FAKE_UA.__getattr__('random')
def create_ssl_context() -> ssl.SSLContext:
    """
    Build a client-side TLS context that does **not** verify the peer.

    Hostname checking is turned off and the verify mode is set to
    ``ssl.CERT_NONE``, so a handshake succeeds even for expired or
    self-signed certificates.

    .. warning::
        Do not transfer any sensitive data over a socket wrapped with this
        context -- the identity of the server is never checked.

    :return: A new, non-verifying :class:`ssl.SSLContext`
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # Order matters: hostname checking has to be disabled first, otherwise
    # relaxing `verify_mode` to `CERT_NONE` is rejected by the `ssl` module.
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE
    return context
def set_expired(certs: CertHero
                       | dict[str, str | int | dict[str, str | bool]]
                       | dict[str, CertHero]
                       | dict[str, dict[str, str | int | dict[str, str | bool]]]
                       | Iterable[CertHero]
                       | Iterable[dict[str, str | int | dict[str, str | bool]]]
                       | None,
                _date_from_iso_str=date.fromisoformat) -> None:
    """
    Set or update the value for ``Validity > Expired`` (:type:`bool`) on
    each cert in a response from :func:`cert_please()` or
    :func:`certs_please()`, or a serialized version thereof (e.g.
    ``json.dumps`` > ``json.loads``).

    Example Usage::

        >>> from cert_hero import cert_please, set_expired
        >>> cert = cert_please('google.com')
        >>> assert 'Expired' not in cert['Validity']
        >>> set_expired(cert)
        >>> assert 'Expired' in cert['Validity']

    Entries that are falsy (e.g. ``None``) or that have no ``Validity``
    section (e.g. a failed cert holding only ``Cert Status``) are left
    untouched.

    :param certs: A single cert, a mapping of ``hostname`` to cert, or any
      iterable of certs (one-shot iterators included). Certs are modified
      in place.
    :param _date_from_iso_str: Internal; parser for the ISO ``Not After``
      date string of certs that carry no cached date.
    :return: ``None``
    """
    # Local import: `timezone` is not among this module's top-level imports.
    from datetime import timezone

    if not certs:
        return

    values_fn = getattr(certs, 'values', None)

    if values_fn is not None:
        # Mapping-like input. The membership test is only done here, so a
        # one-shot iterator is never consumed by it.
        if 'Serial' in certs or 'Cert Status' in certs:
            # cert_please(): a single `CertHero` (or `CertHero`-like) object.
            # Successful certs carry `Serial`; failed ones only `Cert Status`.
            certs = [certs]
        else:
            # certs_please(): a mapping of `hostname` to `CertHero`
            # (or `CertHero`-like) object
            certs = values_fn()

    # Current date in UTC (`datetime.utcnow()` is deprecated since 3.12).
    today = datetime.now(timezone.utc).date()

    for _cert in certs:
        if not _cert:
            continue

        _validity = _cert.get('Validity')
        if not _validity:
            continue

        # Use cached attribute `_not_after_date` if available (CertHero),
        # else we calculate it on the fly in case of a `dict`.
        not_after_date: date = (
            getattr(_cert, '_not_after_date', None)
            or _date_from_iso_str(_validity['Not After'])
        )

        # Set the `Validity > Expired` value (bool)
        _validity['Expired'] = not_after_date < today
def _build_failed_cert(reason: str):
    """
    Create the placeholder :class:`CertHero` used when no certificate could
    be retrieved for a host, e.g. on a connection timeout or when the
    server does not have an SSL certificate.

    The result holds only ``Cert Status`` (set to ``reason``); both cached
    validity dates are set to ``date.min``.
    """
    failed = CertHero({'Cert Status': reason})
    failed._not_after_date = date.min
    failed._not_before_date = date.min
    return failed


def _key_algo(cert: Certificate) -> str:
    """
    Describe the public key of ``cert`` as ``<ALGORITHM>-<bit size>``,
    with the algorithm name upper-cased.
    """
    key_info: PublicKeyInfo = cert.public_key
    algo_name = key_info.algorithm.upper()
    return f'{algo_name}-{key_info.bit_size}'


def _sig_algo(cert: Certificate) -> str:
    """
    Return the signature algorithm name of ``cert`` in upper case, with the
    first underscore (if any) replaced by ``WITH``.

    For example, a native value of ``sha256_rsa`` becomes ``SHA256WITHRSA``.
    """
    native_name = cert['signature_algorithm']['algorithm'].native
    head, underscore, tail = native_name.upper().partition('_')
    # No underscore in the name: nothing to substitute.
    return f'{head}WITH{tail}' if underscore else head


### Models ###
class CertHero(dict):
    """
    :class:`CertHero` represents the (resolved) SSL certificate of a server
    or hostname; it subclasses from builtin :class:`dict`, so it is
    essentially the same as a :class:`dict` object with convenience methods
    and a more human-readable :meth:`__repr__` method, for example.

    This means that a :class:`CertHero` object is inherently JSON
    serializable:

    >>> import cert_hero, json
    >>> cert = cert_hero.CertHero({'key': 'value'})
    >>> cert
    CertHero(
      {
        "key": "value"
      }
    )
    >>> cert['key']
    'value'
    >>> json.dumps(cert)  # or, easier: str(cert)
    '{"key": "value"}'

    """

    # Cached `date` versions of `Validity > Not After` / `Not Before`.
    # They are plain instance attributes (not dict items), so they are not
    # part of the JSON output.
    _not_after_date: date
    _not_before_date: date

    @classmethod
    def from_dict(cls, o: dict, _from_iso_format=date.fromisoformat):
        """
        Convert a serialized ``dict`` to a :class:`CertHero` object.

        The cached validity dates are parsed from the ISO date strings under
        ``Validity``. When ``o`` has no (or an empty) ``Validity`` section,
        as is the case for a serialized *failed* cert, both dates are set to
        ``date.min`` so that :attr:`not_after_date` and
        :attr:`not_before_date` are always readable.

        :param o: The serialized cert, e.g. the result of ``json.loads``
        :param _from_iso_format: Internal; parser for the ISO date strings
        :return: A new :class:`CertHero` holding the items of ``o``
        :raises KeyError: If ``Validity`` is present but lacks ``Not After``
          or ``Not Before``
        """
        obj = cls(o)

        if validity := o.get('Validity'):
            obj._not_after_date = _from_iso_format(validity['Not After'])
            obj._not_before_date = _from_iso_format(validity['Not Before'])
        else:
            # Same sentinel a failed cert gets when it is first built.
            obj._not_after_date = obj._not_before_date = date.min

        return obj

    @property
    def not_after_date(self) -> date:
        """The Cert *Not After* Date (e.g. Valid Until)"""
        return self._not_after_date

    @property
    def not_before_date(self) -> date:
        """The Cert *Not Before* Date (e.g. Valid From)"""
        return self._not_before_date

    def __repr__(self, indent=2):
        """
        Return a human-readable string with the (prettified) JSON string
        value enclosed in brackets, e.g.:

        .. code:: text

            CertHero(
              {
                ...
              }
            )

        :param indent: Number of spaces used both for the JSON indentation
          and for shifting the JSON text inside the brackets
        """
        initial_space = ' ' * indent
        # Re-join the JSON lines so each one is shifted right by `indent`.
        json_string = f'\n{initial_space}'.join(
            dumps(self, indent=indent).splitlines()
        )

        return f'{self.__class__.__name__}(\n{initial_space}{json_string}\n)'

    # `str(cert)` is the compact JSON form of the dict.
    __str__ = dumps
### Core functions ###
def cert_please(hostname: str,
                context: ssl.SSLContext = None,
                user_agent: str | None = _DEFAULT_USER_AGENT,
                default_encoding='latin-1',
                ) -> CertHero[str, str | int | dict[str, str | bool]] | None:
    """
    Retrieve the SSL certificate for a given ``hostname`` - works even
    in the case of expired or self-signed certificates.

    Usage:

    >>> import cert_hero
    >>> cert = cert_hero.cert_please('google.com')
    >>> cert.not_after_date
    datetime.date(2023, 10, 28)
    >>> f'Cert is Valid Till: {cert.not_after_date.isoformat()}'
    'Cert is Valid Till: 2023-10-28'
    >>> cert
    CertHero(
      {
        "Cert Status": "SUCCESS",
        "Serial": "753DD6FF20CB1B4510CB4C1EA27DA2EB",
        "Subject Name": {
          "Common Name": "*.google.com"
        },
        "Issuer Name": {
          "Country": "US",
          "State/Province": "California",
          "Organization": "Zscaler Inc.",
          "Organization Unit": "Zscaler Inc.",
          "Common Name": "Zscaler Intermediate Root CA (zscalerthree.net) (t) "
        },
        "Validity": {
          "Not After": "2023-10-28",
          "Not Before": "2023-10-14"
        },
        "Wildcard": true,
        "Signature Algorithm": "SHA256WITHRSA",
        "Key Algorithm": "RSA-2048",
        "Subject Alt Names": [
          "*.google.com",
          "*.appengine.google.com",
          "youtu.be",
          "*.youtube.com",
          ...
        ],
        "Location": "https://www.google.com/",
        "Status": 301
      }
    )
    >>> cert_hero.set_expired(cert)
    >>> cert['Validity']
    {'Not After': '2023-10-28', 'Not Before': '2023-10-14', 'Expired': False}

    Rationale:

    The builtin Python module ``ssl`` can be used to retrieve a certificate
    from a server via ``getpeercert``, but it'll work only if the
    certificate of interest can be successfully verified (source_).

    If, for any reason, verification fails, like, for example, with expired
    or a `self-signed certificate`_, we'll get
    ``ssl.SSLCertVerificationError`` instead of the requested info.

    We can work around this by asking for the certificate in the binary
    form:

        getpeercert(binary_form=True)

    But now we have to convert it, and thus we can use a third party
    ``asn1crypto`` module, instead of the (bulkier) ``cryptography`` module.

    Additionally, if the host **redirects** the client to another URL, this
    info is captured in the ``Location`` and ``Status`` fields.

    .. _source: https://stackoverflow.com/a/74349032/10237506
    .. _self-signed certificate: https://stackoverflow.com/a/68889470/10237506

    :param hostname: Host (or server) to retrieve SSL Certificate for
    :param context: (Optional) Shared SSL Context
    :param user_agent: A custom *user agent* to use for the HTTP call to
      retrieve ``Location`` and ``Status``. Defaults to
      ``python-requests/{version}``, or a random *user agent* if the
      ``fake_useragent`` module is installed (via the ``fake-ua`` `extra
      <https://packaging.python.org/en/latest/tutorials/installing-packages/#installing-extras>`__).
    :param default_encoding: Encoding used to decode bytes for the HTTP
      call to retrieve ``Location`` and ``Status``. Defaults to ``latin-1``
      (or ISO-8859-1).
    :return: The certificate info, or ``None`` if the host could not be
      resolved or the TLS connection / HTTP exchange failed.
    """
    if context is None:
        context = create_ssl_context()

    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(3)
            with context.wrap_socket(
                sock, server_hostname=hostname
            ) as wrap_socket:
                wrap_socket.setsockopt(
                    socket.SOL_SOCKET, socket.SO_REUSEADDR, 1
                )

                wrap_socket.connect((hostname, 443))

                # get certificate (binary / DER form, so that it is
                # available even though the peer was not verified)
                cert_bin: bytes = wrap_socket.getpeercert(True)  # type: ignore

                # use custom `user_agent` if passed in, else:
                #  * use a random "user agent", if the `fake_useragent`
                #    module is installed, else use the default
                #    "user agent" (python-requests)
                if not user_agent:
                    user_agent = get_user_agent()

                LOG.debug('User Agent: %s', user_agent)

                request = (
                    f'GET / HTTP/1.0\r\n'
                    f'Host: {hostname}\r\n'
                    f'User-Agent: {user_agent}\r\n'
                    'Accept-Encoding: gzip, deflate\r\n'
                    'Accept: */*\r\n'
                    '\r\n'
                )

                # `sendall` keeps writing until the whole request is out;
                # a bare `send` may transmit only part of it.
                wrap_socket.sendall(request.encode())

                # Read until the server closes the connection (HTTP/1.0).
                chunks = []
                while chunk := wrap_socket.recv(512):
                    chunks.append(chunk)

                # Latin-1 (or ISO-8859-1) is a safe default: it will always
                # decode any bytes (though the result may not be useful).
                response = b''.join(chunks).decode(default_encoding)

                # Get the first line (the "status line")
                # Ref: https://developer.mozilla.org/en-US/docs/Web/HTTP/Messages
                status_line = response.split('\n', 1)[0]

                # HTTP/1.1 301 Moved Permanently
                try:
                    status_code = int(status_line.split(' ', 2)[1])
                except (IndexError, ValueError, TypeError):
                    # `IndexError`: empty or malformed status line (e.g. the
                    # server closed the connection without answering). The
                    # certificate was still retrieved, so carry on without
                    # a status code instead of failing the whole lookup.
                    status_code = None

                loc = None
                if (loc_start := response.find('\nLocation: ')) != -1:
                    # 11 == len('\nLocation: ')
                    loc = response[loc_start + 11:].split(
                        '\r\n', maxsplit=1
                    )[0]

    except socket.gaierror as e:
        # curl: (6) Could not resolve host: <hostname>
        if e.errno == 8:
            # [Errno 8] nodename nor servname provided, or not known
            LOG.error(f'gaierror: could not resolve host. {hostname=}')
        else:
            LOG.error(f'{e.__class__.__name__}: {e}. {hostname=}')

        return None

    except ssl.SSLEOFError:
        # SSL/TLS connection terminated abruptly.
        # message: "EOF occurred in violation of protocol"
        # this could indicate bad cert or website is down
        LOG.error(f'SSLEOFError: bad cert. {hostname=}')
        return None

    except ssl.SSLError as e:
        # Not reported as an error (that log call was disabled upstream),
        # but leave a trace at debug level instead of failing silently.
        LOG.debug('%s: %s. hostname=%r', e.__class__.__name__, e, hostname)
        return None

    except Exception as e:
        LOG.error(f'{e.__class__.__name__}: General Error - {e}. {hostname=}')
        return None

    else:
        _cert: Certificate = Certificate.load(cert_bin)

        subject = {
            KEY_MAP.get(k, k): v for k, v in _cert.subject.native.items()
        }
        issuer = {
            KEY_MAP.get(k, k): v for k, v in _cert.issuer.native.items()
        }
        not_after_date = _cert.not_valid_after.date()
        not_before_date = _cert.not_valid_before.date()

        cert_info = CertHero(
            {
                'Cert Status': 'SUCCESS',
                'Serial': format(_cert.serial_number, 'X'),
                'Subject Name': subject,
                'Issuer Name': issuer,
                'Validity': {
                    'Not After': not_after_date.isoformat(),
                    'Not Before': not_before_date.isoformat(),
                },
                'Wildcard': subject.get('Common Name', '').startswith('*'),
                'Signature Algorithm': _sig_algo(_cert),
                'Key Algorithm': _key_algo(_cert),
            }
        )

        # Cache the `date` objects, so that they need not be re-parsed
        cert_info._not_after_date = not_after_date
        cert_info._not_before_date = not_before_date

        # `subject_alt_name_value` is `None` when the certificate has no
        # Subject Alternative Name extension.
        san_value = _cert.subject_alt_name_value
        if san_value is not None and (subj_alt_names := san_value.native):
            cert_info['Subject Alt Names'] = subj_alt_names

        if loc:
            cert_info['Location'] = loc

        if status_code:
            cert_info['Status'] = status_code

        return cert_info
def certs_please(
    hostnames: list[str] | tuple[str] | set[str],
    context: ssl.SSLContext = None,
    num_threads: int = 25,
    user_agent: str | None = _DEFAULT_USER_AGENT,
) -> dict[str, CertHero]:
    """
    Retrieve (concurrently) the SSL certificate(s) for a list of
    ``hostnames`` - works even in the case of expired or self-signed
    certificates.

    Usage:

    >>> import cert_hero, json
    >>> host_to_cert = cert_hero.certs_please(['google.com', 'cnn.com', 'www.yahoo.co.in', 'youtu.be'])
    >>> cert_hero.set_expired(host_to_cert)
    >>> host_to_cert
    {'google.com': CertHero(
      {
        "Cert Status": "SUCCESS",
        "Serial": "753DD6FF20CB1B4510CB4C1EA27DA2EB",
        ...
      }
    ), 'cnn.com': CertHero(
      {
        "Cert Status": "SUCCESS",
        "Serial": "7F2F3E5C350554D71A6784CCFE6E8315",
        ...
      }
    ), ...
    }
    >>> json.dumps(host_to_cert)
    {"google.com": {"Cert Status": "SUCCESS", ...}, "cnn.com": {"Cert Status": "SUCCESS", ...}, ...}

    :param hostnames: List of hosts to retrieve SSL Certificate(s) for
    :param context: (Optional) Shared SSL Context
    :param num_threads: Max number of concurrent threads
    :param user_agent: A custom *user agent* to use for the HTTP call to
      retrieve ``Location`` and ``Status``. Defaults to
      ``python-requests/{version}``, or a random *user agent* if the
      ``fake_useragent`` module is installed (via the ``fake-ua`` `extra
      <https://packaging.python.org/en/latest/tutorials/installing-packages/#installing-extras>`__).
    :return: A mapping of ``hostname`` to the SSL Certificate (e.g.
      :class:`CertHero`) for that host
    """
    if context is None:
        context = create_ssl_context()

    num_hosts = len(hostnames)

    # Nothing to look up: do not spin up a thread pool at all.
    if not num_hosts:
        return {}

    # Never start more workers than there are hosts. The `with` statement
    # ensures threads are cleaned up promptly.
    worker_count = min(num_hosts, num_threads)

    with ThreadPoolExecutor(max_workers=worker_count) as pool:
        # `pool.map` yields results in the order of `hostnames`, so the
        # two can be paired up again with `zip`.
        results = pool.map(
            cert_please,
            hostnames,
            repeat(context),
            repeat(user_agent),
        )

        host_to_cert = {}
        for host, cert_info in zip(hostnames, results):
            # TODO: Update to remove `or` once we finalize how to handle missing certs
            host_to_cert[host] = cert_info or _build_failed_cert('TIMED_OUT')

    return host_to_cert