How to Work with RabbitMQ Data in AWS Glue Jobs Using JDBC

Jerod Johnson
Director, Technology Evangelism
Connect to RabbitMQ from AWS Glue jobs using the CData JDBC Driver hosted in Amazon S3.

AWS Glue is an ETL service from Amazon that allows you to easily prepare and load your data for storage and analytics. Using the PySpark module along with AWS Glue, you can create jobs that work with data over JDBC connectivity, loading the data directly into AWS data stores. In this article, we walk through uploading the CData JDBC Driver for RabbitMQ into an Amazon S3 bucket and creating and running an AWS Glue job to extract RabbitMQ data and store it in S3 as a CSV file.

Upload the CData JDBC Driver for RabbitMQ to an Amazon S3 Bucket

In order to work with the CData JDBC Driver for RabbitMQ in AWS Glue, you will need to store it (and any relevant license files) in an Amazon S3 bucket.

  1. Open the Amazon S3 Console.
  2. Select an existing bucket (or create a new one).
  3. Click Upload.
  4. Select the JAR file (cdata.jdbc.api.jar) found in the lib directory in the installation location for the driver.
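
If you prefer to script the upload instead of using the console, the steps above can be sketched in Python. This is a minimal sketch, assuming the boto3 library and configured AWS credentials; the helper name upload_driver and the bucket name mybucket are illustrative and not part of the driver or of AWS.

```python
import os


def upload_driver(s3_client, jar_path, bucket, key=None):
    """Upload the driver JAR and return its s3:// URI.

    s3_client is expected to behave like boto3.client("s3").
    The key defaults to the JAR's file name (an assumption of this sketch).
    """
    key = key or os.path.basename(jar_path)
    s3_client.upload_file(jar_path, bucket, key)
    return "s3://{}/{}".format(bucket, key)


# Usage (requires boto3 and AWS credentials; the bucket name is a placeholder):
#   import boto3
#   uri = upload_driver(boto3.client("s3"), "lib/cdata.jdbc.api.jar", "mybucket")
```

The returned URI is the value you later supply as the Dependent jars path when configuring the job.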

Configure the AWS Glue Job

  1. Navigate to ETL -> Jobs from the AWS Glue Console.
  2. Click Add Job to create a new Glue job.
  3. Fill in the Job properties:
    • Name: Fill in a name for the job, for example: APIGlueJob.
    • IAM Role: Select (or create) an IAM role that has the AWSGlueServiceRole and AmazonS3FullAccess permissions policies. The latter policy is necessary to access both the JDBC Driver and the output destination in Amazon S3.
    • Type: Select "Spark".
    • Glue Version: Select "Spark 2.4, Python 3 (Glue Version 1.0)".
    • This job runs: Select "A new script to be authored by you".
      Populate the script properties:
      • Script file name: A name for the script file, for example: GlueAPIJDBC
      • S3 path where the script is stored: Fill in or browse to an S3 bucket.
      • Temporary directory: Fill in or browse to an S3 bucket.
    • Expand Security configuration, script libraries and job parameters (optional). For Dependent jars path, fill in or browse to the S3 bucket where you uploaded the JAR file. Be sure to include the name of the JAR file itself in the path, for example: s3://mybucket/cdata.jdbc.api.jar
  4. Click Next. Here you have the option to add connections to other AWS endpoints. If your destination is Redshift, MySQL, etc., you can create and use connections to those data sources.
  5. Click "Save job and edit script" to create the job.
  6. In the editor that opens, write a Python script for the job. You can use the sample script (see below) as an example.
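
The same job properties can also be expressed as a job definition for the Glue CreateJob API. The following is a sketch under stated assumptions, not the console's own output: the role name, script path, and temporary directory are placeholders, and glue_job_definition is a hypothetical helper. The --extra-jars argument corresponds to the Dependent jars path and --TempDir to the Temporary directory.

```python
def glue_job_definition(name, role, script_location, jar_uri, temp_dir):
    """Build keyword arguments for a Glue CreateJob call."""
    return {
        "Name": name,
        "Role": role,
        "GlueVersion": "1.0",  # Spark 2.4, Python 3
        "Command": {
            "Name": "glueetl",  # Spark ETL job type
            "ScriptLocation": script_location,
            "PythonVersion": "3",
        },
        "DefaultArguments": {
            "--extra-jars": jar_uri,  # Dependent jars path
            "--TempDir": temp_dir,    # Temporary directory
        },
    }


job = glue_job_definition(
    name="APIGlueJob",
    role="MyGlueRole",                                       # placeholder role name
    script_location="s3://mybucket/scripts/GlueAPIJDBC.py",  # placeholder path
    jar_uri="s3://mybucket/cdata.jdbc.api.jar",
    temp_dir="s3://mybucket/temp/",                          # placeholder path
)

# With boto3 and AWS credentials available, the job could then be created with:
#   import boto3
#   boto3.client("glue").create_job(**job)
```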

Sample Glue Script

To connect to RabbitMQ using the CData JDBC driver, you will need to create a JDBC URL, populating the necessary connection properties. Additionally, you will need to set the RTK property in the JDBC URL (unless you are using a Beta driver). You can view the licensing file included in the installation for information on how to set this property.

About RabbitMQ Management HTTP API

RabbitMQ is an open-source message broker that supports multiple messaging protocols. The RabbitMQ Management HTTP API provides HTTP-based access to management and monitoring data for a RabbitMQ server. The API exposes information about virtual hosts, exchanges, queues, bindings, connections, channels, consumers, users, permissions, policies, and cluster-wide statistics.

The Management plugin must be enabled on the RabbitMQ server for the HTTP API to be available. By default, the management interface listens on port 15672.

Using Basic Authentication

RabbitMQ Management HTTP API uses HTTP Basic authentication. You must supply the username and password of a RabbitMQ management user.

To enable access to the management API:

  1. Ensure the RabbitMQ Management plugin is enabled on your server (rabbitmq-plugins enable rabbitmq_management).
  2. Use an existing management user or create one with the appropriate management tag (management, policymaker, monitoring, or administrator).
  3. Note the full base URL of your RabbitMQ Management HTTP API (e.g., http://localhost:15672).
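
Before configuring the driver, it can help to confirm that the management API accepts your credentials. The sketch below uses only the Python standard library and assumes the /api/whoami endpoint of the Management HTTP API is reachable at your base URL; the function names are illustrative.

```python
import base64
import json
import urllib.request


def build_whoami_request(base_url, user, password):
    """Build a GET request for /api/whoami with an HTTP Basic auth header."""
    credentials = "{}:{}".format(user, password).encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return urllib.request.Request(
        base_url.rstrip("/") + "/api/whoami",
        headers={"Authorization": "Basic " + token},
    )


def whoami(base_url, user, password, timeout=10):
    """Send the request and return the parsed JSON response body."""
    request = build_whoami_request(base_url, user, password)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


# Usage against a running broker (raises urllib.error.HTTPError on bad credentials):
#   print(whoami("http://localhost:15672", "guest", "guest"))
```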

After configuring your RabbitMQ server, set the following connection properties to connect:

  • AuthScheme: Set this to Basic.
  • URL: Set this to the base URL of your RabbitMQ Management HTTP API (e.g., http://localhost:15672).
  • User: Set this to your RabbitMQ management username (e.g., guest).
  • Password: Set this to your RabbitMQ management password.

Example connection string:

Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
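
To avoid hand-editing a long string, the connection properties can be assembled programmatically. This is a minimal sketch: build_jdbc_url is a hypothetical helper, the jdbc:api: prefix is taken from the sample script in this article, and the sketch assumes that no property value contains a semicolon.

```python
def build_jdbc_url(properties, prefix="jdbc:api:"):
    """Join connection properties into a 'key=value;' JDBC URL.

    Assumes no value contains ';' (no escaping is attempted here).
    """
    return prefix + "".join(
        "{}={};".format(key, value) for key, value in properties.items()
    )


url = build_jdbc_url({
    "Profile": r"C:\profiles\RabbitMQ.apip",  # raw string keeps backslashes literal
    "AuthScheme": "Basic",
    "URL": "http://localhost:15672",
    "User": "guest",
    "Password": "guest",
})
print(url)
```

If your license requires the RTK property, add it to the dictionary in the same way.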

Available Tables

The RabbitMQ profile provides access to the following tables:

  • Overview - Cluster-wide statistics and information about the RabbitMQ node
  • Nodes - Information about individual nodes in the RabbitMQ cluster
  • NodeMemory - Detailed memory usage breakdown for a specific cluster node
  • Connections - List of all open AMQP connections to the broker
  • Channels - List of all open AMQP channels across all connections
  • Consumers - List of all consumers registered across all queues
  • Exchanges - List of exchanges declared across all virtual hosts
  • Queues - List of queues declared across all virtual hosts
  • Bindings - List of all bindings between exchanges and queues
  • VirtualHosts - List of virtual hosts configured on the broker
  • VhostPermissions - User permissions within a specific virtual host
  • Users - List of all RabbitMQ users
  • Permissions - Permission records for all users across all virtual hosts
  • TopicPermissions - Topic-level permission records for all users
  • Policies - List of policies applied to queues and exchanges in virtual hosts
  • OperatorPolicies - List of operator policies applied to queues in virtual hosts
  • Parameters - List of component parameters (e.g., federation, shovel) per virtual host
  • GlobalParameters - List of global parameters that apply across all virtual hosts
  • VhostLimits - Resource limits configured for specific virtual hosts
  • UserLimits - Resource limits configured for specific users
  • FeatureFlags - List of feature flags and their enabled/disabled state on the node
  • DeprecatedFeatures - List of deprecated features and their usage state
  • AuthAttempts - Authentication attempt statistics for the node
  • ClusterName - The name of the RabbitMQ cluster
  • WhoAmI - Information about the currently authenticated management user
  • ExchangeBindingsSource - Bindings for which a specific exchange is the source
  • ExchangeBindingsDestination - Bindings for which a specific exchange is the destination
  • QueueBindings - Bindings for a specific queue within a virtual host
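
Any of these tables can be named in the dbtable option of a Spark JDBC read. As a sketch, assuming the same driver class used in the sample script later in this article, the hypothetical helpers below build the reader options for a table and load several tables in one pass.

```python
DRIVER_CLASS = "cdata.jdbc.api.APIDriver"  # driver class used in the sample script


def jdbc_read_options(jdbc_url, table):
    """Return the Spark JDBC reader options for one table."""
    return {"url": jdbc_url, "dbtable": table, "driver": DRIVER_CLASS}


def read_tables(spark_session, jdbc_url, tables):
    """Return a dict mapping each table name to a Spark DataFrame."""
    return {
        table: spark_session.read.format("jdbc")
        .options(**jdbc_read_options(jdbc_url, table))
        .load()
        for table in tables
    }


# Usage inside a Glue job, where sparkSession and jdbc_url are already defined:
#   frames = read_tables(sparkSession, jdbc_url, ["Queues", "Exchanges", "Bindings"])
```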

Built-in Connection String Designer

For assistance in constructing the JDBC URL, use the connection string designer built into the RabbitMQ JDBC Driver. Either double-click the JAR file or execute the JAR file from the command line.

java -jar cdata.jdbc.api.jar

Fill in the connection properties and copy the connection string to the clipboard.

To host the JDBC driver in Amazon S3, you will need a license (full or trial) and a Runtime Key (RTK). For more information on obtaining this license (or a trial), contact our sales team.

Below is a sample script that uses the CData JDBC driver with the PySpark and AWSGlue modules to extract RabbitMQ data and write it to an S3 bucket in CSV format. Make any necessary changes to the script to suit your needs and save the job.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sparkContext = SparkContext()
glueContext = GlueContext(sparkContext)
sparkSession = glueContext.spark_session

##Use the CData JDBC driver to read RabbitMQ data from the AuthAttempts table into a DataFrame
##Note the populated JDBC URL and driver class name
##Backslashes in the Profile path are escaped so Python treats them as literal characters
jdbc_url = "jdbc:api:RTK=5246...;Profile=C:\\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;"
source_df = (
    sparkSession.read.format("jdbc")
    .option("url", jdbc_url)
    .option("dbtable", "AuthAttempts")
    .option("driver", "cdata.jdbc.api.APIDriver")
    .load()
)

glueJob = Job(glueContext)
glueJob.init(args['JOB_NAME'], args)

##Convert the DataFrame to an AWS Glue DynamicFrame
dynamic_dframe = DynamicFrame.fromDF(source_df, glueContext, "dynamic_df")

##Write the DynamicFrame as a file in CSV format to a folder in an S3 bucket. 
##It is possible to write to any Amazon data store (SQL Server, Redshift, etc) by using any previously defined connections.
retDatasink4 = glueContext.write_dynamic_frame.from_options(frame = dynamic_dframe, connection_type = "s3", connection_options = {"path": "s3://mybucket/outfiles"}, format = "csv", transformation_ctx = "datasink4")

glueJob.commit()

Run the Glue Job

With the script written, we are ready to run the Glue job. Click Run Job and wait for the extract/load to complete. You can view the status of the job from the Jobs page in the AWS Glue Console. Once the job has succeeded, you will have a CSV file in your S3 bucket with data from the RabbitMQ AuthAttempts table.
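
The run can also be started and monitored from Python rather than the console. The following is a sketch, assuming a boto3 Glue client and the job name used earlier; run_job_and_wait and the polling interval are illustrative choices, not part of AWS Glue.

```python
import time

# Job run states after which the run will not change again
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR", "EXPIRED"}


def run_job_and_wait(glue_client, job_name, poll_seconds=30):
    """Start a Glue job run and poll until it reaches a terminal state.

    glue_client is expected to behave like boto3.client("glue").
    Returns the final JobRunState string.
    """
    run_id = glue_client.start_job_run(JobName=job_name)["JobRunId"]
    while True:
        run = glue_client.get_job_run(JobName=job_name, RunId=run_id)
        state = run["JobRun"]["JobRunState"]
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)


# Usage (requires boto3 and AWS credentials):
#   import boto3
#   print(run_job_and_wait(boto3.client("glue"), "APIGlueJob"))
```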

Using the CData JDBC Driver for RabbitMQ in AWS Glue, you can easily create ETL jobs for RabbitMQ data, whether writing the data to an S3 bucket or loading it into any other AWS data store.
