Discussion on BigQuery Plugin Query Issues and Data Scanning Costs

Original Slack Thread

Hi everyone, I’m having a problem with the BigQuery plugin.
The BigQuery plugin executes some incorrect queries in BigQuery, which cause errors.
Here is an example.

```
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/dist-packages/google/cloud/bigquery/dbapi/cursor.py", line 203, in _execute
    self._query_job.result()
  File "/usr/local/lib/python3.7/dist-packages/google/cloud/bigquery/job/query.py", line 1499, in result
    do_get_result()
  File "/usr/local/lib/python3.7/dist-packages/google/api_core/retry.py", line 354, in retry_wrapped_func
    on_error=on_error,
  File "/usr/local/lib/python3.7/dist-packages/google/api_core/retry.py", line 191, in retry_target
    return target()
  File "/usr/local/lib/python3.7/dist-packages/google/cloud/bigquery/job/query.py", line 1489, in do_get_result
    super(QueryJob, self).result(retry=retry, timeout=timeout)
  File "/usr/local/lib/python3.7/dist-packages/google/cloud/bigquery/job/base.py", line 728, in result
    return super(_AsyncJob, self).result(timeout=timeout, **kwargs)
  File "/usr/local/lib/python3.7/dist-packages/google/api_core/future/polling.py", line 261, in result
    raise self._exception
google.api_core.exceptions.BadRequest: 400 Cannot query over table 'automizely-data-warehouse.dwb_utc_00.dwb_flow_mt_start_product_i_d' without a filter over column(s) 'processing_time' that can be used for partition elimination

Location: US
Job ID: 76336456-c91b-4cb2-99ca-516d14c7f065
```
The query executed here is (clearly the wrong function is being applied to the partition column):
```
SELECT
    *
FROM
    `automizely-data-warehouse.dwb_utc_00.dwb_flow_mt_start_product_i_d`
WHERE
    TIMESTAMP(`processing_time`) BETWEEN TIMESTAMP('2023-08-08 00:00:00') AND TIMESTAMP('2023-08-09 00:00:00')
```
This query is also problematic in that it uses `select *`, which incurs BigQuery data-scanning costs.

As for why it is querying the data at all: I realized that the BigQuery plugin queries the newest partition every day, which over time is equivalent to scanning the whole table.

This is interesting - <@U01GZEETMEZ> could you look into this?

To make sure I understand - the problem is that we should be using just `processing_time` instead of `TIMESTAMP(processing_time)`, is that right?

Yep, this query error shows that we should be using just `processing_time`.
But beyond that, why does the BigQuery plugin need to scan the data with `select *`?
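
For reference, the corrected filter, assuming `processing_time` is itself the table's TIMESTAMP partition column, would be along these lines:
```
SELECT
    *
FROM
    `automizely-data-warehouse.dwb_utc_00.dwb_flow_mt_start_product_i_d`
WHERE
    -- Compare the partition column directly instead of wrapping it in
    -- TIMESTAMP(), so BigQuery can perform partition elimination.
    `processing_time` BETWEEN TIMESTAMP('2023-08-08 00:00:00') AND TIMESTAMP('2023-08-09 00:00:00')
```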

I think the select * is a quirk of how we create “temp tables” with BigQuery (see the comment here https://github.com/datahub-project/datahub/blob/8fb5912978d35200367a68b7bcd69a69f93a540e/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py#L937) but not 100% sure - I’d need to see more of the stack trace to be sure
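
For illustration, the kind of “temp table” creation that would explain the `select *` might look something like this. This is a hypothetical sketch, not the actual DataHub code; the table and filter values are taken from the error above:
```
-- Hypothetical sketch only (run inside a BigQuery script/session):
-- materialise the latest partition into a temporary table so that later
-- profiling queries read from it instead of the full source table.
CREATE TEMP TABLE profiling_input AS
SELECT
    *
FROM
    `automizely-data-warehouse.dwb_utc_00.dwb_flow_mt_start_product_i_d`
WHERE
    `processing_time` BETWEEN TIMESTAMP('2023-08-08 00:00:00')
                          AND TIMESTAMP('2023-08-09 00:00:00');
```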

Well, I may not understand it fully, but there is a charge for scanning BigQuery tables, and if every table’s daily partition is scanned every day, that adds up to the equivalent of a full table scan, which is expensive, so beware :joy:

Hey <@U056FMMD2NA>, BigQuery profiling attempts to scan only the latest partition of partitioned tables when reporting field-level profiles (min, max, median, sample values, etc.). This does incur cost, but only for scanning the latest partition, not a full table scan.

Can you elaborate a bit more on what you mean by “that’s the equivalent of a full table scan”? Also, if you have any suggestions to make this better in terms of cost, please let us know.

I think they’re saying that it should be `where processing_time between X and Y` instead of `where timestamp(processing_time) between X and Y`

yes, I got that part. I’ll be making that fix soon. I was wondering if there is something else here that I did not understand.

This should be fixed by https://github.com/datahub-project/datahub/pull/8778.

Hi <@U02G4B6ADL6>
I’m concerned that scanning the newest partition will be costly: assuming I set up the BigQuery ingestion to run once a day, that would mean scanning the newest partition every day.
I’m also curious why DataHub runs `SELECT *`, which is a query that scans the data directly; if it’s to get some metadata, would it be better to do it another way?
I’ve been using BigQuery for a long time, so maybe I can help you in some way.

As I mentioned earlier, the newest partition is read/scanned in order to compute the most up-to-date field-level statistics (mean, median, etc.). So only the newest partition gets scanned on the day you run field-level profiling. A full table scan is never performed on any single day; however, I believe you are suggesting that scanning the latest partition every day would amount to a full table scan once the individual days are summed up.

This should answer your question about the need for `select *` from the latest partition - https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py#L974
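
To give a sense of what that partition-scoped profiling involves, the aggregates are roughly of this shape. This is an illustrative sketch only: `order_amount` is a made-up column, and the real queries are generated by the profiler code linked above.
```
-- Illustrative only: field-level statistics restricted to a single partition,
-- so that only that partition's bytes are scanned and billed.
SELECT
    MIN(`order_amount`)                             AS min_value,
    MAX(`order_amount`)                             AS max_value,
    AVG(`order_amount`)                             AS mean_value,
    APPROX_QUANTILES(`order_amount`, 2)[OFFSET(1)]  AS median_value,
    COUNT(DISTINCT `order_amount`)                  AS distinct_count
FROM
    `automizely-data-warehouse.dwb_utc_00.dwb_flow_mt_start_product_i_d`
WHERE
    `processing_time` >= TIMESTAMP('2023-08-08 00:00:00')
    AND `processing_time` < TIMESTAMP('2023-08-09 00:00:00')
```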

A couple of optimisations already available in terms of profiling execution (and saving cost) are:

  1. Profiling is skipped if the table has not been updated (as per last_modified_time) when stateful ingestion is enabled (store_last_profiling_timestamps / enable_stateful_profiling).
  2. You can schedule profiling to run only on a particular day of the week/month to save on cost (profile_day_of_week / profile_date_of_month).
  3. You can completely disable column-level statistics (profile_table_level_only).

We are also planning to support profiling on a sample of the table (using a TABLESAMPLE SYSTEM query, see the sketch below); however, this would only help reduce scanning for non-partitioned tables. We’d be glad to hear your thoughts if you have any suggestions.
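
As a rough illustration of that planned sampling approach (the table name here is hypothetical):
```
-- Hypothetical sketch: TABLESAMPLE SYSTEM reads only a subset of the table's
-- storage blocks, so roughly 10% of the table is scanned and billed.
-- This only helps for non-partitioned tables.
SELECT
    *
FROM
    `my-project.my_dataset.my_non_partitioned_table` TABLESAMPLE SYSTEM (10 PERCENT)
```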

We forgot to follow up here, but this was fixed by https://github.com/datahub-project/datahub/pull/8778