Skip to content

Commit

Permalink
Merge pull request #33 from airflow-laminar/tkp/tz
Browse files Browse the repository at this point in the history
Allow for timezones in start/end dates, fixes #31
  • Loading branch information
timkpaine authored Jan 1, 2025
2 parents 257fea5 + 12c9738 commit 8e20f9f
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 9 deletions.
31 changes: 24 additions & 7 deletions airflow_config/configuration/airflow.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from datetime import datetime, timedelta
from typing import Dict, List, Literal, Optional, Union
from typing import Annotated, Any, Dict, List, Literal, Optional, Tuple, Union

from pydantic import BaseModel, Extra, Field
from pydantic import AfterValidator, BaseModel, Field
from pytz import timezone

from .utils import ImportPath, RelativeDelta

Expand All @@ -22,6 +23,20 @@
ScheduleArg = Union[timedelta, RelativeDelta, Literal["NOTSET"], str, None]


def _datetime_or_datetime_and_timezone(val: Any):
if isinstance(val, datetime):
return val
elif isinstance(val, (tuple,)):
dt = val[0]
tz = timezone(val[1])
dt = dt.replace(tzinfo=tz)
return dt
raise ValueError(f"Expected datetime or Dict[str, datetime|str], got {val!r}")


DatetimeArg = Annotated[Union[datetime, Tuple[datetime, str]], AfterValidator(_datetime_or_datetime_and_timezone)]


class TaskArgs(BaseModel):
# Operator Args
# https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/baseoperator/index.html#airflow.models.baseoperator.BaseOperator
Expand All @@ -39,11 +54,11 @@ class TaskArgs(BaseModel):
# description="allow progressively longer waits between retries by using exponential backoff algorithm on retry delay (delay will be converted into seconds)",
# )
# max_retry_delay: Optional[timedelta] = Field(default=None, description="maximum delay interval between retries")
start_date: Optional[datetime] = Field(
start_date: Optional[DatetimeArg] = Field(
default=None,
description="The start_date for the task, determines the execution_date for the first task instance. The best practice is to have the start_date rounded to your DAG’s schedule_interval. Daily jobs have their start_date some day at 00:00:00, hourly jobs have their start_date at 00:00 of a specific hour. Note that Airflow simply looks at the latest execution_date and adds the schedule_interval to determine the next execution_date. It is also very important to note that different tasks’ dependencies need to line up in time. If task A depends on task B and their start_date are offset in a way that their execution_date don’t line up, A’s dependencies will never be met. If you are looking to delay a task, for example running a daily task at 2AM, look into the TimeSensor and TimeDeltaSensor. We advise against using dynamic start_date and recommend using fixed ones. Read the FAQ entry about start_date for more information.",
)
end_date: Optional[datetime] = Field(default=None, description="if specified, the scheduler won’t go beyond this date")
end_date: Optional[DatetimeArg] = Field(default=None, description="if specified, the scheduler won’t go beyond this date")
depends_on_past: Optional[bool] = Field(
default=None,
description="when set to true, task instances will run sequentially and only if the previous instance has succeeded or has been skipped. The task instance for the start_date is allowed to run.",
Expand Down Expand Up @@ -121,7 +136,7 @@ class TaskArgs(BaseModel):
class _TaskSpecificArgs(BaseModel): ...


class Task(TaskArgs, extra=Extra.allow):
class Task(TaskArgs, extra="allow"):
task_id: Optional[str] = Field(default=None, description="a unique, meaningful id for the task")
operator: ImportPath = Field(description="airflow operator path")
dependencies: Optional[List[str]] = Field(default=None, description="dependencies")
Expand All @@ -141,8 +156,10 @@ class DagArgs(BaseModel):
description="Defines the rules according to which DAG runs are scheduled. Can accept cron string, timedelta object, Timetable, or list of Dataset objects. If this is not provided, the DAG will be set to the default schedule timedelta(days=1). See also Customizing DAG Scheduling with Timetables.",
union_mode="left_to_right",
)
start_date: Optional[datetime] = Field(default=None, description="The timestamp from which the scheduler will attempt to backfill")
end_date: Optional[datetime] = Field(default=None, description="A date beyond which your DAG won’t run, leave to None for open-ended scheduling")
start_date: Optional[DatetimeArg] = Field(default=None, description="The timestamp from which the scheduler will attempt to backfill")
end_date: Optional[DatetimeArg] = Field(
default=None, description="A date beyond which your DAG won’t run, leave to None for open-ended scheduling"
)
# template_searchpath: Optional[List[str]] = Field(default_factory=None, description="This list of folders (non-relative) defines where jinja will look for your templates. Order matters. Note that jinja/airflow includes the path of your DAG file by default")
# template_undefined (type[jinja2.StrictUndefined]) – Template undefined type.
# user_defined_macros (dict | None) – a dictionary of macros that will be exposed in your jinja templates. For example, passing dict(foo='bar') to this argument allows you to {{ foo }} in all jinja templates related to this DAG. Note that you can pass any type of object here.
Expand Down
2 changes: 1 addition & 1 deletion airflow_config/tests/setups/good/options/config/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ default_args:
default_dag_args:
_target: airflow_config.DagArgs
schedule: "01:10"
start_date: "2024-01-01"
start_date: ["2024-01-01", "America/New_York"]
catchup: false
tags: ["utility", "test"]
3 changes: 2 additions & 1 deletion airflow_config/tests/setups/good/options/test_options.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from datetime import datetime, timedelta

from airflow.timetables.interval import DeltaDataIntervalTimetable
from pytz import timezone

from airflow_config import DAG, create_dag, load_config

Expand All @@ -13,7 +14,7 @@ def test_config_and_options():
assert conf.default_args.email_on_retry is False
assert conf.default_args.retries == 0
assert conf.default_args.depends_on_past is False
assert conf.default_dag_args.start_date == datetime(2024, 1, 1)
assert conf.default_dag_args.start_date == datetime(2024, 1, 1, tzinfo=timezone("America/New_York"))
assert conf.default_dag_args.catchup is False
assert conf.default_dag_args.tags == ["utility", "test"]

Expand Down

0 comments on commit 8e20f9f

Please sign in to comment.