spark.ml StringIndexer throws 'Unseen label'

Posted 2019-01-09 12:55

Question:

I'm preparing a toy spark.ml example. The setup is Spark 1.6.0 running on Oracle JDK 1.8.0_65, using pyspark in an IPython notebook.

First, this has hardly anything to do with the question "Spark, ML, StringIndexer: handling unseen labels". The exception is thrown while fitting a pipeline to a dataset, not while transforming it. Suppressing the exception might not be a solution here, since I'm afraid the dataset gets messed up pretty badly in this case.

My dataset is about 800 MB uncompressed, so it might be hard to reproduce (smaller subsets seem to dodge this issue).

The dataset looks like this:

+--------------------+-----------+-----+-------+-----+--------------------+
|                 url|         ip|   rs|   lang|label|                 txt|
+--------------------+-----------+-----+-------+-----+--------------------+
|http://3d-detmold...|217.160.215|378.0|     de|  0.0|homwillkommskip c...|
|   http://3davto.ru/| 188.225.16|891.0|     id|  1.0|оформить заказ пе...|
| http://404.szm.com/|  85.248.42| 58.0|     cs|  0.0|kliknite tu alebo...|
|  http://404.xls.hu/| 212.52.166|168.0|     hu|  0.0|honlapkészítés404...|
|http://a--m--a--t...|    66.6.43|462.0|     en|  0.0|back top archiv r...|
|http://a-wrf.ru/c...|  78.108.80|126.0|unknown|  1.0|                    |
|http://a-wrf.ru/s...|  78.108.80|214.0|     ru|  1.0|установк фаркопна...|
+--------------------+-----------+-----+-------+-----+--------------------+

The value being predicted is label. The whole pipeline applied to it:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

train, test = munge(src_dataframe).randomSplit([70., 30.], seed=12345)

pipe_stages = [
    StringIndexer(inputCol='lang', outputCol='lang_idx'),
    OneHotEncoder(inputCol='lang_idx', outputCol='lang_onehot'),
    Tokenizer(inputCol='ip', outputCol='ip_tokens'),
    HashingTF(numFeatures=2**10, inputCol='ip_tokens', outputCol='ip_vector'),
    Tokenizer(inputCol='txt', outputCol='txt_tokens'),
    HashingTF(numFeatures=2**18, inputCol='txt_tokens', outputCol='txt_vector'),
    VectorAssembler(inputCols=['lang_onehot', 'ip_vector', 'txt_vector'], outputCol='features'),
    LogisticRegression(labelCol='label', featuresCol='features')
]

pipe = Pipeline(stages=pipe_stages)
pipemodel = pipe.fit(train)

And here is the stacktrace:

Py4JJavaError: An error occurred while calling o10793.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 627.0 failed 1 times, most recent failure: Lost task 18.0 in stage 627.0 (TID 23259, localhost): org.apache.spark.SparkException: Unseen label: pl-PL.
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157)
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113)
    at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:271)
    at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Unseen label: pl-PL.
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157)
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

The most interesting line is:

org.apache.spark.SparkException: Unseen label: pl-PL.

At first I had no idea how pl-PL, which is a value from the lang column, could have gotten mixed up in the label column, which is a float, not a string. (Edited: these were hasty conclusions, corrected thanks to @zero323.)

I've looked further into it and found that pl-PL is a value from the testing part of the dataset, not the training part. So now I don't even know where to look for the culprit: it might easily be the randomSplit code rather than StringIndexer, and who knows what else.

How do I investigate this?

Answer 1:

Unseen label is a generic message which doesn't correspond to a specific column. Most likely the problem is with the following stage:

StringIndexer(inputCol='lang', outputCol='lang_idx')

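where pl-PL appears in the data being transformed but was not present in the data the indexer was fitted on (typically present in test("lang") and not present in train("lang")).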

You can work around it using setHandleInvalid with skip, which drops the rows containing unseen labels:

from pyspark.ml.feature import StringIndexer

train = sc.parallelize([(1, "foo"), (2, "bar")]).toDF(["k", "v"])
test = sc.parallelize([(3, "foo"), (4, "foobar")]).toDF(["k", "v"])

indexer = StringIndexer(inputCol="v", outputCol="vi")
indexer.fit(train).transform(test).show()

## Py4JJavaError: An error occurred while calling o112.showString.
## : org.apache.spark.SparkException: Job aborted due to stage failure: 
##   ...
##   org.apache.spark.SparkException: Unseen label: foobar.

indexer.setHandleInvalid("skip").fit(train).transform(test).show()

## +---+---+---+
## |  k|  v| vi|
## +---+---+---+
## |  3|foo|1.0|
## +---+---+---+

or, in Spark 2.2 and later, keep, which puts unseen labels into an extra index:

indexer.setHandleInvalid("keep").fit(train).transform(test).show()

## +---+------+---+
## |  k|     v| vi|
## +---+------+---+
## |  3|   foo|0.0|
## |  4|foobar|2.0|
## +---+------+---+
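
To make the three handleInvalid behaviours concrete, here is a minimal pure-Python model of what a fitted indexer does. It is only a sketch, not Spark's implementation: the function names fit_labels and transform are hypothetical, and breaking frequency ties alphabetically is an assumption of this sketch.

```python
from collections import Counter


def fit_labels(values):
    """Map each distinct value to an index, most frequent value first.

    Ties are broken alphabetically (an assumption of this sketch).
    """
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform(values, label_to_index, handle_invalid="error"):
    """Index values, treating labels unseen at fit time per handle_invalid."""
    out = []
    for value in values:
        if value in label_to_index:
            out.append((value, label_to_index[value]))
        elif handle_invalid == "skip":
            continue  # drop the row entirely
        elif handle_invalid == "keep":
            # all unseen labels share one extra index after the known ones
            out.append((value, float(len(label_to_index))))
        else:
            raise ValueError("Unseen label: %s." % value)
    return out


model = fit_labels(["en", "en", "ru", "de", "en", "ru"])
print(model)                                      # {'en': 0.0, 'ru': 1.0, 'de': 2.0}
print(transform(["en", "pl-PL"], model, "skip"))  # [('en', 0.0)]
print(transform(["en", "pl-PL"], model, "keep"))  # [('en', 0.0), ('pl-PL', 3.0)]
```

With the default "error" mode, the same call raises as soon as it meets pl-PL, which mirrors the exception in the question. Note that skip silently shrinks the dataset, so it hides the symptom rather than explaining why the label was unseen.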


Answer 2:

Okay, I think I've got it. At least I got it working.

Caching the DataFrame (including the train/test parts) solves the problem. That's what I found in this JIRA issue: https://issues.apache.org/jira/browse/SPARK-12590.

So it's not a bug, just the fact that randomSplit might yield a different result on the same but differently partitioned dataset. Apparently some of my munging functions (or the Pipeline) involve a repartition, so when the train set is recomputed from its definition, the result might diverge from the one the earlier stages were fitted on.

What still interests me is the reproducibility: it's always the 'pl-PL' row that ends up in the wrong part of the dataset, so the repartitioning isn't random. It's deterministic, just inconsistent. I wonder how exactly that happens.
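
One way to picture "deterministic, just inconsistent" is the simplified pure-Python model below. It assumes that each partition gets its own generator seeded from the split seed plus the partition index, and that a row's fate depends on its position within its partition. This is a sketch of the idea rather than Spark's actual code, and random_split, layout_a and layout_b are hypothetical names.

```python
import random


def random_split(partitions, weights, seed):
    """Split rows in two using one seeded generator per partition."""
    cut = weights[0] / float(sum(weights))
    first, second = [], []
    for index, rows in enumerate(partitions):
        rng = random.Random(seed + index)  # assumption: seed offset by partition index
        for row in rows:
            # the draw a row receives depends on its position in the partition
            (first if rng.random() < cut else second).append(row)
    return first, second


rows = ["row-%03d" % i for i in range(200)]

# Two layouts of the same data, as a repartition might produce.
layout_a = [rows[i::4] for i in range(4)]                 # round-robin
layout_b = [rows[i * 50:(i + 1) * 50] for i in range(4)]  # contiguous chunks

train_a, test_a = random_split(layout_a, [70.0, 30.0], seed=12345)
train_a2, _ = random_split(layout_a, [70.0, 30.0], seed=12345)
train_b, test_b = random_split(layout_b, [70.0, 30.0], seed=12345)

# Same layout and seed: the split is reproducible.
print(sorted(train_a) == sorted(train_a2))
# Same seed, different layout: membership of the train set changes.
print(set(train_a) == set(train_b))
# Rows that were "test" under layout A but land in "train" under layout B.
leaked = set(train_b) & set(test_a)
print(len(leaked))
```

Under this model, a stage fitted on one evaluation of the train set can later meet rows from a different evaluation, and the same rows cross over on every run because nothing in the process is actually random between runs. Caching pins a single evaluation, which is consistent with the fix described above.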