Connecting Pipedream with Amazon Athena Data via CData Connect AI MCP Server

Somya Sharma
Somya Sharma
Technical Marketing Engineer
Leverage the CData Connect AI Remote MCP Server to enable Pipedream workflows to securely read and take actions on Amazon Athena data in real time.

Pipedream is a cloud-based workflow automation platform that allows developers to connect APIs, automate tasks, and build event-driven workflows using serverless functions. When combined with CData Connect AI Remote MCP, Pipedream can interact with Amazon Athena data in real time using natural language, without the need for data replication to a natively supported database.

CData Connect AI offers a dedicated cloud-to-cloud interface for connecting to Amazon Athena data. The CData Connect AI Remote MCP Server enables secure communication between Pipedream and Amazon Athena, making it possible to ask questions and retrieve data from Amazon Athena using Pipedream workflows, all powered by an LLM that intelligently discovers data sources and generates SQL queries on the fly.

This article covers how to build a simple natural language data query workflow in Pipedream to conversationally explore Amazon Athena data. The connectivity principles apply to any Pipedream workflow. With Connect AI, workflows and agents can be built with access to live Amazon Athena data, plus hundreds of other sources.

About Amazon Athena Data Integration

CData provides the easiest way to access and integrate live data from Amazon Athena. Customers use CData connectivity to:

  • Authenticate securely using a variety of methods, including IAM credentials, access keys, and Instance Profiles, catering to diverse security needs and simplifying the authentication process.
  • Streamline their setup and quickly resolve issue with detailed error messaging.
  • Enhance performance and minimize strain on client resources with server-side query execution.

Users frequently integrate Athena with analytics tools like Tableau, Power BI, and Excel for in-depth analytics from their preferred tools.

To learn more about unique Amazon Athena use cases with CData, check out our blog post: https://www.cdata.com/blog/amazon-athena-use-cases.


Getting Started


Prerequisites

  1. A CData Connect AI account with at least one active connection (e.g., Amazon Athena)
  2. A Pipedream account
  3. An OpenAI account with API Key
  4. CData Connect AI credentials:
    • Email (used as username for Basic Auth)
    • Personal Access Token (PAT) generated from the CData Connect AI Settings page

Step 1: Configure Amazon Athena connectivity for Pipedream

Connectivity to Amazon Athena from Pipedream is made possible through CData Connect AI Remote MCP. To interact with Amazon Athena from Pipedream, we start by creating and configuring a Amazon Athena connection in CData Connect AI.

  1. Log into Connect AI, click Sources, and then click Add Connection
  2. Select "Amazon Athena" from the Add Connection panel
  3. Enter the necessary authentication properties to connect to Amazon Athena.

    Authenticating to Amazon Athena

    To authorize Amazon Athena requests, provide the credentials for an administrator account or for an IAM user with custom permissions: Set AccessKey to the access key Id. Set SecretKey to the secret access key.

    Note: Though you can connect as the AWS account administrator, it is recommended to use IAM user credentials to access AWS services.

    Obtaining the Access Key

    To obtain the credentials for an IAM user, follow the steps below:

    1. Sign into the IAM console.
    2. In the navigation pane, select Users.
    3. To create or manage the access keys for a user, select the user and then select the Security Credentials tab.

    To obtain the credentials for your AWS root account, follow the steps below:

    1. Sign into the AWS Management console with the credentials for your root account.
    2. Select your account name or number and select My Security Credentials in the menu that is displayed.
    3. Click Continue to Security Credentials and expand the Access Keys section to manage or create root account access keys.

    Authenticating from an EC2 Instance

    If you are using the CData Data Provider for Amazon Athena 2018 from an EC2 Instance and have an IAM Role assigned to the instance, you can use the IAM Role to authenticate. To do so, set UseEC2Roles to true and leave AccessKey and SecretKey empty. The CData Data Provider for Amazon Athena 2018 will automatically obtain your IAM Role credentials and authenticate with them.

    Authenticating as an AWS Role

    In many situations it may be preferable to use an IAM role for authentication instead of the direct security credentials of an AWS root user. An AWS role may be used instead by specifying the RoleARN. This will cause the CData Data Provider for Amazon Athena 2018 to attempt to retrieve credentials for the specified role. If you are connecting to AWS (instead of already being connected such as on an EC2 instance), you must additionally specify the AccessKey and SecretKey of an IAM user to assume the role for. Roles may not be used when specifying the AccessKey and SecretKey of an AWS root user.

    Authenticating with MFA

    For users and roles that require Multi-factor Authentication, specify the MFASerialNumber and MFAToken connection properties. This will cause the CData Data Provider for Amazon Athena 2018 to submit the MFA credentials in a request to retrieve temporary authentication credentials. Note that the duration of the temporary credentials may be controlled via the TemporaryTokenDuration (default 3600 seconds).

    Connecting to Amazon Athena

    In addition to the AccessKey and SecretKey properties, specify Database, S3StagingDirectory and Region. Set Region to the region where your Amazon Athena data is hosted. Set S3StagingDirectory to a folder in S3 where you would like to store the results of queries.

    If Database is not set in the connection, the data provider connects to the default database set in Amazon Athena.

  4. Click Save & Test
  5. Navigate to the Permissions tab in the Add Amazon Athena Connection page and update the User-based permissions.

