Nested parallelism

I’m trying to do some nested parallelism to assist with complicated computations for a SQL database.

My functions are set up like so to achieve this:

import modin.pandas as pd
import multiprocessing
import pandas

from modin.config import CpuCount, NPartitions

PARENT_PARALLELISMS = 2
TOTAL_PARALLELISMS = int(multiprocessing.cpu_count() * 2)
CHILD_PARALLELISMS = max(TOTAL_PARALLELISMS // PARENT_PARALLELISMS, 1)


def some_complex_python(child_row, some_kwarg="foo"):
    # do some stuff with child_row and some_kwarg
    some_calculations = ...  # placeholder for the real per-row computation
    return some_calculations

def apply_some_logic(row, uri):
    # do child parallelisms here
    NPartitions.put(CHILD_PARALLELISMS)
    # read the entire sql table with pandas in a single process, then hand it to modin
    entire_table_df = pandas.read_sql(row.table_name, uri)
    entire_table_df = pd.DataFrame(entire_table_df)
    # run n calculations on the entire_table and concat
    calc_1_df = entire_table_df.apply(some_complex_python, some_kwarg="foo", axis=1)
    calc_2_df = entire_table_df.apply(some_complex_python, some_kwarg="bar", axis=1)
    # merge the results
    merged_df = pd.concat([calc_1_df, calc_2_df], ignore_index=True)
    # do some sorting (sort_values is not in-place, so assign the result back)
    merged_df = merged_df.sort_values(by="some_column")
    # find the best result
    best_result_df = merged_df.groupby("some_other_column").first().reset_index()
    # populate the best results back to sql
    best_result_df.to_sql(f"{row.table_name}_calculated", uri, if_exists="replace", index=False)
    print(f"Finished {row.table_name}")


def main(uri):
    # tell modin how many workers we'll need total
    CpuCount.put(TOTAL_PARALLELISMS)
    NPartitions.put(PARENT_PARALLELISMS)
    # load some table names from a sql database
    table_names_df = pd.read_sql(some_sql_table_name_query, uri)
    # for each table name
    table_names_df.apply(apply_some_logic, args=(uri,), axis=1)
    return 

It looks like I’m getting some rows dropped from my final calculated SQL table.
Could my nested applies be causing dropouts due to memory references?

@zkyz

It’s a good question; let me follow up with a few clarifying questions:

  • What is the type of uri?
  • What SQL database are you using?
  • Is any part defaulting to pandas?
  • Can you check that best_result_df is/isn’t dropping rows? (i.e. could the insert be the source)

It’s not immediately clear that the nested parallelism here would cause rows to be dropped; let’s figure it out!
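
On the third question: modin raises a UserWarning whenever an operation falls back to pandas, so one quick way to check (a stdlib-only sketch; the exact warning text can vary between modin versions) is to force those warnings to always show:

import warnings

# surface every "defaulting to pandas" warning instead of only the first occurrence
warnings.filterwarnings("always", message=".*defaulting to pandas.*")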

Hey, sorry for the slow response.

  • uri here is a DB connection string, so that modin can use it from the workers.
  • Postgres.
  • these two parts actually operate on multiple columns, so I believe they’re the ones defaulting to pandas:
    # do some sorting (assigned back, since sort_values is not in-place)
    merged_df = merged_df.sort_values(by=["some_column_1", "some_column_2"])
    # find the best result
    best_result_df = merged_df.groupby(["pkey1", "pkey2"]).first().reset_index()

^ the group by is based on non-null primary keys, so there shouldn’t be any drops caused by NaNs.
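
To answer the last clarifying question directly, here is a quick check I can run after the groupby (a sketch reusing the names above) to compare distinct key pairs in against rows out:

# one row should come out per distinct (pkey1, pkey2) pair that went in
n_key_pairs = merged_df.drop_duplicates(subset=["pkey1", "pkey2"]).shape[0]
assert best_result_df.shape[0] == n_key_pairs, (
    f"groupby dropped rows: {n_key_pairs} key pairs in, "
    f"{best_result_df.shape[0]} rows out"
)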

The only other purely pandas method is my initial table copy: entire_table_df = pandas.read_sql(row.table_name, uri)
This is only there to stop too many connections reading from the database at once (and crashing it).
(Pushing doesn’t seem to be as big an issue.)
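
For reference, another way I considered capping the read connections, without serializing everything through one pandas call, is sqlalchemy’s built-in pool limits (a sketch; the sizes are placeholders):

from sqlalchemy import create_engine

# cap the connections this engine will hold open; extra requests wait for a free one
read_engine = create_engine(uri, pool_size=CHILD_PARALLELISMS, max_overflow=0)
entire_table_df = pandas.read_sql(row.table_name, read_engine)

Since each modin worker is its own process, though, the pool would only cap connections per process, so it wouldn’t fully replace the single-process read.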

I did also notice that if my PARENT_PARALLELISMS is too big (i.e. >4), there appear to be duplicated rows in my table_names_df.
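
Until I work out why, a defensive dedupe before the apply (a one-liner; table_name is the column the apply already uses) at least stops the duplicates fanning out:

table_names_df = table_names_df.drop_duplicates(subset=["table_name"])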

I’ve resolved this now by:

  1. Uploading the entire n calculations merged_df to a new SQL table
  2. Using a SQL statement to update my main table with the best calculation

i.e.

from sqlalchemy import create_engine, text

SQL_STATEMENT = """
UPDATE main_table AS to_update
SET best_result = (
    SELECT some_calculation
    FROM {calc_table} AS from_update
    WHERE
        to_update.pkey1 = from_update.pkey1 AND
        to_update.pkey2 = from_update.pkey2
    ORDER BY some_column_1, some_column_2
    LIMIT 1
)
"""

def apply_some_logic(row, uri):
    # do child parallelisms here
    NPartitions.put(CHILD_PARALLELISMS)
    # read the entire sql table with pandas in a single process, then hand it to modin
    entire_table_df = pandas.read_sql(row.table_name, uri)
    entire_table_df = pd.DataFrame(entire_table_df)
    # run n calculations on the entire table and concat
    calc_1_df = entire_table_df.apply(some_complex_python, some_kwarg="foo", axis=1)
    calc_2_df = entire_table_df.apply(some_complex_python, some_kwarg="bar", axis=1)
    # merge the results
    merged_df = pd.concat([calc_1_df, calc_2_df], ignore_index=True)
    # dump every calculation to a staging table
    new_calc_table = f"{row.table_name}_all_calculations"
    merged_df.to_sql(new_calc_table, uri, if_exists="replace", index=False)
    # open an sqlalchemy connection to the db; begin() commits on exit
    engine = create_engine(uri)
    with engine.begin() as connection:
        # let the database pick the best calculation per primary key
        connection.execute(text(SQL_STATEMENT.format(calc_table=new_calc_table)))
    print(f"Finished {row.table_name}")

This is likely faster in the long run anyway; I was hoping I could leverage modin’s multiple cores to speed up the sorting and grouping. I’ll have to revisit this when modin supports multi-column sorting/grouping. (I understand that’s difficult due to the multi-axis partitioning.)

Is there any way to flag the SQL statement as an asynchronous action and move on?
Since the DB can finish the calculations on its own, Python could theoretically move on in the meantime.

I know there are sqlalchemy asynchronous engines that I could await, but I’m not sure how to put it all together with modin.
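
Roughly what I have in mind, as an untested sketch (nothing modin-specific here: a small thread pool fires the UPDATE and lets the apply move on, which sidesteps a full async engine):

from concurrent.futures import ThreadPoolExecutor

# a small pool dedicated to fire-and-forget SQL; the DB does the real work
sql_executor = ThreadPoolExecutor(max_workers=PARENT_PARALLELISMS)

def run_update(uri, calc_table):
    engine = create_engine(uri)
    with engine.begin() as connection:  # begin() commits on exit
        connection.execute(text(SQL_STATEMENT.format(calc_table=calc_table)))

# inside apply_some_logic, replacing the inline execute:
future = sql_executor.submit(run_update, uri, new_calc_table)
# python moves on immediately; call future.result() later to confirm it finished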