diff --git a/tools/gen_crt_bundle.py b/tools/gen_crt_bundle.py index 87e29e61fa..94855a067c 100755 --- a/tools/gen_crt_bundle.py +++ b/tools/gen_crt_bundle.py @@ -14,7 +14,7 @@ # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # -# http:#www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, @@ -37,27 +37,29 @@ try: from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization except ImportError: - print('The cryptography package is not installed.' - 'Please refer to the Get Started section of the ESP-IDF Programming Guide for ' - 'setting up the required packages.') + print( + "The cryptography package is not installed." + "Please refer to the Get Started section of the ESP-IDF Programming Guide for " + "setting up the required packages." + ) raise -ca_bundle_bin_file = 'x509_crt_bundle' +ca_bundle_bin_file = "x509_crt_bundle" quiet = False def status(msg): - """ Print status message to stderr """ + """Print status message to stderr""" if not quiet: critical(msg) def critical(msg): - """ Print critical message to stderr """ - sys.stderr.write('gen_crt_bundle.py: ') + """Print critical message to stderr""" + sys.stderr.write("gen_crt_bundle.py: ") sys.stderr.write(msg) - sys.stderr.write('\n') + sys.stderr.write("\n") class CertificateBundle: @@ -75,75 +77,81 @@ class CertificateBundle: found |= self.add_from_file(os.path.join(crts_path, file_path)) if found is False: - raise InputError('No valid x509 certificates found in %s' % crts_path) + raise InputError("No valid x509 certificates found in %s" % crts_path) def add_from_file(self, file_path): try: - if file_path.endswith('.pem'): - status('Parsing certificates from %s' % file_path) - with open(file_path, 'r', encoding='utf-8') as f: + if file_path.endswith(".pem"): + status("Parsing certificates from %s" % file_path) + with open(file_path, "r", encoding="utf-8") as f: crt_str = f.read() self.add_from_pem(crt_str) return True - elif file_path.endswith('.der'): - status('Parsing certificates from %s' % file_path) - with open(file_path, 'rb') as f: + elif file_path.endswith(".der"): + status("Parsing certificates from %s" % file_path) + with open(file_path, "rb") as f: crt_str = f.read() self.add_from_der(crt_str) return True except ValueError: - critical('Invalid certificate in %s' % file_path) - raise InputError('Invalid certificate') + critical("Invalid certificate in %s" % file_path) + raise InputError("Invalid certificate") return False def add_from_pem(self, crt_str): - """ A single PEM file may have multiple certificates """ + """A single PEM file may have multiple certificates""" - crt = '' + crt = "" count = 0 start = False for strg in crt_str.splitlines(True): - if strg == '-----BEGIN CERTIFICATE-----\n' and start is False: - crt = '' + if strg == "-----BEGIN CERTIFICATE-----\n" and start is False: + crt = "" start = True - elif strg == '-----END CERTIFICATE-----\n' and start is True: - crt += strg + '\n' + elif strg == "-----END CERTIFICATE-----\n" and start is True: + crt += strg + "\n" start = False - self.certificates.append(x509.load_pem_x509_certificate(crt.encode(), default_backend())) + self.certificates.append( + x509.load_pem_x509_certificate(crt.encode(), default_backend()) + ) count += 1 if start is True: crt += strg - if(count == 0): - raise InputError('No certificate found') + if count == 0: + raise InputError("No certificate found") - status('Successfully added %d certificates' % count) + status("Successfully added %d certificates" % count) def add_from_der(self, crt_str): self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend())) - status('Successfully added 1 certificate') + status("Successfully added 1 certificate") def create_bundle(self): # Sort certificates in order to do binary search when looking up certificates - self.certificates = sorted(self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend())) + self.certificates = sorted( + self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend()) + ) - bundle = struct.pack('>H', len(self.certificates)) + bundle = struct.pack(">H", len(self.certificates)) for crt in self.certificates: - """ Read the public key as DER format """ + """Read the public key as DER format""" pub_key = crt.public_key() - pub_key_der = pub_key.public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo) + pub_key_der = pub_key.public_bytes( + serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo + ) """ Read the subject name as DER format """ sub_name_der = crt.subject.public_bytes(default_backend()) name_len = len(sub_name_der) key_len = len(pub_key_der) - len_data = struct.pack('>HH', name_len, key_len) + len_data = struct.pack(">HH", name_len, key_len) bundle += len_data bundle += sub_name_der @@ -154,23 +162,25 @@ class CertificateBundle: def add_with_filter(self, crts_path, filter_path): filter_set = set() - with open(filter_path, 'r', encoding='utf-8') as f: - csv_reader = csv.reader(f, delimiter=',') + with open(filter_path, "r", encoding="utf-8") as f: + csv_reader = csv.reader(f, delimiter=",") # Skip header next(csv_reader) for row in csv_reader: filter_set.add(row[1]) - status('Parsing certificates from %s' % crts_path) + status("Parsing certificates from %s" % crts_path) crt_str = [] - with open(crts_path, 'r', encoding='utf-8') as f: + with open(crts_path, "r", encoding="utf-8") as f: crt_str = f.read() # Split all certs into a list of (name, certificate string) tuples - pem_crts = re.findall(r'(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)', crt_str, re.MULTILINE) + pem_crts = re.findall( + r"(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)", crt_str, re.MULTILINE + ) - filtered_crts = '' + filtered_crts = "" for name, crt in pem_crts: if name in filter_set: filtered_crts += crt @@ -186,13 +196,27 @@ class InputError(RuntimeError): def main(): global quiet - parser = argparse.ArgumentParser(description='ESP-IDF x509 certificate bundle utility') + parser = argparse.ArgumentParser(description="ESP-IDF x509 certificate bundle utility") - parser.add_argument('--quiet', '-q', help="Don't print non-critical status messages to stderr", action='store_true') - parser.add_argument('--input', '-i', nargs='+', required=True, - help='Paths to the custom certificate folders or files to parse, parses all .pem or .der files') - parser.add_argument('--filter', '-f', help='Path to CSV-file where the second columns contains the name of the certificates \ - that should be included from cacrt_all.pem') + parser.add_argument( + "--quiet", + "-q", + help="Don't print non-critical status messages to stderr", + action="store_true", + ) + parser.add_argument( + "--input", + "-i", + nargs="+", + required=True, + help="Paths to the custom certificate folders or files to parse, parses all .pem or .der files", + ) + parser.add_argument( + "--filter", + "-f", + help="Path to CSV-file where the second columns contains the name of the certificates \ + that should be included from cacrt_all.pem", + ) args = parser.parse_args() @@ -202,24 +226,24 @@ def main(): for path in args.input: if os.path.isfile(path): - if os.path.basename(path) == 'cacrt_all.pem' and args.filter: + if os.path.basename(path) == "cacrt_all.pem" and args.filter: bundle.add_with_filter(path, args.filter) else: bundle.add_from_file(path) elif os.path.isdir(path): bundle.add_from_path(path) else: - raise InputError('Invalid --input=%s, is neither file nor folder' % args.input) + raise InputError("Invalid --input=%s, is neither file nor folder" % args.input) - status('Successfully added %d certificates in total' % len(bundle.certificates)) + status("Successfully added %d certificates in total" % len(bundle.certificates)) crt_bundle = bundle.create_bundle() - with open(ca_bundle_bin_file, 'wb') as f: + with open(ca_bundle_bin_file, "wb") as f: f.write(crt_bundle) -if __name__ == '__main__': +if __name__ == "__main__": try: main() except InputError as e: