
I have a column of long strings (like sentences) on which I want to do the following:

  1. replace certain characters
  2. create a list of the remaining strings
  3. if a string is all text, see whether it is in a dictionary and, if so, keep it
  4. if a string is all numeric, keep it
  5. if a string is a mix of numeric/text, find the ratio of numbers to letters and keep it if above a threshold

I currently do this as follows:

            for memo_field in self.memo_columns:
                data = data.with_columns(
                    pl.col(memo_field).map_elements(
                        lambda x: self.filter_field(text=x, word_dict=word_dict))
                    )

The filter_field method uses plain Python:

  • text_sani = re.sub(r'[^a-zA-Z0-9\s\_\-\%]', ' ', text) to replace
  • text_sani = text_sani.split(' ') to split
  • len(re.findall(r'[A-Za-z]', x)) to count the letters in each element of the text_sani list (similarly for digits); the ratio is the difference divided by the sum of the two counts
  • a list comprehension with an if clause to filter the list of words

It actually isn't too bad: 128M rows take about 10 minutes. Unfortunately, future files will be much bigger. On a ~300M row file, this approach gradually increases memory consumption until the OS (Ubuntu) kills the process. Also, all processing seems to take place on a single core.

I have started to try to use the Polars string expressions; code and a toy example are provided below.

At this point it looks like my only option is a function call to do the rest. My questions are:

  1. in my original approach, is it normal that memory consumption grows? Does map_elements create a copy of the original series and so consume more memory?
  2. is my original approach correct, or is there a better way (e.g. I have just started reading about struct in Polars)?
  3. is it possible to do what I want using just Polars expressions?

UPDATE

The code examples in the answers from @Hericks and @ΩΠΟΚΕΚΡΥΜΜΕΝΟΣ were applied and largely addressed my third question. Implementing the Polars expressions greatly reduced run time, with two observations:

  1. the complexity of the memo fields in my use case greatly affects the run time. The key challenge is the lookup of items in the dictionary; a large dictionary and many valid words in the memo field can severely affect run time; and
  2. I experienced many segfault errors when saving in .parquet format when I used pl.DataFrame. When using pl.LazyFrame and sink_parquet there were no errors, but run time was greatly extended (drives are NVMe SSDs at 2000 MB/s)

EXAMPLE CODE/DATA:

Toy data:

temp = pl.DataFrame({"foo": ['COOOPS.autom.SAPF124',
                            'OSS REEE PAAA comp. BEEE  atm 6079 19000000070 04-04-2023',
                            'ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO',
                            'OSS REFF pagopago cost. Becf  atm 9682 50012345726 10-04-2023']
                    })

Code Functions:

import re

def num_dec(x):
    return len(re.findall(r'[0-9_\/]', x))

def num_letter(x):
    return len(re.findall(r'[A-Za-z]', x))

def letter_dec_ratio(x):
    if len(x) == 0:
        return None
    nl = num_letter(x)
    nd = num_dec(x)
    if (nl + nd) == 0:       
        return None
    ratio = (nl - nd)/(nl + nd)
    return ratio

def filter_field(text=None, word_dict=None):

    if type(text) is not str or word_dict is None:
        return 'no memo and/or dictionary'

    if len(text) > 100:
        text = text[0:101]
    print("TEXT: ",text)
    text_sani = re.sub(r'[^a-zA-Z0-9\s\_\-\%]', ' ', text) # parse by replacing most artifacts and symbols with space 

    words = text_sani.split(' ') # create words separated by spaces
    print("WORDS: ",words)

    ratios = [letter_dec_ratio(w) for w in words]
    kept = [
        w.lower()
        for w, ratio in zip(words, ratios)
        if ratio is not None
        and (ratio == -1 or -0.7 <= ratio <= 0 or (ratio == 1 and w.lower() in word_dict))
    ]
    print("FINAL: ",' '.join(kept))

    return ' '.join(kept)

Code Current Implementation:

temp.with_columns(
    pl.col("foo").map_elements(
        lambda x: filter_field(text=x, word_dict=['cost', 'atm'])
    ).alias('clean_foo')  # baseline
)

Code Partial Attempt w/Polars:

This gets me the correct WORDS (see the expected result below):

temp.with_columns(
    (
        pl.col("foo")
        .str.replace_all(r'[^a-zA-Z0-9\s\_\-\%]',' ')
        .str.split(' ')
    )
)

Expected Result (at each step, see print statements above):

TEXT:  COOOPS.autom.SAPF124
WORDS:  ['COOOPS', 'autom', 'SAPF124']
FINAL:  
TEXT:  OSS REEE PAAA comp. BEEE  atm 6079 19000000070 04-04-2023
WORDS:  ['OSS', 'REEE', 'PAAA', 'comp', '', 'BEEE', '', 'atm', '6079', '19000000070', '04-04-2023']
FINAL:  atm 6079 19000000070 04-04-2023
TEXT:  ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO
WORDS:  ['ABCD', '600000000397', '7667896-6', 'REG', 'REF', 'REE', 'PREPREO', 'HMO']
FINAL:  600000000397 7667896-6
TEXT:  OSS REFF pagopago cost. Becf  atm 9682 50012345726 10-04-2023
WORDS:  ['OSS', 'REFF', 'pagopago', 'cost', '', 'Becf', '', 'atm', '9682', '50012345726', '10-04-2023']
FINAL:  cost atm 9682 50012345726 10-04-2023
5 Comments

  • Can you show the remaining steps (full code?) of filter_field? Also, using a for loop like that is not optimal. You can do everything in a single .with_columns by passing all the column names to pl.col() at once. Commented Mar 21, 2024 at 21:54
  • Please also add the expected output for your example dataframe. Commented Mar 22, 2024 at 5:29
  • @jqurious made the modification and added code Commented Mar 22, 2024 at 16:10
  • @TeemuRisikko added in expected output Commented Mar 22, 2024 at 16:10
  • .name.prefix() e.g. pl.col(multiple_columns).str.foo().str.bar().name.prefix('_cleaned') Commented Mar 22, 2024 at 21:02
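
For readers landing here, a minimal sketch of what the last comment suggests (the memo column names are hypothetical, data, word_dict and filter_field are as in the question, and .name.suffix is used instead of .name.prefix so the new columns read foo_cleaned):

import polars as pl

memo_columns = ["memo_1", "memo_2"]  # hypothetical column names

data = data.with_columns(
    pl.col(memo_columns)  # all memo columns in a single expression
    .map_elements(
        lambda x: filter_field(text=x, word_dict=word_dict),
        return_dtype=pl.String,
    )
    .name.suffix("_cleaned")  # keep the originals, add cleaned copies
)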

2 Answers


The filtering can be implemented using Polars' native expression API as follows. I've taken the regular expressions from the naive implementation in the question.

word_list = ["cost", "atm"]

# to avoid long expressions in ``pl.Expr.list.eval``
num_dec_expr = pl.element().str.count_matches(r'[0-9_\/]').cast(pl.Int32)
num_letter_expr = pl.element().str.count_matches(r'[A-Za-z]').cast(pl.Int32)
ratio_expr = (num_letter_expr - num_dec_expr) / (num_letter_expr + num_dec_expr)

(
    df
    .with_columns(
        pl.col("foo")
        # convert to lowercase
        .str.to_lowercase()
        # replace special characters with space
        .str.replace_all(r"[^a-z0-9\s\_\-\%]", " ")
        # split string at spaces into list of words
        .str.split(" ")
        # filter list of words
        .list.eval(
            pl.element().filter(
                # only keep non-empty string...
                pl.element().str.len_chars() > 0,
                # ...that either 
                # - are in the list of words,
                # - consist only of characters related to numbers,
                # - have a ratio between -0.7 and 0
                pl.element().is_in(word_list) | num_letter_expr.eq(0) | ratio_expr.is_between(-0.7, 0)
            )
        )
        # join list of words into string
        .list.join(" ")
        .alias("foo_clean")
    )
)
shape: (4, 2)
┌───────────────────────────────────────────────────────────────┬──────────────────────────────────────┐
│ foo                                                           ┆ foo_clean                            │
│ ---                                                           ┆ ---                                  │
│ str                                                           ┆ str                                  │
╞═══════════════════════════════════════════════════════════════╪══════════════════════════════════════╡
│ COOOPS.autom.SAPF124                                          ┆                                      │
│ OSS REEE PAAA comp. BEEE  atm 6079 19000000070 04-04-2023     ┆ atm 6079 19000000070 04-04-2023      │
│ ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO           ┆ 600000000397 7667896-6               │
│ OSS REFF pagopago cost. Becf  atm 9682 50012345726 10-04-2023 ┆ cost atm 9682 50012345726 10-04-2023 │
└───────────────────────────────────────────────────────────────┴──────────────────────────────────────┘

2 Comments

Short tests seemed good and I now have this running on the 330M row file. On a 20-core/128GB RAM, it uses 1 core 100%, 2-5 other cores at ~1-3%, steady ~78GB of RAM, and 7.8GB of 8GB swap. It's been running ~2.5 hours and I'll post run time when it completes. Any idea why my approach kept increasing memory consumption?
I've marked this as the answer b/c I think it's an excellent minitutorial of Polars expr use. However, on the latest run, after 11 hrs the job has not completed on a 330M row file so it really hasn't solved my problem. This run was preceded by three restarts each of which failed for different reasons before this one 'took'. The fails were: parquet read fail (I'm using scan_parquet), a seg fault (error 4), a RUST error (deleted before I could copy/investigate).

Let's see if I can help with the performance concerns. There may be a way to get your algorithm to run in a performance envelope that is acceptable for your system.

For the benchmarking below, I created the following dataset of over a billion records, along with some extra columns (to simulate data other than the string column that is being processed).

shape: (1_073_741_824, 6)
┌───────────────────────────────────┬────────┬───────┬─────┬─────────────────────┬─────────────────────┐
│ foo                               ┆ string ┆ float ┆ int ┆ datetime            ┆ date                │
│ ---                               ┆ ---    ┆ ---   ┆ --- ┆ ---                 ┆ ---                 │
│ str                               ┆ str    ┆ f32   ┆ i32 ┆ datetime[μs]        ┆ datetime[μs]        │
╞═══════════════════════════════════╪════════╪═══════╪═════╪═════════════════════╪═════════════════════╡
│ COOOPS.autom.SAPF124              ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ OSS REEE PAAA comp. BEEE  atm 60… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ ABCD 600000000397/7667896-6/REG.… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ OSS REFF pagopago cost. Becf  at… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ COOOPS.autom.SAPF124              ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ …                                 ┆ …      ┆ …     ┆ …   ┆ …                   ┆ …                   │
│ OSS REFF pagopago cost. Becf  at… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ COOOPS.autom.SAPF124              ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ OSS REEE PAAA comp. BEEE  atm 60… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ ABCD 600000000397/7667896-6/REG.… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
│ OSS REFF pagopago cost. Becf  at… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
└───────────────────────────────────┴────────┴───────┴─────┴─────────────────────┴─────────────────────┘

To simulate the situation you describe, I saved the dataframe to a parquet file with default compression. This parquet file is then used as the input (input_filepath in the code below).

For reference, the above dataset consumes 77 GB of RAM when fully loaded (per Polars' estimated_size method). I'm running these benchmarks on a 32-core Threadripper Pro with 504 GB of available RAM.
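
The answer does not show how this benchmark frame was built; the following is one possible sketch, assuming the four toy strings are simply tiled up to 2^30 rows and the extra columns are constants (the file path and exact construction are illustrative, and building the frame in memory like this itself needs a machine with plenty of RAM):

import polars as pl
from datetime import datetime

base = pl.DataFrame({"foo": [
    'COOOPS.autom.SAPF124',
    'OSS REEE PAAA comp. BEEE  atm 6079 19000000070 04-04-2023',
    'ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO',
    'OSS REFF pagopago cost. Becf  atm 9682 50012345726 10-04-2023',
]})

# double the 4-row frame 28 times: 4 * 2**28 = 1_073_741_824 rows
big = base
for _ in range(28):
    big = pl.concat([big, big])

# add constant columns to simulate other data alongside the string column
big = big.with_columns(
    pl.lit("string").alias("string"),
    pl.lit(1.0, dtype=pl.Float32).alias("float"),
    pl.lit(1, dtype=pl.Int32).alias("int"),
    pl.lit(datetime(2024, 3, 1)).alias("datetime"),
    pl.lit(datetime(2024, 3, 1)).alias("date"),
)

# default compression; this file is then used as input_filepath below
big.write_parquet("benchmark.parquet")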

Using collect

Using the normal collect method with default options, I ran the following code, copied from @Hericks' excellent answer above:

from time import perf_counter


start = perf_counter()
word_list = ["cost", "atm"]

num_dec_expr = pl.element().str.count_matches(r"[0-9_\/]").cast(pl.Int32)
num_letter_expr = pl.element().str.count_matches(r"[A-Za-z]").cast(pl.Int32)
ratio_expr = (num_letter_expr - num_dec_expr) / (num_letter_expr + num_dec_expr)

(
    pl.scan_parquet(input_filepath)
    .with_columns(
        pl.col("foo")
        .str.to_lowercase()
        .str.replace_all(r"[^a-z0-9\s\_\-\%]", " ")
        .str.split(" ")
        .list.eval(
            pl.element().filter(
                pl.element().str.len_chars() > 0,
                pl.element().is_in(word_list)
                | num_letter_expr.eq(0)
                | ratio_expr.is_between(-0.7, 0),
            )
        )
        .list.join(" ")
        .alias("foo_clean")
    )
    .collect()
)

print("Elapsed time: ", perf_counter() - start)

The runtime performance was as you describe. The algorithm quickly reverts to single-threaded behavior and allocates an inordinate amount of RAM (250 GB, per the top command). I killed the process after waiting for over 15 minutes.

Using collect(streaming=True, comm_subplan_elim=False)

By substituting the collect statement with collect(streaming=True, comm_subplan_elim=False), the above completes in a mere 167 seconds. And top shows that the algorithm pushes all 64 logical cores of my Threadripper Pro system to 100%.

However, the algorithm does continue to consume a large amount of RAM: 160 GB while running, just eyeballing the top command. Still, that certainly is better than the 250 GB of RAM when using collect with default options.
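
For concreteness, a minimal sketch of the substitution being described; the query is exactly the one from the code block above, with foo_clean_expr standing in for the chained pl.col("foo")...alias("foo_clean") expression:

# same lazy query as above; only the terminal call changes
df = (
    pl.scan_parquet(input_filepath)
    .with_columns(foo_clean_expr)  # shorthand for the cleaning expression above
    .collect(streaming=True, comm_subplan_elim=False)
)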

Using sink_parquet

By substituting the sink_parquet method for the collect method, and thus saving the results directly to disk, the algorithm ran in 459 seconds (about 7.5 minutes). The RAM topped out at a mere 8 GB while processing the file.
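
Again as a sketch, with the same foo_clean_expr shorthand and a hypothetical output path:

# stream the cleaned result straight to disk instead of materializing it
(
    pl.scan_parquet(input_filepath)
    .with_columns(foo_clean_expr)
    .sink_parquet(output_filepath)  # output_filepath: hypothetical destination
)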

One thing I'd caution: the runtime behavior showed that it used all 64 logical cores of my system, but not at 100%. This could be due to an I/O bottleneck, but I doubt it. (My system has four Gen4 1TB NVMe sticks in RAID 0 as working project storage.) As such, I would suggest that using sink_parquet could take longer on a system where I/O bottlenecks are more pronounced.

Admittedly, streaming the results directly to disk may not be what you want. But it may be what you need to get past this step of your processing with an acceptable run time and memory footprint.

If any of this helps, well and good. And please continue to give credit for the accepted answer to @Hericks. His answer was spot-on regarding how to use Polars' native string-manipulation capabilities.

2 Comments

Very nice addition and performance analysis! Thanks :)
Excellent insight on using collect() and sink_parquet(). I achieved a processing time of ~22min, all cores active, with memory use ~40 GB on the 330M row file. A test with a 28M row file ran much longer and I ended up killing the process after an hour of processing. The difference appears to be in the complexity of the memo fields. I've edited the question to reflect these results.
