DataHub: Implementing Custom XML Metadata Ingestion via MAE Consumer

Original Slack Thread

Hi all, I’m evaluating DataHub as a data governance tool to build a simple portal and share open information with citizens. I need to connect DataHub to an API endpoint to get metadata in XML format. I’ve already tried UI-based OpenAPI ingestion in DataHub 0.13.3 without success; I get a generic 403 error.
Apart from this error, I would like to understand whether obtaining XML metadata from an API endpoint requires implementing something custom, or whether there are native features that let me do it quickly.

Hey there! :wave: Make sure your message includes the following information if relevant, so we can help more effectively!

  1. Are you using UI or CLI for ingestion?
  2. Which DataHub version are you using? (e.g. 0.12.0)
  3. What data source(s) are you integrating with DataHub? (e.g. BigQuery)

<@U06TM7M40RK> Can you help with the questions in this thread?


:information_source: We’re experimenting with a new AI assistant for questions in the <#CUMUWQU66> channel to help you track down helpful resources more quickly. Keep in mind that responses may not always be spot-on; feel free to ask it follow-up questions and don’t forget to give its responses a :thumbsup: or :thumbsdown: so other folks in the Community know if you’re still looking for help on the topic!

I’m not able to find any data sources to answer your question.

Could you provide more details on the specific API endpoint you are trying to connect to and the exact steps you have taken so far?

If this doesn’t help, please reach out to the DataHub support team for further assistance.

<@U06TM7M40RK> here is a link to the XML metadata I want to ingest into DataHub: http://wms.pcn.minambiente.it/ogc?map=/ms_ogc/WMS_v1.3/servizi-LiDAR/LIDAR_AREA_DOLOMITICA.map&service=wms&request=getCapabilities&version=1.3.0

I’m not able to find any data sources to answer your question.

Could you specify whether you are using the UI or CLI for ingestion and any specific configurations you have tried?

If this doesn’t help, please reach out to the DataHub support team for further assistance.

<@U06TM7M40RK> I’m using the UI and wrote this recipe:

type: openapi
config:
  name: test_endpoint
  url: 'https://test_endpoint.com/'
  swagger_file: classicapi/doc/swagger.json

sink:
  type: "datahub-rest"
  config:
    server: 'http://datahub-gms-1:8080'

I received this error: Exception: Unable to retrieve
I’m using the Docker quickstart and trying to ingest this source via the OpenAPI connector.

Based on the information provided, it seems like you are trying to ingest metadata from an API endpoint using the OpenAPI connector in DataHub. However, the OpenAPI connector expects the metadata to be in OpenAPI (Swagger) format, not XML.

To ingest metadata in XML format, you would likely need to implement a custom ingestion source. DataHub does not natively support XML ingestion out of the box. Here are the steps you can follow to create a custom ingestion source:

  1. Create a Custom Source: You will need to create a custom ingestion source that can parse the XML metadata and convert it into a format that DataHub can understand.

  2. Develop the Custom Source: Follow the <https://datahubproject.io/docs/metadata-ingestion/developing/|DataHub metadata ingestion development guide> to develop and test your custom source.

  3. Use the Custom Source in a Recipe: Once your custom source is developed, you can use it in an ingestion recipe similar to how you would use any other source.

Here is a high-level example of what the custom source might look like:

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.metadata.schema_classes import MetadataChangeEventClass

# Define the configuration for your custom source
class CustomXMLSourceConfig(ConfigModel):
    url: str

class CustomXMLSource(Source):
    def __init__(self, config: CustomXMLSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config

    @classmethod
    def create(cls, config_dict, ctx):
        config = CustomXMLSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def get_workunits(self):
        # Implement your XML parsing logic here and convert the parsed
        # XML data into MetadataChangeEventClass objects
        mce = MetadataChangeEventClass(...)
        yield mce

    def get_report(self):
        return SourceReport()

You can then use this custom source in your ingestion recipe:

source:
  type: custom_xml
  config:
    url: 'http://wms.pcn.minambiente.it/ogc?map=/ms_ogc/WMS_v1.3/servizi-LiDAR/LIDAR_AREA_DOLOMITICA.map&service=wms&request=getCapabilities&version=1.3.0'
sink:
  type: datahub-rest
  config:
    server: 'http://datahub-gms:8080'

If you need more detailed guidance on customizing the ingestion source, please let me know!

<@U06TM7M40RK> I want to go into more detail on point one: Create a Custom Source. Please give me more details on how to implement a custom source.

To create a custom ingestion source for DataHub, you will need to follow a series of steps to define, implement, and integrate your custom source. Here’s a detailed breakdown of the process:

Step 1: Set Up Your Development Environment

  1. Clone the DataHub Repository: Start by cloning the DataHub repository to your local machine.

    git clone https://github.com/datahub-project/datahub.git
    cd datahub
    
  2. Set Up a Virtual Environment: Create and activate a virtual environment.

    python3 -m venv venv
    source venv/bin/activate
    
  3. Install Dependencies: Install the DataHub ingestion framework (published on PyPI as acryl-datahub) along with any libraries your source needs.

    pip install acryl-datahub requests
    

Step 2: Create Your Custom Source

  1. Define the Configuration Model: Create a configuration model for your custom source using Pydantic.

    from pydantic import BaseModel
    
    class CustomXMLSourceConfig(BaseModel):
        url: str
    
  2. Implement the Custom Source Class: Create a class for your custom source that extends the Source class from DataHub.

    from datahub.ingestion.api.common import PipelineContext
    from datahub.ingestion.api.source import Source, SourceReport
    from datahub.metadata.schema_classes import MetadataChangeEventClass
    
    class CustomXMLSource(Source):
        def __init__(self, config: CustomXMLSourceConfig, ctx: PipelineContext):
            super().__init__(ctx)
            self.config = config
    
        @classmethod
        def create(cls, config_dict, ctx):
            config = CustomXMLSourceConfig.parse_obj(config_dict)
            return cls(config, ctx)
    
        def get_workunits(self):
            # Implement your XML parsing logic here
            # Convert the parsed XML data into MetadataChangeEventClass objects
            mce = MetadataChangeEventClass(
                # Populate with your XML metadata
            )
            yield mce
    
        def get_report(self):
            return SourceReport()
    
  3. Parse the XML Data: Implement the logic to fetch and parse the XML data from the provided URL.

    import requests
    from xml.etree import ElementTree
    
    def fetch_and_parse_xml(url):
        response = requests.get(url)
        if response.status_code == 200:
            tree = ElementTree.fromstring(response.content)
            # Parse the XML tree and extract metadata
            return tree
        else:
            raise Exception(f"Failed to fetch XML data: {response.status_code}")
    
  4. Convert XML to MetadataChangeEvent: Convert the parsed XML data into MetadataChangeEventClass objects.

    def convert_xml_to_mce(xml_tree):
        # Implement the conversion logic
        mce = MetadataChangeEventClass(
            # Populate with your XML metadata
        )
        return mce
    
  5. Integrate Parsing and Conversion: Integrate the parsing and conversion logic into the get_workunits method.

    def get_workunits(self):
        xml_tree = fetch_and_parse_xml(self.config.url)
        mce = convert_xml_to_mce(xml_tree)
        yield mce
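
Before packaging, you can sanity-check the source outside the ingestion pipeline by instantiating it directly. This is a hedged sketch: constructing PipelineContext with just a run_id is typically enough for a local smoke test, but the exact signature can vary between DataHub versions.

    # Quick local smoke test of the source, outside the DataHub pipeline.
    from datahub.ingestion.api.common import PipelineContext

    ctx = PipelineContext(run_id="local-test")
    source = CustomXMLSource.create(
        {"url": "http://wms.pcn.minambiente.it/ogc?map=/ms_ogc/WMS_v1.3/"
                "servizi-LiDAR/LIDAR_AREA_DOLOMITICA.map"
                "&service=wms&request=getCapabilities&version=1.3.0"},
        ctx,
    )
    for workunit in source.get_workunits():
        print(workunit)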
    

Step 3: Package and Install the Custom Source

  1. Create a Python Package: Package your custom source as a Python package, placing the CustomXMLSource code from Step 2 inside it (for example in custom_source/source.py, or directly in custom_source/__init__.py).

    mkdir custom_source
    touch custom_source/__init__.py
    
  2. Add Setup Script: Create a setup.py script to package your custom source.

    from setuptools import setup, find_packages
    
    setup(
        name='custom_source',
        version='0.1',
        packages=find_packages(),
        install_requires=[
            'acryl-datahub',
            'requests',
        ],
    )
    
  3. Build and Install the Package: Build and install the package in your environment.

    python setup.py sdist bdist_wheel
    pip install dist/custom_source-0.1-py3-none-any.whl
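
Optionally, you may also be able to register the source under a short name so the recipe can use a plain type instead of the fully qualified class path. Recent DataHub versions discover custom plugins through a setuptools entry-point group; the group name and the short name custom_xml below are assumptions to verify against the custom-source documentation for your version. A hedged sketch of the extended setup.py:

    # setup.py -- sketch only: the entry-point group name is an assumption,
    # check the DataHub docs on custom ingestion sources for your version.
    from setuptools import setup, find_packages

    setup(
        name='custom_source',
        version='0.1',
        packages=find_packages(),
        install_requires=['acryl-datahub', 'requests'],
        entry_points={
            'datahub.ingestion.source.plugins': [
                # short recipe name -> module:Class (hypothetical module path)
                'custom_xml = custom_source.source:CustomXMLSource',
            ],
        },
    )

Without the entry point, the fully qualified custom_source.CustomXMLSource path used in the recipe below still works, as long as the package is installed in the environment that runs the ingestion.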
    

Step 4: Use the Custom Source in an Ingestion Recipe

  1. Create an Ingestion Recipe: Create a YAML file for your ingestion recipe.

    source:
      type: custom_source.CustomXMLSource
      config:
        url: 'http://wms.pcn.minambiente.it/ogc?map=/ms_ogc/WMS_v1.3/servizi-LiDAR/LIDAR_AREA_DOLOMITICA.map&service=wms&request=getCapabilities&version=1.3.0'
    sink:
      type: datahub-rest
      config:
        server: 'http://datahub-gms:8080'
    
  2. Run the Ingestion: Use the DataHub CLI to run the ingestion.

    datahub ingest -c path/to/your/recipe.yaml
    

Example Code

Here is a complete example of the custom source implementation:

from pydantic import BaseModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.metadata.schema_classes import MetadataChangeEventClass
import requests
from xml.etree import ElementTree

class CustomXMLSourceConfig(BaseModel):
    url: str

class CustomXMLSource(Source):
    def __init__(self, config: CustomXMLSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config

    @classmethod
    def create(cls, config_dict, ctx):
        config = CustomXMLSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def fetch_and_parse_xml(self, url):
        response = requests.get(url)
        if response.status_code == 200:
            tree = ElementTree.fromstring(response.content)
            return tree
        else:
            raise Exception(f"Failed to fetch XML data: {response.status_code}")

    def convert_xml_to_mce(self, xml_tree):
        # Implement the conversion logic
        mce = MetadataChangeEventClass(
            # Populate with your XML metadata
        )
        return mce

    def get_workunits(self):
        xml_tree = self.fetch_and_parse_xml(self.config.url)
        mce = self.convert_xml_to_mce(xml_tree)
        yield mce

    def get_report(self):
        return SourceReport()

<@U06TM7M40RK> Can you give an in-depth explanation of how to populate MetadataChangeEventClass with the XML metadata, with a simple example?

To populate the MetadataChangeEventClass with XML metadata, you need to parse the XML data and map it to the appropriate fields in the MetadataChangeEventClass. Here’s a step-by-step guide with a simple example:

Step 1: Parse the XML Data

First, you need to fetch and parse the XML data. For this example, let’s assume the XML data contains information about datasets.

import requests
from xml.etree import ElementTree

def fetch_and_parse_xml(url):
    response = requests.get(url)
    if response.status_code == 200:
        tree = ElementTree.fromstring(response.content)
        return tree
    else:
        raise Exception(f"Failed to fetch XML data: {response.status_code}")

Step 2: Define the MetadataChangeEventClass

The MetadataChangeEventClass is used to represent metadata changes. You will need to map the parsed XML data to this class.

from datahub.metadata.schema_classes import (
    MetadataChangeEventClass,
    DatasetSnapshotClass,
    DatasetPropertiesClass,
    AuditStampClass,
    ChangeTypeClass
)
from datahub.emitter.mce_builder import make_dataset_urn

def convert_xml_to_mce(xml_tree):
    # Extract relevant information from the XML tree
    dataset_name = xml_tree.find('Service/Title').text
    description = xml_tree.find('Service/Abstract').text
    platform = "urn:li:dataPlatform:custom"
    env = "PROD"
    
    # Create the dataset URN
    dataset_urn = make_dataset_urn(platform, dataset_name, env)
    
    # Create the DatasetProperties aspect
    dataset_properties = DatasetPropertiesClass(
        description=description,
        customProperties={"source": "XML API"}
    )
    
    # Create an AuditStamp (not attached to the snapshot in this minimal example)
    audit_stamp = AuditStampClass(
        time=1640692800000,  # Example timestamp
        actor="urn:li:corpuser:ingestion"
    )
    
    # Create the DatasetSnapshot
    dataset_snapshot = DatasetSnapshotClass(
        urn=dataset_urn,
        aspects=[dataset_properties]
    )
    
    # Create the MetadataChangeEvent
    mce = MetadataChangeEventClass(
        proposedSnapshot=dataset_snapshot
    )
    
    return mce
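
One caveat about the find() calls above: WMS 1.3.0 capabilities documents are usually namespaced (commonly under http://www.opengis.net/wms), in which case a bare find('Service/Title') returns None. The namespace URI below is an assumption about the target endpoint, so inspect the actual document; a namespace-aware helper might look like this:

    from xml.etree import ElementTree

    # Assumed namespace for WMS 1.3.0 capabilities; verify against your XML.
    WMS_NS = {"wms": "http://www.opengis.net/wms"}

    def find_text(tree, namespaced_path, plain_path, default=""):
        """Return element text, trying the namespaced path first, then the plain one."""
        node = tree.find(namespaced_path, WMS_NS)
        if node is None:
            node = tree.find(plain_path)  # fallback for non-namespaced documents
        return node.text.strip() if node is not None and node.text else default

    # Inside convert_xml_to_mce you could then use, for example:
    #   dataset_name = find_text(xml_tree, "wms:Service/wms:Title", "Service/Title")
    #   description = find_text(xml_tree, "wms:Service/wms:Abstract", "Service/Abstract")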

Step 3: Integrate Parsing and Conversion

Integrate the parsing and conversion logic into the get_workunits method of your custom source.

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport

class CustomXMLSource(Source):
    def __init__(self, config, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config

    @classmethod
    def create(cls, config_dict, ctx):
        config = CustomXMLSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def fetch_and_parse_xml(self, url):
        response = requests.get(url)
        if response.status_code == 200:
            tree = ElementTree.fromstring(response.content)
            return tree
        else:
            raise Exception(f"Failed to fetch XML data: {response.status_code}")

    def convert_xml_to_mce(self, xml_tree):
        # Extract relevant information from the XML tree
        dataset_name = xml_tree.find('Service/Title').text
        description = xml_tree.find('Service/Abstract').text
        platform = "urn:li:dataPlatform:custom"
        env = "PROD"
        
        # Create the dataset URN
        dataset_urn = make_dataset_urn(platform, dataset_name, env)
        
        # Create the DatasetProperties aspect
        dataset_properties = DatasetPropertiesClass(
            description=description,
            customProperties={"source": "XML API"}
        )
        
        # Create an AuditStamp (not attached to the snapshot in this minimal example)
        audit_stamp = AuditStampClass(
            time=1640692800000,  # Example timestamp
            actor="urn:li:corpuser:ingestion"
        )
        
        # Create the DatasetSnapshot
        dataset_snapshot = DatasetSnapshotClass(
            urn=dataset_urn,
            aspects=[dataset_properties]
        )
        
        # Create the MetadataChangeEvent
        mce = MetadataChangeEventClass(
            proposedSnapshot=dataset_snapshot
        )
        
        return mce

    def get_workunits(self):
        xml_tree = self.fetch_and_parse_xml(self.config.url)
        mce = self.convert_xml_to_mce(xml_tree)
        yield mce

    def get_report(self):
        return SourceReport()

Step 4: Run the Ingestion

Create an ingestion recipe and run the ingestion using the DataHub CLI.

source:
  type: custom_source.CustomXMLSource
  config:
    url: 'http://wms.pcn.minambiente.it/ogc?map=/ms_ogc/WMS_v1.3/servizi-LiDAR/LIDAR_AREA_DOLOMITICA.map&service=wms&request=getCapabilities&version=1.3.0'
sink:
  type: datahub-rest
  config:
    server: 'http://datahub-gms:8080'

Run the ingestion:

datahub ingest -c path/to/your/recipe.yaml

Example Code

Here is the complete example code for the custom source:

from pydantic import BaseModel
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.metadata.schema_classes import (
    MetadataChangeEventClass,
    DatasetSnapshotClass,
    DatasetPropertiesClass,
    AuditStampClass,
    ChangeTypeClass
)
from datahub.emitter.mce_builder import make_dataset_urn
import requests
from xml.etree import ElementTree

class CustomXMLSourceConfig(BaseModel):
    url: str

class CustomXMLSource(Source):
    def __init__(self, config: CustomXMLSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config

    @classmethod
    def create(cls, config_dict, ctx):
        config = CustomXMLSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def fetch_and_parse_xml(self, url):
        response = requests.get(url)
        if response.status_code == 200:
            tree = ElementTree.fromstring(response.content)
            return tree
        else:
            raise Exception(f"Failed to fetch XML data: {response.status_code}")

    def convert_xml_to_mce(self, xml_tree):
        # Extract relevant information from the XML tree
        dataset_name = xml_tree.find('Service/Title').text
        description = xml_tree.find('Service/Abstract').text
        platform = "urn:li:dataPlatform:custom"
        env = "PROD"
        
        # Create the dataset URN
        dataset_urn = make_dataset_urn(platform, dataset_name, env)
        
        # Create the DatasetProperties aspect
        dataset_properties = DatasetPropertiesClass(
            description=description,
            customProperties={"source": "XML API"}
        )
        
        # Create an AuditStamp (not attached to the snapshot in this minimal example)
        audit_stamp = AuditStampClass(
            time=1640692800000,  # Example timestamp
            actor="urn:li:corpuser:ingestion"
        )
        
        # Create the DatasetSnapshot
        dataset_snapshot = DatasetSnapshotClass(
            urn=dataset_urn,
            aspects=[dataset_properties]
        )
        
        # Create the MetadataChangeEvent
        mce = MetadataChangeEventClass(
            proposedSnapshot=dataset_snapshot
        )
        
        return mce

    def get_workunits(self):
        xml_tree = self.fetch_and_parse_xml(self.config.url)
        mce = self.convert_xml_to_mce(xml_tree)
        yield mce

    def get_report(self):
        return SourceReport()
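
One note on get_workunits: in the standard ingestion framework the pipeline generally expects it to yield MetadataWorkUnit objects rather than bare MCEs, so depending on your DataHub version you may need to wrap each event. A hedged sketch of that variant:

    from datahub.ingestion.api.workunit import MetadataWorkUnit

    def get_workunits(self):
        xml_tree = self.fetch_and_parse_xml(self.config.url)
        mce = self.convert_xml_to_mce(xml_tree)
        # Wrap the MCE in a work unit; the id just needs to be unique per unit.
        yield MetadataWorkUnit(id=f"custom-xml-{self.config.url}", mce=mce)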

<@U06TM7M40RK> What does MySQL do in the DataHub architecture?

MySQL (or other relational databases like PostgreSQL or MariaDB) plays a crucial role in the DataHub architecture as the primary storage backend for metadata. Here’s a detailed explanation of its role:

Metadata Storage

  1. Primary Metadata Store: MySQL is used as the primary metadata store where the core metadata entities and aspects are persisted. This includes datasets, dashboards, data pipelines, and other metadata entities.

  2. CRUD Operations: The Metadata Service in DataHub uses MySQL to perform Create, Read, Update, and Delete (CRUD) operations on metadata. This service exposes REST and GraphQL APIs for these operations.

  3. Schema and Entity Storage: MySQL stores the schema definitions and instances of metadata entities. Each entity is identified by a unique identifier called a URN (Uniform Resource Name).

Metadata Change Log Stream (MCL)

  1. Commit Events: When a metadata change is committed to MySQL, a commit event called a Metadata Change Log (MCL) is emitted. This event is sent over Kafka and can be subscribed to by external systems.

  2. Real-time Reactions: The MCL stream allows external systems to react in real-time to changes in metadata. For example, an access control enforcer can react to changes in metadata to immediately lock down a dataset.
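
To make the MCL stream concrete, here is a hedged sketch of subscribing to it with confluent-kafka. The topic name (MetadataChangeLog_Versioned_v1) and broker address are assumptions based on the Docker quickstart defaults, and the payloads are Avro-encoded, so a real consumer would deserialize them against the schema registry:

    from confluent_kafka import Consumer

    # Assumed quickstart defaults; adjust broker, group id, and topic as needed.
    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",
        "group.id": "mcl-demo",
        "auto.offset.reset": "latest",
    })
    consumer.subscribe(["MetadataChangeLog_Versioned_v1"])

    try:
        while True:
            msg = consumer.poll(1.0)
            if msg is None or msg.error():
                continue
            # msg.value() is an Avro-encoded MetadataChangeLog record; decode it
            # with the schema registry in a real consumer.
            print(f"MCL event at offset {msg.offset()} ({len(msg.value())} bytes)")
    finally:
        consumer.close()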

Metadata Index Applier (MAE Consumer Job)

  1. Index Updates: The Metadata Change Log events are consumed by a job called the MAE Consumer Job, which applies the changes to the graph and search index accordingly. This ensures that the metadata changes are reflected in the search and graph indices.

  2. Entity-agnostic Processing: The MAE Consumer Job is entity-agnostic and executes corresponding graph and search index builders based on the metadata change.

Metadata Query Serving

  1. Primary-key Based Reads: Reads based on primary keys (e.g., getting schema metadata for a dataset based on the dataset URN) are routed to MySQL.

  2. Secondary Index Reads: Reads based on secondary indices, full-text search queries, and complex graph queries (e.g., lineage) are routed to the search index (e.g., Elasticsearch).
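
As a concrete illustration of a primary-key based read, the sketch below fetches an entity snapshot by URN from GMS over REST; this kind of lookup is ultimately served from the relational store. The endpoint path and the example URN are assumptions based on the quickstart defaults:

    import urllib.parse
    import requests

    GMS = "http://localhost:8080"  # assumed quickstart GMS address
    # Hypothetical dataset URN; substitute one that exists in your instance.
    dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:custom,LIDAR_AREA_DOLOMITICA,PROD)"

    resp = requests.get(f"{GMS}/entities/{urllib.parse.quote(dataset_urn, safe='')}")
    resp.raise_for_status()
    print(resp.json())  # snapshot with the aspects persisted in the primary store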

Example Architecture Diagram

The high-level system diagram for DataHub’s Serving Tier (see the DataHub architecture documentation) shows MySQL as the primary metadata store.

<@U06TM7M40RK> Where exactly is the ingested metadata stored?

In DataHub, ingested metadata is stored in multiple components, each serving a specific purpose. Here’s a detailed breakdown of where the ingested metadata is stored:

1. Relational Database (MySQL, PostgreSQL, etc.)

  • Primary Metadata Store: The core metadata entities and aspects are stored in a relational database like MySQL, PostgreSQL, or MariaDB. This includes datasets, dashboards, data pipelines, and other metadata entities.
  • Tables: The metadata is stored in specific tables such as metadata_aspect_v2, which contains the ingested aspects for all entities (see the sketch after this list).

2. Elasticsearch

  • Search Index: Elasticsearch is used to store metadata for search and indexing purposes. This allows for efficient full-text search queries and secondary index-based reads.
  • Time-Series Data: Some time-series and telemetry aspects of the metadata are also stored in Elasticsearch, which might grow over time with the number of entities.

3. Kafka

  • Metadata Change Log (MCL): Kafka is used to emit commit events (Metadata Change Log) when a metadata change has been successfully committed to the relational database. These events can be subscribed to by external systems for real-time reactions to metadata changes.
  • Metadata Change Proposal (MCP): Kafka can also be used to ingest metadata via Metadata Change Proposals, which are then processed and stored in the relational database.

4. Graph Database (Optional)

  • Graph Index: In some deployments, a graph database like Neo4j is used to store metadata for complex graph queries such as lineage. This is optional and depends on the specific deployment setup.
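
For example, the metadata_aspect_v2 table mentioned in item 1 can be inspected directly. This is a hedged sketch: the connection details are assumptions based on the Docker quickstart defaults, so verify them against your deployment.

    import pymysql  # pip install pymysql

    # Assumed Docker quickstart credentials; adjust for your deployment.
    conn = pymysql.connect(host="localhost", port=3306, user="datahub",
                           password="datahub", database="datahub")
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT urn, aspect, version FROM metadata_aspect_v2 "
                "WHERE urn LIKE %s LIMIT 10",
                ("urn:li:dataset:%",),
            )
            for urn, aspect, version in cur.fetchall():
                print(urn, aspect, version)
    finally:
        conn.close()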

Example Architecture Diagram

The high-level system diagram for DataHub’s Serving Tier (see the DataHub architecture documentation) shows how these storage components fit together.

Detailed Storage Breakdown

  1. Relational Database (MySQL, PostgreSQL, etc.):

    • Entities and Aspects: Core metadata entities and their aspects are stored here.
    • CRUD Operations: All Create, Read, Update, and Delete operations on metadata are performed on this database.
  2. Elasticsearch:

    • Search Index: Metadata is indexed here for efficient search operations.
    • Time-Series Data: Telemetry and time-series data related to metadata are stored here.
  3. Kafka:

    • Metadata Change Log (MCL): Commit events are emitted to Kafka for real-time processing.
    • Metadata Change Proposal (MCP): Metadata can be ingested via Kafka and then processed.
  4. Graph Database (Optional):

    • Graph Index: Used for storing metadata for complex graph queries like lineage.
