Creating Dataset Schema with Python REST Emitter in DataHub

Original Slack Thread

<@U06TM7M40RK> how to create dataset schema using python rest emitter

To create a dataset schema using the DataHub Python REST emitter, you can follow these steps:

  1. Install the DataHub Python SDK: Ensure you have the acryl-datahub package installed.
  2. Initialize the REST Emitter: Set up the REST emitter to communicate with your DataHub instance.
  3. Define the Dataset Schema: Create the schema metadata for your dataset.
  4. Emit the Metadata: Use the emitter to send the metadata to DataHub.

Here is a detailed example:

Step 1: Install the DataHub Python SDK

pip install acryl-datahub[datahub-rest]

Step 2: Initialize the REST Emitter

import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    SchemaMetadataClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    StringTypeClass,
    DateTypeClass,
    AuditStampClass,
    OtherSchemaClass,
)

# Initialize the DataHub emitter
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

Step 3: Define the Dataset Schema

# Define the dataset URN
dataset_urn = builder.make_dataset_urn(platform="hive", name="realestate_db.sales", env="PROD")

# Define the schema fields
schema_fields = [
    SchemaFieldClass(
        fieldPath="address.zipcode",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="VARCHAR(50)",
        description="This is the zipcode of the address. Specified using extended form and limited to addresses in the United States",
        lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion")
    ),
    SchemaFieldClass(
        fieldPath="address.street",
        type=SchemaFieldDataTypeClass(type=StringTypeClass()),
        nativeDataType="VARCHAR(100)",
        description="Street corresponding to the address",
        lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion")
    ),
    SchemaFieldClass(
        fieldPath="last_sold_date",
        type=SchemaFieldDataTypeClass(type=DateTypeClass()),
        nativeDataType="Date",
        description="Date of the last sale date for this property",
        lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion")
    )
]

# Create the schema metadata
schema_metadata = SchemaMetadataClass(
    schemaName="customer",
    platform=builder.make_data_platform_urn("hive"),
    version=0,
    hash="",
    platformSchema=OtherSchemaClass(rawSchema="__insert raw schema here__"),
    fields=schema_fields,
    lastModified=AuditStampClass(time=1640692800000, actor="urn:li:corpuser:ingestion")
)

Step 4: Emit the Metadata

from datahub.emitter.mcp import MetadataChangeProposalWrapper

# Create a MetadataChangeProposalWrapper object
mcp = MetadataChangeProposalWrapper(
    entityUrn=dataset_urn,
    aspect=schema_metadata
)

# Emit the metadata
emitter.emit(mcp)

<@U06TM7M40RK> Can you help with the questions in this thread?


I’m not able to find any data sources to answer your question. Could you provide more details about the specific issue you are facing or the type of assistance you need? If this doesn’t help, please reach out to the DataHub support team for further assistance.

This code snippet creates a dataset schema and emits it to DataHub using the REST emitter. You can modify the dataset_urn, schema_fields, and other parameters as needed for your specific use case.

Would you like more detailed information on any of these steps?


<@U06TM7M40RK> Getting the below error. DataHub is deployed in an EKS cluster and I am trying to run the code from my local system by port-forwarding to the GMS server:
datahub.configuration.common.OperationalError: ('Unable to emit metadata to DataHub GMS', {'message': '401 Client Error: Unauthorized for url: http://localhost:8080/aspects?action=ingestProposal'})

Sure thing! I’m looking through the available data to answer your question. Hold on a second… :hourglass_flowing_sand:
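For reference while waiting on the bot: a 401 Unauthorized from GMS usually means token-based authentication is enabled on the server, and the emitter is not sending a token. The REST emitter accepts a token argument; it is sent as a Bearer token in the Authorization header. A minimal sketch (the DATAHUB_GMS_TOKEN environment variable name is an assumption for this example), with the equivalent raw request shown using only the standard library:

```python
import os
import urllib.request

# If token auth is enabled on GMS, pass a personal access token to the
# emitter (environment variable name is an assumption for this sketch):
#
#   emitter = DatahubRestEmitter(
#       gms_server="http://localhost:8080",
#       token=os.environ["DATAHUB_GMS_TOKEN"],
#   )
#
# The emitter sends the token as a Bearer token. The equivalent raw
# request, for illustration with the standard library only:
token = os.environ.get("DATAHUB_GMS_TOKEN", "example-token")
request = urllib.request.Request(
    "http://localhost:8080/aspects?action=ingestProposal",
    headers={"Authorization": f"Bearer {token}"},
    method="POST",
)
print(request.get_header("Authorization"))
```

A personal access token can be generated from the DataHub UI under Settings > Access Tokens (if enabled by your administrator). Also double-check that the port-forward really targets the GMS service on port 8080, not the frontend.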