I have a Python file:
from concurrent.futures import ProcessPoolExecutor
import tarfile, rapidgzip

def processNdjson(ndjsonName):
    with rapidgzip.open(inTarDir) as myZip:
        myZip.import_index(rapidgzipDir)
        with tarfile.open(fileobj=myZip, mode="r:*") as f:
            member = f.getmember(ndjsonName)
            dataFile = f.extractfile(member)
            for oneLine in dataFile:
                # process oneLine here
                pass

if __name__ == "__main__":
    inTarDir = ...
    rapidgzipDir = ...
    nCore = 5
    ndjsonNames = ["name1.ndjson", "name2.ndjson"]
    with ProcessPoolExecutor(nCore) as pool:
        results = pool.map(processNdjson, ndjsonNames)
Above,

- `inTarDir` is the path to a `.tar.gz` file that contains multiple `.ndjson` files.
- `rapidgzipDir` is the path to the pre-built index file to be used by `rapidgzip`. This allows fast random access; `rapidgzip` is a drop-in replacement for the built-in Python `gzip.GzipFile`.
- Each process will open the archive and import the index:

      with rapidgzip.open(inTarDir) as myZip:
          myZip.import_index(rapidgzipDir)
          with tarfile.open(fileobj=myZip, mode="r:*") as f:
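For context on the drop-in claim: the same wrap-with-`tarfile` pattern works with the stdlib `gzip.GzipFile` that `rapidgzip` replaces (without the index and parallel decompression, of course). A self-contained toy sketch, with a made-up in-memory archive and member name:

```python
import gzip
import io
import tarfile

def buildArchive():
    # Build a small in-memory .tar.gz holding one .ndjson member
    # (hypothetical content, just to exercise the access pattern).
    raw = io.BytesIO()
    with tarfile.open(fileobj=raw, mode="w:gz") as tar:
        payload = b'{"id": 1}\n{"id": 2}\n'
        info = tarfile.TarInfo(name="name1.ndjson")
        info.size = len(payload)
        tar.addfile(info, io.BytesIO(payload))
    raw.seek(0)
    return raw

def readMember(archive, ndjsonName):
    # Same pattern as in the question: wrap the decompressed stream
    # with tarfile and iterate over one member line by line.
    with gzip.GzipFile(fileobj=archive) as myZip:
        with tarfile.open(fileobj=myZip, mode="r:*") as f:
            dataFile = f.extractfile(f.getmember(ndjsonName))
            return [line for line in dataFile]

lines = readMember(buildArchive(), "name1.ndjson")
```

Swapping `gzip.GzipFile(fileobj=...)` for `rapidgzip.open(...)` plus `import_index(...)` gives the indexed, parallel version from the question.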
My concern: each call to `myZip.import_index(rapidgzipDir)` takes up a certain amount of RAM (for example, 500 MB for a 20 GB `.tar.gz` file), and this grows linearly with `nCore`.

Is there any way to avoid importing the same index file multiple times?
`rapidgzip` is already parallelized internally, so wouldn't it make sense to have only a few processes (or just the main process) read the files and then pass the content on to another, larger set of processes for parsing? Sharing byte arrays across processes is much simpler than sharing complex index data structures. The index could in principle be shared via `os.fork()`, but that's generally not possible in complex applications because it kills all threads without cleanup and can have other weird effects, e.g. sharing file descriptors. Maybe I'm missing something in `rapidgzip`, but I don't think it's possible without changing the C++ implementation.
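That reader/parser split can be sketched with only the stdlib. All names here are hypothetical: the single `reader` stands in for the one process that would open the archive and hold the `rapidgzip` index, the payload lines stand in for bytes extracted from the tar, and `parser` does a trivial stand-in computation instead of real NDJSON parsing:

```python
import multiprocessing as mp

def parser(inQueue, outQueue):
    # Consume raw byte lines and do the CPU-heavy work.
    while True:
        chunk = inQueue.get()
        if chunk is None:  # sentinel: reader is done
            break
        outQueue.put(len(chunk))  # stand-in for real parsing

def reader(lines, inQueue, nParsers):
    # Single reader: only this process would need the rapidgzip index.
    for line in lines:
        inQueue.put(line)
    for _ in range(nParsers):
        inQueue.put(None)  # one sentinel per parser

def main():
    nParsers = 3
    # Hypothetical payload; in the real setup these bytes would come
    # from iterating over the extracted .ndjson member.
    lines = [b'{"a": %d}\n' % i for i in range(10)]
    inQueue = mp.Queue()
    outQueue = mp.Queue()
    workers = [mp.Process(target=parser, args=(inQueue, outQueue))
               for _ in range(nParsers)]
    for w in workers:
        w.start()
    reader(lines, inQueue, nParsers)
    results = [outQueue.get() for _ in range(len(lines))]
    for w in workers:
        w.join()
    return results

if __name__ == "__main__":
    main()
```

This keeps exactly one copy of the index in RAM regardless of how many parser processes run; the trade-off is that all extracted bytes now pass through a queue, so it only pays off when parsing, not decompression, is the bottleneck.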