Spark Streaming: read CSV string from Kafka, write to Parquet

There are lots of online examples of reading JSON from Kafka (to write to Parquet), but I cannot figure out how to apply a schema to a CSV string from Kafka.

The streamed data:


The schema:

from pyspark.sql.types import StructType, StringType

schema = StructType() \
  .add("customer_id", StringType()) \
  .add("customer_acct_id", StringType())

Read the stream:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092") \
  .option("subscribe", "test") \
  .load()

I used this for JSON:

from pyspark.sql.functions import col, from_json
interval = df \
  .select(from_json(col("value").cast("string"), schema).alias("json")) \
  .select("json.*")

Before writing it to parquet with the assigned schema:

query=interval     \
  .writeStream  \
  .format("parquet") \
  .option("checkpointLocation", "/user/whatever/checkpoint24") \

As I cannot use from_json() for CSV, I don't know how to apply the schema to the DataFrame so that I can use a similar writeStream() command.


This is how I did it. Without from_json, extract the CSV string:

interval = df.select(col("value").cast("string")).alias("csv").select("csv.*")

Then split it into columns. The result can be written as a Parquet file using the same writeStream statement above.

interval2 = interval \
  .selectExpr(
      "split(value,',')[0] as customer_id",
      "split(value,',')[1] as customer_acct_id",
      "split(value,',')[2] as serv_acct_id",
      "split(value,',')[3] as installed_service_id",
      "split(value,',')[4] as meter_id",
      "split(value,',')[5] as channel_number",
      # ... etc
  )
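
With many columns, the split expressions can be generated instead of typed by hand. The following is a minimal sketch, assuming the fields arrive in a fixed order and contain no quoted or embedded commas. The helper name csv_split_exprs is hypothetical (it is not a PySpark function), and the column list only repeats the six names shown above.

```python
def csv_split_exprs(column_names, sep=","):
    """Build one Spark SQL expression string per column, in positional order.

    Each string selects the i-th element of split(value, sep) and names it.
    Hypothetical helper for illustration; not part of PySpark.
    """
    return [
        "split(value,'{}')[{}] as {}".format(sep, i, name)
        for i, name in enumerate(column_names)
    ]


# Column names copied from the selectExpr call above (first six only).
columns = [
    "customer_id",
    "customer_acct_id",
    "serv_acct_id",
    "installed_service_id",
    "meter_id",
    "channel_number",
]

exprs = csv_split_exprs(columns)

# Assumed usage with a streaming DataFrame `interval` that has a
# string column named `value`:
# interval2 = interval.selectExpr(*exprs)
```

If a StructType schema is already defined, schema.fieldNames() could supply the column list. Note that Spark SQL's split treats the separator as a regular expression, so a separator such as | would need escaping.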