Hi all, I’m evaluating DataHub as a Data Governance tool to build a simple portal and share open information with citizens. I need to connect DataHub to an API endpoint to get metadata in XML format. I’ve already tried the OpenAPI UI ingestion in DataHub 0.13.3 without success; I get a generic 403 error.
Apart from this error, I would like to understand whether, to obtain the metadata in XML format from an API endpoint, I have to implement something custom or there are native features that allow me to do it quickly.
Hey there! Make sure your message includes the following information if relevant, so we can help more effectively!
- Are you using UI or CLI for ingestion?
- Which DataHub version are you using? (e.g. 0.12.0)
- What data source(s) are you integrating with DataHub? (e.g. BigQuery)
<@U06TM7M40RK> Can you help with the questions in this thread?
We’re experimenting with a new AI assistant for questions in the <#CUMUWQU66> channel to help you track down helpful resources more quickly. Keep in mind that responses may not always be spot-on; feel free to ask it follow-up questions and don’t forget to give its responses a 👍 or 👎 so other folks in the Community know if you’re still looking for help on the topic!
I’m not able to find any data sources to answer your question.
Could you provide more details on the specific API endpoint you are trying to connect to and the exact steps you have taken so far?
If this doesn’t help, please reach out to the DataHub support team for further assistance.
Hint: Mention @runllm in the thread for followups.
<@U06TM7M40RK> here is a link containing the metadata in XML that I want to ingest into DataHub: http://wms.pcn.minambiente.it/ogc?map=/ms_ogc/WMS_v1.3/servizi-LiDAR/LIDAR_AREA_DOLOMITICA.map&service=wms&request=getCapabilities&version=1.3.0
I’m not able to find any data sources to answer your question.
Could you specify whether you are using the UI or CLI for ingestion and any specific configurations you have tried?
If this doesn’t help, please reach out to the DataHub support team for further assistance.
Hint: Mention @runllm in the thread for followups.
<@U06TM7M40RK> I’m using UI and wrote this recipe:
type: openapi
config:
  name: test_endpoint
  url: '<https://test_endpoint.com/>'
  swagger_file: classicapi/doc/swagger.json
sink:
  type: "datahub-rest"
  config:
    server: '<http://datahub-gms-1:8080>'
received this error: Exception: Unable to retrieve
I’m using the Docker quickstart and trying to ingest this source via the OpenAPI connector
Based on the information provided, it seems like you are trying to ingest metadata from an API endpoint using the OpenAPI connector in DataHub. However, the OpenAPI connector expects the metadata to be in OpenAPI (Swagger) format, not XML.
To ingest metadata in XML format, you would likely need to implement a custom ingestion source. DataHub does not natively support XML ingestion out of the box. Here are the steps you can follow to create a custom ingestion source:
- Create a Custom Source: You will need to create a custom ingestion source that can parse the XML metadata and convert it into a format that DataHub can understand.
- Develop the Custom Source: Follow the <https://datahubproject.io/docs/metadata-ingestion/developing/|DataHub metadata ingestion development guide> to develop and test your custom source.
- Use the Custom Source in a Recipe: Once your custom source is developed, you can use it in an ingestion recipe similar to how you would use any other source.
Here is a high-level example of what the custom source might look like:
from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.metadata.schema_classes import MetadataChangeEventClass

# Define the configuration for your custom source
class CustomXMLSourceConfig(ConfigModel):
    url: str

class CustomXMLSource(Source):
    def __init__(self, config: CustomXMLSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config

    @classmethod
    def create(cls, config_dict, ctx):
        # The ingestion framework registers and instantiates the source by calling
        # CustomXMLSource.create(config_dict, ctx) when the recipe runs
        config = CustomXMLSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def get_workunits(self):
        # Implement your XML parsing logic here
        # Convert the parsed XML data into MetadataChangeEventClass objects
        mce = MetadataChangeEventClass(...)
        yield mce

    def get_report(self):
        return SourceReport()
You can then use this custom source in your ingestion recipe:
source:
  type: custom_xml
  config:
    url: '<http://wms.pcn.minambiente.it/ogc?map=/ms_ogc/WMS_v1.3/servizi-LiDAR/LIDAR_AREA_DOLOMITICA.map&service=wms&request=getCapabilities&version=1.3.0>'
sink:
  type: datahub-rest
  config:
    server: 'http://datahub-gms:8080'
If you need more detailed guidance on customizing the ingestion source, please let me know!
Sources:
- <https://datahubproject.io/docs/metadata-ingestion/developing/|DataHub metadata ingestion development guide>
- <https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/openapi|OpenAPI ingestion source documentation>
Hint: Mention @runllm in the thread for followups.
<@U06TM7M40RK> Hi, I want to go into detail on point one: Create a Custom Source. Please give me more details on how to implement a custom source.
To create a custom ingestion source for DataHub, you will need to follow a series of steps to define, implement, and integrate your custom source. Here’s a detailed breakdown of the process:
Step 1: Set Up Your Development Environment
- Clone the DataHub Repository: Start by cloning the DataHub repository to your local machine.
  git clone https://github.com/datahub-project/datahub.git
  cd datahub
- Set Up a Virtual Environment: Create and activate a virtual environment.
  python3 -m venv venv
  source venv/bin/activate
- Install Dependencies: Install the necessary dependencies.
  pip install -r requirements.txt
Step 2: Create Your Custom Source
- Define the Configuration Model: Create a configuration model for your custom source using Pydantic.

  from pydantic import BaseModel

  class CustomXMLSourceConfig(BaseModel):
      url: str
- Implement the Custom Source Class: Create a class for your custom source that extends the Source class from DataHub.

  from datahub.ingestion.api.common import PipelineContext
  from datahub.ingestion.api.source import Source, SourceReport
  from datahub.metadata.schema_classes import MetadataChangeEventClass

  class CustomXMLSource(Source):
      def __init__(self, config: CustomXMLSourceConfig, ctx: PipelineContext):
          super().__init__(ctx)
          self.config = config

      @classmethod
      def create(cls, config_dict, ctx):
          config = CustomXMLSourceConfig.parse_obj(config_dict)
          return cls(config, ctx)

      def get_workunits(self):
          # Implement your XML parsing logic here
          # Convert the parsed XML data into MetadataChangeEventClass objects
          mce = MetadataChangeEventClass(
              # Populate with your XML metadata
          )
          yield mce

      def get_report(self):
          return SourceReport()
- Parse the XML Data: Implement the logic to fetch and parse the XML data from the provided URL.

  import requests
  from xml.etree import ElementTree

  def fetch_and_parse_xml(url):
      response = requests.get(url)
      if response.status_code == 200:
          tree = ElementTree.fromstring(response.content)
          # Parse the XML tree and extract metadata
          return tree
      else:
          raise Exception(f"Failed to fetch XML data: {response.status_code}")
- Convert XML to MetadataChangeEvent: Convert the parsed XML data into MetadataChangeEventClass objects.

  def convert_xml_to_mce(xml_tree):
      # Implement the conversion logic
      mce = MetadataChangeEventClass(
          # Populate with your XML metadata
      )
      return mce
- Integrate Parsing and Conversion: Integrate the parsing and conversion logic into the get_workunits method.

  def get_workunits(self):
      xml_tree = fetch_and_parse_xml(self.config.url)
      mce = convert_xml_to_mce(xml_tree)
      yield mce
Step 3: Package and Install the Custom Source
- Create a Python Package: Package your custom source as a Python package.

  mkdir custom_source
  touch custom_source/__init__.py
- Add Setup Script: Create a setup.py script to package your custom source.

  from setuptools import setup, find_packages

  setup(
      name='custom_source',
      version='0.1',
      packages=find_packages(),
      install_requires=[
          'acryl-datahub',  # the DataHub ingestion framework is published on PyPI as acryl-datahub
          'requests',
      ],
  )
- Build and Install the Package: Build and install the package in your environment.

  python setup.py sdist bdist_wheel
  pip install dist/custom_source-0.1-py3-none-any.whl
Step 4: Use the Custom Source in an Ingestion Recipe
- Create an Ingestion Recipe: Create a YAML file for your ingestion recipe.

  source:
    type: custom_source.CustomXMLSource
    config:
      url: '<http://wms.pcn.minambiente.it/ogc?map=/ms_ogc/WMS_v1.3/servizi-LiDAR/LIDAR_AREA_DOLOMITICA.map&service=wms&request=getCapabilities&version=1.3.0>'
  sink:
    type: datahub-rest
    config:
      server: '<http://datahub-gms:8080>'
- Run the Ingestion: Use the DataHub CLI to run the ingestion.

  datahub ingest -c path/to/your/recipe.yaml
Example Code
Here is a complete example of the custom source implementation:
from pydantic import BaseModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.metadata.schema_classes import MetadataChangeEventClass
import requests
from xml.etree import ElementTree

class CustomXMLSourceConfig(BaseModel):
    url: str

class CustomXMLSource(Source):
    def __init__(self, config: CustomXMLSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config

    @classmethod
    def create(cls, config_dict, ctx):
        config = CustomXMLSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def fetch_and_parse_xml(self, url):
        response = requests.get(url)
        if response.status_code == 200:
            tree = ElementTree.fromstring(response.content)
            return tree
        else:
            raise Exception(f"Failed to fetch XML data: {response.status_code}")

    def convert_xml_to_mce(self, xml_tree):
        # Implement the conversion logic
        mce = MetadataChangeEventClass(
            # Populate with your XML metadata
        )
        return mce

    def get_workunits(self):
        xml_tree = self.fetch_and_parse_xml(self.config.url)
        mce = self.convert_xml_to_mce(xml_tree)
        yield mce

    def get_report(self):
        return SourceReport()
Sources:
- <https://datahubproject.io/docs/metadata-ingestion/developing/|DataHub metadata ingestion development guide>
- <https://datahubproject.io/docs/0.13.1/how/add-custom-ingestion-source|Adding a Custom Ingestion Source>
- Slack Discussion
Hint: Mention @runllm in the thread for followups.
<@U06TM7M40RK> please give an in-depth explanation of how to populate MetadataChangeEventClass with the XML metadata, with a simple example
To populate the MetadataChangeEventClass with XML metadata, you need to parse the XML data and map it to the appropriate fields in the MetadataChangeEventClass. Here’s a step-by-step guide with a simple example:
Step 1: Parse the XML Data
First, you need to fetch and parse the XML data. For this example, let’s assume the XML data contains information about datasets.
import requests
from xml.etree import ElementTree
def fetch_and_parse_xml(url):
    response = requests.get(url)
    if response.status_code == 200:
        tree = ElementTree.fromstring(response.content)
        return tree
    else:
        raise Exception(f"Failed to fetch XML data: {response.status_code}")
Step 2: Define the MetadataChangeEventClass
The MetadataChangeEventClass is used to represent metadata changes. You will need to map the parsed XML data to this class.
from datahub.metadata.schema_classes import (
MetadataChangeEventClass,
DatasetSnapshotClass,
DatasetPropertiesClass,
AuditStampClass,
ChangeTypeClass
)
from datahub.emitter.mce_builder import make_dataset_urn
def convert_xml_to_mce(xml_tree):
    # Extract relevant information from the XML tree
    dataset_name = xml_tree.find('Service/Title').text
    description = xml_tree.find('Service/Abstract').text
    platform = "urn:li:dataPlatform:custom"
    env = "PROD"

    # Create the dataset URN
    dataset_urn = make_dataset_urn(platform, dataset_name, env)

    # Create the DatasetProperties aspect
    dataset_properties = DatasetPropertiesClass(
        description=description,
        customProperties={"source": "XML API"}
    )

    # Create the AuditStamp
    audit_stamp = AuditStampClass(
        time=1640692800000,  # Example timestamp
        actor="urn:li:corpuser:ingestion"
    )

    # Create the DatasetSnapshot
    dataset_snapshot = DatasetSnapshotClass(
        urn=dataset_urn,
        aspects=[dataset_properties]
    )

    # Create the MetadataChangeEvent
    mce = MetadataChangeEventClass(
        proposedSnapshot=dataset_snapshot
    )
    return mce
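One caveat for this particular endpoint: WMS 1.3.0 GetCapabilities documents declare a default XML namespace (http://www.opengis.net/wms), so a bare path like 'Service/Title' will usually return None with ElementTree. Here is a minimal, hedged sketch of namespace-aware lookups, assuming the standard WMS namespace (verify it against the actual response; the extract_service_info helper is just an illustration, not part of DataHub):

WMS_NS = {"wms": "http://www.opengis.net/wms"}

def extract_service_info(xml_tree):
    # Namespace-aware lookups for the WMS <Service> block
    title_el = xml_tree.find("wms:Service/wms:Title", namespaces=WMS_NS)
    abstract_el = xml_tree.find("wms:Service/wms:Abstract", namespaces=WMS_NS)
    # Fall back to safe defaults if the elements are missing
    dataset_name = title_el.text if title_el is not None else "unknown_wms_service"
    description = abstract_el.text if abstract_el is not None else ""
    return dataset_name, description

You can then feed dataset_name and description into the DatasetPropertiesClass construction shown above.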
Step 3: Integrate Parsing and Conversion
Integrate the parsing and conversion logic into the get_workunits method of your custom source.
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
class CustomXMLSource(Source):
    def __init__(self, config, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config

    @classmethod
    def create(cls, config_dict, ctx):
        config = CustomXMLSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def fetch_and_parse_xml(self, url):
        response = requests.get(url)
        if response.status_code == 200:
            tree = ElementTree.fromstring(response.content)
            return tree
        else:
            raise Exception(f"Failed to fetch XML data: {response.status_code}")

    def convert_xml_to_mce(self, xml_tree):
        # Extract relevant information from the XML tree
        dataset_name = xml_tree.find('Service/Title').text
        description = xml_tree.find('Service/Abstract').text
        platform = "urn:li:dataPlatform:custom"
        env = "PROD"

        # Create the dataset URN
        dataset_urn = make_dataset_urn(platform, dataset_name, env)

        # Create the DatasetProperties aspect
        dataset_properties = DatasetPropertiesClass(
            description=description,
            customProperties={"source": "XML API"}
        )

        # Create the AuditStamp
        audit_stamp = AuditStampClass(
            time=1640692800000,  # Example timestamp
            actor="urn:li:corpuser:ingestion"
        )

        # Create the DatasetSnapshot
        dataset_snapshot = DatasetSnapshotClass(
            urn=dataset_urn,
            aspects=[dataset_properties]
        )

        # Create the MetadataChangeEvent
        mce = MetadataChangeEventClass(
            proposedSnapshot=dataset_snapshot
        )
        return mce

    def get_workunits(self):
        xml_tree = self.fetch_and_parse_xml(self.config.url)
        mce = self.convert_xml_to_mce(xml_tree)
        yield mce

    def get_report(self):
        return SourceReport()
Step 4: Run the Ingestion
Create an ingestion recipe and run the ingestion using the DataHub CLI.
source:
  type: custom_source.CustomXMLSource
  config:
    url: '<http://wms.pcn.minambiente.it/ogc?map=/ms_ogc/WMS_v1.3/servizi-LiDAR/LIDAR_AREA_DOLOMITICA.map&service=wms&request=getCapabilities&version=1.3.0>'
sink:
  type: datahub-rest
  config:
    server: '<http://datahub-gms:8080>'
Run the ingestion:
datahub ingest -c path/to/your/recipe.yaml
Example Code
Here is the complete example code for the custom source:
from pydantic import BaseModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.metadata.schema_classes import (
MetadataChangeEventClass,
DatasetSnapshotClass,
DatasetPropertiesClass,
AuditStampClass,
ChangeTypeClass
)
from datahub.emitter.mce_builder import make_dataset_urn
import requests
from xml.etree import ElementTree
class CustomXMLSourceConfig(BaseModel):
    url: str

class CustomXMLSource(Source):
    def __init__(self, config: CustomXMLSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config

    @classmethod
    def create(cls, config_dict, ctx):
        config = CustomXMLSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def fetch_and_parse_xml(self, url):
        response = requests.get(url)
        if response.status_code == 200:
            tree = ElementTree.fromstring(response.content)
            return tree
        else:
            raise Exception(f"Failed to fetch XML data: {response.status_code}")

    def convert_xml_to_mce(self, xml_tree):
        # Extract relevant information from the XML tree
        dataset_name = xml_tree.find('Service/Title').text
        description = xml_tree.find('Service/Abstract').text
        platform = "urn:li:dataPlatform:custom"
        env = "PROD"

        # Create the dataset URN
        dataset_urn = make_dataset_urn(platform, dataset_name, env)

        # Create the DatasetProperties aspect
        dataset_properties = DatasetPropertiesClass(
            description=description,
            customProperties={"source": "XML API"}
        )

        # Create the AuditStamp
        audit_stamp = AuditStampClass(
            time=1640692800000,  # Example timestamp
            actor="urn:li:corpuser:ingestion"
        )

        # Create the DatasetSnapshot
        dataset_snapshot = DatasetSnapshotClass(
            urn=dataset_urn,
            aspects=[dataset_properties]
        )

        # Create the MetadataChangeEvent
        mce = MetadataChangeEventClass(
            proposedSnapshot=dataset_snapshot
        )
        return mce

    def get_workunits(self):
        xml_tree = self.fetch_and_parse_xml(self.config.url)
        mce = self.convert_xml_to_mce(xml_tree)
        yield mce

    def get_report(self):
        return SourceReport()
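One additional note, depending on the acryl-datahub version you have installed: the ingestion pipeline generally expects get_workunits to yield MetadataWorkUnit objects rather than raw MCEs. Here is a hedged sketch of wrapping the MCE in a work unit (check the Source API of your installed version before relying on it):

from datahub.ingestion.api.workunit import MetadataWorkUnit

def get_workunits(self):
    xml_tree = self.fetch_and_parse_xml(self.config.url)
    mce = self.convert_xml_to_mce(xml_tree)
    # Wrap the MCE in a work unit so the pipeline can track and emit it
    yield MetadataWorkUnit(id=mce.proposedSnapshot.urn, mce=mce)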
Sources:
- <https://datahubproject.io/docs/metadata-ingestion/developing/|DataHub metadata ingestion development guide>
- <https://datahubproject.io/docs/0.13.1/how/add-custom-ingestion-source|Adding a Custom Ingestion Source>
- Slack Discussion
Hint: Mention @runllm in the thread for followups.
<@U06TM7M40RK> What does MySQL do in the DataHub architecture?
MySQL (or other relational databases like PostgreSQL or MariaDB) plays a crucial role in the DataHub architecture as the primary storage backend for metadata. Here’s a detailed explanation of its role:
Metadata Storage
- Primary Metadata Store: MySQL is used as the primary metadata store where the core metadata entities and aspects are persisted. This includes datasets, dashboards, data pipelines, and other metadata entities.
- CRUD Operations: The Metadata Service in DataHub uses MySQL to perform Create, Read, Update, and Delete (CRUD) operations on metadata. This service exposes REST and GraphQL APIs for these operations.
- Schema and Entity Storage: MySQL stores the schema definitions and instances of metadata entities. Each entity is identified by a unique identifier called a URN (Uniform Resource Name).
Metadata Change Log Stream (MCL)
- Commit Events: When a metadata change is committed to MySQL, a commit event called a Metadata Change Log (MCL) is emitted. This event is sent over Kafka and can be subscribed to by external systems (see the consumer sketch after this list).
- Real-time Reactions: The MCL stream allows external systems to react in real-time to changes in metadata. For example, an access control enforcer can react to changes in metadata to immediately lock down a dataset.
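As a rough illustration of subscribing to the MCL stream, here is a hedged sketch using kafka-python; the topic name MetadataChangeLog_Versioned_v1 and the localhost:9092 broker address are quickstart-style assumptions, and the payloads are Avro-encoded, so decoding them fully requires the schema registry:

from kafka import KafkaConsumer  # pip install kafka-python

# Assumed quickstart defaults; adjust the topic and bootstrap servers for your deployment
consumer = KafkaConsumer(
    "MetadataChangeLog_Versioned_v1",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="latest",
    consumer_timeout_ms=30000,
)

for message in consumer:
    # Payloads are Avro-encoded; this only shows that events arrive as changes are committed
    print(f"MCL event: partition={message.partition} offset={message.offset} size={len(message.value)} bytes")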
Metadata Index Applier (MAE Consumer Job)
- Index Updates: The Metadata Change Log events are consumed by a job called the MAE Consumer Job, which applies the changes to the graph and search index accordingly. This ensures that the metadata changes are reflected in the search and graph indices.
- Entity-agnostic Processing: The MAE Consumer Job is entity-agnostic and executes corresponding graph and search index builders based on the metadata change.
Metadata Query Serving
- Primary-key Based Reads: Reads based on primary keys (e.g., getting schema metadata for a dataset based on the dataset URN) are routed to MySQL (see the sketch after this list).
- Secondary Index Reads: Reads based on secondary indices, full-text search queries, and complex graph queries (e.g., lineage) are routed to the search index (e.g., Elasticsearch).
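To make the primary-key read path concrete, here is a hedged sketch that fetches a single aspect for a known URN through the metadata service (which serves it from MySQL). The server address and dataset URN are placeholders, and get_aspect is available in recent acryl-datahub releases (older releases expose a similar get_aspect_v2):

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import DatasetPropertiesClass

# Placeholder GMS address and URN; point these at your own deployment and dataset
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:custom,LIDAR_AREA_DOLOMITICA,PROD)"

# A primary-key based read: resolved by GMS against the relational store
props = graph.get_aspect(entity_urn=dataset_urn, aspect_type=DatasetPropertiesClass)
if props is not None:
    print(props.description)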
Example Architecture Diagram
A high-level system diagram for DataHub’s Serving Tier, with MySQL as the primary metadata store, is linked under Sources below (DataHub Serving Architecture image).
Relevant Documentation
- <https://datahubproject.io/docs/0.13.1/architecture/metadata-serving|DataHub Serving Architecture>: This document provides an overview of the serving architecture, including the role of MySQL in metadata storage.
- <https://github.com/datahub-project/datahub/blob/master/docs/components.md|DataHub Components Overview>: This document provides an overview of the various components in DataHub, including the Metadata Store.
Sources:
- <https://datahubproject.io/docs/0.13.1/architecture/metadata-serving|DataHub Serving Architecture>
- <https://github.com/datahub-project/datahub/blob/master/docs/components.md|DataHub Components Overview>
- Slack Discussion
- <https://raw.githubusercontent.com/datahub-project/static-assets/main/imgs/datahub-serving.png|DataHub Serving Architecture> Image
Hint: Mention @runllm in the thread for followups.
<@U06TM7M40RK> where exactly is ingested metadata stored?
In DataHub, ingested metadata is stored in multiple components, each serving a specific purpose. Here’s a detailed breakdown of where the ingested metadata is stored:
1. Relational Database (MySQL, PostgreSQL, etc.)
- Primary Metadata Store: The core metadata entities and aspects are stored in a relational database like MySQL, PostgreSQL, or MariaDB. This includes datasets, dashboards, data pipelines, and other metadata entities.
- Tables: The metadata is stored in specific tables such as metadata_aspect_v2, which contains the ingested aspects for all entities (a direct query sketch follows this list).
2. Elasticsearch
- Search Index: Elasticsearch is used to store metadata for search and indexing purposes. This allows for efficient full-text search queries and secondary index-based reads.
- Time-Series Data: Some time-series and telemetry aspects of the metadata are also stored in Elasticsearch, which might grow over time with the number of entities.
3. Kafka
- Metadata Change Log (MCL): Kafka is used to emit commit events (Metadata Change Log) when a metadata change has been successfully committed to the relational database. These events can be subscribed to by external systems for real-time reactions to metadata changes.
- Metadata Change Proposal (MCP): Kafka can also be used to ingest metadata via Metadata Change Proposals, which are then processed and stored in the relational database.
4. Graph Database (Optional)
- Graph Index: In some deployments, a graph database like Neo4j is used to store metadata for complex graph queries such as lineage. This is optional and depends on the specific deployment setup.
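As referenced above for the relational store, you can inspect the metadata_aspect_v2 table directly. This is a read-only sketch assuming Docker quickstart defaults (MySQL on localhost:3306 with user, password, and database all set to datahub); adjust the connection details for your deployment:

import pymysql  # pip install pymysql

# Quickstart-style connection details; change these for your own deployment
conn = pymysql.connect(host="localhost", port=3306, user="datahub", password="datahub", database="datahub")
try:
    with conn.cursor() as cur:
        # List the most recently written dataset aspects
        cur.execute(
            "SELECT urn, aspect, version, createdon "
            "FROM metadata_aspect_v2 "
            "WHERE urn LIKE %s "
            "ORDER BY createdon DESC LIMIT 10",
            ("urn:li:dataset:%",),
        )
        for urn, aspect, version, createdon in cur.fetchall():
            print(urn, aspect, version, createdon)
finally:
    conn.close()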
Example Architecture Diagram
The same high-level system diagram for DataHub’s Serving Tier applies here and shows how these storage components fit together (see the DataHub Serving Architecture documentation linked below).
Detailed Storage Breakdown
- Relational Database (MySQL, PostgreSQL, etc.):
  - Entities and Aspects: Core metadata entities and their aspects are stored here.
  - CRUD Operations: All Create, Read, Update, and Delete operations on metadata are performed on this database.
- Elasticsearch:
  - Search Index: Metadata is indexed here for efficient search operations.
  - Time-Series Data: Telemetry and time-series data related to metadata are stored here.
- Kafka:
  - Metadata Change Log (MCL): Commit events are emitted to Kafka for real-time processing.
  - Metadata Change Proposal (MCP): Metadata can be ingested via Kafka and then processed.
- Graph Database (Optional):
  - Graph Index: Used for storing metadata for complex graph queries like lineage.
Relevant Documentation
- <https://datahubproject.io/docs/0.13.1/architecture/metadata-serving|DataHub Serving Architecture>: This document provides an overview of the serving architecture, including the role of various storage components.