数据流数据存储区+ = DatastoreException:I / O错误(Dataflow +

2019-09-29 01:38发布

我试图使用从数据流写入到数据存储com.google.cloud.datastore

我的代码看起来是这样的(通过实施例中的启发[1]):

public void processElement(ProcessContext c) {
    LocalDatastoreHelper HELPER = LocalDatastoreHelper.create(1.0);
    Datastore datastore = HELPER.options().toBuilder().namespace("ghijklmnop").build().service();
    Key taskKey = datastore.newKeyFactory()
        .ancestors(PathElement.of("TaskList", "default"))
        .kind("Task")
        .newKey("sampleTask");
    Entity task = Entity.builder(taskKey)
        .set("category", "Personal")
        .set("done", false)
        .set("priority", 4)
        .set("description", "Learn Cloud Datastore")
        .build();
    datastore.put(task);
}

我得到这个错误:

exception: "java.lang.RuntimeException: com.google.cloud.dataflow.sdk.util.UserCodeException: com.google.cloud.datastore.DatastoreException: I/O error
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:162)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnContext.sideOutputWindowedValue(DoFnRunnerBase.java:314)
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase$DoFnProcessContext.sideOutput(DoFnRunnerBase.java:470)
at com.google.cloud.dataflow.sdk.transforms.Partition$PartitionDoFn.processElement(Partition.java:172)

我曾尝试使用DatastoreIO下沉,但它看起来像它目前未流亚军支持。

我怎样才能避免这种错误? 或者什么是写从数据流数据存储区的最佳方法?

[1] https://github.com/GoogleCloudPlatform/java-docs-samples/blob/master/datastore/src/main/java/com/google/datastore/snippets/Concepts.java

Answer 1:

继@Sam McVeety建议,我试图隔离数据流之外的数据存储区的代码。 我确实得到了同样的错误!

但是,这也让我看到了异常,这是我没有在数据流日志中看到的原因:

Caused by: java.net.ConnectException: Connection refused

线索是,我用这个输入行: com.google.cloud.datastore.testing.LocalDatastoreHelper

它是一个测试是帮助本地需要护理基本嘲讽数据存储的API。 哎呀。

所以这是我的一些本地调试后,现在得到的代码:

public void processElement(ProcessContext c) {
    final Datastore datastore = DatastoreOptions.defaultInstance().service();
    final KeyFactory keyFactory = datastore.newKeyFactory().kind("Task");

    Key key = datastore.allocateId(keyFactory.newKey());
    Entity task = Entity.builder(key)
        .set("description", StringValue.builder(":D").excludeFromIndexes(true).build())
        .set("created", DateTime.now())
        .set("done", false)
        .build();
    datastore.put(task);
}

主要的区别是:

LocalDatastoreHelper.create(1.0).options().toBuilder().namespace("ghijklmnop").build().service()

DatastoreOptions.defaultInstance().service();


文章来源: Dataflow + Datastore = DatastoreException: I/O error