From 1233d6394cffd392030f1556a0a3e9dad4152858 Mon Sep 17 00:00:00 2001 From: Ajay-Satish-01 Date: Mon, 1 Jun 2026 01:01:13 -0500 Subject: [PATCH] tz_issue_fix Signed-off-by: Ajay-Satish-01 --- docs/concepts/macros/macro_variables.md | 6 +- sqlmesh/cli/main.py | 17 ++- sqlmesh/cli/options.py | 5 + sqlmesh/core/config/connection.py | 23 ++- sqlmesh/core/config/root.py | 15 +- sqlmesh/core/context.py | 150 ++++++++++++++++++- sqlmesh/core/plan/builder.py | 19 ++- sqlmesh/core/plan/definition.py | 4 + sqlmesh/core/renderer.py | 10 +- sqlmesh/core/snapshot/evaluator.py | 18 +-- sqlmesh/utils/date.py | 171 +++++++++++++++++++--- sqlmesh/utils/java.py | 41 ++++++ tests/cli/test_cli.py | 35 +++++ tests/conftest.py | 10 ++ tests/core/test_config.py | 11 ++ tests/core/test_context.py | 182 ++++++++++++++++++++++++ tests/core/test_plan.py | 41 ++++++ tests/core/test_test.py | 6 +- tests/engines/spark/conftest.py | 16 ++- tests/utils/test_date.py | 96 ++++++++++++- 20 files changed, 824 insertions(+), 52 deletions(-) create mode 100644 sqlmesh/utils/java.py diff --git a/docs/concepts/macros/macro_variables.md b/docs/concepts/macros/macro_variables.md index 398117b3a9..3e3e85658e 100644 --- a/docs/concepts/macros/macro_variables.md +++ b/docs/concepts/macros/macro_variables.md @@ -55,10 +55,14 @@ SQLMesh uses the python [datetime module](https://docs.python.org/3/library/date !!! tip "Important" - Predefined variables with a time component always use the [UTC time zone](https://en.wikipedia.org/wiki/Coordinated_Universal_Time). + Macro instants such as `@start_dt`, `@end_dt`, `@start_tstz`, and `@end_tstz` are always stored and rendered as [UTC](https://en.wikipedia.org/wiki/Coordinated_Universal_Time) timestamps. During incremental backfill, `@start_ds` and `@end_ds` also use UTC calendar dates derived from those interval boundaries. Learn more about timezones and incremental models [here](../models/model_kinds.md#timezones). +Relative CLI and API inputs such as `--start "2 weeks ago"` are interpreted using UTC calendar-day boundaries by default. To anchor relative start, end, and execution-time values to a specific timezone (for example, midnight in `America/Los_Angeles`), pass `--time-zone` on supported commands (`plan`, `render`, `evaluate`, `run`, `audit`, `check_intervals`) or set the project-level `time_zone` config. The CLI flag overrides the config value. + +When a **day-or-larger** relative start or end (for example, `"1 week ago"`, `"today"`, `"yesterday"`) is parsed with a configured timezone, `@start_tstz` and `@end_tstz` reflect the correct UTC instant and `@start_ds` / `@end_ds` use that timezone's local calendar date in `render`, `evaluate`, and `audit`. Sub-day relatives such as `"2 hours ago"` ignore `--time-zone` and continue to use UTC-relative parsing. Absolute date strings and `@execution_ds` always use UTC calendar dates. + Prefixes: * start - The inclusive starting interval of a model run diff --git a/sqlmesh/cli/main.py b/sqlmesh/cli/main.py index dd3adaa687..4f9184824b 100644 --- a/sqlmesh/cli/main.py +++ b/sqlmesh/cli/main.py @@ -255,6 +255,7 @@ def init( @opt.start_time @opt.end_time @opt.execution_time +@opt.time_zone @opt.expand @click.option( "--dialect", @@ -272,6 +273,7 @@ def render( start: TimeLike, end: TimeLike, execution_time: t.Optional[TimeLike] = None, + time_zone: t.Optional[str] = None, expand: t.Optional[t.Union[bool, t.Iterable[str]]] = None, dialect: t.Optional[str] = None, no_format: bool = False, @@ -285,6 +287,7 @@ def render( start=start, end=end, execution_time=execution_time, + time_zone=time_zone, expand=expand, ) @@ -310,6 +313,7 @@ def render( @opt.start_time @opt.end_time @opt.execution_time +@opt.time_zone @click.option( "--limit", type=int, @@ -324,6 +328,7 @@ def evaluate( start: TimeLike, end: TimeLike, execution_time: t.Optional[TimeLike] = None, + time_zone: t.Optional[str] = None, limit: t.Optional[int] = None, ) -> None: """Evaluate a model and return a dataframe with a default limit of 1000.""" @@ -332,6 +337,7 @@ def evaluate( start=start, end=end, execution_time=execution_time, + time_zone=time_zone, limit=limit, ) if hasattr(df, "show"): @@ -394,6 +400,7 @@ def diff(ctx: click.Context, environment: t.Optional[str] = None) -> None: @opt.start_time @opt.end_time @opt.execution_time +@opt.time_zone @click.option( "--create-from", type=str, @@ -574,6 +581,7 @@ def plan( @click.argument("environment", required=False) @opt.start_time @opt.end_time +@opt.time_zone @click.option("--skip-janitor", is_flag=True, help="Skip the janitor task.") @click.option( "--ignore-cron", @@ -796,6 +804,7 @@ def test( @opt.start_time @opt.end_time @opt.execution_time +@opt.time_zone @click.pass_obj @error_handler @cli_analytics @@ -805,9 +814,12 @@ def audit( start: TimeLike, end: TimeLike, execution_time: t.Optional[TimeLike] = None, + time_zone: t.Optional[str] = None, ) -> None: """Run audits for the target model(s).""" - if not obj.audit(models=models, start=start, end=end, execution_time=execution_time): + if not obj.audit( + models=models, start=start, end=end, execution_time=execution_time, time_zone=time_zone + ): exit(1) @@ -827,6 +839,7 @@ def audit( ) @opt.start_time @opt.end_time +@opt.time_zone @click.pass_context @error_handler @cli_analytics @@ -837,6 +850,7 @@ def check_intervals( select_model: t.List[str], start: TimeLike, end: TimeLike, + time_zone: t.Optional[str] = None, ) -> None: """Show missing intervals in an environment, respecting signals.""" context = ctx.obj @@ -847,6 +861,7 @@ def check_intervals( select_models=select_model, start=start, end=end, + time_zone=time_zone, ) ) diff --git a/sqlmesh/cli/options.py b/sqlmesh/cli/options.py index 2e4642eb0e..4810f1fd11 100644 --- a/sqlmesh/cli/options.py +++ b/sqlmesh/cli/options.py @@ -37,6 +37,11 @@ help="The execution time (defaults to now).", ) +time_zone = click.option( + "--time-zone", + help="IANA timezone for interpreting relative --start, --end, and --execution-time values (e.g. America/Los_Angeles). Defaults to UTC.", +) + expand = click.option( "--expand", multiple=True, diff --git a/sqlmesh/core/config/connection.py b/sqlmesh/core/config/connection.py index 343414eab2..af5df494fa 100644 --- a/sqlmesh/core/config/connection.py +++ b/sqlmesh/core/config/connection.py @@ -1806,10 +1806,27 @@ def _static_connection_kwargs(self) -> t.Dict[str, t.Any]: from pyspark.conf import SparkConf from pyspark.sql import SparkSession + from sqlmesh.utils.errors import ConfigError + from sqlmesh.utils.java import ( + is_spark_java_supported, + java_major_version, + spark_java_options, + ) + + if not is_spark_java_supported(): + raise ConfigError( + f"Spark is not supported on Java {java_major_version() or 'unknown'}. " + "Use Java 17 through 23 when running Spark locally." + ) + spark_config = SparkConf() - if self.config: - for k, v in self.config.items(): - spark_config.set(k, v) + config = dict(self.config or {}) + java_options = spark_java_options(config.pop("spark.driver.extraJavaOptions", "")) + if java_options: + config["spark.driver.extraJavaOptions"] = java_options + + for k, v in config.items(): + spark_config.set(k, v) if self.config_dir: os.environ["SPARK_CONF_DIR"] = self.config_dir diff --git a/sqlmesh/core/config/root.py b/sqlmesh/core/config/root.py index 211d271b01..6394b54f42 100644 --- a/sqlmesh/core/config/root.py +++ b/sqlmesh/core/config/root.py @@ -46,7 +46,7 @@ from sqlmesh.core.loader import Loader, SqlMeshLoader from sqlmesh.core.notification_target import NotificationTarget from sqlmesh.core.user import User -from sqlmesh.utils.date import to_timestamp, now +from sqlmesh.utils.date import parse_time_zone, to_timestamp, now from sqlmesh.utils.errors import ConfigError from sqlmesh.utils.pydantic import model_validator @@ -76,16 +76,26 @@ def validate_regex_key_dict(value: t.Dict[str | re.Pattern, t.Any]) -> t.Dict[re return compile_regex_mapping(value) +def validate_time_zone(v: t.Any) -> t.Optional[str]: + if not v or v == "UTC": + return None + v = str(v) + parse_time_zone(v) + return v + + if t.TYPE_CHECKING: from sqlmesh.core._typing import Self NoPastTTLString = str GatewayDict = t.Dict[str, GatewayConfig] RegexKeyDict = t.Dict[re.Pattern, str] + TimeZoneString = t.Optional[str] else: NoPastTTLString = t.Annotated[str, BeforeValidator(validate_no_past_ttl)] GatewayDict = t.Annotated[t.Dict[str, GatewayConfig], BeforeValidator(gateways_ensure_dict)] RegexKeyDict = t.Annotated[t.Dict[re.Pattern, str], BeforeValidator(validate_regex_key_dict)] + TimeZoneString = t.Annotated[t.Optional[str], BeforeValidator(validate_time_zone)] class Config(BaseConfig): @@ -129,6 +139,8 @@ class Config(BaseConfig): before_all: SQL statements or macros to be executed at the start of the `sqlmesh plan` and `sqlmesh run` commands. after_all: SQL statements or macros to be executed at the end of the `sqlmesh plan` and `sqlmesh run` commands. cache_dir: The directory to store the SQLMesh cache. Defaults to .cache in the project folder. + time_zone: IANA timezone for interpreting relative start, end, and execution-time values. + Defaults to UTC when unset. """ gateways: GatewayDict = {"": GatewayConfig()} @@ -174,6 +186,7 @@ class Config(BaseConfig): linter: LinterConfig = LinterConfig() janitor: JanitorConfig = JanitorConfig() cache_dir: t.Optional[str] = None + time_zone: TimeZoneString = None dbt: t.Optional[DbtConfig] = None _FIELD_UPDATE_STRATEGY: t.ClassVar[t.Dict[str, UpdateStrategy]] = { diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index 4eb0d3b40b..c32afb0bc3 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -40,6 +40,7 @@ import time import traceback import typing as t +import zoneinfo from functools import cached_property from io import StringIO from itertools import chain @@ -129,6 +130,8 @@ now, to_datetime, make_exclusive, + parse_time_zone, + is_relative, ) from sqlmesh.utils.errors import ( CircuitBreakerError, @@ -765,6 +768,7 @@ def run( select_models: t.Optional[t.Collection[str]] = None, exit_on_env_update: t.Optional[int] = None, no_auto_upstream: bool = False, + time_zone: t.Optional[str] = None, ) -> CompletionStatus: """Run the entire dag through the scheduler. @@ -798,6 +802,16 @@ def run( ) self._load_materializations() + relative_base = to_datetime( + execution_time or now(), relative_tz=self._relative_tz(time_zone) + ) + start = self._resolve_interval_bound( + start, relative_base=relative_base, time_zone=time_zone + ) + end = self._resolve_interval_bound(end, relative_base=relative_base, time_zone=time_zone) + if execution_time and isinstance(execution_time, str): + execution_time = relative_base + env_check_attempts_num = max( 1, self.config.run.environment_check_max_wait @@ -1095,6 +1109,44 @@ def requirements(self) -> t.Dict[str, str]: def default_catalog(self) -> t.Optional[str]: return self.default_catalog_per_gateway.get(self.selected_gateway) + def _effective_time_zone(self, time_zone: t.Optional[str] = None) -> t.Optional[str]: + if time_zone is not None: + return time_zone or None + return self.config.time_zone + + def _relative_tz(self, time_zone: t.Optional[str] = None) -> t.Optional[zoneinfo.ZoneInfo]: + return parse_time_zone(self._effective_time_zone(time_zone)) + + def _localize_ds_flags( + self, + start: t.Optional[TimeLike], + end: t.Optional[TimeLike], + time_zone: t.Optional[str] = None, + ) -> t.Tuple[t.Optional[zoneinfo.ZoneInfo], bool, bool]: + relative_tz = self._relative_tz(time_zone) + localize_start_ds = bool( + relative_tz and start is not None and isinstance(start, str) and is_relative(start) + ) + localize_end_ds = bool( + relative_tz and end is not None and isinstance(end, str) and is_relative(end) + ) + return relative_tz, localize_start_ds, localize_end_ds + + def _resolve_interval_bound( + self, + value: t.Optional[TimeLike], + *, + relative_base: datetime, + time_zone: t.Optional[str] = None, + ) -> t.Optional[TimeLike]: + if value is None or not isinstance(value, str) or not is_relative(value): + return value + return to_datetime( + value, + relative_base=relative_base, + relative_tz=self._relative_tz(time_zone), + ) + @python_api_analytics def render( self, @@ -1104,6 +1156,7 @@ def render( end: t.Optional[TimeLike] = None, execution_time: t.Optional[TimeLike] = None, expand: t.Union[bool, t.Iterable[str]] = False, + time_zone: t.Optional[str] = None, **kwargs: t.Any, ) -> exp.Expr: """Renders a model's query, expanding macros with provided kwargs, and optionally expanding referenced models. @@ -1116,11 +1169,22 @@ def render( expand: Whether or not to use expand materialized models, defaults to False. If True, all referenced models are expanded as raw queries. If a list, only referenced models are expanded as raw queries. + time_zone: IANA timezone for interpreting relative start, end, and execution-time values. Returns: The rendered expression. """ + relative_tz, localize_start_ds, localize_end_ds = self._localize_ds_flags( + start, end, time_zone + ) execution_time = execution_time or now() + relative_base = to_datetime(execution_time, relative_tz=relative_tz) + start = self._resolve_interval_bound( + start, relative_base=relative_base, time_zone=time_zone + ) + end = self._resolve_interval_bound(end, relative_base=relative_base, time_zone=time_zone) + if isinstance(execution_time, str): + execution_time = relative_base model = self.get_model(model_or_snapshot, raise_if_missing=True) @@ -1145,6 +1209,9 @@ def render( start=start, end=end, execution_time=execution_time, + relative_tz=relative_tz, + localize_start_ds=localize_start_ds, + localize_end_ds=localize_end_ds, **kwargs, ) ) @@ -1161,6 +1228,9 @@ def render( expand=expand, deployability_index=deployability_index, engine_adapter=self._get_engine_adapter(model.gateway), + relative_tz=relative_tz, + localize_start_ds=localize_start_ds, + localize_end_ds=localize_end_ds, **kwargs, ) @@ -1170,8 +1240,9 @@ def evaluate( model_or_snapshot: ModelOrSnapshot, start: TimeLike, end: TimeLike, - execution_time: TimeLike, + execution_time: t.Optional[TimeLike] = None, limit: t.Optional[int] = None, + time_zone: t.Optional[str] = None, **kwargs: t.Any, ) -> DF: """Evaluate a model or snapshot (running its query against a DB/Engine). @@ -1182,9 +1253,30 @@ def evaluate( model_or_snapshot: The model, model name, or snapshot to render. start: The start of the interval to evaluate. end: The end of the interval to evaluate. - execution_time: The date/time time reference to use for execution time. + execution_time: The date/time time reference to use for execution time. Defaults to now. limit: A limit applied to the model. + time_zone: IANA timezone for interpreting relative start, end, and execution-time values. """ + relative_tz, localize_start_ds, localize_end_ds = self._localize_ds_flags( + start, end, time_zone + ) + resolved_execution_time: TimeLike = execution_time or now() + relative_base = to_datetime(resolved_execution_time, relative_tz=relative_tz) + if ( + resolved_start := self._resolve_interval_bound( + start, relative_base=relative_base, time_zone=time_zone + ) + ) is not None: + start = resolved_start + if ( + resolved_end := self._resolve_interval_bound( + end, relative_base=relative_base, time_zone=time_zone + ) + ) is not None: + end = resolved_end + if isinstance(execution_time, str): + resolved_execution_time = relative_base + snapshots = self.snapshots fqn = self._node_or_snapshot_to_fqn(model_or_snapshot) if fqn not in snapshots: @@ -1205,10 +1297,13 @@ def evaluate( snapshot, start=start, end=end, - execution_time=execution_time, + execution_time=resolved_execution_time, snapshots=self.snapshots, limit=limit or c.DEFAULT_MAX_LIMIT, expand=expand, + relative_tz=relative_tz, + localize_start_ds=localize_start_ds, + localize_end_ds=localize_end_ds, ) if df is None: @@ -1343,6 +1438,7 @@ def plan( explain: t.Optional[bool] = None, ignore_cron: t.Optional[bool] = None, min_intervals: t.Optional[int] = None, + time_zone: t.Optional[str] = None, ) -> Plan: """Interactively creates a plan. @@ -1423,6 +1519,7 @@ def plan( explain=explain, ignore_cron=ignore_cron, min_intervals=min_intervals, + time_zone=time_zone, ) plan = plan_builder.build() @@ -1476,6 +1573,7 @@ def plan_builder( ignore_cron: t.Optional[bool] = None, min_intervals: t.Optional[int] = None, always_include_local_changes: t.Optional[bool] = None, + time_zone: t.Optional[str] = None, ) -> PlanBuilder: """Creates a plan builder. @@ -1701,6 +1799,7 @@ def plan_builder( backfill_models, snapshots, max_interval_end_per_model, + time_zone=time_zone, ) if not self.config.virtual_environment_mode.is_full: @@ -1757,6 +1856,8 @@ def plan_builder( }, explain=explain or False, ignore_cron=ignore_cron or False, + relative_tz=self._relative_tz(time_zone), + time_zone=self._effective_time_zone(time_zone), ) def apply( @@ -2286,6 +2387,7 @@ def audit( *, models: t.Optional[t.Iterator[str]] = None, execution_time: t.Optional[TimeLike] = None, + time_zone: t.Optional[str] = None, ) -> bool: """Audit models. @@ -2298,6 +2400,24 @@ def audit( Returns: False if any of the audits failed, True otherwise. """ + relative_tz, localize_start_ds, localize_end_ds = self._localize_ds_flags( + start, end, time_zone + ) + relative_base = to_datetime(execution_time or now(), relative_tz=relative_tz) + if ( + resolved_start := self._resolve_interval_bound( + start, relative_base=relative_base, time_zone=time_zone + ) + ) is not None: + start = resolved_start + if ( + resolved_end := self._resolve_interval_bound( + end, relative_base=relative_base, time_zone=time_zone + ) + ) is not None: + end = resolved_end + if execution_time and isinstance(execution_time, str): + execution_time = relative_base snapshots = ( [self.get_snapshot(model, raise_if_missing=True) for model in models] @@ -2317,6 +2437,9 @@ def audit( end=end, execution_time=execution_time, snapshots=self.snapshots, + relative_tz=relative_tz, + localize_start_ds=localize_start_ds, + localize_end_ds=localize_end_ds, ): audit_id = f"{audit_result.audit.name}" if audit_result.model: @@ -2378,6 +2501,7 @@ def check_intervals( select_models: t.Collection[str], start: t.Optional[TimeLike] = None, end: t.Optional[TimeLike] = None, + time_zone: t.Optional[str] = None, ) -> t.Dict[Snapshot, SnapshotIntervals]: """Check intervals for a given environment. @@ -2393,12 +2517,18 @@ def check_intervals( if not env: raise SQLMeshError(f"Environment '{environment}' was not found.") + relative_base = to_datetime(now(), relative_tz=self._relative_tz(time_zone)) + start = self._resolve_interval_bound( + start, relative_base=relative_base, time_zone=time_zone + ) + end = self._resolve_interval_bound(end, relative_base=relative_base, time_zone=time_zone) + snapshots = {k.name: v for k, v in self.state_sync.get_snapshots(env.snapshots).items()} missing = { k.name: v for k, v in missing_intervals( - snapshots.values(), start=start, end=end, execution_time=end + snapshots.values(), start=start, end=end, execution_time=end or relative_base ).items() } @@ -3126,6 +3256,7 @@ def _calculate_start_override_per_model( backfill_model_fqns: t.Optional[t.Set[str]], snapshots_by_model_fqn: t.Dict[str, Snapshot], end_override_per_model: t.Optional[t.Dict[str, datetime]], + time_zone: t.Optional[str] = None, ) -> t.Dict[str, datetime]: if not min_intervals or not backfill_model_fqns or not plan_start: # If there are no models to backfill, there are no intervals to consider for backfill, so we dont need to consider a minimum number @@ -3135,11 +3266,16 @@ def _calculate_start_override_per_model( start_overrides: t.Dict[str, datetime] = {} end_override_per_model = end_override_per_model or {} + relative_tz = self._relative_tz(time_zone) - plan_execution_time_dt = to_datetime(plan_execution_time) - plan_start_dt = to_datetime(plan_start, relative_base=plan_execution_time_dt) + plan_execution_time_dt = to_datetime(plan_execution_time, relative_tz=relative_tz) + plan_start_dt = to_datetime( + plan_start, relative_base=plan_execution_time_dt, relative_tz=relative_tz + ) plan_end_dt = to_datetime( - plan_end or plan_execution_time_dt, relative_base=plan_execution_time_dt + plan_end or plan_execution_time_dt, + relative_base=plan_execution_time_dt, + relative_tz=relative_tz, ) # we need to take the DAG into account so that parent models can be expanded to cover at least as much as their children diff --git a/sqlmesh/core/plan/builder.py b/sqlmesh/core/plan/builder.py index 01834594cd..b06876c2a3 100644 --- a/sqlmesh/core/plan/builder.py +++ b/sqlmesh/core/plan/builder.py @@ -5,7 +5,7 @@ import typing as t from collections import defaultdict from functools import cached_property -from datetime import datetime +from datetime import datetime, tzinfo from sqlmesh.core.console import PlanBuilderConsole, get_console @@ -134,6 +134,8 @@ def __init__( console: t.Optional[PlanBuilderConsole] = None, user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None, selected_models: t.Optional[t.Set[str]] = None, + relative_tz: t.Optional[tzinfo] = None, + time_zone: t.Optional[str] = None, ): self._context_diff = context_diff self._no_gaps = no_gaps @@ -177,6 +179,8 @@ def __init__( self._user_provided_flags = user_provided_flags self._selected_models = selected_models self._explain = explain + self._relative_tz = relative_tz + self._time_zone = time_zone self._start = start if not self._start and ( @@ -208,14 +212,22 @@ def is_start_and_end_allowed(self) -> bool: def start(self) -> t.Optional[TimeLike]: if self._start and is_relative(self._start): # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00' - return to_datetime(self._start, relative_base=to_datetime(self.execution_time)) + return to_datetime( + self._start, + relative_base=to_datetime(self.execution_time, relative_tz=self._relative_tz), + relative_tz=self._relative_tz, + ) return self._start @property def end(self) -> t.Optional[TimeLike]: if self._end and is_relative(self._end): # only do this for relative expressions otherwise inclusive date strings like '2020-01-01' can be turned into exclusive timestamps eg '2020-01-01 00:00:00' - return to_datetime(self._end, relative_base=to_datetime(self.execution_time)) + return to_datetime( + self._end, + relative_base=to_datetime(self.execution_time, relative_tz=self._relative_tz), + relative_tz=self._relative_tz, + ) return self._end @cached_property @@ -356,6 +368,7 @@ def build(self) -> Plan: ignore_cron=self._ignore_cron, user_provided_flags=self._user_provided_flags, selected_models=self._selected_models, + time_zone=self._time_zone, ) self._latest_plan = plan return plan diff --git a/sqlmesh/core/plan/definition.py b/sqlmesh/core/plan/definition.py index 866299eff8..c7d3860162 100644 --- a/sqlmesh/core/plan/definition.py +++ b/sqlmesh/core/plan/definition.py @@ -83,6 +83,8 @@ class Plan(PydanticModel, frozen=True): user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None selected_models: t.Optional[t.Set[str]] = None """Models that have been selected for this plan (used for dbt selected_resources)""" + time_zone: t.Optional[str] = None + """IANA timezone used to interpret relative start, end, and execution-time values.""" @cached_property def start(self) -> TimeLike: @@ -299,6 +301,7 @@ def to_evaluatable(self) -> EvaluatablePlan: environment_statements=self.context_diff.environment_statements, user_provided_flags=self.user_provided_flags, selected_models=self.selected_models, + time_zone=self.time_zone, ) @cached_property @@ -338,6 +341,7 @@ class EvaluatablePlan(PydanticModel): environment_statements: t.Optional[t.List[EnvironmentStatements]] = None user_provided_flags: t.Optional[t.Dict[str, UserProvidedFlags]] = None selected_models: t.Optional[t.Set[str]] = None + time_zone: t.Optional[str] = None def is_selected_for_backfill(self, model_fqn: str) -> bool: return self.models_to_backfill is None or model_fqn in self.models_to_backfill diff --git a/sqlmesh/core/renderer.py b/sqlmesh/core/renderer.py index 9f403cbcb4..cb4fe689db 100644 --- a/sqlmesh/core/renderer.py +++ b/sqlmesh/core/renderer.py @@ -138,6 +138,9 @@ def _render( kwargs["views"] = views this_model = kwargs.pop("this_model", None) + relative_tz = kwargs.pop("relative_tz", None) + localize_start_ds = kwargs.pop("localize_start_ds", None) + localize_end_ds = kwargs.pop("localize_end_ds", None) this_snapshot = (snapshots or {}).get(self._model_fqn) if self._model_fqn else None if not this_model and self._model_fqn: @@ -182,16 +185,19 @@ def _resolve_table(table: str | exp.Table) -> str: ) start_time, end_time = ( - make_inclusive(start or c.EPOCH, end or c.EPOCH, self._dialect) + make_inclusive(start or c.EPOCH, end or c.EPOCH, self._dialect, relative_tz=relative_tz) if not self._only_execution_time else (None, None) ) render_kwargs = { **date_dict( - to_datetime(execution_time or c.EPOCH), + to_datetime(execution_time or c.EPOCH, relative_tz=relative_tz), start_time, end_time, + relative_tz=relative_tz, + localize_start_ds=localize_start_ds, + localize_end_ds=localize_end_ds, ), **kwargs, } diff --git a/sqlmesh/core/snapshot/evaluator.py b/sqlmesh/core/snapshot/evaluator.py index 497763533b..0d580fc99d 100644 --- a/sqlmesh/core/snapshot/evaluator.py +++ b/sqlmesh/core/snapshot/evaluator.py @@ -1405,17 +1405,17 @@ def _audit( adapter = self.get_adapter(snapshot.model_gateway) - kwargs = { - "start": start, - "end": end, - "execution_time": execution_time, - "snapshots": snapshots, - "deployability_index": deployability_index, - "engine_adapter": adapter, - "runtime_stage": RuntimeStage.AUDITING, + kwargs = dict( + start=start, + end=end, + execution_time=execution_time, + snapshots=snapshots, + deployability_index=deployability_index, + engine_adapter=adapter, + runtime_stage=RuntimeStage.AUDITING, **audit_args, **kwargs, - } + ) if snapshot.is_model: query = snapshot.model.render_audit_query(audit, **kwargs) diff --git a/sqlmesh/utils/date.py b/sqlmesh/utils/date.py index bdc15125d4..c745d670dd 100644 --- a/sqlmesh/utils/date.py +++ b/sqlmesh/utils/date.py @@ -4,6 +4,7 @@ import time import typing as t import warnings +import zoneinfo from datetime import date, datetime, timedelta, timezone, tzinfo @@ -13,6 +14,7 @@ from sqlglot import exp from sqlmesh.utils import ttl_cache +from sqlmesh.utils.errors import ConfigError if t.TYPE_CHECKING: import pandas as pd @@ -46,6 +48,34 @@ } +def parse_time_zone(tz: t.Optional[str]) -> t.Optional[zoneinfo.ZoneInfo]: + """Parse an IANA timezone name. None means UTC/default behavior.""" + if not tz or tz == "UTC": + return None + + try: + return zoneinfo.ZoneInfo(tz) + except Exception as e: + available_timezones = zoneinfo.available_timezones() + + if available_timezones: + raise ConfigError(f"{e}. {tz} must be a valid IANA timezone.") + raise ConfigError( + f"{e}. IANA time zone data is not available on your system. " + "`pip install tzdata` to leverage time zones." + ) + + +def _floor_to_local_midnight(dt: datetime, local_tz: tzinfo) -> datetime: + local_dt = dt.astimezone(local_tz) + return local_dt.replace(hour=0, minute=0, second=0, microsecond=0) + + +def _timezone_name(tzinfo_: tzinfo) -> str: + key = getattr(tzinfo_, "key", None) + return key if key else "UTC" + + def now(minute_floor: bool = True) -> datetime: """ Current utc datetime with optional minute level accuracy / granularity. @@ -121,6 +151,7 @@ def to_timestamp( value: TimeLike, relative_base: t.Optional[datetime] = None, check_categorical_relative_expression: bool = True, + relative_tz: t.Optional[tzinfo] = None, ) -> int: """ Converts a value into an epoch millis timestamp. @@ -129,6 +160,7 @@ def to_timestamp( value: A variety of date formats. If value is a string, it must be in iso format. relative_base: The datetime to reference for time expressions that are using relative terms check_categorical_relative_expression: If True, takes into account the relative expressions that are categorical. + relative_tz: Timezone for interpreting relative/categorical date strings. Returns: Epoch millis timestamp. @@ -138,6 +170,7 @@ def to_timestamp( value, relative_base=relative_base, check_categorical_relative_expression=check_categorical_relative_expression, + relative_tz=relative_tz, ).timestamp() * 1000 ) @@ -149,6 +182,7 @@ def to_datetime( relative_base: t.Optional[datetime] = None, check_categorical_relative_expression: bool = True, tz: t.Optional[tzinfo] = None, + relative_tz: t.Optional[tzinfo] = None, ) -> datetime: """Converts a value into a UTC datetime object. @@ -157,6 +191,8 @@ def to_datetime( relative_base: The datetime to reference for time expressions that are using relative terms. check_categorical_relative_expression: If True, takes into account the relative expressions that are categorical. tz: Timezone to convert datetime to, defaults to utc + relative_tz: Timezone for interpreting relative/categorical date strings. Parsed values are + converted to UTC before the output timezone is applied. Raises: ValueError if value cannot be converted to a datetime. @@ -169,7 +205,13 @@ def to_datetime( elif isinstance(value, date): dt = datetime(value.year, value.month, value.day) elif isinstance(value, exp.Expr): - return to_datetime(value.name) + return to_datetime( + value.name, + relative_base=relative_base, + check_categorical_relative_expression=check_categorical_relative_expression, + tz=tz, + relative_tz=relative_tz, + ) else: try: epoch = float(value) @@ -179,17 +221,37 @@ def to_datetime( if epoch is None: relative_base = relative_base or now() expression = str(value) - if check_categorical_relative_expression and is_categorical_relative_expression( + parse_timezone = "UTC" + parse_relative_base: datetime = relative_base + use_relative_tz = bool( + relative_tz + and check_categorical_relative_expression + and is_categorical_relative_expression(expression) + ) + + if use_relative_tz: + assert relative_tz is not None + parse_timezone = _timezone_name(relative_tz) + parse_relative_base = _floor_to_local_midnight(relative_base, relative_tz) + elif check_categorical_relative_expression and is_categorical_relative_expression( expression ): - relative_base = relative_base.replace(hour=0, minute=0, second=0, microsecond=0) + parse_relative_base = relative_base.replace( + hour=0, minute=0, second=0, microsecond=0 + ) # note: we hardcode TIMEZONE: UTC to work around this bug: https://github.com/scrapinghub/dateparser/issues/896 # where dateparser just silently fails if it cant interpret the contents of /etc/localtime - # this works because SQLMesh only deals with UTC, there is no concept of user local time dt = dateparser.parse( - expression, settings={"RELATIVE_BASE": relative_base, "TIMEZONE": "UTC"} + expression, + settings={"RELATIVE_BASE": parse_relative_base, "TIMEZONE": parse_timezone}, ) + + if use_relative_tz and dt is not None: + assert relative_tz is not None + if not dt.tzinfo: + dt = dt.replace(tzinfo=relative_tz) + dt = dt.astimezone(UTC) else: try: dt = datetime.strptime(str(value), DATE_INT_FMT) @@ -225,6 +287,9 @@ def date_dict( execution_time: TimeLike, start: t.Optional[TimeLike], end: t.Optional[TimeLike], + relative_tz: t.Optional[tzinfo] = None, + localize_start_ds: t.Optional[bool] = None, + localize_end_ds: t.Optional[bool] = None, ) -> t.Dict[str, TimeLike]: """Creates a kwarg dictionary of datetime variables for use in SQL Contexts. @@ -234,38 +299,94 @@ def date_dict( execution_time: Execution time. start: Start time. end: End time. + relative_tz: Timezone for interpreting relative/categorical date strings. + localize_start_ds: Whether @start_ds/@start_date use the local calendar date in + `relative_tz`. Defaults to True when `start` is a categorical relative string. + localize_end_ds: Whether @end_ds/@end_date use the local calendar date in + `relative_tz`. Defaults to True when `end` is a categorical relative string. Returns: A dictionary with various keys pointing to datetime formats. """ + if localize_start_ds is None: + localize_start_ds = bool( + relative_tz + and start is not None + and isinstance(start, str) + and is_categorical_relative_expression(start) + ) + if localize_end_ds is None: + localize_end_ds = bool( + relative_tz + and end is not None + and isinstance(end, str) + and is_categorical_relative_expression(end) + ) + kwargs: t.Dict[str, t.Union[str, datetime, date, float, int]] = {} - execution_dt = to_datetime(execution_time) - prefixes = [ + execution_dt = to_datetime(execution_time, relative_tz=relative_tz) + prefixes: t.List[t.Tuple[str, TimeLike]] = [ ("latest", execution_dt), # TODO: Preserved for backward compatibility. Remove in 1.0.0. ("execution", execution_dt), ] if start is not None: - prefixes.append(("start", to_datetime(start))) + prefixes.append( + ( + "start", + to_datetime( + start, + relative_base=execution_dt, + relative_tz=relative_tz, + ), + ) + ) if end is not None: - prefixes.append(("end", to_datetime(end))) + prefixes.append( + ( + "end", + to_datetime( + end, + relative_base=execution_dt, + relative_tz=relative_tz, + ), + ) + ) for prefix, time_like in prefixes: - dt = to_datetime(time_like) + dt = ( + to_datetime(time_like, relative_tz=relative_tz) + if isinstance(time_like, str) + else to_datetime(time_like) + ) dtntz = dt.replace(tzinfo=None) - millis = to_timestamp(time_like) + millis = ( + to_timestamp(time_like, relative_tz=relative_tz) + if isinstance(time_like, str) + else to_timestamp(time_like) + ) kwargs[f"{prefix}_dt"] = dt kwargs[f"{prefix}_dtntz"] = dtntz - kwargs[f"{prefix}_date"] = to_date(dt) - kwargs[f"{prefix}_ds"] = to_ds(time_like) + localize_ds = (prefix == "start" and localize_start_ds) or ( + prefix == "end" and localize_end_ds + ) + if localize_ds and relative_tz: + local_dt = dt.astimezone(relative_tz) + local_date = local_dt.date() + kwargs[f"{prefix}_date"] = local_date + kwargs[f"{prefix}_ds"] = local_date.strftime("%Y-%m-%d") + kwargs[f"{prefix}_hour"] = local_dt.hour + else: + kwargs[f"{prefix}_date"] = to_date(dt) + kwargs[f"{prefix}_ds"] = to_ds(time_like) + kwargs[f"{prefix}_hour"] = dt.hour kwargs[f"{prefix}_ts"] = to_ts(dt) kwargs[f"{prefix}_tstz"] = to_tstz(dt) kwargs[f"{prefix}_epoch"] = millis / 1000 kwargs[f"{prefix}_millis"] = millis - kwargs[f"{prefix}_hour"] = dt.hour return kwargs @@ -298,7 +419,10 @@ def is_date(obj: TimeLike) -> bool: def make_inclusive( - start: TimeLike, end: TimeLike, dialect: t.Optional[DialectType] = "" + start: TimeLike, + end: TimeLike, + dialect: t.Optional[DialectType] = "", + relative_tz: t.Optional[tzinfo] = None, ) -> DatetimeRange: """Adjust start and end times to to become inclusive datetimes. @@ -324,20 +448,27 @@ def make_inclusive( Returns: A tuple of inclusive datetime objects. """ - return (to_datetime(start), make_inclusive_end(end, dialect=dialect)) + return ( + to_datetime(start, relative_tz=relative_tz), + make_inclusive_end(end, dialect=dialect, relative_tz=relative_tz), + ) -def make_inclusive_end(end: TimeLike, dialect: t.Optional[DialectType] = "") -> datetime: +def make_inclusive_end( + end: TimeLike, + dialect: t.Optional[DialectType] = "", + relative_tz: t.Optional[tzinfo] = None, +) -> datetime: import pandas as pd - exclusive_end = make_exclusive(end) + exclusive_end = make_exclusive(end, relative_tz=relative_tz) if dialect == "tsql": return to_utc_timestamp(exclusive_end) - pd.Timedelta(1, unit="ns") return exclusive_end - timedelta(microseconds=1) -def make_exclusive(time: TimeLike) -> datetime: - dt = to_datetime(time) +def make_exclusive(time: TimeLike, relative_tz: t.Optional[tzinfo] = None) -> datetime: + dt = to_datetime(time, relative_tz=relative_tz) if is_date(time): dt = dt + timedelta(days=1) return dt diff --git a/sqlmesh/utils/java.py b/sqlmesh/utils/java.py new file mode 100644 index 0000000000..0ddb87c6ed --- /dev/null +++ b/sqlmesh/utils/java.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import re +import subprocess +import typing as t + +from sqlmesh.utils import ttl_cache + + +@ttl_cache() +def java_major_version() -> t.Optional[int]: + """Return the major Java version, or None if it cannot be determined.""" + try: + proc = subprocess.run(["java", "-version"], capture_output=True, text=True, check=False) + output = proc.stderr or proc.stdout + if match := re.search(r'version "(\d+)(?:\.(\d+))?', output): + major = int(match.group(1)) + if major == 1 and match.group(2): + return int(match.group(2)) + return major + except Exception: + pass + return None + + +def is_spark_java_supported() -> bool: + """Spark's bundled Hadoop cannot initialize on Java 24+.""" + major = java_major_version() + if major is None: + return True + return major < 24 + + +def spark_java_options(extra: str = "") -> str: + """Return JVM options needed for Spark on newer JDK releases.""" + options: t.List[str] = [] + if java_major_version() == 23: + options.append("-Djava.security.manager=allow") + if extra: + options.append(extra.strip()) + return " ".join(options) diff --git a/tests/cli/test_cli.py b/tests/cli/test_cli.py index 5e0737e1b6..ea56609a1b 100644 --- a/tests/cli/test_cli.py +++ b/tests/cli/test_cli.py @@ -352,6 +352,41 @@ def test_plan_dev_start_date(runner, tmp_path): assert "sqlmesh_example__dev.incremental_model: [2023-01-01" in result.output +def test_plan_accepts_time_zone_option(runner, tmp_path): + create_example_project(tmp_path) + + result = runner.invoke( + cli, + ["--log-file-dir", tmp_path, "--paths", tmp_path, "plan", "--help"], + ) + assert result.exit_code == 0 + assert "--time-zone" in result.output + + +@time_machine.travel("2023-01-20 12:30:30 UTC", tick=False) +def test_plan_rejects_invalid_time_zone(runner, tmp_path): + create_example_project(tmp_path) + + result = runner.invoke( + cli, + [ + "--log-file-dir", + tmp_path, + "--paths", + tmp_path, + "plan", + "dev", + "--start", + "1 week ago", + "--time-zone", + "Not/A_Timezone", + "--skip-tests", + "--no-prompts", + ], + ) + assert result.exit_code != 0 + + def test_plan_dev_end_date(runner, tmp_path): create_example_project(tmp_path) diff --git a/tests/conftest.py b/tests/conftest.py index 4d1bb23577..d44189cf9f 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -652,3 +652,13 @@ def _lax_get_connection(self, gateway_name: t.Optional[str] = None) -> Connectio with ctx: yield + + +def pytest_runtest_setup(item: pytest.Item) -> None: + if "pyspark" not in item.keywords: + return + + from sqlmesh.utils.java import is_spark_java_supported + + if not is_spark_java_supported(): + pytest.skip("Spark is not supported on Java 24+ with bundled Hadoop dependencies.") diff --git a/tests/core/test_config.py b/tests/core/test_config.py index 8c81a90b8d..ed5c8f15bc 100644 --- a/tests/core/test_config.py +++ b/tests/core/test_config.py @@ -323,6 +323,17 @@ def test_load_config_from_env_no_config_vars(): assert load_config_from_env() == {} +def test_config_time_zone(): + config = Config(time_zone="America/Los_Angeles") + assert config.time_zone == "America/Los_Angeles" + + config = Config(time_zone="UTC") + assert config.time_zone is None + + with pytest.raises(ConfigError, match="valid IANA timezone"): + Config(time_zone="Not/A_Timezone") + + def test_load_config_from_env_invalid_variable_name(): with mock.patch.dict( os.environ, diff --git a/tests/core/test_context.py b/tests/core/test_context.py index 23646a6083..41ad25687e 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -522,6 +522,188 @@ def test_plan_execution_time_start_end(): assert to_datetime(dev_plan.end) == to_datetime("2020-01-04") # end relative to execution_time +@time_machine.travel("2023-01-20 12:30:30 UTC", tick=False) +def test_plan_relative_start_with_config_time_zone(): + context = Context(config=Config(time_zone="America/Los_Angeles")) + context.upsert_model( + load_sql_based_model( + parse( + """ + MODEL( + name db.x, + start '2020-01-01', + kind INCREMENTAL_BY_TIME_RANGE ( + time_column ds + ), + cron '@daily' + ); + + SELECT id, ds FROM (VALUES ('1', '2020-01-01')) data(id, ds) + WHERE ds BETWEEN @start_ds AND @end_ds + """ + ) + ) + ) + + dev_plan = context.plan( + "dev", start="1 week ago", execution_time="2023-01-20 12:30:30", skip_tests=True + ) + + assert to_datetime(dev_plan.start) == to_datetime("2023-01-13 08:00:00") + + +@time_machine.travel("2023-01-20 12:30:30 UTC", tick=False) +def test_plan_relative_start_with_time_zone_override(): + context = Context(config=Config(time_zone="America/Los_Angeles")) + context.upsert_model( + load_sql_based_model( + parse( + """ + MODEL( + name db.x, + start '2020-01-01', + kind INCREMENTAL_BY_TIME_RANGE ( + time_column ds + ), + cron '@daily' + ); + + SELECT id, ds FROM (VALUES ('1', '2020-01-01')) data(id, ds) + WHERE ds BETWEEN @start_ds AND @end_ds + """ + ) + ) + ) + + dev_plan = context.plan( + "dev", + start="1 week ago", + execution_time="2023-01-20 12:30:30", + skip_tests=True, + time_zone="UTC", + ) + + assert to_datetime(dev_plan.start) == to_datetime("2023-01-13") + + +@time_machine.travel("2023-01-20 12:30:30 UTC", tick=False) +def test_render_relative_start_uses_local_ds_with_time_zone(): + context = Context(config=Config(time_zone="America/Los_Angeles")) + model = load_sql_based_model( + parse( + """ + MODEL( + name db.x, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column ds + ), + cron '@daily', + start '2020-01-01' + ); + + SELECT @start_ds AS start_ds, CAST('2020-01-01' AS DATE) AS ds + """ + ) + ) + context.upsert_model(model) + + rendered = context.render( + model, + start="1 week ago", + end="2023-01-20", + execution_time="2023-01-20 12:30:30", + ) + assert "'2023-01-13'" in rendered.sql() + + +@time_machine.travel("2023-01-20 12:30:30 UTC", tick=False) +def test_render_absolute_start_uses_utc_ds_with_time_zone(): + context = Context(config=Config(time_zone="America/Los_Angeles")) + model = load_sql_based_model( + parse( + """ + MODEL( + name db.x, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column ds + ), + cron '@daily', + start '2020-01-01' + ); + + SELECT @start_ds AS start_ds, CAST('2020-01-01' AS DATE) AS ds + """ + ) + ) + context.upsert_model(model) + + rendered = context.render( + model, + start="2023-01-13", + end="2023-01-20", + execution_time="2023-01-20 12:30:30", + ) + assert "'2023-01-13'" in rendered.sql() + + +@time_machine.travel("2023-01-20 12:30:30 UTC", tick=False) +def test_backfill_uses_utc_start_ds_with_config_time_zone(): + context = Context(config=Config(time_zone="America/Los_Angeles")) + model = load_sql_based_model( + parse( + """ + MODEL( + name db.x, + start '2023-01-01', + kind INCREMENTAL_BY_TIME_RANGE ( + time_column ds + ), + cron '@daily' + ); + + SELECT @start_ds AS interval_start_ds, CAST(@start_ds AS DATE) AS ds + """ + ) + ) + context.upsert_model(model) + + context.plan( + "dev", + start="2023-01-19", + end="2023-01-19", + execution_time="2023-01-20 12:30:30", + auto_apply=True, + no_prompts=True, + skip_tests=True, + ) + + df = context.fetchdf("SELECT interval_start_ds FROM db__dev.x") + assert list(df["interval_start_ds"]) == ["2023-01-19"] + + +@time_machine.travel("2023-01-20 12:30:30 UTC", tick=False) +def test_plan_rejects_invalid_time_zone(): + context = Context(config=Config()) + context.upsert_model( + load_sql_based_model( + parse( + """ + MODEL( + name db.x, + start '2020-01-01', + kind FULL + ); + + SELECT 1 + """ + ) + ) + ) + + with pytest.raises(ConfigError, match="valid IANA timezone"): + context.plan("dev", start="1 week ago", time_zone="Not/A_Timezone", skip_tests=True) + + def test_override_builtin_audit_blocking_mode(): context = Context(config=Config()) context.upsert_model( diff --git a/tests/core/test_plan.py b/tests/core/test_plan.py index 590cda01ec..1a51304934 100644 --- a/tests/core/test_plan.py +++ b/tests/core/test_plan.py @@ -39,6 +39,7 @@ from sqlmesh.utils.dag import DAG from sqlmesh.utils.date import ( now, + parse_time_zone, to_date, to_datetime, to_timestamp, @@ -3379,6 +3380,46 @@ def _build_plan() -> Plan: assert to_datetime(plan.execution_time) == to_datetime(output_execution_time) +@time_machine.travel("2023-01-20 12:30:30 UTC", tick=False) +def test_plan_relative_dates_with_time_zone(make_snapshot: t.Callable) -> None: + snapshot_a = make_snapshot( + SqlModel(name="a", query=parse_one("select 1, ds"), dialect="duckdb") + ) + + context_diff = ContextDiff( + environment="test_environment", + is_new_environment=True, + is_unfinalized_environment=False, + normalize_environment_name=True, + create_from="prod", + create_from_env_exists=True, + added={snapshot_a.snapshot_id}, + removed_snapshots={}, + modified_snapshots={}, + snapshots={}, + new_snapshots={snapshot_a.snapshot_id: snapshot_a}, + previous_plan_id=None, + previously_promoted_snapshot_ids=set(), + previous_finalized_snapshots=None, + previous_gateway_managed_virtual_layer=False, + gateway_managed_virtual_layer=False, + environment_statements=[], + ) + + la_tz = parse_time_zone("America/Los_Angeles") + plan = PlanBuilder( + context_diff, + start="1 week ago", + execution_time="2023-01-20 12:30:30", + is_dev=True, + relative_tz=la_tz, + time_zone="America/Los_Angeles", + ).build() + + assert to_datetime(plan.start) == to_datetime("2023-01-13 08:00:00") + assert plan.time_zone == "America/Los_Angeles" + + def test_plan_builder_additive_change_error_blocks_plan(make_snapshot): """Test that additive changes block plan when on_additive_change=ERROR.""" # Create models with actual schema differences diff --git a/tests/core/test_test.py b/tests/core/test_test.py index d679f09393..c60f9b21e4 100644 --- a/tests/core/test_test.py +++ b/tests/core/test_test.py @@ -1720,12 +1720,16 @@ def test_generate_input_data_using_sql(mocker: MockerFixture, tmp_path: Path) -> @pytest.mark.pyspark def test_pyspark_python_model(tmp_path: Path) -> None: + from sqlmesh.utils.java import spark_java_options + spark_connection_config = SparkConnectionConfig( config={ "spark.master": "local", "spark.driver.memory": "512m", "spark.sql.warehouse.dir": f"{tmp_path}/data_dir", - "spark.driver.extraJavaOptions": f"-Dderby.system.home={tmp_path}/derby_dir", + "spark.driver.extraJavaOptions": spark_java_options( + f"-Dderby.system.home={tmp_path}/derby_dir" + ), }, ) config = Config( diff --git a/tests/engines/spark/conftest.py b/tests/engines/spark/conftest.py index ce6a99ea35..eaf6eda5f4 100644 --- a/tests/engines/spark/conftest.py +++ b/tests/engines/spark/conftest.py @@ -3,15 +3,25 @@ import pytest from pyspark.sql import SparkSession +from sqlmesh.utils.java import is_spark_java_supported, spark_java_options + +pytestmark = [pytest.mark.slow, pytest.mark.pyspark] + @pytest.fixture(scope="session") def spark_session() -> t.Generator[SparkSession, None, None]: - session = ( + if not is_spark_java_supported(): + pytest.skip("Spark is not supported on Java 24+ with bundled Hadoop dependencies.") + + builder = ( SparkSession.builder.master("local") .appName("SQLMesh Test") .config("spark.driver.memory", "512m") - .enableHiveSupport() - .getOrCreate() ) + java_options = spark_java_options() + if java_options: + builder = builder.config("spark.driver.extraJavaOptions", java_options) + + session = builder.enableHiveSupport().getOrCreate() yield session session.stop() diff --git a/tests/utils/test_date.py b/tests/utils/test_date.py index cb35a6973c..6bccaa4408 100644 --- a/tests/utils/test_date.py +++ b/tests/utils/test_date.py @@ -6,6 +6,7 @@ from sqlglot import exp import pandas as pd # noqa: TID253 +from sqlmesh.utils.errors import ConfigError from sqlmesh.utils.date import ( UTC, TimeLike, @@ -14,6 +15,7 @@ is_categorical_relative_expression, is_relative, make_inclusive, + parse_time_zone, to_datetime, to_time_column, to_timestamp, @@ -64,10 +66,102 @@ def test_to_datetime_with_expressions(expression, result) -> None: assert to_datetime(expression) == result -def test_to_timestamp() -> None: +@time_machine.travel("2023-01-20 12:30:30 UTC", tick=False) +def test_to_datetime_with_relative_tz() -> None: + la_tz = parse_time_zone("America/Los_Angeles") + assert to_datetime("1 week ago") == datetime(2023, 1, 13, 0, 0, 0, tzinfo=UTC) + assert to_datetime("1 week ago", relative_tz=la_tz) == datetime( + 2023, 1, 13, 8, 0, 0, tzinfo=UTC + ) + assert to_datetime("2020-01-01", relative_tz=la_tz) == datetime(2020, 1, 1, tzinfo=UTC) + + +@time_machine.travel("2023-01-20 15:00:00 UTC", tick=False) +def test_to_datetime_with_relative_tz_tokyo() -> None: + tokyo_tz = parse_time_zone("Asia/Tokyo") + assert to_datetime("1 day ago", relative_tz=tokyo_tz) == datetime( + 2023, 1, 19, 15, 0, 0, tzinfo=UTC + ) + + +@time_machine.travel("2023-01-20 15:00:00 UTC", tick=False) +def test_date_dict_with_relative_tz() -> None: + tokyo_tz = parse_time_zone("Asia/Tokyo") + resp = date_dict( + "2023-01-20 15:00:00", + "1 day ago", + "2023-01-20", + relative_tz=tokyo_tz, + ) + assert resp["start_dt"] == datetime(2023, 1, 19, 15, 0, 0, tzinfo=UTC) + assert resp["start_ds"] == "2023-01-20" + assert resp["start_date"] == date(2023, 1, 20) + assert resp["execution_ds"] == "2023-01-20" + assert resp["end_ds"] == "2023-01-20" + + +def test_parse_time_zone() -> None: + assert parse_time_zone(None) is None + assert parse_time_zone("UTC") is None + la_tz = parse_time_zone("America/Los_Angeles") + assert la_tz is not None + assert la_tz.key == "America/Los_Angeles" + + with pytest.raises(ConfigError, match="valid IANA timezone"): + parse_time_zone("Not/A_Timezone") + + +def test_date_dict_interval_bounds_use_utc_ds_with_relative_tz() -> None: + tokyo_tz = parse_time_zone("Asia/Tokyo") + interval_start = to_datetime("2023-01-20 00:00:00") + resp = date_dict( + "2023-01-20 15:00:00", + interval_start, + interval_start, + relative_tz=tokyo_tz, + ) + assert resp["start_ds"] == "2023-01-20" + assert resp["start_dt"] == interval_start + + +def test_date_dict_absolute_start_uses_utc_ds_with_relative_tz() -> None: + la_tz = parse_time_zone("America/Los_Angeles") + resp = date_dict( + "2023-01-20 12:00:00", + "2023-01-13", + "2023-01-20", + relative_tz=la_tz, + ) + assert resp["start_ds"] == "2023-01-13" + assert resp["end_ds"] == "2023-01-20" + + +def test_to_timestamp_from_date_string() -> None: assert to_timestamp("2020-01-01") == 1577836800000 +@time_machine.travel("2023-03-13 12:00:00 UTC", tick=False) +def test_to_datetime_relative_tz_spring_forward_dst() -> None: + la_tz = parse_time_zone("America/Los_Angeles") + assert to_datetime("1 day ago", relative_tz=la_tz) == datetime(2023, 3, 12, 8, 0, 0, tzinfo=UTC) + + +@time_machine.travel("2023-11-06 12:00:00 UTC", tick=False) +def test_to_datetime_relative_tz_fall_back_dst() -> None: + la_tz = parse_time_zone("America/Los_Angeles") + assert to_datetime("1 day ago", relative_tz=la_tz) == datetime(2023, 11, 5, 7, 0, 0, tzinfo=UTC) + + +@time_machine.travel("2023-01-20 12:30:00 UTC", tick=False) +def test_to_datetime_hour_relative_ignores_time_zone() -> None: + la_tz = parse_time_zone("America/Los_Angeles") + relative_base = to_datetime("2023-01-20 12:30:00") + without_tz = to_datetime("2 hours ago", relative_base=relative_base) + with_tz = to_datetime("2 hours ago", relative_base=relative_base, relative_tz=la_tz) + assert with_tz == without_tz + assert with_tz == datetime(2023, 1, 20, 10, 30, 0, tzinfo=UTC) + + @pytest.mark.parametrize( "start_in, end_in, start_out, end_out", [