Troubleshooting JSON Structure for DataHub Column Metadata Ingestion Script

Original Slack Thread

Hey <@U06TM7M40RK>, I’ve implemented a DataHub ingestion script that processes a hierarchical JSON structure (domains -> schemas -> tables -> columns) using MCPs. While the domains and tables are visible in the DataHub UI, the columns/fields/properties of the tables aren’t showing up.

```
    mcps = []
    dataset_urn = make_dataset_urn(
        platform=PLATFORM_NAME,
        name=f"{schema_name}.{table['name']}",
        env=ENV_NAME
    )

    # Process columns
    schema_fields = []
    for column in table.get("columns", []):
        field_path = column["name"]
        schema_fields.append(SchemaFieldClass(
            fieldPath=field_path,
            type=SchemaFieldDataTypeClass(type=column["type"]),
            description=column.get("description", ""),
            nativeDataType=column["type"]
        ))

    schema_metadata = SchemaMetadataClass(
        schemaName=table["name"],
        platform=f"urn:li:dataPlatform:{PLATFORM_NAME}",
        version=0,
        fields=schema_fields,
        hash="",
        platformSchema=SchemaFieldDataTypeClass(type="struct")
    )
```
Here's a sample of my JSON structure:
```
{
  "name": "Marketing",
  "schemas": [{
    "name": "ora_mktg",
    "tables": [{
      "name": "Customers",
      "columns": [{
        "name": "CUSTOMER_ID",
        "description": "Unique identifier for the customer",
        "type": "NUMBER(10)",
        "is_pk": "Y",
        "order": 1
      },
      {
        "name": "FIRST_NAME",
        "description": "Customer's first name",
        "type": "VARCHAR2(50)",
        "order": 2
      }]
    }]
  }]
}
```
Any suggestions on why the column metadata might not be ingesting properly?

Based on the provided code snippet and JSON structure, it seems like the issue might be related to how the SchemaMetadataClass is being constructed and emitted. Here are a few suggestions to troubleshoot and resolve the issue:

  1. Ensure Correct Data Type Instantiation: The type parameter of SchemaFieldDataTypeClass expects an instance of a DataHub type class (for example NumberTypeClass() or StringTypeClass()), not the raw native type string. In your snippet, column["type"] is a string like "NUMBER(10)", so you need to map it to the appropriate DataHub type class instance (see the mapping sketch after this list).

  2. Check Platform Schema: The platformSchema parameter in SchemaMetadataClass should be an instance of a schema class, such as OtherSchemaClass, rather than SchemaFieldDataTypeClass. You might want to use OtherSchemaClass with a rawSchema parameter.

  3. Emit the Metadata: Ensure that you are emitting the MetadataChangeProposalWrapper correctly after constructing the SchemaMetadataClass. The emission step is crucial for the metadata to be visible in DataHub.
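As an illustration of point 1, here is a minimal sketch of such a mapping, assuming Oracle-style native types like the ones in your JSON (NUMBER, VARCHAR2, DATE). The map_column_type helper name and the exact set of prefixes handled are illustrative, not part of your script:

```
from datahub.metadata.schema_classes import (
    BooleanTypeClass,
    DateTypeClass,
    NullTypeClass,
    NumberTypeClass,
    SchemaFieldDataTypeClass,
    StringTypeClass,
)


# Hypothetical helper: maps an Oracle-style native type string such as
# "NUMBER(10)" or "VARCHAR2(50)" to a DataHub type class instance.
def map_column_type(native_type: str) -> SchemaFieldDataTypeClass:
    upper = native_type.upper()
    if upper.startswith(("VARCHAR", "CHAR", "CLOB")):
        return SchemaFieldDataTypeClass(type=StringTypeClass())
    if upper.startswith(("NUMBER", "FLOAT", "INTEGER", "DECIMAL")):
        return SchemaFieldDataTypeClass(type=NumberTypeClass())
    if upper.startswith(("DATE", "TIMESTAMP")):
        return SchemaFieldDataTypeClass(type=DateTypeClass())
    if upper.startswith("BOOLEAN"):
        return SchemaFieldDataTypeClass(type=BooleanTypeClass())
    # Fall back to NullTypeClass for anything unrecognized
    return SchemaFieldDataTypeClass(type=NullTypeClass())
```

The simple VARCHAR/NUMBER check in the revised snippet below is a compressed version of the same idea.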

Here is a revised version of your code snippet:

```
from typing import Dict, List

from datahub.emitter.mce_builder import make_dataset_urn
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import (
    SchemaMetadataClass,
    SchemaFieldClass,
    SchemaFieldDataTypeClass,
    StringTypeClass,
    NumberTypeClass,
    OtherSchemaClass
)

def process_table(table: Dict, schema_name: str) -> List[MetadataChangeProposalWrapper]:
    mcps = []
    dataset_urn = make_dataset_urn(
        platform=PLATFORM_NAME,
        name=f"{schema_name}.{table['name']}",
        env=ENV_NAME
    )

    # Process columns
    schema_fields = []
    for column in table.get("columns", []):
        field_path = column["name"]
        # Map the native column type to a DataHub type class instance
        data_type = StringTypeClass() if "VARCHAR" in column["type"] else NumberTypeClass()
        schema_fields.append(SchemaFieldClass(
            fieldPath=field_path,
            type=SchemaFieldDataTypeClass(type=data_type),
            description=column.get("description", ""),
            nativeDataType=column["type"]
        ))

    schema_metadata = SchemaMetadataClass(
        schemaName=table["name"],
        platform=f"urn:li:dataPlatform:{PLATFORM_NAME}",
        version=0,
        fields=schema_fields,
        hash="",
        platformSchema=OtherSchemaClass(rawSchema="__insert raw schema here__")
    )

    # Wrap the schemaMetadata aspect in an MCP so it can be emitted to DataHub
    mcp = MetadataChangeProposalWrapper(
        entityUrn=dataset_urn,
        aspect=schema_metadata
    )
    mcps.append(mcp)
    return mcps
```

Make sure to replace PLATFORM_NAME and ENV_NAME with the actual values you are using. Also, ensure that each MetadataChangeProposalWrapper is actually emitted to DataHub; if the MCPs are only collected but never sent, the schema metadata (and therefore the columns) will never appear in the UI. A sketch of the emission step follows below.
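As a rough sketch of that emission step, assuming you are sending metadata over REST to a DataHub GMS instance (the server URL and the table/schema_name variables here are placeholders for your own setup):

```
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Hypothetical wiring: point this at your own GMS endpoint (and pass a token if needed).
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

for mcp in process_table(table, schema_name):
    # Each MCP must be emitted for the schemaMetadata aspect (and its
    # columns) to show up on the dataset page in the DataHub UI.
    emitter.emit(mcp)
```

If your script already batches MCPs somewhere else, the same loop applies wherever the emission actually happens.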

Would you like more detailed steps or further assistance on any specific part of this process?
