Ingesting Data from Private Postgres Cluster into DataHub using APIs and SDKs

Original Slack Thread

hi team
I was exploring how to ingest data from Postgres into DataHub. Our clusters are private and, due to security reasons, cannot be accessed directly. We would need to deploy a service that sits inside the same VPC that the cluster belongs to. Hence I would need to use either the DataHub SDK or the API to be able to ingest data from the database and then store it in the DataHub store. I came across this set of <https://github.com/datahub-project/datahub/blob/5886e63e5c1e409a6a69a92524230c46a7f0f3f4/metadata-service/README.md#ingesting-entities|APIs>, but I am not very clear on a few points below:

  1. How will I be able to get values for entity-related fields? I checked the bootstrap file but am still not very clear.
  2. I do not see where we specify credentials for the source (Looker, MySQL, etc.) we wish to ingest data from. How do we specify the values that we normally provide as part of a recipe when we do ingestion via the UI?
  3. I assume that after calling the APIs, the metadata would be created directly in the DataHub store? Or do I need to make another call to push the data to DataHub?
  4. Could I get an example of how the DataHub APIs are used to ingest data, the same thing we do via the UI, since in our case we cannot use the UI for databases in production?
    Thanks!

<@U06TM7M40RK> answer the above question please

To address your questions about ingesting data from a private PostgreSQL cluster into DataHub using the DataHub SDK or API, let’s break down each of your points:

  1. Entity Related Fields:

    • When using the DataHub API or SDK, you need to construct the metadata entities manually. This involves defining the entity’s URN, aspects, and other metadata fields. The <The Metadata Model | DataHub Metadata Model> provides a comprehensive guide to the structure of entities and aspects. You will need to gather the necessary metadata from your PostgreSQL database, such as table names, column types, and any other relevant information, and map these to the DataHub metadata model; a sketch of doing this for schema metadata is shown after the code example below.
  2. Specifying Credentials:

    • When using the DataHub API or SDK, you will need to handle the connection to your data source (e.g., PostgreSQL) separately. This means you will need to write code that connects to your PostgreSQL database with the appropriate credentials and extracts the metadata. The DataHub ingestion framework normally handles this via the ingestion recipe, but when using the API or SDK directly, you must manage this connection yourself (see the sketch after the code example below).
  3. Metadata Creation:

    • Once you have constructed the metadata entities using the API or SDK, you can directly ingest them into DataHub. The API calls you make will create or update the metadata in the DataHub store. There is no need for an additional call to push the data; the API handles this as part of the ingestion process.
  4. Example of Using DataHub API:

Here is a basic example of how you might use the DataHub Python SDK to ingest a dataset:

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetPropertiesClass,
    DatasetSnapshotClass,
    MetadataChangeEventClass,
)

# Create a DataHub emitter pointed at your GMS endpoint
emitter = DatahubRestEmitter("http://localhost:8080")

# Define the dataset URN (the Postgres connector names datasets <database>.<schema>.<table>)
dataset_urn = make_dataset_urn(platform="postgres", name="my_database.public.my_table", env="PROD")

# Create a dataset snapshot with at least one aspect
dataset_snapshot = DatasetSnapshotClass(
    urn=dataset_urn,
    aspects=[
        DatasetPropertiesClass(description="A table from our private Postgres cluster")
    ],  # add further aspects such as schema metadata, ownership, etc.
)

# Create a metadata change event
mce = MetadataChangeEventClass(proposedSnapshot=dataset_snapshot)

# Emit the metadata change event to DataHub
emitter.emit(mce)

This example demonstrates how to create a dataset entity and emit it to DataHub using the Python SDK. You will need to customize the aspects and metadata according to your specific use case.
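
To make points 1 and 2 above concrete, here is a minimal, illustrative sketch (not the official Postgres connector) that connects to PostgreSQL with psycopg2 using credentials you manage yourself, reads column information from information_schema, and emits it to DataHub as a SchemaMetadata aspect. The host, database, table name, and GMS address http://localhost:8080 are placeholders you would replace with your own values.

import time

import psycopg2  # you manage the Postgres connection and credentials yourself

from datahub.emitter.mce_builder import make_data_platform_urn, make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    AuditStampClass,
    OtherSchemaClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    SchemaMetadataClass,
    StringTypeClass,
)

# Placeholder connection details for the private Postgres cluster
conn = psycopg2.connect(
    host="your-postgres-host",
    port=5432,
    dbname="your_database",
    user="your_postgres_username",
    password="your_postgres_password",
)

# Read column names and native types for one table from information_schema
with conn.cursor() as cur:
    cur.execute(
        """
        SELECT column_name, data_type
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
        """,
        ("public", "my_table"),
    )
    columns = cur.fetchall()

audit_stamp = AuditStampClass(time=int(time.time() * 1000), actor="urn:li:corpuser:ingestion")

# Map each column to a SchemaField; every field is typed as a string here for brevity,
# while a real service would translate Postgres types to the matching DataHub types
fields = [
    SchemaFieldClass(
        fieldPath=column_name,
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType=data_type,
        lastModified=audit_stamp,
    )
    for column_name, data_type in columns
]

schema_metadata = SchemaMetadataClass(
    schemaName="my_table",
    platform=make_data_platform_urn("postgres"),
    version=0,
    hash="",
    platformSchema=OtherSchemaClass(rawSchema=""),
    lastModified=audit_stamp,
    fields=fields,
)

# Emit the schema as an aspect of the corresponding dataset entity
emitter = DatahubRestEmitter("http://localhost:8080")
emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn=make_dataset_urn(platform="postgres", name="your_database.public.my_table", env="PROD"),
        aspect=schema_metadata,
    )
)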

Would you like more detailed steps or further assistance on any specific part of this process?


<@U07FV3UHE4D> Can someone please help answer the questions above

<@U06TM7M40RK> provide an example of using openapi endpoints to create ingestion source and ingest data from it

To create an ingestion source and ingest data into DataHub via the OpenAPI endpoints, you can follow the steps below to define and execute an ingestion process programmatically.

Step-by-Step Guide

  1. Create an Ingestion Source:

    • Use the OpenAPI endpoint to define a new ingestion source. This involves specifying the source type, configuration, and any necessary credentials.
  2. Ingest Data:

    • Once the ingestion source is created, you can trigger the ingestion process to pull metadata from the specified source into DataHub.

Example Using OpenAPI

Below is a conceptual example of how you might use OpenAPI to create an ingestion source and ingest data. Note that this example assumes you have access to the OpenAPI endpoints and necessary authentication tokens.

Step 1: Create an Ingestion Source

curl --location --request POST 'http://localhost:8080/openapi/entities/v1/' \
--header 'Content-Type: application/json' \
--header 'Accept: application/json' \
--header 'Authorization: Bearer <your-personal-access-token>' \
--data-raw '[
  {
    "aspect": {
      "__type": "DataHubIngestionSourceInfo",
      "name": "Postgres Ingestion",
      "type": "postgres",
      "config": {
        "recipe": "{\"source\":{\"type\":\"postgres\",\"config\":{\"host_port\":\"your-postgres-host:5432\",\"database\":\"your_database\",\"username\":\"your_postgres_username\",\"password\":\"your_postgres_password\",\"include_tables\":true,\"include_views\":true}}}",
        "executorId": "default"
      }
    },
    "entityType": "dataHubIngestionSource",
    "entityUrn": "urn:li:dataHubIngestionSource:postgres-ingestion"
  }
]'

Step 2: Trigger Ingestion

After creating the ingestion source, you can trigger the ingestion process. In DataHub this is done with the createIngestionExecutionRequest GraphQL mutation; see the example later in this thread.

Explanation

  • URL: The OpenAPI entities endpoint used to upsert the ingestion source entity.
  • Headers: Include Content-Type, Accept, and Authorization headers. Replace <your-personal-access-token> with your actual token.
  • Data: The JSON payload defines the ingestion source entity, including its name, type, and configuration; the ingestion recipe itself is passed as an escaped JSON string.


Would you like more detailed steps or further assistance on any specific part of this process?


<@U06TM7M40RK> how do we trigger ingestion using OpenAPI? Please give an example.

To trigger an ingestion in DataHub programmatically, you can use the createIngestionExecutionRequest mutation via the GraphQL API (rather than OpenAPI). This allows you to start an ingestion run for a specified ingestion source.

Example Using GraphQL with cURL

Here is an example of how you can trigger an ingestion using the GraphQL API with a cURL command:

curl -X POST https://your-datahub-instance/api/graphql \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -d '{
    "query": "mutation { createIngestionExecutionRequest(input: { ingestionSourceUrn: \"urn:li:dataHubIngestionSource:your-ingestion-source-urn\" }) }"
  }'

Explanation

  • URL: Replace https://your-datahub-instance/api/graphql with the URL of your DataHub GraphQL endpoint.
  • Headers: Include Content-Type and Authorization headers. Replace YOUR_ACCESS_TOKEN with your actual authentication token.
  • Data: The JSON payload contains the GraphQL mutation createIngestionExecutionRequest, which requires the ingestionSourceUrn of the ingestion source you want to execute.

Example Using Python

If you prefer using Python, you can use the requests library to send the GraphQL mutation:

import requests

url = "https://your-datahub-instance/api/graphql"
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer YOUR_ACCESS_TOKEN"
}
query = """
mutation {
  createIngestionExecutionRequest(input: { ingestionSourceUrn: "urn:li:dataHubIngestionSource:your-ingestion-source-urn" })
}
"""

response = requests.post(url, headers=headers, json={"query": query})
print(response.json())

Additional Information

  • GraphQL API Documentation: For more details on the GraphQL API and available mutations, refer to the <Mutations | DataHub API Documentation>.
  • Authentication: Ensure you have the necessary authentication tokens to access the GraphQL API.

Would you like more detailed steps or further assistance on any specific part of this process?


<@U08LD7GGJ7Q>, I can try to help answer your questions above.
Based on your message, I haven’t fully understood what you want to achieve. Do you want to run DataHub in the same private network as the Postgres cluster or not? Which of the two is supposed to run privately?

<@U049WUH4155> The Postgres cluster is already private, hence we would need to create an internal service. The DataHub platform will be a separate deployment. This service will sit between our Postgres clusters and the DataHub platform, and we would need to call DataHub APIs inside this service to pull metadata from the database. So I am trying to check if there is any challenge in creating this service. We cannot use the SDK since we prioritise Golang, so we would need to rely on the APIs. Let me know if this clarifies your doubts.

Thanks for clarifying!

Okay, in this case it’s not possible to use the built-in Postgres connector of DataHub, because it is designed to pull the metadata from the source system. So what you would need to do is mimic the behavior of the <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/sql/postgres.py|Postgres connector>.

The link to the documentation you shared is a good starting point. E.g. this is a good example for generating a Dataset via the API: https://github.com/datahub-project/datahub/blob/5886e63e5c1e409a6a69a92524230c46a7f0f3f4/metadata-service/README.md#create-a-dataset

Similar to that, you would need to send all kinds of information to DataHub, e.g., information about lineage, etc.
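
For instance, a minimal sketch of emitting table-level lineage with the Python SDK could look like the following (the table names and the GMS address are placeholders; a Go service would have to send the equivalent aspect over the HTTP API itself):

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetLineageTypeClass,
    UpstreamClass,
    UpstreamLineageClass,
)

emitter = DatahubRestEmitter("http://localhost:8080")  # placeholder GMS address

# Declare that derived_table is built from source_table (placeholder names)
upstream = UpstreamClass(
    dataset=make_dataset_urn(platform="postgres", name="your_database.public.source_table", env="PROD"),
    type=DatasetLineageTypeClass.TRANSFORMED,
)

emitter.emit(
    MetadataChangeProposalWrapper(
        entityUrn=make_dataset_urn(platform="postgres", name="your_database.public.derived_table", env="PROD"),
        aspect=UpstreamLineageClass(upstreams=[upstream]),
    )
)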

Ok <@U049WUH4155>, thanks for the quick response. I was under the impression that we should be able to connect to Postgres, and if we can connect then we should be able to add an ingestion source via the API, and then the API <https://github.com/datahub-project/datahub/blob/5886e63e5c1e409a6a69a92524230c46a7f0f3f4/metadata-service/README.md#create-a-dataset|above> should be useful to actually start the ingestion process? Is the API above not for ingesting? Sorry, but I am a bit confused, because from what you say, you are suggesting to use the API to manually add metadata for the entity, like Postgres tables in this case? Is my understanding right?

Sorry if I am missing something obvious, but I am having difficulty understanding how to use the APIs for my use case. Is it possible to share an example, perhaps of a similar use case wherein people are not using the UI or CLI directly to ingest metadata?

and if we can connect then we should be able to add an ingestion source via the API, and then the API above should be useful to actually start the ingestion process?
The API endpoint takes metadata assets to be created in DataHub, in this example a dataset. The endpoint expects a so-called metadata change proposal (<https://datahubproject.io/docs/what/mxe/#metadata-change-proposal-mcp|MCP>).
You send the API endpoint the definition of a dataset, and then DataHub creates the dataset as an asset.
You can create these MCPs using the available SDKs. But if you use the SDKs, there is no need to talk to the API directly; the SDK covers that for you.

Is the API above not for ingesting?
The API is for ingesting, but you can’t configure an ingestion there. It just takes the metadata assets as MCPs to be created in DataHub; a sketch of what such an MCP looks like on the wire is shown below.
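
As a minimal sketch, the request below posts a datasetProperties aspect to the rest.li ingestProposal endpoint described in the metadata-service README linked above. The GMS address, token, and URN are placeholders, and a Go service could issue the same HTTP request with its standard HTTP client.

import json

import requests

GMS_SERVER = "http://localhost:8080"    # placeholder GMS address
TOKEN = "<your-personal-access-token>"  # placeholder token

# The aspect payload is serialized to a JSON string inside the proposal
dataset_properties = {"description": "A table from our private Postgres cluster"}

proposal = {
    "proposal": {
        "entityType": "dataset",
        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,your_database.public.my_table,PROD)",
        "changeType": "UPSERT",
        "aspectName": "datasetProperties",
        "aspect": {
            "value": json.dumps(dataset_properties),
            "contentType": "application/json",
        },
    }
}

response = requests.post(
    f"{GMS_SERVER}/aspects?action=ingestProposal",
    headers={
        "Content-Type": "application/json",
        "Authorization": f"Bearer {TOKEN}",
        "X-RestLi-Protocol-Version": "2.0.0",
    },
    data=json.dumps(proposal),
)
response.raise_for_status()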

What is typically done in a scenario like yours is to use the Python or Java SDK of DataHub: configure the ingestion via recipe files, use the SDK to read the recipes, collect the metadata via the DataHub sources, and emit the metadata to DataHub.
I can provide you with an example here.
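
For reference, such a recipe-driven run with the Python SDK can be sketched roughly as follows (the connection details and the GMS address are placeholders):

from datahub.ingestion.run.pipeline import Pipeline

# The dict below mirrors what you would normally put into a YAML recipe file
pipeline = Pipeline.create(
    {
        "source": {
            "type": "postgres",
            "config": {
                "host_port": "your-postgres-host:5432",
                "database": "your_database",
                "username": "your_postgres_username",
                "password": "your_postgres_password",
                "include_tables": True,
                "include_views": True,
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)

# Run the ingestion and raise if the run reported errors
pipeline.run()
pipeline.raise_from_status()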

For some reason I thought you wanted to write your service in Go and therefore would not be able to use the SDKs; that’s why I came up with the manual creation, sorry for the confusion :slightly_smiling_face:

Yes, you understood correctly. We use Go as the primary choice, and therefore I was checking out the API documentation to understand which APIs I would need to call, what inputs I would need to provide, and whether there would be a challenge in using the APIs in a service we create to pull data from Postgres and then upload it to DataHub.

I can provide you with an example here.
Can you share the example nonetheless? If the difficulty of using the API directly is quite high, there might be a valid case for using the SDK, even in Java or Python.

Perhaps <@U01GZEETMEZ> or <@UV5UEC3LN> can help here?

It sounds like you’re basically trying to recreate the source and sink behavior in Go? I would recommend against this; just use the standard <https://datahubproject.io/docs/generated/ingestion/sources/postgres|Postgres connector>. You can run it as a service and either sink directly to your <https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub|DataHub instance>, or, if you need a file output, you can upload to an S3 bucket that is accessible by the DataHub instance using the <https://datahubproject.io/docs/metadata-ingestion/sink_docs/metadata-file|file sink>.

Please take a look at the <https://datahubproject.io/docs/actions/quickstart/|DataHub Actions> documentation.

https://datahubproject.io/docs/actions/concepts/
https://datahubproject.io/docs/actions/

We already ship a deployable service; there is no need to create a wrapper around the CLI.