import argparse
import graphviz
from pairipcore.context import VM, addr_t
from pairipcore.insn import Insn, InsnFormat
[docs]
class LocationDB:
    """
    Registry of claimed address ranges and of labels handed out for addresses.

    Ranges are stored as half-open ``range`` objects and are only ever
    appended. Labels are generated on first request and remembered, so asking
    for the same address again returns the same string.

    Attributes:
        __ranges_ (list[range]): Every address range registered so far.
        __labels_ (dict[addr_t, str]): Label assigned to each requested address.
    """

    __ranges_: list[range]
    __labels_: dict[addr_t, str]

    def __init__(self) -> None:
        """Start with no registered ranges and no labels."""
        self.__ranges_ = []
        self.__labels_ = {}

    def __contains__(self, addr: addr_t) -> bool:
        """
        Tell whether ``addr`` falls inside at least one registered range.

        Args:
            addr (addr_t): The address to look up.

        Returns:
            bool: True as soon as one range contains ``addr``, else False.
        """
        for span in self.__ranges_:
            if addr in span:
                return True
        return False

    def add_insn(self, insn: Insn) -> None:
        """
        Register the addresses occupied by an instruction.

        The range starts at ``insn.opcode_address`` and spans
        ``len(insn.insn_format)`` addresses.

        Args:
            insn (Insn): The instruction whose range is recorded.
        """
        first = insn.opcode_address
        size = len(insn.insn_format)
        self.__ranges_.append(range(first, first + size))

    def add_memory(self, start: addr_t, length: int) -> None:
        """
        Register a memory region given its start and size.

        Args:
            start (addr_t): First address of the region.
            length (int): Number of addresses covered by the region.
        """
        stop = start + length
        self.__ranges_.append(range(start, stop))

    def add(self, start: addr_t, end: addr_t) -> None:
        """
        Register an arbitrary half-open range ``[start, end)``.

        Args:
            start (addr_t): First address of the range (inclusive).
            end (addr_t): End of the range (exclusive, as with ``range``).
        """
        span = range(start, end)
        self.__ranges_.append(span)

    def get_label(self, addr: addr_t, is_mem: bool = False) -> str:
        """
        Return the label of ``addr``, generating and storing one if needed.

        A label that already exists is returned unchanged, regardless of
        ``is_mem``. New labels use the ``DAT_`` prefix for memory and the
        ``LAB_`` prefix otherwise, followed by the address in ``#08x`` form.

        Args:
            addr (addr_t): The address to label.
            is_mem (bool): Selects the prefix used for a newly created label.

        Returns:
            str: The (possibly freshly created) label.
        """
        known = self.__labels_.get(addr)
        if known is not None:
            return known

        prefix = "DAT" if is_mem else "LAB"
        fresh = f"{prefix}_{addr:#08x}"
        self.__labels_[addr] = fresh
        return fresh

    def has_label(self, addr: addr_t) -> bool:
        """
        Tell whether a label has already been stored for ``addr``.

        Args:
            addr (addr_t): The address to look up.

        Returns:
            bool: True if ``get_label`` has stored a label for this address.
        """
        labels = self.__labels_
        return addr in labels
