I'm trying to read a RabbitMQ queue using Apache Beam. I've written some transformation code so that the messages get written to Kafka, and I've even tested the scenario end to end with simple text messages.
Now I'm deploying it against the actual messages this transformer is meant to process: JSON messages of fairly moderate size.
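For reference, the pipeline is shaped roughly like this (the URI, queue, broker address, and topic names are placeholders, and the real transformation logic is more involved than the simple body decode shown here):

    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.rabbitmq.RabbitMqIO;
    import org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.TypeDescriptors;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class RabbitToKafka {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create();
        p.apply("ReadRabbit", RabbitMqIO.read()
                .withUri("amqp://guest:guest@rabbitmq-host:5672")
                .withQueue("input-queue"))
         // Decode the raw AMQP body into a String; the actual
         // JSON transformation happens at this stage.
         .apply("Transform", MapElements
                .into(TypeDescriptors.strings())
                .via((RabbitMqMessage m) ->
                    new String(m.getBody(), StandardCharsets.UTF_8)))
         // Write the transformed payloads (values only) to Kafka.
         .apply("WriteKafka", KafkaIO.<Void, String>write()
                .withBootstrapServers("kafka-host:9092")
                .withTopic("output-topic")
                .withValueSerializer(StringSerializer.class)
                .values());
        p.run();
      }
    }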
Strangely, when I try to read these "production" messages, I get the following exception stack trace:
java.lang.IllegalArgumentException: Unable to encode element 'ValueWithRecordId{id=[], value=org.apache.beam.sdk.io.rabbitmq.RabbitMqMessage@f179a7f}' with coder 'ValueWithRecordId$ValueWithRecordIdCoder(org.apache.beam.sdk.coders.SerializableCoder@76190fb2)'.
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:300)
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
java.util.HashMap.internalWriteEntries(HashMap.java:1785)
java.util.HashMap.writeObject(HashMap.java:1362)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:183)
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:53)
org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:105)
org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:99)
org.apache.beam.sdk.values.ValueWithRecordId$ValueWithRecordIdCoder.encode(ValueWithRecordId.java:81)
org.apache.beam.sdk.coders.Coder.getEncodedElementByteSize(Coder.java:297)
org.apache.beam.sdk.coders.Coder.registerByteSizeObserver(Coder.java:291)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:564)
org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.registerByteSizeObserver(WindowedValue.java:480)
org.apache.beam.runners.dataflow.worker.IntrinsicMapTaskExecutorFactory$ElementByteSizeObservableCoder.registerByteSizeObserver(IntrinsicMapTaskExecutorFactory.java:400)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputObjectAndByteCounter.update(OutputObjectAndByteCounter.java:125)
org.apache.beam.runners.dataflow.worker.DataflowOutputCounter.update(DataflowOutputCounter.java:64)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:43)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:201)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1283)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:147)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:1020)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
My understanding is that the RabbitMQ reader considers these messages big enough to require the use of LongString, which is not serializable.
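As a minimal illustration of what I believe is happening (assuming the LongString values are indeed the culprit), Java-serializing a LongString directly fails the same way the coder does:

    import com.rabbitmq.client.LongString;
    import com.rabbitmq.client.impl.LongStringHelper;
    import java.io.ByteArrayOutputStream;
    import java.io.NotSerializableException;
    import java.io.ObjectOutputStream;

    public class LongStringSerializationCheck {
      public static void main(String[] args) throws Exception {
        // The RabbitMQ client wraps string values in this type.
        LongString value = LongStringHelper.asLongString("some value");
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
          // Throws NotSerializableException:
          // com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
          out.writeObject(value);
        } catch (NotSerializableException e) {
          System.out.println("Not serializable: " + e.getMessage());
        }
      }
    }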
Am I right on this point? And if so, how can I tell RabbitMQ to use a plain String (which would be enough for these messages)?