Source code for lyra.engine.backward

"""
Backward Analysis Engine
========================

:Author: Caterina Urban
"""

from collections import deque
from copy import deepcopy
from queue import Queue
from typing import List, Optional

from lyra.engine.interpreter import Interpreter
from lyra.engine.result import AnalysisResult
from lyra.semantics.backward import BackwardSemantics

from lyra.abstract_domains.state import State
from lyra.core.cfg import Basic, Loop, Conditional, Edge, Node


[docs]class BackwardInterpreter(Interpreter): """Backward control flow graph interpreter.""" def __init__(self, cfg, semantics: BackwardSemantics, widening, precursory=None): """Backward control flow graph interpreter construction. :param cfg: control flow graph to analyze :param semantics: semantics of statements in the control flow graph :param widening: number of iterations before widening :param precursory: precursory control flow graph interpreter """ super().__init__(cfg, semantics, widening, precursory) @property def semantics(self): return self._semantics
[docs] def analyze(self, initial: State) -> AnalysisResult: from lyra.engine.forward import ForwardInterpreter # run the precursory analysis (if any) if self.precursory: # there is a precursory analysis to be run pre_result: Optional[AnalysisResult] = self.precursory.analyze(initial.precursory) else: # there is no precursory analysis to be run pre_result: Optional[AnalysisResult] = None # prepare the worklist and iteration counts worklist = Queue() worklist.put(self.cfg.out_node) iterations = {node: 0 for node in self.cfg.nodes} while not worklist.empty(): current: Node = worklist.get() # retrieve the current node iteration = iterations[current.identifier] # retrieve the previous exit state of the node if current in self.result.result: previous = deepcopy(self.result.get_node_result(current)[-1]) else: previous = None # compute the current exit state of the current node entry = deepcopy(initial) if current.identifier != self.cfg.out_node.identifier: entry.bottom() # join incoming states edges = self.cfg.out_edges(current) for edge in edges: if edge.target in self.result.result: successor = deepcopy(self.result.get_node_result(edge.target)[0]) else: successor = deepcopy(initial).bottom() # handle unconditional non-default edges if edge.kind == Edge.Kind.IF_OUT: successor = successor.enter_if() elif edge.kind == Edge.Kind.LOOP_OUT: successor = successor.enter_loop() # handle conditional edges if isinstance(edge, Conditional) and edge.kind == Edge.Kind.DEFAULT: branch = any(edge.kind == Edge.Kind.IF_IN for edge in edges) loop = any(edge.kind == Edge.Kind.LOOP_IN for edge in edges) assert (branch or loop) and not (branch and loop) successor = successor.enter_if() if branch else successor successor = successor.enter_loop() if loop else successor if pre_result: # a precursory analysis was run if isinstance(self.precursory, ForwardInterpreter): precursory = pre_result.get_node_result(edge.source)[-1] else: assert isinstance(self.precursory, BackwardInterpreter) precursory = pre_result.get_node_result(edge.target)[0] else: # no precursory analysis was run precursory = None successor = successor.before(edge.condition.pp, precursory) successor = self.semantics.semantics(edge.condition, successor).filter() successor = successor.exit_if() if branch else successor successor = successor.exit_loop() if loop else successor elif edge.kind == Edge.Kind.IF_IN: assert isinstance(edge, Conditional) if pre_result: # a precursory analysis was run if isinstance(self.precursory, ForwardInterpreter): precursory = pre_result.get_node_result(edge.source)[-1] else: assert isinstance(self.precursory, BackwardInterpreter) precursory = pre_result.get_node_result(edge.target)[0] else: # no precursory analysis was run precursory = None successor = successor.before(edge.condition.pp, precursory) successor = self.semantics.semantics(edge.condition, successor).filter() successor = successor.exit_if() elif edge.kind == Edge.Kind.LOOP_IN: assert isinstance(edge, Conditional) if pre_result: # a precursory analysis was run if isinstance(self.precursory, ForwardInterpreter): precursory = pre_result.get_node_result(edge.source)[-1] else: assert isinstance(self.precursory, BackwardInterpreter) precursory = pre_result.get_node_result(edge.target)[0] else: # no precursory analysis was run precursory = None successor = successor.before(edge.condition.pp, precursory) successor = self.semantics.semantics(edge.condition, successor).filter() successor = successor.exit_loop() entry = entry.join(successor) # widening if isinstance(current, Loop) and self.widening < iteration: entry = deepcopy(previous).widening(entry) # check for termination and execute block if previous is None or not entry.less_equal(previous): states = deque([entry]) if isinstance(current, Basic): successor = entry if pre_result: # a precursory analysis was run pre_states: List[Optional[State]] = pre_result.get_node_result(current) if isinstance(self.precursory, ForwardInterpreter): pre_states = pre_states[:-1] else: assert isinstance(self.precursory, BackwardInterpreter) pre_states = pre_states[1:] else: # no precursory analysis was run pre_states: List[Optional[State]] = [None] * len(current.stmts) for precursory, stmt in zip(reversed(pre_states), reversed(current.stmts)): successor = successor.before(stmt.pp, precursory) successor = self.semantics.semantics(stmt, deepcopy(successor)) states.appendleft(successor) elif isinstance(current, Loop): # nothing to be done pass self.result.set_node_result(current, list(states)) # update worklist and iteration count for node in self.cfg.predecessors(current): worklist.put(node) iterations[current.identifier] = iteration + 1 return self.result