DataflowAssert doesn't pass TableRow test

Posted 2019-09-11 03:44

Question:

We can't work out why DataflowAssert fails when we run this simple test:

  @Test
  @Category(RunnableOnService.class)
  public void testTableRow() throws Exception {
      Pipeline p = TestPipeline.create();
      PCollection<TableRow> pCollectionTable1 = p.apply("a", Create.of(TABLEROWS_ARRAY_1));
      PCollection<TableRow> pCollectionTable2 = p.apply("b", Create.of(TABLEROWS_ARRAY_2));
      PCollection<TableRow> joinedTables = Table.join(pCollectionTable1, pCollectionTable2);
      DataflowAssert.that(joinedTables).containsInAnyOrder(TABLEROW_TEST);
      p.run();
  }

We are getting the following exception:

    Sep 25, 2015 10:42:50 AM com.google.cloud.dataflow.sdk.testing.DataflowAssert$TwoSideInputAssert$CheckerDoFn processElement 
SEVERE: DataflowAssert failed expectations.
 java.lang.AssertionError: 
   Expected: iterable over [<{id=x}>] in any order
     but: Not matched: <{id=x}>
    at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
    at org.junit.Assert.assertThat(Assert.java:865)
    at org.junit.Assert.assertThat(Assert.java:832)
    at ...

To simplify the DataflowAssert test, we hardcoded the output of Table.join so that it matches the expected value:

private static final TableRow TABLEROW_TEST = new TableRow()
        .set("id", "x");


static PCollection<TableRow> join(PCollection<TableRow> pCollectionTable1,
        PCollection<TableRow> pCollectionTable2) throws Exception {

    final TupleTag<String> pCollectionTable1Tag = new TupleTag<String>();
    final TupleTag<String> pCollectionTable2Tag = new TupleTag<String>();

    PCollection<KV<String, String>> table1Data = pCollectionTable1
            .apply(ParDo.of(new ExtractTable1DataFn()));
    PCollection<KV<String, String>> table2Data = pCollectionTable2
            .apply(ParDo.of(new ExtractTable2DataFn()));

    PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple
            .of(pCollectionTable1Tag, table1Data).and(pCollectionTable2Tag, table2Data)
            .apply(CoGroupByKey.<String> create());

    PCollection<KV<String, String>> resultCollection = kvpCollection
            .apply(ParDo.named("Process join")
                    .of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
                        private static final long serialVersionUID = 0;

                        @Override
                        public void processElement(ProcessContext c) {
                            // System.out.println(c);
                            KV<String, CoGbkResult> e = c.element();
                            String key = e.getKey();
                            String value = null;
                            for (String table1Value : c.element().getValue().getAll(pCollectionTable2Tag)) {

                                for (String table2Value : c.element().getValue().getAll(pCollectionTable2Tag)) {
                                    value = table1Value + "," + table2Value;
                                }
                            }
                            c.output(KV.of(key, value));
                        }
                    }));

    PCollection<TableRow> formattedResults = resultCollection.apply(
            ParDo.named("Format join").of(new DoFn<KV<String, String>, TableRow>() {
                private static final long serialVersionUID = 0;

                public void processElement(ProcessContext c) {
                    TableRow row = new TableRow().set("id", "x");
                    c.output(row);                      
                }
            }));

    return formattedResults;
}

Does anyone know what we are doing wrong?

Answer 1:

I think the error message is telling you that the actual collection contains more copies of that element than the expectation.

Expected: iterable over [<{id=x}>] in any order
 but: Not matched: <{id=x}>

This is Hamcrest indicating that you wanted an iterable over a single element, but the actual collection had an item which wasn't matched. Since all of the items coming out of "Format join" have the same value, the message is harder to read than it should be: the unmatched item looks identical to the expected one.

Specifically, this is the message produced when I run the following test, which checks whether a collection with two copies of row contains exactly one copy of row:

@Category(RunnableOnService.class)
@Test
public void testTableRow() throws Exception {
  Pipeline p = TestPipeline.create();

  TableRow row = new TableRow().set("id", "x");

  PCollection<TableRow> rows = p.apply(Create.<TableRow>of(row, row));
  DataflowAssert.that(rows).containsInAnyOrder(row);

  p.run();
}
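
To see why one extra copy is enough to fail the assertion, the matching rule can be emulated in plain Java. This is only a sketch of the semantics (each expected item can be paired with at most one actual item), not Hamcrest's implementation; the names AnyOrderSketch and unmatched are made up for illustration, and strings stand in for TableRow.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Hamcrest or the Dataflow SDK.
final class AnyOrderSketch {
    // Returns the actual items that could not be paired with an expected item.
    static <T> List<T> unmatched(List<T> actual, List<T> expected) {
        List<T> remaining = new ArrayList<>(expected);
        List<T> notMatched = new ArrayList<>();
        for (T item : actual) {
            // remove(Object) drops only the first equal element, so each
            // expected item is consumed by at most one actual item.
            if (!remaining.remove(item)) {
                notMatched.add(item);
            }
        }
        return notMatched;
    }

    public static void main(String[] args) {
        List<String> actual = Arrays.asList("{id=x}", "{id=x}");
        List<String> expected = Arrays.asList("{id=x}");
        // prints: Not matched: [{id=x}]
        System.out.println("Not matched: " + unmatched(actual, expected));
    }
}
```

The second copy has nothing left to pair with, which is the situation the "Not matched" line reports.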

In order to get that result with your code, I had to take advantage of the fact that you only iterate over entries in table2. Specifically:

// Use these as the input tables.
table1 = [("keyA", "A1a"), ("keyA", "A1b")]
table2 = [("keyA", "A2a"), ("keyA", "A2b"), ("keyB", "B2")]

// The CoGroupByKey returns
[("keyA", (["A1a", "A1b"], ["A2a", "A2b"])),
 ("keyB", ([], ["B2"]))]

// When run through "Process join" this produces.
// For details on why see the next section.
["A2b,A2b",
 "B2,B2"]

// When run through "Format join" this becomes the following.
[{id=x}, {id=x}]
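
The trace above can be reproduced without the SDK. The following is a rough simulation, assuming plain maps are an adequate stand-in for the grouped output of CoGroupByKey; BuggyJoinTrace and processJoin are hypothetical names, and the loop body copies the posted "Process join" logic as is.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for the pipeline; no Dataflow SDK classes are used.
final class BuggyJoinTrace {
    // Mirrors "Process join" as posted: both loops read table2's values and
    // only the last concatenation is kept, so each key yields one output.
    static List<String> processJoin(Map<String, List<String>> table1,
                                    Map<String, List<String>> table2) {
        // Like CoGroupByKey, produce one group per key seen in either input.
        Set<String> keys = new TreeSet<>(table1.keySet());
        keys.addAll(table2.keySet());

        List<String> outputs = new ArrayList<>();
        for (String key : keys) {
            List<String> fromTable2 = table2.getOrDefault(key, Collections.emptyList());
            String value = null;
            for (String table1Value : fromTable2) {      // same tag as the inner loop
                for (String table2Value : fromTable2) {
                    value = table1Value + "," + table2Value;
                }
            }
            outputs.add(value);                          // one output per key
        }
        return outputs;
    }

    public static void main(String[] args) {
        Map<String, List<String>> table1 = new HashMap<>();
        table1.put("keyA", Arrays.asList("A1a", "A1b"));
        Map<String, List<String>> table2 = new HashMap<>();
        table2.put("keyA", Arrays.asList("A2a", "A2b"));
        table2.put("keyB", Arrays.asList("B2"));
        for (String value : processJoin(table1, table2)) {
            System.out.println(value);
        }
    }
}
```

Two keys give two outputs, so "Format join" emits two identical rows while the assertion expects one.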

Note that the DoFn for "Process join" probably does not do what you intended, as the comments below point out:

String key = e.getKey();
String value = null;
// NOTE: Both table1Value and table2Value iterate over pCollectionTable2Tag
for (String table1Value : c.element().getValue().getAll(pCollectionTable2Tag)) {
    for (String table2Value : c.element().getValue().getAll(pCollectionTable2Tag)) {
        // NOTE: this updates value, and doesn't output it. So for each
        // key there will be a single output with the *last* value
        // rather than one for each pair.
        value = table1Value + "," + table2Value;
    }
}
c.output(KV.of(key, value));
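
If the intent was an inner join, one possible fix is to read the first table's values in the outer loop and to emit inside the inner loop. The sketch below shows that loop shape in plain Java, assuming inner-join semantics (the question does not say which join was wanted); PairwiseJoinSketch and joinValues are hypothetical names. In the DoFn, the outer loop would correspond to getAll(pCollectionTable1Tag) and outputs.add(...) to c.output(KV.of(key, ...)).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper showing the loop shape only; no Dataflow SDK classes are used.
final class PairwiseJoinSketch {
    // Emits one "left,right" string for every pair of values that share a key.
    static List<String> joinValues(List<String> table1Values, List<String> table2Values) {
        List<String> outputs = new ArrayList<>();
        for (String table1Value : table1Values) {        // values from the first table
            for (String table2Value : table2Values) {    // values from the second table
                // Emit inside the loop so that no pair is overwritten.
                outputs.add(table1Value + "," + table2Value);
            }
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<String> keyA = joinValues(Arrays.asList("A1a", "A1b"), Arrays.asList("A2a", "A2b"));
        for (String value : keyA) {
            System.out.println(value);
        }
    }
}
```

With the example tables, keyA now yields four pairs and keyB yields none, because table1 has no values for keyB.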