import inspect
import os
import sys
import parser
from ctypes import *
import _ctypes
import stat
import tempfile
import subprocess
import platform
# GLOBALS
#
# Dictionary with list of files that have been "compiled"
processed = {}
# Dictionary mapping (source file, line number) to the parsed code fragment whose function lives in the DLL/SO
savefunc = {}
# hash of CDLL objects for each source file
mylib = {}
#
# class deletes temporary DLL/SO files. Unfortunately, ctypes has functions for loading DLL/SOs, but not
# for unloading them. Thus deletion must occur at program termination.
#
class AutoCleanup:
    """Registry of loaded libraries to unload and temporary files to delete.

    Paths are registered with add() and loaded library objects with
    unload(); the actual work happens in __del__, i.e. when the instance is
    garbage collected (for a module-level instance, at program termination).
    """

    def __init__(self):
        self.files = []
        self.unloadlib = []

    def add(self, file):
        """Register the path *file* for deletion during cleanup."""
        self.files.append(file)

    def unload(self, dll):
        """Register the loaded library object *dll* for unloading during cleanup."""
        self.unloadlib.append(dll)

    def __del__(self):
        """Unload every registered library, then delete every registered file.

        Cleanup is best effort: a failure on one entry is ignored so the
        remaining entries are still processed.
        """
        # Take the pending work and leave empty lists behind, so that a
        # second invocation (explicit call followed by garbage collection)
        # is a harmless no-op instead of an AttributeError.
        pending_libs = getattr(self, "unloadlib", [])
        pending_files = getattr(self, "files", [])
        self.unloadlib = []
        self.files = []
        # Unload first: a shared library that is still loaded may not be
        # deletable (notably on Windows), so removing the files before
        # unloading would silently leave them behind.
        for dll in pending_libs:
            try:
                _unload_library(dll)
            except Exception:
                continue
        for file in pending_files:
            try:
                os.remove(file)
            except Exception:
                # Deliberately broad: during interpreter shutdown even the
                # os module may already be unavailable.
                continue
# Module-wide cleanup registry: temp_compile() adds the libraries it builds
# and the loader functions register every library they load for unloading.
_cleanup = AutoCleanup()
def _unload_library(tdll):
    """Unload the shared library wrapped by the ctypes library object *tdll*.

    Uses FreeLibrary from kernel32 when the module-level ``windows`` flag is
    set and dlclose from libdl otherwise.

    Raises:
        OSError: if libdl cannot be loaded on a non-Windows system.
    """
    # Wrap the handle explicitly: a bare Python integer is passed as a C int,
    # which would truncate a 64-bit library handle.
    handle = c_void_p(tdll._handle)
    if windows:
        # FreeLibrary is a stdcall Win32 API, hence windll rather than cdll.
        windll.kernel32.FreeLibrary(handle)
    else:
        try:
            libdl = CDLL("libdl.so")
        except OSError:
            # "libdl.so" is usually only a development symlink; fall back to
            # the versioned runtime name.
            libdl = CDLL("libdl.so.2")
        libdl.dlclose(handle)
