Databricks Metadata Share
Steps
%pip install databricks-sdk --upgrade
%restart_python
from datetime import UTC, datetime, timedelta
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import sharing
from pyspark.sql import SparkSession
# System tables to mirror into the Delta Share, as
# (source catalog, schema, table, optional timestamp column) tuples.
# When the timestamp column is not None, the mirror is filtered to recent
# rows on that column (see get_or_create_metadata_tables); None copies the
# whole table.
TABLES_TO_SHARE = [
    ("system", "query", "history", "start_time"),
    ("system", "compute", "warehouse_events", "event_time"),
    ("system", "compute", "warehouses", None),
    ("system", "compute", "clusters", None),
    ("system", "compute", "node_timeline", "start_time"),
    ("system", "compute", "node_types", None),
    ("system", "billing", "list_prices", None),
    ("system", "billing", "usage", "usage_start_time"),
    ("system", "access", "audit", "event_time"),
    ("system", "lakeflow", "job_run_timeline", "period_start_time"),
    ("system", "lakeflow", "job_task_run_timeline", "period_start_time"),
    ("system", "serving", "served_entities", None),
    ("system", "serving", "endpoint_usage", "request_time"),
]
# Extra WHERE-clause predicates applied per (catalog, schema, table) when the
# mirror view is created. The audit-log predicate excludes a list of
# high-volume event types so the mirrored table stays small.
TABLE_FILTERS = {
    ("system", "access", "audit"): [
        "action_name NOT IN ('listHistoryQueries', 'oidcTokenAuthorization', 'tokenLogin', 'workspaceInHouseOAuthClientAuthentication', 'mintOAuthToken', 'getTable', 'generateTemporaryTableCredential', 'authzEval', 'aadTokenLogin', 'metadataAndPermissionsSnapshot', 'getPipeline', 'listCatalogs', 'metadataSnapshot', 'setTaskValue', 'getVolume')"
    ],
}
def is_user_workspace_admin(client):
    """Return True if the calling user is in the workspace ``admins`` group."""
    me = client.current_user.me()
    if me.groups is None:
        return False
    return any(group.display == "admins" for group in me.groups)
def is_user_metastore_admin(client):
    """Return True if the user owns the metastore, directly or via a group.

    Metastore-admin rights are modeled as ownership: either the metastore
    owner is the user itself, or it is a group the user belongs to.
    """
    me = client.current_user.me()
    owner = client.metastores.summary().owner
    if owner == me.user_name:
        return True
    if me.groups is None:
        return False
    return any(group.display == owner for group in me.groups)
spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
def get_current_catalog_storage_root():
    """Return the storage root URL of the session's current catalog.

    NOTE(review): this helper reads the module-level ``spark`` session and the
    ``client`` bound in the ``__main__`` block — confirm both are in scope
    before reusing it elsewhere.
    """
    current_catalog = spark.sql("SELECT current_catalog()").first()[0]
    return client.catalogs.get(name=current_catalog).storage_root
def get_or_create_warehouse(client):
    """Return the id of the ESPRESSO_AI_WAREHOUSE, creating it if absent.

    The warehouse is a small serverless SQL warehouse with aggressive
    auto-stop, sized for lightweight materialized-view refreshes.
    """
    existing = next(
        (w for w in client.warehouses.list() if w.name == "ESPRESSO_AI_WAREHOUSE"),
        None,
    )
    if existing is not None:
        return existing.id
    return client.warehouses.create_and_wait(
        name="ESPRESSO_AI_WAREHOUSE",
        cluster_size="X-Small",
        auto_stop_mins=5,
        enable_serverless_compute=True,
        min_num_clusters=1,
        max_num_clusters=1,
    ).id
def get_or_create_catalog(client):
    """Ensure the espresso_ai_system_metadata catalog exists; return its name.

    If the metastore has no storage root, catalog creation is retried with the
    current catalog's storage root as an explicit location.
    """
    CATALOG_NAME = "espresso_ai_system_metadata"
    comment = "System metadata catalog for Espresso AI"
    if CATALOG_NAME in {c.name for c in client.catalogs.list()}:
        return CATALOG_NAME
    try:
        client.catalogs.create(name=CATALOG_NAME, comment=comment)
    except Exception as e:
        # Only the "no metastore storage root" failure is recoverable.
        if "Metastore storage root URL does not exist" not in str(e):
            raise
        storage_root = get_current_catalog_storage_root()
        if not storage_root:
            raise RuntimeError("No metastorage root or current catalog storage root")
        client.catalogs.create(
            name=CATALOG_NAME, comment=comment, storage_root=storage_root
        )
    return CATALOG_NAME
