Generating a Python script to ingest a dataset using the DataHub SDK

Original Slack Thread

Can you generate a Python script that ingests a dataset using the SDK?

To ingest a dataset using the DataHub Python SDK, you can follow the steps outlined below. This example demonstrates how to create a dataset schema and emit it to DataHub using the REST emitter.

Step-by-Step Python Script

  1. Install the DataHub Python SDK: Ensure you have the acryl-datahub package installed.
  2. Initialize the REST Emitter: Set up the REST emitter to communicate with your DataHub instance.
  3. Define the Dataset Schema: Create the schema metadata for your dataset.
  4. Emit the Metadata: Use the emitter to send the metadata to DataHub.

Here’s a complete Python script:

# Step 1: Install the DataHub Python SDK
# pip install acryl-datahub[datahub-rest]

# Step 2: Initialize the REST Emitter
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    AuditStampClass,
    DateTypeClass,
    OtherSchemaClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    SchemaMetadataClass,
    StringTypeClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
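# If your DataHub instance requires authentication, DatahubRestEmitter also
# accepts a token argument (the value below is a placeholder):
# emitter = DatahubRestEmitter(gms_server="http://localhost:8080", token="<your-access-token>")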

# Step 3: Define the Dataset Schema
# Define the dataset URN
dataset_urn = builder.make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")
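# The URN produced here is:
# urn:li:dataset:(urn:li:dataPlatform:hive,realestate_db.sales,PROD)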

# Define the schema fields
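# Dots in fieldPath (e.g. "address.zipcode") denote nested fields;
# AuditStampClass timestamps are epoch milliseconds.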
schema_fields = [
    SchemaFieldClass(
        fieldPath="address.zipcode",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="VARCHAR(50)",
        description="This is the zipcode of the address. Specified using extended form and limited to addresses in the United States",
        lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion")
    ),
    SchemaFieldClass(
        fieldPath="address.street",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="VARCHAR(100)",
        description="Street corresponding to the address",
        lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion")
    ),
    SchemaFieldClass(
        fieldPath="last_sold_date",
        type=SchemaFieldDataTypeClass(type=DateTypeClass()),
        nativeDataType="Date",
        description="Date of the last sale for this property",
        lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion")
    )
]

# Create the schema metadata
schema_metadata = SchemaMetadataClass(
    schemaName="customer",
    platform=builder.make_data_platform_urn("hive"),
    version=0,
    hash="",
    platformSchema=OtherSchemaClass(rawSchema="__insert raw schema here__"),
    fields=schema_fields,
    lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion")
)

# Step 4: Emit the Metadata
# Create a MetadataChangeProposalWrapper object (imported at the top)
mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=schema_metadata
)

# Emit the metadata
emitter.emit(mcp)
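
The same emit pattern works for any other aspect. As a minimal sketch (the description text and custom property below are illustrative placeholders, not part of the original example), you could attach dataset-level properties in the same run:

from datahub.metadata.schema_classes import DatasetPropertiesClass

# Attach dataset-level properties using the same MCP pattern
properties_mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=DatasetPropertiesClass(
        description="Residential real estate sales records",  # illustrative
        customProperties={"source": "example"},  # illustrative key/value
    ),
)
emitter.emit(properties_mcp)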

This script sets up a REST emitter, defines a dataset schema with fields, and emits the metadata to DataHub. You can modify the dataset_urn, schema_fields, and other parameters as needed for your specific use case.
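
If you want to confirm the write, the SDK's DataHubGraph client can read the aspect back. A minimal sketch, assuming the same local server as above:

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

# Connect to the same DataHub instance and read back the schema aspect
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
fetched = graph.get_aspect(entity_urn=dataset_urn, aspect_type=SchemaMetadataClass)
print(fetched.schemaName if fetched else "schema aspect not found")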

Would you like more detailed information on any of these steps?

