Fetching the Latest Execution Time of Data Source Ingestion

Original Slack Thread

How can I fetch the latest execution time of a data source ingestion?

Hey there! :wave:

<@U06TM7M40RK> Can you help with the questions in this thread?

:information_source: Please continue to tag RunLLM (e.g., <@U06TM7M40RK>) to have it continue generating responses!

To fetch the latest execution time of a data source ingestion in DataHub, you can use the lastIngested field, which captures the last ingested time for a dataset. This can be accessed via a GraphQL query.

Here is an example of how you can achieve this using a GraphQL query:

import requests

# Define the GraphQL query
query = """
{
  dataset(urn: "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)") {
    urn
    lastIngested
  }
}
"""

# Set the endpoint URL and headers
url = "http://localhost:8080/api/graphql"
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer YOUR_ACCESS_TOKEN"
}

# Make the request
response = requests.post(url, json={'query': query}, headers=headers)

# Parse the response
data = response.json()
print(data)

This query will return the lastIngested timestamp for the specified dataset, which indicates the last time the dataset was ingested into DataHub.
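The raw response nests the value under data.dataset.lastIngested as a number. Below is a minimal sketch for turning that into a readable UTC time. It assumes lastIngested is an epoch timestamp in milliseconds, like DataHub's other timestamps, and that the field can be null when the dataset has no recorded ingestion. The function name is hypothetical.

```python
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def last_ingested_from_response(payload: Dict[str, Any]) -> Optional[datetime]:
    """Extract dataset.lastIngested from a GraphQL response as a UTC datetime.

    Assumes lastIngested is epoch milliseconds; returns None when the dataset
    is missing or has no lastIngested value.
    """
    if payload.get("errors"):
        # GraphQL reports query problems in a top-level "errors" list.
        raise RuntimeError(f"GraphQL errors: {payload['errors']}")
    dataset = (payload.get("data") or {}).get("dataset")
    if not dataset or dataset.get("lastIngested") is None:
        return None
    return datetime.fromtimestamp(dataset["lastIngested"] / 1000, tz=timezone.utc)


# A response shaped like the one the query above returns (timestamp made up).
sample = {
    "data": {
        "dataset": {
            "urn": "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)",
            "lastIngested": 1700000000000,
        }
    }
}
print(last_ingested_from_response(sample))
```

In the earlier script, you would pass `response.json()` in place of `sample`.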

Would you like more detailed guidance on setting up this GraphQL query or any additional information?


<@U06TM7M40RK> can I also fetch the lastIngested time using the Python SDK?

The lastIngested property is not stored as an aspect, so the Python SDK has no dedicated accessor for it. It is a GraphQL field that DataHub computes from the system metadata of the dataset's aspects, so the GraphQL API is the way to retrieve it.

The Python SDK's get_aspect method retrieves individual aspects of an entity, but because lastIngested is not an aspect, it cannot be fetched this way. For comparison, here is how you would retrieve a real aspect, such as the container aspect (ContainerClass), with get_aspect:

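from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
from datahub.metadata.schema_classes import ContainerClass

# Initialize the DataHubGraph client (adjust the server URL and token for your deployment)
client = DataHubGraph(DatahubClientConfig(server="http://localhost:8080", token="YOUR_ACCESS_TOKEN"))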

# Define the entity URN
dataset_urn = "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)"

# Retrieve the ContainerClass aspect
container_aspect = client.get_aspect(
    entity_urn=dataset_urn, aspect_type=ContainerClass
)

print(container_aspect)

For the lastIngested property itself, you need to use the GraphQL API, as shown in the previous response.
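You can still stay inside Python, because the SDK's DataHubGraph client can send that GraphQL query for you through its execute_graphql method, which returns the "data" part of the response. Below is a minimal sketch that accepts any object with a compatible execute_graphql method, so it runs without a live server. The query operation name and the helper name are hypothetical.

```python
from typing import Any, Optional

# Same lastIngested field as the earlier query, with the URN passed as a variable.
LAST_INGESTED_QUERY = """
query getLastIngested($urn: String!) {
  dataset(urn: $urn) {
    lastIngested
  }
}
"""


def fetch_last_ingested_ms(graph: Any, dataset_urn: str) -> Optional[int]:
    """Return dataset.lastIngested for dataset_urn, or None if it is unset.

    graph is expected to be a datahub DataHubGraph client; execute_graphql
    returns the "data" part of the GraphQL response.
    """
    data = graph.execute_graphql(LAST_INGESTED_QUERY, variables={"urn": dataset_urn})
    dataset = data.get("dataset") or {}
    return dataset.get("lastIngested")


# Example (requires the acryl-datahub package and a reachable DataHub instance):
# from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
# graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080", token="YOUR_ACCESS_TOKEN"))
# print(fetch_last_ingested_ms(graph, "urn:li:dataset:(urn:li:dataPlatform:mysql,datahub.metadata_aspect_v2,PROD)"))
```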

Would you like more detailed guidance on using the GraphQL API or any additional information on retrieving other aspects using the SDK?


<@U06TM7M40RK> I am interested in the last run of my ingestion source, not the dataset

To fetch the latest execution time of an ingestion source, you can use the getLastIngestedTime method from the SystemMetadataUtils class in DataHub's Java GraphQL code. This method derives the last ingested time from the system metadata of the entity's aspects, using the most recently ingested one.

Here’s a brief overview of how you can use this method:

  1. Import the necessary classes: You need to import SystemMetadataUtils and EnvelopedAspectMap.

  2. Use the getLastIngestedTime method: This method takes an EnvelopedAspectMap as input and returns the last ingested time.

Here’s a code snippet to illustrate this:

import com.linkedin.datahub.graphql.types.common.mappers.util.SystemMetadataUtils;
import com.linkedin.entity.EnvelopedAspectMap;

public class IngestionExample {
    public static void main(String[] args) {
        EnvelopedAspectMap aspectMap = new EnvelopedAspectMap();
        // Populate aspectMap with your data

        Long lastIngestedTime = SystemMetadataUtils.getLastIngestedTime(aspectMap);
        System.out.println("Last Ingested Time: " + lastIngestedTime);
    }
}

This Java code snippet demonstrates how to retrieve the last ingested time using the SystemMetadataUtils class. You would need to populate the aspectMap with your specific data.

If you are looking to implement this in Python or another language, you would need to adapt the logic accordingly, possibly using the GraphQL API to fetch the necessary metadata.
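As a rough illustration of that logic in Python, the sketch below picks the newest lastObserved timestamp among aspects whose system metadata carries a real ingestion run ID. It is an approximation, not a port of the Java source: the input dict shape (aspect name mapped to a dict with a "systemMetadata" entry) is hypothetical, and "no-run-id-provided" as DataHub's placeholder run ID is an assumption.

```python
from typing import Any, Dict, Optional

# Assumption: run ID DataHub records when an aspect was not written by an ingestion run.
DEFAULT_RUN_ID = "no-run-id-provided"


def last_ingested_time(aspects: Dict[str, Dict[str, Any]]) -> Optional[int]:
    """Approximate SystemMetadataUtils.getLastIngestedTime in Python.

    aspects maps aspect names to dicts with an optional "systemMetadata"
    entry (hypothetical shape); returns the newest lastObserved in epoch ms.
    """
    latest: Optional[int] = None
    for aspect in aspects.values():
        meta = aspect.get("systemMetadata") or {}
        run_id = meta.get("lastRunId") or meta.get("runId")
        observed = meta.get("lastObserved")
        # Skip aspects without a timestamp or without a real ingestion run ID.
        if observed is None or not run_id or run_id == DEFAULT_RUN_ID:
            continue
        if latest is None or observed > latest:
            latest = observed
    return latest
```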

Would you like more detailed steps on any specific part of this process or any additional assistance?


<@U06TM7M40RK> I need this either for the Python SDK or GraphQL

To fetch the latest execution time of an ingestion source using the DataHub GraphQL API, you can query the ingestionSourceExecutionRequests endpoint. This will provide details about the ingestion runs, including their status and execution times.

Here’s how you can do it using a GraphQL query:

import requests

# Define the GraphQL query
query = """
{
  ingestionSourceExecutionRequests(input: {start: 0, count: 1}) {
    executionRequests {
      id
      status
      startTime
      endTime
      ingestionSource {
        name
      }
    }
  }
}
"""

# Set the endpoint URL and headers
url = "http://localhost:8080/api/graphql"
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer YOUR_ACCESS_TOKEN"
}

# Make the request
response = requests.post(url, json={'query': query}, headers=headers)

# Parse the response
data = response.json()
print(data)

This query will return the most recent execution request for the ingestion source, including the startTime and endTime.

Unfortunately, the Python SDK does not provide a direct method to access the ingestion source execution times. Therefore, using the GraphQL API is the recommended approach for this specific requirement.

Would you like more detailed guidance on setting up this GraphQL query or any additional information?


<@U06TM7M40RK> this is not a valid query

It seems there was an error in the query structure. Let’s correct it. To fetch the latest execution time of an ingestion source using the DataHub GraphQL API, you should use the executionRequest query to get the status and timing details of a specific execution request.

Here’s a corrected version of the GraphQL query:

import requests

# Define the GraphQL query
query = """
{
  executionRequest(urn: "urn:li:dataHubExecutionRequest:your-execution-request-id") {
    result {
      status
      startTimeMs
      durationMs
    }
  }
}
"""

# Set the endpoint URL and headers
url = "http://localhost:8080/api/graphql"
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer YOUR_ACCESS_TOKEN"
}

# Make the request
response = requests.post(url, json={'query': query}, headers=headers)

# Parse the response
data = response.json()
print(data)

In this query, replace "urn:li:dataHubExecutionRequest:your-execution-request-id" with the actual URN of the execution request you are interested in. This will return the status, startTimeMs (start time in epoch milliseconds), and durationMs (run duration in milliseconds) of the specified execution request; the end time is startTimeMs + durationMs.
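DataHub's execution request result is assumed here to report timing as startTimeMs (epoch milliseconds) and durationMs (elapsed milliseconds) rather than as an end timestamp. Under that assumption, the minimal sketch below converts a result object into start and end times in UTC. The function name is hypothetical, and the end time stays None while no duration has been reported.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional, Tuple


def execution_window(
    result: Optional[Dict[str, Any]],
) -> Tuple[Optional[datetime], Optional[datetime]]:
    """Turn an executionRequest.result dict into (start, end) UTC datetimes.

    Assumes startTimeMs is epoch milliseconds and durationMs is elapsed
    milliseconds; end is None when no duration is present yet.
    """
    if not result or result.get("startTimeMs") is None:
        return None, None
    start = datetime.fromtimestamp(result["startTimeMs"] / 1000, tz=timezone.utc)
    duration_ms = result.get("durationMs")
    end = start + timedelta(milliseconds=duration_ms) if duration_ms is not None else None
    return start, end
```

With the script above, you would call `execution_window(data['data']['executionRequest']['result'])`.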

If you need to find the URN of the latest execution request, you might need to list the execution requests first and then query the specific one.

Would you like more detailed guidance on any specific part of this process or any additional assistance?


<@U06TM7M40RK> but I don’t have the execution request urn, I only have the ingestion source urn

To fetch the latest execution time of an ingestion source when you only have the ingestion source URN, you can first list the execution requests associated with that ingestion source and then retrieve the latest one. Here’s how you can do it using the DataHub GraphQL API:

  1. List Execution Requests: Use the ingestionSource query and its executions field to list the execution requests for a specific ingestion source.

  2. Fetch Latest Execution: From the list, identify the latest execution request based on its start time.

Here’s a step-by-step example using Python and GraphQL:

import requests

# Define the GraphQL query to list execution requests for a specific ingestion source
query = """
{
  ingestionSource(urn: "urn:li:dataHubIngestionSource:your-ingestion-source-id") {
    executions {
      executionRequests {
        urn
        result {
          status
          startTimeMs
          durationMs
        }
      }
    }
  }
}
"""

# Set the endpoint URL and headers
url = "http://localhost:8080/api/graphql"
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer YOUR_ACCESS_TOKEN"
}

# Make the request and fail fast on HTTP errors
response = requests.post(url, json={'query': query}, headers=headers)
response.raise_for_status()

# Parse the response
data = response.json()

# Extract the latest execution request, ignoring runs without a start time (result may be null)
execution_requests = data['data']['ingestionSource']['executions']['executionRequests']
started = [r for r in execution_requests if r['result'] and r['result'].get('startTimeMs') is not None]
if not started:
    raise SystemExit("No started executions found for this ingestion source")
latest_execution = max(started, key=lambda r: r['result']['startTimeMs'])

print(f"Latest Execution URN: {latest_execution['urn']}")
print(f"Start Time (ms): {latest_execution['result']['startTimeMs']}")
print(f"Duration (ms): {latest_execution['result']['durationMs']}")
print(f"Status: {latest_execution['result']['status']}")

In this example, replace "urn:li:dataHubIngestionSource:your-ingestion-source-id" with your actual ingestion source URN. The script lists the execution requests returned for the specified ingestion source (a single page, since no start or count is given) and then picks the one with the latest startTimeMs.
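Because the executions field is paged, a single request may not cover every run of a long-lived source. Below is a minimal sketch that walks all pages with the start and count arguments, stopping when a page comes back shorter than requested. run_query is a hypothetical callable that takes a query and a variables dict and returns the "data" part of the response. The page size of 20 is an arbitrary choice.

```python
from typing import Any, Callable, Dict, Iterator, Optional

EXECUTIONS_QUERY = """
query getIngestionSource($urn: String!, $runStart: Int, $runCount: Int) {
  ingestionSource(urn: $urn) {
    executions(start: $runStart, count: $runCount) {
      executionRequests {
        urn
        result { status startTimeMs durationMs }
      }
    }
  }
}
"""

# Hypothetical helper type: run_query(query, variables) -> GraphQL "data" dict.
RunQuery = Callable[[str, Dict[str, Any]], Dict[str, Any]]


def iter_executions(run_query: RunQuery, source_urn: str, page_size: int = 20) -> Iterator[Dict[str, Any]]:
    """Yield every execution request of an ingestion source, page by page."""
    start = 0
    while True:
        variables = {"urn": source_urn, "runStart": start, "runCount": page_size}
        page = run_query(EXECUTIONS_QUERY, variables)["ingestionSource"]["executions"]
        batch = page["executionRequests"]
        yield from batch
        start += len(batch)
        # A short (or empty) page means there is nothing left to fetch.
        if len(batch) < page_size:
            break


def latest_started(run_query: RunQuery, source_urn: str) -> Optional[Dict[str, Any]]:
    """Return the execution with the greatest startTimeMs, or None."""
    started = [
        e for e in iter_executions(run_query, source_urn)
        if (e.get("result") or {}).get("startTimeMs") is not None
    ]
    return max(started, key=lambda e: e["result"]["startTimeMs"], default=None)
```

In practice, run_query can wrap `requests.post(url, json={"query": query, "variables": variables}, headers=headers).json()["data"]`, or it can be a DataHubGraph client's `execute_graphql` method, which already returns the data part.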

Would you like more detailed guidance on any specific part of this process or any additional assistance?


<@U06TM7M40RK> can’t I use start and count to filter for the latest execution?

Yes, you can use the start and count parameters to retrieve the most recent execution requests for an ingestion source. By setting start to 0 and count to 1, you can fetch the latest execution request. Here’s how you can do it using the DataHub GraphQL API:

import requests

# Define the GraphQL query to get the latest execution request for a specific ingestion source
query = """
query getIngestionSource($urn: String!, $runStart: Int, $runCount: Int) {
  ingestionSource(urn: $urn) {
    executions(start: $runStart, count: $runCount) {
      executionRequests {
        urn
        result {
          status
          startTimeMs
          durationMs
        }
      }
    }
  }
}
"""

# Set the endpoint URL and headers
url = "http://localhost:8080/api/graphql"
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer YOUR_ACCESS_TOKEN"
}

# Define the variables for the query
variables = {
    "urn": "urn:li:dataHubIngestionSource:your-ingestion-source-id",
    "runStart": 0,
    "runCount": 1
}

# Make the request and fail fast on HTTP errors
response = requests.post(url, json={'query': query, 'variables': variables}, headers=headers)
response.raise_for_status()

# Parse the response
data = response.json()

# Extract the latest execution request (the list is empty if the source has never run)
execution_requests = data['data']['ingestionSource']['executions']['executionRequests']
if not execution_requests:
    raise SystemExit("No executions found for this ingestion source")
latest_execution = execution_requests[0]
result = latest_execution['result'] or {}  # result may be null if the run has not reported back yet

print(f"Latest Execution URN: {latest_execution['urn']}")
print(f"Start Time (ms): {result.get('startTimeMs')}")
print(f"Duration (ms): {result.get('durationMs')}")
print(f"Status: {result.get('status')}")

In this example, replace "urn:li:dataHubIngestionSource:your-ingestion-source-id" with your actual ingestion source URN. The query will return the most recent execution request by using start as 0 and count as 1.
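With count set to 1, the single request returned may still be pending or running, in which case result is null or the status is not final. Below is a minimal sketch for finding the latest finished run. It assumes the executions come back newest first, which the start/count approach above already relies on, and it assumes that SUCCESS, FAILURE, and CANCELLED are among the status strings DataHub reports for completed runs. Fetch a somewhat larger page (for example runCount of 10) and scan it. The function name is hypothetical.

```python
from typing import Any, Dict, Iterable, Optional

# Assumption: status strings that mark a run as finished.
FINISHED_STATUSES = frozenset({"SUCCESS", "FAILURE", "CANCELLED"})


def latest_with_status(
    execution_requests: Iterable[Dict[str, Any]],
    wanted: Iterable[str] = FINISHED_STATUSES,
) -> Optional[Dict[str, Any]]:
    """Return the first request in a newest-first list whose status is in wanted."""
    wanted = set(wanted)
    for request in execution_requests:
        result = request.get("result") or {}  # null while the run has not reported back
        if result.get("status") in wanted:
            return request
    return None
```

Pass `wanted={"SUCCESS"}` to get the most recent successful run instead of the most recent finished one.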

Would you like more detailed guidance on any specific part of this process or any additional assistance?


Hi :slight_smile:! Did this interaction resolve your issue, or do you still require assistance?

Please tag @RunLLM if you still need further assistance!

Or tag @Datahub Community Support if you require a person to take a look :slight_smile:

Hope this helps! ><