From fd72b459ae33ad07c149ce03515c0193846dcc1e Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Thu, 8 Jan 2026 10:30:59 -0700 Subject: [PATCH 1/2] feat: update Cloud SQL authentication to support IAM and refactor connection logic --- .github/workflows/CD_staging.yml | 13 ++++------- db/engine.py | 37 ++++++++++++++++++++++---------- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/.github/workflows/CD_staging.yml b/.github/workflows/CD_staging.yml index 02ab90063..c5fc36e9f 100644 --- a/.github/workflows/CD_staging.yml +++ b/.github/workflows/CD_staging.yml @@ -35,7 +35,7 @@ jobs: - name: Authenticate to Google Cloud uses: 'google-github-actions/auth@v2' with: - credentials_json: ${{ secrets.CLOUD_SQL_SERVICE_ACCOUNT_KEY }} + credentials_json: ${{ secrets.CLOUD_DEPLOY_SERVICE_ACCOUNT_KEY }} - name: Run Alembic migrations on staging database env: @@ -43,7 +43,7 @@ jobs: CLOUD_SQL_INSTANCE_NAME: "${{ secrets.CLOUD_SQL_INSTANCE_NAME }}" CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}" CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" - CLOUD_SQL_PASSWORD: "${{ secrets.CLOUD_SQL_PASSWORD }}" + CLOUD_SQL_IAM_AUTH: true run: | uv run alembic upgrade head @@ -53,17 +53,12 @@ jobs: CLOUD_SQL_INSTANCE_NAME: "${{ secrets.CLOUD_SQL_INSTANCE_NAME }}" CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}" CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" - CLOUD_SQL_PASSWORD: "${{ secrets.CLOUD_SQL_PASSWORD }}" + CLOUD_SQL_IAM_AUTH: true GCS_SERVICE_ACCOUNT_KEY: "${{ secrets.GCS_SERVICE_ACCOUNT_KEY }}" GCS_BUCKET_NAME: "${{ vars.GCS_BUCKET_NAME }}" run: | uv run python -m transfers.backfill.staging - - name: Authenticate to Google Cloud - uses: 'google-github-actions/auth@v2' - with: - credentials_json: ${{ secrets.CLOUD_DEPLOY_SERVICE_ACCOUNT_KEY }} - # Uses Google Cloud Secret Manager to store secret credentials - name: Create app.yaml run: | @@ -82,7 +77,7 @@ jobs: CLOUD_SQL_INSTANCE_NAME: "${{ secrets.CLOUD_SQL_INSTANCE_NAME }}" CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}" CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" - CLOUD_SQL_PASSWORD: "${{ secrets.CLOUD_SQL_PASSWORD }}" + CLOUD_SQL_IAM_AUTH: true GCS_SERVICE_ACCOUNT_KEY: "${{ secrets.GCS_SERVICE_ACCOUNT_KEY }}" GCS_BUCKET_NAME: "${{ vars.GCS_BUCKET_NAME }}" AUTHENTIK_URL: "${{ vars.AUTHENTIK_URL }}" diff --git a/db/engine.py b/db/engine.py index ce31aec5f..4cc1d3a3a 100644 --- a/db/engine.py +++ b/db/engine.py @@ -29,6 +29,8 @@ ) from sqlalchemy.util import await_only +from services.util import get_bool_env + load_dotenv() driver = os.environ.get("DB_DRIVER", "") @@ -48,14 +50,19 @@ def asyncify_connection(): user = os.environ.get("CLOUD_SQL_USER") password = os.environ.get("CLOUD_SQL_PASSWORD") database = os.environ.get("CLOUD_SQL_DATABASE") + use_iam_auth = get_bool_env("CLOUD_SQL_IAM_AUTH", False) + ip_type = os.environ.get("CLOUD_SQL_IP_TYPE", "public") - connection = connector.connect_async( - instance_name, - "asyncpg", - db=database, - password=password, - user=user, - ) + connect_kwargs = { + "db": database, + "user": user, + "enable_iam_auth": use_iam_auth, + "ip_type": ip_type, + } + if not use_iam_auth: + connect_kwargs["password"] = password + + connection = connector.connect_async(instance_name, "asyncpg", **connect_kwargs) return AsyncAdapt_asyncpg_connection( engine.dialect.dbapi, @@ -78,15 +85,23 @@ def init_connection_pool(connector): user = os.environ.get("CLOUD_SQL_USER") password = os.environ.get("CLOUD_SQL_PASSWORD") database = os.environ.get("CLOUD_SQL_DATABASE") + use_iam_auth = get_bool_env("CLOUD_SQL_IAM_AUTH", False) + ip_type = os.environ.get("CLOUD_SQL_IP_TYPE", "public") def getconn(): + connect_kwargs = { + "user": user, + "db": database, + "ip_type": ip_type, + "enable_iam_auth": use_iam_auth, + } + if not use_iam_auth: + connect_kwargs["password"] = password + conn = connector.connect( instance_name, # The Cloud SQL instance name "pg8000", - user=user, - password=password, - db=database, - ip_type="public", + **connect_kwargs, ) return conn From afd0995f9d19f809b995fd97ded1618392c4feaa Mon Sep 17 00:00:00 2001 From: "jake.ross" Date: Thu, 8 Jan 2026 20:36:40 -0700 Subject: [PATCH 2/2] feat: implement IAM authentication for Cloud SQL and refactor connection logic --- .gitignore | 2 +- db/engine.py | 33 +++++++++++++++++++++++++++++---- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/.gitignore b/.gitignore index 73d3f2c7e..03f20e83e 100644 --- a/.gitignore +++ b/.gitignore @@ -29,7 +29,7 @@ reset_db.sh tests/uploads migrate.sh launcher.sh -gcs_credentials.json +*credentials.json transfers/data/assets* transfers/data/nma_csv_cache/* transfers/data/*.csv diff --git a/db/engine.py b/db/engine.py index 4cc1d3a3a..4fa1e638d 100644 --- a/db/engine.py +++ b/db/engine.py @@ -15,6 +15,7 @@ # =============================================================================== import asyncio +import copy import getpass import os from contextlib import contextmanager @@ -35,6 +36,26 @@ driver = os.environ.get("DB_DRIVER", "") +def get_iam_login_token() -> str: + """ + Return a short-lived IAM DB auth token for Cloud SQL Postgres. + """ + from google.auth import default + from google.auth.transport.requests import Request + + scopes = ["https://www.googleapis.com/auth/sqlservice.login"] + creds, _ = default() + if hasattr(creds, "with_scopes"): + creds = creds.with_scopes(scopes=scopes) + else: + creds = copy.copy(creds) + creds._scopes = scopes # type: ignore[attr-defined] + creds.refresh(Request()) + if not getattr(creds, "token", None): + raise RuntimeError("Unable to acquire IAM DB auth token.") + return creds.token + + async def get_async_engine(): """ Asynchronous database session generator. @@ -59,7 +80,9 @@ def asyncify_connection(): "enable_iam_auth": use_iam_auth, "ip_type": ip_type, } - if not use_iam_auth: + if use_iam_auth: + connect_kwargs["password"] = get_iam_login_token() + else: connect_kwargs["password"] = password connection = connector.connect_async(instance_name, "asyncpg", **connect_kwargs) @@ -95,7 +118,9 @@ def getconn(): "ip_type": ip_type, "enable_iam_auth": use_iam_auth, } - if not use_iam_auth: + if use_iam_auth: + connect_kwargs["password"] = get_iam_login_token() + else: connect_kwargs["password"] = password conn = connector.connect( @@ -122,7 +147,7 @@ def getconn(): connector = Connector() engine = init_connection_pool(connector) - async_engine = asyncio.run(get_async_engine()) + # async_engine = asyncio.run(get_async_engine()) else: # if driver == "sqlite": @@ -176,7 +201,7 @@ def getconn(): # listen(engine, "connect", on_connect) -async_database_sessionmaker = async_sessionmaker(async_engine) +# async_database_sessionmaker = async_sessionmaker(async_engine) database_sessionmaker = sessionmaker(engine, expire_on_commit=False)