def get_or_create_metadata_tables(client, warehouse_id, catalog_name):
    """Mirror every table in TABLES_TO_SHARE into ``catalog_name``.

    For each (catalog, schema, table, timestamp_col) entry this ensures the
    target schema exists, then creates a materialized view over the source
    system table — restricted to the last 60 days when a timestamp column is
    given, and further filtered by any TABLE_FILTERS predicates.

    Returns the list of mirrored schema names, in first-seen
    TABLES_TO_SHARE order.
    """
    cutoff_timestamp = (datetime.now(UTC) - timedelta(days=60)).isoformat()
    # dict.fromkeys dedupes while preserving order; the previous list(set(...))
    # made the returned schema order — and hence downstream share updates —
    # nondeterministic across runs (hash randomization).
    schema_names = list(dict.fromkeys(schema for _, schema, _, _ in TABLES_TO_SHARE))
    existing_schemas = {s.name for s in client.schemas.list(catalog_name=catalog_name)}
    for schema_name in schema_names:
        if schema_name not in existing_schemas:
            client.schemas.create(
                catalog_name=catalog_name,
                name=schema_name,
                comment=f"Mirror of system.{schema_name}",
            )
    for catalog, schema, table_name, timestamp_col in TABLES_TO_SHARE:
        target_table = f"{catalog_name}.{schema}.{table_name}"
        # Copy so the cutoff predicate never mutates the shared TABLE_FILTERS lists.
        conditions = TABLE_FILTERS.get((catalog, schema, table_name), []).copy()
        if timestamp_col:
            conditions.insert(0, f"{timestamp_col} >= '{cutoff_timestamp}'")
        where = f"WHERE {' AND '.join(conditions)}" if conditions else ""
        sql = f"""
        CREATE MATERIALIZED VIEW IF NOT EXISTS {target_table}
        AS SELECT * FROM {catalog}.{schema}.{table_name}
        {where}
        """
        client.statement_execution.execute_statement(
            warehouse_id=warehouse_id,
            catalog=catalog_name,
            schema=schema,
            statement=sql,
        )
    return schema_names
def create_share_with_system_catalog_schemas(client, catalog_name, schema_names):
    """Ensure the espresso_ai_system_data share exists and contains the schemas.

    Schemas already present in the share are left alone; only missing ones are
    added, so the function is safe to re-run.
    """
    SHARE_NAME = "espresso_ai_system_data"
    if SHARE_NAME in {s.name for s in client.shares.list_shares()}:
        share = client.shares.get(name=SHARE_NAME, include_shared_data=True)
    else:
        share = client.shares.create(
            name=SHARE_NAME, comment="System catalog data for Espresso AI"
        )
    already_shared = {obj.name for obj in (share.objects or [])}
    updates = []
    for schema_name in schema_names:
        qualified = f"{catalog_name}.{schema_name}"
        if qualified in already_shared:
            continue
        updates.append(
            sharing.SharedDataObjectUpdate(
                action=sharing.SharedDataObjectUpdateAction.ADD,
                data_object=sharing.SharedDataObject(
                    name=qualified,
                    data_object_type=sharing.SharedDataObjectDataObjectType.SCHEMA,
                ),
            )
        )
    if updates:
        client.shares.update(name=SHARE_NAME, updates=updates)
    return SHARE_NAME
def get_or_create_recipient(client):
    """Ensure the ``espresso_ai`` Delta Sharing recipient exists; return its name.

    A recipient registered under the right name but with a different global
    metastore id is deleted and recreated so the share always targets the
    expected Espresso AI metastore.
    """
    RECIPIENT_NAME = "espresso_ai"
    SHARING_IDENTIFIER = "aws:us-west-2:6a0451ec-2d11-48b7-8fde-ffaf14401682"
    recipients_by_name = {r.name: r for r in client.recipients.list()}
    current = recipients_by_name.get(RECIPIENT_NAME)
    if current is not None:
        if current.data_recipient_global_metastore_id == SHARING_IDENTIFIER:
            return RECIPIENT_NAME
        # Wrong metastore id: drop the stale recipient and recreate it below.
        client.recipients.delete(name=RECIPIENT_NAME)
    client.recipients.create(
        name=RECIPIENT_NAME,
        authentication_type=sharing.AuthenticationType.DATABRICKS,
        data_recipient_global_metastore_id=SHARING_IDENTIFIER,
        comment="Espresso AI optimizer service",
    )
    return RECIPIENT_NAME
def grant_recipient_access(client, share_name, recipient_name):
    """Grant the recipient SELECT permission on the share."""
    grant = sharing.PermissionsChange(principal=recipient_name, add=["SELECT"])
    client.shares.update_permissions(name=share_name, changes=[grant])
if __name__ == "__main__":
    # NOTE(review): `client` is also read as a module-level global by
    # get_current_catalog_storage_root, so it must stay a top-level binding.
    client = WorkspaceClient()
    # The admin checks only warn; the steps below fail on their own if the
    # caller truly lacks the required privileges.
    if not is_user_workspace_admin(client):
        print("⚠️ WORKSPACE ADMIN required: Ask a workspace admin for access.")
    if not is_user_metastore_admin(client):
        print("⚠️ METASTORE ADMIN required: Visit https://accounts.cloud.databricks.com/data")
    # Setup pipeline: warehouse → catalog → mirrored tables → share →
    # recipient → grant. Each step is idempotent, so re-running is safe.
    warehouse_id = get_or_create_warehouse(client)
    catalog_name = get_or_create_catalog(client)
    schema_names = get_or_create_metadata_tables(client, warehouse_id, catalog_name)
    share_name = create_share_with_system_catalog_schemas(client, catalog_name, schema_names)
    recipient_name = get_or_create_recipient(client)
    grant_recipient_access(client, share_name, recipient_name)
    print("\n🎉 Delta Sharing setup complete!")
    print("=" * 50)
    print(f"Share: {share_name}")
    print(f"Recipient: {recipient_name}")
    print(f"Catalog: {catalog_name}")
    print("=" * 50)
Questions?
Last updated