[docs]
class GraphDelegate:
    """
    Thin helper that renders VM instructions and memory cells as graph nodes.

    Every node label is an HTML-like table; edges from an instruction to the
    memory cells named by its operands are drawn in grey.

    Attributes:
        graph (graphviz.Digraph): The graph that receives nodes and edges.
        opcode_ids (dict[int, str]): Display name for each known opcode.

    Args:
        graph (graphviz.Digraph): The graph that receives nodes and edges.
        opcode_ids (dict[int, str]): Display name for each known opcode.
    """

    graph: graphviz.Digraph

    def __init__(self, graph, opcode_ids: dict) -> None:
        self.graph = graph
        self.opcode_ids = opcode_ids

    @staticmethod
    def _table(cells) -> str:
        """Wrap each cell in its own ``<tr>`` and return the HTML-like label."""
        rows = "".join(f"<tr>{cell}</tr>" for cell in cells)
        return f'<<table border="0" cellborder="0" cellpadding="3">{rows}</table>>'

    def memory_node(self, mem_label: str, mem_addr: addr_t) -> None:
        """
        Add a node representing a memory location.

        The node is named ``mem_label``; the text shown inside it is always
        ``DAT_`` followed by ``mem_addr`` in ``#08x`` form.

        Args:
            mem_label (str): Graph name of the node.
            mem_addr (addr_t): Address displayed inside the node.
        """
        cell = (
            '<td align="center" colspan="2">'
            f'<font color="forestgreen">DAT_{mem_addr:#08x}</font></td>'
        )
        self.graph.node(
            mem_label,
            self._table([cell]),
            {"shape": "record", "fontname": "Courier New"},
        )

    def leaf_insn_node(self, label: str, opcode: int) -> None:
        """
        Add a node for an instruction that is not decoded any further.

        Opcodes without an entry in ``opcode_ids`` are shown as a red
        ``unknown``.

        Args:
            label (str): Graph name of the node.
            opcode (int): Opcode value shown in the node header.
        """
        mnemonic = self.opcode_ids.get(opcode, "unknown")
        if mnemonic == "unknown":
            mnemonic = f'<font color="red">{mnemonic}</font>'
        cell = (
            '<td align="center" colspan="2" bgcolor="lightgrey">'
            f"{mnemonic} ({opcode:#04x})</td>"
        )
        self.graph.node(
            label,
            self._table([cell]),
            {"shape": "Mrecord", "fontname": "Courier New"},
        )

    def insn_node(
        self, label: str, opcode: int, insn: Insn, loc_db: LocationDB
    ) -> None:
        """
        Add a node for a decoded instruction and link it to its operands.

        For every variable reported by ``insn.insn_format.vars()`` the value
        stored on ``insn`` is treated as a memory address: a memory node is
        created (and registered in ``loc_db``) the first time the address is
        seen, a grey edge is drawn to it, and a row is added to the node table.

        Args:
            label (str): Graph name of the instruction node.
            opcode (int): Opcode value shown in the node header.
            insn (Insn): The decoded instruction.
            loc_db (LocationDB): Supplies labels and records memory ranges.
        """
        operand_cells = []
        for field in insn.insn_format.vars():
            address = getattr(insn, field)
            # Must be sampled before get_label(), which creates the label.
            first_sighting = not loc_db.has_label(address)
            target = loc_db.get_label(address, is_mem=True)
            if first_sighting:
                self.memory_node(target, address)
                loc_db.add_memory(address, 1)

            cell = (
                '<td align="left">'
                f'<font color="blue1">{field:<8}</font>'
                '<font color="black">#</font>'
                f'<font color="azure4">{address:#08x}</font>'
                "</td>"
            )
            self.graph.edge(label, target, color="grey", group="mem")
            operand_cells.append(cell)

        mnemonic = self.opcode_ids.get(opcode, "unknown")
        header_cell = (
            '<td align="center" colspan="2" bgcolor="lightgrey">'
            f"{mnemonic} ({opcode:#04x}) @{insn.opcode_address:#08x}</td>"
        )
        self.graph.node(
            label,
            self._table([header_cell, *operand_cells]),
            {"shape": "Mrecord", "fontname": "Courier New"},
        )
