我一直试图找到一种合理的方式来测试SparkSession
与JUnit测试框架。 虽然似乎是很好的例子SparkContext
,我无法弄清楚如何获得相应的例子为工作SparkSession
,即使它在几个地方使用内部的火花试验基地 。 我很乐意去尝试不使用火花试验基地,以及如果它是不是真的到这里去正确的方式解决。
简单的测试用例( 完整MWE项目与build.sbt
):
import com.holdenkarau.spark.testing.DataFrameSuiteBase
import org.junit.Test
import org.scalatest.FunSuite
import org.apache.spark.sql.SparkSession
class SessionTest extends FunSuite with DataFrameSuiteBase {
implicit val sparkImpl: SparkSession = spark
@Test
def simpleLookupTest {
val homeDir = System.getProperty("user.home")
val training = spark.read.format("libsvm")
.load(s"$homeDir\\Documents\\GitHub\\sample_linear_regression_data.txt")
println("completed simple lookup test")
}
}
用JUnit运行这个的结果是在负载线的NPE:
java.lang.NullPointerException
at SessionTest.simpleLookupTest(SessionTest.scala:16)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
注意它不应该的问题装载的文件存在与否; 在一个正确配置的SparkSession,一个更明智的错误将被抛出 。
感谢您把这个悬而未决的问题在那里。 出于某种原因,当谈到火花,每个人都得到这样的,他们忘了所出现过去15年左右的伟大的软件工程实践的分析赶上了。 这就是为什么我们让一个点来讨论测试和持续集成(其中像DevOps的其他东西)在我们的课程。
一个快速除术语
一个真正的单元测试意味着你有过在测试每个组件的完全控制。 可以有与数据库,REST调用,文件系统,甚至是系统时钟没有相互作用; 一切都要“翻番”(例如嘲笑,存根等)作为杰拉德Mezaros把它的xUnit测试模式 。 我知道这看起来像语义,但它真的很重要。 如果不明白这是为什么你看到在持续集成间歇测试失败的一个重要原因。
我们仍然可以单元测试
所以,有了这样的认识,单元测试的RDD
是不可能的。 然而,仍然有单元测试开发分析时的地方。
考虑一个简单的操作:
rdd.map(foo).map(bar)
这里foo
和bar
是简单的功能。 这些可单位以正常的方式进行测试,他们应该与尽可能多的角落情况下,你可以鼓起。 毕竟,为什么他们关心他们来自哪里,是否是一个测试夹具或得到他们的投入RDD
?
不要忘记星火壳牌
这不是测试本身 ,而在这些早期阶段,你也应该在Spark外壳可以尝试找出你的变换和你的方法,尤其是后果。 例如,您可以检查物理和逻辑查询计划,分区策略和保存,并与像许多不同的功能数据的状态toDebugString
, explain
, glom
, show
, printSchema
,等等。 我会让你探索的。
您还可以设置你的主人到local[2]
在Spark外壳,并在您的测试,以确定一旦你开始分配工作可能仅出现的任何问题。
集成测试与星火
现在到了有趣的东西。
为了集成测试星火你觉得在你的辅助功能和质量的信心后RDD
/ DataFrame
转换逻辑,关键是做了几件事情(无论构建工具和测试框架):
- 增加JVM内存。
- 启用分叉但禁用并行执行。
- 使用测试框架,积累你的星火集成测试成套房,并初始化
SparkContext
所有测试前,所有的测试后停止。
随着ScalaTest,您可以在混合BeforeAndAfterAll
(我一般喜欢)或BeforeAndAfterEach
作为@ShankarKoirala做初始化和拆除星火文物。 我知道这是一个合理的地方,使一个例外,但我真的不喜欢那些易变的var
,你必须虽然使用s。
贷款模式
另一种方法是使用贷款模式 。
例如(使用ScalaTest):
class MySpec extends WordSpec with Matchers with SparkContextSetup {
"My analytics" should {
"calculate the right thing" in withSparkContext { (sparkContext) =>
val data = Seq(...)
val rdd = sparkContext.parallelize(data)
val total = rdd.map(...).filter(...).map(...).reduce(_ + _)
total shouldBe 1000
}
}
}
trait SparkContextSetup {
def withSparkContext(testMethod: (SparkContext) => Any) {
val conf = new SparkConf()
.setMaster("local")
.setAppName("Spark test")
val sparkContext = new SparkContext(conf)
try {
testMethod(sparkContext)
}
finally sparkContext.stop()
}
}
正如你所看到的,在贷款模式利用了高阶函数“贷款”的SparkContext
的测试,然后它完成后处置。
痛苦,面向对象编程(谢谢,内森)
这是完全喜好的问题,但我更愿意把在另一个框架之前,只要我可以使用的贷款模式和电线的事情了我自己。 除了只是试图保持轻巧,框架有时会增添了不少的“神奇”,使调试测试失败难推理。 所以我采取了面向苦难的编程方法-在这里我避免增加一个新的框架,直到没有它的痛苦实在无法忍受。 但同样,这是你的。
该备用框架的最佳选择是当然的火花试验基如@ShankarKoirala提及。 在这种情况下,上述测试是这样的:
class MySpec extends WordSpec with Matchers with SharedSparkContext {
"My analytics" should {
"calculate the right thing" in {
val data = Seq(...)
val rdd = sc.parallelize(data)
val total = rdd.map(...).filter(...).map(...).reduce(_ + _)
total shouldBe 1000
}
}
}
注意:我怎么没有做任何事情来对付SparkContext
。 SharedSparkContext
给了我一切-与sc
作为SparkContext
-获得自由。 个人虽然我不会在此依赖带来的只是这个目的,因为贷款模式不正是我需要为。 此外,有这么多的不可预测性,与分布式系统发生的,它可以是一个真正的痛苦都通过发生在第三方库的源代码,当事情出错的不断融合魔法追查。
现在这里火花试验基地真正的亮点是与基于Hadoop的助手一样HDFSClusterLike
和YARNClusterLike
。 在混合这些特质真的可以为您节省大量的安装痛苦。 照到哪里哪里的地方是与Scalacheck般的性能和发电机-假设你当然知道如何基于属性的测试工作,为什么它是非常有用的。 但同样,我会亲自暂缓使用它,直到我的分析和我的测试中达到成熟的那个级别。
“只有西斯交易中绝对的。” -欧比旺
当然,你不必选择一个或其他任。 也许你可以使用大部分的测试和火花试验基地的贷款模式的方法只有几个,更严峻的考验。 选择不是二进制; 你可以两者都做。
集成测试与星火流
最后,我只想提出一个什么样的SparkStreaming集成测试设置的内存值可能看起来像没有火花试验基地的一个片段:
val sparkContext: SparkContext = ...
val data: Seq[(String, String)] = Seq(("a", "1"), ("b", "2"), ("c", "3"))
val rdd: RDD[(String, String)] = sparkContext.parallelize(data)
val strings: mutable.Queue[RDD[(String, String)]] = mutable.Queue.empty[RDD[(String, String)]]
val streamingContext = new StreamingContext(sparkContext, Seconds(1))
val dStream: InputDStream = streamingContext.queueStream(strings)
strings += rdd
这是简单的比它的外观。 这真的只是原来数据序列到队列中喂到DStream
。 它的大部分实际上只是样板设置,与星火API的工作。 无论如何,你可以比较这StreamingSuiteBase
如发现 火花试验基地 ,以决定您喜欢哪一种。
这可能是我永远的最长的职位,所以我会离开这里。 我希望其他人附和其他的想法,以帮助改善我们的分析与具有改进的所有其他应用程序的开发一样敏捷软件工程实践的质量。
并与道歉无耻的插件,你可以看看我们的课程与Apache星火分析 ,在这里我们可以解决很多这些点子多。 我们希望尽快有一个在线版本。
你可以写与FunSuite和BeforeAndAfterEach像下面一个简单的测试
class Tests extends FunSuite with BeforeAndAfterEach {
var sparkSession : SparkSession = _
override def beforeEach() {
sparkSession = SparkSession.builder().appName("udf testings")
.master("local")
.config("", "")
.getOrCreate()
}
test("your test name here"){
//your unit test assert here like below
assert("True".toLowerCase == "true")
}
override def afterEach() {
sparkSession.stop()
}
}
你不需要测试,你可以简单地写为创建一个功能
test ("test name") {//implementation and assert}
霍顿卡劳写了非常好的测试火花测试基地
你需要看看下面是一个简单的例子
class TestSharedSparkContext extends FunSuite with SharedSparkContext {
val expectedResult = List(("a", 3),("b", 2),("c", 4))
test("Word counts should be equal to expected") {
verifyWordCount(Seq("c a a b a c b c c"))
}
def verifyWordCount(seq: Seq[String]): Unit = {
assertResult(expectedResult)(new WordCount().transform(sc.makeRDD(seq)).collect().toList)
}
}
希望这可以帮助!
我喜欢创造一个SparkSessionTestWrapper
特质,可以混合到测试类。 Shankar的方法有效,但它是为测试套件与多个文件过于缓慢。
import org.apache.spark.sql.SparkSession
trait SparkSessionTestWrapper {
lazy val spark: SparkSession = {
SparkSession.builder().master("local").appName("spark session").getOrCreate()
}
}
该性状可以使用如下:
class DatasetSpec extends FunSpec with SparkSessionTestWrapper {
import spark.implicits._
describe("#count") {
it("returns a count of all the rows in a DataFrame") {
val sourceDF = Seq(
("jets"),
("barcelona")
).toDF("team")
assert(sourceDF.count === 2)
}
}
}
检查火花规范项目使用了一个真实的例子SparkSessionTestWrapper
方法。
更新
该火花试验基地库会自动添加某些性状的测试类混合的SparkSession(例如,当DataFrameSuiteBase
混入,你必须通过访问SparkSession spark
变量)。
我创建了一个名为独立测试库火花快速测试运行他们的测试时,给用户SparkSession的完全控制。 我不认为一个测试助手库应设置SparkSession。 用户应该能够启动和停止其SparkSession他们认为合适的(我喜欢创造一个SparkSession,并用它在整个测试套件运行)。
这里的火花快速测试的一个例子assertSmallDatasetEquality
在操作方法:
import com.github.mrpowers.spark.fast.tests.DatasetComparer
class DatasetSpec extends FunSpec with SparkSessionTestWrapper with DatasetComparer {
import spark.implicits._
it("aliases a DataFrame") {
val sourceDF = Seq(
("jose"),
("li"),
("luisa")
).toDF("name")
val actualDF = sourceDF.select(col("name").alias("student"))
val expectedDF = Seq(
("jose"),
("li"),
("luisa")
).toDF("student")
assertSmallDatasetEquality(actualDF, expectedDF)
}
}
}
由于星火1.6,你可以使用SharedSparkContext
或SharedSQLContext
火花用于其自己的单元测试:
class YourAppTest extends SharedSQLContext {
var app: YourApp = _
protected override def beforeAll(): Unit = {
super.beforeAll()
app = new YourApp
}
protected override def afterAll(): Unit = {
super.afterAll()
}
test("Your test") {
val df = sqlContext.read.json("examples/src/main/resources/people.json")
app.run(df)
}
由于星火2.3 SharedSparkSession
可用:
class YourAppTest extends SharedSparkSession {
var app: YourApp = _
protected override def beforeAll(): Unit = {
super.beforeAll()
app = new YourApp
}
protected override def afterAll(): Unit = {
super.afterAll()
}
test("Your test") {
df = spark.read.json("examples/src/main/resources/people.json")
app.run(df)
}
更新:
Maven的依赖关系:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql</artifactId>
<version>SPARK_VERSION</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
SBT依赖性:
"org.apache.spark" %% "spark-sql" % SPARK_VERSION % Test classifier "tests"
此外,你可以检查测试源星火那里有一个巨大的集各种测试服。
我可以解决下面的代码问题
火花蜂巢关系在项目POM添加
class DataFrameTest extends FunSuite with DataFrameSuiteBase{
test("test dataframe"){
val sparkSession=spark
import sparkSession.implicits._
var df=sparkSession.read.format("csv").load("path/to/csv")
//rest of the operations.
}
}
使用JUnit的另一种方法进行单元测试
import org.apache.spark.sql.SparkSession
import org.junit.Assert._
import org.junit.{After, Before, _}
@Test
class SessionSparkTest {
var spark: SparkSession = _
@Before
def beforeFunction(): Unit = {
//spark = SessionSpark.getSparkSession()
spark = SparkSession.builder().appName("App Name").master("local").getOrCreate()
System.out.println("Before Function")
}
@After
def afterFunction(): Unit = {
spark.stop()
System.out.println("After Function")
}
@Test
def testRddCount() = {
val rdd = spark.sparkContext.parallelize(List(1, 2, 3))
val count = rdd.count()
assertTrue(3 == count)
}
@Test
def testDfNotEmpty() = {
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val numDf = spark.sparkContext.parallelize(List(1, 2, 3)).toDF("nums")
assertFalse(numDf.head(1).isEmpty)
}
@Test
def testDfEmpty() = {
val sqlContext = spark.sqlContext
import sqlContext.implicits._
val emptyDf = spark.sqlContext.createDataset(spark.sparkContext.emptyRDD[Num])
assertTrue(emptyDf.head(1).isEmpty)
}
}
case class Num(id: Int)