
I have a 25 GB plaintext file with ~10 million lines, several hundred words per line. Each line needs to be processed individually, and I'm trying to split off chunks to a dozen workers to be processed in parallel. Currently I'm loading in a million lines at a time (this for some reason takes up ~10 GB in RAM even though it's only ~3 GB uncompressed on disk), splitting it evenly 12 ways, and then mapping it to 12 workers using multiprocessing.Pool.

Problem is that when each of my 12 workers finishes processing its allocated data, its RAM is not freed, and usage just increases by another ~10 GB on the next million-line iteration.

I've tried del'ing the previous data, resetting the previous data to an empty allocation, creating iterable variable names with eval(), calling gc.collect() after deletion, and entirely separating the I/O into its own function, all with no luck and the exact same issue. Running the debugger shows that the Python interpreter is only recognizing the expected data, and data from the previous iteration is not accessible, so why isn't the RAM actually being freed?

The code below is my latest iteration of trying to separate all the environments. It's not the most efficient, but "BigFileOnDisk" is on an SSD, so re-reading through the file each time is negligible compared to actually processing the data. Previously I had the "read" functionality within the allocation function and deleted all data after the workers finished, with the same results.

def allocation():
    fileCompleted = False
    currentLine = 0
    while not fileCompleted:
        lineData, currentLine, fileCompleted = read(numLines=1000000, startLine=currentLine)
        list_of_values(function_object=worker, inputs=lineData, workers=12)


def read(numLines, startLine=0):
    currentLine = 0
    lines = []
    with open(BigFileOnDisk, 'r') as fid:
        for line in fid:
            if currentLine >= startLine:
                lines.append(line)
            if currentLine - startLine >= numLines:
                return lines, currentLine, False
            currentLine += 1
        # or if we've hit the end of the file
        return lines, currentLine, True


def worker(lines):
    outputPath = *root* + str(datetime.datetime.now().time())
    processedData = {}

    for line in lines:
        pass  # process data

    del lines
    with open(outputPath, 'a') as fid:
        for item in processedData:
            fid.write(str(item) + ', ' + str(processedData[item]) + '\n')


def list_of_values(function_object, inputs, workers = 10):
    inputs_split = []
    subsection_start = 0
    for n in range(workers):
        start = int(subsection_start)
        end = int(subsection_start + len(inputs) / workers)
        subsection_start = end

        inputs_split.append( inputs[start:end] )

    p = Pool(workers)
    p.map(function_object, inputs_split)

1 Answer

You are not joining the sub-processes. After list_of_values is done, the processes created by Pool are still alive (kind of like zombies, but with a live parent process), and they still hold all of their data. You can't see their data in the main process because it lives in the other processes (which is also why gc.collect() in the main process doesn't help).

To free the memory allocated by the workers, you need to close and join the Pool manually, or use it as a context manager with a with statement.

def list_of_values(function_object, inputs, workers = 10):
    inputs_split = []
    subsection_start = 0
    for n in range(workers):
        start = int(subsection_start)
        end = int(subsection_start + len(inputs) / workers)
        subsection_start = end

        inputs_split.append( inputs[start:end] )

    with Pool(workers) as p:
        p.map(function_object, inputs_split)
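For reference, here is a minimal, self-contained sketch of the same fix without the context manager, using explicit close() and join(). (The names square_all and process_in_chunks are made up for illustration; close() stops the pool from accepting new work, and join() waits for the worker processes to exit, which is when their memory is actually returned to the OS.)

```python
from multiprocessing import Pool


def square_all(chunk):
    # Hypothetical stand-in for the real per-line worker.
    return [x * x for x in chunk]


def process_in_chunks(data, workers=4):
    # Split the data into roughly equal slices, one per worker.
    size = len(data) // workers or 1
    slices = [data[i:i + size] for i in range(0, len(data), size)]

    # Explicit equivalent of `with Pool(workers) as p: p.map(...)`:
    # close() prevents new tasks, join() waits for workers to exit.
    p = Pool(workers)
    try:
        results = p.map(square_all, slices)
    finally:
        p.close()
        p.join()

    # Flatten the per-worker result lists back into one list.
    return [x for part in results for x in part]


if __name__ == "__main__":
    print(process_in_chunks(list(range(8))))
```

If per-worker memory growth is still a problem, Pool also accepts a maxtasksperchild argument (e.g. Pool(workers, maxtasksperchild=1)), which makes each worker process exit and be replaced after that many tasks, guaranteeing its memory is released.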
