# Databricks Metadata Share

#### Steps <a href="#steps" id="steps"></a>

1. Visit <https://accounts.cloud.databricks.com/data/> and click on your metastore.
2. If unchecked, check the box saying "Allow Delta Sharing with parties outside your organization."
3. Set your "Organization name" to the name of your company/organization
4. In the first cell of a Databricks notebook, install/upgrade the SDK: `%pip install databricks-sdk --upgrade`
5. When finished, create a second cell and restart the Python kernel: `%restart_python`
6. Copy-paste and run the following in a third cell to securely share your metadata with Espresso's Databricks account. This will allow us to generate savings estimates for your Databricks account.

```python
from datetime import UTC, datetime, timedelta

from databricks.sdk import WorkspaceClient
from databricks.sdk.service import sharing
from pyspark.sql import SparkSession

TABLES_TO_SHARE = [
    ("system", "query", "history", "start_time"),
    ("system", "compute", "warehouse_events", "event_time"),
    ("system", "compute", "warehouses", None),
    ("system", "compute", "clusters", None),
    ("system", "compute", "node_timeline", "start_time"),
    ("system", "compute", "node_types", None),
    ("system", "billing", "list_prices", None),
    ("system", "billing", "usage", "usage_start_time"),
    ("system", "access", "audit", "event_time"),
    ("system", "access", "workspaces_latest", None),
    ("system", "lakeflow", "job_run_timeline", "period_start_time"),
    ("system", "lakeflow", "job_task_run_timeline", "period_start_time"),
    ("system", "serving", "served_entities", None),
    ("system", "serving", "endpoint_usage", "request_time"),
    ("system", "information_schema", "metastores", None),
]

TABLE_FILTERS = {
    ("system", "access", "audit"): [
        "action_name NOT IN ('listHistoryQueries', 'oidcTokenAuthorization', 'tokenLogin', 'workspaceInHouseOAuthClientAuthentication', 'mintOAuthToken', 'getTable', 'generateTemporaryTableCredential', 'authzEval', 'aadTokenLogin', 'metadataAndPermissionsSnapshot', 'getPipeline', 'listCatalogs', 'metadataSnapshot', 'setTaskValue', 'getVolume', 'loadTable', 'getCatalog', 'config', 'getTableById', 'getSchema')",
        "service_name NOT IN ('syslog', 'capsule8-alerts-dataplane', 'clamAVScanService-dataplane', 'monit')",
    ],
}


def is_user_workspace_admin(client):
    current_user = client.current_user.me()
    return current_user.groups is not None and any(
        group.display == "admins" for group in current_user.groups
    )


def is_user_metastore_admin(client):
    current_user = client.current_user.me()
    metastore = client.metastores.summary()
    if metastore.owner == current_user.user_name:
        return True
    return current_user.groups is not None and any(
        g.display == metastore.owner for g in current_user.groups
    )


def check_permissions(client):
    missing = []
    if not is_user_workspace_admin(client):
        missing.append(
            "WORKSPACE ADMIN required: ask a workspace admin to add you to the 'admins' group."
        )
    if not is_user_metastore_admin(client):
        missing.append(
            "METASTORE ADMIN required: visit https://accounts.cloud.databricks.com/data, "
            "open your metastore's Configuration tab, and set yourself as Metastore Admin."
        )
    if missing:
        raise PermissionError("Cannot run share_data.py:\n  - " + "\n  - ".join(missing))


spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()


def get_current_catalog_storage_root(client):
    current_catalog = spark.sql("SELECT current_catalog()").collect()[0][0]
    return client.catalogs.get(name=current_catalog).storage_root


def get_or_create_warehouse(client):
    for warehouse in client.warehouses.list():
        if warehouse.name == "ESPRESSO_AI_WAREHOUSE":
            return warehouse.id

    created_warehouse = client.warehouses.create_and_wait(
        name="ESPRESSO_AI_WAREHOUSE",
        cluster_size="X-Small",
        auto_stop_mins=5,
        enable_serverless_compute=True,
        min_num_clusters=1,
        max_num_clusters=1,
    )
    return created_warehouse.id


def get_or_create_catalog(client):
    CATALOG_NAME = "espresso_ai_system_metadata"
    if CATALOG_NAME in [c.name for c in client.catalogs.list()]:
        return CATALOG_NAME

    comment = "System metadata catalog for Espresso AI"
    try:
        client.catalogs.create(name=CATALOG_NAME, comment=comment)
    except Exception as e:
        if "Metastore storage root URL does not exist" in str(e):
            if not (storage_root := get_current_catalog_storage_root(client)):
                raise RuntimeError("No metastorage root or current catalog storage root")
            client.catalogs.create(
                name=CATALOG_NAME, comment=comment, storage_root=storage_root
            )
        else:
            raise
    return CATALOG_NAME


def get_or_create_metadata_tables(client, warehouse_id, catalog_name):
    cutoff_timestamp = (datetime.now(UTC) - timedelta(days=60)).isoformat()

    # `information_schema` is auto-created and read-only in every UC catalog,
    # so we can't mirror into it — write those tables to `metadata` instead.
    schema_names = list(
        {("metadata" if s == "information_schema" else s) for _, s, _, _ in TABLES_TO_SHARE}
    )

    existing_schemas = {s.name for s in client.schemas.list(catalog_name=catalog_name)}
    for schema_name in schema_names:
        if schema_name not in existing_schemas:
            client.schemas.create(
                catalog_name=catalog_name,
                name=schema_name,
                comment=f"Mirror of system.{schema_name}",
            )

    for catalog, schema, table_name, timestamp_col in TABLES_TO_SHARE:
        target_schema = "metadata" if schema == "information_schema" else schema
        target_table = f"{catalog_name}.{target_schema}.{table_name}"

        conditions = TABLE_FILTERS.get((catalog, schema, table_name), []).copy()
        if timestamp_col:
            conditions.insert(0, f"{timestamp_col} >= '{cutoff_timestamp}'")

        where = f"WHERE {' AND '.join(conditions)}" if conditions else ""

        sql = f"""
            CREATE MATERIALIZED VIEW IF NOT EXISTS {target_table}
            AS SELECT * FROM {catalog}.{schema}.{table_name}
            {where}
        """
        client.statement_execution.execute_statement(
            warehouse_id=warehouse_id,
            catalog=catalog_name,
            schema=target_schema,
            statement=sql,
        )

    return schema_names


def create_share_with_system_catalog_schemas(client, catalog_name, schema_names):
    SHARE_NAME = "espresso_ai_system_data"

    existing_shares = {s.name for s in client.shares.list_shares()}
    if SHARE_NAME in existing_shares:
        share = client.shares.get(name=SHARE_NAME, include_shared_data=True)
    else:
        share = client.shares.create(
            name=SHARE_NAME, comment="System catalog data for Espresso AI"
        )

    existing = {obj.name for obj in (share.objects or [])}
    updates = [
        sharing.SharedDataObjectUpdate(
            action=sharing.SharedDataObjectUpdateAction.ADD,
            data_object=sharing.SharedDataObject(
                name=f"{catalog_name}.{schema_name}",
                data_object_type=sharing.SharedDataObjectDataObjectType.SCHEMA,
            ),
        )
        for schema_name in schema_names
        if f"{catalog_name}.{schema_name}" not in existing
    ]

    if updates:
        client.shares.update(name=SHARE_NAME, updates=updates)
    return SHARE_NAME


def get_or_create_recipient(client):
    RECIPIENT_NAME = "espresso_ai"
    SHARING_IDENTIFIER = "aws:us-west-2:6a0451ec-2d11-48b7-8fde-ffaf14401682"

    existing_recipients = {r.name: r for r in client.recipients.list()}
    if RECIPIENT_NAME in existing_recipients:
        if (
            existing_recipients[RECIPIENT_NAME].data_recipient_global_metastore_id
            == SHARING_IDENTIFIER
        ):
            return RECIPIENT_NAME
        client.recipients.delete(name=RECIPIENT_NAME)

    client.recipients.create(
        name=RECIPIENT_NAME,
        authentication_type=sharing.AuthenticationType.DATABRICKS,
        data_recipient_global_metastore_id=SHARING_IDENTIFIER,
        comment="Espresso AI optimizer service",
    )
    return RECIPIENT_NAME


def grant_recipient_access(client, share_name, recipient_name):
    client.shares.update_permissions(
        name=share_name,
        changes=[sharing.PermissionsChange(principal=recipient_name, add=["SELECT"])],
    )


if __name__ == "__main__":
    client = WorkspaceClient()
    check_permissions(client)

    warehouse_id = get_or_create_warehouse(client)
    catalog_name = get_or_create_catalog(client)
    schema_names = get_or_create_metadata_tables(client, warehouse_id, catalog_name)
    share_name = create_share_with_system_catalog_schemas(client, catalog_name, schema_names)
    recipient_name = get_or_create_recipient(client)
    grant_recipient_access(client, share_name, recipient_name)

    print("\n🎉 Delta Sharing setup complete!")
    print("=" * 50)
    print(f"Share: {share_name}")
    print(f"Recipient: {recipient_name}")
    print(f"Catalog: {catalog_name}")
    print("=" * 50)
```

## Questions?

[Book a Call](https://espresso.ai/demo) or email <savings@espresso.ai>.


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs.espresso.ai/databricks-optimizer/databricks-sql-onboarding.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
