Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Rename `Args` to `JobArgs` and add `JobArgsWithInsertOpts` protocol. [PR #20](https://github.com/riverqueue/river/pull/20).

## [0.1.2] - 2024-07-04

### Changed
Expand Down
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ insert_res = client.insert(
insert_res.job # inserted job row
```

Job args should comply with the following [protocol](https://peps.python.org/pep-0544/):
Job args should comply with the `riverqueue.JobArgs` [protocol](https://peps.python.org/pep-0544/):

```python
class Args(Protocol):
class JobArgs(Protocol):
kind: str

def to_json(self) -> str:
Expand Down Expand Up @@ -137,7 +137,7 @@ with engine.begin() as session:
)
```

## Asynchronous I/O (`asyncio`)
## Asynchronous I/O (asyncio)

The package supports River's [`asyncio` (asynchronous I/O)](https://docs.python.org/3/library/asyncio.html) through an alternate `AsyncClient` and `riversqlalchemy.AsyncDriver`. You'll need to make sure to use SQLAlchemy's alternative async engine and an asynchronous Postgres driver like [`asyncpg`](https://github.com/MagicStack/asyncpg), but otherwise usage looks very similar to use without async:

Expand Down
3 changes: 2 additions & 1 deletion src/riverqueue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
JOB_STATE_RUNNING as JOB_STATE_RUNNING,
JOB_STATE_SCHEDULED as JOB_STATE_SCHEDULED,
AsyncClient as AsyncClient,
Args as Args,
JobArgs as JobArgs,
JobArgsWithInsertOpts as JobArgsWithInsertOpts,
Client as Client,
InsertManyParams as InsertManyParams,
InsertOpts as InsertOpts,
Expand Down
84 changes: 55 additions & 29 deletions src/riverqueue/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Any, Awaitable, Literal, Optional, Protocol, Tuple, List, Callable
from typing import (
Any,
Awaitable,
Literal,
Optional,
Protocol,
Tuple,
List,
Callable,
runtime_checkable,
)

from .driver import GetParams, JobInsertParams, DriverProtocol, ExecutorProtocol
from .driver.driver_protocol import AsyncDriverProtocol, AsyncExecutorProtocol
Expand All @@ -27,27 +37,44 @@
]


class Args(Protocol):
@dataclass
class InsertOpts:
    """
    Options controlling how a job is inserted. Any option left as None falls
    back to the default used by the client. May be passed directly to insert
    functions or provided by args via `JobArgsWithInsertOpts`.
    """

    max_attempts: Optional[int] = None  # maximum number of times the job may be attempted
    priority: Optional[int] = None  # job priority — presumably lower runs sooner; TODO confirm ordering
    queue: Optional[str] = None  # name of the queue the job is inserted into
    scheduled_at: Optional[datetime] = None  # time the job should run; None presumably means "now"
    tags: Optional[List[Any]] = None  # arbitrary tags attached to the job (presumably strings)
    unique_opts: Optional["UniqueOpts"] = None  # options enabling unique-job enforcement


class JobArgs(Protocol):
    """
    Protocol that should be implemented by all job args.

    Implementations expose a `kind` string identifying the job type, and a
    `to_json()` method that serializes the job's arguments to a JSON string.
    """

    # String identifying the type of job; read by the client when building
    # insert params.
    kind: str

    def to_json(self) -> str:
        """Return the job's arguments serialized as a JSON string."""
        pass


@dataclass
class InsertManyParams:
args: Args
insert_opts: Optional["InsertOpts"] = None
@runtime_checkable
class JobArgsWithInsertOpts(Protocol):
    """
    Protocol that's optionally implemented by a JobArgs implementation so that
    every inserted instance of them provides the same custom `InsertOpts`.
    `InsertOpts` passed to insert functions will take precedence over the one
    returned by `JobArgsWithInsertOpts`.
    """

    # NOTE(review): because this is @runtime_checkable, isinstance() checks
    # only verify that an `insert_opts` attribute exists — not that it's a
    # callable returning InsertOpts.
    def insert_opts(self) -> InsertOpts:
        """Return the InsertOpts to apply when inserting this job."""
        pass


@dataclass
class InsertOpts:
max_attempts: Optional[int] = None
priority: Optional[int] = None
queue: Optional[str] = None
scheduled_at: Optional[datetime] = None
tags: Optional[List[Any]] = None
unique_opts: Optional["UniqueOpts"] = None
class InsertManyParams:
args: JobArgs
insert_opts: Optional[InsertOpts] = None


@dataclass
Expand All @@ -68,7 +95,7 @@ def __init__(
)

async def insert(
self, args: Args, insert_opts: Optional[InsertOpts] = None
self, args: JobArgs, insert_opts: Optional[InsertOpts] = None
) -> InsertResult:
async with self.driver.executor() as exec:
if not insert_opts:
Expand All @@ -83,7 +110,7 @@ async def insert():
)

async def insert_tx(
self, tx, args: Args, insert_opts: Optional[InsertOpts] = None
self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None
) -> InsertResult:
exec = self.driver.unwrap_executor(tx)
if not insert_opts:
Expand All @@ -95,11 +122,11 @@ async def insert():

return await self.__check_unique_job(exec, insert_params, unique_opts, insert)

async def insert_many(self, args: List[Args | InsertManyParams]) -> int:
async def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int:
async with self.driver.executor() as exec:
return await exec.job_insert_many(_make_insert_params_many(args))

async def insert_many_tx(self, tx, args: List[Args | InsertManyParams]) -> int:
async def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> int:
exec = self.driver.unwrap_executor(tx)
return await exec.job_insert_many(_make_insert_params_many(args))

Expand Down Expand Up @@ -137,7 +164,7 @@ def __init__(
)

def insert(
self, args: Args, insert_opts: Optional[InsertOpts] = None
self, args: JobArgs, insert_opts: Optional[InsertOpts] = None
) -> InsertResult:
with self.driver.executor() as exec:
if not insert_opts:
Expand All @@ -150,7 +177,7 @@ def insert():
return self.__check_unique_job(exec, insert_params, unique_opts, insert)

def insert_tx(
self, tx, args: Args, insert_opts: Optional[InsertOpts] = None
self, tx, args: JobArgs, insert_opts: Optional[InsertOpts] = None
) -> InsertResult:
exec = self.driver.unwrap_executor(tx)
if not insert_opts:
Expand All @@ -162,11 +189,11 @@ def insert():

return self.__check_unique_job(exec, insert_params, unique_opts, insert)

def insert_many(self, args: List[Args | InsertManyParams]) -> int:
def insert_many(self, args: List[JobArgs | InsertManyParams]) -> int:
with self.driver.executor() as exec:
return exec.job_insert_many(_make_insert_params_many(args))

def insert_many_tx(self, tx, args: List[Args | InsertManyParams]) -> int:
def insert_many_tx(self, tx, args: List[JobArgs | InsertManyParams]) -> int:
exec = self.driver.unwrap_executor(tx)
return exec.job_insert_many(_make_insert_params_many(args))

Expand Down Expand Up @@ -257,26 +284,25 @@ def _check_advisory_lock_prefix_bounds(
advisory_lock_prefix: Optional[int],
) -> Optional[int]:
if advisory_lock_prefix:
print("in_bytes", advisory_lock_prefix.to_bytes(4))
# We only reserve 4 bytes for the prefix, so make sure the given one
# properly fits. This will error in case that's not the case.
advisory_lock_prefix.to_bytes(4)
return advisory_lock_prefix


def _make_insert_params(
args: Args,
args: JobArgs,
insert_opts: InsertOpts,
is_insert_many: bool = False,
) -> Tuple[JobInsertParams, Optional[UniqueOpts]]:
if not hasattr(args, "kind"):
raise Exception("args should respond to `kind`")
args.kind # fail fast in case args don't respond to kind

args_json = args.to_json()
if args_json is None:
raise Exception("args should return non-nil from `to_json`")
assert args_json is not None, "args should return non-nil from `to_json`"

args_insert_opts = getattr(args, "insert_opts", InsertOpts())
args_insert_opts = InsertOpts()
if isinstance(args, JobArgsWithInsertOpts):
args_insert_opts = args.insert_opts()

scheduled_at = insert_opts.scheduled_at or args_insert_opts.scheduled_at
unique_opts = insert_opts.unique_opts or args_insert_opts.unique_opts
Expand All @@ -301,7 +327,7 @@ def _make_insert_params(


def _make_insert_params_many(
args: List[Args | InsertManyParams],
args: List[JobArgs | InsertManyParams],
) -> List[JobInsertParams]:
return [
_make_insert_params(
Expand Down
124 changes: 117 additions & 7 deletions tests/client_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from dataclasses import dataclass
from datetime import datetime, timezone
from unittest.mock import MagicMock, Mock, patch

Expand Down Expand Up @@ -66,20 +67,95 @@ def mock_unwrap_executor(tx: sqlalchemy.Transaction):
assert insert_res.job == "job_row"


def test_insert_with_opts(client, mock_exec):
insert_opts = InsertOpts(queue="high_priority")
def test_insert_with_insert_opts_from_args(client, mock_exec):
    # Options passed explicitly via `insert_opts=` should be applied to the
    # insert params handed to the driver when the args carry no insert_opts
    # of their own.
    mock_exec.job_insert.return_value = "job_row"

    insert_res = client.insert(
        SimpleArgs(),
        insert_opts=InsertOpts(
            max_attempts=23, priority=2, queue="job_custom_queue", tags=["job_custom"]
        ),
    )

    mock_exec.job_insert.assert_called_once()
    assert insert_res.job == "job_row"

    # Every explicitly passed option should appear on the insert params.
    insert_args = mock_exec.job_insert.call_args[0][0]
    assert insert_args.max_attempts == 23
    assert insert_args.priority == 2
    assert insert_args.queue == "job_custom_queue"
    assert insert_args.tags == ["job_custom"]


def test_insert_with_insert_opts_from_job(client, mock_exec):
@dataclass
class MyArgs:
kind = "my_args"

@staticmethod
def insert_opts() -> InsertOpts:
return InsertOpts(
max_attempts=23,
priority=2,
queue="job_custom_queue",
tags=["job_custom"],
)

@staticmethod
def to_json() -> str:
return "{}"

mock_exec.job_get_by_kind_and_unique_properties.return_value = None
mock_exec.job_insert.return_value = "job_row"

insert_res = client.insert(SimpleArgs(), insert_opts=insert_opts)
insert_res = client.insert(
MyArgs(),
)

mock_exec.job_insert.assert_called_once()
assert insert_res.job == "job_row"

# Check that the InsertOpts were correctly passed to make_insert_params
call_args = mock_exec.job_insert.call_args[0][0]
assert call_args.queue == "high_priority"
insert_args = mock_exec.job_insert.call_args[0][0]
assert insert_args.max_attempts == 23
assert insert_args.priority == 2
assert insert_args.queue == "job_custom_queue"
assert insert_args.tags == ["job_custom"]


def test_insert_with_insert_opts_precedence(client, mock_exec):
    """
    InsertOpts passed explicitly to `insert` must take precedence over the
    InsertOpts returned by the args' own `insert_opts()` method.
    """

    @dataclass
    class MyArgs:
        kind = "my_args"

        @staticmethod
        def insert_opts() -> InsertOpts:
            # These values should all be overridden by the explicit opts
            # passed to `insert` below.
            return InsertOpts(
                max_attempts=23,
                priority=2,
                queue="job_custom_queue",
                tags=["job_custom"],
            )

        @staticmethod
        def to_json() -> str:
            return "{}"

    mock_exec.job_insert.return_value = "job_row"

    # Fix: insert MyArgs (which carries its own insert_opts), not SimpleArgs;
    # with SimpleArgs the test never exercised precedence at all.
    insert_res = client.insert(
        MyArgs(),
        insert_opts=InsertOpts(
            max_attempts=17, priority=3, queue="my_queue", tags=["custom"]
        ),
    )

    mock_exec.job_insert.assert_called_once()
    assert insert_res.job == "job_row"

    # The explicitly passed options win over the ones from MyArgs.insert_opts().
    insert_args = mock_exec.job_insert.call_args[0][0]
    assert insert_args.max_attempts == 17
    assert insert_args.priority == 3
    assert insert_args.queue == "my_queue"
    assert insert_args.tags == ["custom"]


def test_insert_with_unique_opts_by_args(client, mock_exec):
Expand Down Expand Up @@ -149,6 +225,40 @@ def test_insert_with_unique_opts_by_state(client, mock_exec):
assert call_args.kind == "simple"


def test_insert_kind_error(client):
    """Inserting args that lack a `kind` attribute raises AttributeError."""

    @dataclass
    class MyArgs:
        pass

    with pytest.raises(AttributeError) as exc_info:
        client.insert(MyArgs())
    assert str(exc_info.value) == "'MyArgs' object has no attribute 'kind'"


def test_insert_to_json_attribute_error(client):
    """Inserting args that lack a `to_json` method raises AttributeError."""

    @dataclass
    class MyArgs:
        kind = "my"

    with pytest.raises(AttributeError) as exc_info:
        client.insert(MyArgs())
    assert str(exc_info.value) == "'MyArgs' object has no attribute 'to_json'"


def test_insert_to_json_none_error(client):
    """A `to_json` that returns None trips the client's non-nil assertion."""

    @dataclass
    class MyArgs:
        kind = "my"

        @staticmethod
        def to_json() -> None:
            return None

    with pytest.raises(AssertionError) as exc_info:
        client.insert(MyArgs())
    assert str(exc_info.value) == "args should return non-nil from `to_json`"


def test_check_advisory_lock_prefix_bounds():
Client(mock_driver, advisory_lock_prefix=123)

Expand Down