Integrating Airflow 2.5.0 with DataHub using DataHub Plugin v2 fails with PluginValidationError: Plugin 'datahub_airflow_plugin._datahub_listener_module'

Original Slack Thread

Hi guys,

I tried to integrate Airflow 2.5.0 (Python 3.8) with DataHub using the DataHub plugin v2 (0.12.1.3). I followed the steps in the docs (https://datahubproject.io/docs/lineage/airflow#datahub-plugin-v2), yet I always encounter this error:

```
hookimpl definition: on_dag_run_running(dag_run, session)
Argument(s) {'session'} are declared in the hookimpl but can not be found in the hookspec
```
Please help. Thanks in advance! :blob_help:


Bump guys! :blob_help:

I’ve shared the details in this post -> https://github.com/datahub-project/datahub/issues/9702.

<@U01GZEETMEZ> sorry for tagging, could you please help me? Thank you! :blob_help:

Here is the error log from the scheduler pod:

```
[2024-01-24T05:14:06.241+0000] {utils.py:430} WARNING - No module named 'airflow.providers.ftp.operators'
[2024-01-24T05:14:06.247+0000] {utils.py:430} WARNING - No module named 'airflow.providers.dbt'
[2024-01-24T05:14:07.101+0000] {base.py:73} INFO - Using connection ID 'datahub_rest_default' for task execution.
[2024-01-24T05:14:07.103+0000] {base.py:73} INFO - Using connection ID 'datahub_rest_default' for task execution.
[2024-01-24T05:14:07.104+0000] {datahub_listener.py:140} INFO - DataHub plugin v2 using DataHubRestEmitter: configured to talk to http://datahub-datahub-gms.datahub:8080/ with token: eyJh**********aZVg
  ____________       _____________
 ____    |__( )_________  __/__  /________      __
____  /| |_  /__  ___/_  /_ __  /_  __ \_ | /| / /
___  ___ |  / _  /   _  __/ _  / / /_/ /_ |/ |/ /
 _/_/  |_/_/  /_/    /_/    /_/  \____/____/|__/
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/__main__.py", line 39, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 52, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/utils/cli.py", line 108, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 73, in scheduler
    _run_scheduler_job(args=args)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/cli/commands/scheduler_command.py", line 36, in _run_scheduler_job
    job = SchedulerJob(
  File "<string>", line 4, in __init__
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/state.py", line 482, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/util/compat.py", line 210, in raise_
    raise exception
  File "/home/airflow/.local/lib/python3.8/site-packages/sqlalchemy/orm/state.py", line 479, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/scheduler_job.py", line 153, in __init__
    super().__init__(*args, **kwargs)
  File "<string>", line 6, in __init__
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/jobs/base_job.py", line 114, in __init__
    get_listener_manager().hook.on_starting(component=self)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/listeners/listener.py", line 72, in get_listener_manager
    integrate_listener_plugins(_listener_manager)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/plugins_manager.py", line 505, in integrate_listener_plugins
    listener_manager.add_listener(listener)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/listeners/listener.py", line 59, in add_listener
    self.pm.register(listener)
  File "/home/airflow/.local/lib/python3.8/site-packages/pluggy/_manager.py", line 114, in register
    self._verify_hook(hook, hookimpl)
  File "/home/airflow/.local/lib/python3.8/site-packages/pluggy/_manager.py", line 232, in _verify_hook
    raise PluginValidationError(
pluggy._manager.PluginValidationError: Plugin 'datahub_airflow_plugin._datahub_listener_module' for hook 'on_dag_run_running'
hookimpl definition: on_dag_run_running(dag_run, session)
Argument(s) {'session'} are declared in the hookimpl but can not be found in the hookspec
```
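
For anyone reading this traceback later: the last lines say that the plugin's `on_dag_run_running` implementation asks for a `session` argument that the hook specification in the installed Airflow does not declare. Below is a rough, stdlib-only sketch of that kind of signature check. It is not pluggy's actual code. `HookValidationError`, `verify_hook`, `check`, and both `impl_*` functions are hypothetical names, and the `(dag_run, msg)` spec signature is an assumption about the installed Airflow, not something shown in the log.

```python
import inspect


class HookValidationError(Exception):
    """Hypothetical stand-in for pluggy's PluginValidationError."""


def verify_hook(hookspec, hookimpl):
    """Raise if hookimpl declares arguments that hookspec does not have."""
    spec_args = set(inspect.signature(hookspec).parameters)
    impl_args = set(inspect.signature(hookimpl).parameters)
    extra = impl_args - spec_args
    if extra:
        raise HookValidationError(
            f"hook {hookspec.__name__!r}: argument(s) {sorted(extra)} are declared "
            "in the hookimpl but can not be found in the hookspec"
        )


# ASSUMPTION: illustrative spec signature; the real one may differ.
def on_dag_run_running(dag_run, msg):
    """Hook specification (assumed shape)."""


def impl_with_session(dag_run, session):
    """Implementation that asks for an argument the spec does not offer."""


def impl_subset(dag_run):
    """Implementation that only asks for a subset of the spec's arguments."""


def check(impl):
    """Return 'ok' if impl passes the check, else the error message."""
    try:
        verify_hook(on_dag_run_running, impl)
        return "ok"
    except HookValidationError as exc:
        return str(exc)


if __name__ == "__main__":
    print(check(impl_subset))
    print(check(impl_with_session))
```

An implementation may accept fewer arguments than the spec offers, but not ones the spec lacks. That is why this error usually points to a mismatch between the plugin version and the Airflow version rather than to a configuration problem.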

Hey <@U06EMMG9ARZ>, thanks for your patience here! <@UV14447EU> <@U01GZEETMEZ> any ideas/guidance here?

Hi <@U0121TRV0FL>, thanks for bumping the thread. <@U01GZEETMEZ> already helped me fix this issue in this PR: https://github.com/datahub-project/datahub/pull/9719. Thank you!