I am trying to perform an insert into an Ignite cache from a Spark RDD. I'm using version 2.2 of Ignite and 2.1 of Spark.
The first step is to create the cache in a separate Scala application, like so:
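```scala
import java.util
import org.apache.ignite.Ignition
import org.apache.ignite.cache.{CacheAtomicityMode, CacheMode}
import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder
import scala.annotation.meta.field

object Create_Ignite_Cache {
  // Value class for the cache entries; every field is an indexed SQL column.
  case class Custom_Class(
    @(QuerySqlField @field)(index = true) a: String,
    @(QuerySqlField @field)(index = true) b: String,
    @(QuerySqlField @field)(index = true) c: String,
    @(QuerySqlField @field)(index = true) d: String,
    @(QuerySqlField @field)(index = true) e: String,
    @(QuerySqlField @field)(index = true) f: String,
    @(QuerySqlField @field)(index = true) g: String,
    @(QuerySqlField @field)(index = true) h: String
  )

  def main(args: Array[String]): Unit = {
    // Discover the server nodes on the local port range 48500..48520.
    val spi = new TcpDiscoverySpi
    val ipFinder = new TcpDiscoveryMulticastIpFinder
    val addresses = new util.ArrayList[String]
    addresses.add("127.0.0.1:48500..48520")
    ipFinder.setAddresses(addresses)
    spi.setIpFinder(ipFinder)
    // Join the cluster as a client node.
    val cfg = new IgniteConfiguration().setDiscoverySpi(spi).setClientMode(true)
    // Partitioned, atomic cache with one backup; Custom_Class is registered as the SQL type.
    val cache_conf = new CacheConfiguration[String, Custom_Class]()
      .setName("Spark_Ignite")
      .setCacheMode(CacheMode.PARTITIONED)
      .setAtomicityMode(CacheAtomicityMode.ATOMIC)
      .setBackups(1)
      .setIndexedTypes(classOf[String], classOf[Custom_Class])
    val ignite = Ignition.getOrStart(cfg)
    ignite.getOrCreateCache(cache_conf)
    println("[INFO] CACHE CREATED")
    ignite.close()
  }
}
```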
The cache is created successfully, as ignitevisor confirms.
Next, I ran a Spark app that inserts the contents of an RDD into the cache through an IgniteRDD:
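```scala
import java.util
import org.apache.ignite.cache.query.annotations.QuerySqlField
import org.apache.ignite.configuration.{CacheConfiguration, IgniteConfiguration}
import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import scala.annotation.meta.field

object Spark_Streaming_Processing {
  // Value class for the cache entries.
  case class Custom_Class(
    @(QuerySqlField @field)(index = true) a: String,
    @(QuerySqlField @field)(index = true) b: String,
    @(QuerySqlField @field)(index = true) c: String,
    @(QuerySqlField @field)(index = true) d: String,
    @(QuerySqlField @field)(index = true) e: String,
    @(QuerySqlField @field)(index = true) f: String,
    @(QuerySqlField @field)(index = true) g: String,
    @(QuerySqlField @field)(index = true) h: String
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Spark_Streaming_Processing").getOrCreate()
    // START IGNITE CONTEXT
    val addresses = new util.ArrayList[String]()
    addresses.add("127.0.0.1:48500..48520")
    val igniteContext: IgniteContext = new IgniteContext(spark.sparkContext, () =>
      new IgniteConfiguration()
        .setDiscoverySpi(new TcpDiscoverySpi().setIpFinder(new TcpDiscoveryVmIpFinder().setAddresses(addresses)))
        .setCacheConfiguration(new CacheConfiguration[String, Custom_Class]()
          .setName("Spark_Ignite").setBackups(1).setIndexedTypes(classOf[String], classOf[Custom_Class])),
      true)
    println(igniteContext.ignite().cacheNames())
    val ignite_cache_rdd: IgniteRDD[String, Custom_Class] = igniteContext.fromCache[String, Custom_Class]("Spark_Ignite")
    val processed_PairRDD: RDD[(String, Custom_Class)] = (...) // rdd with data, which as you can see has the correct datatypes as parameters
    ignite_cache_rdd.savePairs(processed_PairRDD)
  }
}
```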
As you can see, the two class definitions are identical.
After the app runs successfully, ignitevisor shows that the cache contains 63 records.
However, if I run an SQL query against the cache, like so:
ignite_cache_rdd.sql("select * from Custom_Class").show(truncate = false)
I get an empty table as a result.
The same thing happens when I query through an external SQL server.
Curiously, if I don't create the cache beforehand and just run the Spark app, IgniteContext creates the cache because it doesn't exist, and THEN I can see records in my queries!
What might be the problem here?
As far as I can tell, the data types of both the key and the value are exactly the same, so I should be able to see the records when I query.
Thank you for your time.
The problem is that you use different classes to create the cache and to insert data into it. Even though the fields of the two classes match, their fully qualified names differ (Create_Ignite_Cache$Custom_Class versus Spark_Streaming_Processing$Custom_Class, plus any package prefix), so they are two different classes.
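The name mismatch can be checked without Ignite at all. The standalone sketch below mirrors the question's setup but trims each class to a single field, and it assumes both objects live in the default (empty) package; with a package declaration, its prefix would be added to both names.

```scala
// Standalone sketch: two objects that each nest a case class with the same
// simple name and the same fields, as in the question.
object Create_Ignite_Cache {
  case class Custom_Class(a: String)
}

object Spark_Streaming_Processing {
  case class Custom_Class(a: String)
}

object ClassNameDemo extends App {
  // JVM binary names of the two nested classes.
  val created = classOf[Create_Ignite_Cache.Custom_Class].getName
  val inserted = classOf[Spark_Streaming_Processing.Custom_Class].getName

  println(created)             // Create_Ignite_Cache$Custom_Class
  println(inserted)            // Spark_Streaming_Processing$Custom_Class
  println(created == inserted) // false
}
```

Identical fields are not enough: the cache registers one class as its SQL type, and objects of the other class are stored but do not match it.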
If you want to query the data with SQL, you should use the same class both when creating the cache and when inserting the data.
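A minimal sketch of that fix, assuming a hypothetical top-level object SharedModel that is compiled once and put on the classpath of both applications. Each app then refers to SharedModel.Custom_Class instead of declaring its own nested copy.

```scala
import org.apache.ignite.cache.query.annotations.QuerySqlField
import scala.annotation.meta.field

// Hypothetical shared definition: declare the value class once, outside both apps.
object SharedModel {
  case class Custom_Class(
    @(QuerySqlField @field)(index = true) a: String,
    @(QuerySqlField @field)(index = true) b: String,
    @(QuerySqlField @field)(index = true) c: String,
    @(QuerySqlField @field)(index = true) d: String,
    @(QuerySqlField @field)(index = true) e: String,
    @(QuerySqlField @field)(index = true) f: String,
    @(QuerySqlField @field)(index = true) g: String,
    @(QuerySqlField @field)(index = true) h: String
  )
}

// Both apps would then name the same JVM class, e.g.
//   Create_Ignite_Cache:        .setIndexedTypes(classOf[String], classOf[SharedModel.Custom_Class])
//   Spark_Streaming_Processing: igniteContext.fromCache[String, SharedModel.Custom_Class]("Spark_Ignite")
object SharedModelCheck extends App {
  println(classOf[SharedModel.Custom_Class].getName) // SharedModel$Custom_Class
}
```

The existing Spark_Ignite cache still has the old class registered as its SQL type, so it would presumably need to be destroyed and created again with the shared class before the new data shows up in queries.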
Skipping cache creation solves the problem because the Spark app then creates the cache itself instead of using an existing one. When Spark creates it, the class of the objects it actually inserts is the one registered during cache creation.