Pandas to Postgres -> My own method is a lot faster than odo (or I made a mistake) #614

Open
SandervandenOord opened this issue May 2, 2018 · 9 comments


@SandervandenOord

I have a pandas dataframe of 8 million rows. I found a fast method on Stack Overflow to write this data to Postgres: it takes less than 2 minutes.
Today I found odo and was hoping it would be faster, but it wasn't.
In fact, my guess is it would have taken 30 minutes or so with odo; I stopped after 7 minutes.

Here's the code for writing a dataframe to Postgres quickly, hope it helps you guys:

from cStringIO import StringIO

def df_to_database(engine, df, schema, table_name, if_exists='replace', sep='\x01', encoding='utf-8'):
    # Create Table
    df[:0].to_sql(table_name, engine, if_exists=if_exists, index=False, schema=schema)

    # Prepare data
    output = StringIO()
    df.to_csv(output, sep=sep, header=False, encoding=encoding, index=False)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()
    schema_tablename = '{}.{}'.format(schema, table_name)
    cursor.copy_from(output, schema_tablename, sep=sep, null='')
    connection.commit()
    cursor.close()
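
For context, a minimal usage sketch of the function above (the connection URL, dataframe, schema and table name are all hypothetical placeholders; assumes a SQLAlchemy engine with psycopg2):

import pandas as pd
from sqlalchemy import create_engine

# hypothetical connection string and toy data, for illustration only
engine = create_engine('postgresql+psycopg2://user:password@localhost:5432/db')
df = pd.DataFrame({'id': [1, 2, 3], 'name': ['foo', 'bar', 'baz']})

# creates an empty public.my_table via to_sql, then bulk-loads the rows with COPY
df_to_database(engine, df, schema='public', table_name='my_table')
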
@rpanai

rpanai commented Oct 9, 2018

I was not able to use odo and I'm wondering if this project is still maintained. Maybe @mrocklin can tell us something about it. Anyway, I modified your solution to work in Python 3 too.

from io import StringIO

def sendToPG(df, tableName, con):
    # dump the dataframe to an in-memory CSV buffer (tab-separated, no header/index)
    output = StringIO()
    df.to_csv(output, sep='\t', header=False, index=False)
    output.seek(0)

    # bulk-load with COPY; null values become ''
    raw = con.raw_connection()
    curs = raw.cursor()
    curs.copy_from(output, tableName, null="", columns=list(df.columns))
    curs.connection.commit()
    curs.close()
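
A quick usage sketch of this Python 3 version (the connection URL and table name are hypothetical; note that, unlike the original snippet, this version does not create the table, so it must already exist):

from sqlalchemy import create_engine

engine = create_engine('postgresql+psycopg2://user:password@localhost:5432/db')  # hypothetical URL
sendToPG(df, 'my_table', engine)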

@d6tdev

d6tdev commented Oct 14, 2018

In case you are looking for an alternative, you might want to consider d6tstack. You can process CSVs and then export to CSV, Parquet, or SQL without having to write custom functions. You can load multiple files, and it deals with data schema changes (added/removed columns). Chunked out-of-core support is built in. It benchmarks well against df.to_sql().

import glob
import d6tstack.combine_csv

def apply(dfg):
    # do stuff
    return dfg

c = d6tstack.combine_csv.CombinerCSV(['bigfile.csv'], apply_after_read=apply, sep=',')

# or
c = d6tstack.combine_csv.CombinerCSV(glob.glob('*.csv'), apply_after_read=apply)

# output to various formats, automatically chunked to reduce memory consumption
c.to_csv_combine(filename='out.csv')
c.to_parquet_combine(filename='out.pq')
c.to_psql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename')  # fast for postgres
c.to_mysql_combine('mysql+mysqlconnector://usr:pwd@localhost/db', 'tablename')  # fast for mysql
c.to_sql_combine('postgresql+psycopg2://usr:pwd@localhost/db', 'tablename')  # slow but flexible

@camesine

(quoting @SandervandenOord's original snippet above)

Thanks!!!

@PhilippeH1967

Hi, could you help me understand what should be passed for schema?
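
For illustration, schema is the Postgres schema (namespace) the table gets created in; a minimal sketch of a call, with 'public' and 'my_table' as hypothetical names:

df_to_database(engine, df, schema='public', table_name='my_table')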

@wolfc86

wolfc86 commented May 19, 2020

@SandervandenOord thanks for sharing this technique! I found it really valuable for a project I'm working on.

Some of my schema names needed to be quoted in the SQL statement (long story 🙄) so I had to change this line:

schema_tablename = '{}.{}'.format(schema, table_name)

to this:

schema_tablename = '"{}"."{}"'.format(schema, table_name)

I also wanted to use ',' as my separator, but found my routine would error whenever a CSV row had a cell whose value also contained a comma, even if the value was enclosed in quotation marks. It seems cursor.copy_from doesn't expose a way to handle quoted values.

Here's a minor revision that uses cursor.copy_expert instead:

def df_to_database(engine, df, schema, table_name, if_exists='replace', encoding='utf-8', index=False):
    # Create Table
    df[:0].to_sql(table_name, engine, if_exists=if_exists, index=index, schema=schema)

    # Prepare data
    output = StringIO()
    df.to_csv(output, sep=',', header=False, encoding=encoding, index=index)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()

    schema_tablename = '"{}"."{}"'.format(schema, table_name)
    cursor.copy_expert("COPY " + schema_tablename + " FROM STDIN WITH (FORMAT CSV)""", output)

    connection.commit()
    cursor.close()

Just sharing in case other people run into the same.

Thank you again for the great technique!

@mairanteodoro

mairanteodoro commented Dec 11, 2020

@wolfc86 You forgot to add index as a function parameter. The first line of your code should be:

def df_to_database(engine, df, schema, table_name, if_exists='replace', encoding='utf-8', index=False):
    [...]

@wolfc86

wolfc86 commented Dec 11, 2020

Ah, right you are @mairanteodoro! I updated the snippet in my comment. Thanks!

@Chithra1206

Chithra1206 commented Oct 13, 2021

@SandervandenOord Thank you for this amazing technique! I had to push over 4 GB of data (mostly text fields, conversations) to Postgres, and this made my task much simpler. It took 30 minutes to run (actual wall time: 30min 35s).

I modified @wolfc86's code to add the chunksize option in df.to_csv(...).

Then I split the dataframe into chunks, created the header (empty table) outside the method, and pushed the data into the db in small batches (instead of one bulk insert). It took 8 minutes to execute.

Revised snippet:

def df_to_database(engine, df, schema, table_name, if_exists='append', encoding='utf-8', index=False):
    # Prepare data
    output = StringIO()
    df.to_csv(output, sep=',', header=False, encoding=encoding, index=index, chunksize=5000)
    output.seek(0)

    # Insert data
    connection = engine.raw_connection()
    cursor = connection.cursor()

    schema_tablename = '"{}"."{}"'.format(schema, table_name)
    cursor.copy_expert("COPY " + schema_tablename + " FROM STDIN WITH (FORMAT CSV)", output)

    connection.commit()
    cursor.close()

# Add header: create the empty table first
df[:0].to_sql(table_name, engine, schema=schema, index=False, if_exists='replace')

# Split the dataframe into a list of dataframes (~10% of the rows each, rounded to the nearest hundred)
n = int(round(1/10 * df.shape[0], -2))
split_df = [df[i:i+n] for i in range(0, df.shape[0], n)]

# Call df_to_database() on each chunk
for chunk in split_df:
    df_to_database(engine, chunk, schema, table_name)

Thanks again!

@PandaWhoCodes

(quoting @Chithra1206's comment above)

Works like a charm. Thanks
