How to Enable Lineage Metadata Emission in Spark Jobs Using the DataHub Integration

Original Slack Thread

I’m wondering which Spark functions I should expect to have lineage metadata emitted for. I have the following snippet to test out lineage, which references the AWS Glue Catalog as its Hive Metastore. (Running on EMR Serverless 6.7.0)

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("TestDatahubJob").enableHiveSupport().getOrCreate()
df = spark.sql("select * from database.kpi_metrics limit 10")
```
This code runs successfully. The table is also successfully created in the Glue `tmp_drew` database and written to S3. However, no lineage metadata is showing up in the UI. Do I need to use a different function for DataHub to catch the lineage connection?
Datahub version `0.12.0`
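For context, the DataHub Spark agent is enabled purely through Spark confs. A minimal sketch of those properties, rendered as `spark-submit` arguments — the GMS URL and the agent version below are placeholders, not values from this thread:

```python
# Sketch: Spark confs that enable the DataHub Spark lineage agent.
# Conf keys and the listener class come from the DataHub Spark integration;
# the server URL and package version are illustrative placeholders.
datahub_confs = {
    # Pull the agent JAR from Maven (version is illustrative)
    "spark.jars.packages": "io.acryl:datahub-spark-lineage:0.12.0",
    # Register the listener that inspects the query plan and emits lineage
    "spark.extraListeners": "datahub.spark.DatahubSparkListener",
    # Where to push metadata (placeholder GMS endpoint)
    "spark.datahub.rest.server": "http://datahub-gms:8080",
}

def to_submit_args(confs):
    """Render a conf dict as spark-submit --conf arguments."""
    args = []
    for key, value in sorted(confs.items()):
        args += ["--conf", f"{key}={value}"]
    return args
```

On EMR Serverless the same key/value pairs would go into the job run's Spark properties rather than a local `spark-submit` call.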


I’m fairly certain I’m using a supported command type, since the `queryPlan` property on my task is

```
CreateHiveTableAsSelectCommand [Database: tmp_drew, TableName: table_datahub_4, InsertIntoHiveTable]
+- GlobalLimit 10
   +- LocalLimit 10
      +- Relation database.kpi_metrics[,... 572 more fields] parquet
```
And both `CreateHiveTableAsSelectCommand` and `InsertIntoHiveTable` are supposed to be supported per the <Spark | DataHub> lineage docs.

Just to verify: to provide metadata to DataHub you’re using the Spark agent JAR, and NOT the Glue integration? You provided the agent JAR to EMR? Routing between DataHub and EMR is working, and you have other push-based integrations working?

If that is all in place, then I’d expect lineage to populate for you based on your write example. If you are instead extracting from Glue, then I wouldn’t expect lineage information. Glue doesn’t store a true table DDL when you write to it via `df.write`, hence the need to pull lineage off of the query plan with the DataHub agent JAR.
I don’t use serverless EMR, but same idea I imagine.

```sql
CREATE TABLE spark_catalog.schema.table (
  company_id INT,
  dimension STRING NOT NULL,
  id INT,
  code STRING,
  name STRING,
  parent STRING,
  parent_id INT)
USING delta
LOCATION 's3://BUCKET/gold/schema/table'
TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true',
  'delta.minReaderVersion' = '1',
  'delta.minWriterVersion' = '4')
-- useless for lineage :)
```

Yup! Your understanding is correct.

As an update, I got the downstream lineage working. It appears that since 0.10.?, nodes in the metadata graph don’t appear unless they have been ingested from Glue.
So by re-ingesting from Glue, the data lineage was captured from Spark job → Glue table.
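For anyone following along, re-ingesting from Glue means re-running the Glue source recipe. A minimal sketch — the region and sink server are placeholders, and `emit_s3_lineage` is the option discussed later in this thread:

```yaml
# Sketch of a DataHub Glue ingestion recipe; values are placeholders.
source:
  type: glue
  config:
    aws_region: us-east-1        # placeholder region
    emit_s3_lineage: true        # also emit Glue table -> S3 path lineage
sink:
  type: datahub-rest
  config:
    server: http://datahub-gms:8080   # placeholder GMS endpoint
```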

My remaining problem is that the Spark extraction treats the upstream reference as coming from S3, so it’s `tmp_drew.test_table -> DOWNSTREAM_OF -> s3://path/to/data` instead of `tmp_drew.test_table -> DOWNSTREAM_OF -> database.kpi_metrics`
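To make the mismatch concrete: DataHub dataset URNs encode the platform as their first component, so the two edges differ only in that component. A small sketch, using the names from this thread and assuming the default `PROD` environment:

```python
def dataset_urn(platform, name, env="PROD"):
    """Build a DataHub dataset URN in the standard urn:li:dataset format."""
    return f"urn:li:dataset:(urn:li:dataPlatform:{platform},{name},{env})"

# Edge the Spark agent currently emits: downstream of the S3 location
observed_upstream = dataset_urn("s3", "path/to/data")
# Edge we actually want: downstream of the Glue table itself
desired_upstream = dataset_urn("glue", "database.kpi_metrics")
```

Because the two URNs are distinct entities in the graph, lineage attached to the S3 node never shows up on the Glue node.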

Interesting. Are there any logs on the Spark side listing which URNs each run is trying to change metadata on? And is the Glue data already loaded in?

For Glue, the datasets are loaded in both as Glue entities and as S3 entities (I turned on the `emit_s3_lineage` option for the Glue connector). When I inspect the Neo4j DB I see both the S3 nodes and the Glue nodes. Curiously, these are also not showing lineage edges.
On the Spark side I’ve had trouble setting the logs to debug mode, since I’m submitting a Python file to EMR. Is there a way to set logging to debug via the Spark confs? I don’t see much in the non-debug logging other than the query plan.
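One way to try this (a sketch, assuming the EMR Spark build still uses log4j 1.x): ship a custom properties file with the job and point the driver at it via `--conf spark.driver.extraJavaOptions=-Dlog4j.configuration=file:./log4j-debug.properties`, with contents along these lines — the `datahub.spark` logger name is assumed from the listener’s package:

```properties
# Sketch: raise only the DataHub listener's logger to DEBUG (log4j 1.x syntax)
log4j.rootCategory=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c: %m%n
log4j.logger.datahub.spark=DEBUG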

Here’s the non-debug log for basically the same test job, just with different table names: (attachment)