Ingesting a Table with Columns into DataHub using a Python Emitter

Original Slack Thread

Provide an example of a Python emitter that ingests a table (not a dataset, but a table) with its columns.

DataHub models a table as a dataset entity, so to ingest a table with its columns using a Python emitter, you use the DataHub Python SDK to build a schema metadata aspect and send it to DataHub as a Metadata Change Proposal (MCP). Here’s a step-by-step example:

  1. Install the DataHub Python SDK: Ensure you have the acryl-datahub package installed.

    pip install 'acryl-datahub[datahub-rest]'
    
  2. Initialize the REST Emitter: Set up the REST emitter to communicate with your DataHub instance.

  3. Define the Table and Columns: Create the schema metadata for your table, including its columns.

  4. Emit the Metadata: Use the emitter to send the metadata to DataHub.
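The table in step 3 is addressed by a dataset URN. As a rough illustration of that identifier's layout, the sketch below assembles the string by hand. `sketch_dataset_urn` is a hypothetical helper written only for this illustration; real code should call `builder.make_dataset_urn` from the SDK instead.

```python
def sketch_dataset_urn(platform: str, name: str, env: str = "PROD") -> str:
    """Hand-built illustration of the dataset URN layout (hypothetical helper,
    not part of the DataHub SDK)."""
    platform_urn = f"urn:li:dataPlatform:{platform}"
    return f"urn:li:dataset:({platform_urn},{name},{env})"


# Same platform, name, and environment as the table in this thread
table_urn = sketch_dataset_urn("mysql", "my_database.my_table")
print(table_urn)
```

The URN carries the platform, the fully qualified table name, and the environment, so changing any of the three yields a different entity in DataHub.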

Here’s a detailed example:

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    AuditStampClass,
    OtherSchemaClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    SchemaMetadataClass,
    StringTypeClass,
)

# Initialize the DataHub emitter (point gms_server at your DataHub GMS endpoint)
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# Define the table URN (DataHub identifies a table with a dataset URN)
table_urn = builder.make_dataset_urn(platform="mysql", name="my_database.my_table", env="PROD")

# Audit stamp reused below; time is in epoch milliseconds
last_modified = AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion")

# Define the schema fields (columns)
schema_fields = [
    SchemaFieldClass(
        fieldPath="column1",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="VARCHAR(50)",
        description="Description for column1",
        lastModified=last_modified,
    ),
    SchemaFieldClass(
        fieldPath="column2",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="VARCHAR(100)",
        description="Description for column2",
        lastModified=last_modified,
    ),
]

# Create the schema metadata aspect that holds the columns
schema_metadata = SchemaMetadataClass(
    schemaName="my_table_schema",
    platform=builder.make_data_platform_urn("mysql"),
    version=0,
    hash="",
    platformSchema=OtherSchemaClass(rawSchema="__insert raw schema here__"),
    fields=schema_fields,
    lastModified=last_modified,
)

# Wrap the aspect in a MetadataChangeProposalWrapper addressed to the table
mcp = MetadataChangeProposalWrapper(
    entityUrn=table_urn,
    aspect=schema_metadata,
)

# Emit the metadata
emitter.emit(mcp)

This code snippet creates a table schema and emits it to DataHub using the REST emitter. You can modify the table_urn, schema_fields, and other parameters as needed for your specific use case.
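When adapting schema_fields to a real table, the columns will rarely all be strings. The sketch below is one possible way to pick a type class per column; it uses only the standard library and returns class names as strings. The mapping table, the helper names `type_class_name` and `now_millis`, and the `NullTypeClass` fallback are assumptions made for illustration, not something from the thread or a DataHub convention you must follow.

```python
import re
import time

# Assumed mapping from native SQL base types to names of type classes in
# datahub.metadata.schema_classes; extend it for your platform.
TYPE_CLASS_NAMES = {
    "VARCHAR": "StringTypeClass",
    "TEXT": "StringTypeClass",
    "INT": "NumberTypeClass",
    "BIGINT": "NumberTypeClass",
    "BOOLEAN": "BooleanTypeClass",
    "DATE": "DateTypeClass",
}


def type_class_name(native_type: str) -> str:
    """Return a type class name for a native type such as 'VARCHAR(50)'."""
    # Drop a length/precision suffix like "(50)" and normalize case
    base = re.sub(r"\(.*\)", "", native_type).strip().upper()
    # Fallback for unmapped types is an assumption of this sketch
    return TYPE_CLASS_NAMES.get(base, "NullTypeClass")


def now_millis() -> int:
    """Current time in epoch milliseconds, the unit used for AuditStampClass.time."""
    return int(time.time() * 1000)


columns = [("id", "BIGINT"), ("name", "VARCHAR(100)"), ("payload", "JSON")]
for field_path, native_type in columns:
    print(field_path, native_type, type_class_name(native_type))
```

With the SDK installed, each name can be resolved to the class with `getattr` on `datahub.metadata.schema_classes` and instantiated inside `SchemaFieldDataTypeClass(type=...)`, and `now_millis()` can replace the hard-coded timestamp.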

