mate_query.db module¶
This module contains classes that represent tables in the Postgres DB, or derive from them.
- class mate_query.db.AdvisoryLockBuildTransaction(build: str)¶
Bases:
sqlalchemy.sql.ddl.DDLElement
- Parameters
build (str) –
- Return type
None
- class mate_query.db.AnalysisTask(uuid: str, analysis_name: str, state: mate_common.models.analyses.AnalysisTaskState, build: mate_query.db.Build)¶
Bases:
object
Represents a concrete invocation of a POI analysis on a Build.
- Parameters
uuid (str) –
analysis_name (str) –
state (AnalysisTaskState) –
build (Build) –
- classmethod create(analysis_name: str, build: mate_query.db.Build) mate_query.db.AnalysisTask ¶
- Parameters
analysis_name (str) –
build (mate_query.db.Build) –
- Return type
- to_info() mate_common.models.analyses.AnalysisTaskInfo ¶
Returns an AnalysisTaskInfo model for this analysis task.
- Return type
- transition_to_state(new_state: mate_common.models.analyses.AnalysisTaskState) None ¶
- Parameters
new_state (mate_common.models.analyses.AnalysisTaskState) –
- Return type
None
- class mate_query.db.Artifact(uuid: str, kind: ArtifactKind, *, attributes: Dict[str, Any] = {})¶
Bases:
object
Represents a filesystem artifact associated with zero or more builds.
Artifacts are tagged with a kind, which indicates their disposition within MATE. The kind may or may not reflect the file format itself. Artifacts also carry an attributes dictionary, which contains unstructured metadata relevant to the artifact.
- Parameters
uuid (str) –
kind (ArtifactKind) –
attributes (Dict[str, Any]) –
- Return type
None
- compilations: List[Compilation]¶
- classmethod create_with_object(kind: ArtifactKind, *, fileobj: IO[AnyStr], attributes: Dict[str, Any] = {}) Artifact ¶
Create a new Artifact and put the given fileobj into the object store, associating it with the newly created artifact.
- Parameters
kind (ArtifactKind) –
fileobj (IO[AnyStr]) –
attributes (Dict[str, Any]) –
- Return type
- get_object() Iterator[IO[bytes]] ¶
Yields the object associated with this artifact as a file-like object.
- Return type
Iterator[IO[bytes]]
- has_object() bool ¶
Returns whether this artifact has an associated object, i.e. whether an object has been uploaded to the object store for it.
- Return type
bool
- persist_locally(*, suffix: Optional[str] = None) pathlib.Path ¶
Create a local copy of the artifact and return it as a pathlib.Path.
The caller is responsible for disposing of the path. No cleanup is performed by default.
- Parameters
suffix (Optional[str]) –
- Return type
pathlib.Path
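Because no cleanup is performed by default, a caller may want to wrap the returned path in a helper that deletes it when done. The following is a minimal sketch using only the standard library; `fake_persist_locally` and `disposable` are hypothetical names, and the former is merely a stand-in for `Artifact.persist_locally`, not MATE's implementation.

```python
import contextlib
import os
import tempfile
from pathlib import Path
from typing import Iterator, Optional


def fake_persist_locally(data: bytes, *, suffix: Optional[str] = None) -> Path:
    """Hypothetical stand-in for Artifact.persist_locally: writes the bytes
    to a new temporary file and returns its path, with no cleanup scheduled."""
    fd, name = tempfile.mkstemp(suffix=suffix)
    with os.fdopen(fd, "wb") as handle:
        handle.write(data)
    return Path(name)


@contextlib.contextmanager
def disposable(path: Path) -> Iterator[Path]:
    """Yield the path, then delete the file even if the body raises."""
    try:
        yield path
    finally:
        path.unlink(missing_ok=True)


with disposable(fake_persist_locally(b"example", suffix=".bc")) as local_path:
    contents = local_path.read_bytes()
    existed_inside = local_path.exists()

existed_after = local_path.exists()
```

With a real artifact, the same `disposable` wrapper could presumably be applied to the path returned by `persist_locally`.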
- put_object(fileobj: IO[AnyStr], **kwargs: Any) None ¶
Puts the given fileobj into the object store, associating it with this artifact.
- Parameters
fileobj (IO[AnyStr]) –
kwargs (Any) –
- Return type
None
- to_info() mate_common.models.artifacts.ArtifactInformation ¶
Returns an ArtifactInformation model for this artifact.
- Return type
- class mate_query.db.Build(uuid: str, state: BuildState, bitcode_artifact: Artifact, compilation: Compilation, *, options: Dict[str, Any], attributes: Dict[str, Any])¶
Bases:
object
- Parameters
uuid (str) –
state (BuildState) –
bitcode_artifact (Artifact) –
compilation (Compilation) –
options (Dict[str, Any]) –
attributes (Dict[str, Any]) –
- Return type
None
- analysis_tasks: List[AnalysisTask]¶
- classmethod create(bitcode_artifact: Artifact, compilation: Compilation, *, options: Dict[str, Any], attributes: Dict[str, Any] = {}) Build ¶
- Parameters
bitcode_artifact (Artifact) –
compilation (Compilation) –
options (Dict[str, Any]) –
attributes (Dict[str, Any]) –
- Return type
- mantiserve_tasks: List[MantiserveTask]¶
- to_info(artifact_detail: bool = False) mate_common.models.builds.BuildInformation ¶
Returns a BuildInformation model for this build.
If artifact_detail is True, the resulting BuildInformation includes the ArtifactInformation for each artifact in the build.
- Parameters
artifact_detail (bool) –
- Return type
- transition_to_state(new_state: mate_common.models.builds.BuildState) None ¶
- Parameters
new_state (mate_common.models.builds.BuildState) –
- Return type
None
- class mate_query.db.BuildArtifactAssociation¶
Bases:
object
A many-many association table for mapping artifacts to builds and builds to artifacts.
An artifact can have many builds in one case: a compile target that’s used to produce multiple builds.
A build can have many artifacts in the normal case: each build is expected to produce compilation outputs and other artifacts that are associated with it.
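The two cases above can be illustrated with a small many-to-many schema. This is only a sketch: it uses an in-memory SQLite database from the standard library rather than Postgres and SQLAlchemy, and the table names, column names, and UUID values are all hypothetical.

```python
import sqlite3
from typing import List

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE build (uuid TEXT PRIMARY KEY);
    CREATE TABLE artifact (uuid TEXT PRIMARY KEY);
    -- Association table: one row per (build, artifact) pairing.
    CREATE TABLE build_artifact (
        build_uuid TEXT REFERENCES build(uuid),
        artifact_uuid TEXT REFERENCES artifact(uuid),
        PRIMARY KEY (build_uuid, artifact_uuid)
    );
    """
)
conn.executemany("INSERT INTO build VALUES (?)", [("b1",), ("b2",)])
conn.executemany(
    "INSERT INTO artifact VALUES (?)",
    [("target",), ("b1-bitcode",), ("b1-log",)],
)
conn.executemany(
    "INSERT INTO build_artifact VALUES (?, ?)",
    [
        ("b1", "target"),      # one compile target ...
        ("b2", "target"),      # ... shared by two builds
        ("b1", "b1-bitcode"),  # one build ...
        ("b1", "b1-log"),      # ... with several outputs
    ],
)


def artifacts_of(build_uuid: str) -> List[str]:
    """All artifact UUIDs associated with a build, sorted."""
    rows = conn.execute(
        "SELECT artifact_uuid FROM build_artifact "
        "WHERE build_uuid = ? ORDER BY artifact_uuid",
        (build_uuid,),
    )
    return [row[0] for row in rows]


def builds_of(artifact_uuid: str) -> List[str]:
    """All build UUIDs associated with an artifact, sorted."""
    rows = conn.execute(
        "SELECT build_uuid FROM build_artifact "
        "WHERE artifact_uuid = ? ORDER BY build_uuid",
        (artifact_uuid,),
    )
    return [row[0] for row in rows]
```

Here `builds_of("target")` lists both builds, while `artifacts_of("b1")` lists the shared target alongside that build's own outputs.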
- class mate_query.db.Compilation(uuid: str, state: CompilationState, source_artifact: Artifact, *, options: Dict[str, Any])¶
Bases:
object
- Parameters
uuid (str) –
state (CompilationState) –
source_artifact (Artifact) –
options (Dict[str, Any]) –
- Return type
None
- property challenge_id: Optional[str]¶
Returns the challenge broker ID that corresponds to this compilation, if this compilation was created from a brokered challenge.
- classmethod create(source_artifact: Artifact, *, options: Dict[str, Any]) Compilation ¶
- Parameters
source_artifact (Artifact) –
options (Dict[str, Any]) –
- Return type
- to_info() mate_common.models.compilations.CompilationInformation ¶
Returns a CompilationInformation model for this compilation.
- transition_to_state(new_state: mate_common.models.compilations.CompilationState) None ¶
- Parameters
new_state (mate_common.models.compilations.CompilationState) –
- Return type
None
- class mate_query.db.CompilationArtifactAssociation¶
Bases:
object
A many-many association table for mapping artifacts to compilations and compilations to artifacts.
An artifact can have many compilations, since multiple independent db.Compilations can be created (with different configurations) for one artifact.
Each db.Compilation, in turn, can have many db.Artifacts (in the form of compilation products and byproducts).
- class mate_query.db.CreateView(name: str, selectable: sqlalchemy.sql.selectable.FromClause)¶
Bases:
sqlalchemy.sql.ddl.DDLElement
- Parameters
name (str) –
selectable (FromClause) –
- Return type
None
- class mate_query.db.FlowFinderSnapshot(*, uuid: str, poi_result: Optional[POIResult], build: Build, label: str, filters: List[str], graph_requests: GraphRequests, hidden_graph_ids: List[str], hidden_node_ids: List[str], user_annotations: Dict[str, FlowFinderAnnotations])¶
Bases:
object
Represents the state of a user’s analysis of a POIResult.
- Parameters
- Return type
None
- classmethod create(*, poi_result: Optional[POIResult], build: Build, label: str, filters: List[str] = [], graph_requests: GraphRequests = [], hidden_graph_ids: List[str] = [], hidden_node_ids: List[str] = [], user_annotations: Dict[str, FlowFinderAnnotations] = {}) FlowFinderSnapshot ¶
- Parameters
- Return type
- to_info() mate_common.models.analyses.FlowFinderSnapshotInfo ¶
Returns a FlowFinderSnapshotInfo model for this snapshot.
- class mate_query.db.Graph(parent: Optional[Graph], name: str, build: str, selectables: Optional[Tuple[FromClause, FromClause]], session: Session)¶
Bases:
mate_query.cpg.models.core.cpg.BaseCPG
- Parameters
- Return type
None
- ASMGlobalVariable: Type[ASMGlobalVariable]¶
- CallReturn: Type[CallReturn]¶
- CompositeCachedType: Type[CompositeCachedType]¶
- CompositeType: Type[CompositeType]¶
- ConstantFP: Type[ConstantFP]¶
- ConstantInt: Type[ConstantInt]¶
- ConstantString: Type[ConstantString]¶
- ConstantUndef: Type[ConstantUndef]¶
- DWARFArgument: Type[DWARFArgument]¶
- DWARFLocalVariable: Type[DWARFLocalVariable]¶
- DataflowSignature: Type[DataflowSignature]¶
- DerivedType: Type[DerivedType]¶
- GlobalVariable: Type[GlobalVariable]¶
- InputSignature: Type[InputSignature]¶
- Instruction: Type[Instruction]¶
- LocalVariable: Type[LocalVariable]¶
- MachineBasicBlock: Type[MachineBasicBlock]¶
- MachineFunction: Type[MachineFunction]¶
- MachineInstr: Type[MachineInstr]¶
- MemoryLocation: Type[MemoryLocation]¶
- OutputSignature: Type[OutputSignature]¶
- ParamBinding: Type[ParamBinding]¶
- StructureType: Type[StructureType]¶
- SubroutineType: Type[SubroutineType]¶
- TranslationUnit: Type[TranslationUnit]¶
- UnclassifiedNode: Type[UnclassifiedNode]¶
- property ast: mate_query.db.Graph¶
Construct the abstract syntax tree subgraph of the current graph.
- property cdg: mate_query.db.Graph¶
Construct the control-dependence subgraph of the current graph.
- property cfg: mate_query.db.Graph¶
Construct the control-flow subgraph of the current graph.
- property cg: mate_query.db.Graph¶
Construct the call-subgraph of the current graph.
- property dfg: mate_query.db.Graph¶
Construct the data-flow subgraph of the current graph.
- classmethod from_build(build: mate_query.db.Build, session: mate_query.db.Session) mate_query.db.Graph ¶
Create a graph consisting of all nodes/edges from an existing build.
- Parameters
build (mate_query.db.Build) –
session (mate_query.db.Session) –
- Return type
- property icfg: mate_query.db.Graph¶
Construct the interprocedural control-flow subgraph of the current graph.
- property ifg: mate_query.db.Graph¶
Construct the information-flow subgraph of the current graph.
- paths(builder: PathBuilder) Type[Path] ¶
Run the given PathBuilder on the current graph.
- Parameters
builder (PathBuilder) –
- Return type
Type[Path]
- property ptg: mate_query.db.Graph¶
Construct the points-to subgraph of the current graph.
- subgraph(builder: mate_query.db.SubgraphBuilder) mate_query.db.Graph ¶
Run the given SubgraphBuilder on the current graph.
- Parameters
builder (mate_query.db.SubgraphBuilder) –
- Return type
- class mate_query.db.MantiserveTask(uuid: str, build: Build, kind: MantiserveTaskKind, request_msg: Dict[str, Any], state: MantiserveTaskState, response_msg: Optional[Dict[str, Any]] = None, docker_image: Optional[str] = None, job_id: Optional[str] = None)¶
Bases:
object
Mantiserve task info.
- Parameters
uuid (str) –
build (Build) –
kind (MantiserveTaskKind) –
request_msg (Dict[str, Any]) –
state (MantiserveTaskState) –
response_msg (Optional[Dict[str, Any]]) –
docker_image (Optional[str]) –
job_id (Optional[str]) –
- classmethod create(build: Build, kind: MantiserveTaskKind, request_msg: Dict[str, Any], docker_image: Optional[str] = None) MantiserveTask ¶
- Parameters
build (Build) –
kind (MantiserveTaskKind) –
request_msg (Dict[str, Any]) –
docker_image (Optional[str]) –
- Return type
- to_info() mate_common.models.manticore.MantiserveTaskInformation ¶
Returns a MantiserveTaskInformation model for this Mantiserve task.
- transition_to_state(new_state: MantiserveTaskState, msg: Optional[str] = None) None ¶
Transition to a different state with validity checking.
- Parameters
new_state (MantiserveTaskState) – Transition to this state.
msg (Optional[str]) – Optional log message to attach to self.response_msg. If not present, self.response_msg is left untouched. Should only be used when transitioning to a MantiserveTaskState.Failed state.
- Returns
None
- Return type
None
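As an illustration of what a transition "with validity checking" might look like, here is a minimal, self-contained sketch. Everything in it is an assumption made for the example: the `TaskState` members other than `Failed`, the `ALLOWED` transition table, the `ToyTask` class, and the way `msg` is stored in `response_msg` are hypothetical and do not reflect MATE's actual rules.

```python
import enum
from typing import Any, Dict, Optional, Set


class TaskState(enum.Enum):
    # Hypothetical states; only "Failed" is named in the documentation above.
    Created = "created"
    Running = "running"
    Completed = "completed"
    Failed = "failed"


# Hypothetical table of permitted transitions (assumption for this sketch).
ALLOWED: Dict[TaskState, Set[TaskState]] = {
    TaskState.Created: {TaskState.Running, TaskState.Failed},
    TaskState.Running: {TaskState.Completed, TaskState.Failed},
    TaskState.Completed: set(),
    TaskState.Failed: set(),
}


class ToyTask:
    def __init__(self) -> None:
        self.state = TaskState.Created
        self.response_msg: Optional[Dict[str, Any]] = None

    def transition_to_state(
        self, new_state: TaskState, msg: Optional[str] = None
    ) -> None:
        # Reject any transition that the table does not permit.
        if new_state not in ALLOWED[self.state]:
            raise ValueError(
                f"invalid transition: {self.state.name} -> {new_state.name}"
            )
        # Leave response_msg untouched unless a message was supplied.
        if msg is not None:
            self.response_msg = {"msg": msg}  # storage format is an assumption
        self.state = new_state
```

Keeping the permitted transitions in a single table makes the check a set lookup and keeps the rules easy to audit.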
- class mate_query.db.MantiserveTaskArtifactAssociation¶
Bases:
object
Association table mapping a MantiserveTask to many Artifacts.
- class mate_query.db.POIResult(uuid: str, poi: Dict[str, Any], analysis_task: AnalysisTask, graph_requests: GraphRequests, flagged: bool = False, done: bool = False, complexity: POIResultComplexity = POIResultComplexity.Unknown)¶
Bases:
object
Represents a single POI result, produced by running a POI analysis through an AnalysisTask.
- Parameters
uuid (str) –
poi (Dict[str, Any]) –
analysis_task (AnalysisTask) –
graph_requests (GraphRequests) –
flagged (bool) –
done (bool) –
complexity (POIResultComplexity) –
- Return type
None
- analysis_task: AnalysisTask¶
- classmethod create(poi: Dict[str, Any], analysis_task: AnalysisTask, graph_requests: List[Dict[str, Any]], complexity: POIResultComplexity = POIResultComplexity.Unknown) POIResult ¶
- Parameters
poi (Dict[str, Any]) –
analysis_task (AnalysisTask) –
graph_requests (List[Dict[str, Any]]) –
complexity (POIResultComplexity) –
- Return type
- snapshots: List[FlowFinderSnapshot]¶
- to_info() mate_common.models.analyses.POIResultInfo ¶
Returns a POIResultInfo model for this POI result.
- Return type
- class mate_query.db.Path(info: Any, source: Any, source_stack: Tuple[Any, ...], target: Any, edge: Any, trace: Tuple[Any, ...], state: Any, stack: Tuple[Any, ...], stack_top: Any, c: Any, edge_symbol_table: Optional[Table] = None, transition_table: Optional[Table] = None, is_reversed: ClassVar[bool] = False, stepped_over_functions: ClassVar[set[str]] = <factory>, cfl: ClassVar[bool] = False)¶
Bases:
object
For documentation of this class’s attributes, see comments in _make_path_table or use help at runtime.
- Parameters
info (Any) –
source (Any) –
source_stack (Tuple[Any, ...]) –
target (Any) –
edge (Any) –
trace (Tuple[Any, ...]) –
state (Any) –
stack (Tuple[Any, ...]) –
stack_top (Any) –
c (Any) –
edge_symbol_table (Optional[Table]) –
transition_table (Optional[Table]) –
is_reversed (ClassVar[bool]) –
stepped_over_functions (ClassVar[set[str]]) –
cfl (ClassVar[bool]) –
- Return type
None
- c: Any¶
- cfl: ClassVar[bool] = False¶
- edge: Any¶
- edge_symbol_table: Optional[Table] = None¶
- info: Any¶
- classmethod initial_stack() Null ¶
- Return type
Null
- classmethod initial_state() Null ¶
- Return type
Null
- is_reversed: ClassVar[bool] = False¶
- classmethod populate_edge_symbol_table(graph: mate_query.db.Graph) None ¶
- Parameters
graph (mate_query.db.Graph) –
- Return type
None
- classmethod populate_transition_table(graph: mate_query.db.Graph) None ¶
- Parameters
graph (mate_query.db.Graph) –
- Return type
None
- source: Any¶
- source_stack: Tuple[Any, ...]¶
- stack: Tuple[Any, ...]¶
- stack_top: Any¶
- state: Any¶
- stepped_over_functions: ClassVar[set[str]]¶
- target: Any¶
- trace: Tuple[Any, ...]¶
- transition_table: Optional[Table] = None¶
- class mate_query.db.PathBuilder(PathBase: Type[Path] = <class 'mate_query.db.Path'>)¶
Bases:
object
A builder class for enumerating paths in a graph.
PathBuilder supports “CFL-reachability queries” (CFL stands for “context-free language”). These queries have two parts:
1. In a pre-processing step, each edge in the graph is labeled with some symbol (string).
2. The graph is explored one path at a time by a push-down automaton (PDA) that reads the edge labels and recognizes strings in a context-free grammar.
These queries return the set of all paths in the graph (that start at some set of nodes) where the edge labels on that path form a string in the context-free language recognized by the PDA.
For example, the “balanced parentheses” context-free language can be used to encode the problem of finding control-flow graph (CFG) paths that have a balanced number of calls and returns.
Step (2) above can be accomplished by any number of specialized algorithms, but the PDA approach is relatively simple and it’s the only one MATE has so far.
The state of the PDA and its set of transitions are kept in dedicated database tables; see _make_path_table and _make_transition_table respectively, and especially the comments on their columns.
To perform a transition, the PDA state table is first joined against its transition table, and then joined against the edge symbol table. This join order is more performant than the reverse, because the edge table is typically much larger. At each step, the automaton can make multiple transitions (end up in multiple new states).
Because the PDA state is kept in a table, and a table is like a set of tuples (configurations), we get some cycle-avoidance for free. However, this can get more complicated if the stack or trace is kept as part of the PDA configuration/state, because while these are theoretically bounded, in practice they can be too big. See the exploration_bound parameter to build.
- Parameters
PathBase (Type[Path]) –
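The idea behind CFL-reachability with a PDA can be sketched in plain Python. The example below is not how PathBuilder is implemented (which, per the description above, works through joins over database tables); it is an in-memory illustration of the "balanced calls and returns" language. The toy graph, the `call:`/`ret:` label format, and the `step` and `reachable` functions are all hypothetical.

```python
from collections import deque
from typing import Deque, List, Set, Tuple

Edge = Tuple[str, str, str]           # (source, label, target)
Config = Tuple[str, Tuple[str, ...]]  # (node, stack of open call sites)

# Step 1: every edge carries a symbol. "call:<site>" pushes a call site,
# "ret:<site>" pops a matching one, and "" is an ordinary step.
EDGES: List[Edge] = [
    ("main1", "call:a", "f_entry"),
    ("main2", "call:b", "f_entry"),
    ("f_entry", "", "f_exit"),
    ("f_exit", "ret:a", "main1_after"),
    ("f_exit", "ret:b", "main2_after"),
]


def step(config: Config, edge: Edge) -> List[Config]:
    """Step 2: the configurations reached by reading one edge label."""
    node, stack = config
    source, label, target = edge
    if source != node:
        return []
    if label.startswith("call:"):
        return [(target, stack + (label[len("call:"):],))]
    if label.startswith("ret:"):
        site = label[len("ret:"):]
        # A return is only valid if it matches the most recent call.
        if stack and stack[-1] == site:
            return [(target, stack[:-1])]
        return []
    return [(target, stack)]


def reachable(
    start: str, edges: List[Edge], exploration_bound: int = 1000
) -> Set[Config]:
    """Explore configurations from `start`, visiting at most
    `exploration_bound` distinct configurations."""
    seen: Set[Config] = {(start, ())}
    worklist: Deque[Config] = deque(seen)
    while worklist:
        config = worklist.popleft()
        for edge in edges:
            for nxt in step(config, edge):
                # Storing configurations in a set avoids revisiting cycles.
                if nxt not in seen and len(seen) < exploration_bound:
                    seen.add(nxt)
                    worklist.append(nxt)
    return seen
```

Starting from `main1`, the exploration reaches `main1_after` but not `main2_after`, because the `ret:b` edge does not match the `a` call site on the stack. Since the stack is part of each configuration, the set of configurations can grow large, which is why a bound on visited configurations is useful.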
- build(graph: Graph, *, keep_start: bool = True, keep_trace: bool = False, keep_edge: bool = False, exploration_bound: int = 5000000) Type[Path] ¶
Concretize the path set defined by this builder, with respect to the given graph.
- Parameters
graph (Graph) –
keep_start (bool) – record the starting node for each path
keep_trace (bool) – record the sequence of edges visited by the path
keep_edge (bool) – record the last edge visited by the path
exploration_bound (int) – the maximum number of path configurations to visit during exploration (not a bound on path length)
- Return type
Type[Path]
All of the keep_* options have some performance cost if enabled. keep_trace is especially expensive.
Returns a new Path class, mapped to the view representing the path set.
- static build_conjoined_predicate(predicates: List[Callable]) Optional[Callable] ¶
- Parameters
predicates (List[Callable]) –
- Return type
Optional[Callable]
- continuing_while(*continue_predicates: Callable[[Type[Path], Type[Edge]], ColumnElement], edge_detail: bool = True) PB ¶
Set a predicate on configurations and edges which must be satisfied by all edges on a valid traversal.
This method takes as argument a callable, which is provided with a Path object of the PathBuilder’s current configuration and an edge object of the underlying graph during build, and must return a valid where-clause-compatible expression.
The optional argument edge_detail specifies whether the predicate inspects properties of the edge other than its source, target, or UUID.
- initial_configuration(cte: CTE) PB ¶
Provide an initial set of configurations (node, stack, and info) at which to start the traversal.
This method takes as argument a SQLAlchemy common table expression, which must have columns “uuid”, “stack”, and “info” containing the UUID of the start node, the starting stack, and an information string (which may be null), respectively.
- Parameters
self (PB) –
cte (CTE) –
- Return type
PB
- limited_to(max_length: int) mate_query.db.PB ¶
Limit the maximum length of the traversal.
- Parameters
self (mate_query.db.PB) –
max_length (int) –
- Return type
mate_query.db.PB
- reverse() mate_query.db.PB ¶
Reverse the direction of traversal.
This causes the traversal to jump from target -> source rather than from source -> target; edge IDs in any resulting trace must be interpreted as such.
Repeated reversals will undo each other.
- Parameters
self (mate_query.db.PB) –
- Return type
mate_query.db.PB
- starting_at(*start_predicates: Callable[[Type[Node]], ColumnElement]) PB ¶
Set a predicate on nodes which starts the traversal.
This method takes as argument a callable, which is provided with a Node object of the underlying graph during build, and must return a valid where-clause-compatible expression.
- Parameters
self (PB) –
start_predicates (Callable[[Type[Node]], ColumnElement]) –
- Return type
PB
- stepping_over(*stepped_over_functions: str) mate_query.db.PB ¶
Specify a set of functions to step over during control-flow traversal.
- Parameters
self (mate_query.db.PB) –
stepped_over_functions (str) –
- Return type
mate_query.db.PB
- stopping_at(*stop_predicates: Callable[[Type[Node]], ColumnElement]) PB ¶
Set a predicate on nodes which stops the traversal.
This method takes as argument a callable, which is provided with a Node object of the underlying graph during build, and must return a valid where-clause-compatible expression.
- Parameters
self (PB) –
stop_predicates (Callable[[Type[Node]], ColumnElement]) –
- Return type
PB
- class mate_query.db.Session(bind=None, autoflush=True, expire_on_commit=True, _enable_transaction_accounting=True, autocommit=False, twophase=False, weak_identity_map=None, binds=None, extension=None, enable_baked_queries=True, info=None, query_cls=None)¶
Bases:
sqlalchemy.orm.session.Session
- graph_from_build(build: mate_query.db.Build) mate_query.db.Graph ¶
- Parameters
build (mate_query.db.Build) –
- Return type
- class mate_query.db.SubgraphBuilder¶
Bases:
object
A builder class for subgraphs.
- Return type
None
- build(graph: mate_query.db.Graph) mate_query.db.Graph ¶
Concretize the subgraph defined by this builder, with respect to the given graph.
- Parameters
graph (mate_query.db.Graph) –
- Return type
- with_edge_kinds(*kinds: mate_common.models.cpg_types.mate.EdgeKind) mate_query.db.SubgraphBuilder ¶
Specify a set of edge kinds to be included within the subgraph.
- Parameters
- Return type
- with_node_kinds(*kinds: mate_common.models.cpg_types.mate.NodeKind) mate_query.db.SubgraphBuilder ¶
Specify a set of node kinds to be included within the subgraph.
- Parameters
- Return type
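The builder pattern used by SubgraphBuilder can be sketched on a toy in-memory graph. This is an illustration only: `ToyGraph`, `SketchSubgraphBuilder`, and the string kinds are hypothetical, and the rule for combining node and edge filters (an edge is kept only if its kind is selected and both endpoints are kept) is an assumption rather than documented MATE behavior.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Set, Tuple


@dataclass
class ToyGraph:
    nodes: Dict[str, str]              # node id -> node kind
    edges: List[Tuple[str, str, str]]  # (source, edge kind, target)


class SketchSubgraphBuilder:
    """Hypothetical builder; None means 'no restriction' for that filter."""

    def __init__(self) -> None:
        self._node_kinds: Optional[Set[str]] = None
        self._edge_kinds: Optional[Set[str]] = None

    def with_node_kinds(self, *kinds: str) -> "SketchSubgraphBuilder":
        self._node_kinds = set(kinds)
        return self  # returning self allows chained calls

    def with_edge_kinds(self, *kinds: str) -> "SketchSubgraphBuilder":
        self._edge_kinds = set(kinds)
        return self

    def build(self, graph: ToyGraph) -> ToyGraph:
        # Keep nodes whose kind is selected (or all nodes if unrestricted).
        nodes = {
            node: kind
            for node, kind in graph.nodes.items()
            if self._node_kinds is None or kind in self._node_kinds
        }
        # Keep edges of a selected kind whose endpoints both survived.
        edges = [
            (src, kind, dst)
            for src, kind, dst in graph.edges
            if (self._edge_kinds is None or kind in self._edge_kinds)
            and src in nodes
            and dst in nodes
        ]
        return ToyGraph(nodes, edges)


graph = ToyGraph(
    nodes={"a": "Instruction", "b": "Instruction", "c": "GlobalVariable"},
    edges=[("a", "cfg", "b"), ("a", "dfg", "c"), ("b", "dfg", "a")],
)
cfg_only = SketchSubgraphBuilder().with_edge_kinds("cfg").build(graph)
instr_dfg = (
    SketchSubgraphBuilder()
    .with_node_kinds("Instruction")
    .with_edge_kinds("dfg")
    .build(graph)
)
```

Because each `with_*` method returns the builder, the selection can be composed step by step and only concretized against a graph in `build`.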
- exception mate_query.db.UninitializedDatabaseError¶
Bases:
mate_common.error.MateError
An exception denoting an attempted use of an uninitialized database.
- mate_query.db.build_model_classes(build: str) Tuple[Type[BaseNode], Type[BaseEdge]] ¶
- mate_query.db.emit_lock(element: Any, _compiler: Any, **_kw: Dict[str, Any]) str ¶
- Parameters
element (Any) –
_compiler (Any) –
_kw (Dict[str, Any]) –
- Return type
str
- mate_query.db.emit_view(element: Any, compiler: Any, **_kw: Dict[str, Any]) str ¶
- Parameters
element (Any) –
compiler (Any) –
_kw (Dict[str, Any]) –
- Return type
str
- mate_query.db.initialize(connection_string: str, echo: bool = False, create: bool = False) None ¶
- Parameters
connection_string (str) –
echo (bool) –
create (bool) –
- Return type
None
- mate_query.db.join_collapse_limit(session: orm.Session) Iterator[None] ¶
- Parameters
session (orm.Session) –
- Return type
Iterator[None]
- mate_query.db.new_session() sqlalchemy.orm.session.Session ¶
- Return type
sqlalchemy.orm.session.Session
- mate_query.db.session_scope() Iterator[orm.Session] ¶
- Return type
Iterator[orm.Session]
- mate_query.db.set_n_distinct(session: orm.Session, tablename: str, columns: Set[str], n_distinct: int) None ¶
- Parameters
session (orm.Session) –
tablename (str) –
columns (Set[str]) –
n_distinct (int) –
- Return type
None