SourceWalker.py #!/usr/bin/python3
#!/usr/bin/python3
import sys
import json
import re
import os
import multiprocessing
import codecs
from pathlib import Path
class SourceWalkerException(Exception):
"""
Raised when there is an error processing the given expressions
TODO: Make error handling more informative and clean up. Should display a message to the user explaing what went wrong and close all open files.
"""
pass
class SourceWalker:
_output_file_names = []
_regexs_regexes = {}
_expr_file_names = {}
def __init__(self, source_dir, output_dir, expr_list, encoding = "ISO-8859-1", process_count = 12, extensions = [ ".c", ".h" ]):
try:
if not os.path.exists(source_dir) or not os.path.exists(output_dir):
raise NotADirectoryError
if process_count < 1:
raise SourceWalkerException("Process count cannot be less than one!")
codecs.lookup(encoding)
if not isinstance(extensions, list):
raise SourceWalkerException("Extensions must be passed as a list!")
for extension in extensions:
if extension[0] != '.':
raise SourceWalkerException("Extensions must start with a \'.\'!")
elif len(extension) <= 1:
raise SourceWalkerException("Extensions must be more than one character long!")
except NotADirectoryError as exception:
raise SourceWalkerException("Directory does not exist! " + str(exception))
else:
self._source_dir = source_dir
self._output_dir = output_dir
self._encoding = encoding
self._expr_list = expr_list
self._process_count = process_count
self._extensions = extensions
self._process_expr_list()
def _process_expr_list(self):
for expr in self._expr_list:
try:
if isinstance(expr, list):
if len(expr) == 0:
raise SourceWalkerException("Expression list cannot be empty!")
output_file_name = expr[0]
if not isinstance(output_file_name, str):
raise SourceWalkerException("Expression sub-lists can only contain strings!")
for sub_expr in expr:
if not isinstance(sub_exproutput_file_name, str):
raise SourceWalkerException("Expression sub-lists can only contain strings!")
elif
for sub_expr in self._regexs.keysexpr:
if not isinstance(sub_expr, str):
raise SourceWalkerException("Expressions"Expression sub-lists can only appear once in the expressioncontain liststrings!")
self._regexs[sub_expr] = re elif sub_expr in self.compile("\s+%s_regexes.keys(\s|,|:|;|\n)+":
% (sub_expr)) # Naieve regex to catch expressions
self._expr_file_names[sub_expr]raise =SourceWalkerException("Expressions self._output_dircan +only output_file_nameappear once in the expression list!")
self._output_file_names.append(self._output_dir + output_file_name)
self._regexes[sub_expr] elif= isinstancere.compile(expr"\s+%s(\s|, dict)|:
|;|\n)+" % (sub_expr)) # Naieve regex to catch ifexpressions
len(expr.keys()) == 0:
raise SourceWalkerException("Expressionself._expr_file_names[sub_expr] dictionary= cannotself._output_dir be+ empty!")output_file_name
self._output_file_names.append(self._output_dir + output_file_name)
= list elif isinstance(expr, dict)[0]:
if len(expr.keys()) == 0:
raise SourceWalkerException("Expression dictionary cannot be empty!")
print("Writing to file " + output_file_name = list(expr)[0]
if not isinstance(expr[output_file_name], list):
raise SourceWalkerException("Expression dictionary cannot be empty!")
for sub_expr in expr[output_file_name]:
if not isinstance(sub_expr, str):
raise SourceWalkerException("Expression sub-lists can only contain strings!")
elif sub_expr in self._regexs_regexes.keys():
raise SourceWalkerException("Expressions can only appear once in the expression list!")
self._regexs[sub_expr]_regexes[sub_expr] = re.compile("\s+%s(\s|,|:|;|\n)+" % (sub_expr))
self._expr_file_names[sub_expr] = self._output_dir + output_file_name
self._output_file_names.append(self._output_dir + output_file_name)
elif isinstance(expr, str):
if expr in self._regexs_regexes.keys():
raise SourceWalkerException("Expressions can only appear once in the expression list!")
self._output_file_names.append(self._output_dir + expr)
self._regexs[expr]_regexes[expr] = re.compile("\s+%s(\s|,|:|;|\n)+" % (expr))
self._expr_file_names[expr] = self._output_dir + expr
else:
raise SourceWalkerException("Expression list can only contain dictionaries, lists, and strings!")
except SourceWalkerException as exception:
self.cleanup()
raise
def _process_files(self, input_files, output_files, mutexes): # Find way to process different types of source file, I'd rather not be limited to C only...
for file_name in iter(input_files.get, None):
with open(file_name, "r", encoding = self._encoding) as file_object:
in_multi_comment = False
in_single_comment = False
in_string = False
prev_char = ''
comment = ''
for line_num, line in enumerate(file_object, 1):
for char in line:
if char == '/':
if in_string or in_single_comment:
prev_char = char
continue
if prev_char == '*':
in_multi_comment = False
comment += char
for expr in self._regexs_regexes.keys():
if self._regexs[expr]_regexes[expr].search(comment):
mutexes[expr].acquire()
os.write(output_files[expr], ("%s: %s %s\n" % (file_name, str(line_num), comment)).encode())
mutexes[expr].release()
comment = ''
elif prev_char == '/':
in_single_comment = True
comment += prev_char
elif char == '*':
if in_string or in_single_comment or in_multi_comment:
if in_single_comment or in_multi_comment:
comment += char
prev_char = char
continue
if prev_char == '/':
in_multi_comment = True
comment += prev_char
elif char == '"':
if prev_char == '\\' or in_single_comment or in_multi_comment:
prev_char = char
continue
in_string = not in_string
prev_char = char
if in_single_comment or in_multi_comment:
comment += char
if in_single_comment:
in_single_comment = False
for expr in self._regexs_regexes.keys():
if self._regexs[expr]_regexes[expr].search(comment):
mutexes[expr].acquire()
os.write(output_files[expr], ("%s: %s %s" % (file_name, str(line_num), comment)).encode())
mutexes[expr].release()
comment = ''
def walk(self):
input_files = multiprocessing.Queue(0)
processes = []
mutexes = {}
output_files = {}
for fname in self._output_file_names:
try:
file_handle = os.open(fname, os.O_WRONLY | os.O_CREAT)
mutex = multiprocessing.Lock()
except IOError:
for file in output_files.keys():
output_files[file].close()
raise SourceWalkerException("Error: Could not open output file %s, skipping!" % fname)
for expr in self._expr_file_names.keys():
if self._expr_file_names[expr] == fname:
output_files[expr] = file_handle
mutexes[expr] = mutex
for root, dirs, file_names in os.walk(self._source_dir):
for file_name in file_names:
if any(ext in Path(file_name).suffix for ext in self._extensions):
input_files.put(os.path.join(root, file_name))
for i in range(self._process_count):
input_files.put(None)
for cur_process in range(self._process_count):
process = multiprocessing.Process(target = self._process_files, args = (input_files, output_files, mutexes))
processes.append(process)
process.start()
for i in range(1, self._process_count):
processes[i].join()
for file in output_files.keys(): # Close the file associated with each expression
try:
os.close(output_files[file]) # Since multiple expressions can be associated with the same file we need to avoid invalid file closures
except:
pass