class CodeFragment:
    """One embedded C/C++ snippet, parsed into the pieces needed to emit it.

    parse_embed_code() splits the snippet into preamble lines, the function
    body, argument declarations, the return type and post-call code;
    write_func() writes the resulting C++ source to a file object.
    """

    # Class-level defaults, kept for code that reads them before parsing.
    funcname = "func"
    sourcecode = [""]
    arguments = []
    variables = []
    rettype = ""
    prefunc = []
    postfunc = []
    lineno = 0
    gcc = ""
    inline = False

    def __init__(self):
        # Give every instance its own lists so the mutable class-level
        # defaults above can never be shared between fragments.
        self.sourcecode = [""]
        self.arguments = []
        self.variables = []
        self.prefunc = []
        self.postfunc = []

    def type2c(self, t):
        """Return the C spelling of type name *t* ("string" becomes "char*")."""
        if t == "string":
            return "char*"
        return "%s" % t

    def write_func(self, fpo, pfuncname=""):
        """Write the fragment as C++ source to the file object *fpo*.

        Args:
            fpo: writable text file object.
            pfuncname: when non-empty, the name to give the generated
                function; otherwise the current ``funcname`` is used.

        Returns:
            The name of the function that was written.
        """
        if pfuncname != "":
            # Previously assigned to a throw-away local, so the requested
            # name was silently ignored.
            self.funcname = pfuncname
        arglist = [self.type2c(argtype) + " " + varname
                   for argtype, varname in zip(self.arguments, self.variables)]
        fpo.write("\n".join(self.prefunc))
        fpo.write("\nextern \"C\" {\n")
        if self.inline:
            # Inline fragments are a bare function body; wrap them in a
            # function header built from the IMPORT/RETURN declarations.
            fpo.write("%s %s(%s) {" % (self.type2c(self.rettype), self.funcname, ",".join(arglist)))
        fpo.write("\n".join(self.sourcecode))
        # End the last source line so that a trailing "//" comment cannot
        # swallow the text that follows it.
        fpo.write("\n")
        if self.inline:
            fpo.write("}\n")
        fpo.write("void %s_post() {\n" % self.funcname)
        fpo.write("\n".join(self.postfunc))
        fpo.write("\n}\n")
        fpo.write("} //extern C\n")
        fpo.write("\n")
        return self.funcname

    def parse_embed_code(self, plineno, source, pinline):
        """Parse the snippet lines in *source* and reset this fragment's state.

        Each line is classified by the first rule that matches:
          * first non-blank character is "#": copied to the preamble;
          * contains "GLOBAL": the marker is removed and the rest goes to
            the preamble;
          * contains "CC": the marker is removed and the rest is stored in
            ``gcc``;
          * contains "IMPORT": must be "IMPORT <type> <name>", declaring
            one argument (a ";" in the name is dropped);
          * contains "RETURN": must be "RETURN <type> <expression>", setting
            the return type and appending a return statement;
          * contains "POST": the marker is removed and the rest goes to the
            post-call function;
          * anything else is body source code.

        NOTE(review): the markers are matched anywhere in the line, so an
        ordinary C line that merely contains e.g. "CC" or "POST" is treated
        as a directive.

        Args:
            plineno: line number to record for this fragment.
            source: iterable of source lines.
            pinline: True if the snippet is a function body to be wrapped.

        Returns:
            True.

        Raises:
            ValueError: if an IMPORT or RETURN line does not have the
                expected number of fields.
        """
        self.arguments = []
        self.variables = []
        self.sourcecode = [""]
        self.prefunc = [
            "#include <string.h>",
            "#include <stdio.h>",
            "#include <stdlib.h>"]
        self.postfunc = []
        self.rettype = "int"
        self.gcc = ""
        self.inline = pinline
        self.lineno = plineno
        for line in source:
            if line.strip().startswith("#"):
                self.prefunc.append(line)
                continue
            if "GLOBAL" in line:
                self.prefunc.append(line.replace("GLOBAL", "", 1))
                continue
            if "CC" in line:
                self.gcc = line.replace("CC", "", 1)
                continue
            if "IMPORT" in line:
                (cmd, argtype, var) = line.split()
                self.arguments.append(argtype)
                self.variables.append(var.replace(";", ""))
                continue
            if "RETURN" in line:
                # Split into exactly three fields so the expression may
                # contain spaces ("RETURN int a + b").
                (cmd, rtype, exp) = line.split(None, 2)
                self.rettype = rtype
                self.sourcecode.append("return %s;" % exp.strip())
                continue
            if "POST" in line:
                self.postfunc.append(line.replace("POST", "", 1))
                continue
            self.sourcecode.append(line)
        return True
