Pyspark - Transfer control out of Spark Session

Posted 2019-09-06 19:12

Question:

This is a follow up question on

Pyspark filter operation on Dstream

To keep a count of how many error/warning messages have come through in, say, a day or an hour - how does one design the job?

What I have tried:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext


def counts():
    counter += 1
    print(counter.value)

if __name__ == "__main__":

    if len(sys.argv) != 3:
        print("Usage: network_wordcount.py <hostname> <port>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingNetworkWordCount")
    ssc = StreamingContext(sc, 5)
    counter = sc.accumulator(0)

    lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
    errors = lines.filter(lambda l: "error" in l.lower())
    errors.foreachRDD(lambda e: e.foreach(counts))
    errors.pprint()

    ssc.start()
    ssc.awaitTermination()

This, however, has multiple issues. To start with, print doesn't work (it does not output to stdout; I have read about it, and the best I can do here is logging). Can I save the output of that function to a text file and tail that file instead?

I am also not sure why the program just exits - there is no error or dump anywhere to look into further (Spark 1.6.2).

How does one preserve state? What I am trying to do is aggregate logs by server and severity; another use case is to count how many transactions were processed by looking for certain keywords.

Pseudo Code for what I want to try:

foreachRDD(Dstream):
    if RDD.contains("keyword1 | keyword2 | keyword3"):
        dictionary[keyword] = dictionary.get(keyword, 0) + 1  # add the keyword if not present and increase the counter
    print dictionary  # or send this dictionary elsewhere

The last part - sending or printing the dictionary - requires switching out of the Spark streaming context. Can someone explain the concept, please?

Answer 1:

print doesn't work

I would recommend reading the design patterns section of the Spark documentation. I think that roughly what you want is something like this:

def _process(iter):
    # runs on the workers, once per partition
    for item in iter:
        print(item)

lines = ssc.socketTextStream(sys.argv[1], int(sys.argv[2]))
errors = lines.filter(lambda l: "error" in l.lower())
errors.foreachRDD(lambda e: e.foreachPartition(_process))

This will get your print call to work (though it is worth noting that the print will execute on the workers and not on the driver, so if you're running this code on a cluster you will only see the output in the worker logs).
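If you want a per-batch error count to show up on the driver instead, you can do the counting inside foreachRDD itself, because the function passed to foreachRDD runs on the driver (only the closures passed to RDD actions run on the workers). A minimal sketch, reusing the errors DStream from above:

def _count_errors(time, rdd):
    # rdd.count() is an action: the work happens on the workers, but the
    # result comes back to the driver, so this print appears in the driver log
    print("batch %s: %d error lines" % (time, rdd.count()))

errors.foreachRDD(_count_errors)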

However, it won't solve your second problem:

How does one preserve state?

For this, take a look at updateStateByKey and the related example.
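A minimal sketch of how updateStateByKey could keep running counts per keyword (the keyword list, checkpoint path, and key extraction below are illustrative; stateful operations also require a checkpoint directory):

ssc.checkpoint("/tmp/streaming-checkpoint")  # required for stateful operations

KEYWORDS = ["keyword1", "keyword2", "keyword3"]  # illustrative keyword list

def extract_keywords(line):
    low = line.lower()
    return [(kw, 1) for kw in KEYWORDS if kw in low]

def update_count(new_values, running_count):
    # new_values: counts from the current batch, running_count: state so far
    return sum(new_values) + (running_count or 0)

keyword_counts = (lines.flatMap(extract_keywords)
                       .reduceByKey(lambda a, b: a + b)
                       .updateStateByKey(update_count))
keyword_counts.pprint()  # running totals per keyword, printed on the driver

Because the state lives inside Spark, this avoids having to hand the dictionary out of the streaming context; if you do need to ship the totals elsewhere, a foreachRDD on keyword_counts is the place to do it (for example, writing to a file or a database).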