276 lines
9.4 KiB
Python
276 lines
9.4 KiB
Python
import signal
|
|
import sys
|
|
import types
|
|
from typing import Any, TypeVar
|
|
|
|
import pytest
|
|
|
|
import trio
|
|
from trio.testing import Matcher, RaisesGroup
|
|
|
|
from .. import _core
|
|
from .._core._tests.tutil import (
|
|
create_asyncio_future_in_new_loop,
|
|
ignore_coroutine_never_awaited_warnings,
|
|
)
|
|
from .._util import (
|
|
ConflictDetector,
|
|
NoPublicConstructor,
|
|
coroutine_or_error,
|
|
final,
|
|
fixup_module_metadata,
|
|
generic_function,
|
|
is_main_thread,
|
|
signal_raise,
|
|
)
|
|
from ..testing import wait_all_tasks_blocked
|
|
|
|
T = TypeVar("T")
|
|
|
|
|
|
def test_signal_raise() -> None:
|
|
record = []
|
|
|
|
def handler(signum: int, _: object) -> None:
|
|
record.append(signum)
|
|
|
|
old = signal.signal(signal.SIGFPE, handler)
|
|
try:
|
|
signal_raise(signal.SIGFPE)
|
|
finally:
|
|
signal.signal(signal.SIGFPE, old)
|
|
assert record == [signal.SIGFPE]
|
|
|
|
|
|
async def test_ConflictDetector() -> None:
|
|
ul1 = ConflictDetector("ul1")
|
|
ul2 = ConflictDetector("ul2")
|
|
|
|
with ul1:
|
|
with ul2:
|
|
print("ok")
|
|
|
|
with pytest.raises(_core.BusyResourceError, match="ul1"):
|
|
with ul1:
|
|
with ul1:
|
|
pass # pragma: no cover
|
|
|
|
async def wait_with_ul1() -> None:
|
|
with ul1:
|
|
await wait_all_tasks_blocked()
|
|
|
|
with RaisesGroup(Matcher(_core.BusyResourceError, "ul1")):
|
|
async with _core.open_nursery() as nursery:
|
|
nursery.start_soon(wait_with_ul1)
|
|
nursery.start_soon(wait_with_ul1)
|
|
|
|
|
|
def test_module_metadata_is_fixed_up() -> None:
|
|
import trio
|
|
import trio.testing
|
|
|
|
assert trio.Cancelled.__module__ == "trio"
|
|
assert trio.open_nursery.__module__ == "trio"
|
|
assert trio.abc.Stream.__module__ == "trio.abc"
|
|
assert trio.lowlevel.wait_task_rescheduled.__module__ == "trio.lowlevel"
|
|
assert trio.testing.trio_test.__module__ == "trio.testing"
|
|
|
|
# Also check methods
|
|
assert trio.lowlevel.ParkingLot.__init__.__module__ == "trio.lowlevel"
|
|
assert trio.abc.Stream.send_all.__module__ == "trio.abc"
|
|
|
|
# And names
|
|
assert trio.Cancelled.__name__ == "Cancelled"
|
|
assert trio.Cancelled.__qualname__ == "Cancelled"
|
|
assert trio.abc.SendStream.send_all.__name__ == "send_all"
|
|
assert trio.abc.SendStream.send_all.__qualname__ == "SendStream.send_all"
|
|
assert trio.to_thread.__name__ == "trio.to_thread"
|
|
assert trio.to_thread.run_sync.__name__ == "run_sync"
|
|
assert trio.to_thread.run_sync.__qualname__ == "run_sync"
|
|
|
|
|
|
async def test_is_main_thread() -> None:
|
|
assert is_main_thread()
|
|
|
|
def not_main_thread() -> None:
|
|
assert not is_main_thread()
|
|
|
|
await trio.to_thread.run_sync(not_main_thread)
|
|
|
|
|
|
# @coroutine is deprecated since python 3.8, which is fine with us.
|
|
@pytest.mark.filterwarnings("ignore:.*@coroutine.*:DeprecationWarning")
|
|
def test_coroutine_or_error() -> None:
|
|
class Deferred:
|
|
"Just kidding"
|
|
|
|
with ignore_coroutine_never_awaited_warnings():
|
|
|
|
async def f() -> None: # pragma: no cover
|
|
pass
|
|
|
|
with pytest.raises(TypeError) as excinfo:
|
|
coroutine_or_error(f()) # type: ignore[arg-type, unused-coroutine]
|
|
assert "expecting an async function" in str(excinfo.value)
|
|
|
|
import asyncio
|
|
|
|
if sys.version_info < (3, 11):
|
|
# not bothering to type this one
|
|
@asyncio.coroutine # type: ignore[misc]
|
|
def generator_based_coro() -> Any: # pragma: no cover
|
|
yield from asyncio.sleep(1)
|
|
|
|
with pytest.raises(TypeError) as excinfo:
|
|
coroutine_or_error(generator_based_coro()) # type: ignore[arg-type, unused-coroutine]
|
|
assert "asyncio" in str(excinfo.value)
|
|
|
|
with pytest.raises(TypeError) as excinfo:
|
|
coroutine_or_error(create_asyncio_future_in_new_loop()) # type: ignore[arg-type, unused-coroutine]
|
|
assert "asyncio" in str(excinfo.value)
|
|
|
|
# does not raise arg-type error
|
|
with pytest.raises(TypeError) as excinfo:
|
|
coroutine_or_error(create_asyncio_future_in_new_loop) # type: ignore[unused-coroutine]
|
|
assert "asyncio" in str(excinfo.value)
|
|
|
|
with pytest.raises(TypeError) as excinfo:
|
|
coroutine_or_error(Deferred()) # type: ignore[arg-type, unused-coroutine]
|
|
assert "twisted" in str(excinfo.value)
|
|
|
|
with pytest.raises(TypeError) as excinfo:
|
|
coroutine_or_error(lambda: Deferred()) # type: ignore[arg-type, unused-coroutine, return-value]
|
|
assert "twisted" in str(excinfo.value)
|
|
|
|
with pytest.raises(TypeError) as excinfo:
|
|
coroutine_or_error(len, [[1, 2, 3]]) # type: ignore[arg-type, unused-coroutine]
|
|
|
|
assert "appears to be synchronous" in str(excinfo.value)
|
|
|
|
async def async_gen(_: object) -> Any: # pragma: no cover
|
|
yield
|
|
|
|
# does not give arg-type typing error
|
|
with pytest.raises(TypeError) as excinfo:
|
|
coroutine_or_error(async_gen, [0]) # type: ignore[unused-coroutine]
|
|
msg = "expected an async function but got an async generator"
|
|
assert msg in str(excinfo.value)
|
|
|
|
# Make sure no references are kept around to keep anything alive
|
|
del excinfo
|
|
|
|
|
|
def test_generic_function() -> None:
|
|
@generic_function # Decorated function contains "Any".
|
|
def test_func(arg: T) -> T: # type: ignore[misc]
|
|
"""Look, a docstring!"""
|
|
return arg
|
|
|
|
assert test_func is test_func[int] is test_func[int, str]
|
|
assert test_func(42) == test_func[int](42) == 42
|
|
assert test_func.__doc__ == "Look, a docstring!"
|
|
assert test_func.__qualname__ == "test_generic_function.<locals>.test_func" # type: ignore[attr-defined]
|
|
assert test_func.__name__ == "test_func" # type: ignore[attr-defined]
|
|
assert test_func.__module__ == __name__
|
|
|
|
|
|
def test_final_decorator() -> None:
|
|
"""Test that subclassing a @final-annotated class is not allowed.
|
|
|
|
This checks both runtime results, and verifies that type checkers detect
|
|
the error statically through the type-ignore comment.
|
|
"""
|
|
|
|
@final
|
|
class FinalClass:
|
|
pass
|
|
|
|
with pytest.raises(TypeError):
|
|
|
|
class SubClass(FinalClass): # type: ignore[misc]
|
|
pass
|
|
|
|
|
|
def test_no_public_constructor_metaclass() -> None:
|
|
"""The NoPublicConstructor metaclass prevents calling the constructor directly."""
|
|
|
|
class SpecialClass(metaclass=NoPublicConstructor):
|
|
def __init__(self, a: int, b: float) -> None:
|
|
"""Check arguments can be passed to __init__."""
|
|
assert a == 8
|
|
assert b == 3.14
|
|
|
|
with pytest.raises(TypeError):
|
|
SpecialClass(8, 3.14)
|
|
|
|
# Private constructor should not raise, and passes args to __init__.
|
|
assert isinstance(SpecialClass._create(8, b=3.14), SpecialClass)
|
|
|
|
|
|
def test_fixup_module_metadata() -> None:
|
|
# Ignores modules not in the trio.X tree.
|
|
non_trio_module = types.ModuleType("not_trio")
|
|
non_trio_module.some_func = lambda: None # type: ignore[attr-defined]
|
|
non_trio_module.some_func.__name__ = "some_func"
|
|
non_trio_module.some_func.__qualname__ = "some_func"
|
|
|
|
fixup_module_metadata(non_trio_module.__name__, vars(non_trio_module))
|
|
|
|
assert non_trio_module.some_func.__name__ == "some_func"
|
|
assert non_trio_module.some_func.__qualname__ == "some_func"
|
|
|
|
# Bulild up a fake module to test. Just use lambdas since all we care about is the names.
|
|
mod = types.ModuleType("trio._somemodule_impl")
|
|
mod.some_func = lambda: None # type: ignore[attr-defined]
|
|
mod.some_func.__name__ = "_something_else"
|
|
mod.some_func.__qualname__ = "_something_else"
|
|
|
|
# No __module__ means it's unchanged.
|
|
mod.not_funclike = types.SimpleNamespace() # type: ignore[attr-defined]
|
|
mod.not_funclike.__name__ = "not_funclike"
|
|
|
|
# Check __qualname__ being absent works.
|
|
mod.only_has_name = types.SimpleNamespace() # type: ignore[attr-defined]
|
|
mod.only_has_name.__module__ = "trio._somemodule_impl"
|
|
mod.only_has_name.__name__ = "only_name"
|
|
|
|
# Underscored names are unchanged.
|
|
mod._private = lambda: None # type: ignore[attr-defined]
|
|
mod._private.__module__ = "trio._somemodule_impl"
|
|
mod._private.__name__ = mod._private.__qualname__ = "_private"
|
|
|
|
# We recurse into classes.
|
|
mod.SomeClass = type( # type: ignore[attr-defined]
|
|
"SomeClass",
|
|
(),
|
|
{
|
|
"__init__": lambda self: None,
|
|
"method": lambda self: None,
|
|
},
|
|
)
|
|
# Reference loop is fine.
|
|
mod.SomeClass.recursion = mod.SomeClass # type: ignore[attr-defined]
|
|
|
|
fixup_module_metadata("trio.somemodule", vars(mod))
|
|
assert mod.some_func.__name__ == "some_func"
|
|
assert mod.some_func.__module__ == "trio.somemodule"
|
|
assert mod.some_func.__qualname__ == "some_func"
|
|
|
|
assert mod.not_funclike.__name__ == "not_funclike"
|
|
assert mod._private.__name__ == "_private"
|
|
assert mod._private.__module__ == "trio._somemodule_impl"
|
|
assert mod._private.__qualname__ == "_private"
|
|
|
|
assert mod.only_has_name.__name__ == "only_has_name"
|
|
assert mod.only_has_name.__module__ == "trio.somemodule"
|
|
assert not hasattr(mod.only_has_name, "__qualname__")
|
|
|
|
assert mod.SomeClass.method.__name__ == "method" # type: ignore[attr-defined]
|
|
assert mod.SomeClass.method.__module__ == "trio.somemodule" # type: ignore[attr-defined]
|
|
assert mod.SomeClass.method.__qualname__ == "SomeClass.method" # type: ignore[attr-defined]
|
|
# Make coverage happy.
|
|
non_trio_module.some_func()
|
|
mod.some_func()
|
|
mod._private()
|
|
mod.SomeClass().method()
|