Stream RabbitMQ Data into Apache Kafka Topics

Dibyendu Datta
Lead Technology Evangelist
Access and stream RabbitMQ data in Apache Kafka using the CData JDBC Driver and the Kafka Connect JDBC connector.

Apache Kafka is an open-source stream processing platform used primarily for building real-time data pipelines and event-driven applications. When paired with the CData API Driver for JDBC, Kafka can work with live RabbitMQ data. This article describes how to connect to RabbitMQ, stream its data into Apache Kafka topics, and start Confluent Control Center to secure, manage, and monitor the RabbitMQ data received through the Kafka infrastructure in the Confluent Platform.

With built-in optimized data processing, the CData JDBC Driver offers unmatched performance for interacting with live RabbitMQ data. When you issue complex SQL queries to RabbitMQ, the driver pushes supported SQL operations, like filters and aggregations, directly to RabbitMQ and utilizes the embedded SQL engine to process unsupported operations client-side (often SQL functions and JOIN operations). Its built-in dynamic metadata querying allows you to work with and analyze RabbitMQ data using native data types.

Prerequisites

Before connecting the CData JDBC Driver to stream RabbitMQ data into Apache Kafka topics, install and configure the following on the client Linux-based system.

  1. Confluent Platform for Apache Kafka
  2. Confluent Hub CLI Installation
  3. Self-Managed Kafka JDBC Source Connector for Confluent Platform

Define a New JDBC Connection to RabbitMQ data

  1. Download the CData API Driver for JDBC to a Linux-based system
  2. Follow the given instructions to create a new directory and extract all the driver contents into it:
    1. Create a new directory named API
      		mkdir API
      		
    2. Move the downloaded driver file (.zip) into this new directory
      		mv APIJDBCDriver.zip API/
      		
    3. Change into the new directory and unzip the CData APIJDBCDriver contents there
      		cd API
      		unzip APIJDBCDriver.zip
      		
  3. Open the API directory and navigate to the lib folder
    ls
    cd lib/
    
  4. Copy the contents of the lib folder of the CData API Driver for JDBC into the lib folder of Kafka Connect JDBC, then list the Kafka Connect JDBC lib folder to confirm that the cdata.jdbc.api.jar file was copied successfully
    cp -r "/path/to/CData API Driver for JDBC/lib/"* /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
    cd /usr/share/confluent-hub-components/confluentinc-kafka-connect-jdbc/lib/
    ls
    
  5. Install the license for the CData API Driver for JDBC by running the command below, then enter your name and email address when prompted
    	java -jar cdata.jdbc.api.jar -l
    	
  6. Enter the product key or "TRIAL" (if your license has expired, contact the CData Support team)
  7. Start the Confluent local services using the command:
    	confluent local services start
    	

    This starts all the Confluent services: ZooKeeper, Kafka, Schema Registry, Kafka REST, Kafka Connect, ksqlDB, and Control Center. You are now ready to use the CData JDBC driver with Kafka Connect to stream RabbitMQ data into Kafka topics, which you can then inspect in ksqlDB.

  8. Create the Kafka topics by registering a JDBC source connector with an HTTP POST request to the Kafka Connect REST API:
     curl --location 'server_address:8083/connectors' \
    	--header 'Content-Type: application/json' \
    	--data '{
    		"name": "jdbc_source_cdata_api_01",
    		"config": {
    			"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    			"connection.url": "jdbc:api:Profile=C:\\profiles\\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;",
    			"topic.prefix": "api-01-",
    			"mode": "bulk"
    		}
    	}'
    

    Let us understand the fields used in the HTTP POST body (shown above):

    • connector.class: Specifies the Java class of the Kafka Connect connector to be used.
    • connection.url: The JDBC connection URL to connect with RabbitMQ data.

      Built-in Connection String Designer

      For assistance in constructing the JDBC URL, use the connection string designer built into the CData API Driver for JDBC. Either double-click the JAR file or execute the jar file from the command-line.

      		java -jar cdata.jdbc.api.jar
      		

      Fill in the connection properties and copy the connection string to the clipboard.

      About RabbitMQ Management HTTP API

      RabbitMQ is an open-source message broker that supports multiple messaging protocols. The RabbitMQ Management HTTP API provides HTTP-based access to management and monitoring data for a RabbitMQ server. The API exposes information about virtual hosts, exchanges, queues, bindings, connections, channels, consumers, users, permissions, policies, and cluster-wide statistics.

      The Management plugin must be enabled on the RabbitMQ server for the HTTP API to be available. By default, the management interface listens on port 15672.

      Using Basic Authentication

      RabbitMQ Management HTTP API uses HTTP Basic authentication. You must supply the username and password of a RabbitMQ management user.

      To enable access to the management API:

      1. Ensure the RabbitMQ Management plugin is enabled on your server (rabbitmq-plugins enable rabbitmq_management).
      2. Use an existing management user or create one with the appropriate management tag (management, policymaker, monitoring, or administrator).
      3. Note the full base URL of your RabbitMQ Management HTTP API (e.g., http://localhost:15672).

      After configuring your RabbitMQ server, set the following connection properties to connect:

      • AuthScheme: Set this to Basic.
      • URL: Set this to the base URL of your RabbitMQ Management HTTP API (e.g., http://localhost:15672).
      • User: Set this to your RabbitMQ management username (e.g., guest).
      • Password: Set this to your RabbitMQ management password.

      Example connection string:

      Profile=C:\profiles\RabbitMQ.apip;AuthScheme=Basic;URL=http://localhost:15672;User=guest;Password=guest;
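As a rough illustration of how the properties above combine into the value passed as connection.url, the following shell sketch assembles a JDBC URL from its parts. The function name build_jdbc_url and its argument order are hypothetical; the jdbc:api: prefix and the property names are taken from the examples in this article.

```shell
# Hypothetical helper: join the connection properties into a JDBC URL.
# Arguments: $1 = profile path, $2 = management API base URL,
#            $3 = user, $4 = password.
build_jdbc_url() {
  # %s inserts each argument literally, so backslashes in the
  # profile path are passed through unchanged.
  printf 'jdbc:api:Profile=%s;AuthScheme=Basic;URL=%s;User=%s;Password=%s;\n' \
    "$1" "$2" "$3" "$4"
}
```

For example, build_jdbc_url 'C:\profiles\RabbitMQ.apip' 'http://localhost:15672' guest guest prints the URL with single backslashes. When that value is embedded in a JSON request body, each backslash needs to be doubled.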
      

      Available Tables

      The RabbitMQ profile provides access to the following tables:

      • Overview - Cluster-wide statistics and information about the RabbitMQ node
      • Nodes - Information about individual nodes in the RabbitMQ cluster
      • NodeMemory - Detailed memory usage breakdown for a specific cluster node
      • Connections - List of all open AMQP connections to the broker
      • Channels - List of all open AMQP channels across all connections
      • Consumers - List of all consumers registered across all queues
      • Exchanges - List of exchanges declared across all virtual hosts
      • Queues - List of queues declared across all virtual hosts
      • Bindings - List of all bindings between exchanges and queues
      • VirtualHosts - List of virtual hosts configured on the broker
      • VhostPermissions - User permissions within a specific virtual host
      • Users - List of all RabbitMQ users
      • Permissions - Permission records for all users across all virtual hosts
      • TopicPermissions - Topic-level permission records for all users
      • Policies - List of policies applied to queues and exchanges in virtual hosts
      • OperatorPolicies - List of operator policies applied to queues in virtual hosts
      • Parameters - List of component parameters (e.g., federation, shovel) per virtual host
      • GlobalParameters - List of global parameters that apply across all virtual hosts
      • VhostLimits - Resource limits configured for specific virtual hosts
      • UserLimits - Resource limits configured for specific users
      • FeatureFlags - List of feature flags and their enabled/disabled state on the node
      • DeprecatedFeatures - List of deprecated features and their usage state
      • AuthAttempts - Authentication attempt statistics for the node
      • ClusterName - The name of the RabbitMQ cluster
      • WhoAmI - Information about the currently authenticated management user
      • ExchangeBindingsSource - Bindings for which a specific exchange is the source
      • ExchangeBindingsDestination - Bindings for which a specific exchange is the destination
      • QueueBindings - Bindings for a specific queue within a virtual host
    • topic.prefix: A prefix prepended to each table name to form the name of the Kafka topic the connector writes to. Here it is set to "api-01-".
    • mode: Specifies how the connector loads data. Here it is set to "bulk", so the connector loads each table in full every time it polls.

    This request adds all the tables/contents from RabbitMQ as Kafka Topics.

    Note: server_address in the request above is the network IP address of the Linux machine where Kafka Connect is running.
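Once the connector has been registered, the Kafka Connect REST API can report whether it actually started. The sketch below is one possible way to check, assuming the same server_address placeholder and connector name as the request above. The helper names are hypothetical, and the state is extracted with sed rather than a JSON parser only to avoid extra dependencies.

```shell
# Hypothetical helper: read the JSON returned by
# GET /connectors/<name>/status on stdin and print the connector-level state.
# Assumes "state" is the first key inside the "connector" object.
connector_state() {
  tr -d '\n' |
    sed -n 's/.*"connector"[[:space:]]*:[[:space:]]*{[[:space:]]*"state"[[:space:]]*:[[:space:]]*"\([A-Z_]*\)".*/\1/p'
}

# Hypothetical wrapper: query the Connect worker and print the state.
# $1 = host:port of the Connect worker, $2 = connector name.
check_connector() {
  curl --silent "http://$1/connectors/$2/status" | connector_state
}
```

A call such as check_connector server_address:8083 jdbc_source_cdata_api_01 should print a state such as RUNNING or FAILED. If nothing is printed, the connector was probably not registered or the worker is unreachable.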

  9. Run ksqlDB and list the topics. Use the commands:
    ksql
    list topics;
    
  10. To view the data inside a topic, run the following SQL statement, replacing topic with the name of the topic:
    PRINT topic FROM BEGINNING;
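Because the connector names each topic by prepending topic.prefix to a table name, the PRINT statements for several tables can be generated rather than typed by hand. The sketch below assumes the topic names are exactly the prefix followed by the table names listed earlier; the function name print_statements is hypothetical. The topic names are wrapped in single quotes because they contain hyphens.

```shell
# Hypothetical helper: print one ksqlDB PRINT statement per table.
# $1 = topic prefix; remaining arguments = table names.
# Assumes each topic is named <prefix><table name>.
print_statements() {
  prefix="$1"
  shift
  for table in "$@"; do
    # Quote the topic name so the hyphens are accepted by ksqlDB.
    printf "PRINT '%s%s' FROM BEGINNING;\n" "$prefix" "$table"
  done
}
```

For instance, print_statements api-01- Queues Exchanges emits two statements that can be pasted into the ksql prompt. Confirm the actual topic names with list topics; first, since the table names reported by the driver may differ in case.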
    

Connecting with the Confluent Control Center

To access the Confluent Control Center user interface, make sure the Confluent local services are running (see "confluent local services start" in the section above), then open http://<server address>:9021/clusters/ in your browser.
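Control Center can take a little while to become available after the services start. The following sketch polls the URL above until it responds. The helper names, the default of 30 attempts, and the 2-second pause are all assumptions chosen for illustration; only port 9021 and the /clusters/ path come from this article.

```shell
# Hypothetical helper: build the Control Center clusters URL for a host.
control_center_url() {
  printf 'http://%s:9021/clusters/\n' "$1"
}

# Hypothetical helper: poll until the URL answers or the attempts run out.
# $1 = host, $2 = maximum attempts (defaults to 30).
wait_for_control_center() {
  url=$(control_center_url "$1")
  attempts="${2:-30}"
  while [ "$attempts" -gt 0 ]; do
    # --fail makes curl return non-zero on HTTP error responses.
    if curl --silent --fail --output /dev/null "$url"; then
      echo "Control Center is up at $url"
      return 0
    fi
    attempts=$((attempts - 1))
    sleep 2
  done
  echo "Control Center did not respond at $url" >&2
  return 1
}
```

Running wait_for_control_center with the server address as its argument returns success as soon as the page is reachable, which makes it convenient to chain before opening the browser.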


Get Started Today

Download a free, 30-day trial of the CData API Driver for JDBC and start streaming RabbitMQ data into Apache Kafka. Reach out to our Support Team if you have any questions.
