Ensuring Consistency Between Deleted Airflow DAGs and DataHub

Original Slack Thread

I have connected DataHub with Airflow using the DataHub REST API plugin in Airflow. The deleted DAGs still exist in DataHub; how can I make the DAGs consistent between the two platforms? <@U06TM7M40RK>

To ensure that deleted DAGs in Airflow are also removed from DataHub, you can follow a manual cleanup process. DataHub does not automatically delete metadata for DAGs that have been removed from Airflow. Here are the steps you can take:

  1. Identify Obsolete DAGs: First, identify the DAGs that have been removed from Airflow but still exist in DataHub. You can do this by comparing the list of DAGs in Airflow with those in DataHub.

  2. Create a Cleanup DAG: You can create a DAG in Airflow specifically for cleaning up obsolete pipelines and tasks in DataHub. This DAG will help you remove the metadata for DAGs that no longer exist in Airflow.

    Here’s an example of how you can set up a cleanup DAG:

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    with DAG(
        "Datahub_Cleanup",
        start_date=datetime(2024, 1, 1),
        schedule_interval=None,
        catchup=False,
    ) as dag:
        task = BashOperator(
            task_id="cleanup_obsolete_data",
            bash_command="echo 'cleaning up the obsolete data from datahub'",
        )
    

    As written, the task only echoes a message and serves as a scaffold for the cleanup process; replace the bash_command (or swap in a PythonOperator) with logic that actually removes the obsolete metadata from DataHub, as shown later in this thread.

  3. Use the DataHub CLI for Deletion: You can use the DataHub CLI to delete the metadata for specific DAGs. Identify the URNs of the DAGs you want to delete, then use the datahub delete command (a concrete example with the Airflow URN format follows this list).

    For example, to perform a soft delete:

    datahub delete --urn "<urn-of-your-dag>" --soft
    

    For a hard delete:

    datahub delete --urn "<urn-of-your-dag>" --hard
    

    More details on using the DataHub CLI for deletion can be found in the <Removing Metadata from DataHub | DataHub documentation on deleting metadata>.

  4. Stateful Ingestion: If your connector supports stateful ingestion and it is enabled, it can automatically soft-delete entities in DataHub when they are removed from the source system. Note, however, that this feature must be configured explicitly and applies to recipe-based (pull) ingestion sources; the Airflow plugin pushes metadata as DAGs run, which is why removed DAGs are not cleaned up automatically.
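
For reference (as mentioned in step 3), Airflow pipelines are ingested into DataHub as dataFlow entities whose URNs follow the pattern urn:li:dataFlow:(airflow,<dag_id>,<cluster>), with prod as the default cluster. A soft delete for a hypothetical DAG named my_old_dag would therefore look like:

    datahub delete --urn "urn:li:dataFlow:(airflow,my_old_dag,prod)" --soft

Each task within a DAG is modelled as a separate dataJob entity under that dataFlow, so depending on your DataHub version you may need to delete those URNs as well.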

Would you like more detailed steps or information on a specific part of the process?


<@U06TM7M40RK> How does the Cleanup DAG clean the obsolete DAGs? I have already created the cleanup DAG, but it does not work.

To make your Cleanup DAG effectively remove obsolete DAGs from DataHub, it must be able to identify and delete the metadata for DAGs that no longer exist in Airflow. Here are some steps and considerations to troubleshoot and enhance your Cleanup DAG:

  1. Ensure Correct Configuration: Verify that your Cleanup DAG is correctly configured to interact with DataHub. This includes ensuring that the DataHub Airflow plugin is properly set up and that the DAG is using the correct cluster configuration from airflow.cfg.

  2. Check the Cleanup Logic: The Cleanup DAG should be designed to identify obsolete DAGs by comparing the list of DAGs in Airflow with those in DataHub. This involves:

    • Fetching all DAGs currently present in Airflow.
    • Fetching all DAGs recorded in DataHub.
    • Identifying the difference between these two lists to find obsolete DAGs.
  3. Use the DataHub CLI for Deletion: If the Cleanup DAG is not automatically removing obsolete DAGs, you might need to manually use the DataHub CLI to delete the metadata for specific DAGs. Ensure you have the correct URNs for the DAGs you want to delete.

  4. Review the Cleanup DAG Code: Ensure that the code within your Cleanup DAG is correctly set up to perform the cleanup operation. The Datahub_Cleanup example shown in the previous reply is only a scaffold: its BashOperator merely echoes a message, so it should be replaced with logic that actually performs the cleanup, such as calling a script or function that interacts with DataHub to delete obsolete DAGs.

  5. Debugging: Check the logs of your Cleanup DAG to see if there are any errors or issues during execution. This can provide insights into why the cleanup might not be working as expected.

  6. Manual Verification: After running the Cleanup DAG, manually verify in DataHub whether the obsolete DAGs have been removed (a small verification sketch follows this list). If not, consider using the DataHub CLI to manually delete them as a temporary workaround.
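
As a minimal verification sketch for step 6, assuming the acryl-datahub Python package is installed and your GMS is reachable at the hypothetical URL http://localhost:8080, you can read the entity's Status aspect, which a soft delete sets to removed=True:

    from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
    from datahub.metadata.schema_classes import StatusClass

    # Connect to the DataHub GMS; adjust the server URL (and add a token) for your deployment.
    graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

    # dataFlow URN of a DAG you expect the cleanup to have removed ("obsolete_dag" is hypothetical).
    urn = "urn:li:dataFlow:(airflow,obsolete_dag,prod)"

    # A soft delete writes the Status aspect with removed=True; no aspect means it was never set.
    status = graph.get_aspect(urn, StatusClass)
    print(f"{urn} removed flag: {status.removed if status else 'no Status aspect found'}")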

If you need more detailed guidance on any specific part of this process, please let me know!


<@U06TM7M40RK> replace the placeholder with logic that actually performs the cleanup, such as calling a script or function that interacts with DataHub to delete obsolete DAGs

To replace the placeholder in your Cleanup DAG with logic that actually performs the cleanup, you can implement a Python function that interacts with DataHub to delete obsolete DAGs. This function should identify the obsolete DAGs and use the DataHub CLI or API to remove them. Here’s a step-by-step guide on how you can achieve this:

  1. Identify Obsolete DAGs: Write a function to fetch the list of DAGs from both Airflow and DataHub, and determine which DAGs are obsolete (i.e., present in DataHub but not in Airflow).

  2. Delete Obsolete DAGs: Use the DataHub CLI or API to delete the metadata for these obsolete DAGs.

Here’s an example of how you might implement this in your Cleanup DAG:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
import subprocess

def fetch_airflow_dags():
    # Implement logic to fetch current DAGs from Airflow
    # This is a placeholder for actual implementation
    return set(["dag1", "dag2", "dag3"])

def fetch_datahub_dags():
    # Implement logic to fetch DAGs from DataHub
    # This is a placeholder for actual implementation
    return set(["dag1", "dag2", "dag3", "obsolete_dag"])

def delete_obsolete_dags():
    airflow_dags = fetch_airflow_dags()
    datahub_dags = fetch_datahub_dags()
    
    obsolete_dags = datahub_dags - airflow_dags
    
    for dag_id in obsolete_dags:
        # "prod" is the default cluster; adjust it to match the cluster configured
        # for the DataHub Airflow plugin in airflow.cfg.
        urn = f"urn:li:dataFlow:(airflow,{dag_id},prod)"
        # Use the DataHub CLI to soft-delete the DAG's dataFlow entity; --force skips
        # the interactive confirmation prompt so the task does not hang under the scheduler.
        subprocess.run(["datahub", "delete", "--urn", urn, "--soft", "--force"], check=True)
        print(f"Soft-deleted obsolete DAG: {dag_id}")

with DAG(
    "Datahub_Cleanup",
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    cleanup_task = PythonOperator(
        task_id="cleanup_obsolete_data",
        python_callable=delete_obsolete_dags,
    )

Explanation:

  • fetch_airflow_dags: This function should contain the logic to fetch the list of current DAGs from Airflow. You might need to use Airflow’s API or database to get this list.
  • fetch_datahub_dags: This function should fetch the list of DAGs from DataHub. You can use DataHub’s API or CLI to get this information (one possible approach is sketched after this list).
  • delete_obsolete_dags: This function calculates the difference between the DAGs in DataHub and Airflow to find obsolete DAGs and deletes them using the DataHub CLI.
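
As one possible way to fill in those placeholders (a sketch under assumptions, not a definitive implementation): fetch_airflow_dags can read the DAG bag on the worker, and fetch_datahub_dags can query DataHub for dataFlow entities via the Python SDK. The GMS URL below is a placeholder; adjust it (and add a token) for your deployment.

from airflow.models import DagBag

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig


def fetch_airflow_dags():
    # DAGs currently defined on this Airflow deployment (example DAGs excluded).
    return set(DagBag(include_examples=False).dag_ids)


def fetch_datahub_dags():
    # Query DataHub for dataFlow entities; by default this skips entities that
    # are already soft-deleted, so they will not be deleted twice.
    graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
    dag_ids = set()
    for urn in graph.get_urns_by_filter(entity_types=["dataFlow"]):
        # URNs look like urn:li:dataFlow:(airflow,<dag_id>,<cluster>); keep Airflow flows only.
        orchestrator, dag_id, _cluster = urn[len("urn:li:dataFlow:("):-1].split(",", 2)
        if orchestrator == "airflow":
            dag_ids.add(dag_id)
    return dag_ids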

Note:

  • Ensure that the DataHub CLI is installed and configured on the machine (or in the container) where this DAG runs; if you would rather avoid that dependency, see the Python SDK alternative sketched below.
  • Replace the placeholder logic in fetch_airflow_dags and fetch_datahub_dags with actual implementations that suit your environment.
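
If you would rather not depend on the DataHub CLI being installed on every Airflow worker, a minimal alternative sketch (again assuming the acryl-datahub package and a placeholder GMS URL) is to perform the soft delete directly from Python, since a soft delete amounts to writing the Status aspect with removed=True:

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import StatusClass


def soft_delete_entity(graph: DataHubGraph, urn: str) -> None:
    # Writing Status(removed=True) is what "datahub delete --soft" does for an entity.
    graph.emit(MetadataChangeProposalWrapper(entityUrn=urn, aspect=StatusClass(removed=True)))


graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
soft_delete_entity(graph, "urn:li:dataFlow:(airflow,obsolete_dag,prod)")

You could call soft_delete_entity from delete_obsolete_dags in place of the subprocess call to the CLI.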

Would you like more detailed steps or information on a specific part of the process?

Sources:

  • <Slack Discussion on Metadata Deletion>
