UDF causes warning: CachedKafkaConsumer is not running in UninterruptibleThread

Posted 2019-02-21 20:34

I am starting from the usual structured_kafka_wordcount.py example.
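
For context, lines is created from the Kafka source roughly like this (simplified from the stock example; the broker address and topic name here are just placeholders):

# Simplified Kafka source from structured_kafka_wordcount.py;
# "localhost:9092" and "wordcount" are placeholder broker/topic values.
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "wordcount") \
    .load() \
    .selectExpr("CAST(value AS STRING)")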

When I split lines into words with a UDF like the one below,

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, StringType

my_split = udf(lambda x: x.split(' '), ArrayType(StringType()))

words = lines.select(
    explode(
        my_split(lines.value)
    )
)

the following warning keeps showing up:

WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894

On the other hand, when I split the lines into words with pyspark.sql.functions.split, everything works fine:

words = lines.select(
    explode(
        split(lines.value, ' ') 
    ) 
)

Why does this happen, and how can I fix the warning?

This is the code I am trying to execute in practice:

import re

from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, StringType

pattern = "(.+) message repeated (\\d) times: \\[ (.+)\\]"
prog = re.compile(pattern)


def _unfold(x):
    # Expand "<prefix> message repeated N times: [ <msg>]" into N copies of
    # "<prefix> <msg>"; lines that don't match are passed through unchanged.
    ret = []
    result = prog.match(x)
    if result:
        log = " ".join((result.group(1), result.group(3)))
        times = result.group(2)
        for _ in range(int(times)):
            ret.append(log)
    else:
        ret.append(x)

    return ret


_udf = udf(_unfold, ArrayType(StringType()))
lines = lines.withColumn('value', explode(_udf(lines['value'])))

1 Answer

闹够了就滚 · answered 2019-02-21 20:47

Other than rejecting Python UDFs *, there is nothing you can do about this problem in your code. As the warning message explains, UninterruptibleThread is a workaround for a Kafka bug (KAFKA-1894) and is designed to prevent an infinite loop when a KafkaConsumer is interrupted.

It is not used with PythonUDFRunner (it probably wouldn't make sense to introduce a special case there).

Personally, I wouldn't worry about it unless you experience related issues. Your Python code never interacts directly with the KafkaConsumer. If you do run into problems, they should be fixed upstream; in that case I recommend creating a JIRA ticket.


* Your _unfold function can be rewritten with built-in SQL functions, although it will be a bit of a hack. First, add the message count as an integer column (lines that don't match the pattern default to 1):

from pyspark.sql.functions import concat_ws, col, expr, coalesce, lit, regexp_extract, when

p = "(.+) message repeated (\\d) times: \\[ (.+)\\]"

lines = spark.createDataFrame(
    ["asd message repeated 3 times: [ 12]", "some other message"], "string"
)

lines_with_count = lines.withColumn(
    "message_count",
    coalesce(regexp_extract("value", p, 2).cast("int"), lit(1)))

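For the two sample lines, this should yield something like:

lines_with_count.show(truncate=False)

# +-----------------------------------+-------------+
# |value                              |message_count|
# +-----------------------------------+-------------+
# |asd message repeated 3 times: [ 12]|3            |
# |some other message                 |1            |
# +-----------------------------------+-------------+
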
Use it to explode each line message_count times. repeat('1', message_count - 1) builds a string of length message_count - 1, and split on the empty pattern turns it into an array of message_count elements (including the trailing empty string), so explode emits one row per repetition:

exploded = lines_with_count.withColumn(
    "i",
    expr("explode(split(repeat('1', message_count - 1), ''))")
).drop("message_count", "i")
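
At this point each line should appear once per repetition, still in its raw form:

exploded.show(truncate=False)

# +-----------------------------------+
# |value                              |
# +-----------------------------------+
# |asd message repeated 3 times: [ 12]|
# |asd message repeated 3 times: [ 12]|
# |asd message repeated 3 times: [ 12]|
# |some other message                 |
# +-----------------------------------+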

and finally extract the message text:

exploded.withColumn(
    "value",
    when(
        col("value").rlike(p),
        concat_ws(" ", regexp_extract("value", p, 1), regexp_extract("value", p, 3))
    ).otherwise(col("value"))
).show(4, False)


# +------------------+
# |value             |
# +------------------+
# |asd 12            |
# |asd 12            |
# |asd 12            |
# |some other message|
# +------------------+
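
The example above runs on a static DataFrame for illustration; the same chain applies unchanged to the streaming lines DataFrame from the question, except that the result has to be written out with writeStream instead of show. A minimal sketch, assuming lines_with_count and exploded are built from the streaming source and using a console sink as a placeholder:

# Sketch only: same transformation applied to the streaming DataFrame.
# The console sink and append mode are placeholders for the real sink.
unfolded = exploded.withColumn(
    "value",
    when(
        col("value").rlike(p),
        concat_ws(" ", regexp_extract("value", p, 1), regexp_extract("value", p, 3))
    ).otherwise(col("value")))

query = unfolded.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()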