import importlib.resources
import shutil
from collections.abc import Mapping
from pathlib import Path
from subprocess import run


def get_mc_path(build: Path) -> Path:
    """Return the location of ``common/mc.c`` under ``x264_src`` in *build*."""
    return build / "x264_src" / "common" / "mc.c"


class MCBuilder:
    """Fluent editor for the text of the packaged ``mc.c`` file.

    Each ``with_*`` method rewrites ``self.mc_src`` in place and returns
    ``self`` so calls can be chained; ``build()`` returns the final text.
    """

    @staticmethod
    def get_src() -> str:
        """
        Read the ``mc.c`` resource that ships next to this module.

        The file is looked up with ``importlib.resources.files`` relative to
        this module's package.

        Returns:
            str: The full text of ``mc.c``.

        Raises:
            AssertionError: If ``__package__`` is ``None``, i.e. the module
                is not being used as part of a package.

        Example:
            >>> src = MCBuilder.get_src()
            >>> isinstance(src, str)
            True
            >>> "mc_weight" in src
            True
            >>> len(src) > 0
            True
        """
        assert __package__ is not None
        resource = importlib.resources.files(__package__) / "mc.c"
        with resource.open("r") as f:
            return f.read()

    def __init__(self):
        # Working copy of the C source that the with_* methods rewrite.
        self.mc_src = self.get_src()

    def with_print_weight(self) -> "MCBuilder":
        """
        Insert a printf statement after the opening brace of ``mc_weight``.

        The insertion point is the first ``{`` found after the first
        occurrence of ``static inline void mc_weight`` in the source.

        Returns:
            self: The builder instance, for method chaining.

        Raises:
            ValueError: If the declaration, or a ``{`` after it, cannot be
                found.
        """
        target_declaration = "static inline void mc_weight"
        printf_statement = ' printf("weight->i_scale = %d\\n", weight->i_scale);'

        # Locate the function declaration.
        start_index = self.mc_src.find(target_declaration)
        if start_index == -1:
            raise ValueError("Could not find target function `mc_weight`")

        # Locate the first opening brace after the declaration.
        open_brace_index = self.mc_src.find("{", start_index)
        if open_brace_index == -1:
            raise ValueError("Could not find opening brace { for the target function")

        # Splice the statement, on its own line, directly after the brace.
        head = self.mc_src[: open_brace_index + 1]
        tail = self.mc_src[open_brace_index + 1 :]
        self.mc_src = head + "\n" + printf_statement + "\n" + tail
        return self

    def with_disabled_iscale_mul(self) -> "MCBuilder":
        """
        Disable the i_scale multiplication in the mc_weight function.

        Every occurrence of ``src[x] * weight->i_scale`` is replaced by
        ``src[x]``.

        Returns:
            self: The builder instance, for method chaining.

        >>> builder = MCBuilder()
        >>> 'src[x] * weight->i_scale' in builder.mc_src
        True
        >>> builder = builder.with_disabled_iscale_mul()
        >>> 'src[x] * weight->i_scale' in builder.mc_src
        False
        >>> 'src[x]' in builder.mc_src
        True
        """
        self.mc_src = self.mc_src.replace("src[x] * weight->i_scale", "src[x]")
        return self

    def with_disabled_vectorize_get_ref(self) -> "MCBuilder":
        """
        Disable auto-vectorization for the get_ref function's loop.

        A ``#pragma clang loop vectorize(disable)`` line is inserted
        immediately before every line containing
        ``for( int x = 0; x < i_width; x++ )``. All other text, including
        line terminators and any trailing newline, is left untouched.

        NOTE(review): matching is purely textual, so every such loop in the
        file is annotated, not only the one inside ``get_ref`` -- confirm
        this is intended.

        Returns:
            self: Returns the builder instance for method chaining

        Example:
            >>> builder = MCBuilder()
            >>> "#pragma clang loop vectorize(disable)" not in builder.mc_src
            True
            >>> modified = builder.with_disabled_vectorize_get_ref()
            >>> "#pragma clang loop vectorize(disable)" in modified.mc_src
            True
            >>> target_loop = "for( int x = 0; x < i_width; x++ )"
            >>> lines = modified.mc_src.splitlines()
            >>> for i, line in enumerate(lines):
            ...     if target_loop in line and i > 0:
            ...         if "#pragma clang loop vectorize(disable)" in lines[i-1]:
            ...             print("Pragma correctly placed")
            ...             break
            Pragma correctly placed
        """
        pragma_str = "#pragma clang loop vectorize(disable)"
        target_loop = "for( int x = 0; x < i_width; x++ )"

        pieces = []
        # keepends=True keeps each line's own terminator, so re-joining with
        # "" reproduces the input exactly apart from the inserted pragmas
        # (a plain splitlines() + "\n".join() would drop the final newline).
        for line in self.mc_src.splitlines(keepends=True):
            if target_loop in line:
                # Reuse the loop line's terminator; fall back to "\n" when
                # the loop is on an unterminated last line.
                terminator = line[len(line.rstrip("\r\n")) :] or "\n"
                pieces.append(pragma_str + terminator)
            pieces.append(line)

        self.mc_src = "".join(pieces)
        return self

    def build(self) -> str:
        """Return the current (possibly modified) source text."""
        return self.mc_src


