I've got a lousy HTTPD access_log and just want to skip the "lousy" lines.
In Scala this is straightforward:
import scala.util.Try
val log = sc.textFile("access_log")
log.map(_.split(' ')).map(a => Try(a(8))).filter(_.isSuccess).map(_.get).map(code => (code,1)).reduceByKey(_ + _).collect()
For Python I've got the following solution, which explicitly defines a function instead of using the lambda notation:
log = sc.textFile("access_log")
def wrapException(a):
    try:
        return a[8]
    except:
        return 'error'
log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect()
Is there a better way of doing this (e.g. like in Scala) in PySpark?
Thanks a lot!
Better is a subjective term, but there are a few approaches you can try.
The simplest thing you can do in this particular case is to avoid exceptions altogether. All you need is a flatMap and some slicing:
log.flatMap(lambda s : s.split(' ')[8:9])
As you can see, this means there is no need for exception handling or a subsequent filter.
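For completeness, here is a hedged sketch of how the full count-by-status pipeline from the question might look with this approach (field index 8 assumed, as in the original code):
log = sc.textFile("access_log")
# s.split(' ')[8:9] is an empty list for rows with fewer than nine fields,
# so flatMap silently drops the "lousy" lines.
(log.flatMap(lambda s: s.split(' ')[8:9])
    .map(lambda code: (code, 1))
    .reduceByKey(lambda a, b: a + b)
    .collect())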
The previous idea can be extended with a simple wrapper:
def seq_try(f, *args, **kwargs):
    try:
        return [f(*args, **kwargs)]
    except:
        return []
and example usage:
from operator import truediv as div  # operator.div is Python 2 only; FYI operator provides getitem as well.
rdd = sc.parallelize([1, 2, 0, 3, 0, 5, "foo"])
rdd.flatMap(lambda x: seq_try(div, 1., x)).collect()
## [1.0, 0.5, 0.3333333333333333, 0.2]
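Applied to the access_log case from the question, a hedged sketch combining seq_try with operator.getitem (field index 8 and the variable names assumed from the original code) could look like:
from operator import getitem
log = sc.textFile("access_log")
# Rows with fewer than nine fields raise IndexError inside seq_try,
# which yields an empty list and is therefore dropped by flatMap.
(log.map(lambda s: s.split(' '))
    .flatMap(lambda a: seq_try(getitem, a, 8))
    .map(lambda code: (code, 1))
    .reduceByKey(lambda a, b: a + b)
    .collect())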
Finally, a more OO approach:
import inspect as _inspect

class _Try(object): pass

class Failure(_Try):
    def __init__(self, e):
        if Exception not in _inspect.getmro(e.__class__):
            msg = "Invalid type for Failure: {0}"
            raise TypeError(msg.format(e.__class__))
        self._e = e
        self.isSuccess = False
        self.isFailure = True

    def get(self): raise self._e

    def __repr__(self):
        return "Failure({0})".format(repr(self._e))

class Success(_Try):
    def __init__(self, v):
        self._v = v
        self.isSuccess = True
        self.isFailure = False

    def get(self): return self._v

    def __repr__(self):
        return "Success({0})".format(repr(self._v))

def Try(f, *args, **kwargs):
    try:
        return Success(f(*args, **kwargs))
    except Exception as e:
        return Failure(e)
and example usage:
tries = rdd.map(lambda x: Try(div, 1.0, x))
tries.collect()
## [Success(1.0),
## Success(0.5),
## Failure(ZeroDivisionError('float division by zero',)),
## Success(0.3333333333333333),
## Failure(ZeroDivisionError('float division by zero',)),
## Success(0.2),
## Failure(TypeError("unsupported operand type(s) for /: 'float' and 'str'",))]
tries.filter(lambda x: x.isSuccess).map(lambda x: x.get()).collect()
## [1.0, 0.5, 0.3333333333333333, 0.2]
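And a hedged sketch of the original pipeline written with this Try, mirroring the Scala version from the question (field index 8 assumed):
from operator import getitem
log = sc.textFile("access_log")
# Same shape as the Scala pipeline: Try -> filter on success -> get.
(log.map(lambda s: s.split(' '))
    .map(lambda a: Try(getitem, a, 8))
    .filter(lambda x: x.isSuccess)
    .map(lambda x: x.get())
    .map(lambda code: (code, 1))
    .reduceByKey(lambda a, b: a + b)
    .collect())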
You can even use pattern matching with multipledispatch:
from multipledispatch import dispatch
from operator import getitem
@dispatch(Success)
def check(x): return "Another great success"
@dispatch(Failure)
def check(x): return "What a failure"
a_list = [1, 2, 3]
check(Try(getitem, a_list, 1))
## 'Another great success'
check(Try(getitem, a_list, 10))
## 'What a failure'
If you like this approach, I've pushed a slightly more complete implementation to GitHub and PyPI.
First, let me generate some random data to start working with.
import random
number_of_rows = int(1e6)
line_error = "error line"
text = []
for i in range(number_of_rows):
    choice = random.choice([1, 2, 3, 4])
    if choice == 1:
        line = line_error
    elif choice == 2:
        line = "1 2 3 4 5 6 7 8 9_1"
    elif choice == 3:
        line = "1 2 3 4 5 6 7 8 9_2"
    elif choice == 4:
        line = "1 2 3 4 5 6 7 8 9_3"
    text.append(line)
Now I have text that looks like this:
1 2 3 4 5 6 7 8 9_2
error line
1 2 3 4 5 6 7 8 9_3
1 2 3 4 5 6 7 8 9_2
1 2 3 4 5 6 7 8 9_3
1 2 3 4 5 6 7 8 9_1
error line
1 2 3 4 5 6 7 8 9_2
....
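The snippet above only builds the Python list, so presumably it is turned into an RDD before the comparison below; a minimal sketch of that step (variable name log assumed to match the code that follows):
log = sc.parallelize(text)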
Your solution:
def wrapException(a):
    try:
        return a[8]
    except:
        return 'error'
log.map(lambda s : s.split(' ')).map(wrapException).filter(lambda s : s!='error').map(lambda code : (code,1)).reduceByKey(lambda acu,value : acu + value).collect()
#[('9_3', 250885), ('9_1', 249307), ('9_2', 249772)]
Here is my solution:
from operator import add
def myfunction(l):
    try:
        return (l.split(' ')[8], 1)
    except:
        return ('MYERROR', 1)
log.map(myfunction).reduceByKey(add).collect()
#[('9_3', 250885), ('9_1', 249307), ('MYERROR', 250036), ('9_2', 249772)]
Comments:
(1) I highly recommend also counting the lines with "error": it adds little overhead and doubles as a sanity check. For example, all the counts should add up to the total number of rows in the log; if you filter those lines out, you have no way of knowing whether they were truly bad lines or whether something went wrong in your coding logic (see the sketch after these comments).
(2) I try to package all the line-level operations in one function to avoid chaining map and filter calls, which makes the code more readable.
(3) From a performance perspective, I generated a sample of 1M records; my code finished in 3 seconds and yours in 2 seconds. It is not a fair comparison since the data is so small and my cluster is pretty beefy, so I would recommend generating a bigger file (1e12 rows?) and running a benchmark on your own setup.
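As a hedged sketch of the sanity check mentioned in (1), reusing the names from above:
from operator import add
# Per-code counts, including the 'MYERROR' bucket.
counts = log.map(myfunction).reduceByKey(add).collect()
# All buckets together should account for every input line.
assert sum(n for _, n in counts) == log.count()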