diff --git a/CHANGELOG.md b/CHANGELOG.md index 13ffe18..3267d8a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed + +- Rename `Args` to `JobArgs` and add `JobArgsWithInsertOpts` protocol. [PR #20](https://github.com/riverqueue/river/pull/20). + ## [0.1.2] - 2024-07-04 ### Changed diff --git a/README.md b/README.md index 576ddc2..091eafd 100644 --- a/README.md +++ b/README.md @@ -38,10 +38,10 @@ insert_res = client.insert( insert_res.job # inserted job row ``` -Job args should comply with the following [protocol](https://peps.python.org/pep-0544/): +Job args should comply with the `riverqueue.JobArgs` [protocol](https://peps.python.org/pep-0544/): ```python -class Args(Protocol): +class JobArgs(Protocol): kind: str def to_json(self) -> str: @@ -137,7 +137,7 @@ with engine.begin() as session: ) ``` -## Asynchronous I/O (`asyncio`) +## Asynchronous I/O (asyncio) The package supports River's [`asyncio` (asynchronous I/O)](https://docs.python.org/3/library/asyncio.html) through an alternate `AsyncClient` and `riversqlalchemy.AsyncDriver`. 
You'll need to make sure to use SQLAlchemy's alternative async engine and an asynchronous Postgres driver like [`asyncpg`](https://github.com/MagicStack/asyncpg), but otherwise usage looks very similar to use without async: diff --git a/src/riverqueue/__init__.py b/src/riverqueue/__init__.py index 3de5ca9..e096686 100644 --- a/src/riverqueue/__init__.py +++ b/src/riverqueue/__init__.py @@ -8,7 +8,8 @@ JOB_STATE_RUNNING as JOB_STATE_RUNNING, JOB_STATE_SCHEDULED as JOB_STATE_SCHEDULED, AsyncClient as AsyncClient, - Args as Args, + JobArgs as JobArgs, + JobArgsWithInsertOpts as JobArgsWithInsertOpts, Client as Client, InsertManyParams as InsertManyParams, InsertOpts as InsertOpts, diff --git a/src/riverqueue/client.py b/src/riverqueue/client.py index ee510c8..8c52796 100644 --- a/src/riverqueue/client.py +++ b/src/riverqueue/client.py @@ -1,6 +1,16 @@ from dataclasses import dataclass from datetime import datetime, timezone, timedelta -from typing import Any, Awaitable, Literal, Optional, Protocol, Tuple, List, Callable +from typing import ( + Any, + Awaitable, + Literal, + Optional, + Protocol, + Tuple, + List, + Callable, + runtime_checkable, +) from .driver import GetParams, JobInsertParams, DriverProtocol, ExecutorProtocol from .driver.driver_protocol import AsyncDriverProtocol, AsyncExecutorProtocol @@ -27,27 +37,44 @@ ] -class Args(Protocol): +@dataclass +class InsertOpts: + max_attempts: Optional[int] = None + priority: Optional[int] = None + queue: Optional[str] = None + scheduled_at: Optional[datetime] = None + tags: Optional[List[Any]] = None + unique_opts: Optional["UniqueOpts"] = None + + +class JobArgs(Protocol): + """ + Protocol that should be implemented by all job args. 
+ """ + kind: str def to_json(self) -> str: pass -@dataclass -class InsertManyParams: - args: Args - insert_opts: Optional["InsertOpts"] = None +@runtime_checkable +class JobArgsWithInsertOpts(Protocol): + """ + Protocol that's optionally implemented by a JobArgs implementation so that + every inserted instance of them provides the same custom `InsertOpts`. + `InsertOpts` passed to insert functions will take precedence of one returned + by `JobArgsWithInsertOpts`. + """ + + def insert_opts(self) -> InsertOpts: + pass @dataclass -class InsertOpts: - max_attempts: Optional[int] = None - priority: Optional[int] = None - queue: Optional[str] = None - scheduled_at: Optional[datetime] = None - tags: Optional[List[Any]] = None - unique_opts: Optional["UniqueOpts"] = None +class InsertManyParams: + args: JobArgs + insert_opts: Optional[InsertOpts] = None @dataclass @@ -68,7 +95,7 @@ def __init__( ) async def insert( - self, args: Args, insert_opts: Optional[InsertOpts] = None + self, args: JobArgs, insert_opts: Optional[InsertOpts] = None ) -> InsertResult: async with self.driver.executor() as exec: if not insert_opts: @@ -83,7 +110,7 @@ async def insert(): ) async def insert_tx( - self, tx, args: Args, insert_opts: Optional[InsertOpts] = None + self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None ) -> InsertResult: exec = self.driver.unwrap_executor(tx) if not insert_opts: @@ -95,11 +122,11 @@ async def insert(): return await self.__check_unique_job(exec, insert_params, unique_opts, insert) - async def insert_many(self, args: List[Args | InsertManyParams]) -> int: + async def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int: async with self.driver.executor() as exec: return await exec.job_insert_many(_make_insert_params_many(args)) - async def insert_many_tx(self, tx, args: List[Args | InsertManyParams]) -> int: + async def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> int: exec = self.driver.unwrap_executor(tx) return 
await exec.job_insert_many(_make_insert_params_many(args)) @@ -137,7 +164,7 @@ def __init__( ) def insert( - self, args: Args, insert_opts: Optional[InsertOpts] = None + self, args: JobArgs, insert_opts: Optional[InsertOpts] = None ) -> InsertResult: with self.driver.executor() as exec: if not insert_opts: @@ -150,7 +177,7 @@ def insert(): return self.__check_unique_job(exec, insert_params, unique_opts, insert) def insert_tx( - self, tx, args: Args, insert_opts: Optional[InsertOpts] = None + self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None ) -> InsertResult: exec = self.driver.unwrap_executor(tx) if not insert_opts: @@ -162,11 +189,11 @@ def insert(): return self.__check_unique_job(exec, insert_params, unique_opts, insert) - def insert_many(self, args: List[Args | InsertManyParams]) -> int: + def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int: with self.driver.executor() as exec: return exec.job_insert_many(_make_insert_params_many(args)) - def insert_many_tx(self, tx, args: List[Args | InsertManyParams]) -> int: + def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> int: exec = self.driver.unwrap_executor(tx) return exec.job_insert_many(_make_insert_params_many(args)) @@ -257,7 +284,6 @@ def _check_advisory_lock_prefix_bounds( advisory_lock_prefix: Optional[int], ) -> Optional[int]: if advisory_lock_prefix: - print("in_bytes", advisory_lock_prefix.to_bytes(4)) # We only reserve 4 bytes for the prefix, so make sure the given one # properly fits. This will error in case that's not the case. 
advisory_lock_prefix.to_bytes(4) @@ -265,18 +291,18 @@ def _check_advisory_lock_prefix_bounds( def _make_insert_params( - args: Args, + args: JobArgs, insert_opts: InsertOpts, is_insert_many: bool = False, ) -> Tuple[JobInsertParams, Optional[UniqueOpts]]: - if not hasattr(args, "kind"): - raise Exception("args should respond to `kind`") + args.kind # fail fast in case args don't respond to kind args_json = args.to_json() - if args_json is None: - raise Exception("args should return non-nil from `to_json`") + assert args_json is not None, "args should return non-nil from `to_json`" - args_insert_opts = getattr(args, "insert_opts", InsertOpts()) + args_insert_opts = InsertOpts() + if isinstance(args, JobArgsWithInsertOpts): + args_insert_opts = args.insert_opts() scheduled_at = insert_opts.scheduled_at or args_insert_opts.scheduled_at unique_opts = insert_opts.unique_opts or args_insert_opts.unique_opts @@ -301,7 +327,7 @@ def _make_insert_params( def _make_insert_params_many( - args: List[Args | InsertManyParams], + args: List[JobArgs | InsertManyParams], ) -> List[JobInsertParams]: return [ _make_insert_params( diff --git a/tests/client_test.py b/tests/client_test.py index 15b73e5..852d6d5 100644 --- a/tests/client_test.py +++ b/tests/client_test.py @@ -1,3 +1,4 @@ +from dataclasses import dataclass from datetime import datetime, timezone from unittest.mock import MagicMock, Mock, patch @@ -66,20 +67,95 @@ def mock_unwrap_executor(tx: sqlalchemy.Transaction): assert insert_res.job == "job_row" -def test_insert_with_opts(client, mock_exec): - insert_opts = InsertOpts(queue="high_priority") +def test_insert_with_insert_opts_from_args(client, mock_exec): + mock_exec.job_insert.return_value = "job_row" + + insert_res = client.insert( + SimpleArgs(), + insert_opts=InsertOpts( + max_attempts=23, priority=2, queue="job_custom_queue", tags=["job_custom"] + ), + ) + + mock_exec.job_insert.assert_called_once() + assert insert_res.job == "job_row" + + insert_args = 
mock_exec.job_insert.call_args[0][0] + assert insert_args.max_attempts == 23 + assert insert_args.priority == 2 + assert insert_args.queue == "job_custom_queue" + assert insert_args.tags == ["job_custom"] + + +def test_insert_with_insert_opts_from_job(client, mock_exec): + @dataclass + class MyArgs: + kind = "my_args" + + @staticmethod + def insert_opts() -> InsertOpts: + return InsertOpts( + max_attempts=23, + priority=2, + queue="job_custom_queue", + tags=["job_custom"], + ) + + @staticmethod + def to_json() -> str: + return "{}" - mock_exec.job_get_by_kind_and_unique_properties.return_value = None mock_exec.job_insert.return_value = "job_row" - insert_res = client.insert(SimpleArgs(), insert_opts=insert_opts) + insert_res = client.insert( + MyArgs(), + ) mock_exec.job_insert.assert_called_once() assert insert_res.job == "job_row" - # Check that the InsertOpts were correctly passed to make_insert_params - call_args = mock_exec.job_insert.call_args[0][0] - assert call_args.queue == "high_priority" + insert_args = mock_exec.job_insert.call_args[0][0] + assert insert_args.max_attempts == 23 + assert insert_args.priority == 2 + assert insert_args.queue == "job_custom_queue" + assert insert_args.tags == ["job_custom"] + + +def test_insert_with_insert_opts_precedence(client, mock_exec): + @dataclass + class MyArgs: + kind = "my_args" + + @staticmethod + def insert_opts() -> InsertOpts: + return InsertOpts( + max_attempts=23, + priority=2, + queue="job_custom_queue", + tags=["job_custom"], + ) + + @staticmethod + def to_json() -> str: + return "{}" + + mock_exec.job_insert.return_value = "job_row" + + insert_res = client.insert( + MyArgs(), + insert_opts=InsertOpts( + max_attempts=17, priority=3, queue="my_queue", tags=["custom"] + ), + ) + + mock_exec.job_insert.assert_called_once() + assert insert_res.job == "job_row" + + insert_args = mock_exec.job_insert.call_args[0][0] + assert insert_args.max_attempts == 17 + assert insert_args.priority == 3 + assert
insert_args.queue == "my_queue" + assert insert_args.tags == ["custom"] def test_insert_with_unique_opts_by_args(client, mock_exec): @@ -149,6 +225,40 @@ def test_insert_with_unique_opts_by_state(client, mock_exec): assert call_args.kind == "simple" +def test_insert_kind_error(client): + @dataclass + class MyArgs: + pass + + with pytest.raises(AttributeError) as ex: + client.insert(MyArgs()) + assert "'MyArgs' object has no attribute 'kind'" == str(ex.value) + + +def test_insert_to_json_attribute_error(client): + @dataclass + class MyArgs: + kind = "my" + + with pytest.raises(AttributeError) as ex: + client.insert(MyArgs()) + assert "'MyArgs' object has no attribute 'to_json'" == str(ex.value) + + +def test_insert_to_json_none_error(client): + @dataclass + class MyArgs: + kind = "my" + + @staticmethod + def to_json() -> None: + return None + + with pytest.raises(AssertionError) as ex: + client.insert(MyArgs()) + assert "args should return non-nil from `to_json`" == str(ex.value) + + def test_check_advisory_lock_prefix_bounds(): Client(mock_driver, advisory_lock_prefix=123)