Optimizing MAE Consumer Performance for Index Restoration

Original Slack Thread

We are trying to run restoreIndices for ALL aspects in our SQL database, but it's taking too long. We have been experimenting with changing the values for ES_BULK_REQUESTS_LIMIT and ES_BULK_FLUSH_PERIOD, but no matter what, it seems like we can't get the Elasticsearch BulkProcessor to do much batching (see logs copied below). Any tips on how we can optimize the MAE Consumer?

2023-11-09 20:36:14,315 [ThreadPoolTaskExecutor-1] INFO c.l.m.s.e.update.BulkListener - Successfully fed bulk request. Number of events: 3 Took time ms: -1
2023-11-09 20:36:14,330 [ThreadPoolTaskExecutor-1] INFO c.l.m.s.e.update.BulkListener - Successfully fed bulk request. Number of events: 3 Took time ms: -1
2023-11-09 20:36:14,652 [ThreadPoolTaskExecutor-1] INFO c.l.m.s.e.update.BulkListener - Successfully fed bulk request. Number of events: 4 Took time ms: -1
2023-11-09 20:36:14,654 [ThreadPoolTaskExecutor-1] INFO c.l.m.s.e.update.BulkListener - Successfully fed bulk request. Number of events: 4 Took time ms: -1
2023-11-09 20:36:14,836 [ThreadPoolTaskExecutor-1] INFO c.l.m.s.e.update.BulkListener - Successfully fed bulk request. Number of events: 34 Took time ms: -1
2023-11-09 20:36:14,851 [ThreadPoolTaskExecutor-1] INFO c.l.m.s.e.update.BulkListener - Successfully fed bulk request. Number of events: 3 Took time ms: -1
2023-11-09 20:36:14,916 [ThreadPoolTaskExecutor-1] INFO c.l.m.s.e.update.BulkListener - Successfully fed bulk request. Number of events: 63 Took time ms: -1
2023-11-09 20:36:14,930 [ThreadPoolTaskExecutor-1] INFO c.l.m.s.e.update.BulkListener - Successfully fed bulk request. Number of events: 3 Took time ms: -1
2023-11-09 20:36:15,108 [ThreadPoolTaskExecutor-1] INFO c.l.m.s.e.update.BulkListener - Successfully fed bulk request. Number of events: 14 Took time ms: -1
2023-11-09 20:36:15,212 [ThreadPoolTaskExecutor-1] INFO c.l.m.s.e.update.BulkListener - Successfully fed bulk request. Number of events: 62 Took time ms: -1
2023-11-09 20:36:15,228 [ThreadPoolTaskExecutor-1] INFO c.l.m.s.e.update.BulkListener - Successfully fed bulk request. Number of events: 3 Took time ms: -1
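
For context, ES_BULK_REQUESTS_LIMIT and ES_BULK_FLUSH_PERIOD feed the batch size and flush interval of the Elasticsearch BulkProcessor. Below is a minimal sketch of how that builder is typically wired with the 7.17 high-level REST client; the client and listener arguments are assumptions (the listener would be whatever logs the "Successfully fed bulk request" lines above), and the exact DataHub wiring may differ:

```java
import java.util.function.BiConsumer;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.core.TimeValue;

public final class BulkProcessorFactory {

  // bulkActions ~ ES_BULK_REQUESTS_LIMIT, flushSeconds ~ ES_BULK_FLUSH_PERIOD
  public static BulkProcessor create(RestHighLevelClient client,
                                     BulkProcessor.Listener listener,
                                     int bulkActions,
                                     long flushSeconds) {
    BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer =
        (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

    return BulkProcessor.builder(consumer, listener)
        // flush once this many actions have been queued...
        .setBulkActions(bulkActions)
        // ...or once this much time has passed since the last flush
        .setFlushInterval(TimeValue.timeValueSeconds(flushSeconds))
        // retry transient bulk rejections with exponential backoff
        .setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
        .build();
  }
}
```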

Anyone able to look at this?

Are you running any other operations while doing the restore? Specifically, deletes will hamper the batching.

Nope just the restore

I am facing the same issue as well. I will start by trying to set the bulk parameters; if you are able to get it working, please let us know.

Try ES_BULK_ENABLE_BATCH_DELETE; it allows deletes to be included in the same batch.

Also check ELASTICSEARCH_INDEX_BUILDER_REFRESH_INTERVAL_SECONDS, which is set to 1 by default:

https://github.com/datahub-project/datahub/blob/master/metadata-service/configuration/src/main/resources/application.yml#L185C31-L185C83

To follow up from office hours: we are using Elastic v8.6. To get this to work we had to update our forked DataHub code to use the Elasticsearch v7.17 client and use this API compatibility mode setting to talk to Elastic 8:

https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-compatibility.html
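
For reference, the compatibility mode described at that link is enabled when constructing the 7.17 high-level client. A minimal sketch, assuming a plain HTTP connection (the host and port are placeholders):

```java
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;

public final class CompatClientFactory {

  // Builds a 7.17 high-level client that sends compatibility headers so an
  // Elasticsearch 8.x cluster will accept its 7.x-style requests.
  public static RestHighLevelClient create(String host, int port) {
    RestClient lowLevelClient = RestClient.builder(new HttpHost(host, port, "http")).build();
    return new RestHighLevelClientBuilder(lowLevelClient)
        .setApiCompatibilityMode(true)
        .build();
  }
}
```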

Regarding the index refresh interval setting: that needs to be set in the GMS config, right (not the MAE consumer)? And can it be changed at any time?

That setting for the index is used by the setup job called system-update

It sets index settings on DataHub Helm installs and upgrades.
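
In Elasticsearch terms that value ends up as the index-level refresh_interval setting. A hedged sketch of how such a setting can be applied with the 7.x client (the index name and interval are examples for illustration, not necessarily what system-update does):

```java
import java.io.IOException;

import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;

public final class RefreshIntervalUpdater {

  // Relaxing refresh_interval during a bulk restore means fewer, larger segment
  // refreshes and noticeably less indexing overhead.
  public static void setRefreshInterval(RestHighLevelClient client, String index, String interval)
      throws IOException {
    UpdateSettingsRequest request = new UpdateSettingsRequest(index)
        .settings(Settings.builder().put("index.refresh_interval", interval).build());
    client.indices().putSettings(request, RequestOptions.DEFAULT);
  }
}
```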

Update: setting ES_BULK_ENABLE_BATCH_DELETE allowed us to get the bulk processor honoring the batch size again, so I guess we must have been doing deletes somehow.

However, we are still seeing things move pretty slowly. We tried commenting out this step in the UpdateIndicesHook, and that sped things up considerably:

```java
  SystemMetadata systemMetadata = event.getSystemMetadata();
  if (_graphDiffMode && !(_graphService instanceof DgraphGraphService)
      && (systemMetadata == null || systemMetadata.getProperties() == null
      || !Boolean.parseBoolean(systemMetadata.getProperties().get(FORCE_INDEXING_KEY)))) {
    updateGraphServiceDiff(urn, aspectSpec, previousAspect, aspect, event);
  } else {
    updateGraphService(urn, aspectSpec, aspect, event);
  }
}
```

So we are trying to figure out what we can optimize in the updateGraphService code. The logic it implements seems to be (roughly the pattern sketched after this list):
• find existing relationships by aspect/urn
• delete those relationships
• add relationships back (based on the new event)
Any thoughts on how we can make this part faster? Any additional configs to try? In our case we are running restoreIndices against an already loaded Elasticsearch… wonder if this would work better if we were starting from an empty ES index.
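
For readers following along, the three steps above amount to roughly the pattern below. Edge, GraphStore, and every method name in this sketch are hypothetical stand-ins, not DataHub's actual graph-service API:

```java
import java.util.List;

// Hypothetical illustration only: Edge, GraphStore, and all method names here are
// made up for the sketch and are not DataHub's real graph-service API.
final class GraphUpdateSketch {

  record Edge(String source, String destination, String relationshipType) {}

  interface GraphStore {
    List<Edge> findEdgesForAspect(String urn, String aspectName); // search the graph index
    void removeEdges(List<Edge> edges);                           // delete matching edges
    void addEdges(List<Edge> edges);                              // write the new edges
  }

  // For every MCL event the non-diff path does: search -> delete -> re-add.
  // Against a large, already-populated graph index the search and delete steps
  // dominate, which is why starting from an empty index is so much faster.
  static void updateGraph(GraphStore store, String urn, String aspectName, List<Edge> newEdges) {
    List<Edge> existing = store.findEdgesForAspect(urn, aspectName);
    store.removeEdges(existing);
    store.addEdges(newEdges);
  }
}
```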

Since you're doing a full restore, it is possible that starting from an empty graph index would be faster, as this would reduce the time taken to search the index and remove existing relationships. Restore bypasses the diff-mode optimizations to ensure you're starting from a clean state.

we’ll give that a shot

Ok yep it looks like starting from an empty index yields much better performance. We still found that setting ES_BULK_ENABLE_BATCH_DELETE=true improved performance by ~3x

Our new plan is to automate a process for full Elasticsearch index rebuilds that works something like this:
• Configure upgrade-job to publish the restore events to a new, dedicated “restore-MCL” topic
• deploy a second instance of MAE-consumer (call it “restore-consumer”) which loads data from the restore-mcl topic into a brand new empty set of ElasticSearch indexes
• wait for job to complete
• shut down restore-consumer
• point the original mae-consumer at the new Elasticsearch indexes & replay X hours of events (one way to do the replay is sketched after this list)
• point GMS/rest of datahub at new Elasticsearch indexes
• delete old ES indexes
the goals there are:
• we do not have any impact to the live GMS/ElasticSearch indexes while we reload from scratch
• action handlers don't have to reprocess a full set of MCL events (not needed, since they don't skip failed messages)
Interested in any feedback on this plan.
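
One way the "replay X hours of events" step could be implemented is by rewinding the original consumer group's offsets to a timestamp before pointing it at the new indexes. Below is a hedged sketch using Kafka's AdminClient; the bootstrap servers, group ID, topic, and lookback window are placeholders for whatever your deployment uses, and the consumer must be stopped while its offsets are altered:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public final class ReplayFromTimestamp {

  // Rewinds a consumer group's offsets on one topic to the first record at or after
  // (now - lookback), so the consumer re-reads the last X hours of events on restart.
  public static void rewind(String bootstrap, String groupId, String topic, Duration lookback)
      throws ExecutionException, InterruptedException {
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);

    try (Admin admin = Admin.create(props)) {
      long targetTs = Instant.now().minus(lookback).toEpochMilli();

      // Ask for the offset at the target timestamp on every partition of the topic.
      Map<TopicPartition, OffsetSpec> query = new HashMap<>();
      admin.describeTopics(List.of(topic)).allTopicNames().get()
          .get(topic).partitions()
          .forEach(p -> query.put(new TopicPartition(topic, p.partition()),
              OffsetSpec.forTimestamp(targetTs)));

      // Note: partitions with no records after the timestamp return offset -1 and
      // would need special handling in a real implementation.
      Map<TopicPartition, OffsetAndMetadata> newOffsets = new HashMap<>();
      admin.listOffsets(query).all().get()
          .forEach((tp, info) -> newOffsets.put(tp, new OffsetAndMetadata(info.offset())));

      // Move the group back; on restart it replays everything from that point forward.
      admin.alterConsumerGroupOffsets(groupId, newOffsets).all().get();
    }
  }
}
```

If you prefer not to script it, the kafka-consumer-groups CLI supports the same rewind via --reset-offsets --to-datetime.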

Seems reasonable to me; restores should be very infrequent, but this plan should keep full uptime when you have to do one.