Databricks Metadata Share

Steps

  1. In your metastore's Delta Sharing settings, check the box saying "Allow Delta Sharing with parties outside your organization" (if it is unchecked).

  2. Set your "Organization name" to the name of your company/organization

  3. In the first cell of a Databricks notebook, install/upgrade the SDK:

    %pip install databricks-sdk --upgrade
  4. When finished, create a second cell and restart the Python kernel:

    %restart_python
  5. Copy-paste and run the following in a third cell to securely share your metadata with Espresso's Databricks account. This will allow us to generate savings estimates for your Databricks account.

from datetime import UTC, datetime, timedelta

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import sharing
from pyspark.sql import SparkSession

# (catalog, schema, table, timestamp_column) tuples naming the system tables to
# mirror. When timestamp_column is not None, the materialized view built in
# get_or_create_metadata_tables restricts rows to the last 60 days using it.
TABLES_TO_SHARE = [
    ("system", "query", "history", "start_time"),
    ("system", "compute", "warehouse_events", "event_time"),
    ("system", "compute", "warehouses", None),
    ("system", "compute", "clusters", None),
    ("system", "compute", "node_timeline", "start_time"),
    ("system", "compute", "node_types", None),
    ("system", "billing", "list_prices", None),
    ("system", "billing", "usage", "usage_start_time"),
    ("system", "access", "audit", "event_time"),
    ("system", "lakeflow", "job_run_timeline", "period_start_time"),
    ("system", "lakeflow", "job_task_run_timeline", "period_start_time"),
    ("system", "serving", "served_entities", None),
    ("system", "serving", "endpoint_usage", "request_time"),
]

# Extra WHERE-clause predicates, keyed by (catalog, schema, table), ANDed into
# the materialized-view definition. The audit filter excludes a list of
# high-volume action types from the mirrored audit log.
TABLE_FILTERS = {
    ("system", "access", "audit"): [
        "action_name NOT IN ('listHistoryQueries', 'oidcTokenAuthorization', 'tokenLogin', 'workspaceInHouseOAuthClientAuthentication', 'mintOAuthToken', 'getTable', 'generateTemporaryTableCredential', 'authzEval', 'aadTokenLogin', 'metadataAndPermissionsSnapshot', 'getPipeline', 'listCatalogs', 'metadataSnapshot', 'setTaskValue', 'getVolume')"
    ],
}


def is_user_workspace_admin(client):
    """Return True if the calling user is in the workspace 'admins' group."""
    me = client.current_user.me()
    if me.groups is None:
        return False
    return any(g.display == "admins" for g in me.groups)


def is_user_metastore_admin(client):
    """Return True if the calling user owns the metastore, directly or via a group.

    The metastore owner may be either a user name or a group name, so both are
    checked.
    """
    me = client.current_user.me()
    owner = client.metastores.summary().owner
    if owner == me.user_name:
        return True
    if me.groups is None:
        return False
    return any(group.display == owner for group in me.groups)


spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()


def get_current_catalog_storage_root(ws_client=None):
    """Return the storage root of the session's current catalog.

    Args:
        ws_client: Optional WorkspaceClient to use. When omitted, falls back
            to the module-level ``client`` created in the ``__main__`` section
            — this preserves the original behavior, which silently depended on
            that global.

    Returns:
        The catalog's ``storage_root`` attribute (may be None if the catalog
        has no explicit storage root).
    """
    if ws_client is None:
        # Backward-compatible fallback to the global created under __main__.
        ws_client = client
    current_catalog = spark.sql("SELECT current_catalog()").collect()[0][0]
    return ws_client.catalogs.get(name=current_catalog).storage_root


def get_or_create_warehouse(client):
    """Return the id of the ESPRESSO_AI_WAREHOUSE, creating it if absent."""
    target_name = "ESPRESSO_AI_WAREHOUSE"
    existing = next(
        (w for w in client.warehouses.list() if w.name == target_name), None
    )
    if existing is not None:
        return existing.id

    # Small single-cluster serverless warehouse that auto-stops after 5 idle
    # minutes.
    warehouse = client.warehouses.create_and_wait(
        name=target_name,
        cluster_size="X-Small",
        auto_stop_mins=5,
        enable_serverless_compute=True,
        min_num_clusters=1,
        max_num_clusters=1,
    )
    return warehouse.id


def get_or_create_catalog(client):
    """Return the Espresso metadata catalog's name, creating it if needed.

    If the metastore has no default storage root, retries the create with the
    storage root borrowed from the session's current catalog.
    """
    catalog_name = "espresso_ai_system_metadata"
    existing_names = {c.name for c in client.catalogs.list()}
    if catalog_name in existing_names:
        return catalog_name

    comment = "System metadata catalog for Espresso AI"
    try:
        client.catalogs.create(name=catalog_name, comment=comment)
    except Exception as e:
        if "Metastore storage root URL does not exist" not in str(e):
            raise
        # No metastore-level storage root: fall back to the current catalog's.
        storage_root = get_current_catalog_storage_root()
        if not storage_root:
            raise RuntimeError("No metastorage root or current catalog storage root")
        client.catalogs.create(
            name=catalog_name, comment=comment, storage_root=storage_root
        )
    return catalog_name


