Why is my RabbitMQ message impossible to serialize?

Published 2020-04-30 18:03

Question:

I'm trying to read a RabbitMQ queue using Apache Beam. I've written some transformation code to have messages written to Kafka. I've even tested my scenario using simple text messages.

Now I'm trying to deploy it with the actual messages this transformer is meant to run on. These are JSON messages of fairly moderate size.

Strangely, when I try to read "production" messages, I get this exception stack trace:

java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage@f179a7f}' with coder 'ValueWithRecordId$ValueWithRecordIdCoder(org.apache.beam.sdk.coders.SerializableCoder@76190fb2)'.
        org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
        org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
        org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
        java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
        java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        java.util.HashMap.internalWriteEntries(HashMap.java:1785)
        java.util.HashMap.writeObject(HashMap.java:1362)
        sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.lang.reflect.Method.invoke(Method.java:498)
        java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:183)
        org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:53)
        org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
        org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:99)
        org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
        org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
        org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564)
        org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
        org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
        org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
        org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

My understanding is that the RabbitMQ reader considers the messages big enough to require the use of LongString, which is not serializable.

Am I right on this point? And if so, how do I tell RabbitMQ to use a simple String (which would be enough for these messages)?

Answer 1:

This is an Apache Beam issue (https://issues.apache.org/jira/browse/BEAM-7414) whose fix is ... not yet merged into the Apache Beam repo, out of pure laziness (this is bad). If someone wants the fix immediately, it is possible to build my branch: https://github.com/Riduidel/beam/tree/fix/rabbitmq-message-not-serializable
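
For anyone who wants to see the failure mode in isolation, here is a minimal sketch that needs neither Beam nor RabbitMQ. In the trace, `HashMap.writeObject` sits right above the `NotSerializableException`, which suggests the offending value lives inside a map held by the message (plausibly its headers) rather than being related to the body size, though that is a reading of the trace and not something confirmed here. `FakeLongString`, `Message`, `canSerialize` and `sanitize` are all hypothetical names made up for this illustration; they are not the Beam or RabbitMQ classes, and `sanitize` is not necessarily what the branch above does.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class HeaderSerializationDemo {

    // Hypothetical stand-in for a string wrapper that does NOT implement
    // Serializable (the role ByteArrayLongString plays in the stack trace).
    static final class FakeLongString {
        private final byte[] bytes;

        FakeLongString(String s) {
            this.bytes = s.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public String toString() {
            return new String(bytes, StandardCharsets.UTF_8);
        }
    }

    // Hypothetical message type: Serializable itself, but carrying a map
    // whose values may or may not be serializable.
    static final class Message implements Serializable {
        private static final long serialVersionUID = 1L;
        final byte[] body;
        final Map<String, Object> headers;

        Message(byte[] body, Map<String, Object> headers) {
            this.body = body;
            this.headers = headers;
        }
    }

    // Returns true if plain Java serialization of the object succeeds.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Copies the map, replacing every non-serializable value by its toString().
    static Map<String, Object> sanitize(Map<String, Object> headers) {
        Map<String, Object> safe = new HashMap<>();
        for (Map.Entry<String, Object> e : headers.entrySet()) {
            Object v = e.getValue();
            safe.put(e.getKey(), (v == null || v instanceof Serializable) ? v : v.toString());
        }
        return safe;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("content-kind", new FakeLongString("json"));
        byte[] body = "{\"id\": 1}".getBytes(StandardCharsets.UTF_8);

        // One non-serializable map value is enough to make the whole message fail.
        System.out.println(canSerialize(new Message(body, headers)));           // false
        System.out.println(canSerialize(new Message(body, sanitize(headers)))); // true
    }
}
```

Note that, judging by the trace, the exception is thrown while the runner encodes the output of the read step itself (`ReadOperation.runReadLoop`), so a conversion like `sanitize` would have to happen inside the IO or its coder; a transform placed after the read would run too late to help.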