Remove blank space from data frame column values in PySpark

Posted 2019-04-14 01:04

Question:

I have a data frame (business_df) of schema:

|-- business_id: string (nullable = true)
|-- categories: array (nullable = true)
|    |-- element: string (containsNull = true)
|-- city: string (nullable = true)
|-- full_address: string (nullable = true)
|-- hours: struct (nullable = true)
|-- name: string (nullable = true)

I want to make a new data frame (new_df) so that the values in the 'name' column do not contain any blank spaces.

My code is:

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

udf = UserDefinedFunction(lambda x: x.replace(' ', ''), StringType())
new_df = business_df.select(*[udf(column).alias(name) if column == name else column for column in business_df.columns])
new_df.registerTempTable("vegas")
new_df.printSchema()
vegas_business = sqlContext.sql("SELECT stars, name from vegas limit 10").collect()

I keep receiving this error:

NameError: global name 'replace' is not defined

What's wrong with this code?

Answer 1:

While the problem you've described is not reproducible with the provided code, using Python UDFs to handle simple tasks like this is rather inefficient. If you simply want to remove all spaces from the text, use regexp_replace:

from pyspark.sql.functions import regexp_replace, col

df = sc.parallelize([
    (1, "foo bar"), (2, "foobar "), (3, "   ")
]).toDF(["k", "v"])

df.select(regexp_replace(col("v"), " ", ""))
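
Applied to the sample frame, this strips every space, interior ones included (the alias is mine, added for readability):

df.select(regexp_replace(col("v"), " ", "").alias("v_no_spaces")).show()

#+-----------+
#|v_no_spaces|
#+-----------+
#|     foobar|
#|     foobar|
#|           |
#+-----------+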

If you want to normalize whitespace-only values to empty strings, use trim:

from pyspark.sql.functions import trim

df.select(trim(col("v")))
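
On the same data, trim turns the whitespace-only row into an empty string while interior spaces survive (alias mine):

df.select(trim(col("v")).alias("v_trimmed")).show()

#+---------+
#|v_trimmed|
#+---------+
#|  foo bar|
#|   foobar|
#|         |
#+---------+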

If you want to keep leading / trailing spaces elsewhere and only blank out values that are entirely whitespace, you can adjust regexp_replace (note the raw string, so \s reaches the regex engine intact):

df.select(regexp_replace(col("v"), r"^\s+$", ""))
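
Here only the all-whitespace value is rewritten; "foobar " keeps its trailing space (alias mine):

df.select(regexp_replace(col("v"), r"^\s+$", "").alias("v_cleaned")).show()

#+---------+
#|v_cleaned|
#+---------+
#|  foo bar|
#|  foobar |
#|         |
#+---------+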


Answer 2:

As @zero323 said, you have probably shadowed or misreferenced the replace function somewhere. I tested your code and it works perfectly.

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

df = sqlContext.createDataFrame([("aaa 111",), ("bbb 222",), ("ccc 333",)], ["names"])
spaceDeleteUDF = udf(lambda s: s.replace(" ", ""), StringType())
df.withColumn("names", spaceDeleteUDF("names")).show()

#+------+
#| names|
#+------+
#|aaa111|
#|bbb222|
#|ccc333|
#+------+
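
For reference, here is a hedged guess (not shown in the asker's post) at how that NameError can arise: calling replace as a bare global name instead of a string method inside the UDF.

badUDF = udf(lambda s: replace(s, " ", ""), StringType())  # 'replace' is not defined globally
df.withColumn("names", badUDF("names")).show()
# Fails on the executors with: NameError: global name 'replace' is not defined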


Answer 3:

Here's a function that removes all whitespace in a string:

import pyspark.sql.functions as F

def remove_all_whitespace(col):
    return F.regexp_replace(col, "\\s+", "")

You can use the function like this (col comes from pyspark.sql.functions):

actual_df = source_df.withColumn(
    "words_without_whitespace",
    remove_all_whitespace(col("words"))
)

The same remove_all_whitespace function also ships in the quinn library, which additionally defines single_space and anti_trim methods to manage whitespace. PySpark itself provides ltrim, rtrim, and trim methods to manage whitespace.
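
Since ltrim, rtrim, and trim are stock PySpark functions, here is a quick sketch (sample data and aliases are mine) of how the three differ:

import pyspark.sql.functions as F

df = spark.createDataFrame([("  padded  ",)], ["v"])
df.select(
    F.ltrim(F.col("v")).alias("ltrim"),  # "padded  "  -- leading spaces removed
    F.rtrim(F.col("v")).alias("rtrim"),  # "  padded"  -- trailing spaces removed
    F.trim(F.col("v")).alias("trim")     # "padded"    -- both sides removed
).show()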



Answer 4:

I think the solution using regexp_replace is too slow, even for a small amount of data! So I tried to find another way, and I think I found it!

Not beautiful, a little naive, but it's fast! What do you think?

from pyspark.sql import Row
from pyspark.sql.functions import ltrim, rtrim
from pyspark.sql.types import StructType, StructField, StringType

def normalizeSpace(df, colName):

    # Left and right trim
    df = df.withColumn(colName, ltrim(df[colName]))
    df = df.withColumn(colName, rtrim(df[colName]))

    # This is faster than the regexp_replace function!
    def normalize(row, colName):
        data = row.asDict()
        text = data[colName]
        words = []
        word = ''

        for char in text:
            if char != ' ':
                word += char
            elif word == '':
                # Skip runs of consecutive spaces
                continue
            else:
                words.append(word)
                word = ''

        # The loop only flushes a word when it meets a space,
        # so append the final word explicitly
        if word != '':
            words.append(word)

        if len(words) > 0:
            data[colName] = ' '.join(words)

        return Row(**data)

    df = df.rdd.map(lambda row: normalize(row, colName)).toDF()
    return df

schema = StructType([StructField('name', StringType())])
rows = [Row(name='  dvd player samsung   hdmi hdmi 160W reais    de potencia bivolt   ')]
df = spark.createDataFrame(rows, schema)
df = normalizeSpace(df, 'name')
df.show(df.count(), False)

That prints

+----------------------------------------------------------+
|name                                                      |
+----------------------------------------------------------+
|dvd player samsung hdmi hdmi 160W reais de potencia bivolt|
+----------------------------------------------------------+
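
For comparison (my sketch, not part of the original answer), the same single-space normalization can be expressed with the built-in functions this answer set out to avoid; timing both on your own data is the only way to settle the speed claim:

from pyspark.sql.functions import col, regexp_replace, trim

df = spark.createDataFrame(rows, schema)
df = df.select(trim(regexp_replace(col('name'), ' +', ' ')).alias('name'))
df.show(df.count(), False)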