
PySpark/Spark connection to Geomesa Cassandra DB

I'm trying to make a PySpark connection to a Cassandra DB indexed with GeoMesa. From what I've found, it uses the GeoTools Spark runtime, since there is no optimized runtime for Cassandra. I'm struggling to find which keys and values I should use for this connection. My code fails with an error saying it could not find a SpatialRDDProvider.

Code

import geomesa_pyspark

print("Configuring")
conf = geomesa_pyspark.configure(
    jars=['path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar'],
    packages=['geomesa_pyspark', 'pytz'],
    spark_home='path/to/spark'
).setAppName('MyTestApp')

conf.get('spark.master')

print(f"Configuration set: {conf.getAll()}")

from pyspark.sql import SparkSession

print("Imported Pyspark")

spark = SparkSession.builder.config(
    conf=conf
).config(
    "spark.driver.extraClassPath",
    "path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar"
).config(
    "spark.executor.extraClassPath",
    "path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar"
).enableHiveSupport().getOrCreate()

print("SparkSession UP")
params = {
    "geotools": "true",
    "dbtype": "cassandradb", # Also tried with "cassandra"
    "cassandra.contact.point": "contact_point",
    "cassandra.username": "user",
    "cassandra.password": "password",
    "cassandra.keyspace": "keyspace",
    "cassandra.catalog": "catalog",
    "geomesa.feature": "feature"
}

print("Spark loading")

df = spark.read.format("geomesa").options(**params).load()

print("Loaded DB")

df.createOrReplaceTempView("tbl")
spark.sql("show tables").show()

Response

Configuring
Configuration set: [('spark.master', 'yarn'), ('spark.yarn.dist.jars', 'path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar'), ('spark.yarn.dist.files', '/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip,/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip,/tmp/spark-python-3.9/geomesa_pyspark-5.2.0.zip,/tmp/spark-python-3.9/pytz-2025.1.zip'), ('spark.executorEnv.PYTHONPATH', 'py4j-0.10.9.7-src.zip:pyspark.zip:geomesa_pyspark-5.2.0.zip:pytz-2025.1.zip'), ('spark.executorEnv.PYSPARK_PYTHON', '/usr/bin/python'), ('spark.driver.extraClassPath', 'path/to/geomesa-gt-spark-runtime_2.12-5.2.0.jar'), ('spark.app.name', 'MyTestApp')]
Imported Pyspark
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/04 21:13:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/02/04 21:13:25 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
25/02/04 21:13:27 WARN Client: Same path resource file:/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip added multiple times to distributed cache.
25/02/04 21:13:27 WARN Client: Same path resource file:/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip added multiple times to distributed cache.
SparkSession UP
Spark loading
25/02/04 21:13:39 WARN ServiceLoader$: Using a context ClassLoader that does not contain the class to load (org.locationtech.geomesa.index.view.MergedViewConfigLoader): org.apache.spark.util.MutableURLClassLoader@1c4ee95c
25/02/04 21:13:39 WARN ServiceLoader$: Using a context ClassLoader that does not contain the class to load (org.locationtech.geomesa.index.view.RouteSelector): org.apache.spark.util.MutableURLClassLoader@1c4ee95c
Traceback (most recent call last):
  File "/home/rocky/test2.py", line 46, in <module>
    df = spark.read.format("geomesa").options(**params).load()
  File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/sql/readwriter.py", line 314, in load
    return self._df(self._jreader.load())
  File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
  File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/home/rocky/.local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o50.load.
: java.lang.RuntimeException: Could not find a SpatialRDDProvider
    at org.locationtech.geomesa.spark.GeoMesaSpark$.$anonfun$apply$2(GeoMesaSpark.scala:20)
    at scala.Option.getOrElse(Option.scala:189)
    at org.locationtech.geomesa.spark.GeoMesaSpark$.apply(GeoMesaSpark.scala:20)
    at org.locationtech.geomesa.spark.sql.GeoMesaRelation$.apply(GeoMesaRelation.scala:162)
    at org.locationtech.geomesa.spark.sql.GeoMesaDataSource.createRelation(GeoMesaDataSource.scala:45)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
    at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:172)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:829)

I have checked the documentation and run some tests with different keys, but it still returns this error. I had also worked through some earlier errors related to Hadoop and Java configuration.
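For comparison, here is a minimal sketch of the options dict based on the parameter names documented for the GeoMesa Cassandra data store. Two things stand out against the code above: the documented parameters do not appear to include a `dbtype` key, and `cassandra.contact.point` is documented as a `host:port` pair rather than a bare hostname. All values below are placeholders, and `"feature"` would have to match an existing feature type name in the catalog.

```python
# Hedged sketch of GeoMesa Cassandra data store options, based on the
# documented parameter names (all values are placeholders).
params = {
    "geotools": "true",                      # route through the GeoTools SpatialRDDProvider
    "cassandra.contact.point": "host:9042",  # documented as HOST:PORT, not just a hostname
    "cassandra.keyspace": "keyspace",
    "cassandra.catalog": "catalog",
    "cassandra.username": "user",
    "cassandra.password": "password",
    "geomesa.feature": "feature",            # must name an existing feature type
}
```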

Jars mentioned in the comments:

"org.locationtech.geomesa:geomesa-gt-spark-runtime_2.12:5.2.0," "com.datastax.cassandra:cassandra-driver-core:4.0.0," "org.locationtech.geomesa:geomesa-cassandra-datastore_2.12:5.2.0," "com.datastax.spark:spark-cassandra-connector_2.12:3.5.1," "org.locationtech.geomesa:geomesa-cassandra_2.12:5.2.0," "org.locationtech.geomesa:geomesa-cassandra-gs-plugin_2.12:5.2.0," "org.locationtech.geomesa:geomesa-cassandra-dist_2.12:5.2.0," "org.locationtech.geomesa:geomesa-spark-jts_2.12:5.2.0," "org.locationtech.geomesa:geomesa-spark-core_2.12:5.2.0," "org.locationtech.geomesa:geomesa-cassandra-tools_2.12:5.2.0"
