PySpark filter operation on DStream

Posted 2019-09-15 03:01

I have been trying to extend the network word count example so that it can filter lines based on a certain keyword.

I am using Spark 1.6.2.

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        sys.exit(-1)
    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 5)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    # this is the problematic line: filter() expects a function, not a string
    counts = lines.flatMap(lambda line: line.split(" ")).filter("ERROR")
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

I have tried several variations, and I almost always get an error saying that I cannot apply functions like pprint/show/take/collect on a TransformedDStream.

I also used transform with foreachRDD on the lines DStream, with a function that checks each line using native Python string methods, but that fails too. (In fact, if I use print anywhere in the program, spark-submit just exits; no errors are reported.)
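
Roughly, that attempt looked like this (a simplified sketch, not my exact code; the helper name is illustrative):

def print_errors(rdd):
    # collect() pulls each batch back to the driver; fine for a small test stream
    for line in rdd.collect():
        if "ERROR" in line:
            print(line)

lines.foreachRDD(print_errors)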

What I want is to be able to filter the incoming DStream on a keyword like "ERROR" or "WARNING" and write the matching lines to stdout or stderr.

1 Answer

孤傲高冷的网名 · 2019-09-15 03:06

What I want is to be able to filter the incoming DStream on a keyword like "ERROR" or "WARNING" and write the matching lines to stdout or stderr.

Then you don't want to call flatMap, as this will split your lines up into individual tokens. Instead, you can replace that call with a call to filter that checks whether the line contains "error":

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
# keep only the lines that contain "error", ignoring case
errors = lines.filter(lambda l: "error" in l.lower())
errors.pprint()
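
To match multiple keywords, as in the "ERROR" | "WARNING" case from your question, the same idea extends to a list (a minimal sketch; the keyword list is illustrative):

keywords = ["error", "warning"]
# keep lines that contain any of the keywords, case-insensitive
matches = lines.filter(lambda l: any(k in l.lower() for k in keywords))
matches.pprint()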