Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 8 additions & 18 deletions schemas/water_level_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
# ===============================================================================
from __future__ import annotations

from datetime import datetime, timezone
from datetime import datetime
from typing import Annotated

from core.enums import DataQuality, GroundwaterLevelReason, SampleMethod
Expand All @@ -29,7 +29,7 @@
)
from pydantic.functional_validators import BeforeValidator

from services.util import convert_dt_tz_naive_to_tz_aware
from services.util import normalize_datetime_to_utc

WATER_LEVEL_REQUIRED_FIELDS = [
"well_name_point_id",
Expand Down Expand Up @@ -84,18 +84,6 @@ def empty_str_to_none(value):
OptionalFloat = Annotated[float | None, BeforeValidator(empty_str_to_none)]


def _normalize_datetime_to_utc(value: datetime | str) -> datetime:
if isinstance(value, str):
value = datetime.fromisoformat(value)
elif not isinstance(value, datetime):
raise ValueError("value must be a datetime or ISO format string")

if value.tzinfo is None:
value = convert_dt_tz_naive_to_tz_aware(value, "America/Denver")

return value.astimezone(timezone.utc)


def _canonicalize_enum_value(
value: str | None, enum_cls, field_name: str
) -> str | None:
Expand Down Expand Up @@ -168,7 +156,7 @@ def canonicalize_sample_method(value: str) -> str:

@field_validator("sample_method")
@classmethod
def normalize_sample_method(cls, value: str) -> str:
def normalize_sample_method(cls, value: str) -> str | None:
return _canonicalize_enum_value(
cls.canonicalize_sample_method(value),
SampleMethod,
Expand All @@ -178,11 +166,13 @@ def normalize_sample_method(cls, value: str) -> str:
@field_validator(
"field_event_date_time",
"water_level_date_time",
mode="before",
mode="after",
)
@classmethod
def normalize_datetime_field(cls, value: datetime | str) -> datetime:
return _normalize_datetime_to_utc(value)
def normalize_datetime_field(cls, value: datetime | None) -> datetime | None:
if value is None or (isinstance(value, str) and value.strip() == ""):
return None
return normalize_datetime_to_utc(value)

@field_validator("depth_to_water_ft")
@classmethod
Expand Down
29 changes: 14 additions & 15 deletions schemas/well_inventory.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
AliasChoices,
)
from schemas import past_or_today_validator, PastOrTodayDatetime
from services.util import convert_dt_tz_naive_to_tz_aware
from services.util import normalize_datetime_to_utc


def empty_str_to_none(v):
Expand Down Expand Up @@ -361,20 +361,19 @@ def normalize_complete_monitoring_frequency(cls, data):

return data

@field_validator("date_time", mode="before")
def make_date_time_tz_aware(cls, v):
if isinstance(v, str):
dt = datetime.fromisoformat(v)
elif isinstance(v, datetime):
dt = v
else:
raise ValueError("date_time must be a datetime or ISO format string")

if dt.tzinfo is None:
aware_dt = convert_dt_tz_naive_to_tz_aware(dt, "America/Denver")
return aware_dt
else:
raise ValueError("date_time must be a timezone-naive datetime")
@field_validator("date_time", mode="after")
@classmethod
def normalize_date_time(cls, value: datetime | None) -> datetime | None:
if value is None or (isinstance(value, str) and value.strip() == ""):
return None
return normalize_datetime_to_utc(value)

@field_validator("measurement_date_time", mode="after")
@classmethod
def normalize_measurement_date_time(cls, value: datetime | None) -> datetime | None:
if value is None or (isinstance(value, str) and value.strip() == ""):
return None
return normalize_datetime_to_utc(value)

@model_validator(mode="after")
def validate_model(self):
Expand Down
24 changes: 22 additions & 2 deletions services/util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import json
import logging
import time
from datetime import datetime
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

import httpx
Expand Down Expand Up @@ -64,6 +64,26 @@ def transform_srid(geometry, source_srid, target_srid):
return transform(transformer.transform, geometry)


def normalize_datetime_to_utc(value: datetime | str) -> datetime:
dt: datetime

if isinstance(value, str):
dt = datetime.fromisoformat(value)
elif isinstance(value, datetime):
dt = value
else:
raise ValueError("value must be a datetime or ISO format string")

# Treat the datetime as "naive" if it has no tzinfo OR its tzinfo does not
# provide a valid UTC offset (utcoffset() returns None). Some tzinfo
# implementations can be attached but still behave like naive datetimes,
# so we handle both cases before assigning a default timezone.
if dt.tzinfo is None or dt.utcoffset() is None:
dt = convert_dt_tz_naive_to_tz_aware(dt, "America/Denver")

return dt.astimezone(timezone.utc)


def convert_dt_tz_naive_to_tz_aware(
dt_naive: datetime,
iana_timezone: str = "America/Denver",
Expand Down Expand Up @@ -156,7 +176,7 @@ def get_county_from_point(lon: float, lat: float) -> str | None:
return attrs["BASENAME"]


def get_quad_name_from_point(lon: float, lat: float) -> str:
def get_quad_name_from_point(lon: float, lat: float) -> str | None:
url = "https://carto.nationalmap.gov/arcgis/rest/services/map_indices/MapServer/10/query"
params = {
"f": "json",
Expand Down
116 changes: 54 additions & 62 deletions tests/features/steps/well-inventory-csv.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import tempfile
from datetime import datetime, timedelta
from datetime import datetime, timezone
from pathlib import Path

from behave import given, when, then
Expand All @@ -9,8 +9,10 @@
from db import Thing
from db.engine import session_ctx
from db.lexicon import LexiconCategory
from services.util import convert_dt_tz_naive_to_tz_aware
from sqlalchemy import select
from zoneinfo import ZoneInfo

MOUNTAIN_TZ = ZoneInfo("America/Denver")


@given("valid lexicon values exist for:")
Expand Down Expand Up @@ -64,32 +66,28 @@ def step_given_the_csv_includes_optional_water_level_entry_fields_when_available


@given(
'the required "date_time" values are valid ISO 8601 timezone-naive datetime strings (e.g. "2025-02-15T10:30:00")'
'the required "date_time" values are valid ISO 8601 datetime strings (timezone-naive or timezone-aware)'
)
def step_step_step(context: Context):
def step_validate_required_datetime(context: Context):
"""Verifies that "date_time" values are valid ISO 8601 timezone-naive datetime strings."""
for row in context.rows:
try:
date_time = datetime.fromisoformat(row["date_time"])
assert (
date_time.tzinfo is None
), f"date_time should be timezone-naive: {row['date_time']}"
value = row["date_time"].replace("Z", "+00:00")
datetime.fromisoformat(value)
except ValueError as e:
raise ValueError(f"Invalid date_time: {row['date_time']}") from e


@given(
'the optional "water_level_date_time" values are valid ISO 8601 timezone-naive datetime strings (e.g. "2025-02-15T10:30:00") when provided'
'the optional "water_level_date_time" values are valid ISO 8601 datetime strings (timezone-naive or timezone-aware) when provided'
)
def step_step_step_2(context: Context):
def step_validate_optional_datetime(context: Context):
"""Verifies that "water_level_date_time" values are valid ISO 8601 timezone-naive datetime strings."""
for row in context.rows:
if row.get("water_level_date_time", None):
try:
date_time = datetime.fromisoformat(row["water_level_date_time"])
assert (
date_time.tzinfo is None
), f"water_level_date_time should be timezone-naive: {row['water_level_date_time']}"
value = row["water_level_date_time"].replace("Z", "+00:00")
datetime.fromisoformat(value)
except ValueError as e:
raise ValueError(
f"Invalid water_level_date_time: {row['water_level_date_time']}"
Expand All @@ -99,6 +97,28 @@ def step_step_step_2(context: Context):
@when("I upload the file to the bulk upload endpoint")
@when("I run the well inventory bulk upload command")
def step_when_i_run_the_well_inventory_bulk_upload_command(context: Context):
context.datetime_pairs = []
context.normalized_datetimes = []

for row in getattr(context, "rows", []):
raw = row.get("date_time")
if not raw:
continue
try:
original = datetime.fromisoformat(raw.replace("Z", "+00:00"))
except ValueError:
continue

if original.tzinfo is None:
aware = original.replace(tzinfo=MOUNTAIN_TZ)
else:
aware = original

normalized = aware.astimezone(timezone.utc)

context.datetime_pairs.append((original, normalized))
context.normalized_datetimes.append(normalized)

suffix = Path(getattr(context, "file_name", "upload.csv")).suffix or ".csv"
with tempfile.NamedTemporaryFile(mode="w", suffix=suffix, delete=False) as fp:
fp.write(context.file_content)
Expand Down Expand Up @@ -141,58 +161,30 @@ def json(self):
return self._json


@then(
"all datetime objects are assigned the correct Mountain Time timezone offset based on the date value."
)
def step_step_step_3(context: Context):
"""Converts all datetime strings in the CSV rows to timezone-aware datetime objects with Mountain Time offset."""
for i, row in enumerate(context.rows):
# Convert date_time field
date_time_naive = datetime.fromisoformat(row["date_time"])
date_time_aware = convert_dt_tz_naive_to_tz_aware(
date_time_naive, "America/Denver"
)
row["date_time"] = date_time_aware.isoformat()

# confirm correct time zone and offset
if i == 0:
# MST, offset -07:00
assert date_time_aware.utcoffset() == timedelta(
hours=-7
), "date_time offset is not -07:00"
else:
# MDT, offset -06:00
assert date_time_aware.utcoffset() == timedelta(
hours=-6
), "date_time offset is not -06:00"
@then("all datetime objects are normalized to UTC")
def step_all_normalized_to_utc(context):
for dt in context.normalized_datetimes:
assert dt.tzinfo == timezone.utc, f"Not UTC: {dt}"

# confirm the time was not changed from what was provided
assert (
date_time_aware.replace(tzinfo=None) == date_time_naive
), "date_time value was changed during timezone assignment"

# Convert water_level_date_time field if it exists
if row.get("water_level_date_time", None):
wl_date_time_naive = datetime.fromisoformat(row["water_level_date_time"])
wl_date_time_aware = convert_dt_tz_naive_to_tz_aware(
wl_date_time_naive, "America/Denver"
)
row["water_level_date_time"] = wl_date_time_aware.isoformat()

if wl_date_time_aware.dst():
# MDT, offset -06:00
assert wl_date_time_aware.utcoffset() == timedelta(
hours=-6
), "water_level_date_time offset is not -06:00"
else:
# MST, offset -07:00
assert wl_date_time_aware.utcoffset() == timedelta(
hours=-7
), "water_level_date_time offset is not -07:00"
@then("timezone-naive datetimes are interpreted as Mountain Time before conversion")
def step_naive_as_mountain(context):
for original, normalized in context.datetime_pairs:
if original.tzinfo is None:
expected = original.replace(tzinfo=MOUNTAIN_TZ).astimezone(timezone.utc)
assert (
normalized == expected
), f"Naive datetime not handled as Mountain Time: {original}"


@then("timezone-aware datetimes are converted to UTC using their provided offset")
def step_aware_to_utc(context):
for original, normalized in context.datetime_pairs:
if original.tzinfo is not None:
expected = original.astimezone(timezone.utc)
assert (
wl_date_time_aware.replace(tzinfo=None) == wl_date_time_naive
), "water_level_date_time value was changed during timezone assignment"
normalized == expected
), f"Aware datetime not converted correctly: {original}"


@then("the response includes a summary containing:")
Expand Down
10 changes: 7 additions & 3 deletions tests/features/well-inventory-csv.feature
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,20 @@ Feature: Bulk upload well inventory from CSV via CLI
| depth_to_water_ft |
| data_quality |
| water_level_notes |
And the required "date_time" values are valid ISO 8601 timezone-naive datetime strings (e.g. "2025-02-15T10:30:00")
And the optional "water_level_date_time" values are valid ISO 8601 timezone-naive datetime strings (e.g. "2025-02-15T10:30:00") when provided
And the required "date_time" values are valid ISO 8601 datetime strings (timezone-naive or timezone-aware)
# e.g. "2025-02-15T10:30:00" or "2025-02-15T10:30:00-07:00"
And the optional "water_level_date_time" values are valid ISO 8601 datetime strings (timezone-naive or timezone-aware) when provided
# e.g. "2025-02-15T10:30:00" or "2025-02-15T10:30:00-07:00

# And all optional lexicon fields contain valid lexicon values when provided
# And all optional numeric fields contain valid numeric values when provided
# And all optional date fields contain valid ISO 8601 timestamps when provided

When I run the well inventory bulk upload command
# assumes users are entering datetimes as Mountain Time because location is restricted to New Mexico
Then all datetime objects are assigned the correct Mountain Time timezone offset based on the date value.
Then all datetime objects are normalized to UTC
And timezone-naive datetimes are interpreted as Mountain Time before conversion
And timezone-aware datetimes are converted to UTC using their provided offset
And the command exits with code 0
# And null values in the response are represented as JSON null
And the response includes a summary containing:
Expand Down
Loading
Loading