class CodeFile:
    """Extracts the precompiled C/C++ fragments of one Python source file,
    compiles them into a shared library when needed and loads that library.
    """

    def need_reload(self, file):
        """Return True if the library for *file* is missing or older than *file*.

        Raises:
            OSError: if *file* itself cannot be stat'ed.
        """
        source_stat = os.stat(file)
        dll = _get_dll_name(file)
        try:
            dll_stat = os.stat(dll)
        except OSError:
            # No library yet (or it is not accessible): it must be built.
            return True
        return source_stat.st_mtime > dll_stat.st_mtime

    def parse_file(self, file):
        """Register every fragment found in *file* and make its library available.

        Each fragment is named ``func_<n>`` in order of appearance and
        stored in ``savefunc``. When the library is out of date the fragments
        are written to ``<file>.cpp`` and compiled; the library is then
        loaded unless ``mylib`` already holds it.
        """
        funcnum = 0
        newer = self.need_reload(file)
        comp = EmbedCompile()
        gcc = ""
        fpo = None
        if newer:
            fpo = open("%s.cpp" % file, "w")
        try:
            if fpo is not None:
                fpo.write("//%s\n" % file)
            for code in self.parse_file_next(file):
                funcnum += 1
                code.funcname = "func_%d" % funcnum
                savefunc[(file, code.lineno)] = code
                # Callers look a fragment up by the line number the
                # interpreter reports for the calling statement. The closing
                # line is the primary key; the opening line is registered as
                # well because which of the two a multi-line call reports
                # appears to depend on the Python version (TODO confirm).
                startline = getattr(code, "startlineno", None)
                if startline is not None:
                    savefunc.setdefault((file, startline), code)
                # Options of the last fragment are the ones passed on.
                gcc = code.gcc
                if fpo is not None:
                    code.write_func(fpo)
        finally:
            # Close the generated source even if parsing fails part-way.
            if fpo is not None:
                fpo.close()
        if newer:
            comp.compile(file, gcc)
        if file not in mylib:
            self._load_file_into_library(file)

    def _load_file_into_library(self, file):
        """Load the library built for *file*, cache it and schedule its unload."""
        dll = _get_dll_name(file)
        # Load exactly once: loading through both cdll.LoadLibrary() and
        # CDLL() would take two references while only one unload is
        # scheduled.
        mylib[file] = cdll.LoadLibrary(dll)
        _cleanup.unload(mylib[file])

    def parse_file_next(self, file):
        """Yield a parsed CodeFragment for each precompile block in *file*.

        A block starts on a line containing ``inline_c_precompile(\"\"\"`` or
        ``embed_c_precompile(\"\"\"`` and ends on the next line containing a
        triple double quote; the lines in between (right-stripped) are the
        fragment's source. The fragment's ``lineno`` is the closing line;
        the opening line is stored as ``startlineno``.
        """
        with open(file, "r") as fp:
            lineno = 0
            startline = 0
            infunc = False
            inline = False
            funcx = []
            for line in fp:
                lineno += 1
                if line.find("inline_c_precompile(\"\"\"") >= 0:
                    infunc = True
                    inline = True
                    funcx = []
                    startline = lineno
                    continue
                if line.find("embed_c_precompile(\"\"\"") >= 0:
                    infunc = True
                    inline = False
                    funcx = []
                    startline = lineno
                    continue
                if infunc:
                    if line.find("\"\"\"") >= 0:
                        code = CodeFragment()
                        code.parse_embed_code(lineno, funcx, inline)
                        code.startlineno = startline
                        yield code
                        infunc = False
                        inline = False
                    else:
                        funcx.append(line.rstrip())
def _type2ctype(t, ev=False):
if t == "string":
typ = "c_char_p"
else:
typ = "c_%s" % t
if ev:
return eval(typ)
else:
return typ
def inline_c_precompile(source):
    """Precompile C/C++ code and run it in-line"""
    # The fragment is located through the caller's file and line number, not
    # through the text of *source*.
    caller = inspect.currentframe().f_back
    lib, code = _load_func(caller)
    # Must be called directly from here: _call_func walks two frames up to
    # reach the caller's variables.
    return _call_func(lib, code)
def inline_c(source):
    """Compile at runtime and run code in-line

    Args:
        source: the C/C++ snippet, including any directive lines.

    Returns:
        The value returned by the compiled function.
    """
    code = CodeFragment()
    compiler = EmbedCompile()
    code.parse_embed_code(0, source.split("\n"), True)
    (dll, func) = compiler.temp_compile(code)
    tdll = cdll.LoadLibrary(dll)
    # Register for unloading before the call so the library is still
    # released when the compiled function (or argument conversion) raises.
    _cleanup.unload(tdll)
    # Must be called directly from here: _call_func walks two frames up to
    # reach the caller's variables.
    return _call_func(tdll, code)
