Unable to write to S3 using the S3 sink with StreamExecutionEnvironment

Published 2019-07-23 02:33

Question:

I have created a simple Apache Flink project that reads data from a Kafka topic and writes it to an S3 bucket. The project runs without errors and successfully reads each message from the Kafka topic, but nothing is written to my S3 bucket. Since there are no errors, it is difficult to debug what is going on. Below are my project and my configurations. This only happens when I use a StreamExecutionEnvironment; if I write to S3 using a regular batch ExecutionEnvironment, it works.

S3 Test Java Program

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.UUID;

public class S3Test {

    public static void main(String[] args) throws Exception {
        // parse input arguments
        final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(args[0]);

        if (parameterTool.getNumberOfParameters() < 4) {
            System.out.println("Missing parameters!\nUsage: Kafka --kafka.topic <topic> " +
                    "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
            return;
        }

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface

        DataStream<String> messageStream = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        // write kafka stream to standard out.
        //messageStream.print();
        String id = UUID.randomUUID().toString();
        messageStream.writeAsText("s3://flink-data/" + id + ".txt").setParallelism(1);

        env.execute("Write to S3 Example");
    }
}

pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.10</artifactId>
        <version>1.1.4</version>
    </dependency>

    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.7.4</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>2.7.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.2.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpcore</artifactId>
        <version>4.2.5</version>
    </dependency>

    <!-- Apache Kafka Dependencies -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.9.0.1</version>
        <exclusions>
            <exclusion>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

</dependencies>

core-site.xml (Hadoop configurations)

<configuration>
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost:9000</value>
</property>

<property>
   <name>fs.s3.impl</name>
   <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>

<!-- Comma separated list of local directories used to buffer
 large results prior to transmitting them to S3. -->
<property>
  <name>fs.s3a.buffer.dir</name>
  <value>/tmp</value>
</property>

<!-- set your AWS ID using key defined in org.apache.hadoop.fs.s3a.Constants -->
<property>
    <name>fs.s3a.access.key</name>
    <value>***************</value>
</property>

<!-- set your AWS access key -->
<property>
    <name>fs.s3a.secret.key</name>
    <value>****************</value>
</property>

</configuration>

Answer 1:

Persisting from a Kafka topic to S3 via Flink requires the use of a RollingSink. The RollingSink uses a Bucketer to determine the names of the directories into which the part files are saved. DateTimeBucketer is the default, but you can also create a custom one. A part file is closed whenever the maximum batch size is reached, and a new part file is then created. The code below works:

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
import org.apache.flink.streaming.connectors.fs.RollingSink;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Map;

public class TestRollingSink {

    public static void main(String[] args) {
        Map<String, String> configs = ConfigUtils.loadConfigs("/Users/path/to/config.yaml");

        final ParameterTool parameterTool = ParameterTool.fromMap(configs);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.getConfig().disableSysoutLogging();
        env.getConfig().setGlobalJobParameters(parameterTool);

        DataStream<String> parsed = env
                .addSource(new FlinkKafkaConsumer09<String>(
                        parameterTool.getRequired("kafka.topic"),
                        new SimpleStringSchema(),
                        parameterTool.getProperties()));

        env.enableCheckpointing(2000, CheckpointingMode.AT_LEAST_ONCE);

        RollingSink<String> sink = new RollingSink<String>("s3://flink-test/" + "TEST");
        sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd--HHmm"));
        sink.setWriter(new StringWriter<String>());
        sink.setBatchSize(200);
        sink.setPendingPrefix("file-");
        sink.setPendingSuffix(".txt");
        parsed.print();
        parsed.addSink(sink).setParallelism(1);

        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
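To see what `DateTimeBucketer("yyyy-MM-dd--HHmm")` contributes to the output paths, its naming scheme can be sketched in plain Java. `BucketPathSketch` and `bucketPath` below are hypothetical names for illustration only, not part of the Flink API; the real Bucketer also decides *when* to roll to a new bucket.

```java
import java.text.SimpleDateFormat;
import java.util.Date;

public class BucketPathSketch {

    // Mirrors the directory name DateTimeBucketer("yyyy-MM-dd--HHmm") appends
    // to the sink's base path. Hypothetical helper, not the Flink class itself.
    static String bucketPath(String basePath, Date now) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd--HHmm");
        return basePath + "/" + fmt.format(now);
    }

    public static void main(String[] args) {
        // e.g. s3://flink-test/TEST/2019-07-23--0233
        System.out.println(bucketPath("s3://flink-test/TEST", new Date()));
    }
}
```

Part files such as `file-part-0-0.txt` then land inside the time-stamped directory, so each rolling window of data ends up in its own S3 "folder".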



Answer 2:

IAM permissions: make sure the role you are assuming has permission to write to the S3 bucket.
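For example, the policy attached to that role needs at least `s3:PutObject` on the target bucket. The statement below is an illustrative sketch (the bucket name is taken from the question; trim the actions to what your job actually needs):

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:GetObject", "s3:ListBucket"],
      "Resource": [
        "arn:aws:s3:::flink-data",
        "arn:aws:s3:::flink-data/*"
      ]
    }
  ]
}
```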



Answer 3:

A simple way to get some debug information is to turn on server access logging for the S3 bucket that is supposed to receive the Kafka data. This gives you more information for tracking down the source of the error from the S3 side:

http://docs.aws.amazon.com/AmazonS3/latest/UG/ManagingBucketLogging.html
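Server access logging is enabled by attaching a logging configuration to the bucket; a sketch of that configuration (the target bucket and prefix below are placeholders, and the target bucket must be a separate bucket you control):

```json
{
  "LoggingEnabled": {
    "TargetBucket": "my-log-bucket",
    "TargetPrefix": "flink-data-logs/"
  }
}
```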