diff --git a/.github/workflows/CD_staging.yml b/.github/workflows/CD_staging.yml index f72bd9d9c..519288372 100644 --- a/.github/workflows/CD_staging.yml +++ b/.github/workflows/CD_staging.yml @@ -37,6 +37,26 @@ jobs: with: credentials_json: ${{ secrets.CLOUD_DEPLOY_SERVICE_ACCOUNT_KEY }} + - name: Run Alembic migrations on staging database + env: + DB_DRIVER: "cloudsql" + CLOUD_SQL_INSTANCE_NAME: "${{ secrets.CLOUD_SQL_INSTANCE_NAME }}" + CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}" + CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" + CLOUD_SQL_PASSWORD: "${{ secrets.CLOUD_SQL_PASSWORD }}" + run: | + uv run alembic upgrade head + + - name: Run backfill script on staging database + env: + DB_DRIVER: "cloudsql" + CLOUD_SQL_INSTANCE_NAME: "${{ secrets.CLOUD_SQL_INSTANCE_NAME }}" + CLOUD_SQL_DATABASE: "${{ vars.CLOUD_SQL_DATABASE }}" + CLOUD_SQL_USER: "${{ secrets.CLOUD_SQL_USER }}" + CLOUD_SQL_PASSWORD: "${{ secrets.CLOUD_SQL_PASSWORD }}" + run: | + uv run python transfers/backfill/staging.py + # Uses Google Cloud Secret Manager to store secret credentials - name: Create app.yaml run: | diff --git a/alembic/env.py b/alembic/env.py index d02a07101..3d3febdfa 100644 --- a/alembic/env.py +++ b/alembic/env.py @@ -1,9 +1,9 @@ +import os +from logging.config import fileConfig + from alembic import context from dotenv import load_dotenv -from logging.config import fileConfig -from os import environ -from sqlalchemy import engine_from_config -from sqlalchemy import pool +from sqlalchemy import engine_from_config, pool, create_engine # this is the Alembic Config object, which provides @@ -33,15 +33,33 @@ load_dotenv() -# Fallback to environment variables for PostgreSQL connection -user = environ.get("POSTGRES_USER", None) -password = environ.get("POSTGRES_PASSWORD", None) -db = environ.get("POSTGRES_DB", None) -host = environ.get("POSTGRES_HOST", "localhost") -port = environ.get("POSTGRES_PORT", 5432) -SQLALCHEMY_DATABASE_URL = 
f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}" -config.set_main_option("sqlalchemy.url", SQLALCHEMY_DATABASE_URL) +def build_database_url(): + """ + Build a SQLAlchemy URL based on driver/env vars. + For cloudsql we still return a pg8000 URL (hostless) so Alembic can render + offline migrations; the actual connection uses a Connector creator in + run_migrations_online. + """ + db_driver = os.environ.get("DB_DRIVER", "").lower() + if db_driver == "cloudsql": + user = os.environ.get("CLOUD_SQL_USER", "") + password = os.environ.get("CLOUD_SQL_PASSWORD", "") + database = os.environ.get("CLOUD_SQL_DATABASE", "") + # Host is provided by connector, so leave blank. + return f"postgresql+pg8000://{user}:{password}@/{database}" + + # Default/Postgres + user = os.environ.get("POSTGRES_USER", "") + password = os.environ.get("POSTGRES_PASSWORD", "") + db = os.environ.get("POSTGRES_DB", "") + host = os.environ.get("POSTGRES_HOST", "localhost") + port = os.environ.get("POSTGRES_PORT", 5432) + return f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}" + + +url = build_database_url() +config.set_main_option("sqlalchemy.url", url) def include_object(object, name, type_, reflected, compare_to): @@ -73,11 +91,41 @@ def run_migrations_online() -> None: and associate a connection with the context. """ - connectable = engine_from_config( - config.get_section(config.config_ini_section, {}), - prefix="sqlalchemy.", - poolclass=pool.NullPool, - ) + db_driver = os.environ.get("DB_DRIVER", "").lower() + + if db_driver == "cloudsql": + # Use the Cloud SQL Python Connector for direct Cloud SQL access. 
+ from google.cloud.sql.connector import Connector + + instance_name = os.environ.get("CLOUD_SQL_INSTANCE_NAME") + user = os.environ.get("CLOUD_SQL_USER") + password = os.environ.get("CLOUD_SQL_PASSWORD") + database = os.environ.get("CLOUD_SQL_DATABASE") + + connector = Connector() + + def getconn(): + return connector.connect( + instance_name, + "pg8000", + user=user, + password=password, + db=database, + ip_type="public", + ) + + connectable = create_engine( + "postgresql+pg8000://", + creator=getconn, + pool_pre_ping=True, + poolclass=pool.NullPool, + ) + else: + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) with connectable.connect() as connection: context.configure( diff --git a/alembic/versions/2101e0b029dc_make_location_description_nullable.py b/alembic/versions/2101e0b029dc_make_location_description_nullable.py new file mode 100644 index 000000000..efe5b413f --- /dev/null +++ b/alembic/versions/2101e0b029dc_make_location_description_nullable.py @@ -0,0 +1,52 @@ +"""Make location description nullable + +Revision ID: 2101e0b029dc +Revises: 66ac1af4ba69 +Create Date: 2026-01-02 23:19:38.901275 + +""" + +from typing import Sequence, Union + +from alembic import op +import geoalchemy2 +import sqlalchemy as sa +import sqlalchemy_utils + + +def _column_exists(bind, table: str, column: str) -> bool: + inspector = sa.inspect(bind) + cols = [c["name"] for c in inspector.get_columns(table)] + return column in cols + + +# revision identifiers, used by Alembic. +revision: str = "2101e0b029dc" +down_revision: Union[str, Sequence[str], None] = "66ac1af4ba69" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Upgrade schema. + + Makes the location.description column nullable to accommodate + legacy data from MS Access that may not have descriptions. 
+ """ + bind = op.get_bind() + if _column_exists(bind, "location", "description"): + op.alter_column( + "location", "description", existing_type=sa.String(), nullable=True + ) + else: + # If the column is absent (non-standard schema), skip the alteration. + pass + + +def downgrade() -> None: + """Downgrade schema.""" + bind = op.get_bind() + if _column_exists(bind, "location", "description"): + op.alter_column( + "location", "description", existing_type=sa.String(), nullable=False + ) diff --git a/alembic/versions/66ac1af4ba69_initial_migration.py b/alembic/versions/66ac1af4ba69_initial_migration.py index f8813f9db..ff56eff1a 100644 --- a/alembic/versions/66ac1af4ba69_initial_migration.py +++ b/alembic/versions/66ac1af4ba69_initial_migration.py @@ -18,405 +18,10 @@ depends_on: Union[str, Sequence[str], None] = None -from db import * # Import your Base from models/__init__.py -from db.engine import engine - -configure_mappers() - -Base.metadata.drop_all(engine) -Base.metadata.create_all(engine) - - def upgrade() -> None: """Upgrade schema.""" pass - - # The autogenerated code below is commented out to prevent accidental execution. - # It is here as a record of the initial database state. - # Actual initial database creation should be done through the Base.metadata.create_all(engine) call above. - - """ - TODO - The following code will need to be regenerated by Alembic since configure_mappers() is now called - in db/__init__.py to ensure all models are loaded before creating the database schema. This is - require for SQL Alchemy continuum. 
- - The following code will also need to be added: - - - op.drop_index("idx_location_version_point", table_name="location_version", if_exists=True) - - before calling op.create_index("idx_location_version_point", "location_version", ["point"], unique=False, postgresql_using="gist",) - - op.drop_index("idx_location_point", table_name="location", if_exists=True) - - before calling op.create_index("idx_location_point", "location", ["point"], unique=False, postgresql_using="gist",) - - We will also need to figure out how to handle the SQL Alchemy searchable columns in the models, as they are not currently handled by Alembic. - There is some documentation about sync_triggers, but that has not yet been tested. - """ - - # ### commands auto generated by Alembic - please adjust! ### - # op.create_table('asset', - # sa.Column('name', sa.String(), nullable=False), - # sa.Column('label', sa.String(), nullable=True), - # sa.Column('storage_service', sa.String(), nullable=False), - # sa.Column('storage_path', sa.String(), nullable=False), - # sa.Column('mime_type', sa.String(), nullable=False), - # sa.Column('size', sa.Integer(), nullable=False), - # sa.Column('search_vector', sqlalchemy_utils.types.ts_vector.TSVectorType(), nullable=True), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_index('ix_asset_search_vector', 'asset', ['search_vector'], unique=False, postgresql_using='gin') - # op.create_table('group', - # sa.Column('name', sa.String(length=100), nullable=False), - # sa.Column('description', sa.String(length=255), nullable=True), - # sa.Column('parent_group_id', sa.Integer(), nullable=True), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['parent_group_id'], 
['group.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id'), - # sa.UniqueConstraint('name') - # ) - # op.create_table('lexicon_category', - # sa.Column('name', sa.String(length=100), nullable=False), - # sa.Column('description', sa.String(length=255), nullable=True), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.PrimaryKeyConstraint('id'), - # sa.UniqueConstraint('name') - # ) - # op.create_table('lexicon_term', - # sa.Column('term', sa.String(length=100), nullable=False), - # sa.Column('definition', sa.String(length=255), nullable=False), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.PrimaryKeyConstraint('id'), - # sa.UniqueConstraint('term') - # ) - # op.create_table('pub_author', - # sa.Column('name', sa.String(), nullable=False), - # sa.Column('affiliation', sa.String(), nullable=True), - # sa.Column('search_vector', sqlalchemy_utils.types.ts_vector.TSVectorType(), nullable=True), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_index('ix_pub_author_search_vector', 'pub_author', ['search_vector'], unique=False, postgresql_using='gin') - # op.create_table('sensor', - # sa.Column('name', sa.String(length=255), nullable=False), - # sa.Column('model', sa.String(length=50), nullable=True), - # sa.Column('serial_no', sa.String(length=50), nullable=True), - # sa.Column('date_installed', sa.DateTime(), nullable=True), - # sa.Column('date_removed', sa.DateTime(), nullable=True), - # sa.Column('recording_interval', sa.Integer(), nullable=True), - # sa.Column('notes', sa.String(length=50), nullable=True), - # sa.Column('id', sa.Integer(), 
autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('user', - # sa.Column('id', sa.Integer(), nullable=False), - # sa.Column('username', sa.String(length=255), nullable=False), - # sa.Column('password', sa.String(length=255), nullable=False), - # sa.Column('is_superuser', sa.Boolean(), nullable=False), - # sa.Column('is_active', sa.Boolean(), nullable=False), - # sa.Column('avatar_url', sa.Text(), nullable=True), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('contact', - # sa.Column('name', sa.String(length=100), nullable=False), - # sa.Column('role', sa.String(length=100), nullable=False), - # sa.Column('search_vector', sqlalchemy_utils.types.ts_vector.TSVectorType(), nullable=True), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['role'], ['lexicon_term.term'], ), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_index('ix_contact_search_vector', 'contact', ['search_vector'], unique=False, postgresql_using='gin') - # op.create_table('geochronology_age', - # sa.Column('location_id', sa.Integer(), nullable=False), - # sa.Column('age', sa.Float(), nullable=False), - # sa.Column('age_error', sa.Float(), nullable=True), - # sa.Column('method', sa.String(length=100), nullable=True), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['method'], ['lexicon_term.term'], ), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('groundwater_level_sensor', - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # 
sa.Column('sensor_id', sa.Integer(), nullable=False), - # sa.ForeignKeyConstraint(['sensor_id'], ['sensor.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id'), - # sa.UniqueConstraint('sensor_id') - # ) - # op.create_table('lexicon_term_category_association', - # sa.Column('lexicon_term', sa.String(length=100), nullable=False), - # sa.Column('category_name', sa.String(length=255), nullable=False), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['category_name'], ['lexicon_category.name'], ondelete='CASCADE'), - # sa.ForeignKeyConstraint(['lexicon_term'], ['lexicon_term.term'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('lexicon_triple', - # sa.Column('subject', sa.String(length=100), nullable=False), - # sa.Column('predicate', sa.String(length=100), nullable=False), - # sa.Column('object_', sa.String(length=100), nullable=False), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['object_'], ['lexicon_term.term'], ondelete='CASCADE'), - # sa.ForeignKeyConstraint(['subject'], ['lexicon_term.term'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('location', - # sa.Column('name', sa.String(length=255), nullable=True), - # sa.Column('notes', sa.Text(), nullable=True), - # sa.Column('point', geoalchemy2.types.Geometry(geometry_type='POINT', srid=4326, from_text='ST_GeomFromEWKT', name='geometry', nullable=False), nullable=False), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.Column('release_status', sa.String(length=100), nullable=True), - # sa.ForeignKeyConstraint(['release_status'], 
['lexicon_term.term'], ), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_index('idx_location_point', 'location', ['point'], unique=False, postgresql_using='gist') - # op.create_table('publication', - # sa.Column('title', sa.Text(), nullable=False), - # sa.Column('abstract', sa.Text(), nullable=True), - # sa.Column('doi', sa.String(), nullable=True), - # sa.Column('year', sa.Integer(), nullable=True), - # sa.Column('publisher', sa.String(), nullable=True), - # sa.Column('url', sa.String(), nullable=True), - # sa.Column('publication_type', sa.String(length=100), nullable=False), - # sa.Column('search_vector', sqlalchemy_utils.types.ts_vector.TSVectorType(), nullable=True), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['publication_type'], ['lexicon_term.term'], ), - # sa.PrimaryKeyConstraint('id'), - # sa.UniqueConstraint('doi') - # ) - # op.create_index('ix_publication_search_vector', 'publication', ['search_vector'], unique=False, postgresql_using='gin') - # op.create_table('thing', - # sa.Column('name', sa.String(length=255), nullable=False), - # sa.Column('description', sa.String(length=500), nullable=True), - # sa.Column('thing_type', sa.String(length=100), nullable=True), - # sa.Column('spring_type', sa.String(length=100), nullable=True), - # sa.Column('well_depth', sa.Float(), nullable=True), - # sa.Column('hole_depth', sa.Float(), nullable=True), - # sa.Column('well_purpose', sa.String(length=100), nullable=True), - # sa.Column('well_casing_diameter', sa.Float(), nullable=True), - # sa.Column('well_casing_depth', sa.Float(), nullable=True), - # sa.Column('well_casing_description', sa.String(length=50), nullable=True), - # sa.Column('well_construction_notes', sa.String(length=250), nullable=True), - # sa.Column('search_vector', sqlalchemy_utils.types.ts_vector.TSVectorType(), nullable=True), - # 
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.Column('release_status', sa.String(length=100), nullable=True), - # sa.ForeignKeyConstraint(['release_status'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['spring_type'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['thing_type'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['well_purpose'], ['lexicon_term.term'], ), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_index('ix_thing_search_vector', 'thing', ['search_vector'], unique=False, postgresql_using='gin') - # op.create_table('address', - # sa.Column('contact_id', sa.Integer(), nullable=False), - # sa.Column('address_line_1', sa.String(length=255), nullable=False), - # sa.Column('address_line_2', sa.String(length=255), nullable=True), - # sa.Column('city', sa.String(length=100), nullable=False), - # sa.Column('state', sa.String(length=50), nullable=False), - # sa.Column('postal_code', sa.String(length=20), nullable=False), - # sa.Column('country', sa.String(length=100), nullable=False), - # sa.Column('address_type', sa.String(length=100), nullable=False), - # sa.Column('search_vector', sqlalchemy_utils.types.ts_vector.TSVectorType(), nullable=True), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['address_type'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['contact_id'], ['contact.id'], ondelete='CASCADE'), - # sa.ForeignKeyConstraint(['country'], ['lexicon_term.term'], ), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_index('ix_address_search_vector', 'address', ['search_vector'], unique=False, postgresql_using='gin') - # op.create_table('asset_thing_association', - # sa.Column('asset_id', sa.Integer(), nullable=False), - # sa.Column('thing_id', 
sa.Integer(), nullable=False), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['asset_id'], ['asset.id'], ondelete='CASCADE'), - # sa.ForeignKeyConstraint(['thing_id'], ['thing.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('collaborative_network_well', - # sa.Column('actively_monitored', sa.Boolean(), nullable=False), - # sa.Column('thing_id', sa.Integer(), nullable=False), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['thing_id'], ['thing.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('email', - # sa.Column('contact_id', sa.Integer(), nullable=False), - # sa.Column('email', sa.String(length=100), nullable=False), - # sa.Column('email_type', sa.String(length=100), nullable=False), - # sa.Column('search_vector', sqlalchemy_utils.types.ts_vector.TSVectorType(), nullable=True), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['contact_id'], ['contact.id'], ondelete='CASCADE'), - # sa.ForeignKeyConstraint(['email_type'], ['lexicon_term.term'], ), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_index('ix_email_search_vector', 'email', ['search_vector'], unique=False, postgresql_using='gin') - # op.create_table('group_thing_association', - # sa.Column('group_id', sa.Integer(), nullable=False), - # sa.Column('thing_id', sa.Integer(), nullable=False), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['group_id'], 
['group.id'], ondelete='CASCADE'), - # sa.ForeignKeyConstraint(['thing_id'], ['thing.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('location_thing_association', - # sa.Column('location_id', sa.Integer(), nullable=False), - # sa.Column('thing_id', sa.Integer(), nullable=False), - # sa.Column('effective_start', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.Column('effective_end', sa.DateTime(), nullable=True), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['location_id'], ['location.id'], ondelete='CASCADE'), - # sa.ForeignKeyConstraint(['thing_id'], ['thing.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('location_id', 'thing_id', 'id') - # ) - # op.create_table('phone', - # sa.Column('contact_id', sa.Integer(), nullable=False), - # sa.Column('phone_number', sa.String(length=20), nullable=False), - # sa.Column('phone_type', sa.String(length=100), nullable=False), - # sa.Column('search_vector', sqlalchemy_utils.types.ts_vector.TSVectorType(), nullable=True), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['contact_id'], ['contact.id'], ondelete='CASCADE'), - # sa.ForeignKeyConstraint(['phone_type'], ['lexicon_term.term'], ), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_index('ix_phone_search_vector', 'phone', ['search_vector'], unique=False, postgresql_using='gin') - # op.create_table('pub_author_contact_association', - # sa.Column('author_id', sa.Integer(), nullable=False), - # sa.Column('contact_id', sa.Integer(), nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['author_id'], ['pub_author.id'], 
ondelete='CASCADE'), - # sa.ForeignKeyConstraint(['contact_id'], ['contact.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('author_id', 'contact_id') - # ) - # op.create_table('pub_author_publication_association', - # sa.Column('publication_id', sa.Integer(), nullable=False), - # sa.Column('author_id', sa.Integer(), nullable=False), - # sa.Column('author_order', sa.Integer(), nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['author_id'], ['pub_author.id'], ondelete='CASCADE'), - # sa.ForeignKeyConstraint(['publication_id'], ['publication.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('publication_id', 'author_id') - # ) - # op.create_table('sample', - # sa.Column('collection_timestamp', sa.DateTime(), nullable=False), - # sa.Column('collection_method', sa.String(length=100), nullable=False), - # sa.Column('thing_id', sa.Integer(), nullable=False), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['collection_method'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['thing_id'], ['thing.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('series', - # sa.Column('observed_property', sa.String(length=100), nullable=False), - # sa.Column('unit', sa.String(length=100), nullable=False), - # sa.Column('name', sa.String(length=255), nullable=False), - # sa.Column('description', sa.Text(), nullable=True), - # sa.Column('sensor_id', sa.Integer(), nullable=True), - # sa.Column('thing_id', sa.Integer(), nullable=False), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.Column('release_status', sa.String(length=100), nullable=True), - # 
sa.ForeignKeyConstraint(['observed_property'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['release_status'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['sensor_id'], ['sensor.id'], ondelete='CASCADE'), - # sa.ForeignKeyConstraint(['thing_id'], ['thing.id'], ondelete='CASCADE'), - # sa.ForeignKeyConstraint(['unit'], ['lexicon_term.term'], ), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('thing_contact_association', - # sa.Column('thing_id', sa.Integer(), nullable=False), - # sa.Column('contact_id', sa.Integer(), nullable=False), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['contact_id'], ['contact.id'], ), - # sa.ForeignKeyConstraint(['thing_id'], ['thing.id'], ), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('thing_id_link', - # sa.Column('thing_id', sa.Integer(), nullable=True), - # sa.Column('relation', sa.String(length=100), nullable=False), - # sa.Column('alternate_id', sa.String(length=100), nullable=False), - # sa.Column('alternate_organization', sa.String(length=100), nullable=False), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['alternate_organization'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['relation'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['thing_id'], ['thing.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('well_screen', - # sa.Column('thing_id', sa.Integer(), nullable=False), - # sa.Column('screen_depth_top', sa.Float(), nullable=False), - # sa.Column('screen_depth_bottom', sa.Float(), nullable=False), - # sa.Column('screen_type', sa.String(length=100), nullable=True), - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - 
# sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.ForeignKeyConstraint(['screen_type'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['thing_id'], ['thing.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id') - # ) - # op.create_table('geochemical_series', - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.Column('series_id', sa.Integer(), nullable=False), - # sa.ForeignKeyConstraint(['series_id'], ['series.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id'), - # sa.UniqueConstraint('series_id') - # ) - # op.create_table('geothermal_series', - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.Column('series_id', sa.Integer(), nullable=False), - # sa.ForeignKeyConstraint(['series_id'], ['series.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id'), - # sa.UniqueConstraint('series_id') - # ) - # op.create_table('groundwater_level_series', - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.Column('series_id', sa.Integer(), nullable=False), - # sa.ForeignKeyConstraint(['series_id'], ['series.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id'), - # sa.UniqueConstraint('series_id') - # ) - # op.create_table('observation', - # sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), - # sa.Column('series_id', sa.Integer(), nullable=False), - # sa.Column('observation_datetime', sa.TIMESTAMP(), nullable=False), - # sa.Column('observation_type', sa.String(length=100), nullable=True), - # sa.Column('depth_to_water', sa.Float(), nullable=True), - # sa.Column('measuring_point_height', sa.Float(), nullable=True), - # 
sa.Column('level_status', sa.String(length=100), nullable=True), - # sa.Column('depth', sa.Float(), nullable=True), - # sa.Column('temperature', sa.Float(), nullable=True), - # sa.Column('created_at', sa.DateTime(), server_default=sa.text('now()'), nullable=False), - # sa.Column('release_status', sa.String(length=100), nullable=True), - # sa.ForeignKeyConstraint(['level_status'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['observation_type'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['release_status'], ['lexicon_term.term'], ), - # sa.ForeignKeyConstraint(['series_id'], ['series.id'], ondelete='CASCADE'), - # sa.PrimaryKeyConstraint('id', 'observation_datetime') - # ) # ### end Alembic commands ### @@ -424,46 +29,5 @@ def downgrade() -> None: """Downgrade schema.""" pass # ### commands auto generated by Alembic - please adjust! ### - # op.drop_table('observation') - # op.drop_table('groundwater_level_series') - # op.drop_table('geothermal_series') - # op.drop_table('geochemical_series') - # op.drop_table('well_screen') - # op.drop_table('thing_id_link') - # op.drop_table('thing_contact_association') - # op.drop_table('series') - # op.drop_table('sample') - # op.drop_table('pub_author_publication_association') - # op.drop_table('pub_author_contact_association') - # op.drop_index('ix_phone_search_vector', table_name='phone', postgresql_using='gin') - # op.drop_table('phone') - # op.drop_table('location_thing_association') - # op.drop_table('group_thing_association') - # op.drop_index('ix_email_search_vector', table_name='email', postgresql_using='gin') - # op.drop_table('email') - # op.drop_table('collaborative_network_well') - # op.drop_table('asset_thing_association') - # op.drop_index('ix_address_search_vector', table_name='address', postgresql_using='gin') - # op.drop_table('address') - # op.drop_index('ix_thing_search_vector', table_name='thing', postgresql_using='gin') - # op.drop_table('thing') - # 
op.drop_index('ix_publication_search_vector', table_name='publication', postgresql_using='gin') - # op.drop_table('publication') - # op.drop_index('idx_location_point', table_name='location', postgresql_using='gist') - # op.drop_table('location') - # op.drop_table('lexicon_triple') - # op.drop_table('lexicon_term_category_association') - # op.drop_table('groundwater_level_sensor') - # op.drop_table('geochronology_age') - # op.drop_index('ix_contact_search_vector', table_name='contact', postgresql_using='gin') - # op.drop_table('contact') - # op.drop_table('user') - # op.drop_table('sensor') - # op.drop_index('ix_pub_author_search_vector', table_name='pub_author', postgresql_using='gin') - # op.drop_table('pub_author') - # op.drop_table('lexicon_term') - # op.drop_table('lexicon_category') - # op.drop_table('group') - # op.drop_index('ix_asset_search_vector', table_name='asset', postgresql_using='gin') - # op.drop_table('asset') + # ### end Alembic commands ### diff --git a/alembic/versions/7c02d9f8f412_create_nmawaterlevelscontinuouspressuredaily.py b/alembic/versions/7c02d9f8f412_create_nmawaterlevelscontinuouspressuredaily.py new file mode 100644 index 000000000..680d5f8d8 --- /dev/null +++ b/alembic/versions/7c02d9f8f412_create_nmawaterlevelscontinuouspressuredaily.py @@ -0,0 +1,55 @@ +"""Create legacy NMAWaterLevelsContinuousPressureDaily table. + +Revision ID: 7c02d9f8f412 +Revises: 2101e0b029dc +Create Date: 2026-01-09 00:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import inspect + +# revision identifiers, used by Alembic. 
+revision: str = "7c02d9f8f412" +down_revision: Union[str, Sequence[str], None] = "2101e0b029dc" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the legacy daily pressure table used for backfill.""" + bind = op.get_bind() + inspector = inspect(bind) + if not inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + op.create_table( + "NMA_WaterLevelsContinuous_Pressure_Daily", + sa.Column("GlobalID", sa.String(length=40), primary_key=True), + sa.Column("OBJECTID", sa.Integer(), autoincrement=True, nullable=True), + sa.Column("WellID", sa.String(length=40), nullable=True), + sa.Column("PointID", sa.String(length=50), nullable=True), + sa.Column("DateMeasured", sa.DateTime(), nullable=False), + sa.Column("TemperatureWater", sa.Float(), nullable=True), + sa.Column("WaterHead", sa.Float(), nullable=True), + sa.Column("WaterHeadAdjusted", sa.Float(), nullable=True), + sa.Column("DepthToWaterBGS", sa.Float(), nullable=True), + sa.Column("MeasurementMethod", sa.String(length=2), nullable=True), + sa.Column("DataSource", sa.String(length=5), nullable=True), + sa.Column("MeasuringAgency", sa.String(length=50), nullable=True), + sa.Column("QCed", sa.Boolean(), nullable=True), + sa.Column("Notes", sa.String(length=100), nullable=True), + sa.Column("Created", sa.DateTime(), nullable=False), + sa.Column("Updated", sa.DateTime(), nullable=False), + sa.Column("ProcessedBy", sa.String(length=4), nullable=True), + sa.Column("CheckedBy", sa.String(length=4), nullable=True), + sa.Column("CONDDL (mS/cm)", sa.Float(), nullable=True), + ) + + +def downgrade() -> None: + """Drop the legacy daily pressure table.""" + bind = op.get_bind() + inspector = inspect(bind) + if inspector.has_table("NMA_WaterLevelsContinuous_Pressure_Daily"): + op.drop_table("NMA_WaterLevelsContinuous_Pressure_Daily") diff --git a/alembic/versions/8a1de3e3f0b3_add_ngwmn_unique_constraints.py 
b/alembic/versions/8a1de3e3f0b3_add_ngwmn_unique_constraints.py new file mode 100644 index 000000000..3ac2e0631 --- /dev/null +++ b/alembic/versions/8a1de3e3f0b3_add_ngwmn_unique_constraints.py @@ -0,0 +1,55 @@ +"""Add unique constraints for NGWMN backfill upserts + +Revision ID: 8a1de3e3f0b3 +Revises: 9c0f061c8322 +Create Date: 2026-02-10 00:10:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision: str = "8a1de3e3f0b3" +down_revision: Union[str, Sequence[str], None] = "9c0f061c8322" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Add unique constraints to support ON CONFLICT upserts.""" + op.create_unique_constraint( + "uq_nma_view_ngwmn_waterlevels_point_date", + "NMA_view_NGWMN_WaterLevels", + ["PointID", "DateMeasured"], + ) + op.create_unique_constraint( + "uq_nma_view_ngwmn_wellconstruction_point_casing_screen", + "NMA_view_NGWMN_WellConstruction", + ["PointID", "CasingTop", "ScreenTop"], + ) + op.create_unique_constraint( + "uq_nma_view_ngwmn_lithology_objectid", + "NMA_view_NGWMN_Lithology", + ["OBJECTID"], + ) + + +def downgrade() -> None: + """Drop unique constraints.""" + op.drop_constraint( + "uq_nma_view_ngwmn_lithology_objectid", + "NMA_view_NGWMN_Lithology", + type_="unique", + ) + op.drop_constraint( + "uq_nma_view_ngwmn_wellconstruction_point_casing_screen", + "NMA_view_NGWMN_WellConstruction", + type_="unique", + ) + op.drop_constraint( + "uq_nma_view_ngwmn_waterlevels_point_date", + "NMA_view_NGWMN_WaterLevels", + type_="unique", + ) diff --git a/alembic/versions/9c0f061c8322_create_ngwmn_legacy_views.py b/alembic/versions/9c0f061c8322_create_ngwmn_legacy_views.py new file mode 100644 index 000000000..b2ceda3e7 --- /dev/null +++ b/alembic/versions/9c0f061c8322_create_ngwmn_legacy_views.py @@ -0,0 +1,79 @@ +"""Create legacy NGWMN view tables + +Revision 
ID: 9c0f061c8322 +Revises: 7c02d9f8f412 +Create Date: 2026-02-10 00:00:00.000000 +""" + +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy import inspect + +# revision identifiers, used by Alembic. +revision: str = "9c0f061c8322" +down_revision: Union[str, Sequence[str], None] = "7c02d9f8f412" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + """Create the three NGWMN legacy view tables.""" + bind = op.get_bind() + inspector = inspect(bind) + + if not inspector.has_table("NMA_view_NGWMN_WellConstruction"): + op.create_table( + "NMA_view_NGWMN_WellConstruction", + sa.Column("id", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column("PointID", sa.String(length=50), nullable=True), + sa.Column("CasingTop", sa.Float(), nullable=True), + sa.Column("CasingBottom", sa.Float(), nullable=True), + sa.Column("CasingDepthUnits", sa.String(length=20), nullable=True), + sa.Column("ScreenTop", sa.Float(), nullable=True), + sa.Column("ScreenBottom", sa.Float(), nullable=True), + sa.Column("ScreenBottomUnit", sa.String(length=20), nullable=True), + sa.Column("ScreenDescription", sa.String(length=250), nullable=True), + sa.Column("CasingDescription", sa.String(length=250), nullable=True), + ) + + if not inspector.has_table("NMA_view_NGWMN_WaterLevels"): + op.create_table( + "NMA_view_NGWMN_WaterLevels", + sa.Column("PointID", sa.String(length=50), primary_key=True), + sa.Column("DateMeasured", sa.Date(), primary_key=True), + sa.Column("DepthToWaterBGS", sa.Float(), nullable=True), + sa.Column("WLUnits", sa.String(length=10), nullable=True), + sa.Column("MeasurementMethod", sa.String(length=50), nullable=True), + sa.Column("WLAccuracy", sa.Float(), nullable=True), + sa.Column("PublicRelease", sa.Boolean(), nullable=True), + ) + + if not inspector.has_table("NMA_view_NGWMN_Lithology"): + op.create_table( + "NMA_view_NGWMN_Lithology", + 
sa.Column("OBJECTID", sa.Integer(), primary_key=True, autoincrement=True), + sa.Column("PointID", sa.String(length=50), nullable=True), + sa.Column("Lithology", sa.String(length=50), nullable=True), + sa.Column("TERM", sa.String(length=100), nullable=True), + sa.Column("StratSource", sa.String(length=100), nullable=True), + sa.Column("StratTop", sa.Float(), nullable=True), + sa.Column("StratTopUnit", sa.String(length=20), nullable=True), + sa.Column("StratBottom", sa.Float(), nullable=True), + sa.Column("StratBottomUnit", sa.String(length=20), nullable=True), + ) + + +def downgrade() -> None: + """Drop the NGWMN legacy view tables.""" + bind = op.get_bind() + inspector = inspect(bind) + + for table in ( + "NMA_view_NGWMN_Lithology", + "NMA_view_NGWMN_WaterLevels", + "NMA_view_NGWMN_WellConstruction", + ): + if inspector.has_table(table): + op.drop_table(table) diff --git a/api/ngwmn.py b/api/ngwmn.py new file mode 100644 index 000000000..4d8b065cf --- /dev/null +++ b/api/ngwmn.py @@ -0,0 +1,56 @@ +# =============================================================================== +# Copyright 2023 Jake Ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# =============================================================================== +from fastapi import APIRouter +from starlette.responses import Response + +from core.dependencies import session_dependency +from services.ngwmn_helper import ( + make_waterlevels_response, + make_well_construction_response, + make_lithology_response, +) + +router = APIRouter(prefix="/ngwmn", tags=["NGWMN"]) + + +@router.get( + "/waterlevels/{pointid}", + summary="Get waterlevels for a given pointid in the NGWMN format", +) +async def read_ngwmn_waterlevels(pointid: str, db: session_dependency): + data = make_waterlevels_response(pointid, db) + return Response(content=data, media_type="application/xml") + + +@router.get( + "/wellconstruction/{pointid}", + summary="Get wellconstruction for a given pointid in the NGWMN format", +) +async def read_ngwmn_wellconstruction(pointid: str, db: session_dependency): + data = make_well_construction_response(pointid, db) + return Response(content=data, media_type="application/xml") + + +@router.get( + "/lithology/{pointid}", + summary="Get lithology for a given pointid in the NGWMN format", +) +async def read_ngwmn_lithology(pointid: str, db: session_dependency): + data = make_lithology_response(pointid, db) + return Response(content=data, media_type="application/xml") + + +# ============= EOF ============================================= diff --git a/core/initializers.py b/core/initializers.py index 74c811bff..cc43cda8e 100644 --- a/core/initializers.py +++ b/core/initializers.py @@ -123,6 +123,7 @@ def register_routes(app): from api.asset import router as asset_router from api.search import router as search_router from api.geospatial import router as geospatial_router + from api.ngwmn import router as ngwmn_router app.include_router(asset_router) app.include_router(author_router) @@ -137,6 +138,7 @@ def register_routes(app): app.include_router(sensor_router) app.include_router(search_router) app.include_router(thing_router) + 
app.include_router(ngwmn_router) add_pagination(app) diff --git a/db/__init__.py b/db/__init__.py index 4a0fc8e70..1c5e33896 100644 --- a/db/__init__.py +++ b/db/__init__.py @@ -48,6 +48,7 @@ from db.thing_aquifer_association import * from db.thing_geologic_formation_association import * from db.aquifer_type import * +from db.nma_legacy import * from sqlalchemy import ( func, diff --git a/db/nma_legacy.py b/db/nma_legacy.py new file mode 100644 index 000000000..8033dcc47 --- /dev/null +++ b/db/nma_legacy.py @@ -0,0 +1,151 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +"""Legacy NM Aquifer models copied from AMPAPI.""" + +from datetime import date, datetime +from typing import Optional + +from sqlalchemy import ( + Boolean, + Date, + DateTime, + Float, + Integer, + String, +) +from sqlalchemy.orm import Mapped, mapped_column + +from db.base import Base + + +class NMAWaterLevelsContinuousPressureDaily(Base): + """ + Legacy view of the WaterLevelsContinuous_Pressure_Daily table from AMPAPI. + + This model is used for read-only migration/interop with the legacy NM Aquifer + data and mirrors the original column names/types closely so transfer scripts + can operate without further schema mapping. 
+ """ + + __tablename__ = "NMA_WaterLevelsContinuous_Pressure_Daily" + + global_id: Mapped[str] = mapped_column("GlobalID", String(40), primary_key=True) + object_id: Mapped[Optional[int]] = mapped_column( + "OBJECTID", Integer, autoincrement=True + ) + well_id: Mapped[Optional[str]] = mapped_column("WellID", String(40)) + point_id: Mapped[Optional[str]] = mapped_column("PointID", String(50)) + date_measured: Mapped[datetime] = mapped_column( + "DateMeasured", DateTime, nullable=False + ) + temperature_water: Mapped[Optional[float]] = mapped_column( + "TemperatureWater", Float + ) + water_head: Mapped[Optional[float]] = mapped_column("WaterHead", Float) + water_head_adjusted: Mapped[Optional[float]] = mapped_column( + "WaterHeadAdjusted", Float + ) + depth_to_water_bgs: Mapped[Optional[float]] = mapped_column( + "DepthToWaterBGS", Float + ) + measurement_method: Mapped[Optional[str]] = mapped_column( + "MeasurementMethod", String(2) + ) + data_source: Mapped[Optional[str]] = mapped_column("DataSource", String(5)) + measuring_agency: Mapped[Optional[str]] = mapped_column( + "MeasuringAgency", String(50) + ) + qced: Mapped[Optional[bool]] = mapped_column("QCed", Boolean) + notes: Mapped[Optional[str]] = mapped_column("Notes", String(100)) + created: Mapped[datetime] = mapped_column("Created", DateTime, nullable=False) + updated: Mapped[datetime] = mapped_column("Updated", DateTime, nullable=False) + processed_by: Mapped[Optional[str]] = mapped_column("ProcessedBy", String(4)) + checked_by: Mapped[Optional[str]] = mapped_column("CheckedBy", String(4)) + cond_dl_ms_cm: Mapped[Optional[float]] = mapped_column("CONDDL (mS/cm)", Float) + + +class ViewNGWMNWellConstruction(Base): + """ + Legacy NGWMN well construction view. + + A surrogate primary key is used so rows with missing depth values can still + be represented faithfully from the legacy view. 
+ """ + + __tablename__ = "NMA_view_NGWMN_WellConstruction" + + id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True) + point_id: Mapped[str] = mapped_column("PointID", String(50)) + casing_top: Mapped[Optional[float]] = mapped_column("CasingTop", Float) + casing_bottom: Mapped[Optional[float]] = mapped_column("CasingBottom", Float) + casing_depth_units: Mapped[Optional[str]] = mapped_column( + "CasingDepthUnits", String(20) + ) + screen_top: Mapped[Optional[float]] = mapped_column("ScreenTop", Float) + screen_bottom: Mapped[Optional[float]] = mapped_column("ScreenBottom", Float) + screen_bottom_unit: Mapped[Optional[str]] = mapped_column( + "ScreenBottomUnit", String(20) + ) + screen_description: Mapped[Optional[str]] = mapped_column( + "ScreenDescription", String(250) + ) + casing_description: Mapped[Optional[str]] = mapped_column( + "CasingDescription", String(250) + ) + + +class ViewNGWMNWaterLevels(Base): + """ + Legacy NGWMN water levels view. + """ + + __tablename__ = "NMA_view_NGWMN_WaterLevels" + + point_id: Mapped[str] = mapped_column("PointID", String(50), primary_key=True) + date_measured: Mapped[date] = mapped_column("DateMeasured", Date, primary_key=True) + depth_to_water_bgs: Mapped[Optional[float]] = mapped_column( + "DepthToWaterBGS", Float + ) + wl_units: Mapped[Optional[str]] = mapped_column("WLUnits", String(10)) + measurement_method: Mapped[Optional[str]] = mapped_column( + "MeasurementMethod", String(50) + ) + wl_accuracy: Mapped[Optional[float]] = mapped_column("WLAccuracy", Float) + public_release: Mapped[Optional[bool]] = mapped_column("PublicRelease", Boolean) + + +class ViewNGWMNLithology(Base): + """ + Legacy NGWMN lithology view. 
+ """ + + __tablename__ = "NMA_view_NGWMN_Lithology" + + object_id: Mapped[int] = mapped_column("OBJECTID", Integer, primary_key=True) + point_id: Mapped[str] = mapped_column("PointID", String(50)) + lithology: Mapped[Optional[str]] = mapped_column("Lithology", String(50)) + term: Mapped[Optional[str]] = mapped_column("TERM", String(100)) + strat_source: Mapped[Optional[str]] = mapped_column("StratSource", String(100)) + strat_top: Mapped[Optional[float]] = mapped_column("StratTop", Float) + strat_top_unit: Mapped[Optional[str]] = mapped_column("StratTopUnit", String(20)) + strat_bottom: Mapped[Optional[float]] = mapped_column("StratBottom", Float) + strat_bottom_unit: Mapped[Optional[str]] = mapped_column( + "StratBottomUnit", String(20) + ) + + +# ============= EOF ============================================= diff --git a/services/ngwmn_helper.py b/services/ngwmn_helper.py new file mode 100644 index 000000000..630da72cd --- /dev/null +++ b/services/ngwmn_helper.py @@ -0,0 +1,261 @@ +# =============================================================================== +# Copyright 2018 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# ===============================================================================
from datetime import datetime
from xml.etree import ElementTree as etree

# NSMAP = dict(xsi="http://www.w3.org/2001/XMLSchema-instance", xsd="http://www.w3.org/2001/XMLSchema")

# The record builders below address result rows by position, so every query
# selects an explicit column list instead of ``select *``.  That keeps the
# positions stable even though the tables carry surrogate keys (``id`` /
# ``OBJECTID``) ahead of ``PointID``.
_PRESSURE_COLUMNS = (
    "GlobalID",
    "OBJECTID",
    "WellID",
    "PointID",
    "DateMeasured",
    "TemperatureWater",
    "WaterHead",
    "WaterHeadAdjusted",
    "DepthToWaterBGS",
    "MeasurementMethod",
    "DataSource",
    "MeasuringAgency",
    "QCed",
    "Notes",
    "Created",
    "Updated",
    "ProcessedBy",
    "CheckedBy",
    "CONDDL (mS/cm)",
)
_PRESSURE_POINT_ID = _PRESSURE_COLUMNS.index("PointID")
_PRESSURE_DATE = _PRESSURE_COLUMNS.index("DateMeasured")
_PRESSURE_DEPTH = _PRESSURE_COLUMNS.index("DepthToWaterBGS")

# Positions 0-5 are the ones ``make_water_level`` reads.
_MANUAL_COLUMNS = (
    "PointID",
    "DateMeasured",
    "DepthToWaterBGS",
    "WLUnits",
    "MeasurementMethod",
    "WLAccuracy",
)

# Positions 0-8 match the layout documented on ``make_well_construction``.
_WELL_CONSTRUCTION_COLUMNS = (
    "PointID",
    "CasingTop",
    "CasingBottom",
    "CasingDepthUnits",
    "ScreenTop",
    "ScreenBottom",
    "ScreenBottomUnit",
    "ScreenDescription",
    "CasingDescription",
)

# Positions 0-3 are the ones ``make_lithology`` reads.
# NOTE(review): "TERM" is assumed to be the column the legacy view exposed as
# the description -- confirm against the AMPAPI view definition.
_LITHOLOGY_COLUMNS = ("PointID", "StratTop", "StratBottom", "TERM")


def _point_query(table, columns, *, where="", order_by=""):
    """Build a ``select`` for *table* filtered on the ``:point_id`` bind.

    Table and column names are double-quoted because they are mixed-case;
    PostgreSQL folds unquoted identifiers to lower case.  Only module-level
    constants are interpolated -- the user-supplied point id stays a bind
    parameter.
    """
    column_sql = ", ".join(f'"{name}"' for name in columns)
    sql = f'select {column_sql} from "{table}" where "PointID" = :point_id'
    if where:
        sql += f" and {where}"
    if order_by:
        sql += f' order by "{order_by}"'
    return sql


LITHOLOGY_SQL = _point_query("NMA_view_NGWMN_Lithology", _LITHOLOGY_COLUMNS)
WELL_CONSTRUCTION_SQL = _point_query(
    "NMA_view_NGWMN_WellConstruction", _WELL_CONSTRUCTION_COLUMNS
)
WATERLEVELS_SQL = _point_query(
    "NMA_view_NGWMN_WaterLevels", _MANUAL_COLUMNS, order_by="DateMeasured"
)
# "QCed" is a boolean column, so it is tested with ``is true`` rather than ``=1``.
PRESSURE_DAILY_SQL = _point_query(
    "NMA_WaterLevelsContinuous_Pressure_Daily",
    _PRESSURE_COLUMNS,
    where='"QCed" is true',
    order_by="DateMeasured",
)


def make_xml_response(db, sql, point_id, func):
    """Run one or more point-id queries and render the rows with *func*.

    :param db: object exposing ``execute(statement, params)``
    :param sql: a SQL string, or a tuple/list of SQL strings, each using the
        ``:point_id`` bind parameter
    :param point_id: value bound to ``:point_id``
    :param func: callable receiving one list of rows per statement, in order
    :return: whatever *func* returns
    """
    # Imported here so the XML builders in this module can be imported
    # without SQLAlchemy being installed.
    from sqlalchemy import text

    statements = sql if isinstance(sql, (tuple, list)) else (sql,)
    result_sets = [
        db.execute(text(statement), {"point_id": point_id}).fetchall()
        for statement in statements
    ]
    return func(*result_sets)


def make_lithology_response(point_id, db):
    """Return the ``Lithologies`` XML document for *point_id*."""
    return make_xml_response(db, LITHOLOGY_SQL, point_id, lithology_xml)


def make_well_construction_response(point_id, db):
    """Return the ``Casings`` XML document for *point_id*."""
    return make_xml_response(db, WELL_CONSTRUCTION_SQL, point_id, well_construction_xml)


def make_waterlevels_response(point_id, db):
    """Return the ``WaterLevels`` XML document for *point_id*.

    Manual measurements and QC'ed daily pressure records are fetched
    separately and merged by ``water_levels_xml2``.
    """
    return make_xml_response(
        db, (WATERLEVELS_SQL, PRESSURE_DAILY_SQL), point_id, water_levels_xml2
    )


# ==================== make xml =======================
def continuous_water_levels_xml(records):
    """Render pressure-transducer rows as a ``WaterLevels`` document."""
    return make_xml("WaterLevels", records, make_continuous_water_level)


def water_levels_xml(records):
    """Render manual measurement rows as a ``WaterLevels`` document."""
    return make_xml("WaterLevels", records, make_water_level)


def _as_date(value):
    """Reduce a ``datetime`` to its ``date``; pass other values through."""
    return value.date() if isinstance(value, datetime) else value


def _prefer_manual(manual_depth, pressure_depth):
    """Tell whether the manual reading should replace the pressure reading.

    The manual row wins when its depth is strictly smaller.  A missing manual
    depth never wins; a missing pressure depth always loses to a present
    manual depth.
    """
    if manual_depth is None:
        return False
    if pressure_depth is None:
        return True
    return manual_depth < pressure_depth


def water_levels_xml2(manual, pressure):
    """Merge manual and pressure rows into one ``WaterLevels`` document.

    For a day that has both a manual and a pressure row, a single record is
    emitted: the manual one if its depth is smaller, otherwise the pressure
    one.  Days covered by only one source are emitted from that source.
    Records are ordered by measurement date.  Neither input is modified.

    :param manual: rows laid out as PointID, DateMeasured, DepthToWaterBGS,
        WLUnits, MeasurementMethod, WLAccuracy
    :param pressure: rows laid out in the column order of
        ``NMA_WaterLevelsContinuous_Pressure_Daily``
    :return: serialized XML as ``bytes``
    """
    if not pressure:
        return make_xml("WaterLevels", manual, make_water_level)

    # Manual rows not yet paired with a pressure row, grouped by day.
    unmatched = {}
    for row in manual:
        unmatched.setdefault(_as_date(row[1]), []).append(row)

    merged = []  # (day, tag, row)
    for row in pressure:
        day = _as_date(row[_PRESSURE_DATE])
        tag, chosen = "pressure", row
        same_day = unmatched.get(day)
        if same_day:
            # NOTE(review): the manual row is consumed whichever source wins,
            # so each day yields one record -- confirm against the original
            # (indentation of the removal was lost in the flattened source).
            candidate = same_day.pop(0)
            if _prefer_manual(candidate[2], row[_PRESSURE_DEPTH]):
                tag, chosen = "manual", candidate
        merged.append((day, tag, chosen))

    for day, rows in unmatched.items():
        merged.extend((day, "manual", row) for row in rows)

    # Stable sort on the day only; rows themselves are never compared.
    merged.sort(key=lambda entry: entry[0])

    root = etree.Element("WaterLevels")
    for _, tag, row in merged:
        if tag == "pressure":
            make_continuous_water_level(root, row)
        else:
            make_water_level(root, row)
    return etree.tostring(root)


def well_construction_xml(records):
    """Render well-construction rows as a ``Casings`` document."""
    return make_xml("Casings", records, make_well_construction)


def lithology_xml(records):
    """Render lithology rows as a ``Lithologies`` document."""
    return make_xml("Lithologies", records, make_lithology)


def make_xml(name, records, make_record):
    """Build a document rooted at *name* with one child per record.

    :param name: tag of the root element
    :param records: iterable of rows
    :param make_record: callable ``(root, row)`` that appends the row's element
    :return: serialized XML as ``bytes``
    """
    root = etree.Element(name)
    for record in records:
        make_record(root, record)
    return etree.tostring(root)


# ==================== make records =======================
def _add_point_id(parent, value):
    """Append a ``PointID`` child; a missing id yields an empty element."""
    etree.SubElement(parent, "PointID").text = None if value is None else str(value)


def _add_children(parent, pairs):
    """Append one child per ``(tag, value)`` pair, with ``str(value)`` as text."""
    for tag, value in pairs:
        etree.SubElement(parent, tag).text = str(value)


def _format_depth(value):
    """Format a depth to two decimals; a missing depth becomes an empty string."""
    return "" if value is None else "{:0.2f}".format(value)


def make_continuous_water_level(root, r):
    """Append a ``WaterLevel`` element for one daily pressure row.

    :param root: parent element
    :param r: row in the column order of ``NMA_WaterLevelsContinuous_Pressure_Daily``
    """
    elem = etree.SubElement(root, "WaterLevel")
    _add_point_id(elem, r[_PRESSURE_POINT_ID])

    measured = r[_PRESSURE_DATE]
    _add_children(
        elem,
        (
            ("DepthFromLandSurfaceData", _format_depth(r[_PRESSURE_DEPTH])),
            ("WaterLevelUnits", "ft bgs"),
            ("MeasuringMethod", "Pressure Transducer"),
            ("MeasurementMonth", measured.month),
            ("MeasurementDay", measured.day),
            ("MeasurementYear", measured.year),
            ("MeasurementTime", "0:00:00"),
            ("MeasurementTimezone", "MST"),
            ("WaterLevelAccuracy", "0.02 ft"),
        ),
    )


def make_water_level(root, r):
    """Append a ``WaterLevel`` element for one manual measurement row.

    :param root: parent element
    :param r: row laid out as PointID, DateMeasured, DepthToWaterBGS, WLUnits,
        MeasurementMethod, WLAccuracy
    """
    elem = etree.SubElement(root, "WaterLevel")
    _add_point_id(elem, r[0])

    measured = r[1]
    _add_children(
        elem,
        (
            ("DepthFromLandSurfaceData", _format_depth(r[2])),
            ("WaterLevelUnits", r[3]),
            ("MeasuringMethod", r[4]),
            ("MeasurementMonth", measured.month),
            ("MeasurementDay", measured.day),
            ("MeasurementYear", measured.year),
            ("MeasurementTime", "0:00:00"),
            ("MeasurementTimezone", "MST"),
            ("WaterLevelAccuracy", r[5]),
        ),
    )


def make_well_construction(root, r):
    """
    0        1       2          3             4          5            6      7                   8
    pointid, castop, casbottom, cadepthunits, screentop, screenbotom, units, screen description, casing description

    Append a ``Casing`` element for one well-construction row.  Columns 6 and
    8 are not emitted, and ``ScreenMaterial`` is always written as "steel".

    :param root: parent element
    :param r: row in the layout above
    :return: None
    """
    elem = etree.SubElement(root, "Casing")
    _add_point_id(elem, r[0])
    _add_children(
        elem,
        (
            ("CasingTop", r[1]),
            ("CasingBottom", r[2]),
            ("CasingDepthUnits", r[3]),
            ("ScreenTop", r[4]),
            ("ScreenBottom", r[5]),
            ("ScreenDescription", r[7]),
            ("ScreenMaterial", "steel"),
        ),
    )


def make_lithology(root, r):
    """Append a ``Lithology`` element for one lithology row.

    :param root: parent element
    :param r: row laid out as PointID, top depth, bottom depth, description
    """
    elem = etree.SubElement(root, "Lithology")
    _add_point_id(elem, r[0])
    _add_children(
        elem,
        (
            ("TopDepth", r[1]),
            ("BottomDepth", r[2]),
            ("Units", "feet"),
            ("Description", r[3]),
        ),
    )

+def make_point_id(elem, r, idx=0): + e = etree.SubElement(elem, "PointID") + e.text = r[idx] diff --git a/transfers/backfill/__init__.py b/transfers/backfill/__init__.py new file mode 100644 index 000000000..e966b4ad9 --- /dev/null +++ b/transfers/backfill/__init__.py @@ -0,0 +1,17 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== + +# ============= EOF ============================================= diff --git a/transfers/backfill/ngwmn_views.py b/transfers/backfill/ngwmn_views.py new file mode 100644 index 000000000..05f16459e --- /dev/null +++ b/transfers/backfill/ngwmn_views.py @@ -0,0 +1,211 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# =============================================================================== + +from __future__ import annotations + +from typing import Any, Optional + +import pandas as pd +from sqlalchemy import insert, text +from sqlalchemy.orm import Session + +from db import ( + ViewNGWMNLithology, + ViewNGWMNWaterLevels, + ViewNGWMNWellConstruction, +) +from transfers.logger import logger +from transfers.transferer import Transferer +from transfers.util import read_csv + + +class _BaseNGWMNBackfill(Transferer): + """ + Base class for backfilling legacy NGWMN view tables from CSVs in GCS. + """ + + model = None + parse_dates: list[str] | None = None + + def __init__(self, *args, batch_size: int = 1000, **kwargs): + super().__init__(*args, **kwargs) + self.batch_size = batch_size + + def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]: + df = read_csv(self.source_table, parse_dates=self.parse_dates) + return df, df + + def _transfer_hook(self, session: Session) -> None: + rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")] + + for i in range(0, len(rows), self.batch_size): + chunk = rows[i : i + self.batch_size] + logger.info( + f"Upserting batch {i}-{i+len(chunk)-1} ({len(chunk)} rows) into {self.model.__tablename__}" + ) + stmt = ( + insert(self.model) + .values(chunk) + .on_conflict_do_update( + index_elements=self._conflict_columns(), + set_=self._upsert_set_clause(), + ) + ) + session.execute(stmt) + session.commit() + session.expunge_all() + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + raise NotImplementedError("_row_dict must be implemented in subclasses") + + @staticmethod + def _val(row: dict[str, Any], key: str) -> Optional[Any]: + v = row.get(key) + if pd.isna(v): + return None + return v + + def _conflict_columns(self) -> list[str]: + raise NotImplementedError("_conflict_columns must be implemented") + + def _upsert_set_clause(self) -> dict[str, Any]: + raise NotImplementedError("_upsert_set_clause must be 
implemented") + + +class NGWMNWellConstructionBackfill(_BaseNGWMNBackfill): + source_table = "view_NGWMN_WellConstruction" + model = ViewNGWMNWellConstruction + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + val = self._val + return { + "PointID": val(row, "PointID"), + "CasingTop": val(row, "CasingTop"), + "CasingBottom": val(row, "CasingBottom"), + "CasingDepthUnits": val(row, "CasingDepthUnits"), + "ScreenTop": val(row, "ScreenTop"), + "ScreenBottom": val(row, "ScreenBottom"), + "ScreenBottomUnit": val(row, "ScreenBottomUnit"), + "ScreenDescription": val(row, "ScreenDescription"), + "CasingDescription": val(row, "CasingDescription"), + } + + def _conflict_columns(self) -> list[str]: + return ["PointID", "CasingTop", "ScreenTop"] + + def _upsert_set_clause(self) -> dict[str, Any]: + excluded = insert(self.model).excluded + return { + "CasingBottom": excluded.CasingBottom, + "CasingDepthUnits": excluded.CasingDepthUnits, + "ScreenBottom": excluded.ScreenBottom, + "ScreenBottomUnit": excluded.ScreenBottomUnit, + "ScreenDescription": excluded.ScreenDescription, + "CasingDescription": excluded.CasingDescription, + } + + +class NGWMNWaterLevelsBackfill(_BaseNGWMNBackfill): + source_table = "view_NGWMN_WaterLevels" + model = ViewNGWMNWaterLevels + parse_dates = ["DateMeasured"] + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + val = self._val + dm = val(row, "DateMeasured") + if hasattr(dm, "date"): + dm = dm.date() + return { + "PointID": val(row, "PointID"), + "DateMeasured": dm, + "DepthToWaterBGS": val(row, "DepthToWaterBGS"), + "WLUnits": val(row, "WLUnits"), + "MeasurementMethod": val(row, "MeasurementMethod"), + "WLAccuracy": val(row, "WLAccuracy"), + "PublicRelease": val(row, "PublicRelease"), + } + + def _conflict_columns(self) -> list[str]: + return ["PointID", "DateMeasured"] + + def _upsert_set_clause(self) -> dict[str, Any]: + excluded = insert(self.model).excluded + return { + "DepthToWaterBGS": excluded.DepthToWaterBGS, + 
"WLUnits": excluded.WLUnits, + "MeasurementMethod": excluded.MeasurementMethod, + "WLAccuracy": excluded.WLAccuracy, + "PublicRelease": excluded.PublicRelease, + } + + +class NGWMNLithologyBackfill(_BaseNGWMNBackfill): + source_table = "view_NGWMN_Lithology" + model = ViewNGWMNLithology + + def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]: + val = self._val + return { + "OBJECTID": val(row, "OBJECTID"), + "PointID": val(row, "PointID"), + "Lithology": val(row, "Lithology"), + "TERM": val(row, "TERM"), + "StratSource": val(row, "StratSource"), + "StratTop": val(row, "StratTop"), + "StratTopUnit": val(row, "StratTopUnit"), + "StratBottom": val(row, "StratBottom"), + "StratBottomUnit": val(row, "StratBottomUnit"), + } + + def _conflict_columns(self) -> list[str]: + return ["OBJECTID"] + + def _upsert_set_clause(self) -> dict[str, Any]: + excluded = insert(self.model).excluded + return { + "PointID": excluded.PointID, + "Lithology": excluded.Lithology, + "TERM": excluded.TERM, + "StratSource": excluded.StratSource, + "StratTop": excluded.StratTop, + "StratTopUnit": excluded.StratTopUnit, + "StratBottom": excluded.StratBottom, + "StratBottomUnit": excluded.StratBottomUnit, + } + + +def run(batch_size: int = 1000) -> None: + """ + Entrypoint to backfill all NGWMN view tables. + + Tables are processed sequentially to keep memory use bounded. 
+ """ + + for backfill_cls in ( + NGWMNWellConstructionBackfill, + NGWMNWaterLevelsBackfill, + NGWMNLithologyBackfill, + ): + logger.info(f"Starting {backfill_cls.__name__}") + backfill = backfill_cls(batch_size=batch_size) + backfill.transfer() + logger.info(f"Finished {backfill_cls.__name__}") + + +if __name__ == "__main__": + run() + +# ============= EOF ============================================= diff --git a/transfers/backfill/staging.py b/transfers/backfill/staging.py new file mode 100644 index 000000000..7e0417ce3 --- /dev/null +++ b/transfers/backfill/staging.py @@ -0,0 +1,66 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +""" +Orchestrates all backfills used in the staging CD pipeline. + +Run with: + python -m transfers.backfill.staging --batch-size 1000 +""" + +import argparse +import sys +from transfers.backfill.ngwmn_views import run as run_ngwmn_views +from transfers.backfill.waterlevelscontinuous_pressure_daily import ( + run as run_pressure_daily, +) +from transfers.logger import logger + + +def run(batch_size: int = 1000) -> None: + """ + Execute all backfill steps in a deterministic order. 
+ """ + steps = ( + ("WaterLevelsContinuous_Pressure_Daily", run_pressure_daily), + ("NGWMN views", lambda: run_ngwmn_views(batch_size=batch_size)), + ) + + for name, fn in steps: + logger.info(f"Starting backfill: {name}") + fn(batch_size=batch_size) + logger.info(f"Completed backfill: {name}") + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Run staging backfills.") + parser.add_argument( + "--batch-size", + type=int, + default=1000, + help="Number of rows to insert per batch.", + ) + return parser.parse_args() + + +if __name__ == "__main__": + args = _parse_args() + try: + run(batch_size=args.batch_size) + except Exception as exc: + logger.critical(f"Backfill orchestration failed: {exc}") + sys.exit(1) + +# ============= EOF ============================================= diff --git a/transfers/backfill/waterlevelscontinuous_pressure_daily.py b/transfers/backfill/waterlevelscontinuous_pressure_daily.py new file mode 100644 index 000000000..886c9a900 --- /dev/null +++ b/transfers/backfill/waterlevelscontinuous_pressure_daily.py @@ -0,0 +1,133 @@ +# =============================================================================== +# Copyright 2026 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# ===============================================================================

from __future__ import annotations

from typing import Any, Optional

import pandas as pd
from sqlalchemy import insert, text
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session

from db import NMAWaterLevelsContinuousPressureDaily
from db.engine import session_ctx
from transfers.logger import logger
from transfers.transferer import Transferer
from transfers.util import read_csv

# Column used as the ON CONFLICT target of the upsert.
_CONFLICT_COLUMN = "GlobalID"

# Every column copied from the source rows, in insertion order. This single
# list drives both the row dictionaries and the ON CONFLICT update clause so
# the two cannot drift apart.
_COLUMNS = (
    "GlobalID",
    "OBJECTID",
    "WellID",
    "PointID",
    "DateMeasured",
    "TemperatureWater",
    "WaterHead",
    "WaterHeadAdjusted",
    "DepthToWaterBGS",
    "MeasurementMethod",
    "DataSource",
    "MeasuringAgency",
    "QCed",
    "Notes",
    "Created",
    "Updated",
    "ProcessedBy",
    "CheckedBy",
    "CONDDL (mS/cm)",
)


class NMAWaterLevelsContinuousPressureDailyBackfill(Transferer):
    """
    Backfill for the legacy WaterLevelsContinuous_Pressure_Daily table.

    Loads the source table with ``read_csv`` and upserts it, in batches of
    ``batch_size`` rows, into ``NMAWaterLevelsContinuousPressureDaily`` using
    ``GlobalID`` as the conflict key.
    """

    source_table = "WaterLevelsContinuous_Pressure_Daily"

    def __init__(self, *args, batch_size: int = 1000, **kwargs):
        """
        Args:
            *args: Forwarded to ``Transferer.__init__``.
            batch_size: Number of rows sent per upsert statement.
            **kwargs: Forwarded to ``Transferer.__init__``.
        """
        super().__init__(*args, **kwargs)
        self.batch_size = batch_size

    def _get_dfs(self) -> tuple[pd.DataFrame, pd.DataFrame]:
        """Read the source table and return it as both elements of the result."""
        # Parse key datetime columns eagerly to avoid per-row parsing later.
        input_df = read_csv(
            self.source_table,
            parse_dates=["DateMeasured", "Created", "Updated"],
        )
        # No cleaning/validation beyond the raw import. Both elements are the
        # SAME DataFrame object (no copy is made).
        # NOTE(review): presumably (input, cleaned) per the Transferer
        # contract -- confirm against the base class.
        return input_df, input_df

    def _transfer_hook(self, session: Session) -> None:
        """
        Upsert every row of ``self.cleaned_df`` in batches.

        Each batch is one ``INSERT ... ON CONFLICT ("GlobalID") DO UPDATE``
        statement that is committed immediately, so batches completed before
        a failure stay written.

        Args:
            session: SQLAlchemy session used to execute and commit each batch.
        """
        # NOTE(review): cleaned_df is presumably populated by Transferer from
        # _get_dfs() -- confirm against the base class.
        rows = [self._row_dict(row) for row in self.cleaned_df.to_dict("records")]

        # `.excluded` and `.on_conflict_do_update()` exist only on the
        # PostgreSQL dialect's Insert construct; the generic
        # `sqlalchemy.insert` raises AttributeError on both.
        insert_stmt = pg_insert(NMAWaterLevelsContinuousPressureDaily)
        excluded = insert_stmt.excluded

        # On conflict, overwrite every column except the key with the value
        # from the row that failed to insert.
        update_values = {
            name: excluded[name] for name in _COLUMNS if name != _CONFLICT_COLUMN
        }

        for start in range(0, len(rows), self.batch_size):
            chunk = rows[start : start + self.batch_size]
            logger.info(
                f"Upserting batch {start}-{start+len(chunk)-1} ({len(chunk)} rows) into NMA_WaterLevelsContinuous_Pressure_Daily"
            )
            stmt = insert_stmt.values(chunk).on_conflict_do_update(
                index_elements=[_CONFLICT_COLUMN],
                set_=update_values,
            )
            session.execute(stmt)
            session.commit()
            session.expunge_all()

    def _row_dict(self, row: dict[str, Any]) -> dict[str, Any]:
        """
        Project one source record onto the target columns.

        Args:
            row: One record from the source DataFrame, keyed by column name.

        Returns:
            A dict with exactly the keys in ``_COLUMNS``; keys missing from
            ``row`` and values pandas treats as missing (NaN, NaT, None)
            become ``None``.
        """

        def val(key: str) -> Optional[Any]:
            v = row.get(key)
            if pd.isna(v):
                return None
            return v

        return {name: val(name) for name in _COLUMNS}


def run(batch_size: int = 1000) -> None:
    """Entrypoint to execute the backfill with the given batch size."""
    transferer = NMAWaterLevelsContinuousPressureDailyBackfill(batch_size=batch_size)
    transferer.transfer()


if __name__ == "__main__":
    # Allow running via `python -m transfers.backfill.waterlevelscontinuous_pressure_daily`
    run()

# ============= EOF =============================================