Compute ingestible control flow graph from source code
I know that there are ways to automatically generate a CFG (Control Flow Graph) from source code. However, from what I understand, those methods give me a visual graph, i.e. an image. I can't really do any computation with such an image.

Thus my question: is there a way to automatically generate a CFG from source code such that the CFG is returned to me in some data structure or file that is programmatically parseable? Ideally, I'd also like to have access to the line numbers for each node/edge in the CFG.

I will be using this for a project that extracts control flow paths from such a CFG to determine input path coverage (which I will solve using trace).

Important: the code that I am trying to analyze is written in Python, and I want to perform the analysis with Python.

Teledu answered 9/1, 2013 at 1:8 Comment(0)

Q: Is it possible to get the control flow graph of the application with the LLVM infrastructure in terms of basic blocks?

A: Yes, given a BasicBlock*, you can iterate over the pred/succ blocks in the CFG like this:

// Newer LLVM releases ship this header as "llvm/IR/CFG.h".
#include "llvm/Support/CFG.h"

   for (pred_iterator PI = pred_begin(BB), E = pred_end(BB); PI != E; ++PI) {
     BasicBlock *OnePredecessor = *PI;
     ...
   }

For successors, replace pred with succ (succ_iterator, succ_begin, succ_end).

Q: Is there any way of viewing the control flow graph of the application?

A: Yes, use: analyze -print-cfg X.(bc|ll) or: analyze -print-cfg-only X.(bc|ll)

These will emit "dot" files that can be turned into a variety of graphics formats with the graphviz toolset that you can get on the net.
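Since a "dot" file is plain text, its edges can also be read back into a data structure rather than rendered. The following is a minimal sketch in Python, assuming the file only needs its `A -> B` edge lines extracted; `DOT_TEXT`, `EDGE_RE`, and `parse_dot_edges` are hypothetical names, and the sample graph is made up for illustration rather than real LLVM output. A full DOT parser (for example a Graphviz binding) would be more robust for arbitrary files.

```python
import re
from collections import defaultdict

# Hypothetical DOT text for illustration; real LLVM output uses longer
# node names and richer record labels.
DOT_TEXT = '''
digraph "CFG for 'mult' function" {
    Node0x1 [shape=record,label="{entry}"];
    Node0x1 -> Node0x2;
    Node0x2 [shape=record,label="{loop}"];
    Node0x2:s0 -> Node0x3;
    Node0x2:s1 -> Node0x4;
    Node0x3 -> Node0x2;
}
'''

# Matches an edge line `A -> B`, tolerating an optional `:port` suffix
# and optional double quotes around either node name.
EDGE_RE = re.compile(
    r'^[ \t]*"?(\w+)"?(?::\w+)?[ \t]*->[ \t]*"?(\w+)"?(?::\w+)?',
    re.MULTILINE,
)


def parse_dot_edges(dot_text):
    """Return (successors, predecessors) as dicts of node name -> list."""
    succ = defaultdict(list)
    pred = defaultdict(list)
    for src, dst in EDGE_RE.findall(dot_text):
        succ[src].append(dst)
        pred[dst].append(src)
    return dict(succ), dict(pred)


succ, pred = parse_dot_edges(DOT_TEXT)
print(succ)  # {'Node0x1': ['Node0x2'], 'Node0x2': ['Node0x3', 'Node0x4'], 'Node0x3': ['Node0x2']}
print(pred)  # {'Node0x2': ['Node0x1', 'Node0x3'], 'Node0x3': ['Node0x2'], 'Node0x4': ['Node0x2']}
```

The predecessor map is just the successor map inverted, which mirrors the pred/succ iteration in the C++ snippet.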

http://lists.cs.uiuc.edu/pipermail/llvmdev/2005-June/004416.html shows how to extract the CFG programmatically and how to use an auxiliary tool to print the CFG to the console. The thread dates from 2005, so tool names may differ in current LLVM releases.

Hagbut answered 9/1, 2013 at 5:57 Comment(1)
Thank you for your response. I notice now that my question was unclear in one respect: the source code that I am trying to analyze is in Python. Is there an analogous way of analyzing Python code? - Teledu

Have you looked into Python's ast module? It won't do exactly what you're looking for, but you could probably build your own CFG more easily on top of it.

Edit: To answer the question in the comment:

If you save the following in test.py:

def add(a, b):
    return a + b


def mult(a, b):
    result = 0
    for i in range(b):
        result = add(result, a)
    return result

Then run:

import _ast

# Read the source and compile it to an AST instead of a code object.
with open('test.py') as f:
    test = compile(f.read(), 'test.py', 'exec', _ast.PyCF_ONLY_AST)

test is then a Module node with a body attribute. body is a list of the module's top-level statements, so in this example:

test.body  # >>> [<_ast.FunctionDef object at 0x...>, <_ast.FunctionDef object at 0x...>]
test.body[0].name  # >>> 'add'
(test.body[0].lineno, test.body[0].col_offset)  # >>> (1, 0)
test.body[0].body  # >>> [<_ast.Return object at 0x...>]

test.body[1].body  # >>> [<_ast.Assign ...>, <_ast.For ...>, <_ast.Return ...>]
# etc.

Each kind of statement is represented by its own node class, so you can decide which classes you want to track.
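As a rough illustration of tracking statement classes together with their line numbers, here is a small sketch that uses only the standard ast module. The source string mirrors the test.py example (with the accumulation written as a plain assignment); the names `SOURCE`, `statements`, and `calls` are made up for this example.

```python
import ast

SOURCE = '''\
def add(a, b):
    return a + b


def mult(a, b):
    result = 0
    for i in range(b):
        result = add(result, a)
    return result
'''

tree = ast.parse(SOURCE)

# (line number, statement class name) for every statement in the module.
statements = sorted(
    (node.lineno, type(node).__name__)
    for node in ast.walk(tree)
    if isinstance(node, ast.stmt)
)

# (line number, callee name) for every call to a plain name.
calls = sorted(
    (node.lineno, node.func.id)
    for node in ast.walk(tree)
    if isinstance(node, ast.Call) and isinstance(node.func, ast.Name)
)

print(statements)
# [(1, 'FunctionDef'), (2, 'Return'), (5, 'FunctionDef'), (6, 'Assign'),
#  (7, 'For'), (8, 'Assign'), (9, 'Return')]
print(calls)  # [(7, 'range'), (8, 'add')]
```

Every statement node carries lineno, which is what makes it possible to label CFG nodes with source lines later on.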

It probably won't be as easy as I first thought, but it shouldn't be horrible. Also, to traverse the tree iteratively you'll need the ast module, which provides the function iter_child_nodes, e.g.:

import ast

test2 = next(ast.iter_child_nodes(test))
test2 is test.body[0]  # >>> True
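To give an idea of what "building your own" could look like, below is a minimal sketch of a CFG builder on top of ast that returns a plain dict keyed by line numbers. It is not a complete implementation: the function name `build_cfg`, the `EXIT` sentinel, and the choice of one node per statement line are assumptions made for this example, and only sequences, if/else, simple for/while loops, and return are modelled (no break, continue, loop else, try, or with).

```python
import ast

EXIT = 0  # sentinel node for function exit; real line numbers start at 1


def build_cfg(func):
    """Return {line: sorted successor lines} for one ast.FunctionDef."""
    edges = {}

    def link(sources, target):
        for src in sources:
            edges.setdefault(src, set()).add(target)

    def walk(stmts, preds):
        # `preds` holds the lines from which control can reach the next statement.
        for stmt in stmts:
            link(preds, stmt.lineno)
            if isinstance(stmt, ast.If):
                then_out = walk(stmt.body, [stmt.lineno])
                else_out = walk(stmt.orelse, [stmt.lineno])
                preds = then_out + else_out
            elif isinstance(stmt, (ast.For, ast.While)):
                body_out = walk(stmt.body, [stmt.lineno])
                link(body_out, stmt.lineno)  # back edge to the loop header
                preds = [stmt.lineno]        # the loop is left from its header
            elif isinstance(stmt, ast.Return):
                link([stmt.lineno], EXIT)
                preds = []                   # nothing falls through a return
            else:
                preds = [stmt.lineno]
        return preds

    # The `def` line acts as the entry node; falling off the end reaches EXIT.
    link(walk(func.body, [func.lineno]), EXIT)
    return {src: sorted(dsts) for src, dsts in edges.items()}


SOURCE = '''\
def add(a, b):
    return a + b


def mult(a, b):
    result = 0
    for i in range(b):
        result = add(result, a)
    return result
'''

tree = ast.parse(SOURCE)
cfgs = {f.name: build_cfg(f) for f in tree.body if isinstance(f, ast.FunctionDef)}
print(cfgs['add'])   # {1: [2], 2: [0]}
print(cfgs['mult'])  # {5: [6], 6: [7], 7: [8, 9], 8: [7], 9: [0]}
```

Because the result is an ordinary dict of line numbers, it can be compared directly against the lines recorded by a tracer when checking which paths an input exercised.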
Weksler answered 9/1, 2013 at 21:21 Comment(8)
No, I have not. Thanks for the recommendation. Would you be able to provide an example of how I might use it, say with a function that multiplies two numbers by repeated calls to an addition function: def add(a,b): return a+b and def mult(a,b): answer = 0; for i in range(b): answer += add(answer,a); return answer? - Teledu
Thanks for the update. I've been playing around with your example, and it seems that _ast doesn't capture the fact that mult calls add. Please tell me that I'm wrong. - Teledu
test.body[1].body[1].body[0].value.func.id == 'add' - Weksler
I just figured that out and came to delete my comment. Thank you very much (have an upvote :)) - Teledu
This has actually inspired me to write a CFG library for distribution, based on Python's ast module. Should be fun. - Weksler
I like the way you think. I have just started writing my own (about an hour into production). Want to collaborate on this? - Teledu
@Teledu if you've open sourced yours, I'll take a look, but I have fairly strong design opinions and I'm still fleshing my design out. - Weksler
Let us continue this discussion in chat. - Teledu
import codecs
import pydot
import ast
import astunparse
import os
import time

#  python3.6   conda   TensorFlow

class FunctionVisitor(ast.NodeVisitor):
    def __init__(self):
        self.number = 0 # node counter
        self.s = '' # simplified AST string
        self.n2title = {} # node number -> label

    def print(self, v):
        self.s += str(v)

    def generic_visit(self, node):
        ast.NodeVisitor.generic_visit(self, node)

    def visit_FunctionDef(self, node):
        self.print('(')
        siz = len(node.body)
        for i in range(siz):
            fun_name = 'visit_' + type(node.body[i]).__name__
            if hasattr(self, fun_name):
                getattr(self, fun_name)(node.body[i])
            else:
                self.number += 1
                self.print(self.number)
                self.n2title[str(self.number)] = astunparse.unparse(node.body[i]).lstrip().rstrip()
            if i != siz - 1:
                self.print(',')
        self.print(')')

    # ignore import statements and the like
    def visit_Import(self, node):
        ast.NodeVisitor.generic_visit(self, node)

    def visit_ImportFrom(self, node):
        ast.NodeVisitor.generic_visit(self, node)

    def visit_alias(self, node):
        ast.NodeVisitor.generic_visit(self, node)

    # if statement
    def visit_If(self, node):
        # condition
        self.print('i(')
        self.number += 1
        self.print(self.number)
        self.n2title[str(self.number)] = astunparse.unparse(node.test).lstrip().rstrip()
        # true branch
        self.print(',(')
        siz = len(node.body)
        for i in range(siz):
            fun_name = 'visit_' + type(node.body[i]).__name__
            if hasattr(self, fun_name):
                getattr(self, fun_name)(node.body[i])
            else:
                self.number += 1
                self.print(self.number)
                self.n2title[str(self.number)] = astunparse.unparse(node.body[i]).lstrip().rstrip()
            if i != siz - 1:
                self.print(',')
        # else branch
        self.print('),(')
        siz = len(node.orelse)
        for i in range(siz):
            fun_name = 'visit_' + type(node.orelse[i]).__name__
            if hasattr(self, fun_name):
                getattr(self, fun_name)(node.orelse[i])
            else:
                self.number += 1
                self.print(self.number)
                self.n2title[str(self.number)] = astunparse.unparse(node.orelse[i]).lstrip().rstrip()
            if i != siz - 1:
                self.print(',')
        self.print('))')

    # for statement
    def visit_For(self, node):
        # iterator
        self.print('x(')
        self.number += 1
        self.print(self.number)
        a = astunparse.unparse(node.target).lstrip().rstrip()
        b = astunparse.unparse(node.iter).lstrip().rstrip()
        self.n2title[str(self.number)] = a + ' in ' + b
        # loop body
        self.print(',(')
        siz = len(node.body)
        for i in range(siz):
            fun_name = 'visit_' + type(node.body[i]).__name__
            if hasattr(self, fun_name):
                getattr(self, fun_name)(node.body[i])
            else:
                self.number += 1
                self.print(self.number)
                self.n2title[str(self.number)] = astunparse.unparse(node.body[i]).lstrip().rstrip()
            if i != siz - 1:
                self.print(',')
        self.print('))')

    # while statement (serialized with the same 'x' marker as a for loop)
    def visit_While(self, node):
        # condition
        self.print('x(')
        self.number += 1
        self.print(self.number)
        self.n2title[str(self.number)] = astunparse.unparse(node.test).lstrip().rstrip()
        # loop body
        self.print(',(')
        siz = len(node.body)
        for i in range(siz):
            fun_name = 'visit_' + type(node.body[i]).__name__
            if hasattr(self, fun_name):
                getattr(self, fun_name)(node.body[i])
            else:
                self.number += 1
                self.print(self.number)
                self.n2title[str(self.number)] = astunparse.unparse(node.body[i]).lstrip().rstrip()
            if i != siz - 1:
                self.print(',')
        self.print('))')

    def visit_Break(self, node):
        self.print('b')
        self.number += 1
        self.print(self.number)

    def visit_Continue(self, node):
        self.print('c')
        self.number += 1
        self.print(self.number)

    def visit_Return(self, node):
        self.print('r')
        self.number += 1
        self.print(self.number)
        if node.value is None:
            self.n2title['r' + str(self.number)] = 'return'
        else:
            self.n2title['r' + str(self.number)] = 'return ' + astunparse.unparse(node.value).lstrip().rstrip()

class CFG():
    def __init__(self):
        self.special_node = {}
        self.s = ''
        self.n2title = None
        self.idx = 0
        self.graph = None
    
    # create a node
    def new_node(self, title, node_shape='box', node_color='gray', text_fontsize=15):
        if title[0] == 'b':
            return pydot.Node(title, label='break', style='filled', shape=node_shape, fillcolor=node_color, fontname='Consolas', fontsize=text_fontsize)
        elif title[0] == 'c':
            return pydot.Node(title, label='continue', style='filled', shape=node_shape, fillcolor=node_color, fontname='Consolas', fontsize=text_fontsize)
        elif title[0] == 'r':
            return pydot.Node(title, label=self.n2title[title], style='filled', shape=node_shape, fillcolor=node_color, fontname='Consolas', fontsize=text_fontsize)
        elif title in self.n2title:
            return pydot.Node(title, label=self.n2title[title], style='filled', shape=node_shape, fillcolor=node_color, fontname='Consolas', fontsize=text_fontsize)
        else:
            return pydot.Node(title, label=title, style='filled', shape=node_shape, fillcolor=node_color, fontname='Consolas', fontsize=text_fontsize)

    # create an edge
    def new_edge(self, begin, end, title = ''):
        e = pydot.Edge(begin, end, label = title, color='black', arrowhead='open', fontname='Consolas', fontsize=15)
        if begin.get_shape() in ['ellipse', 'diamond']:
            if not begin.get_name() in self.special_node:
                self.special_node[begin.get_name()] = 1
                e.set_color('green')
                e.set_label('yes')
            else:
                e.set_color('red')
                e.set_label('no')
        return e

    # match one token
    def match(self):
        res = ''
        while self.s[self.idx] not in [',', ')']:
            res += self.s[self.idx]
            self.idx += 1
        return res

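    # check whether the current position is unreachable
    def check_unreachable(self, last_nodes):
        # with no predecessor nodes it is clearly unreachable
        if len(last_nodes) == 0:
            return True
        ok = True
        # if every predecessor is unreachable, the current node is too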
        for node in last_nodes:
            ok &= node.get_fillcolor() == 'red'
        return ok

    # build the CFG for a branch statement
    def build_If(self, last_nodes, continue_node, break_nodes, return_nodes):
        self.idx += 1
        if not self.check_unreachable(last_nodes):
            node0 = self.new_node(self.match(), node_shape='diamond', node_color='orange')
        else: # color unreachable nodes red
            node0 = self.new_node(self.match(), node_shape='diamond', node_color='red')
        self.graph.add_node(node0)
        for node in last_nodes:
            self.graph.add_edge(self.new_edge(node, node0))
        self.idx += 1
        nodes1 = self.build_CFG([node0], continue_node, break_nodes, return_nodes)
        self.idx += 1
        nodes2 = self.build_CFG([node0], continue_node, break_nodes, return_nodes)
        self.idx += 1
        nodes1.extend(nodes2)
        return nodes1

    # build the CFG for a loop statement
    def build_For(self, last_nodes, return_nodes):
        self.idx += 1
        if not self.check_unreachable(last_nodes):
            node0 = self.new_node(self.match(), node_shape='ellipse', node_color='deeppink')
        else:
            node0 = self.new_node(self.match(), node_shape='ellipse', node_color='red')
        self.graph.add_node(node0)
        for node in last_nodes:
            self.graph.add_edge(self.new_edge(node, node0))
        self.idx += 1
        break_nodes = [] # collect break nodes
        nodes1 = self.build_CFG([node0], node0, break_nodes, return_nodes)
        for node in nodes1:
            self.graph.add_edge(self.new_edge(node, node0))
        self.idx += 1
        break_nodes.append(node0)
        return break_nodes

    # build the CFG
    def build_CFG(self, last_nodes, continue_node, break_nodes, return_nodes):
        self.idx += 1
        while self.s[self.idx] != ')':
            if self.s[self.idx] == ',':
                self.idx += 1
                continue
            if self.s[self.idx] == 'i': # branch statement
                self.idx += 1
                last_nodes = self.build_If(last_nodes, continue_node, break_nodes, return_nodes)
            elif self.s[self.idx] == 'x': # loop statement
                self.idx += 1
                last_nodes = self.build_For(last_nodes, return_nodes)
            elif self.s[self.idx] == 'b': # break statement
                if not self.check_unreachable(last_nodes):
                    node0 = self.new_node(self.match())
                else:
                    node0 = self.new_node(self.match(), node_color='red')
                self.graph.add_node(node0)
                break_nodes.append(node0)
                for node in last_nodes:
                    self.graph.add_edge(self.new_edge(node, node0))
                last_nodes.clear()
            elif self.s[self.idx] == 'c': # continue statement
                if not self.check_unreachable(last_nodes):
                    node0 = self.new_node(self.match())
                else:
                    node0 = self.new_node(self.match(), node_color='red')
                self.graph.add_node(node0)
                for node in last_nodes:
                    self.graph.add_edge(self.new_edge(node, node0))
                self.graph.add_edge(self.new_edge(node0, continue_node))
                last_nodes.clear()
            elif self.s[self.idx] == 'r': # return statement
                if not self.check_unreachable(last_nodes):
                    node0 = self.new_node(self.match())
                else:
                    node0 = self.new_node(self.match(), node_color='red')
                self.graph.add_node(node0)
                return_nodes.append(node0)
                for node in last_nodes:
                    self.graph.add_edge(self.new_edge(node, node0))
                last_nodes.clear()
            else: # ordinary statement
                if not self.check_unreachable(last_nodes):
                    node0 = self.new_node(self.match())
                else:
                    node0 = self.new_node(self.match(), node_color='red')
                self.graph.add_node(node0)
                for node in last_nodes:
                    self.graph.add_edge(self.new_edge(node, node0))
                last_nodes = [node0]
        self.idx += 1
        return last_nodes

    # build the CFG from an AST
    def build_CFG_from_ast(self, tree):
        visitor = FunctionVisitor()
        visitor.visit(tree)
        self.special_node.clear()
        self.s = visitor.s
        self.n2title = visitor.n2title
        self.idx = 0
        self.graph = pydot.Dot(graph_type = 'digraph')
        node_entry = self.new_node('Entry', node_shape='Msquare', node_color='green')
        self.graph.add_node(node_entry)
        return_nodes = [] # collect return nodes
        last_nodes = self.build_CFG([node_entry], None, [], return_nodes)
        last_nodes.extend(return_nodes)
        node_end = self.new_node('End', node_shape='Msquare', node_color='brown')
        self.graph.add_node(node_end)
        for node in last_nodes:
            self.graph.add_edge(self.new_edge(node, node_end))
        return self.graph

class CFGGenerator(ast.NodeVisitor):
    def __init__(self, cfg, folder):
        self.cfg = cfg # CFG object
        self.folder = folder # folder where the CFG images are saved
        self.prefix = '' # class-name prefix

    def generic_visit(self, node):
        ast.NodeVisitor.generic_visit(self, node)

    def visit_ClassDef(self, node):
        self.prefix = node.name
        ast.NodeVisitor.generic_visit(self, node)
    
    def visit_FunctionDef(self, node):
        graph = self.cfg.build_CFG_from_ast(node)
        if len(self.prefix) == 0:
            path = node.name + '.png'
        else:
            path = self.prefix + '.' + node.name + '.png'
        path = os.path.join(self.folder, path)
        graph.write_png(path)
        print('generate ' + path)

if __name__ == '__main__':
    try:
        # parse arguments
        # parser = argparse.ArgumentParser()
        # parser.add_argument('file', type = str)
        # parser.add_argument('-o', '--output', type = str, default = '.')
        # args = parser.parse_args()
        # fp = args.file
        # folder = args.output
        fp = "relativate.py"
        folder = "CFG/"
        # read the file contents
        file = codecs.open(fp, 'r', 'utf-8')
        code = file.read()
        file.close()
        st = time.time()
        # parse into an AST
        tree = ast.parse(code)
        # generate the CFGs
        cfg = CFG()
        generator = CFGGenerator(cfg, folder)
        generator.visit(tree)
        ed = time.time()
        print("time: %.3f s" % (ed - st))
    except Exception as e:
        print('Error:', e)


Apodaca answered 10/2, 2020 at 8:24 Comment(1)
The code can be run directly; it builds a control flow graph for each function. - Apodaca

I found that py2cfg gives a better representation of the control flow graph (CFG) than staticfg does.

Let's take this function in Python:

    def fib():
        a, b = 0, 1
        while True:
            yield a
            a, b = b, a + b

    fib_gen = fib()
    for _ in range(10):
        next(fib_gen)

Image from StaticCFG: [image not included]

Image from PY2CFG: [image not included]

Nagel answered 13/6, 2022 at 8:25 Comment(3)
Thank you. This seems really helpful. Would you know if this also provides a data structure that I could read with Python code and that captures the same information? - Teledu
Yes, it builds an intermediate data structure before visualizing. I think we need to dig a bit into the documentation to adapt it to our needs. - Nagel
Could you please post an example output data structure for your sample code? - Teledu
