How to work with RabbitMQ Data in Apache Spark using SQL
Apache Spark is a fast and general engine for large-scale data processing. When paired with the CData JDBC Driver for RabbitMQ, Spark can work with live RabbitMQ data. This article describes how to connect to and query RabbitMQ data from a Spark shell.
The CData JDBC Driver offers unmatched performance for interacting with live RabbitMQ data due to optimized data processing built into the driver. When you issue complex SQL queries to RabbitMQ, the driver pushes supported SQL operations, like filters and aggregations, directly to RabbitMQ and utilizes the embedded SQL engine to process unsupported operations (often SQL functions and JOIN operations) client-side. With built-in dynamic metadata querying, you can work with and analyze RabbitMQ data using native data types.
Install the CData JDBC Driver for RabbitMQ
Download the CData JDBC Driver for RabbitMQ installer, unzip the package, and run the JAR file to install the driver.
Start a Spark Shell and Connect to RabbitMQ Data
- Open a terminal and start the Spark shell with the CData JDBC Driver for RabbitMQ JAR file as the jars parameter:
$ spark-shell --jars /CData/CData JDBC Driver for RabbitMQ/lib/cdata.jdbc.api.jar
- With the shell running, you can connect to RabbitMQ with a JDBC URL and use the SQL Context load() function to read a table.
About RabbitMQ Management HTTP API
RabbitMQ is an open-source message broker that supports multiple messaging protocols. The RabbitMQ Management HTTP API provides HTTP-based access to management and monitoring data for a RabbitMQ server. The API exposes information about virtual hosts, exchanges, queues, bindings, connections, channels, consumers, users, permissions, policies, and cluster-wide statistics.
The Management plugin must be enabled on the RabbitMQ server for the HTTP API to be available. By default, the management interface listens on port 15672.
Using Basic Authentication
RabbitMQ Management HTTP API uses HTTP Basic authentication. You must supply the username and password of a RabbitMQ management user.
To enable access to the management API:
- Ensure the RabbitMQ Management plugin is enabled on your server (rabbitmq-plugins enable rabbitmq_management).
- Use an existing management user or create one with the appropriate management tag (management, policymaker, monitoring, or administrator).
- Note the full base URL of your RabbitMQ Management HTTP API (e.g., http://localhost:15672).
After configuring your RabbitMQ server, set the following connection properties to connect:
- AuthScheme: Set this to Basic.
- URL: Set this to the base URL of your RabbitMQ Management HTTP API (e.g., http://localhost:15672).
- User: Set this to your RabbitMQ management username (e.g., guest).
- Password: Set this to your RabbitMQ management password.
Example connection string:
Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
Available Tables
The RabbitMQ profile provides access to the following tables:
- Overview - Cluster-wide statistics and information about the RabbitMQ node
- Nodes - Information about individual nodes in the RabbitMQ cluster
- NodeMemory - Detailed memory usage breakdown for a specific cluster node
- Connections - List of all open AMQP connections to the broker
- Channels - List of all open AMQP channels across all connections
- Consumers - List of all consumers registered across all queues
- Exchanges - List of exchanges declared across all virtual hosts
- Queues - List of queues declared across all virtual hosts
- Bindings - List of all bindings between exchanges and queues
- VirtualHosts - List of virtual hosts configured on the broker
- VhostPermissions - User permissions within a specific virtual host
- Users - List of all RabbitMQ users
- Permissions - Permission records for all users across all virtual hosts
- TopicPermissions - Topic-level permission records for all users
- Policies - List of policies applied to queues and exchanges in virtual hosts
- OperatorPolicies - List of operator policies applied to queues in virtual hosts
- Parameters - List of component parameters (e.g., federation, shovel) per virtual host
- GlobalParameters - List of global parameters that apply across all virtual hosts
- VhostLimits - Resource limits configured for specific virtual hosts
- UserLimits - Resource limits configured for specific users
- FeatureFlags - List of feature flags and their enabled/disabled state on the node
- DeprecatedFeatures - List of deprecated features and their usage state
- AuthAttempts - Authentication attempt statistics for the node
- ClusterName - The name of the RabbitMQ cluster
- WhoAmI - Information about the currently authenticated management user
- ExchangeBindingsSource - Bindings for which a specific exchange is the source
- ExchangeBindingsDestination - Bindings for which a specific exchange is the destination
- QueueBindings - Bindings for a specific queue within a virtual host
Built-in Connection String Designer
For assistance in constructing the JDBC URL, use the connection string designer built into the RabbitMQ JDBC Driver. Either double-click the JAR file or execute the jar file from the command-line.
java -jar cdata.jdbc.api.jar
Fill in the connection properties and copy the connection string to the clipboard.
Configure the connection to RabbitMQ, using the connection string generated above.
scala> val api_df = spark.sqlContext.read.format("jdbc").option("url", "jdbc:api:Profile=C:\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;").option("dbtable","AuthAttempts").option("driver","cdata.jdbc.api.APIDriver").load() - Once you connect and the data is loaded you will see the table schema displayed.
Register the RabbitMQ data as a temporary table:
scala> api_df.registerTable("authattempts")-
Perform custom SQL queries against the Data using commands like the one below:
scala> api_df.sqlContext.sql("SELECT , FROM AuthAttempts WHERE NodeName = rabbit@hostname").collect.foreach(println)You will see the results displayed in the console, similar to the following:
Using the CData JDBC Driver for RabbitMQ in Apache Spark, you are able to perform fast and complex analytics on RabbitMQ data, combining the power and utility of Spark with your data. Download a free, 30 day trial of any of the hundreds of CData JDBC Drivers and get started today.