diff --git a/examples/vulnerable_code/list_append.py b/examples/vulnerable_code/list_append.py new file mode 100644 index 00000000..9c4bbd63 --- /dev/null +++ b/examples/vulnerable_code/list_append.py @@ -0,0 +1,13 @@ +import os + +from flask import request + + +def func(): + TAINT = request.args.get("TAINT") + + cmd = [] + cmd.append("echo") + cmd.append(TAINT) + + os.system(" ".join(cmd)) diff --git a/pyt/cfg/expr_visitor.py b/pyt/cfg/expr_visitor.py index 57537875..d49be28e 100644 --- a/pyt/cfg/expr_visitor.py +++ b/pyt/cfg/expr_visitor.py @@ -21,6 +21,7 @@ ) from .expr_visitor_helper import ( BUILTINS, + MUTATORS, return_connection_handler, SavedVariable ) @@ -59,6 +60,7 @@ def __init__( self.module_definitions_stack = list() self.prev_nodes_to_avoid = list() self.last_control_flow_nodes = list() + self._within_mutating_call = False # Are we already in a module? if module_definitions: @@ -578,6 +580,23 @@ def visit_Call(self, node): else: raise Exception('Definition was neither FunctionDef or ' + 'ClassDef, cannot add the function ') + elif ( + not self._within_mutating_call and + last_attribute in MUTATORS + and isinstance(node.func, ast.Attribute) + ): + # Change list.append(x) ---> list += list.append(x) + # This does in fact propagate as we don't know that append returns None + fake_aug_assign = ast.AugAssign( + target=node.func.value, + op=ast.Add, + value=node, + ) + ast.copy_location(fake_aug_assign, node) + self._within_mutating_call = True # Don't do this recursively + result = self.visit(fake_aug_assign) + self._within_mutating_call = False + return result elif last_attribute not in BUILTINS: # Mark the call as a blackbox because we don't have the definition return self.add_blackbox_or_builtin_call(node, blackbox=True) diff --git a/pyt/cfg/expr_visitor_helper.py b/pyt/cfg/expr_visitor_helper.py index 9667f7c1..a0df3c90 100644 --- a/pyt/cfg/expr_visitor_helper.py +++ b/pyt/cfg/expr_visitor_helper.py @@ -31,6 +31,13 @@ 'flash', 'jsonify' ) +MUTATORS = ( # list.append(x) taints list if x is tainted + 'add', + 'append', + 'extend', + 'insert', + 'update', +) def return_connection_handler(nodes, exit_node): diff --git a/tests/main_test.py b/tests/main_test.py index bc985629..561d8bd1 100644 --- a/tests/main_test.py +++ b/tests/main_test.py @@ -108,11 +108,11 @@ def test_targets_with_recursive(self): excluded_files = "" included_files = discover_files(targets, excluded_files, True) - self.assertEqual(len(included_files), 32) + self.assertEqual(len(included_files), 33) def test_targets_with_recursive_and_excluded(self): targets = ["examples/vulnerable_code/"] excluded_files = "inter_command_injection.py" included_files = discover_files(targets, excluded_files, True) - self.assertEqual(len(included_files), 31) + self.assertEqual(len(included_files), 32) diff --git a/tests/vulnerabilities/vulnerabilities_test.py b/tests/vulnerabilities/vulnerabilities_test.py index 8f5e70f1..a621f4d4 100644 --- a/tests/vulnerabilities/vulnerabilities_test.py +++ b/tests/vulnerabilities/vulnerabilities_test.py @@ -110,12 +110,17 @@ def test_build_sanitiser_node_dict(self): self.assertEqual(sanitiser_dict['escape'][0], cfg.nodes[3]) - def run_analysis(self, path=None): + def run_analysis( + self, + path=None, + adaptor_function=is_flask_route_function, + trigger_file=default_trigger_word_file, + ): if path: self.cfg_create_from_file(path) cfg_list = [self.cfg] - FrameworkAdaptor(cfg_list, [], [], is_flask_route_function) + FrameworkAdaptor(cfg_list, [], [], adaptor_function) initialize_constraint_table(cfg_list) analyse(cfg_list) @@ -123,7 +128,7 @@ def run_analysis(self, path=None): return find_vulnerabilities( cfg_list, default_blackbox_mapping_file, - default_trigger_word_file + trigger_file, ) def test_find_vulnerabilities_assign_other_var(self): @@ -470,6 +475,13 @@ def test_recursion(self): vulnerabilities = self.run_analysis('examples/vulnerable_code/recursive.py') self.assert_length(vulnerabilities, expected_length=2) + def test_list_append_taints_list(self): + vulnerabilities = self.run_analysis( + 'examples/vulnerable_code/list_append.py', + adaptor_function=is_function, + ) + self.assert_length(vulnerabilities, expected_length=1) + class EngineDjangoTest(VulnerabilitiesBaseTestCase): def run_analysis(self, path):