#!/usr/bin/env python # # ESP32 x509 certificate bundle generation utility # # Converts PEM and DER certificates to a custom bundle format which stores just the # subject name and public key to reduce space # # The bundle will have the format: number of certificates; crt 1 subject name length; crt 1 public key length; # crt 1 subject name; crt 1 public key; crt 2... # # Copyright 2018-2019 Espressif Systems (Shanghai) PTE LTD # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from __future__ import with_statement import argparse import csv import os import re import struct import sys from io import open try: from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization except ImportError: print( "The cryptography package is not installed." "Please refer to the Get Started section of the ESP-IDF Programming Guide for " "setting up the required packages." ) raise ca_bundle_bin_file = "x509_crt_bundle" quiet = False def status(msg): """Print status message to stderr""" if not quiet: critical(msg) def critical(msg): """Print critical message to stderr""" sys.stderr.write("gen_crt_bundle.py: ") sys.stderr.write(msg) sys.stderr.write("\n") class CertificateBundle: def __init__(self): self.certificates = [] self.compressed_crts = [] if os.path.isfile(ca_bundle_bin_file): os.remove(ca_bundle_bin_file) def add_from_path(self, crts_path): found = False for file_path in os.listdir(crts_path): found |= self.add_from_file(os.path.join(crts_path, file_path)) if found is False: raise InputError("No valid x509 certificates found in %s" % crts_path) def add_from_file(self, file_path): try: if file_path.endswith(".pem"): status("Parsing certificates from %s" % file_path) with open(file_path, "r", encoding="utf-8") as f: crt_str = f.read() self.add_from_pem(crt_str) return True elif file_path.endswith(".der"): status("Parsing certificates from %s" % file_path) with open(file_path, "rb") as f: crt_str = f.read() self.add_from_der(crt_str) return True except ValueError: critical("Invalid certificate in %s" % file_path) raise InputError("Invalid certificate") return False def add_from_pem(self, crt_str): """A single PEM file may have multiple certificates""" crt = "" count = 0 start = False for strg in crt_str.splitlines(True): if strg == "-----BEGIN CERTIFICATE-----\n" and start is False: crt = "" start = True elif strg == "-----END CERTIFICATE-----\n" and start is True: crt += strg + "\n" start = False self.certificates.append( x509.load_pem_x509_certificate(crt.encode(), default_backend()) ) count += 1 if start is True: crt += strg if count == 0: raise InputError("No certificate found") status("Successfully added %d certificates" % count) def add_from_der(self, crt_str): self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend())) status("Successfully added 1 certificate") def create_bundle(self): # Sort certificates in order to do binary search when looking up certificates self.certificates = sorted( self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend()) ) bundle = struct.pack(">H", len(self.certificates)) for crt in self.certificates: """Read the public key as DER format""" pub_key = crt.public_key() pub_key_der = pub_key.public_bytes( serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo ) """ Read the subject name as DER format """ sub_name_der = crt.subject.public_bytes(default_backend()) name_len = len(sub_name_der) key_len = len(pub_key_der) len_data = struct.pack(">HH", name_len, key_len) bundle += len_data bundle += sub_name_der bundle += pub_key_der return bundle def add_with_filter(self, crts_path, filter_path): filter_set = set() with open(filter_path, "r", encoding="utf-8") as f: csv_reader = csv.reader(f, delimiter=",") # Skip header next(csv_reader) for row in csv_reader: filter_set.add(row[1]) status("Parsing certificates from %s" % crts_path) crt_str = [] with open(crts_path, "r", encoding="utf-8") as f: crt_str = f.read() # Split all certs into a list of (name, certificate string) tuples pem_crts = re.findall( r"(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)", crt_str, re.MULTILINE ) filtered_crts = "" for name, crt in pem_crts: if name in filter_set: filtered_crts += crt self.add_from_pem(filtered_crts) class InputError(RuntimeError): def __init__(self, e): super(InputError, self).__init__(e) def main(): global quiet parser = argparse.ArgumentParser(description="ESP-IDF x509 certificate bundle utility") parser.add_argument( "--quiet", "-q", help="Don't print non-critical status messages to stderr", action="store_true", ) parser.add_argument( "--input", "-i", nargs="+", required=True, help="Paths to the custom certificate folders or files to parse, parses all .pem or .der files", ) parser.add_argument( "--filter", "-f", help="Path to CSV-file where the second columns contains the name of the certificates \ that should be included from cacrt_all.pem", ) args = parser.parse_args() quiet = args.quiet bundle = CertificateBundle() for path in args.input: if os.path.isfile(path): if os.path.basename(path) == "cacrt_all.pem" and args.filter: bundle.add_with_filter(path, args.filter) else: bundle.add_from_file(path) elif os.path.isdir(path): bundle.add_from_path(path) else: raise InputError("Invalid --input=%s, is neither file nor folder" % args.input) status("Successfully added %d certificates in total" % len(bundle.certificates)) crt_bundle = bundle.create_bundle() with open(ca_bundle_bin_file, "wb") as f: f.write(crt_bundle) if __name__ == "__main__": try: main() except InputError as e: print(e) sys.exit(2)