I am testing out NiFi to replace our current ingestion setup, which imports data from multiple MySQL shards of a table and stores it in HDFS.
I am using GenerateTableFetch and ExecuteSQL to achieve this.
Each incoming flow file has a database.name attribute, which is used by DBCPConnectionPoolLookup to select the relevant shard.
The issue: say I have 2 shards to pull data from, shard_1 and shard_2, for the table accounts, and I have updated_at set as the Maximum-value Columns. NiFi is not storing state for table@updated_at per shard; there is only one state entry per table.
When I check Data Provenance, I see the shard_2 flow file being dropped without being passed to ExecuteSQL. My guess is that the shard_1 query gets executed first, and when the shard_2 query comes, its records are checked against shard_1's updated_at; since that returns no rows, the flow file is dropped.
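To illustrate my hypothesis (this is a hypothetical sketch of the behavior I suspect, not NiFi's actual implementation), if the max-value state is keyed only by table@column and shared across shards, the second shard's rows get compared against the first shard's watermark:

```python
# Hypothetical sketch: max-value state keyed per table, NOT per shard,
# showing why shard_2's rows can be filtered out entirely.
state = {}  # "table@column" -> last seen max value (shared across all shards)

def fetch_new_rows(shard_rows, table, column):
    key = f"{table}@{column}"  # note: the shard is not part of the key
    last_max = state.get(key)
    new_rows = [r for r in shard_rows if last_max is None or r[column] > last_max]
    if new_rows:
        state[key] = max(r[column] for r in new_rows)
    return new_rows

shard_1 = [{"updated_at": 100}, {"updated_at": 200}]
shard_2 = [{"updated_at": 150}]  # newer data, but older than shard_1's max

print(len(fetch_new_rows(shard_1, "accounts", "updated_at")))  # prints 2
print(len(fetch_new_rows(shard_2, "accounts", "updated_at")))  # prints 0 -> dropped
```

Under this model, shard_2's rows with updated_at = 150 are filtered against shard_1's watermark of 200 and nothing survives, matching the dropped flow file I see in provenance.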
Has anyone faced this issue? Or am I missing something?