I'm trying to make a PySpark connection to a Cassandra DB indexed with GeoMesa. From what I have read, this goes through the GeoTools Spark runtime, since there is no optimized Spark runtime for Cassandra. I'm struggling to find the keys and values I should use for this connection. My code fails with an error saying it could not find a SpatialRDDProvider.
Code
import geomesa_pyspark
print("Configuring")
conf = geomesa_pyspark.configure(
    jars=['path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar'],
    packages=['geomesa_pyspark', 'pytz'],
    spark_home='path/to/spark'). \
    setAppName('MyTestApp')
conf.get('spark.master')
print(f"Configuration set: {conf.getAll()}")
from pyspark.sql import SparkSession
print("Imported Pyspark")
spark = SparkSession.builder.config(
    conf=conf
).config(
    "spark.driver.extraClassPath",
    "path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar"
).config(
    "spark.executor.extraClassPath",
    "path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar"
).enableHiveSupport().getOrCreate()
print("SparkSession UP")
params = {
    "geotools": "true",
    "dbtype": "cassandradb",  # Also tried with "cassandra"
    "cassandra.contact.point": "contact_point",
    "cassandra.username": "user",
    "cassandra.password": "password",
    "cassandra.keyspace": "keyspace",
    "cassandra.catalog": "catalog",
    "geomesa.feature": "feature"
}
print("Spark loading")
df = spark.read.format("geomesa").options(**params).load()
print("Loaded DB")
df.createOrReplaceTempView("tbl")
spark.sql("show tables").show()
Response
Configuring
Configuration set: [('spark.master', 'yarn'), ('spark.yarn.dist.jars', 'path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar'), ('spark.yarn.dist.files', '/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip,/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip,/tmp/spark-python-3.9/geomesa_pyspark-5.2.0.zip,/tmp/spark-python-3.9/pytz-2025.1.zip'), ('spark.executorEnv.PYTHONPATH', 'py4j-0.10.9.7-src.zip:pyspark.zip:geomesa_pyspark-5.2.0.zip:pytz-2025.1.zip'), ('spark.executorEnv.PYSPARK_PYTHON', '/usr/bin/python'), ('spark.driver.extraClassPath', 'path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar'), ('spark.app.name', 'MyTestApp')]
Imported Pyspark
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/04 21:13:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/04 21:13:25 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
25/02/04 21:13:27 WARN Client: Same path resource file:/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip added multiple times to distributed cache.
25/02/04 21:13:27 WARN Client: Same path resource file:/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip added multiple times to distributed cache.
SparkSession UP
Spark loading
25/02/04 21:13:39 WARN ServiceLoader$: Using a context ClassLoader that does not contain the class to load (org.locationtech.geomesa.index.view.MergedViewConfigLoader): org.apache.spark.util.MutableURLClassLoader@1c4ee95c
25/02/04 21:13:39 WARN ServiceLoader$: Using a context ClassLoader that does not contain the class to load (org.locationtech.geomesa.index.view.RouteSelector): org.apache.spark.util.MutableURLClassLoader@1c4ee95c
Traceback (most recent call last):
File "/home/rocky/test2.py", line 46, in <module>
df = spark.read.format("geomesa").options(**params).load()
File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 314, in load
return self._df(self._jreader.load())
File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
return f(*a, **kw)
File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o50.load.
: java.lang.RuntimeException: Could not find a SpatialRDDProvider
    at org.locationtech.geomesa.spark.GeoMesaSpark$.$anonfun$apply$2(GeoMesaSpark.scala:20)
    at scala.Option.getOrElse(Option.scala:189)
    at org.locationtech.geomesa.spark.GeoMesaSpark$.apply(GeoMesaSpark.scala:20)
    at org.locationtech.geomesa.spark.sql.GeoMesaRelation$.apply(GeoMesaRelation.scala:162)
    at org.locationtech.geomesa.spark.sql.GeoMesaDataSource.createRelation(GeoMesaDataSource.scala:45)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)
I have checked the documentation and experimented with different keys, but it still returns this error. Before this, I also worked through some other errors around the Hadoop and Java configuration.
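For reference, this is the kind of classpath sanity check I was planning to run next from the same session, to see which provider implementations the JVM can actually discover. It is only a sketch: I am guessing the interface name org.locationtech.geomesa.spark.SpatialRDDProvider from the package in the stack trace, and assuming discovery goes through java.util.ServiceLoader.

# Sanity-check sketch (interface name guessed from the stack trace; assumes
# provider discovery uses java.util.ServiceLoader)
jvm = spark.sparkContext._jvm
iface = jvm.java.lang.Class.forName(
    "org.locationtech.geomesa.spark.SpatialRDDProvider")
it = jvm.java.util.ServiceLoader.load(iface).iterator()
while it.hasNext():
    # Print every provider implementation visible on the JVM classpath
    print(it.next().getClass().getName())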
Jars mentioned in the comment:
"org.locationtech.geomesa:geomesa-gt-spark-runtime_2.12:5.2.0,"
"com.datastax.cassandra:cassandra-driver-core:4.0.0,"
"org.locationtech.geomesa:geomesa-cassandra-datastore_2.12:5.2.0,"
"com.datastax.spark:spark-cassandra-connector_2.12:3.5.1,"
"org.locationtech.geomesa:geomesa-cassandra_2.12:5.2.0,"
"org.locationtech.geomesa:geomesa-cassandra-gs-plugin_2.12:5.2.0,"
"org.locationtech.geomesa:geomesa-cassandra-dist_2.12:5.2.0,"
"org.locationtech.geomesa:geomesa-spark-jts_2.12:5.2.0,"
"org.locationtech.geomesa:geomesa-spark-core_2.12:5.2.0,"
"org.locationtech.geomesa:geomesa-cassandra-tools_2.12:5.2.0"