I'm using Flink (latest via git) to stream from Kafka to Cassandra. To ease unit testing, I'm adding dependency injection via Dagger.
The ObjectGraph seems to be setting itself up properly, but the 'inner objects' are being flagged as 'not serializable' by Flink. If I include these objects directly, they work. So what's the difference?
The class in question implements MapFunction and uses @Inject to pull in a module for Cassandra and one for reading config files.
Is there a way to build this so I can use late binding, or does Flink make this impossible?
Edit:
FWIW: dependency injection (via Dagger) and RichMapFunction can't coexist. Dagger won't let you include any objects that have extends in their definition.
Further:
Objects instantiated via Dagger Lazy<T> won't serialize either.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Object com.someapp.SaveMap@2e029d61 not serializable
...
Caused by: java.io.NotSerializableException: dagger.internal.LazyBinding$1
Before diving into the specifics of the question, a bit of background on serializability of functions in Apache Flink:
Serializability
Apache Flink uses Java Serialization (java.io.Serializable) to ship the function objects (here the MapFunction) to the workers that execute them in parallel. Because of that, the functions need to be serializable: the function may not contain any non-serializable fields, i.e. fields whose types are neither primitive (int, long, double, ...) nor implement java.io.Serializable.
The typical way to work with non-serializable constructs is to lazily initialize them.
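To make the rule concrete, here is a minimal sketch that uses only the JDK (no Flink involved). CassandraClient and this SaveMap are hypothetical stand-ins, not the classes from the question. It shows that a class can implement Serializable and still be rejected by Java Serialization because one of its fields holds a non-serializable object.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializabilityCheck {

    // Hypothetical stand-in for a client type that does not implement Serializable.
    static class CassandraClient {
    }

    // Serializable itself, but it carries a populated non-serializable field.
    static class SaveMap implements Serializable {
        private static final long serialVersionUID = 1L;
        private final CassandraClient client = new CassandraClient();
    }

    // Returns true if Java Serialization accepts the object, false if it rejects it.
    static boolean isJavaSerializable(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(new SaveMap()));   // false
        System.out.println(isJavaSerializable("plain string"));  // true
    }
}
```

The same mechanism would explain the trace in the question: the function object itself is fine, but something it references (there, dagger.internal.LazyBinding$1) is not serializable.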
Lazy Initialization
One way to use non-serializable types in Flink functions is to lazily initialize them. The fields that hold these types are still null when the function is serialized to be shipped, and are only set after the function has been deserialized by the workers.
In Scala, you can simply use lazy fields, for example lazy val x = new NonSerializableType(). The NonSerializableType instance is only created upon first access to the variable x, which is usually on the worker. Consequently, the type can be non-serializable, because x is still null when the function is serialized for shipping to the workers.
In Java, you can initialize the non-serializable fields in the open() method of the function, if you make it a Rich Function. Rich functions (like RichMapFunction) are extended versions of basic functions (here MapFunction) and give you access to life-cycle methods like open() and close().
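The open() idea can be sketched without Flink on the classpath. The example below is an assumption-laden imitation, not Flink's API: SaveMap, CassandraWriter, open(), map() and ship() are hypothetical names, and ship() simply does a Java Serialization round trip to stand in for sending the function to a worker. In a real job the same field initialization would go into the open() method of a RichMapFunction.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class OpenLifecycleSketch {

    // Hypothetical non-serializable dependency.
    static class CassandraWriter {
        String write(String record) {
            return "saved:" + record;
        }
    }

    // Mimics the shape of a rich function: the dependency is created in open(),
    // not in the constructor, so the field is still null at serialization time.
    static class SaveMap implements Serializable {
        private static final long serialVersionUID = 1L;
        private CassandraWriter writer;

        void open() {
            writer = new CassandraWriter();
        }

        String map(String value) {
            return writer.write(value);
        }
    }

    // Serialize and deserialize, standing in for shipping the function to a worker.
    static SaveMap ship(SaveMap fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SaveMap) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SaveMap onWorker = ship(new SaveMap()); // succeeds: writer is null here
        onWorker.open();                        // dependency is created after "shipping"
        System.out.println(onWorker.map("row-1")); // saved:row-1
    }
}
```

Serialization succeeds here because a null reference is written as null regardless of the declared field type; only a populated non-serializable field causes a failure.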
Lazy Dependency Injections
I am not too familiar with dependency injection, but Dagger seems to provide something like a lazy dependency as well, which may help as a workaround, much like lazy variables in Scala.
I faced a similar issue. There are 2 ways to keep your dependency from being serialized.
Make your dependency static. This is not always possible, and it can also hurt your code design.
Use transient: by declaring your dependency as transient, you are saying that it is not part of the persistent state of the object and should not be part of serialization.
This is especially useful when you are using external libraries, whose implementations cannot be changed by you to make them serializable.
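As a rough illustration of the transient option, the JDK-only sketch below marks a populated, non-serializable field as transient and rebuilds it on first use after deserialization. ConfigReader, SaveMap and roundTrip() are hypothetical names chosen for this example. The lazy getter is an assumption about how the dependency gets restored, since a transient field comes back as null on the receiving side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {

    // Hypothetical dependency from an external library; not Serializable.
    static class ConfigReader {
        String get(String key) {
            return key + "=default";
        }
    }

    static class SaveMap implements Serializable {
        private static final long serialVersionUID = 1L;

        // Populated on the client, but skipped by Java Serialization.
        private transient ConfigReader config = new ConfigReader();

        boolean hasConfig() {
            return config != null;
        }

        // Rebuilds the dependency if deserialization left the field null.
        private ConfigReader config() {
            if (config == null) {
                config = new ConfigReader();
            }
            return config;
        }

        String map(String key) {
            return config().get(key);
        }
    }

    // Java Serialization round trip, standing in for shipping to a worker.
    static SaveMap roundTrip(SaveMap fn) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(fn);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SaveMap) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SaveMap copy = roundTrip(new SaveMap());
        System.out.println(copy.hasConfig());  // false: the transient field was not shipped
        System.out.println(copy.map("host"));  // host=default
    }
}
```

Note that field initializers do not run again during deserialization, which is why the copy needs the null check before using the dependency.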