Add a Personal Access Token

A Personal Access Token (PAT) is used to authenticate the connection to Connect AI from Pipedream. It is best practice to create a separate PAT for each service to maintain granularity of access.

  1. Click on the Gear icon () at the top right of the Connect AI app to open the settings page.
  2. On the Settings page, go to the Access Tokens section and click Create PAT.
  3. Give the PAT a name and click Create.
  4. The personal access token is only visible at creation, so be sure to copy it and store it securely for future use.

With the connection configured and a PAT generated, we are ready to connect to Amazon Athena from Pipedream.


Step 2: Set up environment variables in Pipedream

Store credentials securely as environment variables in Pipedream.

  1. In Pipedream, go to Settings, then Environment Variables
  2. Click on New Variable and add the following variables:
  3. Variable name Value
    CDATA_EMAIL CData Connect AI login email
    CDATA_PAT CData Personal Access Token
    OPENAI_API_KEY OpenAI API Key

Step 3: Create the Pipedream workflow

3.1 Configure the HTTP trigger

  1. Create a new workflow in Pipedream
  2. Select HTTP / Webhook as the trigger
  3. Set HTTP Response to "Return a custom response from your workflow"

3.2 Add the LLM step

Add a Node.js code step named LLM. This step extracts the natural language query from the incoming request.

Replace the default code in the step with the following:

import OpenAI from "openai";

export default defineComponent({
  async run({ steps }) {
    if (steps.trigger.event.method === "OPTIONS") {
      return { userQuery: null, isOptions: true };
    }

    const body = steps.trigger.event.body;
    const parsed = typeof body === "string" ? JSON.parse(body) : body;
    const userQuery = parsed?.query;

    console.log("USER QUERY:", userQuery);
    if (!userQuery) throw new Error("No query found in request body");

    return { userQuery };
  }
});

3.3 Add the MCP step

Add a Node.js code step named MCP. This step implements the full agentic MCP flow, it automatically discovers all available connections, selects the most relevant one based on the question, discovers the schema and tables dynamically, generates a SQL query using the LLM, and executes it against Amazon Athena data.

The step uses the following CData Connect AI MCP tools in sequence:

MCP Tool Purpose
getCatalogs Retrieves all available connections from CData Connect AI
getSchemas Retrieves the database schemas for the selected connection
getTables Retrieves all tables and views for the selected schema
queryData Executes the generated SQL query and returns results

Replace the default code in the step with the following:


import fetch from "node-fetch";
import OpenAI from "openai";

