How to read from multiple topics using spark readStream() which having different schemas, and writeStream() to a single topic using Spark StructedSchema.

Note: Each input topic having different schema

  • Probably you will have to join your streaming datasets into your desired dataset and then do writeStream. An important remark is joining of streaming datasets were introduced in Spark 2.3. – Krzysztof Atłasik Jun 12 at 13:17
  • if you are using scala you could take a functional approach to this. case class SchemaA extends MySchema, case class SchemaB extends MySchema.. then after readStream Try to map the input to each of these classes using pattern matching – jf2010 Jun 12 at 13:30
  • @KrzysztofAtłasik, just want to append processed result in a single stream, how to perform join with that? – Karthikeyan Jun 12 at 14:00
  • what is message type avro ? – Ram Ghadiyaram Jun 12 at 16:36
  • next time pls ask question in detail with examples and details – user3190018 Jun 13 at 17:29

How to read from multiple topics using spark readStream() which having different schemas, and writeStream() to a single topic using Spark StructedSchema ?

I am giving general idea or pointers here .... may suite your case.

I assume you are using avro messages, there are 2 topics one for message and another one is for schema I am reffering as message topic and schema topic.

Now, prepare a generic row wrapper schema say avro_yourrow_wrapper.avsc which holds different schema messages(since you told each message has different schema).for example... modify this sample as per your requirements.

  "type" : "record",
  "name" : "generic_schema",
  "namespace" : "yournamespace",
  "fields" : [ {
    "name" : "messagenameOrTableNames",
    "type" : "string"
  }, {
    "name" : "schema",
    "type" : "long"
  }, {
    "name" : "payload",
    "type" : "bytes"
  } ]

save it to file called avro_yourrow_wrapper.avsc since its static...

// Read the wrapper schema in your consumer.
    val inputStream = getClass.getResourceAsStream("avro_yourrow_wrapper.avsc")
    val source = scala.io.Source.fromInputStream(inputStream)
    val wrapperSchema = try source.mkString finally source.close()

from spark structured stream you will get a dataframe. read the wrapper schema based on type of message apply record specific schema by reading schema topic and message topic read the avro message.

Now using twitter bijection api (with GenericRecord) you can decode the message in to readable format.

sample pseudo code snippet :

import com.twitter.bijection.Injection
        import com.twitter.bijection.avro.GenericAvroCodecs
        import org.apache.avro.generic.GenericRecord
        val schema = new Schema.Parser().parse(localschema.get( recordlevelschema).get)
        val recordInjection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
        val record: GenericRecord = recordInjection.invert(bytes).get
        log.info("record.getSchema" +record.getSchema)
        record.getSchema.getFields.toArray().foreach(x =>log.info(x.toString))

and then you can write in to seperate topic as you wish.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service, privacy policy and cookie policy

Not the answer you're looking for? Browse other questions tagged or ask your own question.