luau/tools/fuzz/fuzzer-postprocess.py
2023-12-01 20:40:43 -08:00

175 lines
6.4 KiB
Python

#!/usr/bin/python3
# This file is part of the Luau programming language and is licensed under MIT License; see LICENSE.txt for details
import argparse
import jinja2
import multiprocessing
import os
import shutil
import subprocess
import sys
def is_crash(reproducer_name: str) -> bool:
return reproducer_name.startswith("crash-") or reproducer_name.startswith("oom-")
class CrashReport:
def __init__(self, args, crash_id):
self.id = crash_id
self.args = args
self.crash_root = os.path.join(args.output_directory, crash_id)
def trace(self) -> str:
trace_path = os.path.join(self.crash_root, "trace.txt")
if os.path.exists(trace_path):
with open(os.path.join(self.crash_root, "trace.txt"), "r") as trace_file:
return trace_file.read()
else:
return None
def modules(self) -> str:
with open(os.path.join(self.crash_root, "modules.txt"), "r") as modules_file:
return modules_file.read()
def artifact_link(self) -> str:
return f"{self.args.artifact_root}/{self.id}/minimized_reproducer"
class MetaValue:
def __init__(self, name, value):
self.name = name
self.value = value
self.link = None
def minimize_crash(args, reproducer, workdir):
if not is_crash(os.path.basename(reproducer)):
# Not actually a crash, so no minimization is actually possible.
return
print(
f"Minimizing reproducer {os.path.basename(reproducer)} for {args.minimize_for} seconds.")
reproducer_absolute = os.path.abspath(reproducer)
artifact = os.path.join(workdir, "minimized_reproducer")
minimize_result = subprocess.run([args.executable, "-detect_leaks=0", "-minimize_crash=1",
f"-exact_artifact_path={artifact}", f"-max_total_time={args.minimize_for}", reproducer_absolute], cwd=workdir, stdout=sys.stdout if args.verbose else subprocess.DEVNULL, stderr=sys.stderr if args.verbose else subprocess.DEVNULL)
if minimize_result.returncode != 0:
print(
f"Minimize process exited with code {minimize_result.returncode}; minimization failed.")
return
if os.path.exists(artifact):
print(
f"Minimized {os.path.basename(reproducer)} from {os.path.getsize(reproducer)} bytes to {os.path.getsize(artifact)}.")
def process_crash(args, reproducer):
crash_id = os.path.basename(reproducer)
crash_output = os.path.join(args.output_directory, crash_id)
print(f"Processing reproducer {crash_id}.")
print(f"Output will be stored in {crash_output}.")
if os.path.exists(crash_output):
print(f"Contents of {crash_output} will be discarded.")
shutil.rmtree(crash_output, ignore_errors=True)
os.makedirs(crash_output)
shutil.copyfile(reproducer, os.path.join(crash_output, "original_reproducer"))
shutil.copyfile(reproducer, os.path.join(
crash_output, "minimized_reproducer"))
minimize_crash(args, reproducer, crash_output)
if is_crash(crash_id):
trace_result = subprocess.run([args.executable, os.path.join(
crash_output, "minimized_reproducer")], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
trace_text = trace_result.stdout
with open(os.path.join(crash_output, "trace.txt"), "w") as trace_file:
trace_file.write(trace_text)
modules_result = subprocess.run([args.prototest, os.path.join(
crash_output, "minimized_reproducer"), "-detect_leaks=0", "-verbosity=0"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
modules_text = modules_result.stdout
module_index_of = modules_text.index("Module")
modules_text = modules_text[module_index_of:]
with open(os.path.join(crash_output, "modules.txt"), "w") as modules_file:
modules_file.write(modules_text)
return CrashReport(args, crash_id)
def process_crashes(args):
crash_names = sorted(os.listdir(args.source_directory))
with multiprocessing.Pool(args.workers) as pool:
crashes = [(args, os.path.join(args.source_directory, c)) for c in crash_names]
crashes = pool.starmap(process_crash, crashes)
print(f"Processed {len(crashes)} crashes.")
return crashes
def generate_report(crashes, meta):
env = jinja2.Environment(
loader=jinja2.PackageLoader("fuzzer-postprocess"),
autoescape=jinja2.select_autoescape()
)
template = env.get_template("index.html")
with open("fuzz-report.html", "w") as report_file:
report_file.write(template.render(
crashes=crashes,
meta=meta,
))
def __main__():
parser = argparse.ArgumentParser()
parser.add_argument("--source_directory", required=True)
parser.add_argument("--output_directory", required=True)
parser.add_argument("--executable", required=True)
parser.add_argument("--prototest", required=True)
parser.add_argument("--minimize_for", required=True)
parser.add_argument("--artifact_root", required=True)
parser.add_argument("--verbose", "-v", action="store_true")
parser.add_argument("--workers", action="store", type=int, default=4)
meta_group = parser.add_argument_group(
"metadata", description="Report metadata to attach.")
meta_group.add_argument("--meta.values", nargs="*",
help="Any metadata to attach, in the form name=value. Multiple values may be specified.", dest="metadata_values", default=[])
meta_group.add_argument("--meta.urls", nargs="*",
help="URLs to attach to metadata, in the form name=url. Multiple values may be specified. A value must also be specified with --meta.values.", dest="metadata_urls", default=[])
args = parser.parse_args()
meta_values = dict()
for pair in args.metadata_values:
components = pair.split("=", 1)
name = components[0]
value = components[1]
meta_values[name] = MetaValue(name, value)
for pair in args.metadata_urls:
components = pair.split("=", 1)
name = components[0]
url = components[1]
if name in meta_values:
meta_values[name].link = url
else:
print(f"Metadata {name} has URL {url} but no value specified.")
meta_values = sorted(list(meta_values.values()), key=lambda x: x.name)
crashes = process_crashes(args)
generate_report(crashes, meta_values)
if __name__ == "__main__":
__main__()