There are lots of online examples of reading JSON from Kafka (to write to Parquet), but I cannot figure out how to apply a schema to a CSV string coming from Kafka.
The streamed data:
customer_1945,cusaccid_995,27999941
customer_1459,cusaccid_1102,27999942
The schema:
schema = StructType() \
    .add("customer_id", StringType()) \
    .add("customer_acct_id", StringType()) \
    .add("serv_acct_id", StringType())
Read the stream:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092") \
.option("subscribe", "test") \
.load()
I used this for JSON:
interval = df \
    .select(from_json(col("value").cast("string"), schema).alias("json")) \
    .select("json.*")
Before writing it to parquet with the assigned schema:
query = interval \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "/user/whatever/checkpoint24") \
    .start("/user/ehatever/interval24")
Since from_json() only handles JSON, not CSV, I don't know how to apply the schema to the DataFrame so that I can use a similar writeStream() command.