def update_exe(build_exe, exe_dir):
    """Copy *build_exe* to ``x264_s_base.mytest-m64`` inside *exe_dir*.

    Uses ``shutil.copy2`` and prints the destination path afterwards.
    """
    exe_file = exe_dir / "x264_s_base.mytest-m64"
    shutil.copy2(build_exe, exe_file)
    print(f"Copied to {exe_file}")


def make(build, spec_env):
    """Run ``make TARGET=x264_s`` in *build* and return the executable path.

    Args:
        build: Directory to run ``make`` in; the result is expected at
            ``build / "x264_s"``.
        spec_env: Environment passed to the ``make`` subprocess.

    Returns:
        The path ``build / "x264_s"``.

    Raises:
        subprocess.CalledProcessError: If ``make`` exits with a non-zero
            status (``check=True``).
        RuntimeError: If ``make`` succeeded but the executable is missing.
    """
    build_exe = build / "x264_s"
    run(
        [
            "make",
            "TARGET=x264_s",
        ],
        check=True,
        env=spec_env,
        cwd=build,
    )
    # A zero exit status alone does not prove the binary was produced.
    if not build_exe.exists():
        raise RuntimeError("x264 exe not generated! Please verify make process")
    return build_exe


def recompile_mc(mc_path: Path, build: Path, spec_env: Mapping[str, str]):
    """Delete the ``.o`` file next to *mc_path*, if present, then run make.

    Returns:
        Whatever ``make(build, spec_env)`` returns (the executable path).
    """
    mc_o_path = mc_path.with_suffix(".o")
    if mc_o_path.exists():
        print(f"Found exist mc.o file at {mc_o_path}")
        mc_o_path.unlink()
    # Run "make"
    return make(build, spec_env)


# Command-line arguments for the x264 executable, keyed by workload name.
specinvoke_args = {
    "ref": [
        "--seek",
        "500",
        "--dumpyuv",
        "200",
        "--frames",
        "1250",
        "-o",
        "BuckBunny_New.264",
        "BuckBunny.yuv",
        "1280x720",
    ]
}


def perf_ref(x264_run, perf_output):
    """Run "perf" to profile x264 reference performance.

    Executes ``perf record -o <perf_output> -- <x264 exe> <ref args>`` with
    *x264_run* as the working directory; the executable is
    ``x264_run / "x264_s_base.mytest-m64"``.

    NOTE(review): the exit status of ``perf`` is not checked (no
    ``check=True``) -- confirm that ignoring failures here is intentional.
    """
    x264_cmd = [
        x264_run / "x264_s_base.mytest-m64",
        *specinvoke_args["ref"],
    ]
    run(
        [
            "perf",
            "record",
            "-o",
            perf_output,
            "--",
            *x264_cmd,
        ],
        cwd=x264_run,
    )