Hey there! Make sure your message includes the following information if relevant, so we can help more effectively!
- Are you using UI or CLI for ingestion?
- Which DataHub version are you using? (e.g. 0.12.0)
- What data source(s) are you integrating with DataHub? (e.g. BigQuery)
<@U06TM7M40RK> Can you help with the questions in this thread?
Please continue to tag RunLLM (e.g. <@U06TM7M40RK>) to have it continue generating responses! We’re experimenting with a new AI assistant for questions in the <#CUMUWQU66|> channel to help you track down helpful resources more quickly. Keep in mind that responses may not always be spot-on; feel free to ask it follow-up questions, and don’t forget to give its responses a 👍 or 👎 so other folks in the Community know if you’re still looking for help on the topic and to help us improve the assistant!
To help you effectively, could you please provide the following information:
- Are you using the UI or CLI for ingestion?
- Which DataHub version are you using (e.g., 0.12.0)?
- What data source(s) are you integrating with DataHub (e.g., BigQuery)?
This information will help me provide you with the most relevant guidance and resources.
Based on your description, you want to create a custom ingestion source that reads from a CSV file and represents the `Dataroot` as an entity similar to a database. Here’s a high-level approach to achieve this:
- Define the Custom Source Configuration: Create a configuration model for your custom source using Pydantic.
- Implement the Custom Source Class: Create a class for your custom source that extends the `Source` class from DataHub.
- Parse the CSV Data: Implement the logic to read and parse the CSV data.
- Create Metadata Entities: Map the parsed data to DataHub metadata entities, such as `Dataset` for `Dataroot`, `Datafolder`, and `Datafield`.
- Emit MetadataChangeEvent: Use the `MetadataChangeEventClass` to emit metadata changes to DataHub.
Here’s a detailed example:
Step 1: Define the Custom Source Configuration
```
from pydantic import BaseModel


class CustomCSVSourceConfig(BaseModel):
    file_path: str
```
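As a quick sanity check (the file path is illustrative only), the config can be built from the same dictionary shape that a recipe’s `config:` block will supply:
```
# Illustrative: validate the config from a plain dict, as the recipe loader would.
config = CustomCSVSourceConfig.parse_obj({"file_path": "/path/to/your/csvfile.csv"})
print(config.file_path)
```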
Step 2: Implement the Custom Source Class
```
import csv

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.schema_classes import (
    DatasetPropertiesClass,
    DatasetSnapshotClass,
    MetadataChangeEventClass,
)


class CustomCSVSource(Source):
    def __init__(self, config: CustomCSVSourceConfig, ctx: PipelineContext):
        super().__init__(ctx)
        self.config = config
        self.report = SourceReport()

    @classmethod
    def create(cls, config_dict, ctx):
        config = CustomCSVSourceConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def get_workunits(self):
        with open(self.config.file_path, mode="r") as file:
            csv_reader = csv.DictReader(file)
            for row in csv_reader:
                dataroot = row["Dataroot"]
                datafolder = row["Datafolder"]
                datafield = row["Datafield"]

                # Create URNs for each level of the hierarchy. Only the
                # datafield-level dataset is emitted below; the root and
                # folder URNs can be emitted the same way if you want them
                # as separate entities.
                dataroot_urn = make_dataset_urn("custom", dataroot, "PROD")
                datafolder_urn = make_dataset_urn("custom", f"{dataroot}.{datafolder}", "PROD")
                datafield_urn = make_dataset_urn("custom", f"{dataroot}.{datafolder}.{datafield}", "PROD")

                # Describe the entity with a DatasetProperties aspect.
                dataset_properties = DatasetPropertiesClass(
                    description=f"Datafield {datafield} in folder {datafolder} under root {dataroot}",
                    customProperties={"source": "CSV"},
                )

                # Wrap the aspect in a snapshot, build a MetadataChangeEvent,
                # and yield it as a work unit.
                dataset_snapshot = DatasetSnapshotClass(
                    urn=datafield_urn,
                    aspects=[dataset_properties],
                )
                mce = MetadataChangeEventClass(proposedSnapshot=dataset_snapshot)
                yield MetadataWorkUnit(id=datafield_urn, mce=mce)

    def get_report(self):
        return self.report

    def close(self):
        pass
```
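To try the source locally before wiring up a recipe, a minimal sketch (the CSV contents, file name, and run id below are assumptions) could look like this:
```
# Assumed contents of csvfile.csv (header names must match what the source reads):
#   Dataroot,Datafolder,Datafield
#   sales,orders,order_id
#   sales,orders,order_date
from datahub.ingestion.api.common import PipelineContext

config = CustomCSVSourceConfig(file_path="csvfile.csv")
source = CustomCSVSource(config, PipelineContext(run_id="csv-local-test"))
for wu in source.get_workunits():
    print(wu.id)  # one work unit (and dataset URN) per CSV row
source.close()
```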
Step 3: Use the Custom Source in an Ingestion Recipe
Create an ingestion recipe and run the ingestion using the DataHub CLI.
```
source:
  type: custom_source.CustomCSVSource
  config:
    file_path: '/path/to/your/csvfile.csv'
sink:
  type: datahub-rest
  config:
    server: 'http://localhost:8080'
```
Run the ingestion:
```
datahub ingest -c path/to/your/recipe.yaml
```
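Separately, if you’d rather reference the source by a short name than by a Python module path, the “Adding a Custom Ingestion Source” guide linked below describes packaging it as a plugin. A rough sketch (package and plugin names are illustrative, assuming the class lives in a `custom_source` package):
```
# setup.py for the package that contains CustomCSVSource (names are illustrative).
from setuptools import find_packages, setup

setup(
    name="custom-csv-source",
    version="0.1.0",
    packages=find_packages(),
    install_requires=["acryl-datahub"],
    entry_points={
        # DataHub discovers source plugins through this entry-point group,
        # letting a recipe use `type: custom-csv` instead of a module path.
        "datahub.ingestion.source.plugins": [
            "custom-csv = custom_source:CustomCSVSource",
        ],
    },
)
```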
This setup will allow you to ingest the `Dataroot` as an entity similar to a database, and you can retrieve and filter these entities via the DataHub API or UI.
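As one way to retrieve them programmatically (a sketch assuming a local DataHub instance and the `custom` platform used above; double-check the client methods available in your DataHub version):
```
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
# List dataset URNs emitted for the illustrative 'custom' platform.
for urn in graph.get_urns_by_filter(entity_types=["dataset"], platform="custom"):
    print(urn)
```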
Would you like more detailed steps or specific configurations for any of these points?
Sources:
- <https://datahubproject.io/docs/metadata-ingestion/developing/|DataHub metadata ingestion development guide>
- <https://datahubproject.io/docs/0.13.1/how/add-custom-ingestion-source|Adding a Custom Ingestion Source>
Hint: Mention @runllm in the thread for followups.
<@U06TM7M40RK> can this be registered as a custom source under the name “ExternalSource”?