Creating Field-to-Field Lineage from MySQL Tables to Elasticsearch Indices using DataHub API

Original Slack Thread

<@U06TM7M40RK> how do i do field to field lineage from mysql tables to several elasticsearch indices using the api.

To create field-to-field (column-level) lineage from MySQL tables to Elasticsearch indices in DataHub using the API, you need to programmatically emit fine-grained lineage metadata. This is done by constructing and sending lineage information that maps specific columns in MySQL tables to fields in Elasticsearch indices.

Key Points:

  • DataHub does not support automatic column-level lineage extraction for MySQL out of the box, nor for Elasticsearch. You must emit this lineage manually using the Python SDK or API.
  • You must ensure that both the MySQL tables and Elasticsearch indices (and their schemas) are already ingested into DataHub before emitting lineage, otherwise the operation will fail.
  • The process involves creating URNs for the source (MySQL column) and target (Elasticsearch field), building FineGrainedLineage objects, and emitting them using the DataHub Python SDK.

Example: Emitting Field-to-Field Lineage (Python SDK)

Below is a sample script that demonstrates how to create column-level lineage from a MySQL table to an Elasticsearch index:

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)

# Helper functions to build URNs
def mysql_dataset_urn(db, table):
    return builder.make_dataset_urn("mysql", f"{db}.{table}")

def es_dataset_urn(index):
    return builder.make_dataset_urn("elasticsearch", index)

def mysql_field_urn(db, table, column):
    return builder.make_schema_field_urn(mysql_dataset_urn(db, table), column)

def es_field_urn(index, field):
    return builder.make_schema_field_urn(es_dataset_urn(index), field)

# Define the upstream dataset (MySQL table)
upstream = Upstream(
    dataset=mysql_dataset_urn("mydb", "users"),
    type=DatasetLineageType.TRANSFORMED,
)

# Example: Map 'user_id' in the MySQL 'users' table to 'user_id' in two
# Elasticsearch indices, emitting one UpstreamLineage aspect per index so
# each index only carries the column mapping that belongs to it.
emitter = DatahubRestEmitter("http://localhost:8080")

for index in ["users_index", "audit_index"]:
    fine_grained_lineage = FineGrainedLineage(
        upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
        upstreams=[mysql_field_urn("mydb", "users", "user_id")],
        downstreamType=FineGrainedLineageDownstreamType.FIELD,
        downstreams=[es_field_urn(index, "user_id")],
    )

    # Create the UpstreamLineage aspect for this index
    field_lineages = UpstreamLineage(
        upstreams=[upstream],
        fineGrainedLineages=[fine_grained_lineage],
    )

    # Emit the lineage against the downstream (Elasticsearch) dataset
    lineage_mcp = MetadataChangeProposalWrapper(
        entityUrn=es_dataset_urn(index),
        aspect=field_lineages,
    )
    emitter.emit_mcp(lineage_mcp)

What this does:

  • Maps the user_id column in the MySQL users table to the user_id field in two Elasticsearch indices: users_index and audit_index.
  • You can extend this pattern to map multiple columns/fields as needed.
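
For reference, the helper functions above expand to fully qualified URNs. Roughly (assuming the default PROD environment that make_dataset_urn uses when no env argument is passed, and dataset names matching what your ingestion actually produced), the generated URNs look like this:

mysql_dataset_urn("mydb", "users")
# -> urn:li:dataset:(urn:li:dataPlatform:mysql,mydb.users,PROD)

mysql_field_urn("mydb", "users", "user_id")
# -> urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mysql,mydb.users,PROD),user_id)

es_field_urn("users_index", "user_id")
# -> urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:elasticsearch,users_index,PROD),user_id)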

Would you like a more detailed breakdown of any part of this process, or help with a specific mapping scenario?



<@U06TM7M40RK> could you give me a code that will list all the urn for the elasticsearch indcies?

Certainly! To list all the URNs for Elasticsearch indices in DataHub, you can use the GraphQL API to search for entities of type DATASET and filter by the platform field with the value elasticsearch. Below is a Python script that uses the requests library to do this, including token authentication.

import requests
import os
import json

# Set your DataHub token and GraphQL endpoint URL
datahub_token = os.getenv("DATAHUB_TOKEN")  # Set your token as an environment variable
graphql_url = "https://demo.datahubproject.io/api/graphql"  # Replace with your DataHub GraphQL endpoint

# Define the GraphQL query to filter by platform 'elasticsearch'
query = """
query listElasticsearchIndices {
  search(input: {
    type: DATASET,
    query: "*",
    start: 0,
    count: 100,
    orFilters: [
      {
        and: [
          {
            field: "platform"
            values: ["elasticsearch"]
            condition: CONTAIN
          }
        ]
      }
    ]
  }) {
    start
    count
    total
    searchResults {
      entity {
        urn
      }
    }
  }
}
"""

headers = {
    'Authorization': f'Bearer {datahub_token}',
    'Content-Type': 'application/json'
}

response = requests.post(graphql_url, headers=headers, data=json.dumps({'query': query}))
response.raise_for_status()
data = response.json()

# Print the URNs for Elasticsearch indices
for result in data['data']['search']['searchResults']:
    print(result['entity']['urn'])

How it works:

  • The query searches for all DATASET entities where the platform is elasticsearch.
  • It returns the URNs for the first 100 Elasticsearch indices. You can adjust the count and use pagination if you have more indices.
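
If you have more indices than one page can hold, you can page through the results by advancing start until total entries have been fetched. Here is a minimal sketch that reuses the graphql_url and headers defined in the script above and passes start and count as GraphQL variables (the function name is illustrative):

import requests

paged_query = """
query listElasticsearchIndices($start: Int!, $count: Int!) {
  search(input: {
    type: DATASET, query: "*", start: $start, count: $count,
    orFilters: [{and: [{field: "platform", values: ["elasticsearch"], condition: CONTAIN}]}]
  }) {
    total
    searchResults { entity { urn } }
  }
}
"""

def list_all_elasticsearch_urns(page_size=100):
    # Collect all Elasticsearch dataset URNs by paging through search results.
    urns, start = [], 0
    while True:
        resp = requests.post(
            graphql_url,
            headers=headers,
            json={"query": paged_query, "variables": {"start": start, "count": page_size}},
        )
        resp.raise_for_status()
        result = resp.json()["data"]["search"]
        urns.extend(r["entity"]["urn"] for r in result["searchResults"])
        start += page_size
        if start >= result["total"]:
            break
    return urns

print(list_all_elasticsearch_urns())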

Would you like more details on pagination or how to retrieve additional metadata for each index?



<@U06TM7M40RK> is the gms_server the same as graphql_url

Great question! The terms gms_server and graphql_url are related but not always exactly the same, depending on your DataHub deployment and how you access the APIs.

Explanation

  • GMS Server (gms_server):
    This refers to the Generalized Metadata Service (GMS), which is the backend service in DataHub responsible for storing and serving metadata. The GMS exposes both a REST API (Rest.li) and a GraphQL API. The GMS server is typically accessible at a base URL like http://localhost:8080 (for self-hosted) or https://<your-account>.acryl.io/api/gms/ (for managed DataHub).

  • GraphQL URL (graphql_url):
    This is the specific endpoint for the GraphQL API. In DataHub, the GraphQL endpoint is usually /api/graphql relative to the GMS server. For example:

    • Self-hosted: http://localhost:8080/api/graphql
    • Managed: https://<your-account>.acryl.io/api/gms/graphql

So, the graphql_url is a specific path on the gms_server where the GraphQL API is exposed.

Example

If your gms_server is:

http://localhost:8080

then your graphql_url would be:

http://localhost:8080/api/graphql

If your managed DataHub GMS server is:

https://your-account.acryl.io/api/gms/

then your graphql_url would be:

https://your-account.acryl.io/api/gms/graphql
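
In code, you can derive one from the other rather than configuring both. A small sketch for the self-hosted case (the managed path differs, as shown above, so adjust accordingly):

gms_server = "http://localhost:8080"

# Self-hosted: GraphQL is served under /api/graphql on the GMS server.
graphql_url = f"{gms_server.rstrip('/')}/api/graphql"
print(graphql_url)  # http://localhost:8080/api/graphql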


Would you like more detail on how to configure these URLs or how authentication works with each?



<@U06TM7M40RK> Raw response:
{
"data": {
"search": {
"start": 0,
"count": 100,
"total": 0,
"searchResults":
}
},
"extensions": {}
}
No Elasticsearch datasets found.
so i guess how do i ingest the elasticsearch data into the datahub to make sure i see data.

To ingest Elasticsearch data into DataHub so that you can see Elasticsearch datasets (indices) in your DataHub instance, you need to run a metadata ingestion job using the Elasticsearch source connector. This process extracts metadata about your Elasticsearch indices and ingests it into DataHub, making them discoverable and queryable via the UI and API.

Here’s how you can do it:


1. Prepare an Ingestion Recipe

You need a YAML recipe that specifies the Elasticsearch source and the DataHub sink. Here’s a minimal example:

source:
  type: "elasticsearch"
  config:
    host: "http://localhost:9200"  # Change to your Elasticsearch host
    index_pattern:
      allow: ["*"]                 # Ingest all indices
    ingest_index_templates: true   # Optional: ingest index templates

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"  # Change to your DataHub GMS server
    # token: "<your-datahub-token>"  # Uncomment and set if authentication is enabled

2. Run the Ingestion

You can run the ingestion using the DataHub CLI:

datahub ingest -c path/to/your/recipe.yaml
  • Make sure you have installed the DataHub CLI and the Elasticsearch plugin:
    pip install 'acryl-datahub[elasticsearch]'
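
If you prefer to trigger the same ingestion from Python instead of the CLI, the SDK also exposes a programmatic pipeline runner. Below is a minimal sketch using the same (assumed) hosts as the recipe above:

from datahub.ingestion.run.pipeline import Pipeline

# The same source/sink configuration as the YAML recipe, expressed as a dict.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "elasticsearch",
            "config": {"host": "http://localhost:9200", "index_pattern": {"allow": ["*"]}},
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)
pipeline.run()
pipeline.raise_from_status()  # fail loudly if the run reported any errors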
    

3. Verify the Ingestion

  • After the job completes, you should see Elasticsearch indices as datasets in the DataHub UI and via the API.
  • If you still see no results, check the ingestion logs for errors and ensure your Elasticsearch instance is reachable from where you run the ingestion.

4. UI-based Ingestion (Optional)

You can also configure and run the ingestion from the DataHub UI:

  • Go to the Ingestion tab.
  • Click + Create new source and select Elasticsearch.
  • Fill in the connection details and run the ingestion job.

For more details, see the <https://datahubproject.io/docs/0.13.1/ui-ingestion|UI Ingestion Guide>.


Troubleshooting

  • If ingestion fails, check for:
    • Network connectivity issues between DataHub and Elasticsearch.
    • Authentication or SSL errors.
    • Misconfiguration in the recipe (e.g., wrong host, missing credentials).
  • Review the logs in the DataHub UI under the ingestion run details, or in your CLI output.
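
As a quick sanity check before digging deeper, you can confirm that both endpoints are reachable from the machine running the ingestion. A rough sketch, assuming the default local hosts used in the recipe above:

import requests
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Is Elasticsearch reachable?
print("Elasticsearch status:", requests.get("http://localhost:9200").status_code)

# Is DataHub GMS reachable (and the token valid, if one is required)?
emitter = DatahubRestEmitter("http://localhost:8080")  # pass token="..." if auth is enabled
emitter.test_connection()  # raises an exception if GMS cannot be reached or is misconfigured
print("DataHub GMS connection OK")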

Would you like a step-by-step walkthrough for your specific environment or help with a particular error message?



<@U06TM7M40RK> Raw response:
{
"data": {
"search": {
"start": 0,
"count": 100,
"total": 1685,
"searchResults":
}
},
"extensions": {}
}
No Elasticsearch datasets found.

Based on the raw response you provided, your GraphQL search query is returning a total of 1685 datasets, but the searchResults array is empty. This means that while there are datasets indexed in DataHub, none match the specific search criteria you used (likely filtering for Elasticsearch datasets).

Possible reasons and troubleshooting steps:

  1. No Elasticsearch Datasets Ingested:
    If you have not yet run an ingestion job for Elasticsearch, DataHub will not have any Elasticsearch datasets to return. You need to ingest Elasticsearch metadata as described in the <https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/elasticsearch|Elasticsearch Source Documentation>.

  2. Incorrect Search Filter:
    If your search query is filtering for datasets with platform: elasticsearch but none exist, the result will be empty. Double-check your ingestion and the filter in your query.

  3. Ingestion Succeeded, but Platform Name Mismatch:
    Sometimes the platform name in DataHub is not exactly elasticsearch (for example, it could be opensearch instead, depending on your configuration). You can try searching for all datasets without a platform filter and inspect their platform field to confirm the correct value.

  4. Pagination Issue:
    If you have a large number of datasets, but the first page is empty, there may be a pagination or sorting issue. Try setting start: 0 and count: 10 to see if you get any results.


What to do next

  • Verify Ingestion:
    Make sure you have run an Elasticsearch ingestion job as described in the <https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/elasticsearch|Elasticsearch Source Documentation>.

  • Check Platform Name:
    Run a search query for all datasets and inspect the platform field to confirm the correct value for Elasticsearch datasets.

  • Example: Search All Datasets and Print Platforms

    import requests
    import json
    
    url = "http://localhost:8080/api/graphql"
    query = """
    query {
      search(input: {type: DATASET, query: "*", start: 0, count: 10}) {
        searchResults {
          entity {
            urn
            ... on Dataset {
              platform {
                name
              }
            }
          }
        }
      }
    }
    """
    headers = {"Content-Type": "application/json"}
    response = requests.post(url, json={"query": query}, headers=headers)
    print(json.dumps(response.json(), indent=2))
    

    This will help you see what platforms are present in your DataHub instance.


Would you like more detailed help with your ingestion recipe or troubleshooting your ingestion job?



<@U06TM7M40RK> so this code work for mysql but how i can get it to work for elasticsearch. i am looking to get urn and a specific field with each elasticsearch index as well. how can you modify this code.

import requests
import json

CONFIG = {
    "datahub": {
        "gms_server": "http://datahub-gms:8080",  # 🔁 Replace with your actual GMS URL
        "token": "4gfgfsafvsdffv"
    }
}

HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {CONFIG['datahub']['token']}"
}

def search_mysql_datasets(folder_name="customer_data", target_table="books1"):
    url = f"{CONFIG['datahub']['gms_server']}/api/graphql"
    query = {
        "query": """
        query search($input: SearchInput!) {
            search(input: $input) {
                searchResults {
                    entity {
                        urn
                        ... on Dataset {
                            name
                            platform {
                                name
                            }
                        }
                    }
                }
            }
        }
        """,
        "variables": {
            "input": {
                "type": "DATASET",
                "query": folder_name,
                "orFilters": [
                    {
                        "and": [
                            {
                                "field": "platform",
                                "values": ["urn:li:dataPlatform:mysql"]
                            }
                        ]
                    }
                ],
                "start": 0,
                "count": 500
            }
        }
    }

    response = requests.post(url, headers=HEADERS, json=query)
    print("🔍 Status:", response.status_code)
    print("🔍 Raw response:", response.text)

    results = response.json().get("data", {}).get("search", {}).get("searchResults", [])

    matched_tables = []
    for r in results:
        dataset = r["entity"]
        urn = dataset["urn"]
        name = dataset["name"]

        if name.lower() == target_table.lower():
            matched_tables.append(dataset)

    return matched_tables

def get_schema_fields(dataset_urn):
    url = f"{CONFIG['datahub']['gms_server']}/api/graphql"
    query = {
        "query": """
        query getSchema($urn: String!) {
            dataset(urn: $urn) {
                schemaMetadata {
                    fields {
                        fieldPath
                    }
                }
            }
        }
        """,
        "variables": {
            "urn": dataset_urn
        }
    }

    response = requests.post(url, headers=HEADERS, json=query)
    fields = response.json().get("data", {}).get("dataset", {}).get("schemaMetadata", {}).get("fields", [])
    return [f["fieldPath"] for f in fields]


if __name__ == "__main__":
    financials = search_mysql_datasets(target_table="books1")

    for ds in financials:
        urn = ds["urn"]
        print(f"✅ Found MySQL table URN: {urn}")
        fields = get_schema_fields(urn)
        print(f"📌 Schema Fields: {fields}")

        if "code1" in fields:
            print("🎯 Field 'code1' found ✅")
        else:
            print("⚠️ Field 'code1' NOT found ❌")```

To adapt your code to work for Elasticsearch indices instead of MySQL tables, you mainly need to:

  1. Change the platform filter from MySQL to Elasticsearch.
  2. Optionally, adjust the search query if you want to filter by index name or other criteria.
  3. The rest of the logic (fetching URN and schema fields) remains the same, since DataHub represents Elasticsearch indices as datasets.

Below is the modified code to search for Elasticsearch indices and retrieve their URN and schema fields (e.g., to check for a field like code1):

import requests
import json

CONFIG = {
    "datahub": {
        "gms_server": "<http://datahub-gms:8080>",  # Replace with your actual GMS URL
        "token": "4gfgfsafvsdffv"
    }
}

HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {CONFIG['datahub']['token']}"
}

def search_elasticsearch_indices(index_name="customer_data", target_index="books1"):
    url = f"{CONFIG['datahub']['gms_server']}/api/graphql"
    query = {
        "query": """
        query search($input: SearchInput!) {
            search(input: $input) {
                searchResults {
                    entity {
                        urn
                        ... on Dataset {
                            name
                            platform {
                                name
                            }
                        }
                    }
                }
            }
        }
        """,
        "variables": {
            "input": {
                "type": "DATASET",
                "query": index_name,
                "orFilters": [
                    {
                        "and": [
                            {
                                "field": "platform",
                                "values": ["urn:li:dataPlatform:elasticsearch"]
                            }
                        ]
                    }
                ],
                "start": 0,
                "count": 500
            }
        }
    }

    response = requests.post(url, headers=HEADERS, json=query)
    print("🔍 Status:", response.status_code)
    print("🔍 Raw response:", response.text)

    results = response.json().get("data", {}).get("search", {}).get("searchResults", [])

    matched_indices = []
    for r in results:
        dataset = r["entity"]
        urn = dataset["urn"]
        name = dataset["name"]

        if name.lower() == target_index.lower():
            matched_indices.append(dataset)

    return matched_indices

def get_schema_fields(dataset_urn):
    url = f"{CONFIG['datahub']['gms_server']}/api/graphql"
    query = {
        "query": """
        query getSchema($urn: String!) {
            dataset(urn: $urn) {
                schemaMetadata {
                    fields {
                        fieldPath
                    }
                }
            }
        }
        """,
        "variables": {
            "urn": dataset_urn
        }
    }

    response = requests.post(url, headers=HEADERS, json=query)
    fields = response.json().get("data", {}).get("dataset", {}).get("schemaMetadata", {}).get("fields", [])
    return [f["fieldPath"] for f in fields]

if __name__ == "__main__":
    indices = search_elasticsearch_indices(target_index="books1")

    for ds in indices:
        urn = ds["urn"]
        print(f"✅ Found Elasticsearch index URN: {urn}")
        fields = get_schema_fields(urn)
        print(f"📌 Schema Fields: {fields}")

        if "code1" in fields:
        else:
            print("⚠️ Field 'code1' NOT found ❌")

Key changes:

  • The platform filter is now "urn:li:dataPlatform:elasticsearch".
  • The function and variable names are updated for clarity.
  • The rest of the logic is unchanged and will work for Elasticsearch indices as long as they are ingested into DataHub.

Note:
Make sure you have ingested your Elasticsearch indices into DataHub. If you have not, follow the <https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/elasticsearch|Elasticsearch ingestion guide>.

Would you like more detail on how to adjust the search criteria or how to ingest Elasticsearch indices?


Sources:

  • Slack: GraphQL query for platform-specific dataset URNs

<@U06TM7M40RK> with this code
import requests
import json

CONFIG = {
    "datahub": {
        "gms_server": "http://datahub-gms:8080",  # Replace with your actual GMS host
        "token": "your_personal_access_token"  # Replace with your actual token
    }
}

HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {CONFIG['datahub']['token']}"
}

def search_datasets(platform: str, folder_name: str, target_table: str = None):
    url = f"{CONFIG['datahub']['gms_server']}/api/graphql"
    query = {
        "query": """
        query search($input: SearchInput!) {
            search(input: $input) {
                searchResults {
                    entity {
                        urn
                        ... on Dataset {
                            name
                            platform {
                                name
                            }
                        }
                    }
                }
            }
        }
        """,
        "variables": {
            "input": {
                "type": "DATASET",
                "query": folder_name,
                "orFilters": [
                    {
                        "and": [
                            {
                                "field": "platform",
                                "values": [f"urn:li:dataPlatform:{platform}"]
                            }
                        ]
                    }
                ],
                "start": 0,
                "count": 500
            }
        }
    }

    response = requests.post(url, headers=HEADERS, json=query)
    if response.status_code != 200:
        raise Exception(f"Search failed: {response.text}")

    results = response.json().get("data", {}).get("search", {}).get("searchResults", [])

    matched = []
    for r in results:
        dataset = r["entity"]
        if not target_table or dataset["name"].lower() == target_table.lower():
            matched.append(dataset)

    return matched

def get_schema_fields(dataset_urn):
    url = f"{CONFIG['datahub']['gms_server']}/api/graphql"
    query = {
        "query": """
        query getSchema($urn: String!) {
            dataset(urn: $urn) {
                schemaMetadata {
                    fields {
                        fieldPath
                    }
                }
            }
        }
        """,
        "variables": {
            "urn": dataset_urn
        }
    }

    response = requests.post(url, headers=HEADERS, json=query)
    if response.status_code != 200:
        raise Exception(f"Schema fetch failed: {response.text}")

    fields = response.json().get("data", {}).get("dataset", {}).get("schemaMetadata", {}).get("fields", [])
    return [f["fieldPath"] for f in fields]

def emit_field_lineage(upstream_urn: str, upstream_field: str, downstream_urn: str, downstream_field: str):
    url = f"{CONFIG['datahub']['gms_server']}/entities?action=ingestProposal"

    lineage_payload = {
        "entityType": "dataset",
        "changeType": "UPSERT",
        "entityUrn": downstream_urn,
        "aspectName": "schemaFieldFlow",
        "aspect": {
            "fieldLineages": [
                {
                    "downstreamField": downstream_field,
                    "upstreams": [
                        {
                            "dataset": upstream_urn,
                            "schemaField": upstream_field,
                            "transformationDescription": "Mapped via ETL process",
                            "transformationType": "manual"
                        }
                    ]
                }
            ]
        }
    }

    response = requests.post(url, headers=HEADERS, json=lineage_payload)
    if response.status_code != 200:
        raise Exception(f"Lineage push failed: {response.text}")
    print(f"✅ Lineage emitted: {upstream_urn}.{upstream_field} → {downstream_urn}.{downstream_field}")

if __name__ == "__main__":
    # Example configuration
    mysql_folder = "c1customer_info.fake21records.financial"
    mysql_table = "financial"
    mysql_field = "code1hgs"

    es_folder = "c1_onguard_data_es"
    es_field = "code52sd"

    # Step 1: Find MySQL source table
    mysql_datasets = search_datasets("mysql", mysql_folder, mysql_table)
    if not mysql_datasets:
        print("❌ MySQL dataset not found.")
        exit()

    mysql_urn = mysql_datasets[0]["urn"]
    mysql_fields = get_schema_fields(mysql_urn)

    if mysql_field not in mysql_fields:
        print(f"❌ MySQL field '{mysql_field}' not found in schema.")
        exit()

    # Step 2: Find ES indices that contain the field
    es_datasets = search_datasets("elasticsearch", es_folder)
    for ds in es_datasets:
        es_urn = ds["urn"]
        fields = get_schema_fields(es_urn)

        matching_fields = [f for f in fields if f.endswith(f".{es_field}")]
        if not matching_fields:
            print(f"⚠️ Field '{es_field}' not found in {es_urn}")
            continue

        for field_path in matching_fields:
            emit_field_lineage(
                upstream_urn=mysql_urn,
                upstream_field=mysql_field,
                downstream_urn=es_urn,
                downstream_field=field_path
            )

i get the below error
back.succeeded(AbstractConnection.java:322)\n\tat org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:99)\n\tat org.eclipse.jetty.io.SelectableChannelEndPoint$1.run(SelectableChannelEndPoint.java:53)\n\tat org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:480)\n\tat org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:443)\n\tat org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:293)\n\tat org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.run(AdaptiveExecutionStrategy.java:201)\n\tat org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:311)\n\tat org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:979)\n\tat org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.doRunJob(QueuedThreadPool.java:1209)\n\tat org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1164)\n\tat java.base/java.lang.Thread.run(Thread.java:840)\nCaused by: com.linkedin.restli.server.RoutingException: POST operation named ingestProposal not supported on resource 'com.linkedin.metadata.resources.entity.EntityResource' URI: '/entities?action=ingestProposal'\n\tat com.linkedin.restli.internal.server.RestLiRouter.findMethodDescriptor(RestLiRouter.java:263)\n\tat com.linkedin.restli.internal.server.RestLiRouter.processResourceTree(RestLiRouter.java:219)\n\tat com.linkedin.restli.internal.server.RestLiRouter.process(RestLiRouter.java:142)\n\tat com.linkedin.restli.server.BaseRestLiServer.getRoutingResult(BaseRestLiServer.java:183)\n\tat com.linkedin.restli.server.RestRestLiServer.handleResourceRequest(RestRestLiServer.java:224)\n\t... 64 more\n","message":"POST operation named ingestProposal not supported on resource 'com.linkedin.metadata.resources.entity.EntityResource' URI: '/entities?action=ingestProposal'","status":400}

The error you are seeing:

"POST operation named ingestProposal not supported on resource 'com.linkedin.metadata.resources.entity.EntityResource' URI: '/entities?action=ingestProposal'"

means that the endpoint /entities?action=ingestProposal does not support the ingestProposal action. This is a common confusion in DataHub API usage.


Why This Happens

  • The correct endpoint for emitting metadata change proposals (MCPs) is /aspects?action=ingestProposal, not /entities?action=ingestProposal.
  • The /entities endpoint does not have the ingestProposal action, which is why you get a 400 error and the message about the operation not being supported.

This is confirmed in the <https://github.com/datahub-project/datahub/blob/master/docs/api/restli/restli-overview.md|DataHub API documentation> and in multiple community discussions.


How to Fix

Change this line:

url = f"{CONFIG['datahub']['gms_server']}/entities?action=ingestProposal"

To:

url = f"{CONFIG['datahub']['gms_server']}/aspects?action=ingestProposal"

Additional Notes

  • The payload structure for field-level lineage is not standardized as schemaFieldFlow in the DataHub public API. The officially supported way to emit field-level (fine-grained) lineage is to use the <https://datahubproject.io/docs/0.14.0/api/tutorials/lineage#add-column-level-lineage|UpstreamLineage aspect with FineGrainedLineage objects>, typically via the Python SDK, not by hand-crafting REST payloads.
  • If you want to emit field-level lineage via REST, you must use the correct aspect and structure. The schemaFieldFlow aspect is not documented as a public API for this use case.
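
For completeness, if you do stay on raw REST, the GMS endpoint expects the change proposal wrapped in a proposal object, with the aspect serialized as a JSON string under aspect.value. Below is a rough sketch of emitting an upstreamLineage aspect this way, reusing the CONFIG and HEADERS from your script; the aspect field names must match the UpstreamLineage schema exactly, which is why the SDK example that follows is the safer route:

import json
import requests

url = f"{CONFIG['datahub']['gms_server']}/aspects?action=ingestProposal"

# upstreamLineage aspect for the downstream Elasticsearch index, including one
# fine-grained (column-level) mapping from the MySQL field to the ES field.
upstream_lineage = {
    "upstreams": [{
        "dataset": "urn:li:dataset:(urn:li:dataPlatform:mysql,c1customer_info.fake21records.financial,PROD)",
        "type": "TRANSFORMED",
    }],
    "fineGrainedLineages": [{
        "upstreamType": "FIELD_SET",
        "upstreams": ["urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mysql,c1customer_info.fake21records.financial,PROD),code1hgs)"],
        "downstreamType": "FIELD",
        "downstreams": ["urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:elasticsearch,c1_onguard_data_es,PROD),code52sd)"],
    }],
}

payload = {
    "proposal": {
        "entityType": "dataset",
        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:elasticsearch,c1_onguard_data_es,PROD)",
        "changeType": "UPSERT",
        "aspectName": "upstreamLineage",
        "aspect": {"contentType": "application/json", "value": json.dumps(upstream_lineage)},
    }
}

response = requests.post(url, headers={**HEADERS, "X-RestLi-Protocol-Version": "2.0.0"}, json=payload)
response.raise_for_status()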

Example: Emitting Fine-Grained Lineage via Python SDK

The recommended way is to use the DataHub Python SDK, which handles the correct aspect and endpoint for you. Here is a <https://datahubproject.io/docs/0.14.0/api/tutorials/lineage#add-column-level-lineage|documented example>:

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)

def dataset_urn(platform, name):
    return builder.make_dataset_urn(platform, name)

def field_urn(platform, name, field):
    return builder.make_schema_field_urn(dataset_urn(platform, name), field)

# Example: MySQL to Elasticsearch field-level lineage
upstream_field_urn = field_urn("mysql", "c1customer_info.fake21records.financial", "code1hgs")
downstream_field_urn = field_urn("elasticsearch", "c1_onguard_data_es", "code52sd")

fine_grained_lineage = FineGrainedLineage(
    upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
    upstreams=[upstream_field_urn],
    downstreamType=FineGrainedLineageDownstreamType.FIELD,
    downstreams=[downstream_field_urn],
)

upstream = Upstream(
    dataset=dataset_urn("mysql", "c1customer_info.fake21records.financial"),
    type=DatasetLineageType.TRANSFORMED,
)

field_lineages = UpstreamLineage(
    upstreams=[upstream],
    fineGrainedLineages=[fine_grained_lineage],
)

lineage_mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn("elasticsearch", "c1_onguard_data_es"),
    aspect=field_lineages,
)

emitter = DatahubRestEmitter(CONFIG['datahub']['gms_server'], token=CONFIG['datahub']['token'])  # CONFIG as defined in your script above
emitter.emit_mcp(lineage_mcp)

Summary

  • Use /aspects?action=ingestProposal for REST API calls.
  • Prefer the Python SDK for emitting field-level lineage, as it handles the correct aspect and structure.
  • The schemaFieldFlow aspect is not a documented public API for this use case.

Would you like a step-by-step example using the Python SDK for your specific MySQL and Elasticsearch fields?

