有什么可以用作测试存根CassandraWriterBolt?(What can be used a

2019-10-28 14:09发布

我读卡夫卡一个JSON,FieldExtractionBolt读取JSON数据提取到元组值并将其传递给CassandraWriterBolt,这反过来写在卡珊德拉的记录写入所有元组值成单独的列。

卡夫卡JSON消息 -

{"pair":"GBPJPY","bid":134.4563,"ask":134.4354}

FieldExtractionBolt -

String message = tuple.getStringByField("message");
Map values = new Gson().fromJson(message, Map.class);
basicOutputCollector.emit(new Values(values.get("pair"), values.get("bid"), values.get("ask")));

CassandraWriterBolt -

return (CassandraWriterBolt) new CassandraWriterBolt(async(simpleQuery("INSERT INTO currency(pair, ask, bid) VALUES (?, ?, ?);").with(fields("pair", "ask", "bid")))

我试着写基于这里给出的答案测试- 如何通过编程方式将消息发送到端到端风暴拓扑的测试功能

在我的项目,我在Spring中定义的配置我所有的螺栓,嘴和溪流。 这使得读/写我的拓扑结构非常简单。 我通过获取螺栓,嘴和流咖啡豆的ApplicationContext建立拓扑。 在我的Spring配置,KafkaSpout和CassandraWriterBolt在下面“督促”配置文件定义,让他们只在督促下“测试”使用档案I定义KafkaSpout和CassandraWriterBolt存根。 对于KafkaSpout,我用FixedToupleSpout和CassandraWriterBolt我用TestWordCounter。

这是我的测试

        @Test
        public void testTopology(){
        StormTopology topology = SpringBasedTopologyBuilder.getInstance().buildStormTopologyUsingApplicationContext(applicationContext);
        TestJob COMPLETE_TOPOLOGY_TESTJOB = (cluster) -> {
              MockedSources mocked = new MockedSources();
                    mocked.addMockData("kafkaSpout",
                new Values("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354}"),
        new Values("{\"pair\":\"GBPUSD\",\"bid\":1.4563,\"ask\":1.4354}"));

        Config topoConf = new Config();
        topoConf.setNumWorkers(2);

        CompleteTopologyParam ctp = new CompleteTopologyParam();
        ctp.setMockedSources(mocked);
        ctp.setStormConf(topoConf);
        Map<String, List<FixedTuple>> results = Testing.completeTopology(cluster, topology, ctp);
                    List<List<Object>> cassandraTuples = Testing.readTuples(results, "cassandraWriterBolt");
        List<List<Object>> expectedCassandraTuples = Arrays.asList(Arrays.asList("GBPJPY", 1), Arrays.asList("GBPUSD", 1),
                Arrays.asList("134.4563", 1), Arrays.asList("1.4563", 1), Arrays.asList("134.4354", 2));
        assertTrue(expectedCassandraTuples + " expected, but found " + cassandraTuples,
                Testing.multiseteq(expectedCassandraTuples, cassandraTuples));
    MkClusterParam param = new MkClusterParam();
    param.setSupervisors(4);

    Testing.withSimulatedTimeLocalCluster(param, COMPLETE_TOPOLOGY_TESTJOB);
}

@Configuration
@Import(MainApplication.class)
public static class TestConfig
{
    @Bean
    public IRichSpout kafkaSpout(){
        return new FixedTupleSpout(Arrays.asList(new FixedTuple(Arrays.asList("{\"pair\":\"GBPJPY\",\"bid\":134.4563,\"ask\":134.4354"))), new Fields(new String[]{"message"}));
    }

    @Bean
    public IBasicBolt cassandraWriterBolt(){
        return new TestWordCounter();
    }
}

结果我得到的是不是我期待的。 我收到以下错误 -

        java.lang.AssertionError: [[GBPJPY, 1], [GBPUSD, 1], [134.4563, 1], [1.4563, 1], [134.4354, 2]] expected, but found [[GBPJPY, 1], [GBPUSD, 1]]

看起来,TestWordCounter只是读第一个值作为一个元组(仅货币对以及跳过买入和卖出)。 似乎TestWordCounter这里不是正确的选择。 什么是正确的存根CassandraWriterBolt,这样我可以断言,这样会得到2只记录一个GBPJPY,另一个为英镑兑美元与他们的买入价和卖出价呢?

Answer 1:

Testing.readTuples(results, "cassandraWriterBolt")将返回“cassandraWriterBolt”发出的元组。 那是你要考什么? 我认为你正在试图断言哪些元组“cassandraWriterBolt”接收,而不是它发出的。

你可以在这里做两件事情。 您可以使用readTuples从被发射,而不是从卡桑德拉螺栓阅读卡桑德拉螺栓,螺栓阅读。 这是一个不错的解决方案,如果您的拓扑结构是简单的(例如,不许多不同的螺栓写入卡桑德拉螺栓)。

更好的解决方案(IMO)是编写一个简单的存根螺栓来代替TestWordCounter 。 螺栓应该做的唯一的事情是接收输入的元组,ACK它,并在新的元组发出的值。

execute(Tuple input, BasicOutputCollector collector) {
  collector.emit(input.getValues());
}

然后你可以使用readTuples读取该螺栓发出的元组,这将是它接收到相同的值。



文章来源: What can be used as a test stub for CassandraWriterBolt?