black and fix doc typo

This commit is contained in:
Jeff Epler 2022-10-05 13:06:23 -05:00
parent b1ce1d05d7
commit b47d1d777b
No known key found for this signature in database
GPG Key ID: D5BF15AB975AB4DE

View File

@ -14,7 +14,7 @@
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
# You may obtain a copy of the License at # You may obtain a copy of the License at
# #
# http:#www.apache.org/licenses/LICENSE-2.0 # http://www.apache.org/licenses/LICENSE-2.0
# #
# Unless required by applicable law or agreed to in writing, software # Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, # distributed under the License is distributed on an "AS IS" BASIS,
@ -37,27 +37,29 @@ try:
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
except ImportError: except ImportError:
print('The cryptography package is not installed.' print(
'Please refer to the Get Started section of the ESP-IDF Programming Guide for ' "The cryptography package is not installed."
'setting up the required packages.') "Please refer to the Get Started section of the ESP-IDF Programming Guide for "
"setting up the required packages."
)
raise raise
ca_bundle_bin_file = 'x509_crt_bundle' ca_bundle_bin_file = "x509_crt_bundle"
quiet = False quiet = False
def status(msg): def status(msg):
""" Print status message to stderr """ """Print status message to stderr"""
if not quiet: if not quiet:
critical(msg) critical(msg)
def critical(msg): def critical(msg):
""" Print critical message to stderr """ """Print critical message to stderr"""
sys.stderr.write('gen_crt_bundle.py: ') sys.stderr.write("gen_crt_bundle.py: ")
sys.stderr.write(msg) sys.stderr.write(msg)
sys.stderr.write('\n') sys.stderr.write("\n")
class CertificateBundle: class CertificateBundle:
@ -75,75 +77,81 @@ class CertificateBundle:
found |= self.add_from_file(os.path.join(crts_path, file_path)) found |= self.add_from_file(os.path.join(crts_path, file_path))
if found is False: if found is False:
raise InputError('No valid x509 certificates found in %s' % crts_path) raise InputError("No valid x509 certificates found in %s" % crts_path)
def add_from_file(self, file_path): def add_from_file(self, file_path):
try: try:
if file_path.endswith('.pem'): if file_path.endswith(".pem"):
status('Parsing certificates from %s' % file_path) status("Parsing certificates from %s" % file_path)
with open(file_path, 'r', encoding='utf-8') as f: with open(file_path, "r", encoding="utf-8") as f:
crt_str = f.read() crt_str = f.read()
self.add_from_pem(crt_str) self.add_from_pem(crt_str)
return True return True
elif file_path.endswith('.der'): elif file_path.endswith(".der"):
status('Parsing certificates from %s' % file_path) status("Parsing certificates from %s" % file_path)
with open(file_path, 'rb') as f: with open(file_path, "rb") as f:
crt_str = f.read() crt_str = f.read()
self.add_from_der(crt_str) self.add_from_der(crt_str)
return True return True
except ValueError: except ValueError:
critical('Invalid certificate in %s' % file_path) critical("Invalid certificate in %s" % file_path)
raise InputError('Invalid certificate') raise InputError("Invalid certificate")
return False return False
def add_from_pem(self, crt_str): def add_from_pem(self, crt_str):
""" A single PEM file may have multiple certificates """ """A single PEM file may have multiple certificates"""
crt = '' crt = ""
count = 0 count = 0
start = False start = False
for strg in crt_str.splitlines(True): for strg in crt_str.splitlines(True):
if strg == '-----BEGIN CERTIFICATE-----\n' and start is False: if strg == "-----BEGIN CERTIFICATE-----\n" and start is False:
crt = '' crt = ""
start = True start = True
elif strg == '-----END CERTIFICATE-----\n' and start is True: elif strg == "-----END CERTIFICATE-----\n" and start is True:
crt += strg + '\n' crt += strg + "\n"
start = False start = False
self.certificates.append(x509.load_pem_x509_certificate(crt.encode(), default_backend())) self.certificates.append(
x509.load_pem_x509_certificate(crt.encode(), default_backend())
)
count += 1 count += 1
if start is True: if start is True:
crt += strg crt += strg
if(count == 0): if count == 0:
raise InputError('No certificate found') raise InputError("No certificate found")
status('Successfully added %d certificates' % count) status("Successfully added %d certificates" % count)
def add_from_der(self, crt_str): def add_from_der(self, crt_str):
self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend())) self.certificates.append(x509.load_der_x509_certificate(crt_str, default_backend()))
status('Successfully added 1 certificate') status("Successfully added 1 certificate")
def create_bundle(self): def create_bundle(self):
# Sort certificates in order to do binary search when looking up certificates # Sort certificates in order to do binary search when looking up certificates
self.certificates = sorted(self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend())) self.certificates = sorted(
self.certificates, key=lambda cert: cert.subject.public_bytes(default_backend())
)
bundle = struct.pack('>H', len(self.certificates)) bundle = struct.pack(">H", len(self.certificates))
for crt in self.certificates: for crt in self.certificates:
""" Read the public key as DER format """ """Read the public key as DER format"""
pub_key = crt.public_key() pub_key = crt.public_key()
pub_key_der = pub_key.public_bytes(serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo) pub_key_der = pub_key.public_bytes(
serialization.Encoding.DER, serialization.PublicFormat.SubjectPublicKeyInfo
)
""" Read the subject name as DER format """ """ Read the subject name as DER format """
sub_name_der = crt.subject.public_bytes(default_backend()) sub_name_der = crt.subject.public_bytes(default_backend())
name_len = len(sub_name_der) name_len = len(sub_name_der)
key_len = len(pub_key_der) key_len = len(pub_key_der)
len_data = struct.pack('>HH', name_len, key_len) len_data = struct.pack(">HH", name_len, key_len)
bundle += len_data bundle += len_data
bundle += sub_name_der bundle += sub_name_der
@ -154,23 +162,25 @@ class CertificateBundle:
def add_with_filter(self, crts_path, filter_path): def add_with_filter(self, crts_path, filter_path):
filter_set = set() filter_set = set()
with open(filter_path, 'r', encoding='utf-8') as f: with open(filter_path, "r", encoding="utf-8") as f:
csv_reader = csv.reader(f, delimiter=',') csv_reader = csv.reader(f, delimiter=",")
# Skip header # Skip header
next(csv_reader) next(csv_reader)
for row in csv_reader: for row in csv_reader:
filter_set.add(row[1]) filter_set.add(row[1])
status('Parsing certificates from %s' % crts_path) status("Parsing certificates from %s" % crts_path)
crt_str = [] crt_str = []
with open(crts_path, 'r', encoding='utf-8') as f: with open(crts_path, "r", encoding="utf-8") as f:
crt_str = f.read() crt_str = f.read()
# Split all certs into a list of (name, certificate string) tuples # Split all certs into a list of (name, certificate string) tuples
pem_crts = re.findall(r'(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)', crt_str, re.MULTILINE) pem_crts = re.findall(
r"(^.+?)\n(=+\n[\s\S]+?END CERTIFICATE-----\n)", crt_str, re.MULTILINE
)
filtered_crts = '' filtered_crts = ""
for name, crt in pem_crts: for name, crt in pem_crts:
if name in filter_set: if name in filter_set:
filtered_crts += crt filtered_crts += crt
@ -186,13 +196,27 @@ class InputError(RuntimeError):
def main(): def main():
global quiet global quiet
parser = argparse.ArgumentParser(description='ESP-IDF x509 certificate bundle utility') parser = argparse.ArgumentParser(description="ESP-IDF x509 certificate bundle utility")
parser.add_argument('--quiet', '-q', help="Don't print non-critical status messages to stderr", action='store_true') parser.add_argument(
parser.add_argument('--input', '-i', nargs='+', required=True, "--quiet",
help='Paths to the custom certificate folders or files to parse, parses all .pem or .der files') "-q",
parser.add_argument('--filter', '-f', help='Path to CSV-file where the second columns contains the name of the certificates \ help="Don't print non-critical status messages to stderr",
that should be included from cacrt_all.pem') action="store_true",
)
parser.add_argument(
"--input",
"-i",
nargs="+",
required=True,
help="Paths to the custom certificate folders or files to parse, parses all .pem or .der files",
)
parser.add_argument(
"--filter",
"-f",
help="Path to CSV-file where the second columns contains the name of the certificates \
that should be included from cacrt_all.pem",
)
args = parser.parse_args() args = parser.parse_args()
@ -202,24 +226,24 @@ def main():
for path in args.input: for path in args.input:
if os.path.isfile(path): if os.path.isfile(path):
if os.path.basename(path) == 'cacrt_all.pem' and args.filter: if os.path.basename(path) == "cacrt_all.pem" and args.filter:
bundle.add_with_filter(path, args.filter) bundle.add_with_filter(path, args.filter)
else: else:
bundle.add_from_file(path) bundle.add_from_file(path)
elif os.path.isdir(path): elif os.path.isdir(path):
bundle.add_from_path(path) bundle.add_from_path(path)
else: else:
raise InputError('Invalid --input=%s, is neither file nor folder' % args.input) raise InputError("Invalid --input=%s, is neither file nor folder" % args.input)
status('Successfully added %d certificates in total' % len(bundle.certificates)) status("Successfully added %d certificates in total" % len(bundle.certificates))
crt_bundle = bundle.create_bundle() crt_bundle = bundle.create_bundle()
with open(ca_bundle_bin_file, 'wb') as f: with open(ca_bundle_bin_file, "wb") as f:
f.write(crt_bundle) f.write(crt_bundle)
if __name__ == '__main__': if __name__ == "__main__":
try: try:
main() main()
except InputError as e: except InputError as e: