This may seem like a silly question, but it is my very first post here, so apologies if I get anything wrong.
I am currently building a simple ML pipeline with TFX 0.11 (i.e. TFDV, TFT, and TF Serving) and TensorFlow 1.11, using Python 2.7. I have an Apache Flink cluster and I want to use it for TFX. I know the framework behind TFX is Apache Beam 2.8, and Apache Beam currently supports Flink from the Python SDK through a portable runner layer.
The problem is how to write TFX (TFDV/TFT) code that uses Apache Beam with the Flink runner through this portable runner concept, since TFX currently seems to support only DirectRunner and DataflowRunner (Google Cloud).
I have been searching the web for some time, and the last line on the TFX website says,
"Please direct any questions about working with tf.Transform to Stack Overflow using the tensorflow-transform tag."
And that's why I am here. Any idea or workaround is really appreciated. Thank you!
Thanks for the question.
Disclaimer: the portable Flink runner is still in an experimental phase and will only work with a trivial amount of input data.
Here is how you can run TFX on Flink via Beam.
Prerequisites
- Linux
- Docker
- Beam Repo: https://github.com/apache/beam
- Distributed file system for input and output.
Instructions to run a python pipeline: https://beam.apache.org/roadmap/portability/#python-on-flink
Note: We currently only support Flink 1.5.5
Instructions
1) Build Worker Containers:
- Go to Beam checkout dir
- Run gradle command: ./gradlew :beam-sdks-python-container:docker
2) Run Beam JobServer for Flink:
- Go to Beam checkout dir
- Run gradle command: ./gradlew beam-runners-flink_2.11-job-server:runShadow
Note: this command will not finish, as it starts the job server and keeps it running.
3) Submit a pipeline
- Please refer to https://github.com/angoenka/model-analysis/blob/hack_1/examples/chicago_taxi/preprocess_flink.sh
- Note: make sure to pass the following flags to your pipeline (a minimal Python sketch follows this list):
--experiments=beam_fn_api
--runner PortableRunner
--job_endpoint=localhost:8099
--experiments=worker_threads=100
--execution_mode_for_batch=BATCH_FORCED
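For illustration, here is a minimal Python sketch of passing those flags to a Beam pipeline from code. It is not the actual Chicago taxi preprocessing from the linked script; the Create/Map steps and the output path are placeholders, and it assumes the job server from step 2 is listening on localhost:8099.

# Hypothetical sketch: run a trivial pipeline on the portable Flink runner.
# Assumes the worker container from step 1 is built and the job server from
# step 2 is running locally on port 8099.
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--experiments=beam_fn_api',
    '--experiments=worker_threads=100',
    '--execution_mode_for_batch=BATCH_FORCED',
])

with beam.Pipeline(options=options) as p:
    (p
     | 'Create' >> beam.Create(['hello', 'flink'])
     | 'Upper' >> beam.Map(lambda s: s.upper())
     # Output should go to a distributed file system that the Flink workers can reach.
     | 'Write' >> beam.io.WriteToText('/tmp/portable-flink/output'))

The same options object can be reused for the Beam pipeline that wraps your tf.Transform preprocessing, since TFT's Beam implementation runs as PTransforms inside a pipeline you construct yourself.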