Run local DynamoDB spark job without EMR

Posted 2019-10-29 05:49

I want to run a local Spark job against DynamoDB, without using an EMR cluster, that reads data from some tables and writes it to a Parquet / CSV file. I haven't found any Spark-DynamoDB connector that supports this. Maybe you have some ideas?
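The DynamoDBItemWritable and DynamoDBInputFormat classes used below come from AWS's emr-dynamodb-connector (the emr-dynamodb-hadoop artifact). For a local build outside EMR, an sbt dependency along these lines should pull them in; the version shown is an assumption, so check Maven Central for a current one:

libraryDependencies += "com.amazon.emr" % "emr-dynamodb-hadoop" % "4.2.0"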

My code sample:

import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.sql.SparkSession

object copyDynamoTable extends App {
  val spark = SparkSession
    .builder()
    .appName("test")
    .master("local")
    .getOrCreate()

  val jobConf = new JobConf(spark.sparkContext.hadoopConfiguration)
  jobConf.set("dynamodb.servicename", "dynamodb")
  jobConf.set("dynamodb.input.tableName", "hen.poc.client") // Pointing to DynamoDB table
  jobConf.set("dynamodb.endpoint", "dynamodb.us-east-1.amazonaws.com")
  jobConf.set("dynamodb.regionid", "us-east-1")
  jobConf.set("dynamodb.throughput.read", "1")
  jobConf.set("dynamodb.throughput.read.percent", "1")
  jobConf.set("dynamodb.version", "2011-12-05")

  jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")
  jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat")

  val orders = spark.sparkContext.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])

  println(orders.count)
}
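For completeness, the Parquet / CSV write step the question asks about could continue inside copyDynamoTable, right after the count, roughly as in the sketch below. It is only a sketch: the Order case class and the id and amount attribute names are hypothetical placeholders, and a real job would need null checks and the table's actual schema when unpacking the AttributeValue map.

import com.amazonaws.services.dynamodbv2.model.AttributeValue
import spark.implicits._

// Hypothetical schema -- replace with the real table's attributes.
case class Order(id: String, amount: Double)

val df = orders.map { case (_, item) =>
  val attrs = item.getItem // java.util.Map[String, AttributeValue]
  Order(attrs.get("id").getS, attrs.get("amount").getN.toDouble)
}.toDF()

df.write.parquet("/tmp/hen.poc.client.parquet") // or df.write.csv(...)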

I'm getting the following exception:

18/09/05 17:06:41 INFO util.TaskCalculator: Cluster has 1 active nodes.
18/09/05 17:06:41 WARN util.ClusterTopologyNodeCapacityProvider: Exception when trying to determine instance types
java.nio.file.NoSuchFileException: /mnt/var/lib/info/job-flow.json
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
    at java.nio.file.Files.newByteChannel(Files.java:361)
    at java.nio.file.Files.newByteChannel(Files.java:407)
    at java.nio.file.Files.readAllBytes(Files.java:3152)
    at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.readJobFlowJsonString(ClusterTopologyNodeCapacityProvider.java:103)
    at org.apache.hadoop.dynamodb.util.ClusterTopologyNodeCapacityProvider.getCoreNodeMemoryMB(ClusterTopologyNodeCapacityProvider.java:42)
    at org.apache.hadoop.dynamodb.util.TaskCalculator.getMaxMapTasks(TaskCalculator.java:54)
    at org.apache.hadoop.dynamodb.DynamoDBUtil.calcMaxMapTasks(DynamoDBUtil.java:265)
    at org.apache.hadoop.dynamodb.read.AbstractDynamoDBInputFormat.getSplits(AbstractDynamoDBInputFormat.java:47)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1162)
    at com.data.spark.dynamodb.copyDynamoTable$.delayedEndpoint$com$riskified$data$spark$dynamodb$copyDynamoTable$1(copyDynamoTable.scala:30)
    at com.data.spark.dynamodb.copyDynamoTable$delayedInit$body.apply(copyDynamoTable.scala:9)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)
    at com.data.spark.dynamodb.copyDynamoTable$.main(copyDynamoTable.scala:9)
    at com.data.spark.dynamodb.copyDynamoTable.main(copyDynamoTable.scala)
Exception in thread "main" java.lang.ArithmeticException: / by zero

Answer 1:

This is a file that exists on EMR cluster instances. The connector reads it to determine what kind of instance types the job is running on, so that it can derive job settings such as memory. Obviously, running locally you don't have this file, so this is expected.

Please follow this thread:

https://github.com/awslabs/emr-dynamodb-connector/issues/50
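One workaround that comes up for running this connector off-EMR is to create the file it is looking for. A minimal stub at /mnt/var/lib/info/job-flow.json might look like the following; the field names here are an assumption based on what ClusterTopologyNodeCapacityProvider appears to parse (an instanceGroups array containing a Core role with an instance type), so verify them against the connector version you are using:

{
  "jobFlowId": "j-LOCAL",
  "instanceCount": 1,
  "instanceGroups": [
    {
      "instanceRole": "Core",
      "instanceType": "m3.xlarge",
      "requestedInstanceCount": 1
    }
  ]
}

With a resolvable core instance type, the task calculator no longer ends up with zero map tasks, which is presumably what triggers the "/ by zero" at the end of the trace.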


