Can I use setWorkerCacheMb in Apache Beam 2.0+?

Posted 2019-08-18 14:43

My Dataflow job (using Java SDK 2.1.0) is quite slow: at the current rate it will take more than a day to process just 50 GB. It simply reads a whole table from BigQuery (50 GB) and joins it with one CSV file from GCS (100+ MB).

https://cloud.google.com/dataflow/model/group-by-key
I use side inputs to perform the join (the second approach described in the documentation above). I think CoGroupByKey would be more efficient, but I'm not sure that is the only reason my job is so slow.

From what I found by searching, the side input cache defaults to 100 MB. I assume my side input is slightly over that limit, so each worker keeps re-reading it. To improve this, I thought I could use the setWorkerCacheMb method to increase the cache size.

However, DataflowPipelineOptions does not appear to have this method, and DataflowWorkerHarnessOptions is hidden. Just passing --workerCacheMb=200 in -Dexec.args results in:

An exception occured while executing the Java class.
null: InvocationTargetException:
Class interface com.xxx.yyy.zzz$MyOptions missing a property
named 'workerCacheMb'. -> [Help 1]

How can I use this option? Thanks.

My pipeline:

MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

Pipeline p = Pipeline.create(options);

PCollection<TableRow> rows = p.apply("Read from BigQuery",
        BigQueryIO.read().from("project:MYDATA.events"));

// Read account file
PCollection<String> accounts = p.apply("Read from account file",
        TextIO.read().from("gs://my-bucket/accounts.csv")
                .withCompressionType(CompressionType.GZIP));
PCollection<TableRow> accountRows = accounts.apply("Convert to TableRow",
        ParDo.of(new DoFn<String, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                String line = c.element();
                CSVParser csvParser = new CSVParser();
                String[] fields = csvParser.parseLine(line);

                TableRow row = new TableRow();
                row = row.set("account_id", fields[0]).set("account_uid", fields[1]);
                c.output(row);
            }
        }));
PCollection<KV<String, TableRow>> kvAccounts = accountRows.apply("Populate account_uid:accounts KV",
        ParDo.of(new DoFn<TableRow, KV<String, TableRow>>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();
                String uid = (String) row.get("account_uid");
                c.output(KV.of(uid, row));
            }
        }));
final PCollectionView<Map<String, TableRow>> uidAccountView = kvAccounts.apply(View.<String, TableRow>asMap());

// Add account_id from account_uid to event data
PCollection<TableRow> rowsWithAccountID = rows.apply("Join account_id",
        ParDo.of(new DoFn<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            @ProcessElement
            public void processElement(ProcessContext c) throws Exception {
                TableRow row = c.element();

                if (row.containsKey("account_uid") && row.get("account_uid") != null) {
                    String uid = (String) row.get("account_uid");
                    TableRow accRow = (TableRow) c.sideInput(uidAccountView).get(uid);
                    if (accRow == null) {
                        LOG.warn("accRow null, {}", row.toPrettyString());
                    } else {
                        row = row.set("account_id", accRow.get("account_id"));
                    }
                }
                c.output(row);
            }
        }).withSideInputs(uidAccountView));

// Insert into BigQuery
WriteResult result = rowsWithAccountID.apply(BigQueryIO.writeTableRows()
        .to(new TableRefPartition(StaticValueProvider.of("MYDATA"), StaticValueProvider.of("dev"),
                StaticValueProvider.of("deadletter_bucket")))
        .withFormatFunction(new SerializableFunction<TableRow, TableRow>() {
            private static final long serialVersionUID = 1L;

            @Override
            public TableRow apply(TableRow row) {
                return row;
            }
        }).withCreateDisposition(CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

p.run();

Historically, my system has had two user identifiers: a new one (account_id) and an old one (account_uid). I now need to retroactively add the new account_id to our event data stored in BigQuery, because the old data only has the old account_uid. The accounts table (which maps account_uid to account_id) has already been exported as CSV and stored in GCS.

The last function, TableRefPartition, just stores the data into the corresponding BigQuery partition according to each event's timestamp. The job is still running (2017-10-30_22_45_59-18169851018279768913) and the bottleneck appears to be the "Join account_id" step. According to the graph, that step's throughput (xxx elements/s) fluctuates, and the estimated size of the side input is 106 MB.

If switching to CoGroupByKey would improve performance dramatically, I will do so. I was just being lazy and thought side inputs would make it easier to handle event data that has no account info.

2 Answers
劳资没心,怎么记你
Reply #2 · 2019-08-18 15:18

There are a few things you can do to improve the performance of your code:

  • Your side input is a Map<String, TableRow>, but you're using only a single field in the TableRow - accRow.get("account_id"). How about making it a Map<String, String> instead, having the value be the account_id itself? That'll likely be quite a bit more efficient than the bulky TableRow object.
  • You could extract the value of the side input into a lazily initialized member variable in your DoFn, to avoid repeated invocations of .sideInput().

That said, this performance is unexpected and we are investigating whether there's something else going on.
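The two suggestions above can be sketched in plain Java, without any Beam classes. This is only an illustration under stated assumptions: the class name LazySideInputJoin is hypothetical, the Supplier stands in for c.sideInput(uidAccountView), and events are modelled as simple maps rather than TableRow objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical plain-Java stand-in for the "Join account_id" DoFn.
class LazySideInputJoin {
    // Stand-in for c.sideInput(uidAccountView); assumed to be costly per call.
    private final Supplier<Map<String, String>> sideInput;

    // Map of account_uid -> account_id, fetched once on first use.
    // In a real DoFn this field would typically be marked transient.
    private Map<String, String> uidToAccountId;

    LazySideInputJoin(Supplier<Map<String, String>> sideInput) {
        this.sideInput = sideInput;
    }

    // Returns a copy of the event, with account_id added when the uid is known.
    Map<String, Object> process(Map<String, Object> event) {
        if (uidToAccountId == null) {
            uidToAccountId = sideInput.get(); // only the first call pays the cost
        }
        Map<String, Object> out = new HashMap<>(event);
        Object uid = event.get("account_uid");
        if (uid != null) {
            String accountId = uidToAccountId.get(uid);
            if (accountId != null) {
                out.put("account_id", accountId);
            }
        }
        return out;
    }
}
```

In an actual pipeline, caching the side input in a member variable is presumably only safe when the side input has a single value for the whole job, as with the globally windowed batch input in the question.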

放我归山
Reply #3 · 2019-08-18 15:32

Try one of:

1) Setting the option in code:

options.as(DataflowWorkerHarnessOptions.class).setWorkerCacheMb(500);

2) Having your application register DataflowWorkerHarnessOptions with the PipelineOptionsFactory

3) Having your own options class extend DataflowWorkerHarnessOptions
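
Options 1 and 3 above might look roughly like the following. This is a sketch, assuming the Beam 2.x Dataflow runner artifact is on the classpath and that DataflowWorkerHarnessOptions lives in org.apache.beam.runners.dataflow.options; the class name WorkerCacheOptionsSketch and its method names are hypothetical, and MyOptions stands in for the asker's own options interface.

```java
import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class WorkerCacheOptionsSketch {

    // Option 3: extending DataflowWorkerHarnessOptions makes workerCacheMb a
    // property of MyOptions, so --workerCacheMb=200 can be parsed from args.
    // Beam requires options interfaces to be public.
    public interface MyOptions extends DataflowWorkerHarnessOptions {
    }

    // Option 1: view any options object as DataflowWorkerHarnessOptions
    // and set the cache size (in MB) programmatically.
    static PipelineOptions setInCode(PipelineOptions options, int cacheMb) {
        options.as(DataflowWorkerHarnessOptions.class).setWorkerCacheMb(cacheMb);
        return options;
    }

    // Option 3 in use: parse the flag straight from the command line.
    static MyOptions parseFromArgs(String[] args) {
        return PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
    }
}
```

With option 3, the original command line (--workerCacheMb=200 inside -Dexec.args) should then be accepted instead of failing with the "missing a property" error.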
