I have a pandas implementation of this question here. I want to implement it using pyspark for a Spark environment.
I have 2 csv files. The first csv has a keyword column and a corresponding lookupid column. I converted these into 2 lists in pure Python:
keyword = ['IT Manager', 'Sales Manager', 'IT Analyst', 'Store Manager']
lookupid = ['##10##','##13##','##12##','##13##']
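For reference, here is a minimal sketch of how those two lists could be built from the first csv; the file name and header names are assumptions, so adjust them to match your file:

import csv

keyword, lookupid = [], []
with open('keyword_lookup.csv', newline='') as fh:  # hypothetical file name
    for row in csv.DictReader(fh):
        keyword.append(row['keyword'])
        lookupid.append(row['lookupid'])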
The second csv file has a title column, with sample data below:
current_title
I have been working here as a store manager since after I passed from college
I am sales manager and primarily work in the ASEAN region. My primary role is to bring new customers.
I initially joined as a IT analyst and because of my sheer drive and dedication, I was promoted to IT manager position within 3 years
I want to do a find-and-replace using regular expressions and return the output below:
current_title
I have been working here as a ##13## since after I passed from college
I am ##13## and primarily work in the ASEAN region. My primary role is to bring new customers.
I initially joined as a ##12## and because of my sheer drive and dedication, I was promoted to ##10## position within 3 years
How can I do this using pyspark? Please suggest an approach.
Here's a way to do this using pyspark.sql.functions.regexp_replace() and a simple loop.
First, create a sample dataset:
from pyspark.sql import SparkSession

# Create a session (the original used an undefined SQLContext named sqlCtx)
spark = SparkSession.builder.getOrCreate()

data = [
    ("I have been working here as a store manager.",),
    ("I am sales manager.",),
    ("I joined as an IT analyst and was promoted to IT manager.",)
]
df = spark.createDataFrame(data, ["current_title"])
df.show(truncate=False)
#+---------------------------------------------------------+
#|current_title |
#+---------------------------------------------------------+
#|I have been working here as a store manager. |
#|I am sales manager. |
#|I joined as an IT analyst and was promoted to IT manager.|
#+---------------------------------------------------------+
Now apply each replacement:
import pyspark.sql.functions as f

keyword = ['IT Manager', 'Sales Manager', 'IT Analyst', 'Store Manager']
lookupid = ['##10##','##13##','##12##','##13##']

for k, replacement in zip(keyword, lookupid):
    # (?i) makes the match case-insensitive; \b anchors on word boundaries
    pattern = r'\b(?i)' + k + r'\b'
    df = df.withColumn(
        'current_title',
        f.regexp_replace(f.col('current_title'), pattern, replacement)
    )
Don't worry about the loop here, as Spark is lazy. If you look at the execution plan, you will see that it's smart enough to chain these operations so they all happen in one pass over the data:
df.explain()
== Physical Plan ==
*Project [regexp_replace(regexp_replace(regexp_replace(regexp_replace(current_title#737,
\b(?i)IT Manager\b, ##10##), \b(?i)Sales Manager\b, ##13##), \b(?i)IT
Analyst\b, ##12##), \b(?i)Store Manager\b, ##13##) AS
current_title#752]
+- Scan ExistingRDD[current_title#737]
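If you'd rather not reassign df in a loop, a functools.reduce call builds the same chain; this is purely a stylistic alternative to the loop above and yields an identical plan:

from functools import reduce

df = reduce(
    lambda acc, kv: acc.withColumn(
        'current_title',
        f.regexp_replace(f.col('current_title'), r'\b(?i)' + kv[0] + r'\b', kv[1])
    ),
    zip(keyword, lookupid),
    df
)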
Finally, the output:
df.show(truncate=False)
#+-------------------------------------------------+
#|current_title |
#+-------------------------------------------------+
#|I have been working here as a ##13##. |
#|I am ##13##. |
#|I joined as an ##12## and was promoted to ##10##.|
#+-------------------------------------------------+
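One caveat: the keywords here are plain words, but if your real lookup file ever contains regex metacharacters (a keyword like 'C++ Developer', say), escape them before building the pattern. A minimal sketch, assuming Python 3.7+ (where re.escape leaves spaces alone, which keeps the pattern valid for Spark's Java regex engine):

import re

# use this inside the loop in place of the unescaped pattern
pattern = r'\b(?i)' + re.escape(k) + r'\b'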