Creating a Custom Ingestion Source in DataHub with CSV Data

Original Slack Thread

This message was deleted.

Hey there! :wave: Make sure your message includes the following information if relevant, so we can help more effectively!

  1. Are you using UI or CLI for ingestion?
  2. Which DataHub version are you using? (e.g. 0.12.0)
  3. What data source(s) are you integrating with DataHub? (e.g. BigQuery)

<@U06TM7M40RK> Can you help with the questions in this thread?


:information_source: Please continue to tag RunLLM (e.g. <@U06TM7M40RK>) to have it continue generating responses! We’re experimenting with a new AI assistant for questions in the <#CUMUWQU66|> channel to help you track down helpful resources more quickly. Keep in mind that responses may not always be spot-on; feel free to ask it follow-up questions and don’t forget to give its responses a :+1: or :-1: so other folks in the Community know if you’re still looking for help on the topic and to help us improve the assistant!

To help you effectively, could you please provide the following information:

  1. Are you using the UI or CLI for ingestion?
  2. Which DataHub version are you using (e.g., 0.12.0)?
  3. What data source(s) are you integrating with DataHub (e.g., BigQuery)?

This information will help me provide you with the most relevant guidance and resources.

Based on your description, you want to create a custom ingestion source that reads from a CSV file and represents the Dataroot as an entity similar to a database. Here’s a high-level approach to achieve this:

  1. Define the Custom Source Configuration: Create a configuration model for your custom source using Pydantic.
  2. Implement the Custom Source Class: Create a class for your custom source that extends the Source class from DataHub.
  3. Parse the CSV Data: Implement the logic to read and parse the CSV data.
  4. Create Metadata Entities: Map the parsed data to DataHub metadata entities, such as Dataset for Dataroot, Datafolder, and Datafield.
  5. Emit Work Units: Wrap each MetadataChangeEventClass in a MetadataWorkUnit and yield it from get_workunits so the framework can send the metadata changes to DataHub.

Here’s a detailed example:

Step 1: Define the Custom Source Configuration

from pydantic import BaseModel

class CustomCSVSourceConfig(BaseModel):
    file_path: str
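
DataHub sources typically extend ConfigModel from datahub.configuration.common, but a plain Pydantic BaseModel works for this example. For reference, this is how the config block of a recipe gets parsed into the model by the create() method shown in the next step (a minimal sketch; the path is just the placeholder used in the recipe below):

config = CustomCSVSourceConfig.parse_obj({"file_path": "/path/to/your/csvfile.csv"})
print(config.file_path)  # /path/to/your/csvfile.csv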

Step 2: Implement the Custom Source Class

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import MetadataChangeEventClass, DatasetSnapshotClass, DatasetPropertiesClass
from datahub.emitter.mce_builder import make_dataset_urn
import csv

class CustomCSVSource(Source):
    def __init__(self, config: CustomCSVSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config
        self.report = SourceReport()

    @classmethod
    def create(cls, config_dict, ctx):
        config = CustomCSVSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def get_workunits(self):
        with open(self.config.file_path, mode='r') as file:
            csv_reader = csv.DictReader(file)
            for row in csv_reader:
                dataroot = row['Dataroot']
                datafolder = row['Datafolder']
                datafield = row['Datafield']

                # URNs for each level of the hierarchy. Only the datafield-level
                # dataset is emitted below; the dataroot and datafolder URNs could
                # be emitted the same way (or modeled as containers) if you want
                # them to appear as separate entities.
                dataroot_urn = make_dataset_urn("custom", dataroot, "PROD")
                datafolder_urn = make_dataset_urn("custom", f"{dataroot}.{datafolder}", "PROD")
                datafield_urn = make_dataset_urn("custom", f"{dataroot}.{datafolder}.{datafield}", "PROD")

                # DatasetProperties aspect describing the field
                dataset_properties = DatasetPropertiesClass(
                    description=f"Datafield {datafield} in folder {datafolder} under root {dataroot}",
                    customProperties={"source": "CSV"}
                )

                # Wrap the aspect in a DatasetSnapshot and a MetadataChangeEvent
                dataset_snapshot = DatasetSnapshotClass(
                    urn=datafield_urn,
                    aspects=[dataset_properties]
                )
                mce = MetadataChangeEventClass(
                    proposedSnapshot=dataset_snapshot
                )

                # The ingestion framework expects work units, not bare MCEs
                yield MetadataWorkUnit(id=datafield_urn, mce=mce)

    def get_report(self):
        return self.report

    def close(self):
        pass
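
Before wiring this into a recipe, you can sanity-check the source locally. The sketch below assumes a small CSV named fields.csv with the three columns the code reads (Dataroot, Datafolder, Datafield); the file name and row values are made up for illustration:

Dataroot,Datafolder,Datafield
sales_root,orders,order_id
sales_root,orders,order_total

from datahub.ingestion.api.common import PipelineContext

source = CustomCSVSource(
    CustomCSVSourceConfig(file_path="fields.csv"),
    PipelineContext(run_id="csv-source-test"),
)
for wu in source.get_workunits():
    print(wu.id)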

Step 3: Use the Custom Source in an Ingestion Recipe

Create an ingestion recipe and run the ingestion using the DataHub CLI. The type field is the fully qualified Python path to your source class (here the class is assumed to live in a module named custom_source.py), and that module must be importable (for example, on the PYTHONPATH) when the CLI runs.

source:
  type: custom_source.CustomCSVSource
  config:
    file_path: '/path/to/your/csvfile.csv'
sink:
  type: datahub-rest
  config:
    server: 'http://localhost:8080'

Run the ingestion:

datahub ingest -c path/to/your/recipe.yaml

This setup will allow you to ingest the Dataroot as an entity similar to a database, and you can retrieve and filter these entities via the DataHub API or UI.
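
Once ingested, the emitted datasets can also be fetched programmatically. A minimal sketch, assuming a recent acryl-datahub client where DataHubGraph.get_aspect is available, and using an example URN built the same way as in the source (the row values are made up):

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.emitter.mce_builder import make_dataset_urn
from datahub.metadata.schema_classes import DatasetPropertiesClass

# Connect to the same GMS endpoint the sink pointed at
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# URN built the same way as in the source
urn = make_dataset_urn("custom", "my_dataroot.my_folder.my_field", "PROD")

# Fetch the DatasetProperties aspect emitted during ingestion
props = graph.get_aspect(entity_urn=urn, aspect_type=DatasetPropertiesClass)
if props is not None:
    print(props.description, props.customProperties)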

Would you like more detailed steps or specific configurations for any of these points?

<@U06TM7M40RK> can this be registered as a custom source under name “ExternalSource”