From 04366e8d688a97835ac7649f3ffe26b2d9a704f1 Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Thu, 8 Jan 2026 20:48:37 -0700 Subject: [PATCH] feat: add IAM authentication support for Cloud SQL connection --- alembic/env.py | 37 +++++++++++++++++++++++++++++++++---- 1 file changed, 33 insertions(+), 4 deletions(-) diff --git a/alembic/env.py b/alembic/env.py index 3d3febdfa..679ebec6f 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -1,3 +1,4 @@ +import copy import os from logging.config import fileConfig @@ -5,6 +6,7 @@ from dotenv import load_dotenv from sqlalchemy import engine_from_config, pool, create_engine +from services.util import get_bool_env # this is the Alembic Config object, which provides # access to the values within the .ini file in use. @@ -46,7 +48,10 @@ def build_database_url(): user = os.environ.get("CLOUD_SQL_USER", "") password = os.environ.get("CLOUD_SQL_PASSWORD", "") database = os.environ.get("CLOUD_SQL_DATABASE", "") + use_iam_auth = get_bool_env("CLOUD_SQL_IAM_AUTH", False) # Host is provided by connector, so leave blank. + if use_iam_auth: + return f"postgresql+pg8000://{user}@/{database}" return f"postgresql+pg8000://{user}:{password}@/{database}" # Default/Postgres @@ -96,22 +101,46 @@ def run_migrations_online() -> None: if db_driver == "cloudsql": # Use the Cloud SQL Python Connector for direct Cloud SQL access. from google.cloud.sql.connector import Connector + from google.auth import default + from google.auth.transport.requests import Request instance_name = os.environ.get("CLOUD_SQL_INSTANCE_NAME") user = os.environ.get("CLOUD_SQL_USER") password = os.environ.get("CLOUD_SQL_PASSWORD") database = os.environ.get("CLOUD_SQL_DATABASE") + use_iam_auth = get_bool_env("CLOUD_SQL_IAM_AUTH", False) + ip_type = os.environ.get("CLOUD_SQL_IP_TYPE", "public") connector = Connector() + def get_iam_login_token() -> str: + scopes = ["https://www.googleapis.com/auth/sqlservice.login"] + creds, _ = default() + if hasattr(creds, "with_scopes"): + creds = creds.with_scopes(scopes=scopes) + else: + creds = copy.copy(creds) + creds._scopes = scopes # type: ignore[attr-defined] + creds.refresh(Request()) + if not getattr(creds, "token", None): + raise RuntimeError("Unable to acquire IAM DB auth token.") + return creds.token + def getconn(): + connect_kwargs = { + "user": user, + "db": database, + "ip_type": ip_type, + "enable_iam_auth": use_iam_auth, + } + if use_iam_auth: + connect_kwargs["password"] = get_iam_login_token() + else: + connect_kwargs["password"] = password return connector.connect( instance_name, "pg8000", - user=user, - password=password, - db=database, - ip_type="public", + **connect_kwargs, ) connectable = create_engine(