def embed_c_precompile(source):
    """Precompile function definitions and load the code, but don't execute"""
    # Lookup is driven by the caller's file and line number; only the library
    # half of the result is needed here.
    caller = inspect.currentframe().f_back
    loaded = _load_func(caller)
    return loaded[0]
def embed_c(source):
    """Compile at runtime function definitions and load the code, but don't execute"""
    fragment = CodeFragment()
    fragment.parse_embed_code(0, source.split("\n"), False)
    builder = EmbedCompile()
    libpath, _funcname = builder.temp_compile(fragment)
    loaded = cdll.LoadLibrary(libpath)
    # Schedule the library for unloading at cleanup time.
    _cleanup.unload(loaded)
    return loaded
# Getting frame info is slow on Windows
def _get_source(fr):
lineno = inspect.getframeinfo(fr)[1]
sourcepath = os.path.abspath( inspect.getsourcefile( fr.f_code ) )
return sourcepath, lineno
def _load_func(fr):
    """Return (library, code fragment) for the call site described by frame *fr*.

    The source file of the frame is parsed (once) first; the fragment is
    then looked up by source path and line number.
    """
    location = _get_source(fr)
    sourcepath = location[0]
    _load_func_parse(sourcepath)
    return mylib[sourcepath], savefunc[location]
def _load_func_parse(sourcepath):
    """Parse *sourcepath* for embedded fragments unless it was already processed."""
    global processed
    if sourcepath in processed:
        return
    CodeFile().parse_file(sourcepath)
    # Mark only after a successful parse, so a failed attempt is retried.
    processed[sourcepath] = True
def _call_func(dll, code):
    """Call the compiled function of *code* in *dll* with the caller's variables.

    The variables declared by the fragment are read from the frame two
    levels up (the code that called the public inline function), converted
    to ctypes values and passed to the function. Afterwards reference
    ("&") and array ("*") arguments are copied back into the namespace they
    were read from, and the fragment's ``<funcname>_post`` function is
    called if the library exports it.

    Args:
        dll: loaded ctypes library object.
        code: parsed fragment providing funcname, rettype, arguments and
            variables.

    Returns:
        The value returned by the compiled function.

    Raises:
        KeyError: if a declared variable exists neither in the caller's
            locals nor in its globals.
    """
    f = getattr(dll, code.funcname)
    # NOTE(review): argtypes is assigned as an empty list and extended
    # afterwards; this order is kept from the original. Confirm that ctypes
    # honours entries appended after the assignment before relying on them.
    f.argtypes = []
    f.restype = _type2ctype(code.rettype, True)
    value = []
    # One flag per variable: True when it was read from the caller's locals.
    # A single shared flag would send every write-back to whichever
    # namespace the last variable happened to come from.
    found_local = []
    # Two frames up: this function <- public inline function <- user code.
    fr = inspect.currentframe().f_back.f_back
    for v in range(len(code.variables)):
        declared = code.variables[v]
        ref = declared.find("&") >= 0
        varname = declared.replace("&", "")
        try:
            vardata = fr.f_locals[varname]
            found_local.append(True)
        except KeyError:
            vardata = fr.f_globals[varname]
            found_local.append(False)
        if code.arguments[v].endswith("*"):
            # Pointer argument: pass a ctypes array built from the sequence.
            element = _type2ctype(code.arguments[v].rstrip("*"), True)
            array_type = element * len(vardata)
            f.argtypes.append(POINTER(array_type))
            value.append(array_type(*vardata))
            continue
        ctype = _type2ctype(code.arguments[v], True)
        f.argtypes.append(ctype)
        if ref:
            value.append(byref(ctype(vardata)))
        else:
            value.append(ctype(vardata))
    r = f(*value)
    for v in range(len(code.variables)):
        # Writes through f_locals are not guaranteed to reach the caller's
        # local variables; kept as a best effort.
        namespace = fr.f_locals if found_local[v] else fr.f_globals
        if code.variables[v].find("&") >= 0:
            varname = code.variables[v].replace("&", "")
            namespace[varname] = value[v]._obj.value
        if code.arguments[v].endswith("*"):
            varname = code.variables[v]
            # Tuples are left untouched; other sequences are replaced by a
            # list holding the (possibly modified) array contents.
            if not _is_tuple(namespace[varname]):
                namespace[varname] = list(value[v])
    # The post function is optional: skip it when the symbol is missing.
    post = getattr(dll, "%s_post" % code.funcname, None)
    if post is not None:
        post()
    return r
