Troubleshooting Multiple Ingestion Triggers in DataHub

Original Slack Thread

Hi Team,

I have 8 nodes running Frontend, GMS, MAE and MCE.
I have scheduled ingestion to run at 10 AM daily. It is triggering the ingestion 8 times. Basically, the job is triggered from each GMS node.
Any option to fix this?

Attached: ingestion run screenshot

Hey there! :wave: Make sure your message includes the following information if relevant, so we can help more effectively!

  1. Are you using UI or CLI for ingestion?
  2. Which DataHub version are you using? (e.g. 0.12.0)
  3. What data source(s) are you integrating with DataHub? (e.g. BigQuery)

<@U06TM7M40RK> Can you help with the questions in this thread?


:information_source: Please continue to tag RunLLM (e.g., <@U06TM7M40RK>) to have it continue generating responses! We’re experimenting with a new AI assistant for questions in the <#CUMUWQU66> channel to help you track down helpful resources more quickly. Keep in mind that responses may not always be spot-on; feel free to ask it follow-up questions and don’t forget to give its responses a :thumbsup: or :thumbsdown: so other folks in the Community know if you’re still looking for help on the topic and to help us improve the assistant!

To address the issue of multiple ingestion triggers, it is important to understand the setup and configuration of your DataHub deployment. Here are a few questions to help diagnose the problem:

  1. Are you using UI or CLI for ingestion?
  2. Which DataHub version are you using? (e.g., 0.12.0)
  3. What data source(s) are you integrating with DataHub? (e.g., BigQuery)

Based on the information provided, it seems that the ingestion job is being triggered from each GMS node, leading to multiple runs. This is a known issue when running multiple GMS instances or multiple MAE/MCE consumers: every process that hosts a scheduler keeps its own timer, so each one fires the same scheduled run.

Potential Solutions:

  1. Standalone Mode Configuration:

    • Ensure that you are running the MAE and MCE consumers in standalone mode with a single replica. According to a <Slack discussion>, multiple replicas of these consumers can cause multiple ingestion triggers; in particular, make sure the MCE consumer has only a single replica.
  2. Environment Variables:

    • Check the environment variables that decide which consumers run inside each process. Make sure MCP_CONSUMER_ENABLED, MCL_CONSUMER_ENABLED, and PE_CONSUMER_ENABLED are set so that each consumer runs only where you intend it to (embedded in GMS or in its standalone deployment). Refer to the <DataHub environment variables documentation> for more details.
  3. IngestionScheduler Singleton:

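    • The IngestionScheduler singleton class is responsible for scheduling ingestion runs. Ensure that only one instance of this class is running across the whole deployment. A singleton exists once per process (JVM), not once per cluster, so eight GMS nodes that each start the scheduler still run eight independent timers; the scheduler has to be active on only one of them. Refer to this <Slack discussion> for more details.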
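Why a per-process singleton does not help across nodes can be shown with a toy simulation. In this sketch, plain JDK executors stand in for GMS nodes and an `AtomicInteger` stands in for created ExecutionRequests; `PerNodeSchedulerDemo` is a hypothetical name and no DataHub classes are involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PerNodeSchedulerDemo {
  // Simulates `nodes` GMS processes, each with its own scheduler "singleton",
  // all scheduling the same ingestion source for the same moment.
  static int triggersForOneScheduledRun(int nodes) {
    AtomicInteger executionRequests = new AtomicInteger();
    CountDownLatch fired = new CountDownLatch(nodes);
    List<ScheduledExecutorService> schedulers = new ArrayList<>();
    for (int i = 0; i < nodes; i++) {
      // One scheduler per simulated process: the singleton is per JVM, not per cluster
      ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
      schedulers.add(scheduler);
      // 10 ms from now stands in for "10 AM daily"
      scheduler.schedule(() -> {
        executionRequests.incrementAndGet(); // stands in for creating an ExecutionRequest
        fired.countDown();
      }, 10, TimeUnit.MILLISECONDS);
    }
    try {
      fired.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      schedulers.forEach(ScheduledExecutorService::shutdown);
    }
    return executionRequests.get();
  }
}
```

With `triggersForOneScheduledRun(8)` the count is 8, matching the eight runs in the screenshot; only reducing the number of active schedulers to one brings it down to 1.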

Would you like more detailed steps on how to configure these settings?

Sources:

  • <Slack discussion on multiple ingestion triggers>
  • <Slack discussion on IngestionScheduler>

<@U06TM7M40RK>
Where is the IngestionScheduler singleton class located?

The IngestionScheduler singleton class is located in the DataHub codebase. Specifically, you can find it in the following file:

The IngestionScheduler class is responsible for scheduling ingestion runs for ingestion sources defined within DataHub. It manages storing and triggering ingestion sources on a pre-defined schedule based on the information present in the DataHubIngestionSourceInfo aspect.

Here is a brief overview of the class:

package com.datahub.metadata.ingestion;

import com.linkedin.common.urn.Urn;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.ingestion.DataHubIngestionSourceInfo;
import io.datahubproject.metadata.context.OperationContext;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class IngestionScheduler {

  private final OperationContext systemOpContext;
  private final EntityClient entityClient;
  private final Map<Urn, ScheduledFuture<?>> nextIngestionSourceExecutionCache = new HashMap<>();
  private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

  public void init() {
    // Initialization logic
  }

  public void unscheduleNextIngestionSourceExecution(final Urn ingestionSourceUrn) {
    // Unschedule logic
  }

  public void unscheduleAll() {
    // Unschedule all logic
  }

  public void scheduleNextIngestionSourceExecution(final Urn ingestionSourceUrn, final DataHubIngestionSourceInfo newInfo) {
    // Schedule next execution logic
  }

  // Other methods and inner classes
}
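The nextIngestionSourceExecutionCache map in the overview is keyed by source URN, which is what lets unscheduleNextIngestionSourceExecution and unscheduleAll cancel pending runs. A minimal sketch of that keyed-future pattern follows; it uses String keys instead of Urn, and `SourceScheduleCache` is a hypothetical class, not DataHub's implementation.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class SourceScheduleCache {
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  private final Map<String, ScheduledFuture<?>> nextExecution = new ConcurrentHashMap<>();

  // (Re)schedule a source; a previously pending run for the same key is cancelled.
  void schedule(String sourceUrn, Runnable run, long delayMillis) {
    ScheduledFuture<?> future = executor.schedule(run, delayMillis, TimeUnit.MILLISECONDS);
    ScheduledFuture<?> previous = nextExecution.put(sourceUrn, future);
    if (previous != null) {
      previous.cancel(false);
    }
  }

  // Cancel the pending run for one source, if any.
  void unschedule(String sourceUrn) {
    ScheduledFuture<?> previous = nextExecution.remove(sourceUrn);
    if (previous != null) {
      previous.cancel(false);
    }
  }

  // Cancel every pending run held by this process.
  void unscheduleAll() {
    nextExecution.values().forEach(future -> future.cancel(false));
    nextExecution.clear();
  }

  int pendingCount() {
    return (int) nextExecution.values().stream().filter(future -> !future.isDone()).count();
  }

  void shutdown() {
    executor.shutdownNow();
  }
}
```

Because the map lives inside one JVM, it prevents duplicate pending runs within a node but cannot see futures scheduled by the other GMS nodes.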

Would you like more detailed information on how to configure or troubleshoot the IngestionScheduler?

<@U06TM7M40RK>
What triggers the scheduled ingestion?

The scheduled ingestion in DataHub is triggered by the IngestionScheduler class. This class is responsible for managing and executing scheduled ingestion runs based on the schedule defined in the DataHubIngestionSourceInfo aspect.

How Scheduled Ingestion is Triggered:

  1. Initialization:

    • The IngestionScheduler class initializes by loading all the ingestion source schedules and setting up the necessary tasks to trigger ingestion runs at the specified times.
    • This is done in the init method, which sets up a recurring task to refresh the schedules and schedule the next ingestion runs.
  2. Scheduling the Next Execution:

    • The scheduleNextIngestionSourceExecution method computes the next execution time for an ingestion source based on its schedule and sets up a task to trigger the ingestion run at that time.
    • It uses a ScheduledExecutorService to schedule the execution of the ingestion run.
  3. Execution Request:

    • When the scheduled time arrives, the ExecutionRequestRunnable is executed. This runnable creates an ExecutionRequest entity, which triggers the actual ingestion process.
    • The ExecutionRequestRunnable is responsible for creating a new DataHubExecutionRequest entity instance using an EntityClient.

Here is a simplified code snippet to illustrate the process:

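import com.linkedin.common.urn.Urn;
import com.linkedin.ingestion.DataHubIngestionSourceInfo;
import com.linkedin.ingestion.DataHubIngestionSourceSchedule;
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.springframework.scheduling.support.CronExpression;

public class IngestionScheduler {

  private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
  private final Map<Urn, ScheduledFuture<?>> nextIngestionSourceExecutionCache = new HashMap<>();
  private final long initialDelay;    // seconds before the first schedule refresh
  private final long refreshInterval; // seconds between schedule refreshes

  public IngestionScheduler(long initialDelay, long refreshInterval) {
    this.initialDelay = initialDelay;
    this.refreshInterval = refreshInterval;
  }

  public void init() {
    // Schedule a recurring task that reloads all ingestion sources and reschedules their next runs
    scheduledExecutorService.scheduleAtFixedRate(
        new BatchRefreshSchedulesRunnable(),
        initialDelay,
        refreshInterval,
        TimeUnit.SECONDS);
  }

  public void scheduleNextIngestionSourceExecution(final Urn ingestionSourceUrn, final DataHubIngestionSourceInfo newInfo) {
    // Compute the next execution time in the schedule's time zone, and the delay until then
    final DataHubIngestionSourceSchedule schedule = newInfo.getSchedule();
    final ZonedDateTime now = ZonedDateTime.now(ZoneId.of(schedule.getTimezone()));
    final ZonedDateTime nextExecDate = computeNextExecutionTime(schedule, now);
    final long scheduleTime = Duration.between(now, nextExecDate).toMillis();

    // Schedule the ingestion run and keep its future so it can be cancelled later
    ScheduledFuture<?> scheduledFuture = scheduledExecutorService.schedule(
        new ExecutionRequestRunnable(ingestionSourceUrn, newInfo),
        scheduleTime,
        TimeUnit.MILLISECONDS);
    nextIngestionSourceExecutionCache.put(ingestionSourceUrn, scheduledFuture);
  }

  private ZonedDateTime computeNextExecutionTime(DataHubIngestionSourceSchedule schedule, ZonedDateTime now) {
    // Schedules use 5-field Unix cron (e.g. "0 10 * * *"), but Spring's CronExpression
    // expects 6 fields with seconds first, so prepend "0 " when the seconds field is missing
    final String interval = schedule.getInterval().trim();
    final String cron = interval.split("\\s+").length == 5 ? "0 " + interval : interval;
    return CronExpression.parse(cron).next(now);
  }

  // ExecutionRequestRunnable and BatchRefreshSchedulesRunnable (inner classes) are omitted
}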

Key Components:

  • ScheduledExecutorService: Used to schedule tasks for future execution.
  • ExecutionRequestRunnable: A runnable that creates an ExecutionRequest to trigger the ingestion process.
  • BatchRefreshSchedulesRunnable: A runnable that refreshes the schedules periodically.
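For the schedule in this thread (10 AM daily, cron `0 10 * * *`), the arithmetic behind computeNextExecutionTime and the scheduleTime delay can be reproduced with java.time alone. The `DailyRun` helper below is hypothetical and handles only a fixed daily time, not general cron expressions; the snippet above delegates cron parsing to CronExpression (Spring's `org.springframework.scheduling.support.CronExpression` expects six fields, seconds first).

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.ZonedDateTime;

final class DailyRun {
  // Next occurrence of the local time `at` strictly after `now`, in now's time zone.
  static ZonedDateTime nextRun(ZonedDateTime now, LocalTime at) {
    ZonedDateTime candidate = now.with(at); // same date, time set to `at`
    return candidate.isAfter(now) ? candidate : candidate.plusDays(1);
  }

  // Delay to hand to ScheduledExecutorService.schedule(..., TimeUnit.MILLISECONDS).
  static long delayMillis(ZonedDateTime now, LocalTime at) {
    return Duration.between(now, nextRun(now, at)).toMillis();
  }
}
```

At 09:30 the delay is 1,800,000 ms (30 minutes); at exactly 10:00 the next run is the following day. The time zone matters: 10:00 in UTC and 10:00 in America/New_York are different instants, so evaluating a schedule in the server's default zone can fire at an unexpected hour.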

Would you like more detailed information on any specific part of this process?

<@U06TM7M40RK>
Is it done by MAE or MCE?

The scheduled ingestion process itself is not directly managed by Metadata Audit Events (MAE) or Metadata Change Events (MCE). Instead, it is managed by the IngestionScheduler class, which schedules and triggers ingestion runs based on predefined schedules.

However, MAE and MCE play roles in the broader DataHub architecture, particularly in how metadata changes are processed and propagated. Here’s a breakdown of their roles:

Metadata Change Event (MCE):

  • Purpose: MCEs represent proposed changes to metadata. They are used to ingest new metadata or update existing metadata.
  • Emission: MCEs are emitted by clients during the ingestion process.
  • Processing: The MCE consumer processes these events and applies the changes to the metadata store (e.g., MySQL).
  • Example: When an ingestion job runs, it may emit MCEs to propose changes to metadata entities.

Metadata Audit Event (MAE):

  • Purpose: MAEs capture committed changes to metadata. They are used to audit and track changes to metadata over time.
  • Emission: MAEs are emitted after changes have been successfully committed to the metadata store.
  • Processing: The MAE consumer processes these events to update search indices, graph indices, and other downstream systems.
  • Example: After an MCE is processed and changes are committed, an MAE is emitted to reflect the committed changes.

Scheduled Ingestion:

  • Managed by: IngestionScheduler class.
  • Trigger: The IngestionScheduler schedules and triggers ingestion runs based on the schedule defined in the DataHubIngestionSourceInfo aspect.
  • Execution: When the scheduled time arrives, the ExecutionRequestRunnable is executed, which creates an ExecutionRequest entity to trigger the ingestion process.

How They Interact:

  • Ingestion Process: During an ingestion run, MCEs may be emitted to propose changes to metadata. These MCEs are processed by the MCE consumer, which applies the changes to the metadata store.
  • Post-Ingestion: After the changes are committed, MAEs are emitted to reflect the committed changes. These MAEs are processed by the MAE consumer to update search indices and other downstream systems.

Example Workflow:

  1. IngestionScheduler schedules an ingestion run.
  2. ExecutionRequestRunnable triggers the ingestion process at the scheduled time.
  3. MCEs are emitted during the ingestion process to propose changes to metadata.
  4. MCE Consumer processes the MCEs and applies changes to the metadata store.
  5. MAEs are emitted after changes are committed to reflect the committed changes.
  6. MAE Consumer processes the MAEs to update search indices and other downstream systems.
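The six-step workflow can be modeled as a toy program in which in-memory queues stand in for the MCE and MAE Kafka topics, a map stands in for the metadata store, and a list stands in for the search index. All names are hypothetical and this is not DataHub code; the point is the ordering: audit events, and therefore index updates, exist only for changes that have already been committed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

final class MetadataPipelineSketch {
  // Stands in for an MCE: a proposed aspect value for an entity.
  static final class Proposal {
    final String urn;
    final String aspect;
    Proposal(String urn, String aspect) { this.urn = urn; this.aspect = aspect; }
  }

  // Stands in for an MAE: emitted only after the change has been committed.
  static final class AuditEvent {
    final String urn;
    final String previousAspect; // null when the entity is new
    final String newAspect;
    AuditEvent(String urn, String previousAspect, String newAspect) {
      this.urn = urn;
      this.previousAspect = previousAspect;
      this.newAspect = newAspect;
    }
  }

  final Queue<Proposal> changeTopic = new ArrayDeque<>();    // stands in for the MCE topic
  final Queue<AuditEvent> auditTopic = new ArrayDeque<>();   // stands in for the MAE topic
  final Map<String, String> metadataStore = new HashMap<>(); // stands in for MySQL
  final List<String> searchIndex = new ArrayList<>();        // stands in for Elasticsearch

  // Step 3: an ingestion run emits proposals.
  void ingest(List<Proposal> proposals) {
    changeTopic.addAll(proposals);
  }

  // Steps 4-5: the change consumer commits each proposal, then emits an audit event.
  void runChangeConsumer() {
    Proposal p;
    while ((p = changeTopic.poll()) != null) {
      String previous = metadataStore.put(p.urn, p.aspect);
      auditTopic.add(new AuditEvent(p.urn, previous, p.aspect));
    }
  }

  // Step 6: the audit consumer updates downstream indices from committed changes only.
  void runAuditConsumer() {
    AuditEvent e;
    while ((e = auditTopic.poll()) != null) {
      searchIndex.add(e.urn + "=" + e.newAspect);
    }
  }
}
```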

Would you like more detailed information on any specific part of this process?

<@U06TM7M40RK>
To prevent duplicate scheduled ingestion runs, which consumer should have 1 replica?

To prevent duplicate scheduled ingestion runs, you should ensure that the MCE (Metadata Change Event) consumer has only one replica. This is because the MCE consumer is responsible for processing Metadata Change Proposals (MCPs) and triggering ingestion jobs. Having multiple replicas of a component that runs its own scheduling timer can lead to multiple ingestion triggers, because each replica's timer fires independently. (Replicas that share a Kafka consumer group split the topic's partitions between them, so they do not each process the same events.)

Steps to Configure MCE Consumer with a Single Replica:

  1. Update Helm Chart:

    • If you are using Helm to deploy DataHub, you can configure the number of replicas for the MCE consumer in the Helm chart values file.
    • Set the replicaCount for the MCE consumer to 1.
    datahub-mce-consumer:
      replicaCount: 1
    
  2. Update Docker Compose:

    • If you are using Docker Compose, ensure that the replicas field for the MCE consumer service is set to 1.
    version: '3.7'
    services:
      datahub-mce-consumer:
        image: datahub-mce-consumer:latest
        container_name: datahub-mce-consumer
        environment:
          - ELASTICSEARCH_HOST=elasticsearch
          - ELASTICSEARCH_PORT=9200
          - ELASTICSEARCH_PROTOCOL=http
        deploy:
          replicas: 1
    
  3. Kubernetes Deployment:

    • If you are deploying DataHub on Kubernetes, ensure that the replicas field in the MCE consumer deployment YAML is set to 1.
    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: datahub-mce-consumer
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: datahub-mce-consumer
      template:
        metadata:
          labels:
            app: datahub-mce-consumer
        spec:
          containers:
            - name: datahub-mce-consumer
              image: datahub-mce-consumer:latest
              env:
                - name: ELASTICSEARCH_HOST
                  value: "elasticsearch"
                - name: ELASTICSEARCH_PORT
                  value: "9200"
                - name: ELASTICSEARCH_PROTOCOL
                  value: "http"
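Replica count affects Kafka consumers and in-process timers differently. Within one consumer group, Kafka gives each partition to a single member, so adding replicas spreads events across them rather than duplicating them; a timer running inside each replica, by contrast, fires once per replica. The sketch models only the consumer-group half, using a simplified single-topic round-robin assignment (Kafka's real assignors, such as RangeAssignor and CooperativeStickyAssignor, differ in detail); `ConsumerGroupSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

final class ConsumerGroupSketch {
  // Round-robin assignment of a topic's partitions to the members of one group:
  // every partition is owned by exactly one member; surplus members sit idle.
  static List<List<Integer>> assignPartitions(int partitions, int members) {
    List<List<Integer>> assignment = new ArrayList<>();
    for (int m = 0; m < members; m++) {
      assignment.add(new ArrayList<>());
    }
    for (int p = 0; p < partitions; p++) {
      assignment.get(p % members).add(p);
    }
    return assignment;
  }

  // How many group members would process an event written to `partition`.
  static int processorsOf(int partition, List<List<Integer>> assignment) {
    int count = 0;
    for (List<Integer> owned : assignment) {
      if (owned.contains(partition)) {
        count++;
      }
    }
    return count;
  }
}
```

Every partition ends up with exactly one processor regardless of the replica count; duplicates would only appear if replicas used different consumer group ids, since each group receives every event.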
    

Additional Considerations:

  • MAE Consumer: The MAE consumer can have multiple replicas to handle the load of processing Metadata Audit Events (MAEs) and updating search indices. However, ensure that the MCE consumer has only one replica to avoid duplicate ingestion triggers.
  • GMS Instances: Ensure that the GMS instances are correctly configured to avoid triggering multiple ingestion runs. The IngestionScheduler should be managed in a way that only one instance is responsible for scheduling ingestion runs.

Would you like more detailed information on any specific part of this configuration?

Sources:

  • <Slack discussion on multiple ingestion triggers>

<@U06TM7M40RK>
How to configure GMS to avoid triggering multiple ingestion runs?

To configure the Metadata Service (GMS) to avoid triggering multiple ingestion runs, you need to ensure that only one instance of the IngestionScheduler is responsible for scheduling ingestion runs. Here are the steps to achieve this:

Steps to Configure GMS:

  1. Standalone Mode for MCE Consumer:

    • Ensure that the MCE consumer is running in standalone mode with a single replica. This prevents multiple instances from processing the same ingestion events.
  2. Disable Ingestion Scheduling on Additional GMS Instances:

    • If you have multiple GMS instances, you need to disable the ingestion scheduling on all but one instance. This can be done by setting an environment variable or configuration flag to disable the IngestionScheduler on additional instances.

Example Configuration:

Helm Chart Configuration:

  • Update your Helm chart values to ensure that only one GMS instance is responsible for scheduling ingestion runs.
datahub-gms:
  replicaCount: 3
  extraEnv:
    - name: INGESTION_SCHEDULER_ENABLED
      value: "false"
  • Set the INGESTION_SCHEDULER_ENABLED environment variable to true only on one GMS instance.

Kubernetes Deployment Configuration:

  • Update your Kubernetes deployment YAML to include the INGESTION_SCHEDULER_ENABLED environment variable.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: datahub-gms
spec:
  replicas: 3
  selector:
    matchLabels:
      app: datahub-gms
  template:
    metadata:
      labels:
        app: datahub-gms
    spec:
      containers:
        - name: datahub-gms
          image: datahub-gms:latest
          env:
            - name: INGESTION_SCHEDULER_ENABLED
              value: "false"
  • Create a separate deployment for the GMS instance that will handle ingestion scheduling and set the INGESTION_SCHEDULER_ENABLED environment variable to true.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: datahub-gms-scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      app: datahub-gms-scheduler
  template:
    metadata:
      labels:
        app: datahub-gms-scheduler
    spec:
      containers:
        - name: datahub-gms
          image: datahub-gms:latest
          env:
            - name: INGESTION_SCHEDULER_ENABLED
              value: "true"

Docker Compose Configuration:

  • If you are using Docker Compose, ensure that the INGESTION_SCHEDULER_ENABLED environment variable is set to false for additional GMS instances.
version: '3.7'
services:
  datahub-gms:
    image: datahub-gms:latest
    environment:
      - INGESTION_SCHEDULER_ENABLED=false
    deploy:
      replicas: 3

  datahub-gms-scheduler:
    image: datahub-gms:latest
    environment:
      - INGESTION_SCHEDULER_ENABLED=true
    deploy:
      replicas: 1

Additional Considerations:

  • Health Checks: Ensure that health checks are configured for your GMS instances to monitor their status and ensure they are running correctly.
  • Logging and Monitoring: Set up logging and monitoring to track the status of ingestion runs and identify any issues with scheduling.

By following these steps, you can configure GMS to avoid triggering multiple ingestion runs and ensure that only one instance is responsible for scheduling ingestion runs.

Would you like more detailed steps on any specific part of this configuration?

<@U06TM7M40RK>
INGESTION_SCHEDULER_ENABLED is not present in the codebase

Thank you for pointing that out. If the INGESTION_SCHEDULER_ENABLED environment variable is not present in the codebase, we need to take a different approach to ensure that only one instance of the IngestionScheduler is responsible for scheduling ingestion runs.

Alternative Approach:

  1. Leader Election:

    • Implement a leader election mechanism to ensure that only one GMS instance acts as the scheduler. This can be done using tools like Kubernetes leader election or a distributed lock service like ZooKeeper or etcd.
  2. Custom Configuration:

    • Modify the IngestionScheduler class to check for a custom configuration or environment variable that determines whether it should run the scheduler.
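At its core, leader election is a lease: one instance holds it, must renew it before it expires, and any other instance may take it over once it lapses. Kubernetes Lease objects follow this model with real shared state; the `InMemoryLease` class below is a hypothetical, single-process illustration of the acquire/renew/expire rules only, with time passed in explicitly so the behavior is deterministic.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical in-memory model of a lease lock (not the Kubernetes API).
final class InMemoryLease {
  private final Duration leaseDuration;
  private String holder;                  // null until someone acquires the lease
  private Instant expiresAt = Instant.MIN;

  InMemoryLease(Duration leaseDuration) {
    this.leaseDuration = leaseDuration;
  }

  // Acquire a free or expired lease, or renew one this identity already holds.
  // Returns true if `identity` is the leader after the call.
  synchronized boolean tryAcquireOrRenew(String identity, Instant now) {
    boolean expired = holder == null || !now.isBefore(expiresAt);
    if (expired || identity.equals(holder)) {
      holder = identity;
      expiresAt = now.plus(leaseDuration);
      return true;
    }
    return false;
  }

  synchronized boolean isLeader(String identity, Instant now) {
    return identity.equals(holder) && now.isBefore(expiresAt);
  }
}
```

In a real deployment each GMS instance would call the acquire/renew step on a timer well inside the lease duration and start or stop its scheduler when the result changes; the lease state must live outside the processes (for example, in a Kubernetes Lease object) for this to coordinate anything.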

Example Implementation:

Leader Election with Kubernetes:

  1. Add the Leader Election Library:

    • Add the official Kubernetes Java client's extended module (Maven artifact io.kubernetes:client-java-extended, which contains the io.kubernetes.client.extended.leaderelection package) to your GMS build.
  2. Modify the IngestionScheduler Class:

    • Modify the IngestionScheduler class to participate in leader election and only run the scheduler if it is the leader.
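import com.linkedin.common.urn.Urn;
import com.linkedin.ingestion.DataHubIngestionSourceInfo;
import com.linkedin.ingestion.DataHubIngestionSourceSchedule;
import io.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
import io.kubernetes.client.extended.leaderelection.LeaderElector;
import io.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.ClientBuilder;
import java.io.IOException;
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.springframework.scheduling.support.CronExpression;

public class IngestionScheduler {

  private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
  // Concurrent map: the election thread and the scheduler thread both touch it
  private final Map<Urn, ScheduledFuture<?>> nextIngestionSourceExecutionCache = new ConcurrentHashMap<>();
  private final long initialDelay;    // seconds before the first schedule refresh
  private final long refreshInterval; // seconds between schedule refreshes
  private volatile ScheduledFuture<?> refreshTask;

  public IngestionScheduler(long initialDelay, long refreshInterval) {
    this.initialDelay = initialDelay;
    this.refreshInterval = refreshInterval;
  }

  public void init() throws IOException {
    // In-cluster client using the pod's service account (needs RBAC access to Lease objects)
    ApiClient apiClient = ClientBuilder.cluster().build();
    // Every pod needs a distinct identity; Kubernetes sets HOSTNAME to the pod name
    String identity = System.getenv("HOSTNAME");
    LeaseLock leaseLock = new LeaseLock("default", "datahub-gms-leader", identity, apiClient);
    LeaderElectionConfig config = new LeaderElectionConfig(
        leaseLock,
        Duration.ofMillis(10000), // Lease duration
        Duration.ofMillis(8000),  // Renew deadline
        Duration.ofMillis(2000)   // Retry period
    );
    LeaderElector leaderElector = new LeaderElector(config);

    // run() blocks while this pod waits for and then holds the lease, so give it its own thread.
    // This sketch does not re-enter the election after leadership is lost.
    Thread electionThread = new Thread(() -> leaderElector.run(
        // Start hook: runs only on the leader, which alone refreshes and schedules ingestion
        () -> refreshTask = scheduledExecutorService.scheduleAtFixedRate(
            new BatchRefreshSchedulesRunnable(), initialDelay, refreshInterval, TimeUnit.SECONDS),
        // Stop hook: leadership was lost, so drop everything this pod scheduled
        this::unscheduleAll),
        "ingestion-scheduler-leader-election");
    electionThread.setDaemon(true);
    electionThread.start();
  }

  public void unscheduleAll() {
    if (refreshTask != null) {
      refreshTask.cancel(false);
    }
    nextIngestionSourceExecutionCache.values().forEach(future -> future.cancel(false));
    nextIngestionSourceExecutionCache.clear();
  }

  public void scheduleNextIngestionSourceExecution(final Urn ingestionSourceUrn, final DataHubIngestionSourceInfo newInfo) {
    // Compute the next execution time in the schedule's time zone, and the delay until then
    final DataHubIngestionSourceSchedule schedule = newInfo.getSchedule();
    final ZonedDateTime now = ZonedDateTime.now(ZoneId.of(schedule.getTimezone()));
    final ZonedDateTime nextExecDate = computeNextExecutionTime(schedule, now);
    final long scheduleTime = Duration.between(now, nextExecDate).toMillis();

    // Schedule the ingestion run
    ScheduledFuture<?> scheduledFuture = scheduledExecutorService.schedule(
        new ExecutionRequestRunnable(ingestionSourceUrn, newInfo),
        scheduleTime,
        TimeUnit.MILLISECONDS);
    nextIngestionSourceExecutionCache.put(ingestionSourceUrn, scheduledFuture);
  }

  private ZonedDateTime computeNextExecutionTime(DataHubIngestionSourceSchedule schedule, ZonedDateTime now) {
    // Spring's CronExpression expects 6 fields (seconds first); prepend "0 " to 5-field Unix cron
    final String interval = schedule.getInterval().trim();
    final String cron = interval.split("\\s+").length == 5 ? "0 " + interval : interval;
    return CronExpression.parse(cron).next(now);
  }

  // ExecutionRequestRunnable and BatchRefreshSchedulesRunnable (inner classes) are omitted
}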
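  3. Deploy the Modified GMS:
    • Deploy the modified GMS with leader election enabled, and grant the GMS service account permission to get, create, and update Lease objects (API group coordination.k8s.io) in the namespace used by the LeaseLock. All GMS instances participate in the leader election, but only the leader runs the IngestionScheduler.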

Custom Configuration:

  1. Add a Custom Configuration Flag:
    • Add a custom configuration flag to the IngestionScheduler class to determine whether it should run the scheduler.
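import com.linkedin.common.urn.Urn;
import com.linkedin.ingestion.DataHubIngestionSourceInfo;
import com.linkedin.ingestion.DataHubIngestionSourceSchedule;
import java.time.Duration;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.springframework.scheduling.support.CronExpression;

public class IngestionScheduler {

  private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
  private final Map<Urn, ScheduledFuture<?>> nextIngestionSourceExecutionCache = new HashMap<>();
  private final boolean isSchedulerEnabled; // true on exactly one GMS instance, false elsewhere
  private final long initialDelay;          // seconds before the first schedule refresh
  private final long refreshInterval;       // seconds between schedule refreshes

  public IngestionScheduler(boolean isSchedulerEnabled, long initialDelay, long refreshInterval) {
    this.isSchedulerEnabled = isSchedulerEnabled;
    this.initialDelay = initialDelay;
    this.refreshInterval = refreshInterval;
  }

  public void init() {
    if (!isSchedulerEnabled) {
      return; // This instance never refreshes or schedules ingestion runs
    }
    // Schedule a recurring task to refresh schedules
    scheduledExecutorService.scheduleAtFixedRate(
        new BatchRefreshSchedulesRunnable(),
        initialDelay,
        refreshInterval,
        TimeUnit.SECONDS);
  }

  public void scheduleNextIngestionSourceExecution(final Urn ingestionSourceUrn, final DataHubIngestionSourceInfo newInfo) {
    if (!isSchedulerEnabled) {
      return; // Ignore schedule updates on instances that do not own scheduling
    }
    // Compute the next execution time in the schedule's time zone, and the delay until then
    final DataHubIngestionSourceSchedule schedule = newInfo.getSchedule();
    final ZonedDateTime now = ZonedDateTime.now(ZoneId.of(schedule.getTimezone()));
    final ZonedDateTime nextExecDate = computeNextExecutionTime(schedule, now);
    final long scheduleTime = Duration.between(now, nextExecDate).toMillis();

    // Schedule the ingestion run
    ScheduledFuture<?> scheduledFuture = scheduledExecutorService.schedule(
        new ExecutionRequestRunnable(ingestionSourceUrn, newInfo),
        scheduleTime,
        TimeUnit.MILLISECONDS);
    nextIngestionSourceExecutionCache.put(ingestionSourceUrn, scheduledFuture);
  }

  private ZonedDateTime computeNextExecutionTime(DataHubIngestionSourceSchedule schedule, ZonedDateTime now) {
    // Spring's CronExpression expects 6 fields (seconds first); prepend "0 " to 5-field Unix cron
    final String interval = schedule.getInterval().trim();
    final String cron = interval.split("\\s+").length == 5 ? "0 " + interval : interval;
    return CronExpression.parse(cron).next(now);
  }

  // ExecutionRequestRunnable and BatchRefreshSchedulesRunnable (inner classes) are omitted
}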
  2. Configure the GMS Instances:
    • Set the isSchedulerEnabled flag to true for only one GMS instance and false for the others, for example by wiring it to a per-instance environment variable of your own.
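Something still has to decide the flag per instance. A minimal sketch, assuming a hypothetical environment variable CUSTOM_INGESTION_SCHEDULER_ENABLED (a name chosen for this example, not a DataHub setting) and, as a fallback, that GMS runs as a Kubernetes StatefulSet, whose pods get stable names ending in their ordinal (datahub-gms-0, datahub-gms-1, and so on). A plain Deployment gives pods random suffixes, so under a Deployment the fallback would enable no instance.

```java
import java.util.Map;

final class SchedulerFlag {
  // Hypothetical variable name for this sketch; not read by DataHub itself.
  static final String FLAG = "CUSTOM_INGESTION_SCHEDULER_ENABLED";

  // An explicit flag wins; otherwise enable only the StatefulSet pod with ordinal 0.
  static boolean isSchedulerEnabled(Map<String, String> env) {
    String explicit = env.get(FLAG);
    if (explicit != null && !explicit.isBlank()) {
      return Boolean.parseBoolean(explicit.trim());
    }
    String hostname = env.getOrDefault("HOSTNAME", "");
    return hostname.endsWith("-0");
  }
}
```

The result of `SchedulerFlag.isSchedulerEnabled(System.getenv())` would be passed as the isSchedulerEnabled constructor argument. The trade-off versus leader election: if pod 0 is down, nothing schedules ingestion until it returns, whereas a lease would move to another instance.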

Would you like more detailed steps on implementing any of these solutions?