[docs]
def instruction_handler(vm: VM, depth: int) -> None:
    """
    Visit the instruction at the VM's current ``pc`` and recurse into its
    successors, adding nodes and edges to the graph as it goes.

    All collaborators are read from ``vm.state``: ``"loc_db"``, ``"argv"``
    (an object with ``depth`` and ``path``), ``"graph"`` and ``"format_ids"``.
    The walk stops when the current ``pc`` is already covered by the location
    database, when ``depth`` reaches ``argv.depth``, or when ``argv.path`` is
    set and the opcode does not match ``argv.path[depth]``.

    Args:
        vm (VM): The virtual machine instance; its context is advanced and
            its ``pc`` is reassigned while walking.
        depth (int): Current recursion depth, starting at 0.
    """
    state = vm.state
    loc_db: LocationDB = state["loc_db"]
    if vm.context.pc in loc_db:
        # Already claimed: a memory region, or it overlaps another instruction.
        return

    options = state["argv"]
    delegate: GraphDelegate = state["graph"]
    if depth >= options.depth:
        return

    opcode = vm.current_opcode()
    if options.path and opcode != options.path[depth]:
        return

    # Advance the context by 2; the label is then taken at ``pc - 2``
    # (presumably the opcode's own address — confirm against VM context).
    vm.context += 2
    label = loc_db.get_label(vm.context.pc - 2)

    known_formats = state["format_ids"]
    if opcode not in known_formats:
        # No format to decode with: draw a leaf node and claim just the
        # two addresses consumed above.
        delegate.leaf_insn_node(label, opcode)
        loc_db.add(vm.context.pc - 2, vm.context.pc)
        return

    insn = Insn(vm, known_formats[opcode])
    delegate.insn_node(label, opcode, insn, loc_db)
    loc_db.add_insn(insn)

    # Green edge to the regular successor, red edge to the fallback.
    successors = (("next", "green"), ("fallback", "red"))
    for attr, colour in successors:
        destination = loc_db.get_label(getattr(insn.info, attr))
        delegate.graph.edge(label, destination, color=colour)

    # Follow each successor that has not been claimed yet; the fallback is
    # re-checked only after the ``next`` branch has been fully explored.
    for attr, _ in successors:
        target = getattr(insn.info, attr)
        if target in loc_db:
            continue
        vm.context.pc = target
        instruction_handler(vm, depth + 1)
[docs]
def new_cfg(
    vm: VM, opcode_def: dict, loc_db=None, depth=None, path=None, layout=None
) -> graphviz.Digraph:
    """
    Generates a control flow graph (CFG) for a VM's instructions.

    >>> vm = VM(...)
    >>> cfg = pairipcore.cfg.new_cfg(vm, opcode_def={...}, depth=5)
    >>> with open("cfg.dot", "w") as fp:
    ...     fp.write(str(cfg))

    The walk starts at the VM's current ``pc``. The helpers used during the
    walk are stored in ``vm.state`` under the keys ``"loc_db"``, ``"graph"``,
    ``"format_ids"`` and ``"argv"``, overwriting any previous values.

    Args:
        vm (VM): The virtual machine instance.
        opcode_def (dict): Definitions of opcodes, including format IDs. May
            be the mapping itself or a dict holding it under ``"opcodes"``.
            Each value is either a dict with a ``"format_id"`` entry or a
            value whose ``str()`` is the format ID.
        loc_db (LocationDB, optional): Pre-existing location database. If None, a new one will be created.
        depth (int, optional): Maximum depth for recursion in CFG generation.
            Defaults to 10; ignored when ``path`` is given.
        path (list[int], optional): Specific opcode path to follow in the CFG.
            When given, the depth limit becomes ``len(path)``.
        layout (str, optional): Graphviz layout direction (e.g., 'TB' for top-to-bottom).

    Returns:
        graphviz.Digraph: The generated control flow graph.
    """
    if "opcodes" in opcode_def:
        opcode_def = opcode_def["opcodes"]

    opcode_ids = {}
    format_ids = {}
    for raw_opcode, value in opcode_def.items():
        opcode = int(raw_opcode)
        if isinstance(value, dict) and "format_id" in value:
            format_id = value["format_id"]
        else:
            format_id = str(value)
        # An explicit ``"format_id": None`` leaves the opcode without a
        # format, so it is later rendered as an unknown leaf.
        if format_id is not None:
            format_ids[opcode] = InsnFormat.parse(format_id)
            opcode_ids[opcode] = f"_{format_id}"

    argv = argparse.Namespace(depth=depth, path=path)
    if argv.path:
        # Following a fixed opcode path: one level of recursion per entry.
        argv.depth = len(argv.path)
    if argv.depth is None:
        argv.depth = 10

    dot = graphviz.Digraph(
        "instruction-tree",
        comment="Instruction Trace",
        graph_attr={"rankdir": layout or "TB"},
        node_attr={"shape": "Mrecord", "fontname": "Courier New"},
    )

    # Honour a caller-supplied database (previously it was always replaced,
    # discarding any ranges and labels the caller had prepared).
    if loc_db is None:
        loc_db = LocationDB()

    vm.state["loc_db"] = loc_db
    vm.state["graph"] = GraphDelegate(dot, opcode_ids)
    vm.state["format_ids"] = format_ids
    vm.state["argv"] = argv
    instruction_handler(vm, 0)
    return dot