export default defineComponent({
  async run({ steps }) {
    const email = process.env.CDATA_EMAIL;
    const pat = process.env.CDATA_PAT;
    const credentials = email + ":" + pat;
    const auth = Buffer.from(credentials).toString("base64");
    const llmOutput = steps.LLM;
    const userQuery = llmOutput.return_value.userQuery; // In Pipedream replace with: steps.LLM.$return_value.userQuery
    const MCP_URL = "https://mcp.cloud.cdata.com/mcp";
    const NL = String.fromCharCode(10);
    const CRNL = String.fromCharCode(13) + String.fromCharCode(10);

    const headers = {
      "Content-Type": "application/json",
      "Accept": "application/json, text/event-stream",
      "Authorization": "Basic " + auth
    };

    function parseSSE(raw) {
      try {
        const lines = raw.split(NL);
        for (let i = 0; i < lines.length; i++) {
          const line = lines.at(i);
          const trimmed = line.trim();
          if (trimmed.indexOf("data:") === 0) {
            const jsonStr = trimmed.slice(5).trim();
            if (jsonStr) {
              const json = JSON.parse(jsonStr);
              const result = json && json.result;
              const content = result && result.content;
              if (Array.isArray(content)) {
                return {
                  parsed: content.map(function(c) { return c.text || ""; }).join(NL),
                  isError: (result && result.isError) || false,
                  full: json
                };
              }
            }
          }
        }
      } catch (e) {
        console.log("SSE parse error:", e.message);
      }
      return { parsed: raw, isError: false, full: null };
    }

    function parseCSV(text) {
      let clean = text || "";
      if (clean.charAt(0) === '"' && clean.charAt(clean.length - 1) === '"') {
        clean = clean.slice(1, -1);
      }
      const ESC_CRNL = String.fromCharCode(92) + "r" + String.fromCharCode(92) + "n";
      const ESC_QUOTE = String.fromCharCode(92) + '"';
      const ESC_SLASH = String.fromCharCode(92) + String.fromCharCode(92);
      const SINGLE_SLASH = String.fromCharCode(92);
      clean = clean.split(ESC_CRNL).join(CRNL).split(ESC_QUOTE).join('"').split(ESC_SLASH).join(SINGLE_SLASH);
      const lines = clean.split(CRNL).filter(function(l) { return l.trim(); });
      return lines.slice(1).map(function(l) { return l.split(",").at(0).trim(); }).filter(Boolean);
    }

    async function initSession() {
      const res = await fetch(MCP_URL, {
        method: "POST",
        headers: headers,
        body: JSON.stringify({
          jsonrpc: "2.0",
          id: 1,
          method: "initialize",
          params: {
            protocolVersion: "2024-11-05",
            capabilities: {},
            clientInfo: { name: "pipedream", version: "1.0" }
          }
        })
      });
      return res.headers.get("mcp-session-id");
    }

    async function callMCP(id, method, args, sessionId) {
      const reqHeaders = Object.assign({}, headers);
      if (sessionId) {
        Object.assign(reqHeaders, { "mcp-session-id": sessionId });
      }
      const res = await fetch(MCP_URL, {
        method: "POST",
        headers: reqHeaders,
        body: JSON.stringify({
          jsonrpc: "2.0",
          id: id,
          method: "tools/call",
          params: { name: method, arguments: args }
        })
      });
      const raw = await res.text();
      const result = parseSSE(raw);
      result.raw = raw;
      return result;
    }

    const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });
    const completions = client.chat.completions;

    const session1 = await initSession();
    const catalogsResult = await callMCP(2, "getCatalogs", {}, session1);
    const catalogs = parseCSV(catalogsResult.parsed);

    const systemMsg1 = "You are a data routing expert. Pick the MOST relevant connection name from the list. Return ONLY the connection name. Available connections: " + catalogs.join(", ");
    const connectionResponse = await completions.create({
      model: "gpt-4o-mini",
      messages: new Array(
        { role: "system", content: systemMsg1 },
        { role: "user", content: userQuery }
      )
    });
    const connectionName = connectionResponse.choices.at(0).message.content.trim();

    const session2 = await initSession();
    const schemasResult = await callMCP(2, "getSchemas", {
      connectionName: connectionName,
      catalogName: connectionName
    }, session2);
    const schemas = parseCSV(schemasResult.parsed);
    const schemaName = schemas.at(0) || "REST";

    const session3 = await initSession();
    const tablesResult = await callMCP(2, "getTables", {
      connectionName: connectionName,
      catalogName: connectionName,
      schemaName: schemaName
    }, session3);
    const tableNames = parseCSV(tablesResult.parsed);

    const queryLower = userQuery.toLowerCase();
    const isListTablesQuery =
      queryLower.indexOf("list") !== -1 ||
      queryLower.indexOf("what tables") !== -1 ||
      queryLower.indexOf("show tables") !== -1;

    if (isListTablesQuery) {
      return {
        success: true,
        connection: connectionName,
        message: "Available tables in " + connectionName + "." + schemaName,
        tables: tableNames
      };
    }

    const tableList = tableNames.map(function(t) {
      return connectionName + "." + schemaName + "." + t;
    }).join(", ");

    const systemMsg2 = "You are a SQL expert. Generate SQL for CData. Use format: connectionName.schemaName.TableName. Available tables: " + tableList + ". Return ONLY SQL. No markdown. No brackets.";
    const sqlResponse = await completions.create({
      model: "gpt-4o-mini",
      messages: new Array(
        { role: "system", content: systemMsg2 },
        { role: "user", content: userQuery }
      )
    });
    const sql = sqlResponse.choices.at(0).message.content.trim();

    if (!sql) { return { error: "LLM returned empty SQL" }; }

    const session4 = await initSession();
    const queryResult = await callMCP(2, "queryData", {
      query: sql,
      connectionName: connectionName
    }, session4);

    if (queryResult.full) {
      const content = queryResult.full.result && queryResult.full.result.content;
      if (Array.isArray(content)) {
        try {
          const parsed = JSON.parse(content.at(0).text);
          const results = parsed.results && parsed.results.at(0);
          return {
            sql: sql,
            connection: connectionName,
            data: (results && results.rows) || new Array(),
            schema: (results && results.schema) || new Array(),
            success: true
          };
        } catch (e) {
          return { sql: sql, connection: connectionName, raw: content.at(0).text, success: true };
        }
      }
    }
    return { sql: sql, connection: connectionName, raw: queryResult.raw };
  }
});