def _is_tuple(x):
if type(x) is type(()):
return True
return False
def _get_dll_name(file):
    """Return the shared-library path for *file*: "<file>.cpp.<dllext>"."""
    suffix = "cpp.%s" % dllext
    return "%s.%s" % (file, suffix)
class EmbedCompile:
    """Runs gcc to turn generated C++ source files into shared libraries."""

    def testgcc(self, cmd="gcc", opt="--version"):
        """Check that the compiler can be executed.

        Args:
            cmd: compiler executable to try.
            opt: option passed to it.

        Returns:
            True if the command exits with status 0.

        Raises:
            Exception: "GCC not found" if the command cannot be run, or
                "GCC not configured" (with the captured output) if it
                exits with a non-zero status.
        """
        try:
            (ret, output) = self.run_capture([cmd, opt])
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate.
            raise Exception("GCC not found")
        if ret == 0:
            return True
        raise Exception("GCC not configured", output)

    def run_capture(self, command):
        """Run *command* and capture its stdout and stderr.

        Args:
            command: argument list passed to subprocess.Popen.

        Returns:
            (exit status, list of output lines); stdout and stderr are
            written to the same temporary log file.
        """
        (out, filepath) = tempfile.mkstemp(".log")
        try:
            try:
                p = subprocess.Popen(command, stdout=out, stderr=out)
                ret = p.wait()
            finally:
                os.close(out)
            with open(filepath, "r") as fp:
                output = fp.readlines()
        finally:
            # Always drop the temporary log, including when reading it fails.
            os.remove(filepath)
        return (ret, output)

    def compile(self, file, gcc):
        """Compile "<file>.cpp" into the library for *file*; return its path.

        The generated .cpp file is removed after a successful build.
        """
        cpp = "%s.cpp" % file
        lib = _get_dll_name(file)
        return self.compile_file(cpp, lib, gcc, True)

    def compile_file(self, filename, lib, gcc="", wipe=False):
        """Compile the source *filename* into the shared library *lib*.

        Args:
            filename: C/C++ source file.
            lib: output path of the shared library.
            gcc: accepted for interface compatibility.
                NOTE(review): currently not used when building the command
                line — confirm whether it should supply extra options.
            wipe: if true, delete *filename* after a successful build.

        Returns:
            *lib*.

        Raises:
            Exception: (filename, exit status, command, output) if the
                compiler exits with a non-zero status.
        """
        # Libraries go after the source file: the linker processes inputs in
        # command-line order, so a library named before the code that needs
        # it may not be linked in.
        parms = ["gcc", "-fPIC", "-shared", "-o", lib, filename, "-lstdc++"]
        (ret, output) = self.run_capture(parms)
        if ret != 0:
            raise Exception(filename, ret, parms, output)
        if wipe:
            os.remove(filename)
        return lib

    def temp_compile(self, code):
        """Write the fragment *code* to a temporary file and compile it.

        The resulting library is registered with the module's cleanup
        registry for deletion.

        Returns:
            (library path, function name).
        """
        (fp, filename) = tempfile.mkstemp(".cpp")
        fpo = os.fdopen(fp, "w")
        try:
            funcname = "func"
            code.write_func(fpo, funcname)
        finally:
            fpo.close()
        lib = "%s.%s" % (filename, dllext)
        self.compile_file(filename, lib, code.gcc, True)
        _cleanup.add(lib)
        return lib, funcname
# Platform setup. Only Windows-like systems use the Win32 unload path and
# the "dll" extension; every other system (Linux, macOS, BSD, ...) uses
# "so". Previously anything that was not Linux was treated as Windows.
windows = platform.system().startswith(("Windows", "CYGWIN"))
if windows:
    dllext = "dll"
else:
    dllext = "so"
# Fail at import time if the compiler cannot be run.
EmbedCompile().testgcc()