<@U06TM7M40RK> how do i do field to field lineage from mysql tables to several elasticsearch indices using the api.
To create field-to-field (column-level) lineage from MySQL tables to Elasticsearch indices in DataHub using the API, you need to programmatically emit fine-grained lineage metadata. This is done by constructing and sending lineage information that maps specific columns in MySQL tables to fields in Elasticsearch indices.
Key Points:
- DataHub does not support automatic column-level lineage extraction for MySQL out of the box, nor for Elasticsearch. You must emit this lineage manually using the Python SDK or API.
- You must ensure that both the MySQL tables and Elasticsearch indices (and their schemas) are already ingested into DataHub before emitting lineage, otherwise the operation will fail.
- The process involves creating URNs for the source (MySQL column) and target (Elasticsearch field), building `FineGrainedLineage` objects, and emitting them using the DataHub Python SDK.
Example: Emitting Field-to-Field Lineage (Python SDK)
Below is a sample script that demonstrates how to create column-level lineage from a MySQL table to an Elasticsearch index:
```
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)

# Helper functions to build URNs
def mysql_dataset_urn(db, table):
    return builder.make_dataset_urn("mysql", f"{db}.{table}")

def es_dataset_urn(index):
    return builder.make_dataset_urn("elasticsearch", index)

def mysql_field_urn(db, table, column):
    return builder.make_schema_field_urn(mysql_dataset_urn(db, table), column)

def es_field_urn(index, field):
    return builder.make_schema_field_urn(es_dataset_urn(index), field)

# Upstream dataset (MySQL table) shared by every index
upstream = Upstream(
    dataset=mysql_dataset_urn("mydb", "users"),
    type=DatasetLineageType.TRANSFORMED,
)

emitter = DatahubRestEmitter("http://localhost:8080")

# Example: map 'user_id' in MySQL to 'user_id' in two Elasticsearch indices.
# Each index gets its own UpstreamLineage aspect containing only the
# field mappings that land in that index.
for index in ["users_index", "audit_index"]:
    fine_grained_lineages = [
        FineGrainedLineage(
            upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
            upstreams=[mysql_field_urn("mydb", "users", "user_id")],
            downstreamType=FineGrainedLineageDownstreamType.FIELD,
            downstreams=[es_field_urn(index, "user_id")],
        ),
    ]
    field_lineages = UpstreamLineage(
        upstreams=[upstream],
        fineGrainedLineages=fine_grained_lineages,
    )
    lineage_mcp = MetadataChangeProposalWrapper(
        entityUrn=es_dataset_urn(index),
        aspect=field_lineages,
    )
    emitter.emit_mcp(lineage_mcp)
```
What this does:
- Maps the `user_id` column in the MySQL `users` table to the `user_id` field in two Elasticsearch indices: `users_index` and `audit_index`.
- Emits a separate `upstreamLineage` aspect to each index, containing only the field mappings that belong to that index.
- You can extend this pattern to map multiple columns/fields as needed.
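If you want to confirm the lineage landed before checking the UI, you can optionally read the aspect back with the SDK's graph client. A minimal sketch, assuming the same local GMS endpoint as above (add a token if your instance requires auth):
```
import datahub.emitter.mce_builder as builder
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import UpstreamLineageClass

# Connect to the same GMS the emitter used above
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Fetch the upstreamLineage aspect from one of the target indices
index_urn = builder.make_dataset_urn("elasticsearch", "users_index")
lineage = graph.get_aspect(entity_urn=index_urn, aspect_type=UpstreamLineageClass)

if lineage and lineage.fineGrainedLineages:
    for fgl in lineage.fineGrainedLineages:
        print(fgl.upstreams, "->", fgl.downstreams)
else:
    print("No fine-grained lineage found on users_index yet.")
```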
Additional Notes
- You can view the resulting lineage in the DataHub UI under the Lineage tab for the relevant Elasticsearch index or MySQL table. Make sure to enable “Show Columns” to see column-level lineage.
- For more details on the structure and usage of fine-grained lineage, see the <https://datahubproject.io/docs/0.14.0/api/tutorials/lineage#add-column-level-lineage|DataHub Lineage Documentation> and <https://datahubproject.io/docs/0.13.1/generated/metamodel/entities/dataset#fine-grained-lineage|Fine-Grained Lineage Guide>.
- If you need to patch or update lineage, refer to the <https://datahubproject.io/docs/0.14.0/advanced/patch#examples|Advanced Patch Guide>.
Would you like a more detailed breakdown of any part of this process, or help with a specific mapping scenario?
Sources:
- <https://datahubproject.io/docs/0.14.0/api/tutorials/lineage#add-column-level-lineage|Add Column-level Lineage (DataHub Docs)>
- <Slack DataHub field-level lineage for MySQL and Elasticsearch>
- <https://datahubproject.io/docs/0.14.0/advanced/patch#examples|Advanced Patch Guide>
<@U06TM7M40RK> could you give me a code that will list all the urn for the elasticsearch indcies?
Certainly! To list all the URNs for Elasticsearch indices in DataHub, you can use the GraphQL API to search for entities of type `DATASET` and filter by the `platform` field with the value `elasticsearch`. Below is a Python script that uses the `requests` library to do this, including token authentication.
```
import requests
import os
import json

# Set your DataHub token and GraphQL endpoint URL
datahub_token = os.getenv("DATAHUB_TOKEN")  # Set your token as an environment variable
graphql_url = "https://demo.datahubproject.io/api/graphql"

# Define the GraphQL query to filter by platform 'elasticsearch'
query = """
query listElasticsearchIndices {
  search(input: {
    type: DATASET,
    query: "*",
    start: 0,
    count: 100,
    orFilters: [
      {
        and: [
          {
            field: "platform"
            values: ["elasticsearch"]
            condition: CONTAIN
          }
        ]
      }
    ]
  }) {
    start
    count
    total
    searchResults {
      entity {
        urn
      }
    }
  }
}
"""

headers = {
    'Authorization': f'Bearer {datahub_token}',
    'Content-Type': 'application/json'
}

response = requests.post(graphql_url, headers=headers, data=json.dumps({'query': query}))
response.raise_for_status()
data = response.json()

# Print the URNs for Elasticsearch indices
for result in data['data']['search']['searchResults']:
    print(result['entity']['urn'])
```
How it works:
- The query searches for all `DATASET` entities where the platform is `elasticsearch`.
- It returns the URNs for the first 100 Elasticsearch indices. You can adjust the `count` and use pagination if you have more indices; a paging sketch follows below.
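For paging, a simple loop over `start`/`count` is usually enough. A minimal sketch that reuses the same filter shape as the query above (the endpoint and token handling are placeholders for your environment):
```
import os
import requests

GRAPHQL_URL = "https://demo.datahubproject.io/api/graphql"  # adjust for your deployment
HEADERS = {
    "Authorization": f"Bearer {os.getenv('DATAHUB_TOKEN')}",
    "Content-Type": "application/json",
}

PAGED_QUERY = """
query listElasticsearchIndices($input: SearchInput!) {
  search(input: $input) {
    total
    searchResults { entity { urn } }
  }
}
"""

def list_all_elasticsearch_urns(page_size=100):
    """Page through the search results until every Elasticsearch dataset URN is collected."""
    urns, start = [], 0
    while True:
        variables = {
            "input": {
                "type": "DATASET",
                "query": "*",
                "start": start,
                "count": page_size,
                "orFilters": [
                    {"and": [{"field": "platform", "values": ["elasticsearch"], "condition": "CONTAIN"}]}
                ],
            }
        }
        resp = requests.post(GRAPHQL_URL, headers=HEADERS, json={"query": PAGED_QUERY, "variables": variables})
        resp.raise_for_status()
        page = resp.json()["data"]["search"]
        urns.extend(r["entity"]["urn"] for r in page["searchResults"])
        start += page_size
        if start >= page["total"]:
            break
    return urns

print(list_all_elasticsearch_urns())
```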
Would you like more details on pagination or how to retrieve additional metadata for each index?
Sources:
- <Slack GraphQL query for platform-specific dataset URNs>
- <Slack Python code for authenticated GraphQL queries>
- <https://datahubproject.io/docs/0.14.0/api/graphql/getting-started|DataHub GraphQL API Getting Started>
<@U06TM7M40RK> is the gms_server the same as graphql_url
Great question! The terms `gms_server` and `graphql_url` are related but not always exactly the same, depending on your DataHub deployment and how you access the APIs.
Explanation
- GMS Server (`gms_server`): This refers to the Generalized Metadata Service (GMS), which is the backend service in DataHub responsible for storing and serving metadata. The GMS exposes both a REST API (Rest.li) and a GraphQL API. The GMS server is typically accessible at a base URL like `http://localhost:8080` (for self-hosted) or `https://<your-account>.acryl.io/api/gms/` (for managed DataHub).
- GraphQL URL (`graphql_url`): This is the specific endpoint for the GraphQL API. In DataHub, the GraphQL endpoint is usually `/api/graphql` relative to the GMS server. For example:
  - Self-hosted: `http://localhost:8080/api/graphql`
  - Managed: `https://<your-account>.acryl.io/api/gms/graphql`

So, the `graphql_url` is a specific path on the `gms_server` where the GraphQL API is exposed.
Example
- If your `gms_server` is `http://localhost:8080`, then your `graphql_url` would be `http://localhost:8080/api/graphql`.
- If your managed DataHub GMS server is `https://your-account.acryl.io/api/gms/`, then your `graphql_url` would be `https://your-account.acryl.io/api/gms/graphql`.
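In code, it often helps to keep just the GMS base URL in your config and derive the GraphQL endpoint from it. A small sketch (the variable names are just illustrative):
```
# Self-hosted: GraphQL lives at /api/graphql under the GMS base URL
gms_server = "http://localhost:8080"
graphql_url = f"{gms_server.rstrip('/')}/api/graphql"

# Managed DataHub: the GMS base already ends in /api/gms, so append /graphql
managed_gms = "https://your-account.acryl.io/api/gms"
managed_graphql_url = f"{managed_gms.rstrip('/')}/graphql"
```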
References
- According to the <https://datahubproject.io/docs/0.14.0/metadata-service|DataHub Metadata Service documentation>, the GMS exposes a GraphQL API at `/api/graphql`.
- The <https://github.com/datahub-project/datahub/blob/master/docs/managed-datahub/datahub-api/graphql-api/getting-started.md|Getting Started with DataHub GraphQL API guide> explains that you can use either the main API endpoint or the GMS-specific endpoint for GraphQL queries.
Would you like more detail on how to configure these URLs or how authentication works with each?
Sources:
- <https://datahubproject.io/docs/0.14.0/metadata-service|DataHub Metadata Service>
- <https://github.com/datahub-project/datahub/blob/master/docs/managed-datahub/datahub-api/graphql-api/getting-started.md|Getting Started with DataHub GraphQL API>
<@U06TM7M40RK> Raw response:
```
{
  "data": {
    "search": {
      "start": 0,
      "count": 100,
      "total": 0,
      "searchResults": []
    }
  },
  "extensions": {}
}
```
No Elasticsearch datasets found.
so i guess how do i ingest the elasticsearch data into the datahub to make sure i see data.
To ingest Elasticsearch data into DataHub so that you can see Elasticsearch datasets (indices) in your DataHub instance, you need to run a metadata ingestion job using the Elasticsearch source connector. This process extracts metadata about your Elasticsearch indices and ingests it into DataHub, making them discoverable and queryable via the UI and API.
Here’s how you can do it:
1. Prepare an Ingestion Recipe
You need a YAML recipe that specifies the Elasticsearch source and the DataHub sink. Here’s a minimal example:
```
source:
  type: "elasticsearch"
  config:
    host: "http://localhost:9200"  # Change to your Elasticsearch host
    index_pattern:
      allow: ["*"]  # Ingest all indices
    ingest_index_templates: true  # Optional: ingest index templates

sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"  # Change to your DataHub GMS server
    # token: "<your-datahub-token>"  # Uncomment and set if authentication is enabled
```
- Adjust the `host` to point to your Elasticsearch instance.
- If you have authentication or SSL, add the relevant fields as shown in the <https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/elasticsearch|Elasticsearch source documentation>.
2. Run the Ingestion
You can run the ingestion using the DataHub CLI:
```
datahub ingest -c path/to/your/recipe.yaml
```
- Make sure you have installed the DataHub CLI and the Elasticsearch plugin:
```
pip install 'acryl-datahub[elasticsearch]'
```
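If you prefer to run the same recipe from Python instead of the CLI, the SDK exposes a pipeline runner. A minimal sketch using the same placeholder hosts as the recipe above:
```
from datahub.ingestion.run.pipeline import Pipeline

# The dict mirrors the YAML recipe above; adjust hosts and token for your setup
pipeline = Pipeline.create(
    {
        "source": {
            "type": "elasticsearch",
            "config": {
                "host": "http://localhost:9200",
                "index_pattern": {"allow": ["*"]},
            },
        },
        "sink": {
            "type": "datahub-rest",
            "config": {
                "server": "http://localhost:8080",
                # "token": "<your-datahub-token>",
            },
        },
    }
)
pipeline.run()
pipeline.raise_from_status()  # raise if the run reported errors
```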
3. Verify the Ingestion
- After the job completes, you should see Elasticsearch indices as datasets in the DataHub UI and via the API.
- If you still see no results, check the ingestion logs for errors and ensure your Elasticsearch instance is reachable from where you run the ingestion.
4. UI-based Ingestion (Optional)
You can also configure and run the ingestion from the DataHub UI:
- Go to the Ingestion tab.
- Click + Create new source and select Elasticsearch.
- Fill in the connection details and run the ingestion job.
For more details, see the <https://datahubproject.io/docs/0.13.1/ui-ingestion|UI Ingestion Guide>.
Troubleshooting
- If ingestion fails, check for:
- Network connectivity issues between DataHub and Elasticsearch.
- Authentication or SSL errors.
- Misconfiguration in the recipe (e.g., wrong host, missing credentials).
- Review the logs in the DataHub UI under the ingestion run details, or in your CLI output.
Would you like a step-by-step walkthrough for your specific environment or help with a particular error message?
Sources:
- <https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/elasticsearch|Elasticsearch Source Documentation>
- <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/cli-ingestion.md|CLI Ingestion Guide>
- <https://datahubproject.io/docs/0.13.1/ui-ingestion|UI Ingestion Guide>
<@U06TM7M40RK> Raw response:
```
{
  "data": {
    "search": {
      "start": 0,
      "count": 100,
      "total": 1685,
      "searchResults": []
    }
  },
  "extensions": {}
}
```
No Elasticsearch datasets found.
Based on the raw response you provided, your GraphQL search query is returning a total of 1685 datasets, but the searchResults array is empty. This means that while there are datasets indexed in DataHub, none match the specific search criteria you used (likely filtering for Elasticsearch datasets).
Possible reasons and troubleshooting steps:
- No Elasticsearch Datasets Ingested: If you have not yet run an ingestion job for Elasticsearch, DataHub will not have any Elasticsearch datasets to return. You need to ingest Elasticsearch metadata as described in the <https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/elasticsearch|Elasticsearch Source Documentation>.
- Incorrect Search Filter: If your search query is filtering for datasets with `platform: elasticsearch` but none exist, the result will be empty. Double-check your ingestion and the filter in your query.
- Ingestion Succeeded, but Platform Name Mismatch: Sometimes, the platform name in DataHub may not be exactly `elasticsearch` (it could be `elasticsearch` or `opensearch` depending on your configuration). You can try searching for all datasets without a platform filter and inspect their platform field to confirm the correct value.
- Pagination Issue: If you have a large number of datasets, but the first page is empty, there may be a pagination or sorting issue. Try setting `start: 0` and `count: 10` to see if you get any results.
What to do next
- Verify Ingestion: Make sure you have run an Elasticsearch ingestion job as described in the <https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/elasticsearch|Elasticsearch Source Documentation>.
- Check Platform Name: Run a search query for all datasets and inspect the `platform` field to confirm the correct value for Elasticsearch datasets.
- Example: Search All Datasets and Print Platforms
```
import requests
import json

url = "http://localhost:8080/api/graphql"
query = """
query {
  search(input: {type: DATASET, query: "*", start: 0, count: 10}) {
    searchResults {
      entity {
        urn
        ... on Dataset {
          platform {
            name
          }
        }
      }
    }
  }
}
"""
headers = {"Content-Type": "application/json"}
response = requests.post(url, json={"query": query}, headers=headers)
print(json.dumps(response.json(), indent=2))
```
This will help you see what platforms are present in your DataHub instance.
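To narrow that output down to just the distinct platform names, you could add a few lines like this (it assumes the `response` object from the script above):
```
# Collect the distinct platform names so you can see exactly how
# Elasticsearch shows up in your instance (if at all)
platforms = {
    result["entity"]["platform"]["name"]
    for result in response.json()["data"]["search"]["searchResults"]
    if result["entity"].get("platform")
}
print("Platforms found:", platforms)
```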
Would you like more detailed help with your ingestion recipe or troubleshooting your ingestion job?
Sources:
- <https://datahubproject.io/docs/0.13.1/ui-ingestion|UI Ingestion Guide>
<@U06TM7M40RK> so this code work for mysql but how i can get it to work for elasticsearch. i am looking to get urn and a specific field with each elasticsearch index as well. how can you modify this code.
```
import requests
import json

CONFIG = {
    "datahub": {
        "gms_server": "http://datahub-gms:8080",  # 🔁 Replace with your actual GMS URL
        "token": "4gfgfsafvsdffv"
    }
}

HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {CONFIG['datahub']['token']}"
}

def search_mysql_datasets(folder_name="customer_data", target_table="books1"):
    url = f"{CONFIG['datahub']['gms_server']}/api/graphql"
    query = {
        "query": """
        query search($input: SearchInput!) {
          search(input: $input) {
            searchResults {
              entity {
                urn
                ... on Dataset {
                  name
                  platform {
                    name
                  }
                }
              }
            }
          }
        }
        """,
        "variables": {
            "input": {
                "type": "DATASET",
                "query": folder_name,
                "orFilters": [
                    {
                        "and": [
                            {
                                "field": "platform",
                                "values": ["urn:li:dataPlatform:mysql"]
                            }
                        ]
                    }
                ],
                "start": 0,
                "count": 500
            }
        }
    }
    response = requests.post(url, headers=HEADERS, json=query)
    print("🔍 Status:", response.status_code)
    print("🔍 Raw response:", response.text)
    results = response.json().get("data", {}).get("search", {}).get("searchResults", [])
    matched_tables = []
    for r in results:
        dataset = r["entity"]
        urn = dataset["urn"]
        name = dataset["name"]
        if name.lower() == target_table.lower():
            matched_tables.append(dataset)
    return matched_tables

def get_schema_fields(dataset_urn):
    url = f"{CONFIG['datahub']['gms_server']}/api/graphql"
    query = {
        "query": """
        query getSchema($urn: String!) {
          dataset(urn: $urn) {
            schemaMetadata {
              fields {
                fieldPath
              }
            }
          }
        }
        """,
        "variables": {
            "urn": dataset_urn
        }
    }
    response = requests.post(url, headers=HEADERS, json=query)
    fields = response.json().get("data", {}).get("dataset", {}).get("schemaMetadata", {}).get("fields", [])
    return [f["fieldPath"] for f in fields]

if __name__ == "__main__":
    financials = search_mysql_datasets(target_table="books1")
    for ds in financials:
        urn = ds["urn"]
        print(f"✅ Found MySQL table URN: {urn}")
        fields = get_schema_fields(urn)
        print(f"📌 Schema Fields: {fields}")
        if "code1" in fields:
            print("🎯 Field 'code1' found ✅")
        else:
            print("⚠️ Field 'code1' NOT found ❌")
```
To adapt your code to work for Elasticsearch indices instead of MySQL tables, you mainly need to:
- Change the platform filter from MySQL to Elasticsearch.
- Optionally, adjust the search query if you want to filter by index name or other criteria.
- The rest of the logic (fetching URN and schema fields) remains the same, since DataHub represents Elasticsearch indices as datasets.
Below is the modified code to search for Elasticsearch indices and retrieve their URN and schema fields (e.g., to check for a field like `code1`):
```
import requests
import json

CONFIG = {
    "datahub": {
        "gms_server": "http://datahub-gms:8080",  # Replace with your actual GMS URL
        "token": "4gfgfsafvsdffv"
    }
}

HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {CONFIG['datahub']['token']}"
}

def search_elasticsearch_indices(index_name="customer_data", target_index="books1"):
    url = f"{CONFIG['datahub']['gms_server']}/api/graphql"
    query = {
        "query": """
        query search($input: SearchInput!) {
          search(input: $input) {
            searchResults {
              entity {
                urn
                ... on Dataset {
                  name
                  platform {
                    name
                  }
                }
              }
            }
          }
        }
        """,
        "variables": {
            "input": {
                "type": "DATASET",
                "query": index_name,
                "orFilters": [
                    {
                        "and": [
                            {
                                "field": "platform",
                                "values": ["urn:li:dataPlatform:elasticsearch"]
                            }
                        ]
                    }
                ],
                "start": 0,
                "count": 500
            }
        }
    }
    response = requests.post(url, headers=HEADERS, json=query)
    print("🔍 Status:", response.status_code)
    print("🔍 Raw response:", response.text)
    results = response.json().get("data", {}).get("search", {}).get("searchResults", [])
    matched_indices = []
    for r in results:
        dataset = r["entity"]
        urn = dataset["urn"]
        name = dataset["name"]
        if name.lower() == target_index.lower():
            matched_indices.append(dataset)
    return matched_indices

def get_schema_fields(dataset_urn):
    url = f"{CONFIG['datahub']['gms_server']}/api/graphql"
    query = {
        "query": """
        query getSchema($urn: String!) {
          dataset(urn: $urn) {
            schemaMetadata {
              fields {
                fieldPath
              }
            }
          }
        }
        """,
        "variables": {
            "urn": dataset_urn
        }
    }
    response = requests.post(url, headers=HEADERS, json=query)
    fields = response.json().get("data", {}).get("dataset", {}).get("schemaMetadata", {}).get("fields", [])
    return [f["fieldPath"] for f in fields]

if __name__ == "__main__":
    indices = search_elasticsearch_indices(target_index="books1")
    for ds in indices:
        urn = ds["urn"]
        print(f"✅ Found Elasticsearch index URN: {urn}")
        fields = get_schema_fields(urn)
        print(f"📌 Schema Fields: {fields}")
        if "code1" in fields:
            print("🎯 Field 'code1' found ✅")
        else:
            print("⚠️ Field 'code1' NOT found ❌")
```
Key changes:
- The platform filter is now `"urn:li:dataPlatform:elasticsearch"`.
- The function and variable names are updated for clarity.
- The rest of the logic is unchanged and will work for Elasticsearch indices as long as they are ingested into DataHub.
Note:
Make sure you have ingested your Elasticsearch indices into DataHub. If you have not, follow the <https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/elasticsearch|Elasticsearch ingestion guide>.
Would you like more detail on how to adjust the search criteria or how to ingest Elasticsearch indices?
Sources:
- <Slack GraphQL query for platform-specific dataset URNs>
<@U06TM7M40RK> with this code
```
import requests
import json

CONFIG = {
    "datahub": {
        "gms_server": "http://datahub-gms:8080",  # Replace with your actual GMS host
        "token": "your_personal_access_token"     # Replace with your actual token
    }
}

HEADERS = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {CONFIG['datahub']['token']}"
}

def search_datasets(platform: str, folder_name: str, target_table: str = None):
    url = f"{CONFIG['datahub']['gms_server']}/api/graphql"
    query = {
        "query": """
        query search($input: SearchInput!) {
          search(input: $input) {
            searchResults {
              entity {
                urn
                ... on Dataset {
                  name
                  platform {
                    name
                  }
                }
              }
            }
          }
        }
        """,
        "variables": {
            "input": {
                "type": "DATASET",
                "query": folder_name,
                "orFilters": [
                    {
                        "and": [
                            {
                                "field": "platform",
                                "values": [f"urn:li:dataPlatform:{platform}"]
                            }
                        ]
                    }
                ],
                "start": 0,
                "count": 500
            }
        }
    }
    response = requests.post(url, headers=HEADERS, json=query)
    if response.status_code != 200:
        raise Exception(f"Search failed: {response.text}")
    results = response.json().get("data", {}).get("search", {}).get("searchResults", [])
    matched = []
    for r in results:
        dataset = r["entity"]
        if not target_table or dataset["name"].lower() == target_table.lower():
            matched.append(dataset)
    return matched

def get_schema_fields(dataset_urn):
    url = f"{CONFIG['datahub']['gms_server']}/api/graphql"
    query = {
        "query": """
        query getSchema($urn: String!) {
          dataset(urn: $urn) {
            schemaMetadata {
              fields {
                fieldPath
              }
            }
          }
        }
        """,
        "variables": {
            "urn": dataset_urn
        }
    }
    response = requests.post(url, headers=HEADERS, json=query)
    if response.status_code != 200:
        raise Exception(f"Schema fetch failed: {response.text}")
    fields = response.json().get("data", {}).get("dataset", {}).get("schemaMetadata", {}).get("fields", [])
    return [f["fieldPath"] for f in fields]

def emit_field_lineage(upstream_urn: str, upstream_field: str, downstream_urn: str, downstream_field: str):
    url = f"{CONFIG['datahub']['gms_server']}/entities?action=ingestProposal"
    lineage_payload = {
        "entityType": "dataset",
        "changeType": "UPSERT",
        "entityUrn": downstream_urn,
        "aspectName": "schemaFieldFlow",
        "aspect": {
            "fieldLineages": [
                {
                    "downstreamField": downstream_field,
                    "upstreams": [
                        {
                            "dataset": upstream_urn,
                            "schemaField": upstream_field,
                            "transformationDescription": "Mapped via ETL process",
                            "transformationType": "manual"
                        }
                    ]
                }
            ]
        }
    }
    response = requests.post(url, headers=HEADERS, json=lineage_payload)
    if response.status_code != 200:
        raise Exception(f"Lineage push failed: {response.text}")
    print(f":white_check_mark: Lineage emitted: {upstream_urn}.{upstream_field} :arrow_right: {downstream_urn}.{downstream_field}")

if __name__ == "__main__":
    # Example configuration
    mysql_folder = "c1customer_info.fake21records.financial"
    mysql_table = "financial"
    mysql_field = "code1hgs"
    es_folder = "c1_onguard_data_es"
    es_field = "code52sd"

    # Step 1: Find MySQL source table
    mysql_datasets = search_datasets("mysql", mysql_folder, mysql_table)
    if not mysql_datasets:
        print(":x: MySQL dataset not found.")
        exit()
    mysql_urn = mysql_datasets[0]["urn"]
    mysql_fields = get_schema_fields(mysql_urn)
    if mysql_field not in mysql_fields:
        print(f":x: MySQL field '{mysql_field}' not found in schema.")
        exit()

    # Step 2: Find ES indices that contain the field
    es_datasets = search_datasets("elasticsearch", es_folder)
    for ds in es_datasets:
        es_urn = ds["urn"]
        fields = get_schema_fields(es_urn)
        matching_fields = [f for f in fields if f.endswith(f".{es_field}")]
        if not matching_fields:
            print(f":warning: Field '{es_field}' not found in {es_urn}")
            continue
        for field_path in matching_fields:
            emit_field_lineage(
                upstream_urn=mysql_urn,
                upstream_field=mysql_field,
                downstream_urn=es_urn,
                downstream_field=field_path
            )
```
i get the below error
```
back.succeeded(AbstractConnection.java:322)
    at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:99)
    at org.eclipse.jetty.io.SelectableChannelEndPoint$1.run(SelectableChannelEndPoint.java:53)
    at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.runTask(AdaptiveExecutionStrategy.java:480)
    at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.consumeTask(AdaptiveExecutionStrategy.java:443)
    at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.tryProduce(AdaptiveExecutionStrategy.java:293)
    at org.eclipse.jetty.util.thread.strategy.AdaptiveExecutionStrategy.run(AdaptiveExecutionStrategy.java:201)
    at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:311)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:979)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.doRunJob(QueuedThreadPool.java:1209)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1164)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: com.linkedin.restli.server.RoutingException: POST operation named ingestProposal not supported on resource 'com.linkedin.metadata.resources.entity.EntityResource' URI: '/entities?action=ingestProposal'
    at com.linkedin.restli.internal.server.RestLiRouter.findMethodDescriptor(RestLiRouter.java:263)
    at com.linkedin.restli.internal.server.RestLiRouter.processResourceTree(RestLiRouter.java:219)
    at com.linkedin.restli.internal.server.RestLiRouter.process(RestLiRouter.java:142)
    at com.linkedin.restli.server.BaseRestLiServer.getRoutingResult(BaseRestLiServer.java:183)
    at com.linkedin.restli.server.RestRestLiServer.handleResourceRequest(RestRestLiServer.java:224)
    ... 64 more
","message":"POST operation named ingestProposal not supported on resource 'com.linkedin.metadata.resources.entity.EntityResource' URI: '/entities?action=ingestProposal'","status":400}
```
The error you are seeing:
"POST operation named ingestProposal not supported on resource 'com.linkedin.metadata.resources.entity.EntityResource' URI: '/entities?action=ingestProposal'"
means that the endpoint `/entities?action=ingestProposal` does not support the `ingestProposal` action. This is a common confusion in DataHub API usage.
Why This Happens
- The correct endpoint for emitting metadata change proposals (MCPs) is `/aspects?action=ingestProposal`, not `/entities?action=ingestProposal`.
- The `/entities` endpoint does not have the `ingestProposal` action, which is why you get a 400 error and the message about the operation not being supported.
This is confirmed in the <https://github.com/datahub-project/datahub/blob/master/docs/api/restli/restli-overview.md|DataHub API documentation> and in multiple community discussions.
How to Fix
Change this line:
```
url = f"{CONFIG['datahub']['gms_server']}/entities?action=ingestProposal"
```
To:
```
url = f"{CONFIG['datahub']['gms_server']}/aspects?action=ingestProposal"
```
Additional Notes
- The payload structure for field-level lineage is not standardized as `schemaFieldFlow` in the DataHub public API. The officially supported way to emit field-level (fine-grained) lineage is to use the <https://datahubproject.io/docs/0.14.0/api/tutorials/lineage#add-column-level-lineage|UpstreamLineage aspect with FineGrainedLineage objects>, typically via the Python SDK, not by hand-crafting REST payloads.
- If you want to emit field-level lineage via REST, you must use the correct aspect and structure. The `schemaFieldFlow` aspect is not documented as a public API for this use case; a rough sketch of a raw REST call follows below.
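If you do want to stay on raw REST, the call would look roughly like the sketch below. It follows the Rest.li `ingestProposal` envelope described in the REST API overview docs; the exact JSON for the `upstreamLineage` aspect is my reading of the model, so treat it as an assumption and prefer the SDK example that follows:
```
import json
import requests

gms_server = "http://datahub-gms:8080"  # placeholder, same as your CONFIG
token = "<your-datahub-token>"          # placeholder

# upstreamLineage aspect: dataset-level upstream plus one fine-grained field mapping
# (field names here follow the UpstreamLineage model; adjust if your version differs)
upstream_lineage = {
    "upstreams": [
        {
            "dataset": "urn:li:dataset:(urn:li:dataPlatform:mysql,c1customer_info.fake21records.financial,PROD)",
            "type": "TRANSFORMED",
        }
    ],
    "fineGrainedLineages": [
        {
            "upstreamType": "FIELD_SET",
            "upstreams": [
                "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mysql,c1customer_info.fake21records.financial,PROD),code1hgs)"
            ],
            "downstreamType": "FIELD",
            "downstreams": [
                "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:elasticsearch,c1_onguard_data_es,PROD),code52sd)"
            ],
        }
    ],
}

payload = {
    "proposal": {
        "entityType": "dataset",
        "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:elasticsearch,c1_onguard_data_es,PROD)",
        "changeType": "UPSERT",
        "aspectName": "upstreamLineage",
        "aspect": {
            # Rest.li expects the aspect serialized as a JSON string
            "value": json.dumps(upstream_lineage),
            "contentType": "application/json",
        },
    }
}

resp = requests.post(
    f"{gms_server}/aspects?action=ingestProposal",
    headers={
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "X-RestLi-Protocol-Version": "2.0.0",
    },
    json=payload,
)
resp.raise_for_status()
```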
Example: Emitting Fine-Grained Lineage via Python SDK
The recommended way is to use the DataHub Python SDK, which handles the correct aspect and endpoint for you. Here is a <https://datahubproject.io/docs/0.14.0/api/tutorials/lineage#add-column-level-lineage|documented example>:
```
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.com.linkedin.pegasus2avro.dataset import (
    DatasetLineageType,
    FineGrainedLineage,
    FineGrainedLineageDownstreamType,
    FineGrainedLineageUpstreamType,
    Upstream,
    UpstreamLineage,
)

def dataset_urn(platform, name):
    return builder.make_dataset_urn(platform, name)

def field_urn(platform, name, field):
    return builder.make_schema_field_urn(dataset_urn(platform, name), field)

# Example: MySQL to Elasticsearch field-level lineage
upstream_field_urn = field_urn("mysql", "c1customer_info.fake21records.financial", "code1hgs")
downstream_field_urn = field_urn("elasticsearch", "c1_onguard_data_es", "code52sd")

fine_grained_lineage = FineGrainedLineage(
    upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
    upstreams=[upstream_field_urn],
    downstreamType=FineGrainedLineageDownstreamType.FIELD,
    downstreams=[downstream_field_urn],
)

upstream = Upstream(
    dataset=dataset_urn("mysql", "c1customer_info.fake21records.financial"),
    type=DatasetLineageType.TRANSFORMED,
)

field_lineages = UpstreamLineage(
    upstreams=[upstream],
    fineGrainedLineages=[fine_grained_lineage],
)

lineage_mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn("elasticsearch", "c1_onguard_data_es"),
    aspect=field_lineages,
)

# CONFIG here is the same dict defined in your script above
emitter = DatahubRestEmitter(CONFIG['datahub']['gms_server'], token=CONFIG['datahub']['token'])
emitter.emit_mcp(lineage_mcp)
```
Summary
- Use `/aspects?action=ingestProposal` for REST API calls.
- Prefer the Python SDK for emitting field-level lineage, as it handles the correct aspect and structure.
- The `schemaFieldFlow` aspect is not a documented public API for this use case.
Would you like a step-by-step example using the Python SDK for your specific MySQL and Elasticsearch fields?
Sources:
- <https://datahubproject.io/docs/0.14.0/api/tutorials/lineage#add-column-level-lineage|Add Column-level Lineage (DataHub Docs)>
- <https://datahubproject.io/docs/0.13.1/api/tutorials/lineage#add-column-level-lineage|Fine-grained lineage example>