Managing Event Processing Order in Multiple `mae-consumer` Servers

Original Slack Thread

Hi team, I’m a data engineer at Samsung, and we’re considering using DataHub as the engine of our company’s data catalog system.
We expect that a huge number of Metadata Change Log (MCL) events will be generated, so we need to scale out the mae-consumer servers.
Currently, mae-consumer is deployed as a Deployment in Kubernetes, so physically scaling out is not a problem, but I’m curious whether the event processing order will become a problem.

For example, let’s assume MCL1 is generated first and MCL2 is generated after it, and the mae-consumer-1 and mae-consumer-2 pods are deployed.
The mae-consumer-1 pod receives MCL1, and the mae-consumer-2 pod receives MCL2.
For some reason, the mae-consumer-2 pod processes MCL2 first, so some action is applied to Elasticsearch and Neo4j. The mae-consumer-1 pod then processes MCL1 afterwards, so another action is applied to Elasticsearch and Neo4j as well.
In this situation, I expect there could be some problems…

So, will there be any problems because of the processing order across multiple mae-consumer instances? Will our team need to develop some functionality to handle this?

  1. CLI for integration
  2. DataHub version 0.12.0
  3. Not directly about a data source, but about MCL processing…

Hello <@U069AP5MJGG> I think I know the answer to your question (it was actually a concern of mine at some point in the past too, so I would be very happy to compare our experiences). First of all, if you want to run multiple mae-consumers, they need to be part of the same consumer group (the MCL events are stored, by default, in the MetadataChangeLog_Versioned_v1 and MetadataChangeLog_Timeseries_v1 Kafka topics). These topics need an appropriate number of partitions for the number of consumers you plan to run (if a topic has, let’s say, 6 partitions and you run 7 consumers, one of them will not get any partition assigned and will therefore sit idle).
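A quick way to sanity-check this is to compare the partition count of the MCL topics against the number of mae-consumer replicas you plan to run. Below is a minimal sketch using Kafka’s Java AdminClient; the broker address and replica count are illustrative assumptions, not DataHub defaults:

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class PartitionCheck {
  public static void main(String[] args) throws Exception {
    int plannedConsumers = 6; // hypothetical number of mae-consumer replicas

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // assumed broker address

    try (AdminClient admin = AdminClient.create(props)) {
      Map<String, TopicDescription> topics = admin
          .describeTopics(List.of("MetadataChangeLog_Versioned_v1", "MetadataChangeLog_Timeseries_v1"))
          .all()
          .get();

      topics.forEach((name, description) -> {
        int partitions = description.partitions().size();
        System.out.printf("%s: %d partitions for %d planned consumers%n", name, partitions, plannedConsumers);
        if (partitions < plannedConsumers) {
          System.out.println("  -> some consumers in the group would sit idle");
        }
      });
    }
  }
}
```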
The key used for partitioning events is the urn of the involved entity:
https://github.com/datahub-project/datahub/blob/master/metadata-dao-impl/kafka-producer/src/main/java/com/linkedin/metadata/dao/producer/KafkaEventProducer.java#L77
It is my understanding that events targeting a particular entity will be stored in the same partition and then processed in arrival order by a single consumer.
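To illustrate, assuming the producer relies on Kafka’s default partitioner with the entity urn as the record key, the partition for a given urn can be reproduced like this (a sketch; the partition count and urn below are made up):

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;

public class UrnPartitionDemo {
  // Mirrors what Kafka's default partitioner does for a non-null key:
  // murmur2 hash of the key bytes, mapped onto the partition count.
  static int partitionFor(String urn, int numPartitions) {
    byte[] keyBytes = urn.getBytes(StandardCharsets.UTF_8);
    return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
  }

  public static void main(String[] args) {
    int numPartitions = 6; // illustrative partition count
    String urn = "urn:li:dataset:(urn:li:dataPlatform:hive,SampleTable,PROD)";

    // MCL1 and MCL2 for the same entity share the same key, so they land in
    // the same partition and are read in order by a single consumer.
    System.out.println("MCL1 -> partition " + partitionFor(urn, numPartitions));
    System.out.println("MCL2 -> partition " + partitionFor(urn, numPartitions));
  }
}
```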
By the way, my team found that the bottleneck when processing a large number of events is not Kafka or mae-consumer performance, but rather the indexing latency of Elasticsearch. Looking forward to hearing statistics from your ingestion runs, if possible!

First of all, thank you for your kind answer. Last Friday, we also found that this problem is already solved by putting the mae/mce-consumers into one consumer group and using each entity’s urn as the partition key.
And wow, thank you for another, unexpected answer. Then we need to look at the Elasticsearch configuration for indexing latency.
If we have any relevant experience, we’ll be happy to share it with you. Thank you so much.