Finding and Utilizing the Source Code for Example Ingestion in DataHub

Original Slack Thread

<@U06TM7M40RK> where is the source code for example ingestion in datahub. as i use datahub ingest sample data like:

datahub docker ingest-sample-data

The source code for example ingestion in DataHub can be found in the <https://github.com/datahub-project/datahub|DataHub GitHub repository>. Specifically, the command datahub docker ingest-sample-data is used to ingest sample metadata into DataHub for demo and testing purposes. This command is part of the DataHub CLI and is documented in the <https://github.com/datahub-project/datahub/blob/master/docs/quickstart.md|DataHub Quickstart guide>.

If you are looking for a sample recipe that performs a similar function, you can refer to the <https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/recipes/file_to_datahub_rest.dhub.yaml|file_to_datahub_rest.dhub.yaml> example in the repository. This YAML file can be used with the DataHub CLI to ingest data by specifying the source and destination configurations.
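If you prefer to run the same file-based ingestion from Python rather than via a YAML recipe, the SDK's programmatic Pipeline API accepts an equivalent configuration dict. The snippet below is a minimal sketch, not taken from the repository: the sample file path and server URL are placeholder assumptions that you would replace with your own.

from datahub.ingestion.run.pipeline import Pipeline

# Minimal sketch: file source -> DataHub REST sink, mirroring the
# file_to_datahub_rest.dhub.yaml example. The path and server URL are placeholders.
pipeline = Pipeline.create(
    {
        "source": {
            "type": "file",
            "config": {"path": "./examples/mce_files/bootstrap_mce.json"},
        },
        "sink": {
            "type": "datahub-rest",
            "config": {"server": "http://localhost:8080"},
        },
    }
)

pipeline.run()
pipeline.raise_from_status()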

Would you like more details on how to use these resources or any specific part of the ingestion process?

Hi <@U082Z415Y6P>, how did my answer work out for you? Need any more assistance? You can also tag <@U07FV3UHE4D> to let your fellow community members know you’re still looking for guidance.

<@U06TM7M40RK> datahub delete --urn "urn:li:dataFlow:(airflow,dbt_run,prod)" --hard

datahub.configuration.common.OperationalError: ('Unable to get metadata from DataHub', {'message': '401 Client Error: Unauthorized for url: http://localhost:8080/entities?action=delete'})

The error message you’re encountering, 401 Client Error: Unauthorized, indicates that the request to delete the entity is not authorized. This typically means there is an issue with authentication or permissions.

Here are a few steps you can take to resolve this issue:

  1. Check API Token: Ensure that you are using the correct API token for authentication. The environment variable for the API token should be set correctly. According to <Slack discussion>, the correct environment variable name is DATAHUB_GMS_TOKEN (see the sketch after this list).

  2. Verify Permissions: Make sure that the user associated with the API token has the necessary permissions to perform delete operations. As per <Slack thread>, the user needs to have the DELETE_ENTITY_PRIVILEGE.

  3. Check Configuration: If you have enabled REST_API_AUTHORIZATION_ENABLED, ensure that the user has the appropriate roles and privileges assigned. You might need to explicitly grant the DELETE_ENTITY permission to the user.

  4. Environment Variables: Double-check that all necessary environment variables are set correctly and that the DataHub CLI is configured to use them.
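Before retrying the delete, it can help to confirm that the token and server the CLI will use are actually accepted by GMS. The following is a minimal sketch, assuming a local GMS at http://localhost:8080 and a personal access token exported as DATAHUB_GMS_TOKEN; it performs an authenticated read of the dataFlow you are trying to delete, so a bad token shows up as a 401 here rather than partway through the delete.

import os

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import DataFlowInfoClass

# Minimal sketch (assumed server URL): check that the token in DATAHUB_GMS_TOKEN
# is accepted by GMS before retrying `datahub delete --hard`.
graph = DataHubGraph(
    DatahubClientConfig(
        server="http://localhost:8080",
        token=os.environ.get("DATAHUB_GMS_TOKEN"),
    )
)

# An authenticated read against the dataFlow being deleted; a missing or invalid
# token surfaces here as a 401 instead of failing later during the delete.
flow_urn = "urn:li:dataFlow:(airflow,dbt_run,prod)"
info = graph.get_aspect(entity_urn=flow_urn, aspect_type=DataFlowInfoClass)
print(f"Authenticated read succeeded; dataFlowInfo present: {info is not None}")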

If you have verified all the above and the issue persists, you might want to check the DataHub logs for more detailed error messages or consult the <DataHub CLI documentation> for further troubleshooting steps.

Would you like more detailed information on any of these steps, or do you have any specific concerns about the deletion process?

Sources:

  • <Slack Discussion on Unauthorized Error>
  • <Slack Discussion on Permissions>

<@U06TM7M40RK>

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import os

os.chdir('/opt/airflow/dbt-sample/dbt_mssql_connector/')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}

with DAG(
    'dbt_run',
    default_args=default_args,
    description='A simple dbt DAG',
    schedule_interval='@daily',  # Change to a cron-style interval
    start_date=datetime(2024, 11, 28),  # Use a fixed start date
    catchup=False,
) as dag:

    dbt_run = BashOperator(
        task_id='test',
        # Alternative command that only runs `dbt run` before generating docs and ingesting:
        # bash_command='/bin/bash -c "cd /opt/airflow/dbt-sample/dbt_mssql_connector/ && dbt run --project-dir /opt/airflow/dbt-sample/dbt_mssql_connector/ && cp target/run_results.json target/run_results_backup.json && dbt docs generate --target dev && datahub ingest -c dbt-recipe.dhub.yml"',
        bash_command='cd /opt/airflow/dbt-sample/dbt_mssql_connector/ && dbt clean && dbt source freshness && dbt build --project-dir /opt/airflow/dbt-sample/dbt_mssql_connector/ && cp target/run_results.json target/run_results_backup.json && dbt docs generate --target dev && datahub ingest -c dbt-recipe.dhub.yml',
    )

    dbt_run

This is my DAG. How can I make DataHub capture the data flow starting from Airflow and then dbt?

To capture the data flow starting from Airflow and then dbt in DataHub, you can use the DataHub Airflow plugin to emit metadata events as part of the DAG execution. This involves setting up the DatahubEmitterOperator to emit lineage and schema metadata to DataHub. Here’s how you can modify your DAG to achieve this:

  1. Install the DataHub Airflow Plugin: Ensure that the DataHub Airflow plugin is installed in your Airflow environment. You can find more information on how to install and configure the plugin in the <https://github.com/datahub-project/datahub/blob/master/docs/lineage/airflow.md|DataHub Airflow documentation>.

  2. Modify the DAG to Emit Metadata: Use the DatahubEmitterOperator to emit metadata events. Below is an example of how you can modify your DAG to include this operator:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import os
from datahub_airflow_plugin.operators.datahub import DatahubEmitterOperator
import datahub.emitter.mce_builder as builder

os.chdir('/opt/airflow/dbt-sample/dbt_mssql_connector/')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}

with DAG(
    'dbt_run',
    default_args=default_args,
    description='A simple dbt DAG',
    schedule_interval='@daily',
    start_date=datetime(2024, 11, 28),
    catchup=False,
) as dag:

    dbt_run = BashOperator(
        task_id='test',
        bash_command='cd /opt/airflow/dbt-sample/dbt_mssql_connector/ && dbt clean && dbt source freshness && dbt build --project-dir /opt/airflow/dbt-sample/dbt_mssql_connector/ && cp target/run_results.json target/run_results_backup.json && dbt docs generate --target dev && datahub ingest -c dbt-recipe.dhub.yml',
    )

    emit_lineage_task = DatahubEmitterOperator(
        task_id="emit_lineage",
        datahub_conn_id="datahub_rest_default",
        mces=[
            builder.make_lineage_mce(
                upstream_urns=[
                    builder.make_dataset_urn(
                        platform="dbt", name="your_dbt_model_name"
                    ),
                ],
                downstream_urn=builder.make_dataset_urn(
                    platform="your_platform", name="your_downstream_dataset_name"
                ),
            ),
        ],
    )

    dbt_run >> emit_lineage_task

  3. Configure DataHub Connection: Ensure that the datahub_conn_id is configured in your Airflow connections to point to your DataHub instance; a sketch of registering this connection follows below.
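One way to register that connection is through Airflow's Python API (the Airflow UI or `airflow connections add` work just as well). The snippet below is a minimal sketch: the connection type string, GMS host, and token are assumptions based on the plugin documentation, so check them against your DataHub and plugin versions.

from airflow import settings
from airflow.models import Connection

# Minimal sketch: register the "datahub_rest_default" connection used by
# DatahubEmitterOperator. Host and token values are placeholders.
conn = Connection(
    conn_id="datahub_rest_default",
    conn_type="datahub-rest",             # connection type registered by the DataHub plugin (verify for your version)
    host="http://datahub-gms:8080",       # your GMS endpoint
    password="<optional-datahub-token>",  # personal access token, if metadata service auth is enabled
)

session = settings.Session()
if not session.query(Connection).filter(Connection.conn_id == conn.conn_id).first():
    session.add(conn)
    session.commit()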

This setup will allow you to capture the data flow from Airflow to dbt and emit the relevant metadata to DataHub. The DatahubEmitterOperator is used to emit lineage information, which can be customized based on your specific data flow and datasets.

Would you like more detailed information on any specific part of this setup?

<@U06TM7M40RK>
Here is the thing: my Airflow is used to call dbt, which reads the MSSQL Server database table DimCustomer, processes it as dim_customer, and then stores it back to MSSQL.

To capture the data flow from Airflow to dbt and back to MSSQL Server in DataHub, you can use the DataHub Airflow plugin to emit lineage metadata. This involves setting up the DatahubEmitterOperator to emit lineage information about the data flow from the MSSQL Server table DimCustomer to the processed table dim_customer.

Here’s how you can modify your DAG to achieve this:

  1. Install the DataHub Airflow Plugin: Ensure that the DataHub Airflow plugin is installed in your Airflow environment. You can find more information on how to install and configure the plugin in the <https://github.com/datahub-project/datahub/blob/master/docs/lineage/airflow.md|DataHub Airflow documentation>.

  2. Modify the DAG to Emit Metadata: Use the DatahubEmitterOperator to emit metadata events. Below is an example of how you can modify your DAG to include this operator:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import os
from datahub_airflow_plugin.operators.datahub import DatahubEmitterOperator
import datahub.emitter.mce_builder as builder

os.chdir('/opt/airflow/dbt-sample/dbt_mssql_connector/')

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}

with DAG(
    'dbt_run',
    default_args=default_args,
    description='A simple dbt DAG',
    schedule_interval='@daily',
    start_date=datetime(2024, 11, 28),
    catchup=False,
) as dag:

    dbt_run = BashOperator(
        task_id='test',
        bash_command='cd /opt/airflow/dbt-sample/dbt_mssql_connector/ && dbt clean && dbt source freshness && dbt build --project-dir /opt/airflow/dbt-sample/dbt_mssql_connector/ && cp target/run_results.json target/run_results_backup.json && dbt docs generate --target dev && datahub ingest -c dbt-recipe.dhub.yml',
    )

    emit_lineage_task = DatahubEmitterOperator(
        task_id="emit_lineage",
        datahub_conn_id="datahub_rest_default",
        mces=[
            builder.make_lineage_mce(
                upstream_urns=[
                    builder.make_dataset_urn(
                        platform="mssql", name="database.schema.DimCustomer"
                    ),
                ],
                downstream_urn=builder.make_dataset_urn(
                    platform="mssql", name="database.schema.dim_customer"
                ),
            ),
        ],
    )

    dbt_run >> emit_lineage_task

  3. Configure DataHub Connection: Ensure that the datahub_conn_id is configured in your Airflow connections to point to your DataHub instance.

This setup will allow you to capture the data flow from the MSSQL Server table DimCustomer to the processed table dim_customer and emit the relevant metadata to DataHub. The DatahubEmitterOperator is used to emit lineage information, which can be customized based on your specific data flow and datasets.
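For this lineage to stitch onto the datasets created by your MSSQL and dbt ingestion runs, the URNs built in the operator must match those datasets exactly (platform, database.schema.table name, and environment). A quick sanity check, sketched below with placeholder names from your description, is to build the URNs locally and compare them with the URNs shown in the DataHub UI.

import datahub.emitter.mce_builder as builder

# Minimal sketch: print the dataset URNs the emitter will reference so they can
# be compared with the URNs of the ingested MSSQL/dbt datasets.
# "database.schema" is a placeholder; use your actual database and schema names.
upstream = builder.make_dataset_urn(
    platform="mssql", name="database.schema.DimCustomer", env="PROD"
)
downstream = builder.make_dataset_urn(
    platform="mssql", name="database.schema.dim_customer", env="PROD"
)

print(upstream)    # e.g. urn:li:dataset:(urn:li:dataPlatform:mssql,database.schema.DimCustomer,PROD)
print(downstream)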

Would you like more detailed information on any specific part of this setup?

Hi :slight_smile:! Did this interaction resolve your issue, or do you still require assistance?

Please tag @RunLLM if you still need further assistance!

Or tag @Datahub Community Support if you require a person to take a look :slight_smile:

Hope this helps! ><