def get_or_create_metadata_tables(client, warehouse_id, catalog_name):
    """Mirror the configured system tables into `catalog_name`.

    Ensures one schema per distinct source schema exists, then issues a
    CREATE MATERIALIZED VIEW IF NOT EXISTS per table in TABLES_TO_SHARE,
    restricted to the last 60 days when a timestamp column is configured and
    further narrowed by any TABLE_FILTERS predicates.

    Returns the list of distinct schema names that were mirrored.
    """
    cutoff_timestamp = (datetime.now(UTC) - timedelta(days=60)).isoformat()
    schema_names = list({schema for _, schema, _, _ in TABLES_TO_SHARE})

    # Create only the schemas that do not exist yet.
    present = {s.name for s in client.schemas.list(catalog_name=catalog_name)}
    for name in schema_names:
        if name in present:
            continue
        client.schemas.create(
            catalog_name=catalog_name,
            name=name,
            comment=f"Mirror of system.{name}",
        )

    for src_catalog, src_schema, table_name, timestamp_col in TABLES_TO_SHARE:
        target_table = f"{catalog_name}.{src_schema}.{table_name}"

        # Time-window predicate first, then any per-table filters.
        conditions = []
        if timestamp_col:
            conditions.append(f"{timestamp_col} >= '{cutoff_timestamp}'")
        conditions.extend(
            TABLE_FILTERS.get((src_catalog, src_schema, table_name), [])
        )

        where = f"WHERE {' AND '.join(conditions)}" if conditions else ""

        sql = f"""
            CREATE MATERIALIZED VIEW IF NOT EXISTS {target_table}
            AS SELECT * FROM {src_catalog}.{src_schema}.{table_name}
            {where}
        """
        client.statement_execution.execute_statement(
            warehouse_id=warehouse_id,
            catalog=catalog_name,
            schema=src_schema,
            statement=sql,
        )

    return schema_names


def create_share_with_system_catalog_schemas(client, catalog_name, schema_names):
    """Ensure the Espresso share exists and contains every mirrored schema.

    Adds each `catalog_name.schema` that is not already in the share; existing
    entries are left untouched. Returns the share name.
    """
    share_name = "espresso_ai_system_data"

    if share_name in {s.name for s in client.shares.list_shares()}:
        share = client.shares.get(name=share_name, include_shared_data=True)
    else:
        share = client.shares.create(
            name=share_name, comment="System catalog data for Espresso AI"
        )

    already_shared = {obj.name for obj in (share.objects or [])}
    pending = []
    for schema_name in schema_names:
        qualified = f"{catalog_name}.{schema_name}"
        if qualified in already_shared:
            continue
        pending.append(
            sharing.SharedDataObjectUpdate(
                action=sharing.SharedDataObjectUpdateAction.ADD,
                data_object=sharing.SharedDataObject(
                    name=qualified,
                    data_object_type=sharing.SharedDataObjectDataObjectType.SCHEMA,
                ),
            )
        )

    if pending:
        client.shares.update(name=share_name, updates=pending)
    return share_name


def get_or_create_recipient(client):
    """Ensure the Databricks-to-Databricks recipient for Espresso AI exists.

    A recipient with the right name but a different sharing identifier is
    deleted and recreated so it points at Espresso's metastore. Returns the
    recipient name.
    """
    recipient_name = "espresso_ai"
    sharing_identifier = "aws:us-west-2:6a0451ec-2d11-48b7-8fde-ffaf14401682"

    recipients_by_name = {r.name: r for r in client.recipients.list()}
    existing = recipients_by_name.get(recipient_name)
    if existing is not None:
        if existing.data_recipient_global_metastore_id == sharing_identifier:
            return recipient_name
        # Stale recipient pointing elsewhere: replace it below.
        client.recipients.delete(name=recipient_name)

    client.recipients.create(
        name=recipient_name,
        authentication_type=sharing.AuthenticationType.DATABRICKS,
        data_recipient_global_metastore_id=sharing_identifier,
        comment="Espresso AI optimizer service",
    )
    return recipient_name


def grant_recipient_access(client, share_name, recipient_name):
    """Grant the recipient SELECT on everything in the share."""
    select_grant = sharing.PermissionsChange(
        principal=recipient_name, add=["SELECT"]
    )
    client.shares.update_permissions(name=share_name, changes=[select_grant])


if __name__ == "__main__":
    # NOTE: `client` is read as a module-level global by
    # get_current_catalog_storage_root, so it must stay at this scope.
    client = WorkspaceClient()

    # Permission checks warn but do not abort; the API calls below will fail
    # with their own errors if privileges are actually missing.
    if not is_user_workspace_admin(client):
        print("⚠️  WORKSPACE ADMIN required: Ask a workspace admin for access.")
    if not is_user_metastore_admin(client):
        print("⚠️  METASTORE ADMIN required: Visit https://accounts.cloud.databricks.com/data")

    # Pipeline: warehouse → catalog → mirrored tables → share → recipient → grant.
    warehouse_id = get_or_create_warehouse(client)
    catalog_name = get_or_create_catalog(client)
    schema_names = get_or_create_metadata_tables(client, warehouse_id, catalog_name)
    share_name = create_share_with_system_catalog_schemas(client, catalog_name, schema_names)
    recipient_name = get_or_create_recipient(client)
    grant_recipient_access(client, share_name, recipient_name)

    print("\n🎉 Delta Sharing setup complete!")
    print("=" * 50)
    print(f"Share: {share_name}")
    print(f"Recipient: {recipient_name}")
    print(f"Catalog: {catalog_name}")
    print("=" * 50)

Questions?

Book a Call or email [email protected].

Last updated