Note: When pasting into Pipedream, replace llmOutput.return_value.userQuery with steps.LLM.$return_value.userQuery as indicated in the comment on that line.

3.4 Configure the response step

  1. Add a Return HTTP Response step and name it Response
  2. Set Response Status Code to 200
  3. Set Response Body to {{steps.mcp.$return_value}}
  4. Add the following Response Headers by clicking Response Headers then :
  5. Key Value
    Access-Control-Allow-Origin *
    Access-Control-Allow-Methods POST, OPTIONS
    Access-Control-Allow-Headers Content-Type

Step 4: Test the flow and interact with Amazon Athena data

Set a test event on the trigger:

  1. Click the trigger step in the workflow
  2. Click Generate Test Event
  3. Edit the event body and set it to:
  4. {
      "query": "list all tables"
    }
    

Run the full workflow

  1. Click Test workflow at the bottom of the trigger step
  2. Pipedream will run all steps in sequence using the test event
  3. Watch each step turn green as it completes successfully

View results in each step

After the test run completes, click each step tab and check the Exports tab to inspect outputs:

Step What to look for in exports
trigger body.query - confirms the query was received
LLM userQuery - confirms the query was extracted
MCP connection, sql, data, schema - confirms data was fetched
Response $response.body - the final JSON response

The Logs tab inside any step shows detailed outputs including the generated SQL, selected connection, and raw MCP responses.

Note: The Response step's Exports tab only shows a summary like { "success": true } with "status 200"; this confirms the workflow ran successfully but does not show the full data.

To see the complete output including data rows, SQL, and schema, click the MCP step tab and check the Exports tab. Expand $return_value to see the full response:

The workflow automatically:

  1. Discovers all CData connections
  2. Selects the most relevant connection for the question
  3. Discovers the schema and tables dynamically
  4. Generates and executes the appropriate SQL query
  5. Returns the results

How it works

The integration uses the following CData Connect AI MCP tools in sequence:

MCP tool Purpose
getCatalogs Retrieves all available connections from CData Connect AI
getSchemas Retrieves the database schemas for a specific connection
getTables Retrieves all tables and views for a specific schema
queryData Executes SQL queries and returns results

The OpenAI LLM acts as the intelligent layer between the natural language question and the CData MCP tools, selecting the right connection, discovering the data structure, and generating accurate SQL queries automatically.

Build real-time, data-aware workflows with Pipedream and CData

Pipedream and CData Connect AI together enable intelligent, AI-driven workflows where natural language queries are automatically translated into live data operations across enterprise systems, without ETL pipelines, data sync jobs, or custom integration logic. This streamlined approach delivers stronger governance, lower operational overhead, and faster, more grounded responses from AI-powered workflows.

Start a free trial today to see how CData Connect AI can empower Pipedream with live, secure access to hundreds of external systems.

Ready to get started?

Learn more about CData Connect AI or sign up for free trial access:

Free Trial