Code Property Graph Tutorial

Note: This tutorial is available as a webpage or as a Jupyter notebook in a running MATE system. If you view this tutorial on the web, you won’t be able to see the output of the code blocks. To access this tutorial from a running MATE system, navigate to the web UI (usually http://localhost:3000), click “For experts”, “Notebooks”, “examples/”, and “cpg-tutorial-no-solutions.ipynb”.

At the core of MATE is a representation of the target program as a “code property graph.” A code property graph (CPG) is an annotated graph data structure that unifies the results of many different analyses of a target program, including its:

  • abstract syntax tree (AST)

  • call graph (CG)

  • control-flow graph (CFG)

  • inter-procedural control-flow graph (ICFG)

  • dataflow graph (DFG)

  • control-dependence graph (CDG)

  • points-to graph (PTG)

Combining these representations in a single graph makes it possible to write queries over the graph that reason about the structure and behavior of the program and discover potential vulnerabilities.
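To make the idea of a single combined graph concrete, here is a minimal sketch of a toy property graph in plain Python. It is not MATE's data model or query language: the class name ToyGraph, the node names, and the edge kind strings "control_flow" and "dataflow" are all hypothetical and chosen only for illustration. It shows how one query can use two edge layers at once.

```python
from dataclasses import dataclass, field


@dataclass
class ToyGraph:
    """A toy property graph: nodes carry attributes, edges carry a kind label."""

    nodes: dict = field(default_factory=dict)
    edges: list = field(default_factory=list)

    def add_node(self, node_id, **attributes):
        self.nodes[node_id] = attributes

    def add_edge(self, source, target, kind):
        self.edges.append((source, target, kind))

    def successors(self, node_id, kind):
        """Follow only the edges of one kind (one "layer") out of a node."""
        return [t for (s, t, k) in self.edges if s == node_id and k == kind]


toy = ToyGraph()
toy.add_node("call_scanf", opcode="call")
toy.add_node("cmp_result", opcode="icmp")
toy.add_node("branch", opcode="br")

# Control-flow layer: the order in which instructions execute.
toy.add_edge("call_scanf", "cmp_result", kind="control_flow")
toy.add_edge("cmp_result", "branch", kind="control_flow")
# Dataflow layer: which values feed which instructions.
toy.add_edge("call_scanf", "cmp_result", kind="dataflow")
toy.add_edge("cmp_result", "branch", kind="dataflow")

# A query that mixes layers: values produced by a call that flow into the
# instruction executed immediately afterwards.
hits = [
    (source, target)
    for (source, target, kind) in toy.edges
    if kind == "dataflow"
    and toy.nodes[source]["opcode"] == "call"
    and target in toy.successors(source, "control_flow")
]
print(hits)
```

Because both layers share the same nodes, the query needs no translation step between separate analysis results. That is the property the CPG provides at much larger scale.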

This tutorial will introduce the elements of the code property graph while demonstrating how to use MATE’s CPG query language.

Note: A number of important query language concepts and tips are discussed along the way, so we encourage reading the entire tutorial, paying particular attention to notes like this one.

Guessing Game example program

To understand the components of the CPG, we examine a simple C program that implements a guessing game. You can access the source code of the challenge in the assets directory: assets/guessing-game.c.

The main function of the program initializes a “secret” number stored in a global variable to a randomly chosen number between 1 and 10. The user is then prompted to guess the number up to 9 times. If the user guesses correctly, the program prints “You win!” and the value of the secret number. If the user guesses incorrectly, the list of all of their guesses so far is printed and they are asked to try again. If the user does not guess correctly in 9 tries, the program exits.

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

typedef struct secret {
  int number;
} secret_t;

secret_t hidden;

void get_guess(int *guess);
void print_guesses(int *guesses, int i);

int main() {
  hidden.number = rand() % 10 + 1;

  int *guesses = calloc(9, sizeof(int));

  for (int i = 0; i < 9; i++) {
    get_guess(&guesses[i]);
 
    if (guesses[i] == hidden.number) {
      printf("You win! the secret was %d\n", hidden.number);
      free(guesses);
      return 0;
    } else {
      printf("Wrong answer!\n");
      print_guesses(guesses, i);
    }
  }

  printf("You're out of guesses, sorry.\n");

  free(guesses);
  return -1;
}

The get_guess function is implemented using a helper function read_num that uses the C standard library function scanf.

int read_num(int *num) {
  int no = scanf("%d", num);
  if (no != 1) {
    return 1;
  }
  if ((*num < 1) || (*num > 10)) {
    return 2;
  }
  return 0;
}

void get_guess(int *guess) {
  printf("Guess a number 1-10: ");
  while (true) {
    if (read_num(guess) == 0) {
      return;
    } else {
      printf("Invalid entry. Try again.\n");
    }
  }
}

The print_guesses function takes a pointer to the start of the guesses array and prints all of the guesses up to and including the guess at index i.

void print_guesses(int *guesses, int i) {
  printf("Your guesses so far:");
  for (int j = 0; j <= i; j++) {
    printf(" %d", guesses[j]);
  }
  printf("\n");
}

LLVM bitcode representation

MATE does not analyze C or C++ source code directly. Instead, it first compiles programs to LLVM bitcode, a low-level, procedural intermediate representation used by the LLVM compiler infrastructure. Analyzing bitcode allows MATE to process any language that compiles using LLVM and reduces the number of coding constructs the system must understand.
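If you want to see bitcode for a small program outside of MATE, a stock clang installation can emit textual LLVM IR. The sketch below only builds and optionally runs such a command. It does not show MATE's own build pipeline, which this tutorial does not describe; the helper name emit_llvm_command is hypothetical, and the output of plain clang will likely differ in naming and layout from the bitcode MATE produces.

```python
import shutil
import subprocess


def emit_llvm_command(source, output, compiler="clang"):
    """Build a clang command line that writes textual LLVM IR for one C file."""
    return [
        compiler,
        "-S",          # emit a human-readable file instead of an object file
        "-emit-llvm",  # make that file LLVM IR rather than native assembly
        "-g",          # keep debug metadata that maps IR back to source lines
        "-O0",         # no optimization, so the IR stays close to the C source
        source,
        "-o",
        output,
    ]


command = emit_llvm_command("guessing-game.c", "guessing-game.ll")
print(" ".join(command))

# Only attempt the compilation when clang is actually installed.
if shutil.which(command[0]) is not None:
    result = subprocess.run(command, capture_output=True, text=True)
    print("clang exited with status", result.returncode)
```

Compiling without optimization keeps every local variable in its own stack slot, which makes the correspondence between C statements and instructions easier to follow.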

The LLVM bitcode language is detailed in the LLVM Language Reference Manual accompanying the LLVM compiler. Consider taking some time to familiarize yourself with the language, particularly the Instruction Reference.

While building a program’s CPG, MATE generates a “canonical” bitcode file linking all of the modules compiled into the program. In the CHESS system, this file can be viewed in the “Vision” tool by selecting the file with the suffix “.canonical.ll”.

For this tutorial, you can view the bitcode of the guessing-game program in the assets directory: assets/guessing-game.ll. The bitcode of the program and other intermediate artifacts can also be downloaded using the mate-cli command line tool or the MATE REST API.

Bitcode for the print_guesses function

To get a feel for the translation of C code to bitcode, consider the print_guesses function:

void print_guesses(int *guesses, int i) {
  printf("Your guesses so far:");
  for (int j = 0; j <= i; j++) {
    printf(" %d", guesses[j]);
  }
  printf("\n");
}

This function compiles to the following bitcode:

@.str.6 = private unnamed_addr constant [21 x i8] c"Your guesses so far:\00", align 1
@.str.7 = private unnamed_addr constant [4 x i8] c" %d\00", align 1
@.str.8 = private unnamed_addr constant [2 x i8] c"\0A\00", align 1

; Function Attrs: noinline nounwind optnone uwtable
define dso_local void @print_guesses(i32*, i32) #0 !dbg !84 {
i0:
  %t0 = alloca i32*, align 8
  %t1 = alloca i32, align 4
  %t2 = alloca i32, align 4
  store i32* %0, i32** %t0, align 8
  call void @llvm.dbg.declare(metadata i32** %t0,
                              metadata !87,
                              metadata !DIExpression()), !dbg !88
  store i32 %1, i32* %t1, align 4
  call void @llvm.dbg.declare(metadata i32* %t1,
                              metadata !89,
                              metadata !DIExpression()), !dbg !90
  %t3 = call i32 (i8*, ...) @printf(
    i8* getelementptr inbounds ([21 x i8], [21 x i8]* @.str.6, i32 0, i32 0)
  ), !dbg !91
  call void @llvm.dbg.declare(metadata i32* %t2,
                              metadata !92,
                              metadata !DIExpression()), !dbg !94
  store i32 0, i32* %t2, align 4, !dbg !94
  br label %i1, !dbg !95

i1:                                               ; preds = %i3, %i0
  %t4 = load i32, i32* %t2, align 4, !dbg !96
  %t5 = load i32, i32* %t1, align 4, !dbg !98
  %t6 = icmp sle i32 %t4, %t5, !dbg !99
  br i1 %t6, label %i2, label %i4, !dbg !100

i2:                                               ; preds = %i1
  %t7 = load i32*, i32** %t0, align 8, !dbg !101
  %t8 = load i32, i32* %t2, align 4, !dbg !103
  %t9 = sext i32 %t8 to i64, !dbg !101
  %t10 = getelementptr inbounds i32, i32* %t7, i64 %t9, !dbg !101
  %t11 = load i32, i32* %t10, align 4, !dbg !101
  %t12 = call i32 (i8*, ...) @printf(
    i8* getelementptr inbounds ([4 x i8], [4 x i8]* @.str.7, i32 0, i32 0),
    i32 %t11
  ), !dbg !104
  br label %i3, !dbg !105

i3:                                               ; preds = %i2
  %t13 = load i32, i32* %t2, align 4, !dbg !106
  %t14 = add nsw i32 %t13, 1, !dbg !106
  store i32 %t14, i32* %t2, align 4, !dbg !106
  br label %i1, !dbg !107, !llvm.loop !108

i4:                                               ; preds = %i1
  %t15 = call i32 (i8*, ...) @printf(
    i8* getelementptr inbounds ([2 x i8], [2 x i8]* @.str.8, i32 0, i32 0)
  ), !dbg !110
  ret void, !dbg !111
}

Some things to note:

  • LLVM bitcode has unstructured control-flow: constructs such as if statements and loops are compiled to sets of basic blocks (unconditionally executed sequences of instructions) with labels (e.g. i0, i1, …) connected by explicit branches.

  • Complex expressions that might involve multiple arithmetic operations or loads or stores to memory are decomposed into sequences of simple instructions such as load, store, or add.

  • Stack allocations are explicit via the alloca instruction (though at higher optimization levels they are frequently compiled away).

  • The code is in static single assignment (SSA) form: each variable has a single definition and only values explicitly stored in memory locations may be directly mutated. Where loops or other control structures mean that multiple dynamic occurrences of a definition could reach the same instruction, explicit phi instructions are introduced to track the dependencies.

  • The bitcode includes a number of elements for tracking debug information relating back to the original source code of the program, including directives such as !dbg !88 and calls to debugging intrinsics, such as @llvm.dbg.declare, which will be converted into DWARF debugging information embedded in the compiled binary.
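Two of the notes above, unstructured control flow and SSA form, can be checked mechanically on the print_guesses listing. The following is a rough sketch using regular expressions on an abbreviated copy of that bitcode, with the debug metadata and most instructions removed. It is a text-matching illustration, not a real LLVM parser and not how MATE builds its control-flow graph. The function names control_flow_edges and definition_counts are hypothetical.

```python
import re
from collections import Counter

# Abbreviated copy of the print_guesses bitcode: only the block labels,
# a few definitions, and the block terminators are kept.
IR = """
i0:
  %t0 = alloca i32*, align 8
  br label %i1
i1:
  %t6 = icmp sle i32 %t4, %t5
  br i1 %t6, label %i2, label %i4
i2:
  %t11 = load i32, i32* %t10, align 4
  br label %i3
i3:
  %t14 = add nsw i32 %t13, 1
  br label %i1
i4:
  ret void
"""

LABEL = re.compile(r"^(\w+):")
TARGET = re.compile(r"label %(\w+)")
DEFINITION = re.compile(r"^\s*(%\w+) = ")


def control_flow_edges(ir_text):
    """Return (block, successor) pairs recovered from br instructions."""
    edges = []
    current = None
    for line in ir_text.splitlines():
        label = LABEL.match(line)
        if label:
            current = label.group(1)
        elif current is not None and line.strip().startswith("br "):
            edges.extend((current, target) for target in TARGET.findall(line))
    return edges


def definition_counts(ir_text):
    """Count how many times each SSA name appears on the left of an '='."""
    matches = (DEFINITION.match(line) for line in ir_text.splitlines())
    return Counter(match.group(1) for match in matches if match)


edges = control_flow_edges(IR)
print(edges)
# The back edge from i3 to i1 corresponds to the for loop in the C source.
assert ("i3", "i1") in edges
# SSA form: no name is defined more than once.
assert all(count == 1 for count in definition_counts(IR).values())
```

The listing contains no phi instructions. In this unoptimized bitcode the loop counter j lives in the stack slot %t2 and is updated with store, so no SSA value needs to merge definitions from different predecessors.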

Guessing game CPG

Let’s take an initial look at the CPG for the guessing game program. You can load a CPG in the MATE notebook service by creating a new database session and then invoking session.graph_from_build() with the build UUID of the target CPG (available from the MATE builds page). Clicking on an “Open Jupyter Notebook” button will open a notebook prepopulated with the necessary commands to start querying the corresponding database.

Here, the tutorial queries the MATE database to determine which CPG to load.

from mate_common.models.artifacts import ArtifactKind

session = db.new_session()

build = (
    session.query(db.Build)
    .join(db.Artifact, db.Build.artifacts)
    .filter(
        db.Artifact.kind == ArtifactKind.CompileOutputBitcode,
        db.Artifact.attributes['binary_filename'].astext == "guessing-game.bin"
    )
    .with_entities(db.Build)
    .first()
)

if build is None:
    print("\x1b[31mPlease build a CPG for the guessing-game.c program before continuing.")
    print("\nYou can build the CPG using the `mate-cli` command line tool by downloading")
    print("`guessing-game.c` and then running the command `mate-cli oneshot guessing-game.c`.")
    print("\nYou may need to supply a connection string argument to `mate-cli` depending on")
    print("your network and MATE configuration.")
    print("\nOnce you have built a CPG for the guessing game, please run this cell again.\x1b[0m")
else:
    cpg = session.graph_from_build(build)

With the CPG connected to the notebook, let’s start exploring the CPG with some graph visualizations. The following cell implements a graphviz-based visualization function that takes as input queries defining the nodes and edges to visualize, along with options for configuring the output.

The code is fairly long, and you do not need to understand it to complete the tutorial; just run the block to make the definitions available in the cells that follow. In the future, you might find this code useful to reuse or modify when writing your own CPG exploration tools.

from __future__ import annotations

import enum
import logging
import tempfile
import warnings
from contextlib import contextmanager
from typing import TYPE_CHECKING, Callable, Dict, Final, Optional, Set, Type

import pygraphviz as pgv
from IPython.display import Image, display
from sqlalchemy.orm import Query, aliased

from mate_common.models.cpg_types import EdgeKind, MATEComponent, NodeJSON, NodeKind
from mate_common.models.cpg_types.mate import CONSTANT_NODES, DWARF_TYPE_NODES, INSTRUCTION_NODES, POINTS_TO

if TYPE_CHECKING:
    from mate.cpg.models.core import CPG, Edge, Node
    from mate.cpg.models.node.ast.llvm import Block, Function, Instruction

logger = logging.getLogger("MATE")

# NOTE(ww): The color names here and below are X11 color names;
# they're what Graphviz uses internally.
COMPONENT_COLOR_MAP: Final[Dict[MATEComponent, str]] = {
    MATEComponent.AST_GRAPH_WRITER: "green",
    MATEComponent.HEADACHE: "magenta",
    MATEComponent.ASPIRIN: "gold",
    MATEComponent.WEDLOCK: "aquamarine",
    MATEComponent.SIGNATURE: "orange",
}


@enum.unique
class VizStrategy(enum.Enum):
    """Enumerates the set of strategies understood by Visualizer."""

    NAIVE: Final[str] = "naive"
    IR_SUBGRAPHS: Final[str] = "ir-subgraphs"

    # TODO(ww): Implement these visualization strategies.
    # ASM_SUBGRAPHS = "asm-subgraphs"
    # CALLGRAPH = "callgraph"
    # DATAFLOW = "dataflow"

    # NOTE(ww): We define this so that argparse can convert each enum
    # into a nice value for its --help output.
    def __str__(self) -> str:
        return self.value


@enum.unique
class VizStyle(enum.Enum):
    """Enumerates the set of styles understood by Visualizer."""

    DETAIL: Final[str] = "detail"
    PROVENANCE: Final[str] = "provenance"

    # NOTE(ww): We define this so that argparse can convert each enum
    # into a nice value for its --help output.
    def __str__(self) -> str:
        return self.value


@enum.unique
class VizProgram(enum.Enum):
    """Enumerates the layout programs that can be specified for graphviz."""

    DOT: Final[str] = "dot"
    NEATO: Final[str] = "neato"

    # NOTE(ww): We define this so that argparse can convert each enum
    # into a nice value for its --help output.
    def __str__(self) -> str:
        return self.value


def _add_node_tracked(
    graph: pgv.AGraph, stylefunc: Callable[[Node], Dict[str, str]], node: Node, visited: Set[str]
) -> None:
    if node.uuid in visited:
        return
    visited.add(node.uuid)
    # NOTE(ww): We could use the URL attribute here to link to the node's documentation.
    graph.add_node(node.uuid, **stylefunc(node))


def _add_edge_tracked(graph: pgv.AGraph, edge: Edge, visited: Set[str]) -> None:
    if edge.uuid in visited:
        return
    visited.add(edge.uuid)
    # NOTE(ww): We could use labelURL here to link to the edge's documentation.
    graph.add_edge(
        edge.source,
        edge.target,
        label=edge.kind.value,
        lhead=f"cluster_{edge.target}",
        ltail=f"cluster_{edge.source}",
    )


class StyleKeyword(enum.Enum):
    LABEL: Final[str] = "label"
    SHAPE: Final[str] = "shape"
    COLOR: Final[str] = "color"
    STYLE: Final[str] = "style"

    def __str__(self) -> str:
        return self.value


def _style_by_provenance(node: Node) -> Dict[str, str]:
    color = COMPONENT_COLOR_MAP.get(node.provenance)
    if color is None:
        logger.warning(f"No color configured for {node.provenance.value}; defaulting to gray")
        color = "gray"

    return {
        StyleKeyword.LABEL.value: node.kind.value,
        StyleKeyword.COLOR.value: color,
        StyleKeyword.STYLE.value: "filled",
    }


def _label_uuid(node: Node) -> str:
    return f"{node.kind.value}: {node.uuid}"


def _make_label_attribute(attribute: NodeJSON) -> Callable[[Node], str]:
    def _label_attribute(node: Node) -> str:
        return f"{node.kind.value}: {node.uuid}: {node.attributes.get(attribute.value, None)}"

    return _label_attribute


def _label_function(node: Node) -> str:
    name = node.attributes["name"]
    demangled = node.attributes.get("demangled", name)
    return (
        f"{node.kind.value}: {node.uuid}: {name}"
        if name == demangled
        else f"{node.kind.value}: {node.uuid}: {name} ({demangled})"
    )


def _label_instruction(node: Node) -> str:
    if "pretty_string" in node.attributes:
        return f"{node.kind.value}: {node.uuid}: {node.attributes['pretty_string']}"
    elif "ssa_name" in node.attributes:
        return f"{node.kind.value}: {node.uuid}: {node.attributes['ssa_name']} = {node.attributes['opcode']}"
    else:
        return f"{node.kind.value}: {node.uuid}: {node.attributes['opcode']}"


def _label_constant(node: Node) -> str:
    if "pretty_string" in node.attributes:
        return f"{node.kind.value}: {node.uuid}: {node.attributes['pretty_string']}"
    elif "constant_int_value" in node.attributes:
        return f"{node.kind.value}: {node.uuid}: {node.attributes['constant_int_value']}"
    else:
        return f"{node.kind.value}: {node.uuid}"


def _label_llvm_type(node: Node) -> str:
    if "pretty_string" in node.attributes["definition"]:
        return f"{node.kind.value}: {node.uuid}: {node.attributes['definition']['pretty_string']}"
    else:
        return f"{node.kind.value}: {node.uuid}"


def _label_param_binding(node: Node) -> str:
    return f"{node.kind.value}: {node.uuid}: parameter binding for argument {node.attributes['arg_op_number']}"


def _label_machine_instr(node: Node) -> str:
    if "pretty_string" in node.attributes:
        return f"{node.kind.value}: {node.uuid}: {node.attributes['pretty_string']}"
    else:
        return f"{node.kind.value}: {node.uuid}: {node.attributes['opcode']}"


class DetailColors(enum.Enum):
    CODE: Final[str] = "black"
    DATA: Final[str] = "blue"
    MEM: Final[str] = "green"
    TYPE: Final[str] = "orange"
    MODEL: Final[str] = "purple"
    UNKNOWN: Final[str] = "red"


class DetailShapes(enum.Enum):
    CODE: Final[str] = "box"
    DATA: Final[str] = "box"
    LOWLEVEL: Final[str] = "trapezium"
    LOC: Final[str] = "oval"
    RIGHT: Final[str] = "rarrow"
    LEFT: Final[str] = "larrow"
    HEX: Final[str] = "hexagon"


NODE_KIND_COLOR_MAP: Final[Dict[NodeKind, str]] = {
    **{k: DetailColors.DATA.value for k in CONSTANT_NODES},
    **{k: DetailColors.TYPE.value for k in DWARF_TYPE_NODES},
    **{k: DetailColors.CODE.value for k in DWARF_FEATURE_NODES},
    **{k: DetailColors.CODE.value for k in INSTRUCTION_NODES},
    **{
        NodeKind.MODULE: DetailColors.CODE.value,
        NodeKind.TRANSLATION_UNIT: DetailColors.CODE.value,
        NodeKind.FUNCTION: DetailColors.CODE.value,
        NodeKind.BLOCK: DetailColors.CODE.value,
        NodeKind.MEMORY_LOCATION: DetailColors.MEM.value,
        NodeKind.UNCLASSIFIED_NODE: DetailColors.UNKNOWN.value,
        NodeKind.LLVM_TYPE: DetailColors.TYPE.value,
        NodeKind.ARGUMENT: DetailColors.DATA.value,
        NodeKind.LOCAL_VARIABLE: DetailColors.MEM.value,
        NodeKind.GLOBAL_VARIABLE: DetailColors.MEM.value,
        NodeKind.PARAM_BINDING: DetailColors.MODEL.value,
        NodeKind.CALL_RETURN: DetailColors.MODEL.value,
        NodeKind.ASM_GLOBAL_VARIABLE: DetailColors.MEM.value,
        NodeKind.MACHINE_BASIC_BLOCK: DetailColors.CODE.value,
        NodeKind.MACHINE_FUNCTION: DetailColors.CODE.value,
        NodeKind.MACHINE_INSTR: DetailColors.CODE.value,
        NodeKind.ASM_BLOCK: DetailColors.CODE.value,
        NodeKind.ASM_INST: DetailColors.CODE.value,
        NodeKind.DATAFLOW_SIGNATURE: DetailColors.MODEL.value,
        NodeKind.INPUT_SIGNATURE: DetailColors.MODEL.value,
        NodeKind.OUTPUT_SIGNATURE: DetailColors.MODEL.value,
        NodeKind.PLT_STUB: DetailColors.CODE.value,
    },
}

NODE_KIND_SHAPE_MAP: Final[Dict[NodeKind, str]] = {
    **{k: DetailShapes.DATA.value for k in CONSTANT_NODES},
    **{k: DetailShapes.LOWLEVEL.value for k in DWARF_TYPE_NODES},
    **{k: DetailShapes.LOWLEVEL.value for k in DWARF_FEATURE_NODES},
    **{k: DetailShapes.CODE.value for k in INSTRUCTION_NODES},
    **{
        NodeKind.MODULE: DetailShapes.CODE.value,
        NodeKind.TRANSLATION_UNIT: DetailShapes.CODE.value,
        NodeKind.FUNCTION: DetailShapes.CODE.value,
        NodeKind.BLOCK: DetailShapes.CODE.value,
        NodeKind.MEMORY_LOCATION: DetailShapes.LOC.value,
        NodeKind.UNCLASSIFIED_NODE: DetailShapes.CODE.value,
        NodeKind.LLVM_TYPE: DetailShapes.CODE.value,
        NodeKind.ARGUMENT: DetailShapes.DATA.value,
        NodeKind.LOCAL_VARIABLE: DetailShapes.DATA.value,
        NodeKind.GLOBAL_VARIABLE: DetailShapes.DATA.value,
        NodeKind.PARAM_BINDING: DetailShapes.RIGHT.value,
        NodeKind.CALL_RETURN: DetailShapes.LEFT.value,
        NodeKind.ASM_GLOBAL_VARIABLE: DetailShapes.LOWLEVEL.value,
        NodeKind.MACHINE_BASIC_BLOCK: DetailShapes.LOWLEVEL.value,
        NodeKind.MACHINE_FUNCTION: DetailShapes.LOWLEVEL.value,
        NodeKind.MACHINE_INSTR: DetailShapes.LOWLEVEL.value,
        NodeKind.ASM_BLOCK: DetailShapes.LOWLEVEL.value,
        NodeKind.ASM_INST: DetailShapes.LOWLEVEL.value,
        NodeKind.DATAFLOW_SIGNATURE: DetailShapes.HEX.value,
        NodeKind.INPUT_SIGNATURE: DetailShapes.RIGHT.value,
        NodeKind.OUTPUT_SIGNATURE: DetailShapes.LEFT.value,
        NodeKind.PLT_STUB: DetailShapes.LOWLEVEL.value,
    },
}

NODE_KIND_LABEL_MAP: Final[Dict[NodeKind, Callable[[Node], str]]] = {
    **{k: _label_constant for k in CONSTANT_NODES},
    **{k: _label_uuid for k in DWARF_TYPE_NODES},
    **{k: _label_uuid for k in DWARF_FEATURE_NODES},
    **{k: _label_instruction for k in INSTRUCTION_NODES},
    **{
        NodeKind.MODULE: _make_label_attribute(NodeJSON.MODULE_NAME),
        NodeKind.TRANSLATION_UNIT: _make_label_attribute(NodeJSON.FILENAME),
        NodeKind.FUNCTION: _label_function,
        NodeKind.BLOCK: _label_uuid,
        NodeKind.CONSTANT_STRING: _make_label_attribute(NodeJSON.STRING_VALUE),
        NodeKind.MEMORY_LOCATION: _make_label_attribute(NodeJSON.PRETTY_STRING),
        NodeKind.UNCLASSIFIED_NODE: _label_uuid,
        NodeKind.LLVM_TYPE: _label_llvm_type,
        NodeKind.ARGUMENT: _make_label_attribute(NodeJSON.NAME),
        NodeKind.LOCAL_VARIABLE: _make_label_attribute(NodeJSON.NAME),
        NodeKind.GLOBAL_VARIABLE: _make_label_attribute(NodeJSON.NAME),
        NodeKind.PARAM_BINDING: _label_param_binding,
        NodeKind.CALL_RETURN: _label_uuid,
        NodeKind.ASM_GLOBAL_VARIABLE: _make_label_attribute(NodeJSON.NAME),
        NodeKind.MACHINE_BASIC_BLOCK: _make_label_attribute(NodeJSON.SYMBOL),
        NodeKind.MACHINE_FUNCTION: _make_label_attribute(NodeJSON.NAME),
        NodeKind.MACHINE_INSTR: _label_machine_instr,
        NodeKind.ASM_BLOCK: _label_uuid,
        NodeKind.ASM_INST: _make_label_attribute(NodeJSON.PRETTY_STRING),
        NodeKind.DATAFLOW_SIGNATURE: _label_uuid,
        NodeKind.INPUT_SIGNATURE: _label_uuid,
        NodeKind.OUTPUT_SIGNATURE: _label_uuid,
        NodeKind.PLT_STUB: _label_uuid,
    },
}


def _style_by_detail(node: Node) -> Dict[str, str]:
    return {
        StyleKeyword.SHAPE.value: NODE_KIND_SHAPE_MAP[node.kind],
        StyleKeyword.COLOR.value: NODE_KIND_COLOR_MAP[node.kind],
        StyleKeyword.LABEL.value: NODE_KIND_LABEL_MAP[node.kind](node),
    }


class Visualizer:
    """Visualizes the supplied CPG using Graphviz.

    Supports filters for restricting the kinds of nodes visualized, as well as a set of
    visualization "strategies" for laying the CPG out with a particular set of features in mind.
    """

    def __init__(self, cpg: CPG) -> None:
        self._cpg = cpg

    def _dispatch_strategy(self, strategy, style, query, graph_kwargs) -> pgv.AGraph:
        if style == VizStyle.PROVENANCE:
            style_fun = _style_by_provenance
        elif style == VizStyle.DETAIL:
            style_fun = _style_by_detail
        else:
            logger.error(f"Unimplemented style: {style.value}")
            raise ValueError(f"Unimplemented style: {style.value}")

        if strategy == VizStrategy.NAIVE:
            return self._render_naive(query, style_fun, graph_kwargs)
        elif strategy == VizStrategy.IR_SUBGRAPHS:
            return self._render_ir_subgraphs(query, style_fun, graph_kwargs)
        else:
            logger.error(f"Unimplemented strategy: {strategy.value}")
            raise ValueError(f"Unimplemented strategy: {strategy.value}")

    def _render_naive(self, query, style_fun, graph_kwargs) -> pgv.AGraph:
        """A "naive" rendering strategy, where we iterate over every edge in some arbitrary
        (probably insertion) order and hope for a relatively readable output graph.

        Pros: Fast.
        Cons: No clustering or subgraph identification.
        """
        graph = pgv.AGraph(directed=True, **graph_kwargs)

        visited_nodes: Set[str] = set()
        for (edge, source, target) in query.yield_per(1000):
            for node in [source, target]:
                if node.uuid not in visited_nodes:
                    graph.add_node(node.uuid, **style_fun(node))

            graph.add_edge(source.uuid, target.uuid, label=edge.kind.value)
            visited_nodes.update({source.uuid, target.uuid})

        return graph

    def _render_ir_subgraphs(self, query, style_fun, graph_kwargs) -> pgv.AGraph:
        """A subgraph rendering strategy, where IR functions (and their constituent blocks +
        instructions) are collected into discrete subgraphs.

        Pros: Fast-ish, puts functions and their features in distinct subgraphs.
        Cons: Not much easier to read than the "naive" strategy, does funny things
              when a subgraph has no blocks or instructions
              (e.g., an external or intrinsic function).
        """
        graph = pgv.AGraph(directed=True, compound=True, **graph_kwargs)

        visited_nodes: Set[str] = set()
        visited_edges: Set[str] = set()

        # This strategy has two passes:
        # 1. We build our subgraphs using a separate query, keeping track of seen nodes.
        # 2. We perform the main edge iteration, ignoring work already done by the subgraph pass.
        node_uuids = (
            # Get the nodes that will be drawn by the visualization
            query.with_entities(self._cpg.Edge.source)
            .union(query.with_entities(self._cpg.Edge.target))
            .subquery()
        )

        I = aliased(self._cpg.Instruction)
        B = aliased(self._cpg.Block)
        F = aliased(self._cpg.Function)

        func_query = (
            self._cpg.session.query(B)
            .filter(B.uuid.in_(node_uuids))
            .join(F, B.parent_function)
            .with_entities(F)
            .union(
                self._cpg.session.query(I)
                .filter(I.uuid.in_(node_uuids))
                .join(B, I.parent_block)
                .join(F, B.parent_function)
                .with_entities(F)
            )
        )

        for func in func_query.yield_per(100):
            subgraph = graph.get_subgraph(f"cluster_{func.uuid}")
            if subgraph is None:
                subgraph = graph.add_subgraph(name=f"cluster_{func.uuid}", **_style_by_detail(func))

            # The subgraph gets the function itself, all of the function's basic blocks,
            # and each basic block's instructions (unless the latter has been filtered).
            _add_node_tracked(subgraph, style_fun, func, visited_nodes)
            for block in func.blocks:
                blockgraph = subgraph.get_subgraph(f"cluster_{block.uuid}")
                if blockgraph is None:
                    blockgraph = subgraph.add_subgraph(
                        name=f"cluster_{block.uuid}", **_style_by_detail(block)
                    )

                _add_node_tracked(blockgraph, style_fun, block, visited_nodes)

                for instr in block.instructions:
                    _add_node_tracked(blockgraph, style_fun, instr, visited_nodes)

        for (edge, source, target) in query.yield_per(1000):
            if edge.kind in [
                EdgeKind.INSTRUCTION_TO_PARENT_BLOCK,
                EdgeKind.BLOCK_TO_PARENT_FUNCTION,
            ]:
                continue
            for node in [source, target]:
                _add_node_tracked(graph, style_fun, node, visited_nodes)

            _add_edge_tracked(graph, edge, visited_edges)

        return graph

    @contextmanager
    def render(
        self,
        node_query: Optional[Query] = None,
        node_class: Optional[Type[Node]] = None,
        edge_query: Optional[Query] = None,
        edge_class: Optional[Type[Edge]] = None,
        strategy=VizStrategy.NAIVE,
        style=VizStyle.DETAIL,
        **graph_kwargs,
    ):
        """Renders the CPG with Graphviz using the supplied options, yielding a
        ``pygraphviz.AGraph`` object."""

        if node_class is None:
            node_class = self._cpg.Node

        if edge_class is None:
            edge_class = self._cpg.Edge

        Source = aliased(self._cpg.Node)
        Target = aliased(self._cpg.Node)

        if node_query is None:
            if edge_query is None:
                query = (
                    self._cpg.session.query(edge_class)
                    .join(Source, Source.uuid == edge_class.source)
                    .join(Target, Target.uuid == edge_class.target)
                    .with_entities(edge_class, Source, Target)
                )
            else:
                query = (
                    edge_query.join(Source, Source.uuid == edge_class.source)
                    .join(Target, Target.uuid == edge_class.target)
                    .with_entities(edge_class, Source, Target)
                )
        else:
            node_subquery = node_query.with_entities(node_class.uuid).subquery()

            if edge_query is None:
                query = (
                    self._cpg.session.query(edge_class)
                    .join(Source, (Source.uuid == edge_class.source) & (Source.uuid.in_(node_subquery)))
                    .join(Target, (Target.uuid == edge_class.target) & (Target.uuid.in_(node_subquery)))
                    .with_entities(edge_class, Source, Target)
                )
            else:
                query = (
                    edge_query.join(
                        Source,
                        (Source.uuid == edge_class.source) & (Source.uuid.in_(node_subquery)),
                    )
                    .join(
                        Target,
                        (Target.uuid == edge_class.target) & (Target.uuid.in_(node_subquery)),
                    )
                    .with_entities(edge_class, Source, Target)
                )

        # NOTE(ww): 5000 is somewhat arbitrary here.
        edge_count = query.count()
        if edge_count > 5000:
            logger.warning("Given a large CPG; consider filtering some nodes")
        logger.debug(f"Rendering CPG with {edge_count} edges")

        graph = self._dispatch_strategy(strategy, style, query, graph_kwargs)
        graph.layout()
        # Close the graph even if the caller's block raises.
        try:
            yield graph
        finally:
            graph.close()


def display_cpg(
    cpg,
    node_query: Optional[Query] = None,
    node_class: Optional[Type[Node]] = None,
    edge_query: Optional[Query] = None,
    edge_class: Optional[Type[Edge]] = None,
    strategy=VizStrategy.NAIVE,
    style=VizStyle.DETAIL,
    program=VizProgram.DOT,
    unconfined=False,
    overlap=False,
):
    """Displays a visualization of the CPG or query within a notebook.

    :param cpg: The CPG to visualize
    :param node_query: A query returning Nodes to include in the visualization
    :param node_class: Optional, the model class for which node_query will return results, required if node_query uses an aliased model class
    :param edge_query: A query returning Edges to include in the visualization
    :param edge_class: Optional, the model class for which edge_query will return results, required if edge_query uses an aliased model class
    :param VizStrategy strategy: A layout strategy for grouping CPG nodes
    :param VizStyle style: A style for rendering CPG nodes
    :param VizProgram program: The program to use for rendering, e.g. 'dot' or 'neato'
    :param bool unconfined: If True, renders the visualization with scrollbars
    :param bool overlap: Whether to allow overlapping nodes
    """
    v = Visualizer(cpg)
    with tempfile.NamedTemporaryFile(suffix=".png") as file:
        # Hide warnings from implementation of IR_SUBGRAPH clusters, sizes, etc.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            with v.render(
                node_query=node_query,
                node_class=node_class,
                edge_query=edge_query,
                edge_class=edge_class,
                strategy=strategy,
                style=style,
                overlap=overlap,
            ) as g:
                g.draw(file, format="png", prog=program.value)
                # Flush buffered image data so it is on disk before it is read back by name.
                file.flush()
                display(Image(file.name, unconfined=unconfined))

def uuids_for_function(cpg: CPG, name: str):
    Inst = aliased(cpg.Instruction)
    Blk = aliased(cpg.Block)
    Fun = aliased(cpg.Function)

    # Start by getting all of the Function, Block, and Instruction
    # nodes associated with the function.
    in_func = (
        cpg.session.query(Fun)
        .filter_by(name=name)
        .with_entities(Fun.uuid)
        .union(
            cpg.session.query(Blk)
            .join(Fun, Blk.parent_function)
            .filter(Fun.name == name)
            .with_entities(Blk.uuid),
            cpg.session.query(Inst)
            .join(Blk, Inst.parent_block)
            .join(Fun, Blk.parent_function)
            .filter(Fun.name == name)
            .with_entities(Inst.uuid),
        )
    )

    # Then add any node that's connected to these by an edge
    nodes = in_func.union(
        cpg.session.query(cpg.Edge)
        .filter(
            cpg.Edge.source.in_(in_func),
            # Don't follow outgoing CALL_RETURN_TO_CALLER, CALLGRAPH,
            # and VALUE_DEFINITION_TO_USE edges because they will
            # return nodes that aren't immediately relevant, such as
            # function nodes invoked by the program
            ~cpg.Edge.kind.in_(
                [
                    EdgeKind.CALL_RETURN_TO_CALLER,
                    EdgeKind.CALLGRAPH,
                    EdgeKind.VALUE_DEFINITION_TO_USE,
                ]
            ),
            # Don't add nodes for machine inst or asmlevel
            # relationships
            cpg.Edge.target_node.has(cpg.Node.kind.in_(LLVM_LEVEL_NODES)),
        )
        .with_entities(cpg.Edge.target),
        cpg.session.query(cpg.Edge)
        .filter(
            cpg.Edge.target.in_(in_func),
            # Don't follow incoming CALL_TO_FUNCTION or CALLGRAPH
            # edges because we don't want to pick up callsites or
            # calling functions
            ~cpg.Edge.kind.in_([EdgeKind.CALL_TO_FUNCTION, EdgeKind.CALLGRAPH]),
            cpg.Edge.source_node.has(cpg.Node.kind.in_(LLVM_LEVEL_NODES)),
        )
        .with_entities(cpg.Edge.source),
    ).subquery()

    # Finally, add in all of the memory locations that are reachable
    # from pointers in the function.
    expanded = (
        db.PathBuilder()
        .starting_at(lambda Node: Node.uuid.in_(nodes))
        .continuing_while(lambda _, Edge: Edge.kind.in_(POINTS_TO))
        .build(cpg)
    )
    return cpg.session.query(expanded.target)

Let’s start by taking a quick look at the entire CPG using the display_cpg function defined above.

Note: For even moderately-sized programs, the CPG may take an extremely long time to visualize, so we don’t recommend calling display_cpg without carefully narrowing down the set of nodes and edges you wish to see.

display_cpg(cpg, program=VizProgram.NEATO)

Even this simple program has a very complex CPG!

Note: While the CPG for this program is too large to understand using such a visualization, you can view a larger version of this image (or others in this tutorial) by right-clicking on the image and saving it to view in a different program, or by clicking in the left margin of the notebook to zoom in on the image and enable scroll bars.

CPG data model

The cpg object provides an interface for querying the CPG via MATE’s embedded query language. The query language is based on the SQLAlchemy ORM framework for Python, and supports a wide range of SQL-like query capabilities. This tutorial will demonstrate many common queries, but additional tutorials and documentation are available from the SQLAlchemy website.

A more detailed description of the underlying data representation and CPG schema can be found in the section CPG Schema of the MATE documentation.

To make writing queries more natural, MATE implements model classes that can be directly used to query specific types of nodes and relationships within the CPG. The available models, which are imported into MATE notebooks by default, are documented in the API Reference section in the mate.cpg.models package and its submodules.

Simple queries

Let’s start by writing some simple queries to understand the scale and complexity of the CPG.

The Node and Edge model classes of a CPG represent nodes and edges in the CPG of any type. We can write a query that selects all the corresponding entities of a model class in the CPG by calling query on our current session with the class as an argument: e.g. session.query(cpg.Node). We can realize the query by invoking a method on the resulting query object, such as count:

print(session.query(cpg.Node).count())
print(session.query(cpg.Edge).count())

You can narrow the results of a query by specifying a filter. For example, we can filter on the “kind” attribute of Nodes or Edges to get a more detailed account of what’s in the CPG.

print("Nodes by kind")
for kind in NodeKind:
    print(f"{session.query(cpg.Node).filter(cpg.Node.kind == kind).count()}\t{kind}")

print("Edges by kind")
for kind in EdgeKind:
    print(f"{session.query(cpg.Edge).filter(cpg.Edge.kind == kind).count()}\t{kind}")

To examine nodes or edges in the graph more closely, we can call all on a query to get the results as a python list:

session.query(cpg.Function).all()

Note that in this query we used the model class Function, which corresponds to Nodes with the kind NodeKind.FUNCTION. Each kind of node in the MATE CPG has an accompanying model class.

In addition to providing an easy way to query for a particular kind of node, model classes provide streamlined access to node attributes and relationships.

For example, we can query for the function whose name is “print_guesses” using the following query. We can also directly access attributes of the node via the returned python object.

print_guesses = session.query(cpg.Function).filter(cpg.Function.name == "print_guesses").one()
print_guesses.is_declaration

Note: You can include multiple filters in a single query by passing them as additional arguments to filter, calling filter multiple times, or combining them with logical operators. The following all return the same results:

  • session.query(cpg.Node).filter(<predicate 1>, <predicate 2>).all()

  • session.query(cpg.Node).filter(<predicate 1>).filter(<predicate 2>).all()

  • session.query(cpg.Node).filter((<predicate 1>) & (<predicate 2>)).all()

Call Graph (CG)

The first subgraph of the CPG we will explore is the Call Graph. The call graph connects each function to functions it might directly call or invoke using “Callgraph” edges.

display_cpg(cpg, edge_query=session.query(cpg.Edge).filter(cpg.Edge.kind.in_([EdgeKind.CALLGRAPH])), program=VizProgram.DOT)

Note: The in_ operator can be used to select nodes whose attributes are a member of a python collection or the results of another subquery.

Note: Supplying VizProgram.DOT (“dot”) as the program argument to display_cpg can result in nicer layouts, but is more expensive than VizProgram.NEATO (“neato”).

There are a number of ways to traverse the callgraph using queries. We will show a few here to demonstrate different ways to interact with the query language.

For simple interactive exploration with a small number of nodes, you can use the callers and callees attributes of the Function class directly within python:

main = session.query(cpg.Function).filter(cpg.Function.name == "main").one()
main.callees

Note: To limit the amount of data loaded from the database at once, some attributes that represent one-to-many or many-to-many relationships are not loaded automatically, and must be manually queried using all().

Note: You can see a list of attributes available for a model using the python help feature: e.g. help(Function) or by tab-completing within the Jupyter notebook.

These relationship attributes can also be used within a query to follow callgraph edges within a larger SQL query. For many-to-many relationships such as callers, the any operator returns true if any related node satisfies the predicate. For many-to-one or one-to-one relationships, the has operator serves the same purpose.

session.query(cpg.Function).filter(cpg.Function.callers.any(cpg.Function.name == "main")).all()

Relationship attributes can also be used to perform join queries over multiple entities in the CPG. The join operator works like a standard SQL join operator. The first argument is the model class to join against and the second argument is either the attribute to join on or a predicate expressing a join condition.

Callee = aliased(cpg.Function)
(
    session.query(cpg.Function)
    .filter(cpg.Function.name == "main")
    .join(Callee, cpg.Function.callees)
    .with_entities(Callee)
    .all()
)

Note: The various model classes for different types of nodes all represent the same set of node entities. This means that you can write session.query(cpg.Node).filter(cpg.Function.name == "main").one(), where cpg.Node and cpg.Function refer to the same entities and return the expected result.

To write queries that describe relationships between multiple nodes or edges, you must create “aliases” of an appropriate model class, one for each node or edge that takes part in the query.

It is helpful to think of defining an alias as defining a separate logical variable to use in your query.

Note: In practice, writing queries with explicit joins is significantly more performant than writing queries that filter on relationships using has or any operators.

The with_entities operator changes the set of entities involved in the query that will be returned in the results. For example, we could choose to also return the cpg.Function entity to get a series of caller-callee pairs as results.

Callee = aliased(cpg.Function)
(
    session.query(cpg.Function)
    .filter(cpg.Function.name == "main")
    .join(Callee, cpg.Function.callees)
    .with_entities(cpg.Function, Callee)
    .all()
)

We can also more explicitly write this query to directly reference the edges table, though in this case one of the previous queries would be more appropriate. This query also demonstrates the use of an explicit join condition join(cpg.Node, cpg.Node.uuid == cpg.Edge.target), which is equivalent to join(cpg.Node, cpg.Edge.target_node).

(
    session.query(cpg.Edge)
    .filter(
        cpg.Edge.kind == EdgeKind.CALLGRAPH,
        cpg.Edge.source_node.has(cpg.Function.name == "main")
    )
    .join(cpg.Node, cpg.Node.uuid == cpg.Edge.target)
    .with_entities(cpg.Node)
    .all()
)

Abstract Syntax Tree (AST)

For the next series of explorations of the CPG, we will focus our attention on the function get_guess:

void get_guess(int *guess) {
  printf("Guess a number 1-10: ");
  while (true) {
    if (read_num(guess) == 0) {
      return;
    } else {
      printf("Invalid entry. Try again.\n");
    }
  }
}

The bitcode of this function is:

; Function Attrs: noinline nounwind optnone uwtable
define dso_local void @get_guess(i32*) #0 !dbg !65 {
i0:
  %t0 = alloca i32*, align 8
  store i32* %0, i32** %t0, align 8
  call void @llvm.dbg.declare(metadata i32** %t0,
                              metadata !68,
                              metadata !DIExpression()), !dbg !69
  %t1 = call i32 (i8*, ...) @printf(
    i8* getelementptr inbounds ([22 x i8], [22 x i8]* @.str.4, i32 0, i32 0)
  ), !dbg !70
  br label %i1, !dbg !71

i1:                                               ; preds = %i4, %i0
  %t2 = load i32*, i32** %t0, align 8, !dbg !72
  %t3 = call i32 @read_num(i32* %t2), !dbg !75
  %t4 = icmp eq i32 %t3, 0, !dbg !76
  br i1 %t4, label %i2, label %i3, !dbg !77

i2:                                               ; preds = %i1
  ret void, !dbg !78

i3:                                               ; preds = %i1
  %t5 = call i32 (i8*, ...) @printf(
    i8* getelementptr inbounds ([27 x i8], [27 x i8]* @.str.5, i32 0, i32 0)
  ), !dbg !80
  br label %i4

i4:                                               ; preds = %i3
  br label %i1, !dbg !71, !llvm.loop !82
}

The Abstract Syntax Tree subgraph records information about how different program elements are related textually in the program. Because LLVM does not have structured control-flow, the resulting information is fairly minimal: there are edges between functions and their arguments, between functions and the blocks they contain, between blocks and the instructions they contain, and between functions and instructions and their types.

get_guess_uuids = uuids_for_function(cpg, "get_guess")

get_guess_ast = (
    session.query(cpg.Edge)
    .filter(
        cpg.Edge.kind.in_(AST_EDGES),
        cpg.Edge.source.in_(get_guess_uuids),
        cpg.Edge.target.in_(get_guess_uuids)
    )
)

display_cpg(cpg, edge_query=get_guess_ast, program=VizProgram.DOT)

We can use AST edges to traverse between instructions and their parent blocks and functions, or to filter a query to only those instructions belonging to a function:

F = aliased(cpg.Function)
B = aliased(cpg.Block)

inst = (
    session.query(F)
    .filter(F.name == "get_guess")
    .join(B, F.blocks)
    .join(cpg.Instruction, B.instructions)
    .with_entities(cpg.Instruction)
    .first()
)

print(
    session.query(cpg.Instruction)
    .filter(cpg.Instruction.uuid == inst.uuid)
    .join(B, cpg.Instruction.parent_block)
    .join(F, B.parent_function)
    .with_entities(F)
    .one()
)

Control-Flow Graph (CFG)

The Control-Flow Graph encodes the intra-procedural control-flow between basic blocks and instructions within each function.

Let’s take a look at the CFG for the get_guess function.

get_guess_uuids = uuids_for_function(cpg, "get_guess")

get_guess_cfg = (
    session.query(cpg.Edge)
    .filter(
        cpg.Edge.kind.in_(LOCAL_CONTROL_FLOW_FORWARD),
        cpg.Edge.source.in_(get_guess_uuids),
        cpg.Edge.target.in_(get_guess_uuids)
    )
)

display_cpg(cpg, edge_query=get_guess_cfg, program=VizProgram.DOT, strategy=VizStrategy.IR_SUBGRAPHS)

Control-Dependence Graph (CDG)

The Control-Dependence Graph has edges between blocks and instructions where one is control-dependent on the other. An instruction is control-dependent on another if the control-flow decision made at the controlling instruction determines whether it executes. For example, the body of a loop is control-dependent on the branch instruction of the loop guard. In this snippet of code:

if (some_condition) {
   stmt1;   
} else {
   stmt2;   
}
stmt3;

stmt1 and stmt2 are control dependent on some_condition, but stmt3 is not.
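
One common textbook way to make this definition concrete uses post-dominators: a node is control-dependent on a branch if it post-dominates one of the branch’s successors (or is that successor) but does not strictly post-dominate the branch itself. The sketch below applies that rule to a hand-written CFG for the snippet above. The node names and helper functions are hypothetical, the code is independent of MATE, and it makes no claim about how MATE builds its CDG. It also assumes every node other than the exit has at least one successor.

```python
# Toy CFG for the if/else snippet; node names are made up for illustration.
CFG = {
    "cond": ["stmt1", "stmt2"],
    "stmt1": ["stmt3"],
    "stmt2": ["stmt3"],
    "stmt3": ["exit"],
    "exit": [],
}


def post_dominators(cfg, exit_node):
    """Return {node: set of nodes that post-dominate it} by fixed-point iteration."""
    nodes = set(cfg)
    pdom = {n: set(nodes) for n in nodes}
    pdom[exit_node] = {exit_node}
    changed = True
    while changed:
        changed = False
        for n in nodes - {exit_node}:
            # A node is post-dominated by itself plus whatever
            # post-dominates all of its successors.
            new = {n} | set.intersection(*(pdom[s] for s in cfg[n]))
            if new != pdom[n]:
                pdom[n] = new
                changed = True
    return pdom


def control_dependents(cfg, exit_node):
    """Return {branch: set of nodes that are control-dependent on it}."""
    pdom = post_dominators(cfg, exit_node)
    deps = {}
    for branch, succs in cfg.items():
        if len(succs) < 2:
            continue  # only nodes with several successors can control others
        controlled = set()
        for s in succs:
            for n in pdom[s]:  # n post-dominates s (or is s) ...
                # ... but n does not strictly post-dominate the branch.
                if n == branch or n not in pdom[branch]:
                    controlled.add(n)
        deps[branch] = controlled
    return deps


deps = control_dependents(CFG, "exit")
print(sorted(deps["cond"]))  # ['stmt1', 'stmt2']
```

stmt3 is excluded because it runs on every path out of cond, so it post-dominates the branch.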

Compare the control-dependency graph for get_guess to the control-flow graph for get_guess.

get_guess_uuids = uuids_for_function(cpg, "get_guess")

get_guess_cdg = (
    session.query(cpg.Edge)
    .filter(
        cpg.Edge.kind.in_(CONTROL_DEP_FORWARD),
        cpg.Edge.source.in_(get_guess_uuids),
        cpg.Edge.target.in_(get_guess_uuids)
    )
)

display_cpg(cpg, edge_query=get_guess_cdg, program=VizProgram.DOT)

Control dependence can be useful for understanding the conditions under which code executes. For example, we can use a path query in the CDG to find what nodes control printing the “You win!” message in main.

you_win = (
    session.query(cpg.CallSite)
    .filter(cpg.CallSite.callees.any(cpg.Function.name == "printf"))
    .filter(cpg.CallSite.location['line'] == '23')
    .one()
)

cdg_query = (
    db.PathBuilder()
    .stopping_at(lambda Node: Node.uuid == you_win.uuid)
    .continuing_while(
        lambda _, Edge: Edge.kind == EdgeKind.TERMINATOR_INSTRUCTION_TO_CONTROL_DEPENDENT_INSTRUCTION
    )
    .reverse()
    .build(cpg)
)

controllers = (
    session.query(cdg_query)
    .join(cpg.Instruction, cpg.Instruction.uuid == cdg_query.source)
    .with_entities(cpg.Instruction)
    .all()
)

for controller in controllers:
    print(controller.attributes['pretty_string'])

This query uses a “path” query to find all nodes in the CDG from which the “You win!” call site can be reached. The PathBuilder constructor specifies a path traversal to run within the CPG using a “fluent” interface where method calls are chained to refine the desired path.

The stopping_at operator takes a function that takes a Node class object as an argument and returns a predicate describing which nodes in the graph paths must end at. There is an analogous starting_at operator that specifies where the paths should start. If either of these operators is not called, then the path will start (or stop) at any node. The continuing_while operator takes a function whose arguments are a class representing the current state of the traversal and an Edge class object, and must return a predicate indicating whether that edge should be traversed from the current state.

For example, you could force the query to stay in a single function using the following filter:

.continuing_while(
    lambda _, Edge: (
        Edge.target_node.has(
            cpg.Instruction.parent_block.has(
                cpg.Block.parent_function.has(
                    cpg.Function.name == "main"
                )
            )
        )
    ) 
)

In this case, this filter is not necessary.

The reverse operator changes the direction along which edges are followed during the search. Typically, if you specify stopping_at but not starting_at, you should specify a reverse traversal. If you provide both starting_at and stopping_at, you should choose the direction of search based on which is likely to explore less of the graph.

Note: The source and target nodes of the Edge passed to the continuing_while predicate are not swapped if you specify reverse.
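
As a plain-Python analogy for a reverse traversal (not MATE’s implementation, and not using its API), the sketch below walks a small hand-written edge list backwards from a stopping node. The Edge tuple, the edge kinds, and the function name are all hypothetical. The point to notice is that the edge predicate sees each edge in its stored orientation, even though the walk moves from target to source.

```python
from collections import namedtuple

# Hypothetical edge record; the names only mirror the concepts in the text.
Edge = namedtuple("Edge", ["source", "target", "kind"])

EDGES = [
    Edge("outer_guard", "guard", "control_dep"),
    Edge("guard", "body", "control_dep"),
    Edge("x", "body", "dataflow"),
]


def sources_reaching(stop, keep_edge):
    """Reverse traversal: start at `stop` and walk edges from target to source.

    `keep_edge` always receives the edge as stored, so edge.target is the
    node already visited and edge.source is the node about to be visited.
    """
    found, work = set(), [stop]
    while work:
        node = work.pop()
        for edge in EDGES:
            if edge.target == node and keep_edge(edge) and edge.source not in found:
                found.add(edge.source)
                work.append(edge.source)
    return found


toy_controllers = sources_reaching("body", lambda e: e.kind == "control_dep")
print(sorted(toy_controllers))  # ['guard', 'outer_guard']
```

The dataflow edge from x is ignored because the predicate rejects it, in the same way that a continuing_while predicate restricts which edge kinds a path may follow.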

The final call to build() runs the traversal and returns a new model class representing the path with attributes such as the source and target uuids of nodes of the path. A common pattern is to join these attributes with the Node table, as shown in this example.

We write another visualization function to help understand the results of this query:

def visualize_ctxt(cpg, instructions, message, window_size=1):
    pointer = message + " --> "
    margin = ''.ljust(len(pointer))
    for inst in instructions:
        try:
            ndir = inst.location['dir']
            nfile = inst.location['file']
            nline = inst.location['line']
            nstr = inst.location_string
            print(f"{margin}{nstr}")
            print(f"{margin}{''.ljust(len(nstr),'=')}")
            for l in range(nline-window_size,nline+window_size+1):
                code = (
                    session.query(cpg.Instruction)
                    .filter(
                        cpg.Instruction.location['dir'].astext == ndir,
                        cpg.Instruction.location['file'].astext == nfile,
                        cpg.Instruction.location['line'].as_integer() == l,
                    )
                    .with_entities(cpg.Instruction.attributes['source_code'])
                    .first()
                )
                print(f"{pointer if l == nline else margin}{l}:\t{code[0] if code else ''}")
        except Exception:
            fun = inst.parent_block.parent_function.name
            print(f"{margin}{fun}")
            print(f"{margin}{''.ljust(len(fun),'=')}")
            print(f"{pointer}\t{inst.attributes['pretty_string']}")
        print()
            
            

visualize_ctxt(cpg, controllers, "controlling instruction")

As we might hope, you can only win the game if you guess the number correctly and you haven’t run out of guesses yet.

Data-Flow Graph (DFG)

The Data-Flow Graph encodes data dependencies between instructions and memory locations in the program and abstracts away the control structure of the program.

Compare the data-flow graph for the get_guess function to its control-flow graph.

get_guess_uuids = uuids_for_function(cpg, "get_guess")

get_guess_dfg = (
    session.query(cpg.Edge)
    .filter(
        cpg.Edge.kind.in_(DATA_FLOW_FORWARD),
        cpg.Edge.source.in_(get_guess_uuids),
        cpg.Edge.target.in_(get_guess_uuids)
    )
)

display_cpg(cpg, edge_query=get_guess_dfg, program=VizProgram.DOT)

This graph has been restricted to intra-procedural data flows because we filter on Nodes for the get_guess function, but the dataflow graph can also be traversed inter-procedurally in two ways:

  1. through parameter binding and call return nodes, described in the section “Understanding function call subgraphs” below;

  2. through loads and stores to memory accessible in multiple functions.

This dataflow graph also includes a node for a dataflow signature for the printf function. You can learn more about dataflow signatures in the “Mate Signatures” section of the MATE documentation.

Inter-procedural data-flow

Naively tracing data-flow inter-procedurally by following dataflow edges in the CPG can result in false positives because not all sequences of dataflow edges in the graph are realizable during program execution. For example, consider this simple program:

int propagate(int x) {
  return x;
}

int test() {
  int a = 1;
  int b = 2;
  int c = propagate(a);
  int d = propagate(b);
}

The data-flow graph contains paths from both a and b to the argument x of propagate, and paths from the return value of propagate to both c and d. As a result, simply following these dataflow edges in the graph will report that a flows to both c and d and that b flows to both c and d. In this case, the cause of the imprecision is the failure of the analysis to track C’s stack-based call discipline, which ensures that each call to propagate returns to its original callsite.

Similar imprecision can occur when tracking dataflows through memory locations because aliasing in the program can also depend on the history of the program’s execution.

To achieve significantly improved precision for queries about inter-procedural dataflow, MATE has an interface for expressing graph traversals as “context-free-language reachability” (CFL-reachability) queries. A CFL-reachability query matches paths in the graph whose edge labels are members of a specified context-free language. An example such language is the language of well-matched function calls and returns, which can filter out infeasible paths like the flows from a to d and b to c in the example above.
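
To illustrate the idea of matching calls with returns, here is a minimal plain-Python sketch over a hand-written edge list for the propagate example. It is a toy, not MATE’s CFL implementation: the edge labels, node names, and functions are hypothetical, and the stack of pending callsites is unbounded, which is acceptable only because this example has no recursion.

```python
# Labelled dataflow edges for the propagate example. ("call", k) enters the
# callee from callsite k, ("ret", k) returns to callsite k, and None marks an
# ordinary intra-procedural edge.
EDGES = [
    ("a", "x", ("call", 1)),
    ("b", "x", ("call", 2)),
    ("x", "ret_val", None),
    ("ret_val", "c", ("ret", 1)),
    ("ret_val", "d", ("ret", 2)),
]


def naive_reachable(start):
    """Follow every edge, ignoring call/return labels."""
    seen, work = set(), [start]
    while work:
        node = work.pop()
        for src, dst, _ in EDGES:
            if src == node and dst not in seen:
                seen.add(dst)
                work.append(dst)
    return seen


def matched_reachable(start):
    """Follow edges while tracking pending callsites, so a return edge is
    taken only if it matches the most recent unmatched call."""
    seen_states, reached = set(), set()
    work = [(start, ())]
    while work:
        node, stack = work.pop()
        for src, dst, label in EDGES:
            if src != node:
                continue
            if label is None:
                new_stack = stack
            elif label[0] == "call":
                new_stack = stack + (label[1],)
            elif stack and stack[-1] == label[1]:
                new_stack = stack[:-1]  # return matches the pending call
            elif not stack:
                new_stack = stack  # no pending call: returning to some caller
            else:
                continue  # mismatched return: infeasible path
            state = (dst, new_stack)
            if state not in seen_states:
                seen_states.add(state)
                reached.add(dst)
                work.append(state)
    return reached


print(sorted(naive_reachable("a")))    # ['c', 'd', 'ret_val', 'x']
print(sorted(matched_reachable("a")))  # ['c', 'ret_val', 'x']
```

The naive traversal reports the spurious flow from a to d, while the matched traversal rejects it because the return to callsite 2 does not match the pending call from callsite 1.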

CFL-reachability queries use the same PathBuilder interface as the path queries described above, but must specify a base Path class in the call to the PathBuilder() constructor. A number of built-in CFL traversals are provided in the cfl module. The query below uses the cfl.CSThinDataflowPath class for precise inter-procedural dataflow.

It starts at all InputSignature nodes that represent potential user input from the library function scanf.

scanf_signatures = (
    session.query(cpg.InputSignature)
    .filter(cpg.InputSignature.signature_for_function.has(cpg.Function.name == "__isoc99_scanf"))
    .all()
)

scanf_dataflow = (
    db.PathBuilder(cfl.CSThinDataflowPath)
    .starting_at(lambda Node: Node.uuid.in_([cs.uuid for cs in scanf_signatures]))
    .build(cpg, keep_edge=True)
)

scanf_dataflow_edges = (
    session.query(scanf_dataflow)
    .join(cpg.Edge, scanf_dataflow.edge == cpg.Edge.uuid)
    .with_entities(cpg.Edge)
)

display_cpg(cpg, edge_query=scanf_dataflow_edges, program=VizProgram.DOT)

Information-Flow Graph (IFG)

The Information-Flow Graph combines the Control-Dependence and Data-Flow Graphs. Tracing relationships in the information-flow graph will find all the data and instructions that influence the computation of a result. If you are familiar with research in information-flow security, you can think of the IFG as combining tracking of “explicit flows” from the DFG with tracking of “implicit flows” from the CDG.
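
For readers unfamiliar with the distinction, the small standalone function below (hypothetical, and unrelated to the guessing game) shows an implicit flow. No value computed from secret is ever stored in public, so a pure data-flow view sees no connection between them, yet the returned value reveals secret because the assignment is control-dependent on the branch.

```python
def reveal(secret: bool) -> int:
    public = 0
    if secret:      # the branch decides whether the assignment below runs
        public = 1  # only a constant is stored, never data derived from secret
    return public


print(reveal(True), reveal(False))  # 1 0
```

Following control-dependence edges in addition to data-flow edges is what lets an information-flow traversal connect the result back to secret.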

Let’s compare the IFG with the DFG for get_guess:

get_guess_uuids = uuids_for_function(cpg, "get_guess")

get_guess_ifg = (
    session.query(cpg.Edge)
    .filter(
        cpg.Edge.kind.in_(INFO_FLOW_FORWARD),
        cpg.Edge.source.in_(get_guess_uuids),
        cpg.Edge.target.in_(get_guess_uuids)
    )
)

display_cpg(cpg, edge_query=get_guess_ifg, program=VizProgram.DOT)

We can also re-examine what influences the “You win!” message through a wider lens:

you_win = (
    session.query(cpg.CallSite)
    .filter(cpg.CallSite.callees.any(cpg.Function.name == "printf"))
    .filter(cpg.CallSite.location['line'] == '23')
    .one()
)

ifg_query = (
    db.PathBuilder()
    .stopping_at(lambda Node: Node.uuid == you_win.uuid)
    .continuing_while(
        lambda _, Edge: Edge.kind.in_(INFO_FLOW_FORWARD)
    )
    .reverse()
    .build(cpg)
)

influencers = (
    session.query(ifg_query)
    .join(cpg.Instruction, cpg.Instruction.uuid == ifg_query.source)
    .with_entities(cpg.Instruction)
    .all()
)

visualize_ctxt(cpg, influencers, "influencing instruction")

Our query now also returns interesting lines such as where the secret is initialized:

                            guessing-game.c:15:26:main
                            ==========================
                            14:	
influencing instruction --> 15:	hidden.number = rand() % 10 + 1;
                            16:	

Note: This list includes many duplicate source lines because compiling a C statement often results in many LLVM bitcode instructions.
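
If the duplicates get in the way, one option is to collapse the results to distinct source lines before printing them. The helper below is a hypothetical sketch, not part of MATE. It assumes each result exposes a location attribute that behaves like a dictionary with 'file' and 'line' keys, as in visualize_ctxt above, and it is demonstrated here on stand-in objects rather than on real query results.

```python
from types import SimpleNamespace


def unique_source_lines(instructions):
    """Return (file, line) pairs in first-seen order, skipping duplicates
    and instructions that carry no usable location."""
    seen = set()
    ordered = []
    for inst in instructions:
        location = getattr(inst, "location", None) or {}
        key = (location.get("file"), location.get("line"))
        if None in key or key in seen:
            continue
        seen.add(key)
        ordered.append(key)
    return ordered


# Stand-in objects for demonstration; real results would come from a query.
demo = [
    SimpleNamespace(location={"file": "guessing-game.c", "line": 15}),
    SimpleNamespace(location={"file": "guessing-game.c", "line": 15}),
    SimpleNamespace(location=None),
    SimpleNamespace(location={"file": "guessing-game.c", "line": 23}),
]
print(unique_source_lines(demo))
# [('guessing-game.c', 15), ('guessing-game.c', 23)]
```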

Points-To Graph (PTG)

The Points-To Graph encodes information about the structure of memory during execution. It is generated by a whole-program static analysis that takes place before CPG construction.

Each node in the points-to graph represents an abstraction of a set of concrete memory locations that could exist at runtime. The points-to analysis used by MATE is type- and field-sensitive, so a memory location might be represented by more than one node in order to allow more precise reasoning about accesses to subobjects. For example, consider the following snippet of C:

int *a = calloc(4, sizeof(int));
int *b = &a[1];

The pointer b might point to memory location nodes:

  • heap_alloc@main[i32 %t0][1]

  • typed_heap_alloc@main[i32 %t0][1]

Tracking points-to information at this fine-grained level makes it possible to accurately track dataflows through structures and other arrays.

Sometimes it is necessary to find less-precise information about a pointer, for example when finding other uses of a memory object at a different granularity. To facilitate this, the CPG includes edges between memory locations that must alias (such as a and &a[0]), that may alias (such as &a[0] and &a[x] where x is unknown), and that are subregions of each other (such as a and &a[1]).

Let’s take a look at the points-to graph for the guessing game program:

display_cpg(cpg, node_query=session.query(cpg.MemoryLocation), program=VizProgram.NEATO)

Inter-procedural Control-Flow Graph (ICFG)

The Inter-procedural Control-Flow Graph is a subgraph of the CPG that connects each function’s CFG into a single graph. For small programs, it is possible to visualize the entire program’s ICFG at once:

icfg = (
    session.query(cpg.Edge)
    .filter(
        cpg.Edge.kind.in_(CONTROL_FLOW_FORWARD)
    )
)

nodes = (
    session.query(cpg.Node)
    .filter(cpg.Node.kind.in_(LLVM_LEVEL_NODES))
)

display_cpg(cpg, edge_query=icfg, node_query=nodes, program=VizProgram.DOT, strategy=VizStrategy.IR_SUBGRAPHS)

Understanding function call subgraphs

To retain precision when reasoning about control- and data-flows through a function with multiple callsites, the CPG has special “Parameter Binding” and “Call Return” nodes that are inserted between each callsite and the function it invokes. To better understand these nodes and how they are used, let’s zoom in on the call to read_num inside the get_guess function.

# Find the Function node
readnum = session.query(cpg.Function).filter(cpg.Function.name == "read_num").one()
relevant = [readnum]

# Find the CallSite node
readnum_callsite = session.query(cpg.CallSite).filter(cpg.CallSite.calls("read_num")).one()
relevant += [readnum_callsite]

# Add the arguments to the Call
operands = readnum_callsite.uses.all()
relevant += operands

# Add the parameter binding nodes for the call
binding_nodes = readnum_callsite.binds
relevant += binding_nodes

# Add the function's argument nodes
arguments = readnum.arguments
relevant += arguments
for arg in arguments:
    # And the instructions that use the argument to hint at the dataflow within read_num
    relevant += arg.used_by

# Add the call return node for the call
return_node = readnum_callsite.returns_from
relevant += [return_node]
# Add the return instruction
relevant += [return_node.returns_from]
# Add the return value
relevant += [return_node.value]

display_cpg(cpg,
            node_query=(
                session.query(cpg.Node).filter(cpg.Node.uuid.in_([n.uuid for n in relevant]))
            ),
            edge_query=(
                session.query(cpg.Edge).filter(
                    cpg.Edge.kind.notin_(
                        [EdgeKind.FUNCTION_TO_ENTRY_BLOCK,EdgeKind.BLOCK_TO_TERMINATOR_INSTRUCTION]
                    )
                )
            ),
            program=VizProgram.DOT
           )

For each argument at a callsite, there is a “ParamBinding” node that represents the assignment of the call’s argument to the corresponding function’s argument. This node is connected to the CallSite by a “CallToParamBinding” edge and to the function argument it binds by a “ParamBindingToArg” edge. It is also connected to the call’s “CallReturn” node.

The CallReturn node has edges from each of the callee’s return instructions as well as edges from the instructions that define the function’s return value.

The edges between CallSites and all of their corresponding ParamBinding and CallReturn nodes add structure to the ICFG that can be useful when trying to reason about flows through functions with many callsites.

Query language tips and tricks

Corrupted cpg session

If running a query reports a problem related to the state of the database connection, try rolling back the session by issuing the command

cpg.session.rollback()
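
If this happens often, the rollback can be wrapped in a small helper so that a failed query never leaves the session in a bad state. The function below is a hypothetical convenience, not part of MATE. It relies only on the session having a rollback() method, and it is demonstrated with a stand-in session object so that it runs outside a notebook.

```python
def run_with_rollback(session, run):
    """Call run(session); if it raises, roll the session back and re-raise."""
    try:
        return run(session)
    except Exception:
        session.rollback()
        raise


class FakeSession:
    """Stand-in for demonstration only; it just counts rollbacks."""

    def __init__(self):
        self.rollbacks = 0

    def rollback(self):
        self.rollbacks += 1


def failing_query(session):
    raise RuntimeError("simulated database error")


fake = FakeSession()
try:
    run_with_rollback(fake, failing_query)
except RuntimeError:
    pass
print(fake.rollbacks)  # 1
```

In a notebook, the same helper could be called as run_with_rollback(cpg.session, lambda s: s.query(cpg.Function).all()).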

Exercises

Exercise 1

Find the locations of all of the call sites in the program where the printf function is invoked, along with which functions contain them.

Hint: You can see documentation of available attributes for a model class by invoking help() on the class (e.g., help(cpg.Instruction)).

Hint: The cpg.CallSite model has a special attribute .calls that can be used to filter for CallSites that call the function whose name is given as an argument to .calls.

Hint: Instruction objects (including nodes returned by cpg.CallSite queries) have a parent_block attribute and blocks have a parent_function attribute.

Hint: Instruction objects have a .location attribute.

Exercise 2

Find pairs of locations of callsites in the program that take input from the user and provide output to the user where the inputs and outputs are connected via dataflow.

Hint: Use a cfl.CSThinDataflowPath path in your query.

Hint: The predicates Node.kind == NodeKind.INPUT_SIGNATURE and Node.kind == NodeKind.OUTPUT_SIGNATURE match input and output signature nodes, respectively.

Hint: Passing the argument keep_start=True to db.PathBuilder’s .build() method makes the resulting query object’s .source and .target attributes include the uuids of the path’s start and end, respectively.

Hint: Part of your solution should use a query with provided aliases to output pairs of Input and Output Signatures by joining on these classes and including the clause .with_entities(InputSig, OutputSig).

Hint: Signature objects have an attribute signature_for connecting them to their corresponding callsites.

InputSig = aliased(cpg.InputSignature)
OutputSig = aliased(cpg.OutputSignature)

Exercise 3

Find the possible call stacks at the input and output calls you found in exercise 2.