Troubleshooting Missing Lineage Information in DataHub for NiFi Processors

Original Slack Thread

Hi, team!
I've run into a problem. I connected my Apache NiFi to DataHub (<https://i.sstatic.net/6c7D9yBM.png|successful ingestion>, <https://i.sstatic.net/xF6PbuTi.png|recipe>), but I can't see my data stream or any of the components/process groups. <https://i.sstatic.net/MBi4DMnp.png|Empty data flow> DataHub shows only an empty NiFi pipeline. Can you tell me what's wrong?
I want to see my data flow as DataHub lineage.


To help you more effectively, could you please provide the following information?

  1. Are you using the UI or CLI for ingestion?
  2. Which DataHub version are you using? (e.g., 0.12.0)?
  3. What data source(s) are you integrating with DataHub (e.g., BigQuery)?

Additionally, based on the information provided, here are a few potential areas to check:

  1. Dataset Existence: Ensure that the upstream and downstream datasets already exist in DataHub. As noted in earlier community threads, lineage edges are not displayed if the referenced datasets do not exist in DataHub (a quick existence check is sketched after this list).

  2. Lineage Extraction Support: Verify that the NiFi plugin supports the specific processors you are using. As per the <https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/nifi|NiFi documentation>, only a limited set of processors is supported for lineage extraction.

  3. Manual Lineage Addition: If automatic lineage extraction is not working, you can manually add lineage using the DataHub UI or APIs, as described in the lineage feature guide in the DataHub docs.
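
For the check in item 1, here is a minimal sketch using the DataHub Python SDK; the GMS address and the dataset URN are placeholders to adjust for your deployment:

from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig

# Connect to your DataHub GMS (adjust the server URL for your deployment)
graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))

# Placeholder URN - substitute a dataset your NiFi flow reads or writes
urn = "urn:li:dataset:(urn:li:dataPlatform:s3,source_bucket,PROD)"
print(graph.exists(urn))  # False means there is nothing for lineage to attach to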

Please provide the requested details so we can assist you further.

We use the UI.
DataHub version 0.13.0.

Do I understand correctly that only these processors can be extracted from NiFi: SFTP (ListSFTP, FetchSFTP, GetSFTP, PutSFTP)?

Yes, that's correct. According to the <https://datahubproject.io/docs/0.13.1/generated/ingestion/sources/nifi|NiFi documentation>, lineage extraction currently supports only the following processors:

  • S3: ListS3, FetchS3Object, PutS3Object
  • SFTP: ListSFTP, FetchSFTP, GetSFTP, PutSFTP

If your NiFi flow uses processors other than these, the lineage information may not be extracted automatically.
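
To see which processor types your flow actually uses, you can query the NiFi REST API and compare against the list above. This is a rough sketch assuming single-user authentication with placeholder credentials; adjust the base URL and certificate handling for your setup:

import requests

NIFI = "https://localhost:8443"

# Obtain a bearer token (single-user auth; placeholder credentials)
token = requests.post(
    f"{NIFI}/nifi-api/access/token",
    data={"username": "admin", "password": "password"},
    verify=False,  # only acceptable for a local self-signed certificate
).text

# List processors in the root process group and collect their types
resp = requests.get(
    f"{NIFI}/nifi-api/process-groups/root/processors",
    headers={"Authorization": f"Bearer {token}"},
    verify=False,
)
types = {p["component"]["type"] for p in resp.json()["processors"]}
print(types)  # compare against the supported S3/SFTP processors above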

Steps to Troubleshoot and Resolve:

  1. Verify Processor Types: Ensure that your NiFi flow uses the supported processors listed above. If not, consider modifying your flow to use these processors if possible.

  2. Check Provenance Events: Make sure that provenance events are being captured correctly. The plugin extracts lineage by analyzing these events, so ensure your NiFi configuration retains provenance events long enough for the DataHub ingestion to process them (see the nifi.properties snippet after this list).

  3. Manual Lineage Addition: If automatic extraction is not feasible, you can manually add lineage using the DataHub UI or APIs. This can be a workaround if your processors are not supported.
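
Provenance retention is controlled in nifi.properties. These are the standard property names; the values shown are illustrative and should cover at least the interval between ingestion runs:

# nifi.properties - provenance repository retention
nifi.provenance.repository.max.storage.time=30 days
nifi.provenance.repository.max.storage.size=10 GB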

Example Configuration for NiFi Ingestion:

Here is an example of how you might configure the NiFi ingestion using the DataHub UI:

source:
  type: "nifi"
  config:
    site_url: "https://localhost:8443/nifi/"  # your NiFi base URL
    auth: SINGLE_USER
    username: "admin"      # placeholder credentials
    password: "password"
sink:
  type: "datahub-rest"
  config:
    server: "http://localhost:8080"  # your DataHub GMS endpoint
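
If you run ingestion from the CLI instead of the UI, save the recipe to a file (e.g., nifi_recipe.yml, a hypothetical name) and run:

datahub ingest -c nifi_recipe.yml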

If you need further assistance, please provide more details about your NiFi flow and the specific processors you are using.

How can I add lineage manually? Please give detailed advice.

To manually add lineage for processors that are not automatically supported by the NiFi plugin, you can use the DataHub UI or APIs. Here’s a detailed guide on how to do this:

Using the DataHub UI

  1. Navigate to the Lineage Tab:

    • Go to the entity (e.g., dataset, data job) for which you want to add lineage.
    • Click on the Lineage tab.
  2. Add Upstream/Downstream Lineage:

    • Click on the Add Upstream or Add Downstream button.
    • Search for the entity you want to link as upstream or downstream.
    • Save the changes.

Using the DataHub API

If you prefer to use the API, you can programmatically emit lineage edges between entities. Here’s how you can do it:

  1. Set Up Your Environment:

    • Ensure you have the datahub CLI installed. If not, install it using:
      pip install acryl-datahub
      
  2. Create a Lineage YAML File:

    • The datahub CLI ingests file-based lineage through the datahub-lineage-file source, which expects a YAML file (e.g., lineage.yml) rather than raw JSON. Here's an example linking two hypothetical Hive datasets:
      # lineage.yml
      version: 1
      lineage:
        - entity:
            name: destination_dataset
            type: dataset
            env: PROD
            platform: hive
          upstream:
            - entity:
                name: source_dataset
                type: dataset
                env: PROD
                platform: hive

  3. Ingest the Lineage:

    • Reference the lineage file from a small recipe (e.g., lineage_recipe.yml) and run it with the datahub CLI:
      # lineage_recipe.yml
      source:
        type: datahub-lineage-file
        config:
          file: ./lineage.yml
      sink:
        type: datahub-rest
        config:
          server: "http://localhost:8080"

      datahub ingest -c lineage_recipe.yml

Example Code for Adding Lineage Using Python

Here’s a Python example using the DataHub Python SDK (installed with acryl-datahub) to emit a lineage edge between two datasets; the dataset names and GMS address are placeholders:

import datahub.emitter.mce_builder as builder
from datahub.emitter.rest_emitter import DatahubRestEmitter

# Build a lineage event: destination_dataset has source_dataset as upstream
lineage_mce = builder.make_lineage_mce(
    [builder.make_dataset_urn("hive", "source_dataset", "PROD")],     # upstreams
    builder.make_dataset_urn("hive", "destination_dataset", "PROD"),  # downstream
)

# Emit it to the DataHub GMS; emit_mce raises an exception on failure
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")
emitter.emit_mce(lineage_mce)
print("Lineage added successfully!")

By following these steps, you can manually add lineage for processors that are not automatically supported by the NiFi plugin.

How can I add NiFi processors manually via the Python API?

To manually add NiFi processors and their lineage to DataHub using the API in Python, you need to follow these steps:

  1. Set Up Your Environment:

    • Ensure you have the DataHub Python SDK installed. If not, install it using:
      pip install acryl-datahub

  2. Define the Metadata for the Processors:

    • Create the datasets that the NiFi processors read from and write to, including ownership information.
  3. Send the Metadata to DataHub:

    • Use the DataHub emitter to send the metadata and the lineage edge.

Here’s a detailed example:

Step-by-Step Guide

1. Define the Metadata for the Processors

Using the DataHub Python SDK, define the datasets that represent the processor's inputs and outputs. For example, if a processor reads from an S3 bucket and writes to a Snowflake table, you can define them as follows (the bucket, table, and owner names are placeholders):

import datahub.emitter.mce_builder as builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import (
    DatasetPropertiesClass,
    OwnerClass,
    OwnershipClass,
    OwnershipTypeClass,
)

# Connect to the DataHub GMS REST endpoint
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

# URNs for the source and destination datasets (placeholder names)
source_urn = builder.make_dataset_urn("s3", "source_bucket", "PROD")
dest_urn = builder.make_dataset_urn("snowflake", "destination_table", "PROD")

# Shared ownership aspect for both datasets
ownership = OwnershipClass(
    owners=[
        OwnerClass(
            owner=builder.make_user_urn("data_owner"),
            type=OwnershipTypeClass.DATAOWNER,
        )
    ]
)

# Create (or update) both datasets with properties and ownership
for urn, name in [(source_urn, "source_bucket"), (dest_urn, "destination_table")]:
    emitter.emit(MetadataChangeProposalWrapper(entityUrn=urn, aspect=DatasetPropertiesClass(name=name)))
    emitter.emit(MetadataChangeProposalWrapper(entityUrn=urn, aspect=ownership))

2. Send the Metadata to DataHub

Continuing the same script, emit a lineage edge linking the two datasets:

# Link the datasets with a lineage edge (destination is downstream of source)
lineage_mce = builder.make_lineage_mce([source_urn], dest_urn)
emitter.emit_mce(lineage_mce)

print("NiFi processors and lineage added successfully!")

By following these steps, you can manually register the datasets your NiFi processors touch and link them with lineage in DataHub using the Python SDK.
