PySpark: find entire array in master array and replace

Published 2019-06-13 20:25

Question:

I have a pandas implementation of this question here. I want to implement it using PySpark in a Spark environment.

I have 2 CSV files. The first CSV has a keyword column and a corresponding lookupid column. I converted these into 2 lists in pure Python:

keyword = ['IT Manager', 'Sales Manager', 'IT Analyst', 'Store Manager']
lookupid = ['##10##','##13##','##12##','##13##']
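
For reference, the lists can be built from the first CSV along these lines (a minimal sketch, assuming the file is named lookup.csv and has keyword and lookupid header columns):

import csv

keyword, lookupid = [], []
with open('lookup.csv', newline='') as fh:  # lookup.csv is an assumed filename
    for row in csv.DictReader(fh):
        keyword.append(row['keyword'])
        lookupid.append(row['lookupid'])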

The second CSV file has a title column, with the sample data below:

current_title
I have been working here as a store manager since after I passed from college
I am sales manager and primarily work in the ASEAN region. My primary role is to bring new customers.
I initially joined as an IT analyst and because of my sheer drive and dedication, I was promoted to IT manager position within 3 years

I want to do a case-insensitive find and replace using regular expressions and return the output below:

current_title
I have been working here as a ##13## since after I passed from college
I am ##13## and primarily work in the ASEAN region. My primary role is to bring new customers.
I initially joined as an ##12## and because of my sheer drive and dedication, I was promoted to ##10## position within 3 years

How can I do this using PySpark?

Answer 1:

Here's a way to do this using pyspark.sql.functions.regexp_replace() and a simple loop:

First, create a sample dataset:

data = [
    ("I have been working here as a store manager.",),
    ("I am sales manager.",),
    ("I joined as an IT analyst and was promoted to IT manager.",)
]

df = spark.createDataFrame(data, ["current_title"])
df.show(truncate=False)
#+---------------------------------------------------------+
#|current_title                                            |
#+---------------------------------------------------------+
#|I have been working here as a store manager.             |
#|I am sales manager.                                      |
#|I joined as an IT analyst and was promoted to IT manager.|
#+---------------------------------------------------------+

Now apply each replacement:

import pyspark.sql.functions as f

keyword = ['IT Manager', 'Sales Manager', 'IT Analyst', 'Store Manager']
lookupid = ['##10##','##13##','##12##','##13##']

for k, replacement in zip(keyword, lookupid):
    # (?i) makes the match case-insensitive; \b anchors on word boundaries
    pattern = r'(?i)\b' + k + r'\b'
    df = df.withColumn(
        'current_title',
        f.regexp_replace(f.col('current_title'), pattern, replacement)
    )
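
If a keyword could contain regex metacharacters, it is safer to escape it when building the pattern. A small sketch; for plain strings like these, re.escape's output is also valid in the Java regex engine that regexp_replace uses:

import re

# escape any regex metacharacters before embedding the keyword
pattern = r'(?i)\b' + re.escape(k) + r'\b'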

Don't worry about the loop here, as Spark is lazy. If you look at the execution plan, you will see that it is smart enough to chain these operations so they all happen in one pass through the data:

df.explain()  

== Physical Plan ==
*Project [regexp_replace(regexp_replace(regexp_replace(regexp_replace(current_title#737, (?i)\bIT Manager\b, ##10##), (?i)\bSales Manager\b, ##13##), (?i)\bIT Analyst\b, ##12##), (?i)\bStore Manager\b, ##13##) AS current_title#752]
+- Scan ExistingRDD[current_title#737]
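
The loop is equivalent to folding all of the replacements into a single expression. A sketch using functools.reduce, just an alternative spelling of the same chain:

from functools import reduce

df = df.withColumn(
    'current_title',
    reduce(
        lambda col, kv: f.regexp_replace(col, r'(?i)\b' + kv[0] + r'\b', kv[1]),
        zip(keyword, lookupid),
        f.col('current_title')
    )
)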

Finally, the output:

df.show(truncate=False)
#+-------------------------------------------------+
#|current_title                                    |
#+-------------------------------------------------+
#|I have been working here as a ##13##.            |
#|I am ##13##.                                     |
#|I joined as an ##12## and was promoted to ##10##.|
#+-------------------------------------------------+
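
If the lookup list grows large, the nested regexp_replace chain becomes unwieldy. One alternative (a sketch, not part of the original answer) is a single compiled alternation applied through a Python UDF; note this moves each row through the Python interpreter, so the built-in chain above is usually preferable for short lists:

import re
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# map lower-cased keywords to their replacement tokens
mapping = {k.lower(): v for k, v in zip(keyword, lookupid)}

# one case-insensitive alternation covering every keyword
combined = re.compile(
    r'\b(' + '|'.join(re.escape(k) for k in keyword) + r')\b',
    re.IGNORECASE
)

@udf(StringType())
def replace_keywords(text):
    if text is None:
        return None
    return combined.sub(lambda m: mapping[m.group(1).lower()], text)

df = df.withColumn('current_title', replace_keywords('current_title'))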