
I have a Python file:

from concurrent.futures import ProcessPoolExecutor
import tarfile, rapidgzip

def processNdjson(ndjsonName):
    with rapidgzip.open(inTarDir) as myZip:
        myZip.import_index(rapidgzipDir)
        with tarfile.open(fileobj=myZip, mode="r:*") as f:
            member = f.getmember(ndjsonName)
            dataFile = f.extractfile(member)
            for oneLine in dataFile:
                pass  # process oneLine here

if __name__ == "__main__":
    inTarDir = ...
    rapidgzipDir = ...
    nCore = 5
    ndjsonNames = ["name1.ndjson", "name2.ndjson"]

    with ProcessPoolExecutor(nCore) as pool:
        results = pool.map(processNdjson, ndjsonNames)

Above,

  • inTarDir is the path to a .tar.gz file that contains multiple .ndjson files.
  • rapidgzipDir is the path to a pre-built index file used by rapidgzip. rapidgzip allows fast random access and is a drop-in replacement for the built-in Python gzip.GzipFile.
  • Each process will run:
with rapidgzip.open(inTarDir) as myZip:
    myZip.import_index(rapidgzipDir)
    with tarfile.open(fileobj=myZip, mode="r:*") as f:

My concern: each call to myZip.import_index(rapidgzipDir) holds the index in RAM (for example, about 500 MB for a 20 GB .tar.gz), so total memory use grows linearly with nCore.

Is there any way to avoid importing the same index (rapidgzipDir) multiple times?

  • Given that rapidgzip is already parallelized internally, wouldn't it make sense to have only a few processes (or just the main process) read the files and then pass the content on to another, larger set of processes for parsing? Sharing byte arrays across processes is much simpler than sharing complex index data structures. – Homer512
  • @Homer512 I tried processing the ndjson files sequentially while parallelizing the work within each of them. However, this is slower than processing the ndjson files in parallel with sequential processing inside each file: the former takes 30 minutes, the latter 20 minutes.
  • You can only share data structures between processes that are known to be address-free (no pointers), and a quick look at the code shows that this is not the case here. In theory you could load the index into memory and then os.fork(), but that is generally not viable in complex applications because fork kills all other threads without cleanup and can have other odd effects, e.g. shared file descriptors. Maybe I'm missing something in rapidgzip, but I don't think this is possible without changing the C++ implementation.
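The reader/parser split suggested in the first comment could look roughly like this (a sketch under assumptions: parse_chunk, chunked, stream_ndjson, and the chunk size are mine, and the real per-line work would replace the stand-in parser). Only the single reader touches rapidgzip, so only one copy of the index exists, while plain byte chunks — cheap to pickle — are fanned out to a parsing pool:

```python
from concurrent.futures import ProcessPoolExecutor
from itertools import islice

def parse_chunk(lines):
    # Stand-in parser: total byte count of the chunk.
    # Replace with the real per-line ndjson processing.
    return sum(len(line) for line in lines)

def chunked(iterable, size):
    # Yield successive lists of up to `size` items from `iterable`.
    it = iter(iterable)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk

def stream_ndjson(ndjson_name, tar_path, index_path, pool, chunk_size=10_000):
    # Only this reader opens the archive, so the index is loaded exactly once.
    import tarfile
    import rapidgzip  # imported lazily so the module loads without it
    with rapidgzip.open(tar_path) as gz:
        gz.import_index(index_path)
        with tarfile.open(fileobj=gz, mode="r:*") as archive:
            data_file = archive.extractfile(archive.getmember(ndjson_name))
            # Byte chunks pickle cheaply compared with index structures.
            return list(pool.map(parse_chunk, chunked(data_file, chunk_size)))

if __name__ == "__main__":
    import sys
    if len(sys.argv) >= 3:  # usage: reader.py archive.tar.gz archive.index
        with ProcessPoolExecutor(5) as pool:
            for name in ["name1.ndjson", "name2.ndjson"]:
                print(name, stream_ndjson(name, sys.argv[1], sys.argv[2], pool))
```

Whether this beats per-file parallelism depends on how the decompression and parsing costs balance, which matches the timing discussion above; the chunk size would need tuning so pickling overhead stays small relative to parsing work.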
