diff --git a/api/contact.py b/api/contact.py index 38ddc254e..0525e4126 100644 --- a/api/contact.py +++ b/api/contact.py @@ -13,23 +13,27 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from typing import List, Annotated - -from fastapi import APIRouter, Depends, Query -from pydantic import BaseModel, Field -from pydantic_core import PydanticUndefined +from fastapi import APIRouter, Query from fastapi import APIRouter from sqlalchemy import select from starlette import status - +from sqlalchemy.exc import ProgrammingError from api.pagination import CustomPage from fastapi_pagination.ext.sqlalchemy import paginate -from core.dependencies import session_dependency -from db import ThingContactAssociation, Thing -from db.contact import Contact, Email, Phone, Address +from core.dependencies import ( + session_dependency, + amp_admin_dependency, + amp_editor_dependency, + amp_viewer_dependency, +) +from db import ThingContactAssociation, Thing, Contact, Email, Phone, Address, adder from schemas.contact import ( CreateContact, + CreateAddress, + CreateEmail, + CreatePhone, + CreateThingAssociation, PhoneResponse, EmailResponse, AddressResponse, @@ -39,16 +43,88 @@ UpdatePhone, UpdateAddress, ) -from services.crud_helper import model_patcher -from services.people_helper import add_contact +from services.crud_helper import model_patcher, model_deleter +from services.contact_helper import ( + add_contact, +) from services.query_helper import ( simple_get_by_id, paginated_all_getter, order_sort_filter, ) +from services.exceptions_helper import PydanticStyleException router = APIRouter(prefix="/contact", tags=["contact"]) +# ====== DB ERROR HANDLERS ===================================================== + + +def database_error_handler( + payload: CreateThingAssociation, error: ProgrammingError +) -> None: + """ + Handle errors raised by the database when adding or updating a sample. + """ + + error_message = error.orig.args[0]["M"] + + if ( + error_message + == 'insert or update on table "thing_contact_association" violates foreign key constraint "thing_contact_association_thing_id_fkey"' + ): + detail = { + "loc": ["body", "thing_id"], + "msg": f"Thing with ID {payload.thing_id} not found.", + "type": "value_error", + "input": {"thing_id": payload.thing_id}, + } + + elif ( + error_message + == 'insert or update on table "thing_contact_association" violates foreign key constraint "thing_contact_association_contact_id_fkey"' + ): + detail = { + "loc": ["body", "contact_id"], + "msg": f"Contact with ID {payload.contact_id} not found.", + "type": "value_error", + "input": {"contact_id": payload.contact_id}, + } + elif ( + error_message + == 'insert or update on table "email" violates foreign key constraint "email_contact_id_fkey"' + ): + detail = { + "loc": ["body", "contact_id"], + "msg": f"Contact with ID {payload.contact_id} not found.", + "type": "value_error", + "input": {"contact_id": payload.contact_id}, + } + elif ( + error_message + == 'insert or update on table "phone" violates foreign key constraint "phone_contact_id_fkey"' + ): + detail = { + "loc": ["body", "contact_id"], + "msg": f"Contact with ID {payload.contact_id} not found.", + "type": "value_error", + "input": {"contact_id": payload.contact_id}, + } + elif ( + error_message + == 'insert or update on table "address" violates foreign key constraint "address_contact_id_fkey"' + ): + detail = { + "loc": ["body", "contact_id"], + "msg": f"Contact with ID {payload.contact_id} not found.", + "type": "value_error", + "input": {"contact_id": payload.contact_id}, + } + + raise PydanticStyleException(status_code=status.HTTP_409_CONFLICT, detail=[detail]) + + +# ====== POST ================================================================== + @router.post( "", @@ -56,41 +132,94 @@ status_code=status.HTTP_201_CREATED, ) def create_contact( - contact_data: CreateContact, session: session_dependency + contact_data: CreateContact, session: session_dependency, user: amp_admin_dependency ) -> ContactResponse: + try: + return add_contact(session, contact_data, user=user) + except ProgrammingError as e: + database_error_handler(contact_data, e) - return add_contact(session, contact_data) - - # return adder(session, Contact, contact_data) - -@router.patch("/{contact_id}", summary="Update contact") -def update_contact( - contact_id: int, - contact_data: UpdateContact, +@router.post( + "/address", + summary="Add an address to a contact", + status_code=status.HTTP_201_CREATED, +) +def create_address( + address_data: CreateAddress, session: session_dependency, -) -> ContactResponse: + user: amp_admin_dependency, +) -> AddressResponse: """ - Update an existing contact in the database. - :param contact_id: ID of the contact to update - :param contact_data: Data to update the contact with + Add a new address to an existing contact in the database. + :param contact_id: ID of the contact to add the address to + :param address_data: Data for the new address :param session: Database session - :return: Updated contact response + :return: Response containing the added address """ - # contact = simple_get_by_id(session, Contact, contact_id) - # if not contact: - # return {"message": "Contact not found"} - # - # for key, value in contact_data.model_dump().items(): - # setattr(contact, key, value) - # - # session.commit() - # session.refresh(contact) + try: + return adder(session, Address, address_data, user=user) + except ProgrammingError as e: + database_error_handler(address_data, e) - # return contact - return model_patcher(session, Contact, contact_id, contact_data) +@router.post( + "/email", + summary="Add an email to a contact", + status_code=status.HTTP_201_CREATED, +) +def create_email( + email_data: CreateEmail, + session: session_dependency, + user: amp_admin_dependency, +) -> EmailResponse: + try: + return adder(session, Email, email_data, user=user) + except ProgrammingError as e: + database_error_handler(email_data, e) + +@router.post( + "/phone", + summary="Add a phone number to a contact", + status_code=status.HTTP_201_CREATED, +) +def create_phone( + phone_data: CreatePhone, + session: session_dependency, + user: amp_admin_dependency, +) -> PhoneResponse: + try: + return adder(session, Phone, phone_data, user=user) + except ProgrammingError as e: + database_error_handler(phone_data, e) + + +# @router.post( +# "/{contact_id}/thing-association", +# summary="Add a thing-contact association to a contact", +# status_code=status.HTTP_201_CREATED, +# ) +# def add_thing_association_to_contact( +# contact_id: int, +# thing_association_data: CreateThingAssociation, +# session: session_dependency, +# user: amp_admin_dependency, +# ) -> ThingContactAssociationResponse: +# contact = simple_get_by_id(session, Contact, contact_id) +# try: +# return add_thing_association( +# session, contact.id, thing_association_data, user=user +# ) +# except ProgrammingError as e: +# database_error_handler(thing_association_data, e) + + +# PATCH ======================================================================== + + +# TODO: catch database errors with patches, most likely foreign key constraints +# then return a 409 response @router.patch( "/email/{email_id}", ) @@ -98,11 +227,12 @@ def update_contact_email( email_id: int, email_data: UpdateEmail, session: session_dependency, + user: amp_editor_dependency, ) -> EmailResponse: """ Update an existing contact's email in the database. """ - return model_patcher(session, Email, email_id, email_data) + return model_patcher(session, Email, email_id, email_data, user=user) @router.patch( @@ -112,6 +242,7 @@ def update_contact_phone( phone_id: int, phone_data: UpdatePhone, session: session_dependency, + user: amp_editor_dependency, ) -> PhoneResponse: """ Update an existing contact's phone number in the database. @@ -121,7 +252,7 @@ def update_contact_phone( :param session: Database session :return: Updated contact response """ - return model_patcher(session, Phone, phone_id, phone_data) + return model_patcher(session, Phone, phone_id, phone_data, user=user) @router.patch( @@ -131,6 +262,7 @@ def update_contact_address( address_id: int, address_data: UpdateAddress, session: session_dependency, + user: amp_editor_dependency, ) -> AddressResponse: """ Update an existing contact's address in the database. @@ -140,12 +272,158 @@ def update_contact_address( :param session: :return: """ - return model_patcher(session, Address, address_id, address_data) + return model_patcher(session, Address, address_id, address_data, user=user) + + +# @router.patch( +# "/thing-association/{thing_contact_association_id}", +# summary="Update thing-contact association", +# ) +# def update_thing_contact_association( +# thing_contact_association_id: int, +# thing_contact_association_data: UpdateThingContactAssociation, +# session: session_dependency, +# user: amp_editor_dependency, +# ) -> ThingContactAssociationResponse: +# """ +# Update an existing thing-contact association in the database. + +# :param thing_contact_association_id: +# :param thing_contact_association_data: +# :param session: +# :return: +# """ +# try: +# return model_patcher( +# session, +# ThingContactAssociation, +# thing_contact_association_id, +# thing_contact_association_data, +# user=user, +# ) +# except ProgrammingError as e: +# database_error_handler(thing_contact_association_data, e) + + +@router.patch("/{contact_id}", summary="Update contact") +def update_contact( + contact_id: int, + contact_data: UpdateContact, + session: session_dependency, + user: amp_editor_dependency, +) -> ContactResponse: + """ + Update an existing contact in the database. + :param contact_id: ID of the contact to update + :param contact_data: Data to update the contact with + :param session: Database session + :return: Updated contact response + """ + return model_patcher(session, Contact, contact_id, contact_data, user=user) + + +# ====== GET =================================================================== + + +@router.get("/email", summary="Get all emails") +async def get_emails( + session: session_dependency, user: amp_viewer_dependency +) -> CustomPage[EmailResponse]: + """ + Retrieve all emails from the database. + :param session: + :return: + """ + return paginated_all_getter(session, Email) + + +@router.get("/email/{email_id}", summary="Get email by ID") +async def get_email_by_id( + email_id: int, session: session_dependency, user: amp_viewer_dependency +) -> EmailResponse: + """ + Retrieve an email by ID from the database. + """ + return simple_get_by_id(session, Email, email_id) + + +@router.get("/phone", summary="Get all phones") +async def get_phones( + session: session_dependency, user: amp_viewer_dependency +) -> CustomPage[PhoneResponse]: + """ + Retrieve all phone numbers from the database. + :param session: + :return: + """ + return paginated_all_getter(session, Phone) + + +@router.get("/phone/{phone_id}", summary="Get phone by ID") +async def get_phone_by_id( + phone_id: int, session: session_dependency, user: amp_viewer_dependency +) -> PhoneResponse: + """ + Retrieve a phone by ID from the database. + """ + return simple_get_by_id(session, Phone, phone_id) + + +@router.get("/address", summary="Get all addresses") +async def get_addresses( + session: session_dependency, user: amp_viewer_dependency +) -> CustomPage[AddressResponse]: + """ + Retrieve all addresses from the database. + :param session: + :return: + """ + return paginated_all_getter(session, Address) + + +@router.get("/address/{address_id}", summary="Get address by ID") +async def get_address_by_id( + address_id: int, session: session_dependency, user: amp_viewer_dependency +) -> AddressResponse: + """ + Retrieve an address by ID from the database. + """ + return simple_get_by_id(session, Address, address_id) + + +# @router.get("/thing-association", summary="Get all thing-contact associations") +# async def get_thing_contact_associations( +# session: session_dependency, user: amp_viewer_dependency +# ) -> CustomPage[ThingContactAssociationResponse]: +# """ +# Retrieve all thing-contact associations from the database. +# :param session: +# :return: +# """ +# return paginated_all_getter(session, ThingContactAssociation) + + +# @router.get( +# "/thing-association/{thing_contact_association_id}", +# summary="Get thing-contact association by ID", +# ) +# async def get_thing_contact_association_by_id( +# thing_contact_association_id: int, +# session: session_dependency, +# user: amp_viewer_dependency, +# ) -> ThingContactAssociationResponse: +# """ +# Retrieve a thing-contact association by ID from the database. +# """ +# return simple_get_by_id( +# session, ThingContactAssociation, thing_contact_association_id +# ) @router.get("", summary="Get contacts") async def get_contacts( session: session_dependency, + user: amp_viewer_dependency, sort: str = None, order: str = None, filter_: str = Query(alias="filter", default=None), @@ -169,59 +447,120 @@ async def get_contacts( @router.get("/{contact_id}", summary="Get contact by ID") async def get_contact_by_id( - contact_id: int, session: session_dependency + contact_id: int, session: session_dependency, user: amp_viewer_dependency ) -> ContactResponse: """ Retrieve a contact by ID from the database. """ - contact = simple_get_by_id(session, Contact, contact_id) - if not contact: - return {"message": "Contact not found"} - return contact + return simple_get_by_id(session, Contact, contact_id) @router.get("/{contact_id}/email", summary="Get contact emails") async def get_contact_emails( - contact_id: int, session: session_dependency + contact_id: int, session: session_dependency, user: amp_viewer_dependency ) -> CustomPage[EmailResponse]: """ Retrieve all emails associated with a contact. """ contact = simple_get_by_id(session, Contact, contact_id) - if not contact: - return {"message": "Contact not found"} - - sql = select(Email).where(Email.contact_id == contact_id) - + sql = select(Email).where(Email.contact_id == contact.id) return paginate(query=sql, conn=session) @router.get("/{contact_id}/phone", summary="Get contact phones") async def get_contact_phones( - contact_id: int, session: session_dependency + contact_id: int, session: session_dependency, user: amp_viewer_dependency ) -> CustomPage[PhoneResponse]: """ Retrieve all phone numbers associated with a contact. """ contact = simple_get_by_id(session, Contact, contact_id) - if not contact: - return {"message": "Contact not found"} - sql = select(Phone).where(Phone.contact_id == contact_id) + sql = select(Phone).where(Phone.contact_id == contact.id) return paginate(query=sql, conn=session) @router.get("/{contact_id}/address", summary="Get contact addresses") async def get_contact_addresses( - contact_id: int, session: session_dependency + contact_id: int, session: session_dependency, user: amp_viewer_dependency ) -> CustomPage[AddressResponse]: """ Retrieve all addresses associated with a contact. """ contact = simple_get_by_id(session, Contact, contact_id) - if not contact: - return {"message": "Contact not found"} - sql = select(Address).where(Address.contact_id == contact_id) + sql = select(Address).where(Address.contact_id == contact.id) return paginate(query=sql, conn=session) +# @router.get("/{contact_id}/thing-association", summary="Get contact's things") +# async def get_contact_thing_associations( +# contact_id: int, session: session_dependency, user: amp_viewer_dependency +# ) -> CustomPage[ThingContactAssociationResponse]: +# """ +# Retrieve all thing-contact associations for a contact. +# """ +# contact = simple_get_by_id(session, Contact, contact_id) +# sql = select(ThingContactAssociation).where( +# ThingContactAssociation.contact_id == contact.id +# ) +# return paginate(query=sql, conn=session) + + +# DELETE ======================================================================= + + +@router.delete("/email/{email_id}", summary="Delete contact email") +def delete_contact_email( + email_id: int, session: session_dependency, user: amp_admin_dependency +): + """ + Delete a contact email by ID from the database. + """ + return model_deleter(session, Email, email_id) + + +@router.delete("/phone/{phone_id}", summary="Delete contact phone") +def delete_contact_phone( + phone_id: int, session: session_dependency, user: amp_admin_dependency +): + """ + Delete a contact phone by ID from the database. + """ + return model_deleter(session, Phone, phone_id) + + +@router.delete("/address/{address_id}", summary="Delete contact address") +def delete_contact_address( + address_id: int, session: session_dependency, user: amp_admin_dependency +): + """ + Delete a contact address by ID from the database. + """ + return model_deleter(session, Address, address_id) + + +# @router.delete( +# "/thing-association/{thing_contact_association_id}", +# summary="Delete contact thing association", +# ) +# def delete_contact_thing_association( +# thing_contact_association_id: int, +# session: session_dependency, +# user: amp_admin_dependency, +# ): +# """ +# Delete a contact's thing association from the database +# """ +# return model_deleter(session, ThingContactAssociation, thing_contact_association_id) + + +@router.delete("/{contact_id}", summary="Delete contact") +def delete_contact( + contact_id: int, session: session_dependency, user: amp_admin_dependency +): + """ + Delete a contact by ID from the database. + """ + return model_deleter(session, Contact, contact_id) + + # ============= EOF ============================================= diff --git a/api/sample.py b/api/sample.py index cfb5aad7c..9a3755093 100644 --- a/api/sample.py +++ b/api/sample.py @@ -14,7 +14,7 @@ # limitations under the License. # =============================================================================== -from fastapi import APIRouter, Depends, Query, HTTPException, Response +from fastapi import APIRouter, Depends, Query, Response from sqlalchemy.exc import IntegrityError, ProgrammingError from sqlalchemy.orm import Session from starlette.status import HTTP_201_CREATED, HTTP_409_CONFLICT @@ -28,6 +28,7 @@ from schemas.sample import SampleResponse, CreateSample, UpdateSample from services.query_helper import paginated_all_getter, simple_get_by_id from services.crud_helper import model_patcher, model_deleter +from services.exceptions_helper import PydanticStyleException router = APIRouter( prefix="/sample", @@ -46,16 +47,24 @@ def database_error_handler( error_message == 'duplicate key value violates unique constraint "sample_field_sample_id_key"' ): - detail = ( - f"Sample with field_sample_id {payload.field_sample_id} already exists." - ) + detail = { + "loc": ["body", "field_sample_id"], + "msg": f"Sample with field_sample_id {payload.field_sample_id} already exists.", + "type": "value_error", + "input": {"field_sample_id": payload.field_sample_id}, + } elif ( error_message == 'insert or update on table "sample" violates foreign key constraint "sample_thing_id_fkey"' ): - detail = f"Thing with ID {payload.thing_id} does not exist." - - raise HTTPException(status_code=HTTP_409_CONFLICT, detail=detail) + detail = { + "loc": ["body", "thing_id"], + "msg": f"Thing with ID {payload.thing_id} does not exist.", + "type": "value_error", + "input": {"thing_id": payload.thing_id}, + } + + raise PydanticStyleException(status_code=HTTP_409_CONFLICT, detail=[detail]) # ============= Post ============================================= diff --git a/api/sensor.py b/api/sensor.py index bf908c080..da772c6f5 100644 --- a/api/sensor.py +++ b/api/sensor.py @@ -63,16 +63,18 @@ def update_sensor( existing_datetime_removed is not None and sensor_data.datetime_installed >= existing_datetime_removed ): - raise PydanticStyleException( - status_code=status.HTTP_409_CONFLICT, - loc=["body", "datetime_installed"], - msg=f"new datetime installed must be before existing datetime removed of {existing_datetime_removed.isoformat().replace('+00:00', 'Z')}", - type="value_error", - input={ + detail = { + "loc": ["body", "datetime_installed"], + "msg": f"new datetime installed must be before existing datetime removed of {existing_datetime_removed.isoformat().replace('+00:00', 'Z')}", + "type": "value_error", + "input": { "datetime_installed": sensor_data.datetime_installed.isoformat().replace( "+00:00", "Z" ) }, + } + raise PydanticStyleException( + status_code=status.HTTP_409_CONFLICT, detail=[detail] ) elif ( sensor_data.datetime_installed is None @@ -81,16 +83,18 @@ def update_sensor( sensor = simple_get_by_id(session, Sensor, sensor_id) existing_datetime_installed = sensor.datetime_installed if sensor_data.datetime_removed <= existing_datetime_installed: - raise PydanticStyleException( - status_code=status.HTTP_409_CONFLICT, - loc=["body", "datetime_removed"], - msg=f"new datetime removed must be after existing datetime installed of {existing_datetime_installed.isoformat().replace('+00:00', 'Z')}", - type="value_error", - input={ + detail = { + "loc": ["body", "datetime_removed"], + "msg": f"new datetime removed must be after existing datetime installed of {existing_datetime_installed.isoformat().replace('+00:00', 'Z')}", + "type": "value_error", + "input": { "datetime_removed": sensor_data.datetime_removed.isoformat().replace( "+00:00", "Z" ) }, + } + raise PydanticStyleException( + status_code=status.HTTP_409_CONFLICT, detail=[detail] ) return model_patcher(session, Sensor, sensor_id, sensor_data) diff --git a/db/contact.py b/db/contact.py index 20833c679..8d910a879 100644 --- a/db/contact.py +++ b/db/contact.py @@ -22,8 +22,12 @@ class ThingContactAssociation(Base, AutoBaseMixin): - thing_id = Column(Integer, ForeignKey("thing.id"), primary_key=True) - contact_id = Column(Integer, ForeignKey("contact.id"), primary_key=True) + thing_id = Column( + Integer, ForeignKey("thing.id", ondelete="CASCADE"), nullable=False + ) + contact_id = Column( + Integer, ForeignKey("contact.id", ondelete="CASCADE"), nullable=False + ) contact = relationship("Contact") thing = relationship("Thing") @@ -33,9 +37,9 @@ class Contact(Base, AutoBaseMixin): name = Column(String(100), nullable=False) role = lexicon_term(nullable=False) - phones = relationship("Phone", back_populates="contact") - emails = relationship("Email", back_populates="contact") - addresses = relationship("Address", back_populates="contact") + phones = relationship("Phone", back_populates="contact", passive_deletes=True) + emails = relationship("Email", back_populates="contact", passive_deletes=True) + addresses = relationship("Address", back_populates="contact", passive_deletes=True) search_vector = Column(TSVectorType("name", "role")) @@ -49,6 +53,7 @@ class Contact(Base, AutoBaseMixin): "ThingContactAssociation", back_populates="contact", cascade="all, delete-orphan", + passive_deletes=True, ) things = association_proxy("thing_associations", "thing") @@ -60,7 +65,7 @@ class Phone(Base, AutoBaseMixin): phone_number = Column(String(20), nullable=False) phone_type = lexicon_term(nullable=False) - contact = relationship("Contact", back_populates="phones") + contact = relationship("Contact", back_populates="phones", passive_deletes=True) search_vector = Column(TSVectorType("phone_number")) @@ -71,7 +76,7 @@ class Email(Base, AutoBaseMixin): email = Column(String(100), nullable=False) email_type = lexicon_term(nullable=False) - contact = relationship("Contact", back_populates="emails") + contact = relationship("Contact", back_populates="emails", passive_deletes=True) search_vector = Column(TSVectorType("email")) @@ -88,7 +93,7 @@ class Address(Base, AutoBaseMixin): country = lexicon_term(nullable=False, default="United States") address_type = lexicon_term(nullable=False) - contact = relationship("Contact", back_populates="addresses") + contact = relationship("Contact", back_populates="addresses", passive_deletes=True) search_vector = Column( TSVectorType( "address_line_1", diff --git a/db/thing.py b/db/thing.py index a363b9c67..7ebd20f30 100644 --- a/db/thing.py +++ b/db/thing.py @@ -15,7 +15,7 @@ # =============================================================================== from sqlalchemy import Integer, ForeignKey, String, Column, Float from sqlalchemy.ext.associationproxy import association_proxy -from sqlalchemy.orm import relationship, mapped_column, declared_attr +from sqlalchemy.orm import relationship, mapped_column from sqlalchemy_utils import TSVectorType from db import lexicon_term @@ -45,6 +45,14 @@ class Thing(Base, AutoBaseMixin, ReleaseMixin): ) locations = association_proxy("location_associations", "location") + contact_associations = relationship( + "ThingContactAssociation", + back_populates="thing", + overlaps="contacts", + cascade="all, delete-orphan", + ) + contacts = association_proxy("contact_associations", "contact") + # Well fields well_depth = Column( Float, diff --git a/schemas/contact.py b/schemas/contact.py index 522abf153..c8ff04439 100644 --- a/schemas/contact.py +++ b/schemas/contact.py @@ -13,73 +13,84 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from typing import Optional, List +from typing import List import phonenumbers from email_validator import validate_email, EmailNotValidError from phonenumbers import NumberParseException -from pydantic import field_validator, BaseModel, AwareDatetime +from pydantic import field_validator, BaseModel from schemas import ORMBaseModel from schemas.thing import ThingResponse -""" -REFACTOR TODO -Create common validator classes to be shared amongst create and update schemas. -Since many fields are optional in the update schemas set check_fields=False in the field_validator. -""" +# -------- VALIDATORS ---------- + + +class ValidateEmail(BaseModel): + + email: str | None = None + + @field_validator("email", check_fields=False) + @classmethod + def validate_email(cls, email: str | None) -> str | None: + if email is not None: + try: + emailinfo = validate_email(email, check_deliverability=False) + return emailinfo.normalized + except EmailNotValidError as e: + raise ValueError(f"Invalid email format. {email}") + + +class ValidatePhone(BaseModel): + + phone_number: str | None = None + + @field_validator("phone_number", check_fields=False) + @classmethod + def validate_phone(cls, phone_number_str: str | None) -> str | None: + if phone_number_str is not None: + region = "US" + try: + parsed_number = phonenumbers.parse(phone_number_str, region) + if phonenumbers.is_valid_number(parsed_number): + formatted_number = phonenumbers.format_number( + parsed_number, phonenumbers.PhoneNumberFormat.E164 + ) + return formatted_number + else: + raise ValueError(f"Invalid phone number. {phone_number_str}") + except NumberParseException as e: + raise ValueError(f"Invalid phone number. {phone_number_str}") # -------- CREATE ---------- -class CreateEmail(BaseModel): +class CreateEmail(ValidateEmail): """ Schema for creating an email. """ + contact_id: int | None = None # set to None for when made via POST /contact email: str email_type: str = "Primary" # Default to 'Primary' - @field_validator("email") - @classmethod - def validate_email(cls, email): - try: - emailinfo = validate_email(email, check_deliverability=False) - return emailinfo.normalized - except EmailNotValidError as e: - raise ValueError(f"Invalid email format. {email}") - -class CreatePhone(BaseModel): +class CreatePhone(ValidatePhone): """ Schema for creating a phone number. """ + contact_id: int | None = None # set to None for when made via POST /contact phone_number: str phone_type: str = "Primary" # Default to 'Primary' - @field_validator("phone_number", mode="before") - @classmethod - def validate_phone(cls, phone_number_str): - region = "US" - try: - parsed_number = phonenumbers.parse(phone_number_str, region) - if phonenumbers.is_valid_number(parsed_number): - formatted_number = phonenumbers.format_number( - parsed_number, phonenumbers.PhoneNumberFormat.E164 - ) - return formatted_number - else: - raise ValueError(f"Invalid phone number. {phone_number_str}") - except NumberParseException as e: - raise ValueError(f"Invalid phone number. {phone_number_str}") - class CreateAddress(BaseModel): """ Schema for creating an address. """ + contact_id: int | None = None # set to None for when made via POST /contact # todo: use a postal API to validate address and suggest corrections address_line_1: str # Required (e.g., "123 Main St") address_line_2: str | None = None # Optional (e.g., "Apt 4B", "Suite 200") @@ -91,6 +102,15 @@ class CreateAddress(BaseModel): address_type: str = "Primary" +class CreateThingAssociation(BaseModel): + """ + Schema for creating a ContactThingAssociation + """ + + contact_id: int + thing_id: int + + class CreateContact(BaseModel): """ Schema for creating a contact. @@ -108,73 +128,37 @@ class CreateContact(BaseModel): addresses: list[CreateAddress] | None = None -# -# -# @field_validator("phone", mode="before") -# @classmethod -# def validate_phone(cls, phone_number_str): -# region = "US" -# try: -# parsed_number = phonenumbers.parse(phone_number_str, region) -# if phonenumbers.is_valid_number(parsed_number): -# # You can also format the number if needed -# formatted_number = phonenumbers.format_number( -# parsed_number, phonenumbers.PhoneNumberFormat.E164 -# ) -# return formatted_number -# else: -# raise ValueError(f"Invalid phone number. {phone_number_str}") -# except NumberParseException as e: -# raise ValueError(f"Invalid phone number. {phone_number_str}") -# -# @field_validator("email") -# @classmethod -# def validate_email(cls, email): -# # try: -# # Check that the email address is valid. Turn on check_deliverability -# # for first-time validations like on account creation pages (but not -# # login pages). -# emailinfo = validate_email(email, check_deliverability=False) -# -# # After this point, use only the normalized form of the email address, -# # especially before going to a database query. -# email = emailinfo.normalized -# return email -# # except EmailNotValidError as e: -# # if v is not None: -# # # Basic email validation -# # if not re.fullmatch(r"[^@]+@[^@]+\.[^@]+", v): -# # raise ValueError(f"Invalid email format. {v}") -# # return v +# -------- RESPONSE ---------- -# -------- RESPONSE ---------- -class PhoneResponse(ORMBaseModel): +class BaseItemResponse(ORMBaseModel): + id: int + contact_id: int + + +class PhoneResponse(BaseItemResponse): """ Response schema for phone details. """ - id: int phone_number: str phone_type: str # e.g., 'mobile', 'landline', etc. -class EmailResponse(ORMBaseModel): +class EmailResponse(BaseItemResponse): """ Response schema for email details. """ - id: int email: str email_type: str # e.g., 'personal', 'work', etc. -class AddressResponse(ORMBaseModel): +class AddressResponse(BaseItemResponse): """ Response schema for address details. """ - id: int address_line_1: str address_line_2: str | None = None city: str @@ -192,45 +176,54 @@ class ContactResponse(ORMBaseModel): id: int name: str role: str - created_at: AwareDatetime emails: List[EmailResponse] = [] phones: List[PhoneResponse] = [] addresses: List[AddressResponse] = [] things: List[ThingResponse] = [] # List of related things +class ThingContactAssociationResponse(ORMBaseModel): + """ + Response schema for thing-contact association details. + """ + + id: int + thing_id: int + contact_id: int + + # -------- UPDATE ---------- class UpdateContact(BaseModel): """ Schema for updating contact information. """ - name: Optional[str] = None - role: Optional[str] = None - # thing_id: int | None = None + name: str | None = None + role: str | None = None + thing_id: int | None = None # email: str | None = None # phone: str | None = None # address: str | None = None -class UpdateEmail(BaseModel): +class UpdateEmail(ValidateEmail): """ Schema for updating email information. """ - # email: Annotated[Optional[str], None] - # email_type: Annotated[Optional[str], None] - email: Optional[str] = None # None - email_type: Optional[str] = None # None + contact_id: int | None = None + email: str | None = None + email_type: str | None = None -class UpdatePhone(BaseModel): +class UpdatePhone(ValidatePhone): """ Schema for updating phone information. """ - phone_number: Optional[str] = None - phone_type: Optional[str] = None + contact_id: int | None = None + phone_number: str | None = None + phone_type: str | None = None class UpdateAddress(BaseModel): @@ -238,13 +231,23 @@ class UpdateAddress(BaseModel): Schema for updating address information. """ - address_line_1: Optional[str] = None - address_line_2: Optional[str] = None - city: Optional[str] = None - state: Optional[str] = None - postal_code: Optional[str] = None - country: Optional[str] = None - address_type: Optional[str] = None + contact_id: int | None = None + address_line_1: str | None = None + address_line_2: str | None = None + city: str | None = None + state: str | None = None + postal_code: str | None = None + country: str | None = None + address_type: str | None = None + + +class UpdateThingContactAssociation(BaseModel): + """ + Schema for updating thing-contact association information. + """ + + thing_id: int | None = None + contact_id: int | None = None # ============= EOF ============================================= diff --git a/schemas/sample.py b/schemas/sample.py index a9125dbaa..0a86b6620 100644 --- a/schemas/sample.py +++ b/schemas/sample.py @@ -52,6 +52,7 @@ class ValidateSample(BaseModel): # ) # return sample_bottom + sample_date: AwareDatetime | None = None sample_top: float | None = None sample_bottom: float | None = None diff --git a/schemas/thing.py b/schemas/thing.py index ade141b63..69fc0a7b7 100644 --- a/schemas/thing.py +++ b/schemas/thing.py @@ -13,12 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from datetime import datetime from typing import List -from geoalchemy2 import WKBElement -from geoalchemy2.shape import to_shape -from pydantic import BaseModel, model_validator, field_validator +from pydantic import BaseModel, model_validator from schemas import ORMBaseModel from schemas.location import LocationResponse diff --git a/services/__init__.py b/services/__init__.py index 8e546ddc2..850ec5839 100644 --- a/services/__init__.py +++ b/services/__init__.py @@ -14,4 +14,5 @@ # limitations under the License. # =============================================================================== + # ============= EOF ============================================= diff --git a/services/contact_helper.py b/services/contact_helper.py new file mode 100644 index 000000000..8860da7a2 --- /dev/null +++ b/services/contact_helper.py @@ -0,0 +1,90 @@ +# =============================================================================== +# Copyright 2025 ross +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# =============================================================================== +from db.contact import Contact, Email, Phone, Address, ThingContactAssociation +from schemas.contact import ( + CreateContact, +) +from services.audit_helper import audit_add +from sqlalchemy.orm import Session + + +def add_contact( + session: Session, contact_data: CreateContact | dict, user: dict +) -> Contact: + """ + Add a new contact to the database. + """ + + if isinstance(contact_data, CreateContact): + contact_data = contact_data.model_dump(exclude_unset=True) + + """ + Developer's note + + Rollback if there's an error creating a record in one of the tables so + that orphaned/fractured records are not made + """ + + try: + contact = Contact( + name=contact_data["name"], + role=contact_data["role"], + ) + for e in contact_data.get("emails", []): + email = Email(**e) + contact.emails.append(email) + # session.add(email) + + for p in contact_data.get("phones", []): + phone = Phone(**p) + contact.phones.append(phone) + # session.add(phone) + + for a in contact_data.get("addresses", []): + address = Address(**a) + contact.addresses.append(address) + # session.add(address) + + audit_add(user, contact) + audit_add(user, contact.emails) + audit_add(user, contact.phones) + audit_add(user, contact.addresses) + + session.add(contact) + session.flush() + session.refresh(contact) + + location_contact_association = ThingContactAssociation() + location_contact_association.thing_id = contact_data.get("thing_id") + location_contact_association.contact_id = contact.id + + audit_add(user, location_contact_association) + + session.add(location_contact_association) + # owner_contact_association = OwnerContactAssociation() + # owner_contact_association.owner_id = owner.id + # owner_contact_association.contact_id = contact.id + # session.add(owner_contact_association) + session.flush() + session.commit() + except Exception as e: + session.rollback() + raise e + + return contact + + +# ============= EOF ============================================= diff --git a/services/exceptions_helper.py b/services/exceptions_helper.py index 742f5e2fc..bd83a0263 100644 --- a/services/exceptions_helper.py +++ b/services/exceptions_helper.py @@ -1,4 +1,12 @@ from fastapi import HTTPException +from typing import TypedDict + + +class PydanticDetailDict(TypedDict): + loc: list + msg: str + type: str + input: dict class PydanticStyleException(HTTPException): @@ -10,12 +18,16 @@ class PydanticStyleException(HTTPException): def __init__( self, status_code: int, - loc: list, - msg: str, - type: str, - input: dict, + detail: list[PydanticDetailDict], ): + """ + The detail needs to be a list with the following keys: + "loc": list + "msg": string + "type": string + "input": Any + """ super().__init__( status_code=status_code, - detail=[{"loc": loc, "msg": msg, "type": type, "input": input}], + detail=detail, ) diff --git a/services/people_helper.py b/services/people_helper.py deleted file mode 100644 index 3f250ffaa..000000000 --- a/services/people_helper.py +++ /dev/null @@ -1,69 +0,0 @@ -# =============================================================================== -# Copyright 2025 ross -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# =============================================================================== -from db.contact import Contact, Email, Phone, Address, ThingContactAssociation -from schemas.contact import CreateContact -from sqlalchemy.orm import Session - - -def add_contact( - session: Session, - contact_data: CreateContact | dict, -) -> Contact: - """ - Add a new contact to the database. - """ - - if isinstance(contact_data, CreateContact): - contact_data = contact_data.model_dump(exclude_unset=True) - - contact = Contact( - name=contact_data["name"], - role=contact_data["role"], - ) - for e in contact_data.get("emails", []): - email = Email(**e) - contact.emails.append(email) - # session.add(email) - - for p in contact_data.get("phones", []): - phone = Phone(**p) - contact.phones.append(phone) - # session.add(phone) - - for a in contact_data.get("addresses", []): - address = Address(**a) - contact.addresses.append(address) - # session.add(address) - - session.add(contact) - session.commit() - session.refresh(contact) - - location_contact_association = ThingContactAssociation() - location_contact_association.thing_id = contact_data.get("thing_id") - location_contact_association.contact_id = contact.id - - session.add(location_contact_association) - # owner_contact_association = OwnerContactAssociation() - # owner_contact_association.owner_id = owner.id - # owner_contact_association.contact_id = contact.id - # session.add(owner_contact_association) - session.commit() - - return contact - - -# ============= EOF ============================================= diff --git a/tests/__init__.py b/tests/__init__.py index 5d7220f57..cb3f6b192 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -29,6 +29,19 @@ client = TestClient(app) +def override_authentication(default=True): + """ + Override the authentication dependency for testing purposes. + This allows all users to be considered authenticated. + """ + + def closure(): + # print("Overriding authentication") + return default + + return closure + + def cleanup_post_test(model: Base, new_record_id: int) -> None: """ Function to cleanup POST tests diff --git a/tests/conftest.py b/tests/conftest.py index a0596c671..957540ea2 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -76,3 +76,93 @@ def sample(thing, sensor): yield sample session.close() + + +@pytest.fixture(scope="session") +def contact(thing): + with session_ctx() as session: + contact = Contact( + name="Test Contact", + role="Owner", + ) + session.add(contact) + session.commit() + session.refresh(contact) + + association = ThingContactAssociation(thing_id=thing.id, contact_id=contact.id) + session.add(association) + session.commit() + session.refresh(association) + + yield contact + + session.close() + + +@pytest.fixture(scope="session") +def address(contact): + with session_ctx() as session: + address = Address( + address_line_1="123 Main St", + address_line_2="Apt 4B", + city="Test City", + state="NM", + postal_code="87501", + country="United States", + address_type="Primary", + contact_id=contact.id, + ) + session.add(address) + session.commit() + session.refresh(address) + yield address + + session.close() + + +@pytest.fixture(scope="session") +def email(contact): + with session_ctx() as session: + email = Email( + email="test@example.com", email_type="Primary", contact_id=contact.id + ) + session.add(email) + session.commit() + session.refresh(email) + yield email + + session.close() + + +@pytest.fixture(scope="session") +def phone(contact): + with session_ctx() as session: + phone = Phone( + phone_number="+15051234567", phone_type="Mobile", contact_id=contact.id + ) + session.add(phone) + session.commit() + session.refresh(phone) + yield phone + + session.close() + + +@pytest.fixture(scope="session") +def asset(): + with session_ctx() as session: + asset = Asset( + name="Test Asset", + label="test label", + mime_type="image/png", + size=12345, + storage_service="mock_service", + storage_path="mock/path/to/asset", + uri="https://storage.googleapis.com/mock-bucket/mock-asset", + ) + session.add(asset) + session.commit() + session.refresh(asset) + yield asset + + session.close() diff --git a/tests/test_asset.py b/tests/test_asset.py index d6c00f51e..8c146a17d 100644 --- a/tests/test_asset.py +++ b/tests/test_asset.py @@ -15,7 +15,11 @@ # =============================================================================== from api.asset import get_storage_bucket from core.app import app -from tests import client +from core.dependencies import viewer_function, admin_function, editor_function +from db import Asset +from tests import client, cleanup_post_test, override_authentication + +import pytest class MockBlob: @@ -40,9 +44,23 @@ def mock_storage_bucket(): return MockStorageBucket() -app.dependency_overrides = { - get_storage_bucket: mock_storage_bucket, -} +@pytest.fixture(scope="module", autouse=True) +def override_dependency_fixture(): + app.dependency_overrides = { + get_storage_bucket: mock_storage_bucket, + } + + app.dependency_overrides[viewer_function] = override_authentication() + app.dependency_overrides[admin_function] = override_authentication( + default={"name": "test", "sub": "314159"} + ) + app.dependency_overrides[editor_function] = override_authentication( + default={"name": "test", "sub": "314159"} + ) + + yield + + app.dependency_overrides = {} def test_upload_asset(): @@ -57,16 +75,9 @@ def test_upload_asset(): assert response.status_code == 201 data = response.json() assert "storage_path" in data - # assert data["name"] == "riochama.png" - # assert data["label"] == "riochama.png" - # assert data["storage_service"] == "mock_service" - # assert data["storage_path"] == "mock/path/to/asset" - # assert data["mime_type"] == "image/png" - # assert data["size"] == 12345 - # assert data["url"] == "https://storage.googleapis.com/mock-bucket/mock-asset" -def test_add_asset(location, thing): +def test_add_asset(thing): resp = client.post( "/asset", json={ @@ -80,25 +91,11 @@ def test_add_asset(location, thing): }, ) - print(resp.json()) assert resp.status_code == 201 data = resp.json() assert data["name"] == "riochama.png" - # assert data["label"] == "Test Asset" - # path = "tests/data/riochama.png" - # - # with open(path, "rb") as file: - # response = client.post( - # "/asset", - # params={"thing_id": thing.id}, - # files={"file": ("riochama.png", file, "image/png")}, - # ) - # - # data = response.json() - # assert response.status_code == 201 - # assert data["name"] == "riochama.png" - # url = data["url"] - # assert url.startswith("https://storage.googleapis.com/") + + cleanup_post_test(Asset, data["id"]) def test_add_asset_with_label(thing): @@ -119,28 +116,17 @@ def test_add_asset_with_label(thing): data = resp.json() assert data["name"] == "test_asset.png" assert data["label"] == "Test Asset" - # path = "tests/data/riochama.png" - # - # with open(path, "rb") as file: - # response = client.post( - # "/asset", - # params={'label': 'test label'}, - # files={"file": ("riochama.png", file, "image/png")}, - # ) - # - # assert response.status_code == 201 - # data = response.json() - # assert data["name"] == "riochama.png" - # assert data["label"] == "test label" - - -def test_get_asset(): - response = client.get("/asset/1") + + cleanup_post_test(Asset, data["id"]) + + +def test_get_asset(asset): + response = client.get(f"/asset/{asset.id}") assert response.status_code == 200 data = response.json() - assert data["id"] == 1 - assert data["name"] == "riochama.png" - assert data["uri"] == "https://storage.googleapis.com/mock-bucket/mock-asset" + assert data["id"] == asset.id + assert data["name"] == asset.name + assert data["uri"] == MockBlob().generate_signed_url() def test_get_asset_not_found(): diff --git a/tests/test_contact.py b/tests/test_contact.py index 7657fbb27..dd80ffea6 100644 --- a/tests/test_contact.py +++ b/tests/test_contact.py @@ -1,356 +1,964 @@ -# from fastapi.testclient import TestClient -# from main import app -# from models import Base, engine -# Base.metadata.drop_all(engine) -# Base.metadata.create_all(engine) +from core.dependencies import ( + amp_viewer_function, + amp_editor_function, + amp_admin_function, +) +from db import Contact, Address, Email, Phone +from db.engine import session_ctx +from main import app +from tests import client, cleanup_post_test, cleanup_patch_test, override_authentication +from schemas.contact import ValidateEmail, ValidatePhone + +import pytest +from pydantic import ValidationError +import re + + +@pytest.fixture(scope="module", autouse=True) +def override_authentication_dependency_fixture(): + + app.dependency_overrides[amp_admin_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[amp_editor_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[amp_viewer_function] = override_authentication() -# client = TestClient(app) + yield -from tests import client + app.dependency_overrides = {} -# ADD tests ====================================================== +# ============= module & function fixtures ======================================= -def test_add_contact(thing): - response = client.post( - "/contact", - json={ - "name": "Test Contact", - "role": "Owner", - "thing_id": thing.id, - "emails": [{"email": "fasdfasdf@gmail.com", "email_type": "Primary"}], - "phones": [{"phone_number": "+12345678901", "phone_type": "Primary"}], - "addresses": [ - { - "address_line_1": "123 Main St", - "city": "Test City", - "state": "NM", - "postal_code": "87501", - "country": "United States", - "address_type": "Primary", - } - ], - }, - ) - data = response.json() - assert response.status_code == 201 - assert "id" in data - assert data["name"] == "Test Contact" - assert data["role"] == "Owner" +@pytest.fixture(scope="function") +def second_contact(): + with session_ctx() as session: + contact = Contact( + name="Test Second Contact", + role="Owner", + ) + session.add(contact) + session.commit() + session.refresh(contact) - assert len(data["emails"]) == 1 - assert data["emails"][0]["email"] == "fasdfasdf@gmail.com" + yield contact - assert len(data["phones"]) == 1 - assert data["phones"][0]["phone_number"] == "+12345678901" - assert len(data["addresses"]) == 1 - assert data["addresses"][0]["address_line_1"] == "123 Main St" - - # assert data["email"] == "fasdfasdf@gmail.com" - - # for i in range(2, 5): - # response = client.post( - # "/base/contact", - # json={ - # "owner_id": i, - # "name": f"Test Contact {i}", - # "email": f"foo{i}@gmail.com", - # "phone": f"+1234567890{i}", - # }, - # ) - # assert response.status_code == 201 - # data = response.json() - # assert "id" in data - # assert data["name"] == f"Test Contact {i}" - # assert data["email"] == f"foo{i}@gmail.com" - # assert data["phone"] == f"+1234567890{i}" - - -def test_phone_validation_fail(thing): + session.delete(contact) + session.commit() + session.close() + + +@pytest.fixture(scope="function") +def second_email(second_contact): + with session_ctx() as session: + email = Email( + email="testsecondcontact@gmail.com", + email_type="Primary", + contact_id=second_contact.id, + ) + session.add(email) + session.commit() + session.refresh(email) + yield email + session.delete(email) + session.commit() + session.close() + + +@pytest.fixture(scope="function") +def second_phone(second_contact): + with session_ctx() as session: + phone = Phone( + phone_number="123-456-7890", + phone_type="Primary", + contact_id=second_contact.id, + ) + session.add(phone) + session.commit() + session.refresh(phone) + yield phone + session.delete(phone) + session.commit() + session.close() + + +@pytest.fixture(scope="function") +def second_address(second_contact): + with session_ctx() as session: + address = Address( + address_line_1="456 Secondary St", + address_line_2="Apt 12A", + city="Test Metropolis", + state="NM", + postal_code="87501", + country="United States", + address_type="Primary", + contact_id=second_contact.id, + ) + session.add(address) + session.commit() + session.refresh(address) + yield address + session.delete(address) + session.commit() + session.close() + + +# @pytest.fixture(scope="function") +# def second_thing_contact_association(thing, second_contact): +# with session_ctx() as session: +# association = ThingContactAssociation( +# thing_id=thing.id, contact_id=second_contact.id +# ) +# session.add(association) +# session.commit() +# session.refresh(association) +# yield association +# session.delete(association) +# session.commit() +# session.close() + + +# VALIDATION tests ============================================================= + + +def test_validate_phone(): for phone in [ "definitely not a phone", - # "1234567890", - # "123-456-7890", - # "123-456-78901", - # "123-4567-890", "123-456-789a", "123-456-7890x1234", "123.456.7890", "(123) 456-7890", ]: + pattern = re.escape(f"Value error, Invalid phone number. {phone}") + with pytest.raises(ValidationError, match=pattern): + ValidatePhone(phone_number=phone, phone_type="Primary") - response = client.post( - "/contact", - json={ - "name": "Test Contact 2", - "thing_id": thing.id, - "role": "Primary", - "emails": [{"email": "fasdfasdf@gmail.com", "email_type": "Primary"}], - "phones": [{"phone_number": phone, "phone_type": "Primary"}], - "addresses": [ - { - "address_line_1": "123 Main St", - "city": "Test City", - "state": "NM", - "postal_code": "87501", - "country": "United States", - "address_type": "Primary", - } - ], - }, - ) - data = response.json() - assert response.status_code == 422 - assert "detail" in data, "Expected 'detail' in response" - assert len(data["detail"]) == 1, "Expected 1 error in response" - detail = data["detail"][0] - assert detail["msg"] == f"Value error, Invalid phone number. {phone}" - - -def test_email_validation_fail(thing): +def test_validate_email(): for email in [ - "", "invalid-email", - "invalid@domain", - "invalid@domain.", - "@domain.com", + "user@.com", + "user@domain..com", ]: - response = client.post( - "/contact", - json={ - "name": "Test ContactX", - "thing_id": thing.id, - "role": "Primary", - "emails": [{"email": email, "email_type": "Primary"}], - "phones": [{"phone_number": "+12345678901", "phone_type": "Primary"}], - "addresses": [ - { - "address_line_1": "123 Main St", - "city": "Test City", - "state": "NM", - "postal_code": "87501", - "country": "United States", - "address_type": "Primary", - } - ], - }, - ) - data = response.json() - assert response.status_code == 422 - assert "detail" in data, "Expected 'detail' in response" - assert len(data["detail"]) == 1, "Expected 1 error in response" - detail = data["detail"][0] - assert detail["msg"] == f"Value error, Invalid email format. {email}" + pattern = re.escape(f"Value error, Invalid email format. {email}") + with pytest.raises(ValidationError, match=pattern): + ValidateEmail(email=email) -# GET tests ====================================================== +# ADD tests ==================================================================== -# def test_get_locations(): -# response = client.get("/base/location") -# assert response.status_code == 200 -# assert len(response.json()) > 0 +def test_add_contact(thing): + payload = { + "name": "Test Contact 2", + "role": "Owner", + "thing_id": thing.id, + "emails": [{"email": "testcontact2@gmail.com", "email_type": "Primary"}], + "phones": [{"phone_number": "+14153334444", "phone_type": "Primary"}], + "addresses": [ + { + "address_line_1": "123 Default St", + "address_line_2": "Apt 8R", + "city": "Test Metropolis", + "state": "NM", + "postal_code": "87501", + "country": "United States", + "address_type": "Primary", + } + ], + } + response = client.post("/contact", json=payload) + data = response.json() + assert response.status_code == 201 + assert "id" in data + assert data["name"] == payload["name"] + assert data["role"] == payload["role"] + assert len(data["emails"]) == 1 + assert data["emails"][0]["contact_id"] == data["id"] + assert data["emails"][0]["email"] == payload["emails"][0]["email"] + assert data["emails"][0]["email_type"] == payload["emails"][0]["email_type"] + + assert len(data["phones"]) == 1 + assert data["phones"][0]["contact_id"] == data["id"] + assert data["phones"][0]["phone_number"] == payload["phones"][0]["phone_number"] + assert data["phones"][0]["phone_type"] == payload["phones"][0]["phone_type"] + + assert len(data["addresses"]) == 1 + assert data["addresses"][0]["contact_id"] == data["id"] + assert ( + data["addresses"][0]["address_line_1"] + == payload["addresses"][0]["address_line_1"] + ) + assert ( + data["addresses"][0]["address_line_2"] + == payload["addresses"][0]["address_line_2"] + ) + assert data["addresses"][0]["city"] == payload["addresses"][0]["city"] + assert data["addresses"][0]["state"] == payload["addresses"][0]["state"] + assert data["addresses"][0]["postal_code"] == payload["addresses"][0]["postal_code"] + assert data["addresses"][0]["country"] == payload["addresses"][0]["country"] + assert ( + data["addresses"][0]["address_type"] == payload["addresses"][0]["address_type"] + ) -def test_get_contacts(): + cleanup_post_test(Contact, data["id"]) + + +def test_add_contact_409_bad_thing_id(): + bad_thing_id = 9999 + payload = { + "name": "Test Contact 3", + "role": "Owner", + "thing_id": bad_thing_id, + "emails": [{"email": "testcontact3@gmail.com", "email_type": "Primary"}], + "phones": [{"phone_number": "+14153334445", "phone_type": "Primary"}], + "addresses": [ + { + "address_line_1": "123 Default St", + "address_line_2": "Apt 8R", + "city": "Test Metropolis", + "state": "NM", + "postal_code": "87501", + "country": "United States", + "address_type": "Primary", + } + ], + } + response = client.post("/contact", json=payload) + assert response.status_code == 409 + data = response.json() + assert data["detail"][0]["msg"] == f"Thing with ID {bad_thing_id} not found." + + +def test_add_address(contact): + payload = { + "contact_id": contact.id, + "address_line_1": "456 Secondary St", + "address_line_2": "Apt 12A", + "city": "Test Metropolis", + "state": "NM", + "postal_code": "87502", + "country": "United States", + "address_type": "Primary", + } + response = client.post("/contact/address", json=payload) + data = response.json() + assert response.status_code == 201 + assert "id" in data + assert data["contact_id"] == contact.id + assert data["address_line_1"] == payload["address_line_1"] + assert data["address_line_2"] == payload["address_line_2"] + assert data["city"] == payload["city"] + assert data["state"] == payload["state"] + assert data["postal_code"] == payload["postal_code"] + assert data["country"] == payload["country"] + assert data["address_type"] == payload["address_type"] + + cleanup_post_test(Address, data["id"]) + + +def test_add_address_409_contact_not_found(contact): + bad_contact_id = 9999 + payload = { + "contact_id": bad_contact_id, + "address_line_1": "456 Secondary St", + "address_line_2": "Apt 12A", + "city": "Test Metropolis", + "state": "NM", + "postal_code": "87502", + "country": "United States", + "address_type": "Secondary", + } + response = client.post("/contact/address", json=payload) + assert response.status_code == 409 + data = response.json() + assert data["detail"][0]["msg"] == f"Contact with ID {bad_contact_id} not found." + assert data["detail"][0]["loc"] == ["body", "contact_id"] + assert data["detail"][0]["type"] == "value_error" + assert data["detail"][0]["input"] == {"contact_id": bad_contact_id} + + +def test_add_email(contact): + payload = { + "contact_id": contact.id, + "email": "anothertestemail@nmt.edu", + "email_type": "Primary", + } + response = client.post("/contact/email", json=payload) + data = response.json() + assert response.status_code == 201 + assert "id" in data + assert data["contact_id"] == contact.id + assert data["email"] == payload["email"] + assert data["email_type"] == payload["email_type"] + + cleanup_post_test(Email, data["id"]) + + +def test_add_email_409_contact_not_found(contact): + bad_contact_id = 9999 + payload = { + "contact_id": bad_contact_id, + "email": "anothertestemail@nmt.edu", + "email_type": "Primary", + } + response = client.post("/contact/email", json=payload) + assert response.status_code == 409 + data = response.json() + assert data["detail"][0]["msg"] == f"Contact with ID {bad_contact_id} not found." + assert data["detail"][0]["loc"] == ["body", "contact_id"] + assert data["detail"][0]["type"] == "value_error" + assert data["detail"][0]["input"] == {"contact_id": bad_contact_id} + + +def test_add_phone(contact): + payload = { + "contact_id": contact.id, + "phone_number": "+12345678901", + "phone_type": "Primary", + } + response = client.post("/contact/phone", json=payload) + data = response.json() + assert response.status_code == 201 + assert "id" in data + assert data["contact_id"] == contact.id + assert data["phone_number"] == payload["phone_number"] + assert data["phone_type"] == payload["phone_type"] + + cleanup_post_test(Phone, data["id"]) + + +def test_add_phone_409_contact_not_found(contact): + bad_contact_id = 9999 + payload = { + "contact_id": bad_contact_id, + "phone_number": "+12345678901", + "phone_type": "Primary", + } + response = client.post("/contact/phone", json=payload) + assert response.status_code == 409 + data = response.json() + assert data["detail"][0]["msg"] == f"Contact with ID {bad_contact_id} not found." + assert data["detail"][0]["loc"] == ["body", "contact_id"] + assert data["detail"][0]["type"] == "value_error" + assert data["detail"][0]["input"] == {"contact_id": bad_contact_id} + + +# def test_add_thing_association(thing, second_contact): +# payload = {"thing_id": thing.id} +# response = client.post( +# f"/contact/{second_contact.id}/thing-association", json=payload +# ) +# data = response.json() +# assert response.status_code == 201 +# assert "id" in data +# assert data["thing_id"] == thing.id +# assert data["contact_id"] == second_contact.id + +# cleanup_post_test(ThingContactAssociation, data["id"]) + + +# def test_add_thing_association_404_contact_not_found(contact, thing): +# bad_contact_id = 99999 +# payload = {"thing_id": thing.id} +# response = client.post(f"/contact/{bad_contact_id}/thing-association", json=payload) +# assert response.status_code == 404 +# data = response.json() +# assert data["detail"] == f"Contact with ID {bad_contact_id} not found." + + +# def test_add_thing_association_409_thing_not_found(thing, contact): +# bad_thing_id = 9999 +# payload = {"thing_id": bad_thing_id} +# response = client.post(f"/contact/{contact.id}/thing-association", json=payload) +# assert response.status_code == 409 +# data = response.json() +# assert data["detail"][0]["msg"] == f"Thing with ID {bad_thing_id} not found." +# assert data["detail"][0]["loc"] == ["body", "thing_id"] +# assert data["detail"][0]["type"] == "value_error" +# assert data["detail"][0]["input"] == {"thing_id": bad_thing_id} + + +# GET tests ====================================================== + + +def test_get_contacts(contact, email, address, phone): response = client.get("/contact") assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + assert data["items"][0]["id"] == contact.id + assert data["items"][0]["name"] == contact.name + assert data["items"][0]["role"] == contact.role + + assert len(data["items"][0]["emails"]) == 1 + assert data["items"][0]["emails"][0]["id"] == email.id + assert data["items"][0]["emails"][0]["contact_id"] == email.contact_id + assert data["items"][0]["emails"][0]["email"] == email.email + assert data["items"][0]["emails"][0]["email_type"] == email.email_type + + assert len(data["items"][0]["phones"]) == 1 + assert data["items"][0]["phones"][0]["id"] == phone.id + assert data["items"][0]["phones"][0]["contact_id"] == phone.contact_id + assert data["items"][0]["phones"][0]["phone_number"] == phone.phone_number + assert data["items"][0]["phones"][0]["phone_type"] == phone.phone_type + + assert len(data["items"][0]["addresses"]) == 1 + assert data["items"][0]["addresses"][0]["id"] == address.id + assert data["items"][0]["addresses"][0]["contact_id"] == address.contact_id + assert data["items"][0]["addresses"][0]["address_line_1"] == address.address_line_1 + assert data["items"][0]["addresses"][0]["address_line_2"] == address.address_line_2 + assert data["items"][0]["addresses"][0]["city"] == address.city + assert data["items"][0]["addresses"][0]["state"] == address.state + assert data["items"][0]["addresses"][0]["postal_code"] == address.postal_code + assert data["items"][0]["addresses"][0]["country"] == address.country + assert data["items"][0]["addresses"][0]["address_type"] == address.address_type + + +def test_get_contact_by_id(contact, email, address, phone): + response = client.get(f"/contact/{contact.id}") + assert response.status_code == 200 + data = response.json() + assert data["id"] == contact.id + assert data["name"] == contact.name + assert data["role"] == contact.role + + assert len(data["emails"]) == 1 + assert data["emails"][0]["id"] == email.id + assert data["emails"][0]["contact_id"] == email.contact_id + assert data["emails"][0]["email"] == email.email + assert data["emails"][0]["email_type"] == email.email_type + + assert len(data["phones"]) == 1 + assert data["phones"][0]["id"] == phone.id + assert data["phones"][0]["contact_id"] == phone.contact_id + assert data["phones"][0]["phone_number"] == phone.phone_number + assert data["phones"][0]["phone_type"] == phone.phone_type + assert len(data["addresses"]) == 1 + assert data["addresses"][0]["id"] == address.id + assert data["addresses"][0]["contact_id"] == address.contact_id + assert data["addresses"][0]["address_line_1"] == address.address_line_1 + assert data["addresses"][0]["address_line_2"] == address.address_line_2 + assert data["addresses"][0]["city"] == address.city + assert data["addresses"][0]["state"] == address.state + assert data["addresses"][0]["postal_code"] == address.postal_code + assert data["addresses"][0]["country"] == address.country + assert data["addresses"][0]["address_type"] == address.address_type + + +def test_get_contact_by_id_404_not_found(contact): + bad_contact_id = 99999 + response = client.get(f"/contact/{bad_contact_id}") data = response.json() - assert "items" in data, "Expected 'items' in response" - items = data["items"] - assert isinstance(items, list), "'items' should be a list" - assert len(items) > 0, "'items' should not be empty" - item = items[0] - assert "id" in item, "Expected 'id' in contact item" - assert "name" in item, "Expected 'name' in contact item" - assert "role" in item, "Expected 'role' in contact item" - assert "emails" in item, "Expected 'emails' in contact item" - assert "phones" in item, "Expected 'phones' in contact item" - assert "addresses" in item, "Expected 'addresses' in contact item" - assert isinstance(item["emails"], list), "'emails' should be a list" - assert isinstance(item["phones"], list), "'phones' should be a list" - assert isinstance(item["addresses"], list), "'addresses' should be a list" - assert len(item["emails"]) == 1, "'emails' should not be empty" - assert len(item["phones"]) == 1, "'phones' should not be empty" - assert len(item["addresses"]) == 1, "'addresses' should not be empty" - - # print(response.json()) - # assert len(response.json()) > 0 - - -def test_get_email_by_contact_id(): - response = client.get("/contact/1/email") + assert response.status_code == 404 + assert data["detail"] == f"Contact with ID {bad_contact_id} not found." + + +def test_get_contact_emails(contact, email): + response = client.get(f"/contact/{contact.id}/email") assert response.status_code == 200 data = response.json() - assert isinstance(data, dict), "Expected a paginated response" - assert "items" in data, "Expected 'items' in response" - data = data["items"] - assert len(data) == 1, "Expected one phone number" - email = data[0] - assert "id" in email, "Expected 'id' in email item" - assert "email" in email, "Expected 'email' in email item" - assert "email_type" in email, "Expected 'email_type' in email item" + assert data["total"] == 1 + assert data["items"][0]["id"] == email.id + assert data["items"][0]["contact_id"] == email.contact_id + assert data["items"][0]["email"] == email.email + assert data["items"][0]["email_type"] == email.email_type + +def test_get_contact_emails_404_contact_not_found(contact, email): + bad_contact_id = 99999 + response = client.get(f"/contact/{bad_contact_id}/email") + data = response.json() + assert response.status_code == 404 + assert data["detail"] == f"Contact with ID {bad_contact_id} not found." -def test_get_phone_by_contact_id(): - response = client.get("/contact/1/phone") + +def test_get_contact_phones(contact, phone): + response = client.get(f"/contact/{contact.id}/phone") assert response.status_code == 200 data = response.json() - assert isinstance(data, dict), "Expected a paginated response" - assert "items" in data, "Expected 'items' in response" - data = data["items"] - assert len(data) == 1, "Expected one phone number" - phone = data[0] - assert "id" in phone, "Expected 'id' in phone item" - assert "phone_number" in phone, "Expected 'phone_number' in phone item" - assert "phone_type" in phone, "Expected 'phone_type' in phone item" + assert data["total"] == 1 + assert data["items"][0]["id"] == phone.id + assert data["items"][0]["contact_id"] == phone.contact_id + assert data["items"][0]["phone_number"] == phone.phone_number + assert data["items"][0]["phone_type"] == phone.phone_type + + +def test_get_contact_phones_404_contact_not_found(contact, phone): + bad_contact_id = 99999 + response = client.get(f"/contact/{bad_contact_id}/phone") + data = response.json() + assert response.status_code == 404 + assert data["detail"] == f"Contact with ID {bad_contact_id} not found." -def test_get_address_by_contact_id(): - response = client.get("/contact/1/address") +def test_get_contact_addresses(contact, address): + response = client.get(f"/contact/{contact.id}/address") + assert response.status_code == 200 data = response.json() + assert data["total"] == 1 + assert data["items"][0]["id"] == address.id + assert data["items"][0]["contact_id"] == address.contact_id + assert data["items"][0]["address_line_1"] == address.address_line_1 + assert data["items"][0]["address_line_2"] == address.address_line_2 + assert data["items"][0]["city"] == address.city + assert data["items"][0]["state"] == address.state + assert data["items"][0]["postal_code"] == address.postal_code + assert data["items"][0]["country"] == address.country + assert data["items"][0]["address_type"] == address.address_type + + +def test_get_contact_addresses_404_contact_not_found(contact, address): + bad_contact_id = 99999 + response = client.get(f"/contact/{bad_contact_id}/address") + data = response.json() + assert response.status_code == 404 + assert data["detail"] == f"Contact with ID {bad_contact_id} not found." + + +def test_get_emails(email): + response = client.get("/contact/email") assert response.status_code == 200 - assert isinstance(data, dict), "Expected a paginated response" - assert "items" in data, "Expected 'items' in response" - data = data["items"] - assert len(data) == 1, "Expected one phone number" - address = data[0] - assert "id" in address, "Expected 'id' in address item" - assert "address_line_1" in address, "Expected 'address_line_1' in address item" - assert "city" in address, "Expected 'city' in address item" - assert "state" in address, "Expected 'state' in address item" - assert "postal_code" in address, "Expected 'postal_code' in address item" - assert "country" in address, "Expected 'country' in address item" - assert "address_type" in address, "Expected 'address_type' in address item" - - -# test item retrieval via filter =========================================== - - -# Test item retrieval ====================================================== -def test_item_get_contact(): - response = client.get("/contact/1") + data = response.json() + assert data["total"] == 1 + assert data["items"][0]["id"] == email.id + assert data["items"][0]["contact_id"] == email.contact_id + assert data["items"][0]["email"] == email.email + assert data["items"][0]["email_type"] == email.email_type + + +def test_get_email_by_id(email): + response = client.get(f"/contact/email/{email.id}") assert response.status_code == 200 data = response.json() - assert data["id"] == 1 - assert data["name"] == "Test Contact" - - assert "emails" in data - emails = data["emails"] - assert len(emails) == 1 - email = emails[0] - assert email["email"] == "fasdfasdf@gmail.com" - assert email["email_type"] == "Primary" - - assert "phones" in data - phones = data["phones"] - assert len(phones) == 1 - phone = phones[0] - assert phone["phone_number"] == "+12345678901" - assert phone["phone_type"] == "Primary" - - assert "addresses" in data - addresses = data["addresses"] - assert len(addresses) == 1 - address = addresses[0] - assert address["address_line_1"] == "123 Main St" - assert address["city"] == "Test City" - assert address["state"] == "NM" - assert address["postal_code"] == "87501" - assert address["country"] == "United States" - assert address["address_type"] == "Primary" - - -# Test item edit ========================================================== -def test_item_edit_contact_name(): + assert data["id"] == email.id + assert data["contact_id"] == email.contact_id + assert data["email"] == email.email + assert data["email_type"] == email.email_type + + +def test_get_email_404_not_found(email): + bad_email_id = 99999 + response = client.get(f"/contact/email/{bad_email_id}") + data = response.json() + assert response.status_code == 404 + assert data["detail"] == f"Email with ID {bad_email_id} not found." + + +def test_get_phones(phone): + response = client.get("/contact/phone") + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + assert data["items"][0]["id"] == phone.id + assert data["items"][0]["contact_id"] == phone.contact_id + assert data["items"][0]["phone_number"] == phone.phone_number + assert data["items"][0]["phone_type"] == phone.phone_type + + +def test_get_phone_by_id(phone): + response = client.get(f"/contact/phone/{phone.id}") + assert response.status_code == 200 + data = response.json() + assert data["id"] == phone.id + assert data["contact_id"] == phone.contact_id + assert data["phone_number"] == phone.phone_number + assert data["phone_type"] == phone.phone_type + + +def test_get_addresses(address): + response = client.get("/contact/address") + assert response.status_code == 200 + data = response.json() + assert data["total"] == 1 + assert data["items"][0]["id"] == address.id + assert data["items"][0]["contact_id"] == address.contact_id + assert data["items"][0]["address_line_1"] == address.address_line_1 + assert data["items"][0]["address_line_2"] == address.address_line_2 + assert data["items"][0]["city"] == address.city + assert data["items"][0]["state"] == address.state + assert data["items"][0]["postal_code"] == address.postal_code + assert data["items"][0]["country"] == address.country + assert data["items"][0]["address_type"] == address.address_type + + +def test_get_address_by_id(address): + response = client.get(f"/contact/address/{address.id}") + assert response.status_code == 200 + data = response.json() + assert data["id"] == address.id + assert data["contact_id"] == address.contact_id + assert data["address_line_1"] == address.address_line_1 + assert data["address_line_2"] == address.address_line_2 + assert data["city"] == address.city + assert data["state"] == address.state + assert data["postal_code"] == address.postal_code + assert data["country"] == address.country + assert data["address_type"] == address.address_type + + +def test_get_address_by_id_404_not_found(address): + bad_address_id = 99999 + response = client.get(f"/contact/address/{bad_address_id}") + data = response.json() + assert response.status_code == 404 + assert data["detail"] == f"Address with ID {bad_address_id} not found." + + +# def test_get_thing_contact_associations(thing_contact_association): +# response = client.get("/contact/thing-association") +# assert response.status_code == 200 +# data = response.json() +# assert data["total"] == 1 +# assert data["items"][0]["id"] == thing_contact_association.id +# assert data["items"][0]["contact_id"] == thing_contact_association.contact_id +# assert data["items"][0]["thing_id"] == thing_contact_association.thing_id + + +# def test_get_contact_thing_contact_association(contact, thing_contact_association): +# response = client.get(f"/contact/{contact.id}/thing-association") +# assert response.status_code == 200 +# data = response.json() +# assert data["total"] == 1 +# assert data["items"][0]["id"] == thing_contact_association.id +# assert data["items"][0]["contact_id"] == thing_contact_association.contact_id +# assert data["items"][0]["thing_id"] == thing_contact_association.thing_id + + +# def test_get_thing_contact_association_404_contact_not_found( +# contact, thing_contact_association +# ): +# bad_contact_id = 999999 +# response = client.get(f"/contact/{bad_contact_id}/thing-association") +# assert response.status_code == 404 +# data = response.json() +# assert data["detail"] == f"Contact with ID {bad_contact_id} not found." + + +# def test_get_thing_contact_association_by_id(thing_contact_association): +# response = client.get(f"/contact/thing-association/{thing_contact_association.id}") +# assert response.status_code == 200 +# data = response.json() +# assert data["id"] == thing_contact_association.id +# assert data["contact_id"] == thing_contact_association.contact_id +# assert data["thing_id"] == thing_contact_association.thing_id + + +# def test_get_thing_contact_association_by_id_404_not_found(thing_contact_association): +# bad_id = 999999 +# response = client.get(f"/contact/thing-association/{bad_id}") +# assert response.status_code == 404 +# data = response.json() +# assert data["detail"] == f"ThingContactAssociation with ID {bad_id} not found." + + +# PATCH tests ================================================================== + + +def test_patch_contact(contact): + payload = {"name": "Updated Contact"} response = client.patch( - "/contact/1", - json={ - "name": "Updated Contact", - }, + f"/contact/{contact.id}", + json=payload, ) assert response.status_code == 200 data = response.json() - assert data["id"] == 1 - assert data["name"] == "Updated Contact" - assert data["role"] == "Owner" + assert data["id"] == contact.id + assert data["name"] == payload["name"] + + cleanup_patch_test(Contact, payload, contact) - # put contact name back to original + +def test_patch_contact_404_not_found(contact): + bad_contact_id = 999999 + payload = {"name": "Updated Contact"} response = client.patch( - "/contact/1", - json={ - "name": "Test Contact", - }, + f"/contact/{bad_contact_id}", + json=payload, ) - assert response.status_code == 200 + assert response.status_code == 404 + data = response.json() + assert data["detail"] == f"Contact with ID {bad_contact_id} not found." -def test_edit_contact_email(): - response = client.patch("/contact/email/1", json={"email": "boo@bar.com"}) + +def test_patch_email(email): + payload = {"email": "boo@bar.com"} + response = client.patch(f"/contact/email/{email.id}", json=payload) data = response.json() assert response.status_code == 200 - assert data["id"] == 1 - assert data["email"] == "boo@bar.com" - assert data["email_type"] == "Primary" + assert data["id"] == email.id + assert data["contact_id"] == email.contact_id + assert data["email"] == payload["email"] + assert data["email_type"] == email.email_type + + cleanup_patch_test(Email, payload, email) - # put contact email back to original - response = client.patch("/contact/email/1", json={"email": "fasdfasdf@gmail.com"}) + +def test_patch_email_404_not_found(email): + bad_email_id = 999999 + payload = {"email": "boo@bar.com"} + response = client.patch(f"/contact/email/{bad_email_id}", json=payload) + assert response.status_code == 404 + data = response.json() + assert data["detail"] == f"Email with ID {bad_email_id} not found." + + +def test_patch_phone(phone): + payload = {"phone_number": "+19709654321"} + response = client.patch(f"/contact/phone/{phone.id}", json=payload) data = response.json() assert response.status_code == 200 - assert data["id"] == 1 - assert data["email"] == "fasdfasdf@gmail.com" + assert data["id"] == phone.id + assert data["contact_id"] == phone.contact_id + assert data["phone_number"] == payload["phone_number"] + assert data["phone_type"] == phone.phone_type + + cleanup_patch_test(Phone, payload, phone) -def test_edit_contact_phone(): - response = client.patch("/contact/phone/1", json={"phone_number": "+19876543210"}) +def test_patch_phone_404_not_found(phone): + bad_phone_id = 999999 + payload = {"phone_number": "+19709654321"} + response = client.patch(f"/contact/phone/{bad_phone_id}", json=payload) + assert response.status_code == 404 + data = response.json() + assert data["detail"] == f"Phone with ID {bad_phone_id} not found." + + +def test_edit_address(address): + payload = { + "address_line_1": "456 Elm St", + "address_line_2": "Apt 21B", + "city": "Updated City", + "state": "CA", + "postal_code": "90210", + "country": "United States", + } + response = client.patch(f"/contact/address/{address.id}", json=payload) data = response.json() assert response.status_code == 200 - assert data["id"] == 1 - assert data["phone_number"] == "+19876543210" + assert data["id"] == address.id + assert data["contact_id"] == address.contact_id + assert data["address_line_1"] == payload["address_line_1"] + assert data["address_line_2"] == payload["address_line_2"] + assert data["city"] == payload["city"] + assert data["state"] == payload["state"] + assert data["postal_code"] == payload["postal_code"] + assert data["country"] == payload["country"] + assert data["address_type"] == address.address_type + + cleanup_patch_test(Address, payload, address) + + +def test_patch_address_404_not_found(address): + bad_address_id = 999999 + payload = { + "address_line_1": "456 Elm St", + "address_line_2": "Apt 21B", + "city": "Updated City", + "state": "CA", + "postal_code": "90210", + "country": "United States", + } + response = client.patch(f"/contact/address/{bad_address_id}", json=payload) + data = response.json() + assert response.status_code == 404 + assert data["detail"] == f"Address with ID {bad_address_id} not found." + + +# def test_patch_thing_contact_association(thing_contact_association, second_contact): +# payload = {"contact_id": second_contact.id} +# response = client.patch( +# f"/contact/thing-association/{thing_contact_association.id}", json=payload +# ) +# data = response.json() +# assert response.status_code == 200 +# assert data["id"] == thing_contact_association.id +# assert data["contact_id"] == payload["contact_id"] + +# cleanup_patch_test(ThingContactAssociation, payload, thing_contact_association) + + +# def test_patch_thing_contact_association_404_not_found( +# thing_contact_association, second_contact +# ): +# bad_id = 999999 +# payload = {"contact_id": second_contact.id} +# response = client.patch(f"/contact/thing-association/{bad_id}", json=payload) +# assert response.status_code == 404 +# data = response.json() +# assert data["detail"] == f"ThingContactAssociation with ID {bad_id} not found." + + +# def test_patch_thing_contact_association_409_contact_not_found( +# thing_contact_association, +# ): +# bad_contact_id = 999999 +# payload = {"contact_id": bad_contact_id} +# response = client.patch( +# f"/contact/thing-association/{thing_contact_association.id}", json=payload +# ) +# assert response.status_code == 409 +# data = response.json() +# assert len(data["detail"]) == 1 +# assert data["detail"][0]["msg"] == f"Contact with ID {bad_contact_id} not found." + + +# def test_patch_thing_contact_association_409_thing_not_found(thing_contact_association): +# bad_thing_id = 999999 +# payload = {"thing_id": bad_thing_id} +# response = client.patch( +# f"/contact/thing-association/{thing_contact_association.id}", json=payload +# ) +# assert response.status_code == 409 +# data = response.json() +# assert len(data["detail"]) == 1 +# assert data["detail"][0]["msg"] == f"Thing with ID {bad_thing_id} not found." + + +# DELETE tests ================================================================= + + +def test_delete_contact(second_contact, second_email, second_phone, second_address): + response = client.delete(f"/contact/{second_contact.id}") + assert response.status_code == 204 + + # verify contact is deleted and it cascades to emails, phones, and addresses + response = client.get(f"/contact/{second_contact.id}") + assert response.status_code == 404 + data = response.json() + assert data["detail"] == f"Contact with ID {second_contact.id} not found." + + response = client.get(f"/contact/email/{second_email.id}") + assert response.status_code == 404 + data = response.json() + assert data["detail"] == f"Email with ID {second_email.id} not found." + + response = client.get(f"/contact/phone/{second_phone.id}") + assert response.status_code == 404 + data = response.json() + assert data["detail"] == f"Phone with ID {second_phone.id} not found." - # put contact phone back to original - response = client.patch("/contact/phone/1", json={"phone_number": "+12345678901"}) + response = client.get(f"/contact/address/{second_address.id}") + assert response.status_code == 404 data = response.json() + assert data["detail"] == f"Address with ID {second_address.id} not found." + + +def test_delete_contact_404_not_found(second_contact): + bad_contact_id = 999999 + response = client.delete(f"/contact/{bad_contact_id}") + assert response.status_code == 404 + data = response.json() + assert data["detail"] == f"Contact with ID {bad_contact_id} not found." + + +def test_delete_email(second_contact, second_email): + response = client.delete(f"/contact/email/{second_email.id}") + assert response.status_code == 204 + + # verify email is deleted + response = client.get(f"/contact/email/{second_email.id}") + assert response.status_code == 404 + data = response.json() + assert data["detail"] == f"Email with ID {second_email.id} not found." + + # verify email is no longer associated with the contact + response = client.get(f"/contact/{second_contact.id}") assert response.status_code == 200 - assert data["id"] == 1 - assert data["phone_number"] == "+12345678901" + data = response.json() + assert data["emails"] == [] -def test_edit_contact_address(): - response = client.patch( - "/contact/address/1", - json={ - "address_line_1": "456 Elm St", - "city": "Updated City", - "postal_code": "90210", - "country": "United States", - }, - ) +def test_delete_email_404_not_found(second_email): + bad_email_id = 999999 + response = client.delete(f"/contact/email/{bad_email_id}") + assert response.status_code == 404 + data = response.json() + assert data["detail"] == f"Email with ID {bad_email_id} not found." + + +def test_delete_phone(second_contact, second_phone): + response = client.delete(f"/contact/phone/{second_phone.id}") + assert response.status_code == 204 + + # verify phone is deleted + response = client.get(f"/contact/phone/{second_phone.id}") + assert response.status_code == 404 data = response.json() + assert data["detail"] == f"Phone with ID {second_phone.id} not found." + + # verify phone is no longer associated with the contact + response = client.get(f"/contact/{second_contact.id}") assert response.status_code == 200 - assert data["id"] == 1 - assert data["address_line_1"] == "456 Elm St" - assert data["city"] == "Updated City" - assert data["state"] == "NM" - assert data["postal_code"] == "90210" - assert data["country"] == "United States" - assert data["address_type"] == "Primary" - - # put contact address back to original - response = client.patch( - "/contact/address/1", - json={ - "address_line_1": "123 Main St", - "city": "Test City", - "state": "NM", - "postal_code": "87501", - "country": "United States", - "address_type": "Primary", - }, - ) data = response.json() + assert data["phones"] == [] + + +def test_delete_phone_404_not_found(second_phone): + bad_phone_id = 999999 + response = client.delete(f"/contact/phone/{bad_phone_id}") + assert response.status_code == 404 + data = response.json() + assert data["detail"] == f"Phone with ID {bad_phone_id} not found." + + +def test_delete_address(second_contact, second_address): + response = client.delete(f"/contact/address/{second_address.id}") + assert response.status_code == 204 + + # verify address is deleted + response = client.get(f"/contact/address/{second_address.id}") + assert response.status_code == 404 + data = response.json() + assert data["detail"] == f"Address with ID {second_address.id} not found." + + # verify address is no longer associated with the contact + response = client.get(f"/contact/{second_contact.id}") assert response.status_code == 200 + data = response.json() + assert data["addresses"] == [] + + +def test_delete_address_404_not_found(second_address): + bad_address_id = 99999 + response = client.delete(f"/contact/address/{bad_address_id}") + assert response.status_code == 404 + data = response.json() + assert data["detail"] == f"Address with ID {bad_address_id} not found." + + +# def test_delete_thing_contact_association(second_thing_contact_association): +# response = client.delete( +# f"/contact/thing-association/{second_thing_contact_association.id}" +# ) +# assert response.status_code == 204 + +# # verify association is deleted +# response = client.get( +# f"/contact/thing-association/{second_thing_contact_association.id}" +# ) +# assert response.status_code == 404 +# data = response.json() +# assert ( +# data["detail"] +# == f"ThingContactAssociation with ID {second_thing_contact_association.id} not found." +# ) + + +# def test_delete_thing_contact_association_404_not_found( +# second_thing_contact_association, +# ): +# bad_id = 999999 +# response = client.delete(f"/contact/thing-association/{bad_id}") +# assert response.status_code == 404 +# data = response.json() +# assert data["detail"] == f"ThingContactAssociation with ID {bad_id} not found." diff --git a/tests/test_geospatial.py b/tests/test_geospatial.py index d676c20bd..82592d195 100644 --- a/tests/test_geospatial.py +++ b/tests/test_geospatial.py @@ -16,11 +16,43 @@ from pathlib import Path import pytest +from main import app +from core.dependencies import ( + admin_function, + editor_function, + amp_admin_function, + amp_editor_function, + viewer_function, + amp_viewer_function, +) from db import Thing, Location, LocationThingAssociation, Group from db.engine import session_ctx -from tests import client +from tests import client, override_authentication from geoalchemy2 import functions as geofunc + +@pytest.fixture(scope="module", autouse=True) +def override_authentication_dependency_fixture(): + app.dependency_overrides[admin_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[editor_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[viewer_function] = override_authentication() + app.dependency_overrides[amp_admin_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[amp_editor_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[amp_viewer_function] = override_authentication() + + yield + + app.dependency_overrides = {} + + # @pytest.fixture(scope="module", autouse=True) # def location_fixture(): # client.post( diff --git a/tests/test_group.py b/tests/test_group.py index 9989b5c9d..e652339a7 100644 --- a/tests/test_group.py +++ b/tests/test_group.py @@ -1,5 +1,24 @@ import pytest -from tests import client + +from core.dependencies import admin_function, viewer_function, editor_function +from main import app +from tests import client, override_authentication + + +@pytest.fixture(scope="module", autouse=True) +def override_authentication_dependency_fixture(): + + app.dependency_overrides[admin_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[editor_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[viewer_function] = override_authentication() + + yield + + app.dependency_overrides = {} # ADD tests ====================================================== diff --git a/tests/test_lexicon.py b/tests/test_lexicon.py index 79e0a9fe1..91c152d26 100644 --- a/tests/test_lexicon.py +++ b/tests/test_lexicon.py @@ -14,7 +14,28 @@ # limitations under the License. # =============================================================================== from services.validation import get_category -from tests import client +from tests import client, override_authentication + +from core.dependencies import admin_function, viewer_function, editor_function +from main import app + +import pytest + + +@pytest.fixture(scope="module", autouse=True) +def override_authentication_dependency_fixture(): + + app.dependency_overrides[admin_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[editor_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[viewer_function] = override_authentication() + + yield + + app.dependency_overrides = {} def test_add_lexicon_category(): diff --git a/tests/test_lexicon_pagination.py b/tests/test_lexicon_pagination.py index c9688737b..f5255f4d5 100644 --- a/tests/test_lexicon_pagination.py +++ b/tests/test_lexicon_pagination.py @@ -13,7 +13,28 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== -from tests import client +from tests import client, override_authentication + +from core.dependencies import admin_function, viewer_function, editor_function +from main import app + +import pytest + + +@pytest.fixture(scope="module", autouse=True) +def override_authentication_dependency_fixture(): + + app.dependency_overrides[admin_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[editor_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[viewer_function] = override_authentication() + + yield + + app.dependency_overrides = {} def test_get_lexicon_terms_sort_categories_branch(): diff --git a/tests/test_sample.py b/tests/test_sample.py index 6588ae89d..6004a60ed 100644 --- a/tests/test_sample.py +++ b/tests/test_sample.py @@ -47,6 +47,7 @@ def second_sample(thing, sensor): yield sample session.delete(sample) session.commit() + session.close() # ============== Custom validators ================================================= @@ -134,10 +135,13 @@ def test_409_add_sample_invalid_field_sample_id(sample, thing): ) data = response.json() assert response.status_code == 409 + assert data["detail"][0]["loc"] == ["body", "field_sample_id"] assert ( - data["detail"] + data["detail"][0]["msg"] == f"Sample with field_sample_id {sample.field_sample_id} already exists." ) + assert data["detail"][0]["type"] == "value_error" + assert data["detail"][0]["input"] == {"field_sample_id": sample.field_sample_id} def test_409_add_sample_invalid_thing_id(): @@ -165,7 +169,13 @@ def test_409_add_sample_invalid_thing_id(): ) data = response.json() assert response.status_code == 409 - assert data["detail"] == f"Thing with ID {payload['thing_id']} does not exist." + assert data["detail"][0]["loc"] == ["body", "thing_id"] + assert ( + data["detail"][0]["msg"] + == f"Thing with ID {payload['thing_id']} does not exist." + ) + assert data["detail"][0]["type"] == "value_error" + assert data["detail"][0]["input"] == {"thing_id": payload["thing_id"]} # ============= Patch tests for samples ============================================= @@ -230,10 +240,13 @@ def test_409_patch_sample_invalid_field_sample_id(sample, second_sample): ) data = response.json() assert response.status_code == 409 + assert data["detail"][0]["loc"] == ["body", "field_sample_id"] assert ( - data["detail"] + data["detail"][0]["msg"] == f"Sample with field_sample_id {payload['field_sample_id']} already exists." ) + assert data["detail"][0]["type"] == "value_error" + assert data["detail"][0]["input"] == {"field_sample_id": sample.field_sample_id} def test_409_patch_sample_invalid_thing_id(sample): @@ -249,7 +262,13 @@ def test_409_patch_sample_invalid_thing_id(sample): ) data = response.json() assert response.status_code == 409 - assert data["detail"] == f"Thing with ID {payload['thing_id']} does not exist." + assert data["detail"][0]["loc"] == ["body", "thing_id"] + assert ( + data["detail"][0]["msg"] + == f"Thing with ID {payload['thing_id']} does not exist." + ) + assert data["detail"][0]["type"] == "value_error" + assert data["detail"][0]["input"] == {"thing_id": payload["thing_id"]} # ============= Get tests for samples ============================================= diff --git a/tests/test_search.py b/tests/test_search.py index 7daef334b..620c74cc2 100644 --- a/tests/test_search.py +++ b/tests/test_search.py @@ -22,15 +22,14 @@ from tests import client -def test_search_api(thing, sample): +def test_search_api(thing, contact, email, phone, address): response = client.get("/search", params={"q": "Test"}) assert response.status_code == 200 data = response.json() - assert isinstance(data, dict) items = data.get("items") assert isinstance(items, list) - assert len(items) == 3 + assert len(items) == 2 @pytest.mark.skip(reason="This test is not working .") @@ -54,42 +53,42 @@ def test_search_api3(): assert len(items) == 0 -def test_search_contact(): +def test_search_contact(contact): with session_ctx() as session: query = search(select(Contact), "Test") - contact = session.scalars(query).first() - assert contact is not None + queried_contact = session.scalars(query).first() + assert queried_contact is not None -def test_search_contact_no_results(): +def test_search_contact_no_results(contact): with session_ctx() as session: query = search(select(Contact), "NonExistent") - contact = session.scalars(query).first() - assert contact is None + queried_contact = session.scalars(query).first() + assert queried_contact is None -def test_search_contact_like(): +def test_search_contact_like(contact): with session_ctx() as session: query = search(select(Contact), "Te") - contact = session.scalars(query).first() - assert contact is not None + queried_contact = session.scalars(query).first() + assert queried_contact is not None -def test_search_contact_by_email(): +def test_search_contact_by_email(contact, email): with session_ctx() as session: vector = Contact.search_vector | Email.search_vector query = search( select(Contact).join(Email), - "fasdfasdf@gmail.co", + "test@example.com", vector=vector, ) - contact = session.scalars(query).first() - assert contact is not None + queried_contact = session.scalars(query).first() + assert queried_contact is not None -def test_search_contact_by_email_no_results(): +def test_search_contact_by_email_no_results(contact, email): with session_ctx() as session: vector = Contact.search_vector | Email.search_vector query = search( @@ -97,23 +96,23 @@ def test_search_contact_by_email_no_results(): "foo", vector=vector, ) - contact = session.scalars(query).first() - assert contact is None + queried_contact = session.scalars(query).first() + assert queried_contact is None -def test_search_contact_by_phone_number(): +def test_search_contact_by_phone_number(contact, phone): with session_ctx() as session: vector = Contact.search_vector | Phone.search_vector query = search( select(Contact).join(Phone), - "+12345678901", + "+15051234567", vector=vector, ) - contact = session.scalars(query).first() - assert contact is not None + queried_contact = session.scalars(query).first() + assert queried_contact is not None -def test_search_contact_by_phone_number_no_results(): +def test_search_contact_by_phone_number_no_results(contact, phone): with session_ctx() as session: vector = Contact.search_vector | Phone.search_vector query = search( @@ -121,23 +120,23 @@ def test_search_contact_by_phone_number_no_results(): "+12345678902", vector=vector, ) - contact = session.scalars(query).first() - assert contact is None + queried_contact = session.scalars(query).first() + assert queried_contact is None -def test_search_contact_by_phone_like(): +def test_search_contact_by_phone_like(contact, phone): with session_ctx() as session: vector = Contact.search_vector | Phone.search_vector query = search( select(Contact).join(Phone), - "+12", + "+15", vector=vector, ) - contact = session.scalars(query).first() - assert contact is not None + queried_contact = session.scalars(query).first() + assert queried_contact is not None -def test_search_contact_by_phone_like_no_results(): +def test_search_contact_by_phone_like_no_results(contact, phone): with session_ctx() as session: vector = Contact.search_vector | Phone.search_vector query = search( @@ -145,8 +144,8 @@ def test_search_contact_by_phone_like_no_results(): "+99", vector=vector, ) - contact = session.scalars(query).first() - assert contact is None + queried_contact = session.scalars(query).first() + assert queried_contact is None # def test_search_owner_by_contact_name(): diff --git a/tests/test_sensor.py b/tests/test_sensor.py index bfa47c6bc..e01cbb035 100644 --- a/tests/test_sensor.py +++ b/tests/test_sensor.py @@ -39,6 +39,8 @@ def second_sensor(): session.add(sensor) session.commit() yield sensor + session.delete(sensor) + session.commit() session.close() diff --git a/tests/test_thing.py b/tests/test_thing.py index 04f57e92e..8c0cfd0b7 100644 --- a/tests/test_thing.py +++ b/tests/test_thing.py @@ -13,7 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # =============================================================================== -from tests import client +import pytest +from tests import client, override_authentication from main import app from core.dependencies import ( admin_function, @@ -25,33 +26,26 @@ ) -def override_authentication(default=True): - """ - Override the authentication dependency for testing purposes. - This allows all users to be considered authenticated. - """ - - def closure(): - print("Overriding authentication") - return default - - return closure +@pytest.fixture(scope="module", autouse=True) +def override_authentication_dependency_fixture(): + app.dependency_overrides[admin_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[editor_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[viewer_function] = override_authentication() + app.dependency_overrides[amp_admin_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[amp_editor_function] = override_authentication( + default={"name": "foobar", "sub": "1234567890"} + ) + app.dependency_overrides[amp_viewer_function] = override_authentication() + yield -app.dependency_overrides[admin_function] = override_authentication( - default={"name": "foobar", "sub": "1234567890"} -) -app.dependency_overrides[editor_function] = override_authentication( - default={"name": "foobar", "sub": "1234567890"} -) -app.dependency_overrides[viewer_function] = override_authentication() -app.dependency_overrides[amp_admin_function] = override_authentication( - default={"name": "foobar", "sub": "1234567890"} -) -app.dependency_overrides[amp_editor_function] = override_authentication( - default={"name": "foobar", "sub": "1234567890"} -) -app.dependency_overrides[amp_viewer_function] = override_authentication() + app.dependency_overrides = {} def test_add_group():