Python read Cassandra data into pandas

2020-02-07 17:19发布

What is the proper and fastest way to read Cassandra data into pandas? Now I use the following code but it's very slow...

import pandas as pd

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

df = pd.DataFrame()

for row in session.execute(sql_query):
    df = df.append(pd.DataFrame(row, index=[0]))

df = df.reset_index(drop=True).fillna(pd.np.nan)

Reading 1000 rows takes 1 minute, and I have a "bit more"... If I run the same query eg. in DBeaver, I get the whole results (~40k rows) within a minute.

Thank you!!!

4条回答
Luminary・发光体
2楼-- · 2020-02-07 17:38

I have been working on moving data from Cassandra to mssql, and used answers given here for the reference, I am able to move data but my source table in cassandra is huge and my query is getting timeout error from cassandra , the thing is we cannot increase the timeout and I am only left with the option of selecting rows in batches in my query, my code is also converting the cassandra collection data types to str as I want to insert those in mssql and then parse it, please let me know if anyone faces similar issue, the code I built is given below:

import sys
import pandas as pd
import petl as etl
import pyodbc
import sqlalchemy
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from sqlalchemy import *
from cassandra.query import SimpleStatement


def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)
    engine = sqlalchemy.create_engine('sql_server_connection string')

cluster = Cluster(
    contact_points=['cassandra_host'], 
    auth_provider = PlainTextAuthProvider(username='username', password='passwrd')
)

session = cluster.connect('keyspace',wait_for_all_pools=True)

session.row_factory = pandas_factory
request_timeout = 60000
query = "SELECT * FROM cassandratable"
statement = SimpleStatement(query, fetch_size=5000) 
rows = session.execute(statement)

df = rows._current_rows
df['attributes'] = df.attributes.astype(str)
df['attributesgenerated'] = df.attributesgenerated.astype(str)
df['components'] = df.components.astype(str)
df['distributioncenterinfo'] = df.distributioncenterinfo.astype(str)
df['images'] = df.images.astype(str)
df['itemcustomerzonezoneproductids'] = 
df.itemcustomerzonezoneproductids.astype(str)
df['itempodconfigids'] = df.itempodconfigids.astype(str)
df['keywords'] = df.keywords.astype(str)
df['validationmessages'] = df.validationmessages.astype(str)
df['zones'] = df.zones.astype(str)
#error_bad_lines=False
#print(df)
df.to_sql(
           name='mssql_table_name',
           con=engine,
           index=False,
           if_exists='append',
           chunksize=1
         )
查看更多
ゆ 、 Hurt°
3楼-- · 2020-02-07 17:41

I got the answer at the official mailing list (it works perfectly):

Hi,

try to define your own pandas row factory:

def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

session.row_factory = pandas_factory
session.default_fetch_size = None

query = "SELECT ..."
rslt = session.execute(query, timeout=None)
df = rslt._current_rows

That's the way i do it - an it should be faster...

If you find a faster method - i'm interested in :)

Michael

查看更多
家丑人穷心不美
4楼-- · 2020-02-07 17:42

Fastest way to read Cassandra data into pandas with automatic iteration of pages. Create dictionary and add each to it by automatically iterating all pages. Then, create dataframe with this dictionary.

import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

dictionary ={"column1":[],"column2":[]}

for row in session.execute(sql_query):
    dictionary["column1"].append(row.column1)
    dictionary["column1"].append(row.column1)

df = pd.DataFrame(dictionary)
查看更多
我命由我不由天
5楼-- · 2020-02-07 17:49

What I do (in python 3) is :

query = "SELECT ..."
df = pd.DataFrame(list(session.execute(query)))
查看更多
登录 后发表回答