Databricks system tables are currently in Public Preview, which means they are accessible but some details may still change. This is how Databricks describes system tables:
System tables are a Databricks-hosted analytical store of your account’s operational data found in the system catalog. System tables can be used for historical observability across your account.
I’m going to describe just one instance where system tables gave me a simple, out-of-the-box solution to replace a less simple, less out-of-the-box one. The ability to replace custom code in this manner is important when you’re trying to build a stable, resilient, manageable system. This is just a small example, but imagine how much simpler an organization’s codebase could be if you repeated this process a few dozen times.
The Challenge
There are a number of different system tables but I want to focus on just the lineage tables: table_lineage and column_lineage. Specifically, I want to talk about how I leveraged them to dramatically simplify a process I was using to monitor a large number of Delta Share tables. We had a client that was consuming around 9,000 shared tables from a provider. The provider would drop tables into the share location and then a view (with column names for performance) would be created in a catalog in a business workspace. The provider said they would provide a list of changes every month; new or removed tables and/or columns. However, the client said they had a single-day SLA for changes. The business wanted to make sure they knew in advance that a job or report against a view would fail because the underlying table was dropped or altered. Also, if they requested a new table, they didn’t want to wait a month to find out if it had been delivered.
The Solution (Without System Tables)
We have Unity Catalog enabled, since this is a prerequisite for Delta Share. I needed to be able to monitor changes for all of the securable objects within the provider’s collection in Delta Share, which are schemas, tables and columns. There were only four or five schemas, so that monitoring could almost be done manually. However, there were a lot of tables and these tables even had a lot of columns. I only had access to the INFORMATION_SCHEMA for each of the schemas, so I could query table and column names. The idea was to maintain a hash of the ordered table (or column) names for a quick comparison. (I’m only showing the code for tables; the column code is similar.)
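import hashlib

def get_tables_hash_and_names(spark):
    # List every Delta Sharing table visible in the catalog.
    tables_query = """
        SELECT table_name
        FROM xyz.information_schema.tables
        WHERE data_source_format = 'DELTASHARING'
        ORDER BY table_name DESC
    """
    tables_df = spark.sql(tables_query)
    # Re-sort in Python so the hash never depends on the order rows come back in.
    table_names = sorted([row['table_name'] for row in tables_df.collect()])
    # Join the names with a delimiter and hash the result for a cheap comparison.
    tables_str = '|'.join(table_names)
    tables_hash = hashlib.md5(tables_str.encode('utf-8')).hexdigest()
    return tables_hash, table_names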
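The column-level code is not shown here. As a rough sketch of what a similar version might look like, the function below builds one hash per table from (table_name, column_name) pairs, such as rows read from information_schema.columns. The function name and the sample rows are hypothetical, and this is not the code that was actually used.

```python
import hashlib
from collections import defaultdict

def hash_columns_by_table(rows):
    """Map each table name to an MD5 digest of its sorted column names.

    `rows` is any iterable of (table_name, column_name) pairs.
    """
    columns = defaultdict(list)
    for table_name, column_name in rows:
        columns[table_name].append(column_name)
    return {
        table: hashlib.md5("|".join(sorted(cols)).encode("utf-8")).hexdigest()
        for table, cols in columns.items()
    }

# Hypothetical sample rows standing in for a query against information_schema.columns.
yesterday = [("orders", "id"), ("orders", "amount"), ("customers", "id")]
today = [("orders", "amount"), ("orders", "id"), ("orders", "currency"), ("customers", "id")]

hashes_yesterday = hash_columns_by_table(yesterday)
hashes_today = hash_columns_by_table(today)
```

Because the column names are sorted before hashing, a pure reordering of columns leaves the hash unchanged, while an added, removed, or renamed column changes it. If column order mattered, the names could be ordered by ordinal_position instead.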
If there is a discrepancy, a deeper dive identifies the changes. This means the state needs to be saved so the current day can be compared to the prior day:
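from datetime import datetime

def save_schema_hashes(spark, current_schema_hashes):
    # Stamp every record with the same capture time so one run can be read back as a unit.
    current_timestamp = datetime.now()
    records = [(table_name, schema_hash, current_timestamp) for table_name, schema_hash in current_schema_hashes.items()]
    schema_hashes_df = spark.createDataFrame(records, ["table_name", "schema_hash", "timestamp"])
    # Append rather than overwrite so earlier snapshots stay available for comparison.
    schema_hashes_df.write.format("delta").mode("append").saveAsTable("delta_schema_hashes")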
Finally, a quick comparison of the stored hash for each table name identifies any tables or columns that may have changed, necessitating a new view creation.
def identify_schema_changes(current_schema_hashes, previous_schema_hashes):
    # Tables present today but not yesterday, and vice versa.
    added_tables = set(current_schema_hashes.keys()) - set(previous_schema_hashes.keys())
    deleted_tables = set(previous_schema_hashes.keys()) - set(current_schema_hashes.keys())
    # Tables present on both days whose hash differs.
    modified_tables = set()
    for table in (set(current_schema_hashes.keys()) & set(previous_schema_hashes.keys())):
        if current_schema_hashes[table] != previous_schema_hashes[table]:
            modified_tables.add(table)
    return added_tables, deleted_tables, modified_tables
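To make the three-way comparison concrete, here is a small self-contained sketch that applies the same set logic to two made-up snapshots. The function name `diff_hashes`, the table names, and the hash values are placeholders for illustration only.

```python
def diff_hashes(current, previous):
    """Return (added, deleted, modified) table names between two snapshots."""
    added = current.keys() - previous.keys()
    deleted = previous.keys() - current.keys()
    modified = {t for t in current.keys() & previous.keys() if current[t] != previous[t]}
    return added, deleted, modified

# Made-up snapshots: table name -> schema hash.
previous = {"orders": "a1", "customers": "b2", "returns": "c3"}
current = {"orders": "a1", "customers": "ff", "invoices": "d4"}

added, deleted, modified = diff_hashes(current, previous)
print(sorted(added), sorted(deleted), sorted(modified))
# prints ['invoices'] ['returns'] ['customers']
```

Each of the three sets maps to a different action: a new view for an added table, a dropped view for a deleted one, and a recreated view for a modified one.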
The Solution (With System Tables)
Basically, I needed to build out an analytical store of the account’s operational data found in the system catalog. Sound familiar? Once Databricks provided lineage along with their system tables, none of this functionality was needed. Now everything changed from code to SQL, substantially reducing the complexity of the solution. Here is a query to see if a source exists in the target:
SELECT
    target_table_name
FROM
    system.access.table_lineage tl_outer
WHERE
    target_table_catalog = '{catalog_name}' AND
    target_table_schema = '{schema_name}' AND
    target_table_type = 'VIEW' AND
    NOT EXISTS (SELECT 1
                FROM system.access.table_lineage AS tl_inner
                WHERE
                    tl_outer.target_table_catalog = tl_inner.source_table_catalog AND
                    tl_outer.target_table_schema = tl_inner.source_table_schema AND
                    tl_outer.target_table_name = tl_inner.source_table_name)
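The '{catalog_name}' and '{schema_name}' placeholders have to be filled in before the query runs. One way to do that from Python is sketched below, assuming plain string formatting with a basic identifier check. The helper name, the shortened template, and the sample catalog and schema names are all hypothetical.

```python
import re

# Accept only simple identifiers so a stray quote cannot alter the query.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

# Shortened template for illustration; the full query is substituted the same way.
QUERY_TEMPLATE = """
SELECT target_table_name
FROM system.access.table_lineage
WHERE target_table_catalog = '{catalog_name}'
  AND target_table_schema = '{schema_name}'
  AND target_table_type = 'VIEW'
"""

def render_lineage_query(template, catalog_name, schema_name):
    """Fill the placeholders after validating both names."""
    for value in (catalog_name, schema_name):
        if not _IDENTIFIER.fullmatch(value):
            raise ValueError(f"unexpected identifier: {value!r}")
    return template.format(catalog_name=catalog_name, schema_name=schema_name)

query = render_lineage_query(QUERY_TEMPLATE, "business_catalog", "shared_views")
# In a notebook or job, the rendered text would then be passed to spark.sql(query).
```

The validation step is a design choice: because the names are spliced into SQL text, rejecting anything that is not a plain identifier keeps a malformed value from changing the query.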
And here is how you check if a target table no longer exists in the source:
SELECT
    source_table_name
FROM
    system.access.table_lineage
WHERE
    source_table_catalog = '{catalog_name}' AND
    source_table_schema = '{schema_name}' AND
    (target_table_catalog IS NULL AND
     target_table_schema IS NULL AND
     target_table_name IS NULL)
Conclusion
Views can be created or dropped automatically with this information. The same is true for columns. If something isn’t there, it’s either removed or modified, and a new view is created automatically with the new columns. The client teams were more comfortable supporting SQL than PySpark. Also, I wasn’t storing and maintaining lineage in a table that the client had to pay for. All in all, the system tables let me revisit and simplify custom code.
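As an illustration of the automatic view creation, the sketch below builds a CREATE OR REPLACE VIEW statement with an explicit column list from a table name and its columns. The helper names and the three-level object names are hypothetical, and the generated statement is only one possible shape for such a view.

```python
def quote_identifier(name):
    """Wrap a column name in backticks, doubling any embedded backtick."""
    return "`" + name.replace("`", "``") + "`"

def build_view_ddl(view_name, source_table, columns):
    """Return a CREATE OR REPLACE VIEW statement that names each column explicitly."""
    if not columns:
        raise ValueError("a view needs at least one column")
    column_list = ", ".join(quote_identifier(c) for c in columns)
    return f"CREATE OR REPLACE VIEW {view_name} AS SELECT {column_list} FROM {source_table}"

# Hypothetical names: a view in the business catalog over a shared table.
ddl = build_view_ddl(
    "business_catalog.sales.orders",
    "provider_share.sales.orders",
    ["id", "amount"],
)
```

Running the resulting statement again after a column change replaces the view in place, so the same helper covers both new and modified tables. A removed table would need a separate DROP VIEW IF EXISTS statement.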
Get in touch with us if you want to know more about how Databricks system tables could help you manage lineage, improve governance, provide cost insights, or support other use cases.