# Databricks SQL Onboarding

Espresso AI makes real-time decisions powered by ML models to optimize your Databricks SQL workloads.

Please run the following commands in your Databricks workspace. They set up a service principal we use to access the metadata our models need and to optimize your Databricks SQL account's operations.

Note that we **never** access, log, or store any data from Databricks SQL. We only look at metadata.

Prerequisites: Make sure you are a Databricks workspace admin, account admin, and metastore admin.

## Steps

1. Open your Databricks workspace, create a Python notebook, and attach it to Serverless compute.
2. In the first cell, install/upgrade the SDK:

```python
%pip install databricks-sdk --upgrade
```

3. After the install completes, create a second cell and restart the Python kernel:

```python
%restart_python
```

4. In the next cell, paste the code below. If you have multiple workspaces, running it will prompt you to grant a temporary service principal the account admin role; this lets the script grant permissions across workspaces, and it deletes that temporary service principal immediately afterwards. If you only have one workspace to manage, change the first line to `MULTI_WORKSPACE = False` before running:

```python
MULTI_WORKSPACE = True

import json
import time
from datetime import datetime

from databricks.sdk import AccountClient, WorkspaceClient
from databricks.sdk.service import catalog
from databricks.sdk.service.iam import (
    ComplexValue,
    Patch,
    PatchOp,
    PatchSchema,
    WorkspacePermission,
)
from pydantic import BaseModel, field_validator
from pyspark.sql import SparkSession


class DatabricksOAuthToken(BaseModel):
    id: str
    workspace_id: str
    oauth_secret: str
    created_at: datetime
    expires_at: datetime
    client_id: str

    @field_validator("oauth_secret", "client_id", mode="before")
    @classmethod
    def strip_whitespace(cls, v: str) -> str:
        """Strip whitespace from string fields."""
        return v.strip() if isinstance(v, str) else v


class DatabricksCredentials(BaseModel):
    oauth_token: DatabricksOAuthToken
    workspace_url: str
    service_principal_name: str = "espresso-ai-optimizer"
    service_principal_id: str
    warehouse_id: str | None = None
    warehouse_name: str | None = None
    workspace_id: str | None = None
    workspace_name: str | None = None

    def to_json(self) -> str:
        return json.dumps(self.model_dump(mode="json"))


def _is_duplicate_error(e):
    msg = str(e).lower()
    return "already exists" in msg or "duplicate" in msg


def get_or_create_service_principal(client, name="espresso-ai-optimizer"):
    if sps := list(client.service_principals.list(filter=f"displayName eq '{name}'")):
        sp = sps[0]
        client.service_principals.patch(
            id=sp.id,
            operations=[
                Patch(
                    op=PatchOp.ADD,
                    path="entitlements",
                    value=[
                        {"value": "databricks-sql-access"},
                        {"value": "allow-cluster-create"},
                    ],
                ),
            ],
            schemas=[PatchSchema.URN_IETF_PARAMS_SCIM_API_MESSAGES_2_0_PATCH_OP],
        )
        return sp

    return client.service_principals.create(
        display_name=name,
        active=True,
        entitlements=[
            ComplexValue(value="allow-cluster-create"),
            ComplexValue(value="databricks-sql-access"),
        ],
    )


def create_oauth_token(client, service_principal):
    token = client.service_principal_secrets_proxy.create(
        service_principal_id=service_principal.id,
        lifetime=f"{2 * 365 * 24 * 60 * 60}s",  # 2 years
    )
    return DatabricksOAuthToken(
        id=token.id,
        workspace_id=str(client.get_workspace_id()),
        oauth_secret=token.secret,
        created_at=token.create_time,
        expires_at=token.expire_time,
        client_id=service_principal.application_id,
    )


def make_service_principal_workspace_admin(client, service_principal):
    try:
        admin_group_id = next(client.groups.list(filter="displayName eq 'admins'")).id
        client.groups.patch(
            id=admin_group_id,
            operations=[
                Patch(
                    op=PatchOp.ADD,
                    value={"members": [{"value": service_principal.id}]},
                )
            ],
            schemas=[PatchSchema.URN_IETF_PARAMS_SCIM_API_MESSAGES_2_0_PATCH_OP],
        )
    except Exception as e:
        if not _is_duplicate_error(e):
            raise


def get_or_create_warehouse(client):
    for warehouse in client.warehouses.list():
        if warehouse.name == "ESPRESSO_AI_WAREHOUSE":
            return warehouse.id

    return client.warehouses.create_and_wait(
        name="ESPRESSO_AI_WAREHOUSE",
        cluster_size="X-Small",
        auto_stop_mins=1,
        enable_serverless_compute=True,
        min_num_clusters=1,
        max_num_clusters=1,
    ).id


def allow_service_principal_to_read_system_logs(client, service_principal):
    errors = []

    def grant(asset_name, asset_type, privilege):
        try:
            client.grants.update(
                full_name=asset_name,
                securable_type=asset_type.value,
                changes=[
                    catalog.PermissionsChange(
                        add=[privilege], principal=service_principal.application_id
                    )
                ],
            )
        except Exception as e:
            errors.append(str(e).lower())

    grant("system", catalog.SecurableType.CATALOG, catalog.Privilege.USE_CATALOG)
    for schema in client.schemas.list(catalog_name="system"):
        schema_full_name = f"system.{schema.name}"
        grant(schema_full_name, catalog.SecurableType.SCHEMA, catalog.Privilege.USE_SCHEMA)
        grant(schema_full_name, catalog.SecurableType.SCHEMA, catalog.Privilege.SELECT)

    if any("account admin" in e for e in errors):
        print("\n⚠️  ACCOUNT ADMIN required: Ask an account admin to grant you access.")
    if any("manage on catalog" in e or "metastore" in e for e in errors):
        print("⚠️  METASTORE ADMIN required: Visit https://accounts.cloud.databricks.com/data")


def wait_for_account_admin(account_id, client_id, secret):
    deadline = time.monotonic() + 300  # 5 minutes
    while time.monotonic() < deadline:
        try:
            ac = AccountClient(
                host="https://accounts.cloud.databricks.com",
                account_id=account_id,
                client_id=client_id,
                client_secret=secret,
            )
            workspaces = [
                (ws.workspace_id, ws.workspace_name, ws.deployment_name)
                for ws in ac.workspaces.list()
            ]
            print("Account admin access confirmed.")
            return ac, workspaces
        except Exception:
            time.sleep(5)
    raise TimeoutError(
        "Account admin not granted within 5 minutes. Grant the role via the link above and re-run."
    )


def assign_sp_to_workspaces(account_client, sp_id, workspaces):
    for ws_id, ws_name, _ in workspaces:
        deadline = time.monotonic() + 60
        while True:
            try:
                account_client.workspace_assignment.update(
                    workspace_id=ws_id,
                    principal_id=sp_id,
                    permissions=[WorkspacePermission.ADMIN],
                )
                print(f"  Assigned as admin on {ws_name} ({ws_id})")
                break
            except Exception as e:
                if _is_duplicate_error(e):
                    break
                if time.monotonic() >= deadline:
                    raise
                time.sleep(5)


if __name__ == "__main__":
    spark = SparkSession.getActiveSession() or SparkSession.builder.getOrCreate()
    workspace_url = f"https://{spark.conf.get('spark.databricks.workspaceUrl') or ''}"
    client = WorkspaceClient()
    service_principal = get_or_create_service_principal(client)
    oauth_token = create_oauth_token(client, service_principal)
    warehouse_id = get_or_create_warehouse(client)
    make_service_principal_workspace_admin(client, service_principal)
    allow_service_principal_to_read_system_logs(client, service_principal)

    if not MULTI_WORKSPACE:
        credentials = DatabricksCredentials(
            oauth_token=oauth_token,
            workspace_url=workspace_url,
            service_principal_name=service_principal.display_name,
            service_principal_id=service_principal.id,
            warehouse_id=warehouse_id,
            warehouse_name="ESPRESSO_AI_WAREHOUSE",
        )
        print(
            "\n🎉 Setup complete! Here are the Databricks credentials to send to Espresso AI:"
        )
        print("=" * 50)
        print(credentials.to_json())
        print("=" * 50)
    else:
        account_id = spark.sql(
            "SELECT account_id FROM system.billing.usage LIMIT 1"
        ).collect()[0][0]

        temp_sp = get_or_create_service_principal(client, "espresso-ai-temp")
        temp_secret = client.service_principal_secrets_proxy.create(
            service_principal_id=temp_sp.id, lifetime=f"{60 * 60}s"
        ).secret
        roles_url = (
            f"https://accounts.cloud.databricks.com/user-management/"
            f"serviceprincipals/{temp_sp.id}/roles?account_id={account_id}"
        )
        print(
            f"\n📋 To continue, grant the temporary service principal account admin access."
            f"\n   Open this link and add the 'Account admin' role:\n\n   {roles_url}\n"
        )

        account_client, workspaces = wait_for_account_admin(
            account_id, temp_sp.application_id, temp_secret
        )
        assign_sp_to_workspaces(account_client, int(service_principal.id), workspaces)
        client.service_principals.delete(id=temp_sp.id)
        print("Temporary service principal deleted.")

        current_ws_id = str(client.get_workspace_id())
        print("\n🎉 Setup complete! Here are the credentials for each workspace:")
        print("=" * 50)
        for ws_id, ws_name, deployment_name in workspaces:
            is_primary = str(ws_id) == current_ws_id
            cred = DatabricksCredentials(
                oauth_token=oauth_token,
                workspace_url=f"https://{deployment_name}.cloud.databricks.com",
                workspace_id=str(ws_id),
                workspace_name=ws_name,
                service_principal_name=service_principal.display_name,
                service_principal_id=service_principal.id,
                warehouse_id=warehouse_id if is_primary else None,
                warehouse_name="ESPRESSO_AI_WAREHOUSE" if is_primary else None,
            )
            label = f"{ws_name} (primary)" if is_primary else ws_name
            print(f"\n--- {label} ---")
            print(cred.to_json())
        print("\n" + "=" * 50)

```

5. Copy the JSON credentials printed between the `=====` lines and share them securely with Espresso AI.

You can securely [upload the output here](https://www.dropbox.com/request/IGcHPWby1x9tPPv8hXWr).
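For reference, the printed blob follows the `DatabricksCredentials` model in the script. Below is a sketch of its shape with placeholder values (these are not real credentials), which you can use to sanity-check that the output parses before sharing:

```python
import json

# Placeholder values only -- the real output contains live credentials.
sample_output = json.dumps({
    "oauth_token": {
        "id": "secret-id",
        "workspace_id": "1234567890",
        "oauth_secret": "REDACTED-oauth-secret",
        "created_at": "2025-01-01T00:00:00Z",
        "expires_at": "2027-01-01T00:00:00Z",
        "client_id": "application-client-id",
    },
    "workspace_url": "https://dbc-example.cloud.databricks.com",
    "service_principal_name": "espresso-ai-optimizer",
    "service_principal_id": "9876543210",
    "warehouse_id": "abc123def456",
    "warehouse_name": "ESPRESSO_AI_WAREHOUSE",
    "workspace_id": None,
    "workspace_name": None,
})

# Sanity check: the blob parses and carries the fields Espresso AI expects.
creds = json.loads(sample_output)
required = {"oauth_token", "workspace_url", "service_principal_id"}
assert required <= creds.keys()
```

In multi-workspace mode, one such blob is printed per workspace, and only the primary workspace's entry carries `warehouse_id` and `warehouse_name`.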

## What the Script Does

* Creates or reuses a service principal named "espresso-ai-optimizer" for Espresso AI.
* Grants that service principal the ability to manage SQL warehouses.
* Grants SELECT access on the Databricks system tables used for usage and cost analysis (e.g. query history, warehouse events, node timeline).
* If you're granting access to multiple workspaces, creates a temporary service principal, "espresso-ai-temp", which you grant the account admin role so the script can give "espresso-ai-optimizer" the same permissions across all workspaces. The script then deletes "espresso-ai-temp".
* Prints a JSON blob with credentials and your workspace URL for you to share securely with Espresso AI.

## Troubleshooting

The script prints messages for common permission issues:

1. "You are not a workspace admin" → contact one of the listed admins for access
2. "ACCOUNT ADMIN required" → you need the account admin role to grant the service principal access
3. "METASTORE ADMIN required" → you need metastore admin rights to grant system table access
4. "Account admin not granted within 5 minutes. Grant the role via the link above and re-run." → grant the temporary service principal the account admin role via the printed link, then re-run. The temporary service principal is deleted automatically.

Quick permission checks:

* Verify the notebook is attached to Serverless compute, not a specific cluster
* Verify you're a workspace admin:
  * Click on the circle with your initials
  * Click on "Settings"
  * You should see a "Workspace admin" section in addition to a "User" section.
* Verify you're an account admin:
  * You should be able to log in to <https://accounts.cloud.databricks.com/> and see the console for managing your account.
* Verify you're a metastore admin:
  * Visit <https://accounts.cloud.databricks.com/data>
  * Click on the name of the metastore listed, which will open up a "Configuration" page.
  * Set yourself as the "Metastore Admin".

Please rerun the script once you have the required permissions.
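Re-running is safe: the script reuses the existing service principal and warehouse, and treats "already exists" errors as success. A minimal, self-contained sketch of that idempotency pattern, mirroring the script's `_is_duplicate_error` helper (the `create_group` stub below is hypothetical, standing in for the real SDK calls):

```python
def is_duplicate_error(e: Exception) -> bool:
    """Mirror of the script's _is_duplicate_error helper."""
    msg = str(e).lower()
    return "already exists" in msg or "duplicate" in msg

def ensure(create) -> None:
    """Run a create step, ignoring 'already exists' failures."""
    try:
        create()
    except Exception as e:
        if not is_duplicate_error(e):
            raise

created = []

def create_group():
    # Hypothetical stand-in for an SDK create call that is not idempotent.
    if created:
        raise RuntimeError("Group already exists")
    created.append("espresso-ai-optimizer")

ensure(create_group)   # first run: creates the resource
ensure(create_group)   # re-run: the duplicate error is swallowed
print(created)         # -> ['espresso-ai-optimizer']
```

Any other exception (e.g. a permission error) still propagates, so genuine failures are not hidden on a re-run.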

## Questions?

[Book a Call](https://espresso.ai/demo)
