Bases: QObject, EventLoop
Methods:
Attributes:
move class-attribute
instance-attribute
attach
Source code
def attach(self) -> None:
    """Set up the dispatch state and hook the ``move`` signal to its slot.

    Creates the lock guarding the counter, resets the counter to zero,
    starts with an empty registry of pending callables, and connects
    ``self._receive_task`` to ``self.move``.
    """
    # Keep the exact bound-method object that gets connected, so the same
    # object is available later for disconnecting.
    self._slot = self._receive_task

    # Registry of pending zero-argument callables, keyed by the integer that
    # is sent through the ``move`` signal.
    self._signallers: dict[int, Callable[[], None]] = {}
    self._signal_counter = 0
    self._signal_lock = Lock()

    self.move.connect(self._slot)
|
detach
Source code
def detach(self) -> None:
    """Disconnect the stored slot from ``move`` and forget pending callables."""
    self.move.disconnect(self._slot)
    # Rebind to a brand-new dict rather than clearing in place, so any other
    # reference to the old registry is left untouched.
    self._signallers = dict()
|
from_thread
Source code
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
def from_thread(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
    """Queue ``func(*args, **kwargs)`` for dispatch through the ``move`` signal.

    A wrapper around the call is stored in ``self._signallers`` under a
    unique integer key, and that key is emitted on ``self.move``. The
    receiving slot is not shown here; presumably it looks the key up and
    invokes the wrapper (verify against ``_receive_task``).

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        A future that receives ``func``'s return value, or the exception it
        raised (any ``BaseException`` is captured on the future instead of
        propagating out of the wrapper).
    """
    fut: Future[T] = Future()

    # Reserve a unique key; the lock keeps concurrent callers from being
    # handed the same counter value.
    with self._signal_lock:
        my_counter = self._signal_counter
        self._signal_counter += 1

    def wrapper() -> None:
        # Unregister first, before the cancellation check: otherwise a future
        # cancelled while still queued would leave its wrapper in the registry
        # forever. ``pop`` with a default also tolerates the registry having
        # been reset by ``detach`` in the meantime.
        self._signallers.pop(my_counter, None)

        if not fut.set_running_or_notify_cancel():
            return

        try:
            result = func(*args, **kwargs)
        except BaseException as e:
            fut.set_exception(e)
        else:
            fut.set_result(result)

    # Register before emitting so the key is resolvable as soon as the
    # signal is delivered.
    self._signallers[my_counter] = wrapper
    self.move.emit(my_counter)
    return fut
|
to_thread
Source code
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def to_thread(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
    """Submit ``func(*args, **kwargs)`` to Qt's global thread pool.

    Args:
        func: Callable to invoke.
        *args: Positional arguments forwarded to ``func``.
        **kwargs: Keyword arguments forwarded to ``func``.

    Returns:
        A future that receives ``func``'s return value, or the exception it
        raised (any ``BaseException`` is captured on the future).
    """
    future = Future[T]()

    def job() -> None:
        # Run only if the future moved to the running state; a future that
        # was cancelled beforehand is skipped.
        if future.set_running_or_notify_cancel():
            try:
                outcome = func(*args, **kwargs)
            except BaseException as exc:
                future.set_exception(exc)
                return
            future.set_result(outcome)

    # The job is wrapped in a ``Runner`` and handed to the global pool.
    pool = QThreadPool.globalInstance()
    pool.start(Runner(job))
    return future
|