"""
Program Analysis
================
:Author: Caterina Urban
"""
import ast
import os
from abc import abstractmethod
from queue import Queue
from typing import Set
from lyra.core.cfg import Loop
from lyra.core.expressions import VariableIdentifier, LengthIdentifier
from lyra.core.statements import Assignment, VariableAccess, Call, TupleDisplayAccess
from lyra.core.types import ListLyraType, SequenceLyraType
from lyra.engine.result import AnalysisResult
from lyra.frontend.cfg_generator import ast_to_cfg, StringLyraType
from lyra.visualization.graph_renderer import AnalysisResultRenderer
[docs]class Runner:
"""Analysis runner."""
def __init__(self):
self._path = None
self._source = None
self._tree = None
self._cfg = None
@property
def path(self):
return self._path
@path.setter
def path(self, path):
self._path = path
@property
def source(self):
return self._source
@source.setter
def source(self, source):
self._source = source
@property
def tree(self):
return self._tree
@tree.setter
def tree(self, tree):
self._tree = tree
@property
def cfg(self):
return self._cfg
@cfg.setter
def cfg(self, cfg):
self._cfg = cfg
[docs] @abstractmethod
def interpreter(self):
"""Control flow graph interpreter."""
[docs] @abstractmethod
def state(self):
"""Initial analysis state."""
@property
def variables(self) -> Set[VariableIdentifier]:
variables = set()
visited, worklist = set(), Queue()
worklist.put(self.cfg.in_node)
while not worklist.empty():
current = worklist.get()
if current.identifier not in visited:
visited.add(current.identifier)
for stmt in current.stmts:
if isinstance(stmt, Assignment) and isinstance(stmt.left, VariableAccess):
variable = stmt.left.variable
variables.add(variable)
if isinstance(variable.typ, SequenceLyraType):
variables.add(LengthIdentifier(variable))
if isinstance(current, Loop):
edges = self.cfg.edges.items()
conds = [edge.condition for nodes, edge in edges if nodes[0] == current]
for cond in [c for c in conds if isinstance(c, Call)]:
for arg in cond.arguments:
if isinstance(arg, VariableAccess):
variable = arg.variable
variables.add(arg.variable)
if isinstance(variable.typ, SequenceLyraType):
variables.add(LengthIdentifier(variable))
elif isinstance(arg, TupleDisplayAccess):
for i in arg.items:
variables.add(i.variable)
for node in self.cfg.successors(current):
worklist.put(node)
return variables
[docs] def main(self, path):
self.path = path
with open(self.path, 'r') as source:
self.source = source.read()
self.tree = ast.parse(self.source)
self.cfg = ast_to_cfg(self.tree)
return self.run()
[docs] def run(self) -> AnalysisResult:
result = self.interpreter().analyze(self.state())
self.render(result)
return result
[docs] def render(self, result):
renderer = AnalysisResultRenderer()
data = (self.cfg, result)
name = os.path.splitext(os.path.basename(self.path))[0]
label = f"CFG with Analysis Result for {name}"
directory = os.path.dirname(self.path)
renderer.render(data, filename=name, label=label, directory=directory, view=True)