Can I use setWorkerCacheMb in Apache Beam 2.0+?

Posted 2019-08-18 14:35

Question:

My Dataflow job (using Java SDK 2.1.0) is quite slow: it is going to take more than a day to process just 50GB. I simply pull a whole table from BigQuery (50GB) and join it with one CSV file from GCS (100+MB).

https://cloud.google.com/dataflow/model/group-by-key
I use side inputs to perform the join (the latter approach in the documentation above), although I think CoGroupByKey would be more efficient; however, I'm not sure that is the only reason my job is so slow.

From some googling, it looks like the side-input cache is 100MB by default, and I assume mine is slightly over that limit, so each worker continuously re-reads the side inputs. To improve this, I thought I could use the setWorkerCacheMb method to increase the cache size.

However, it looks like DataflowPipelineOptions does not have this method, and DataflowWorkerHarnessOptions is hidden. Just passing --workerCacheMb=200 in -Dexec.args results in:

An exception occured while executing the Java class.
null: InvocationTargetException:
Class interface com.xxx.yyy.zzz$MyOptions missing a property
named 'workerCacheMb'. -> [Help 1]

How can I use this option? Thanks.

My pipeline:

MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

Pipeline p = Pipeline.create(options);

PCollection<TableRow> rows = p.apply("Read from BigQuery",
        BigQueryIO.read().from("project:MYDATA.events"));

// Read account file
PCollection<String> accounts = p.apply("Read from account file",
        TextIO.read().from("gs://my-bucket/accounts.csv")
                .withCompressionType(CompressionType.GZIP));
PCollection<TableRow> accountRows = accounts.apply("Convert to TableRow",
        ParDo.of(new DoFn<String, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                String line = c.element();
                CSVParser csvParser = new CSVParser();
                String[] fields = csvParser.parseLine(line);

                TableRow row = new TableRow();
                row = row.set("account_id", fields[0]).set("account_uid", fields[1]);
                c.output(row);
            }
        }));
PCollection<KV<String, TableRow>> kvAccounts = accountRows.apply("Populate account_uid:accounts KV",
        ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();
                String uid = (String) row.get("account_uid");
                c.output(KV.of(uid, row));
            }
        }));
final PCollectionView<Map<String, TableRow>> uidAccountView = kvAccounts.apply(View.<String, TableRow>asMap());

// Add account_id from account_uid to event data
PCollection<TableRow> rowsWithAccountID = rows.apply("Join account_id",
        ParDo.of(new DoFn<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();

                if (row.containsKey("account_uid") && row.get("account_uid") != null) {
                    String uid = (String) row.get("account_uid");
                    TableRow accRow = (TableRow) c.sideInput(uidAccountView).get(uid);
                    if (accRow == null) {
                        LOG.warn("accRow null, {}", row.toPrettyString());
                    } else {
                        row = row.set("account_id", accRow.get("account_id"));
                    }
                }
                c.output(row);
            }
        }).withSideInputs(uidAccountView));

// Insert into BigQuery
WriteResult result = rowsWithAccountID.apply(BigQueryIO.writeTableRows()
        .to(new TableRefPartition(StaticValueProvider.of("MYDATA"), StaticValueProvider.of("dev"),
                StaticValueProvider.of("deadletter_bucket")))
        .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            @Override
            public TableRow apply(TableRow row) {
                return row;
            }
        }).withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

p.run();

Historically my system has had two user identifiers, a new one (account_id) and an old one (account_uid). Now I need to add the new account_id to our event data stored in BigQuery retroactively, because the old data only has the old account_uid. The accounts table (which holds the mapping between account_uid and account_id) has already been exported as CSV and stored in GCS.

The last function, TableRefPartition, just stores the data into the corresponding BigQuery partition based on each event's timestamp. The job is still running (2017-10-30_22_45_59-18169851018279768913) and the bottleneck looks like the "Join account_id" step. Its throughput (xxx elements/s) goes up and down according to the graph, and the estimated size of the side input is 106MB.
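
(For context, BigQueryIO in Beam 2.x can route each row to a per-day partition through a SerializableFunction passed to .to(...). The sketch below only illustrates that general technique, under the assumption that each event carries an epoch-millis "timestamp" field; the class name DayPartitionFn and the field name are illustrative, not the actual TableRefPartition implementation.)

// Imports assumed: org.apache.beam.sdk.io.gcp.bigquery.TableDestination,
// org.apache.beam.sdk.transforms.SerializableFunction,
// org.apache.beam.sdk.values.ValueInSingleWindow, org.joda.time.format.DateTimeFormat.
static class DayPartitionFn
        implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {

    private static final long serialVersionUID = 1L;

    @Override
    public TableDestination apply(ValueInSingleWindow<TableRow> input) {
        TableRow row = input.getValue();
        // Format the (assumed epoch-millis) event timestamp as a yyyyMMdd suffix.
        String day = DateTimeFormat.forPattern("yyyyMMdd")
                .withZoneUTC()
                .print(((Number) row.get("timestamp")).longValue());
        // "table$yyyyMMdd" addresses a single day partition of a date-partitioned table.
        return new TableDestination("project:MYDATA.events$" + day, null);
    }
}

// Plugged in via: BigQueryIO.writeTableRows().to(new DayPartitionFn())...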

If switching to CoGroupByKey improves performance dramatically, I will do so. I was just being lazy and thought side inputs would make it easier to also handle event data that doesn't have any account info.
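
For reference, a CoGroupByKey version of the join could look roughly like this (a sketch reusing kvAccounts from the pipeline above; eventTag, accountTag and kvEvents are illustrative names, and events without an account_uid are keyed under an empty string here just so they still flow through the join):

// Imports assumed: org.apache.beam.sdk.transforms.join.{CoGbkResult, CoGroupByKey, KeyedPCollectionTuple}
// and org.apache.beam.sdk.values.TupleTag.
final TupleTag<TableRow> eventTag = new TupleTag<TableRow>() {};
final TupleTag<TableRow> accountTag = new TupleTag<TableRow>() {};

// Key events by account_uid (empty string when missing, to avoid null keys).
PCollection<KV<String, TableRow>> kvEvents = rows.apply("Key events by account_uid",
        ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();
                Object uid = row.get("account_uid");
                c.output(KV.of(uid == null ? "" : (String) uid, row));
            }
        }));

PCollection<TableRow> joined = KeyedPCollectionTuple
        .of(eventTag, kvEvents)
        .and(accountTag, kvAccounts)
        .apply("CoGroupByKey", CoGroupByKey.<String>create())
        .apply("Join on account_uid", ParDo.of(new DoFn<KV<String, CoGbkResult>, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                CoGbkResult result = c.element().getValue();
                // At most one account row is expected per account_uid.
                TableRow account = result.getOnly(accountTag, null);
                for (TableRow event : result.getAll(eventTag)) {
                    // Clone to avoid mutating the input element.
                    TableRow out = event.clone();
                    if (account != null) {
                        out.set("account_id", account.get("account_id"));
                    }
                    c.output(out);
                }
            }
        }));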

Answer 1:

There are a few things you can do to improve the performance of your code:

  • Your side input is a Map<String, TableRow>, but you're only using a single field of each TableRow (accRow.get("account_id")). How about making it a Map<String, String> instead, with the value being the account_id itself? That will likely be quite a bit more efficient than the bulky TableRow objects.
  • You could extract the value of the side input into a lazily initialized member variable in your DoFn, to avoid repeated invocations of .sideInput(). Both suggestions are sketched after this list.
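
Applied to the pipeline in the question, the two suggestions could look roughly like this (a sketch, assuming a batch pipeline in the single global window so the side-input map is the same for every element; uidToAccountIdView and uidToAccountIdCache are illustrative names):

// Build a slimmer Map<String, String> side input: account_uid -> account_id.
PCollection<KV<String, String>> uidToAccountId = accountRows.apply("account_uid -> account_id",
        ParDo.of(new DoFn<TableRow, KV<String, String>>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();
                c.output(KV.of((String) row.get("account_uid"), (String) row.get("account_id")));
            }
        }));
final PCollectionView<Map<String, String>> uidToAccountIdView =
        uidToAccountId.apply(View.<String, String>asMap());

PCollection<TableRow> rowsWithAccountID = rows.apply("Join account_id",
        ParDo.of(new DoFn<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            // Lazily initialized cache of the side-input map; transient so it is not
            // serialized with the DoFn. Valid here because the pipeline is batch and
            // everything lives in the global window.
            private transient Map<String, String> uidToAccountIdCache;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                if (uidToAccountIdCache == null) {
                    uidToAccountIdCache = c.sideInput(uidToAccountIdView);
                }
                TableRow row = c.element();
                Object uid = row.get("account_uid");
                if (uid != null) {
                    String accountId = uidToAccountIdCache.get((String) uid);
                    if (accountId != null) {
                        row = row.set("account_id", accountId);
                    }
                }
                c.output(row);
            }
        }).withSideInputs(uidToAccountIdView));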

That said, this performance is unexpected and we are investigating whether there's something else going on.



Answer 2:

Try one of:

1) Setting the option programmatically:

options.as(DataflowWorkerHarnessOptions.class).setWorkerCacheMb(500);

2) Having your application register DataflowWorkerHarnessOptions with the PipelineOptionsFactory.

3) Having your own options class extend DataflowWorkerHarnessOptions.
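
Options 2 and 3 could look roughly like the following (a minimal sketch; WorkerCacheExample is just an illustrative class name, and MyOptions stands in for the options interface from the question):

import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WorkerCacheExample {

    // Option 3: let your own options interface extend DataflowWorkerHarnessOptions,
    // so --workerCacheMb (and the other worker harness options) are parsed automatically.
    public interface MyOptions extends DataflowWorkerHarnessOptions {
        // ... your own pipeline options here ...
    }

    public static void main(String[] args) {
        // Option 2: explicitly register DataflowWorkerHarnessOptions with the factory
        // before parsing, so --workerCacheMb=200 on the command line is recognized
        // even if MyOptions does not extend it.
        PipelineOptionsFactory.register(DataflowWorkerHarnessOptions.class);

        MyOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(MyOptions.class);

        // Option 1, for completeness: set the cache size programmatically.
        options.as(DataflowWorkerHarnessOptions.class).setWorkerCacheMb(500);
    }
}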