Can you generate me a Python script that ingests a dataset using the SDK?
To ingest a dataset using the DataHub Python SDK, you can follow the steps outlined below. This example demonstrates how to create a dataset schema and emit it to DataHub using the REST emitter.
Step-by-Step Python Script
- Install the DataHub Python SDK: Ensure you have the acryl-datahub package installed.
- Initialize the REST Emitter: Set up the REST emitter to communicate with your DataHub instance.
- Define the Dataset Schema: Create the schema metadata for your dataset.
- Emit the Metadata: Use the emitter to send the metadata to DataHub.
Here’s a complete Python script:
# Step 1: Install the DataHub Python SDK
# pip install 'acryl-datahub[datahub-rest]'
# Step 2: Initialize the REST Emitter
import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    AuditStampClass,
    DateTypeClass,
    OtherSchemaClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    SchemaMetadataClass,
    StringTypeClass,
)
# Initialize the DataHub emitter
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
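# Optional: if your DataHub instance requires authentication, pass a personal
# access token (the value below is a placeholder, not a real credential):
# emitter = DatahubRestEmitter(gms_server="http://localhost:8080", token="<your-access-token>")

# Optional sanity check: test_connection() raises if the server is unreachable
emitter.test_connection()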
# Step 3: Define the Dataset Schema
# Define the dataset URN
dataset_urn = builder.make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")
# Define the schema fields
schema_fields = [
    SchemaFieldClass(
        fieldPath="address.zipcode",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="VARCHAR(50)",
        description="Zipcode of the address. Specified using extended form and limited to addresses in the United States",
        lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion"),
    ),
    SchemaFieldClass(
        fieldPath="address.street",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="VARCHAR(100)",
        description="Street corresponding to the address",
        lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion"),
    ),
    SchemaFieldClass(
        fieldPath="last_sold_date",
        # Use DateTypeClass so the logical type matches the native Date column
        type=SchemaFieldDataTypeClass(type=DateTypeClass()),
        nativeDataType="Date",
        description="Date of the last sale for this property",
        lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion"),
    ),
]
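# Note: other logical types from datahub.metadata.schema_classes follow the
# same pattern. A numeric column might look like this (illustrative field, not
# part of the original example; NumberTypeClass would need importing):
# SchemaFieldClass(
#     fieldPath="sale_price",
#     type=SchemaFieldDataTypeClass(type=NumberTypeClass()),
#     nativeDataType="BIGINT",
#     description="Sale price in US dollars",
#     lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion"),
# )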
# Create the schema metadata
schema_metadata = SchemaMetadataClass(
    schemaName="sales",  # matches the dataset name in the URN above
    platform=builder.make_data_platform_urn("hive"),
    version=0,
    hash="",
    platformSchema=OtherSchemaClass(rawSchema="__insert raw schema here__"),
    fields=schema_fields,
    lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion"),
)
# Step 4: Emit the Metadata
# Create a MetadataChangeProposalWrapper object
mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=schema_metadata,
)
# Emit the metadata
emitter.emit(mcp)
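Optionally, you can document the dataset itself by emitting a second aspect through the same emitter. Here is a minimal sketch using DatasetPropertiesClass from the same schema_classes module; the description text and custom property below are illustrative, not part of the original example:

from datahub.metadata.schema_classes import DatasetPropertiesClass

# Attach a human-readable description and free-form key/value properties
properties = DatasetPropertiesClass(
    description="Sales records for the real estate database",  # illustrative
    customProperties={"owner_team": "data-platform"},  # illustrative
)
emitter.emit(MetadataChangeProposalWrapper(entityUrn=dataset_urn, aspect=properties))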
This script sets up a REST emitter, defines a dataset schema with its fields, and emits the metadata to DataHub. You can modify the dataset_urn, schema_fields, and other parameters as needed for your specific use case.
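To verify that the metadata landed, you can read the aspect back from DataHub. A minimal sketch, assuming the same local gms_server, the dataset_urn and SchemaMetadataClass from the script above, and an acryl-datahub version that exposes DataHubGraph.get_aspect:

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

# Connect to the same DataHub instance and fetch the latest schemaMetadata aspect
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
schema = graph.get_aspect(entity_urn=dataset_urn, aspect_type=SchemaMetadataClass)
if schema is not None:
    print([field.fieldPath for field in schema.fields])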
Would you like more detailed information on any of these steps?