Skip to content

vsenv

Functions:

Attributes:

PRELOADED_MODULES module-attribute

PRELOADED_MODULES = set(values())

T module-attribute

T = TypeVar('T')

current_log module-attribute

current_log: LogHandle | None = None

environment module-attribute

environment: ManagedEnvironment | None = None

orig_runpy_run_code module-attribute

orig_runpy_run_code = _run_code

policy module-attribute

policy: FlagsPolicy = FlagsPolicy(GlobalStore())

FlagsPolicy

Bases: Policy

Methods:

new_environment

new_environment() -> ManagedEnvironment
Source code in vspreview/core/vsenv.py
36
37
38
39
40
def new_environment(self) -> ManagedEnvironment:
    data = self.api.create_environment(CoreCreationFlags.ENABLE_GRAPH_INSPECTION)
    env = self.api.wrap_environment(data)
    logger.debug("Created new environment")
    return ManagedEnvironment(env, data, self)

PyQTLoop

Bases: QObject, EventLoop

Methods:

Attributes:

move class-attribute instance-attribute

move = pyqtSignal(int)

attach

attach() -> None
Source code in vspreview/core/vsenv.py
69
70
71
72
73
74
75
def attach(self) -> None:
    self._signal_lock = Lock()
    self._signal_counter = 0
    self._signallers = dict[int, Callable[[], None]]()

    self._slot = self._receive_task
    self.move.connect(self._slot)

detach

detach() -> None
Source code in vspreview/core/vsenv.py
77
78
79
def detach(self) -> None:
    self.move.disconnect(self._slot)
    self._signallers = {}

from_thread

from_thread(func: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]
Source code in vspreview/core/vsenv.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def from_thread(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
    fut = Future[T]()

    with self._signal_lock:
        my_counter = self._signal_counter

        self._signal_counter += 1

    def wrapper() -> None:
        nonlocal my_counter

        if not fut.set_running_or_notify_cancel():
            return

        del self._signallers[my_counter]

        try:
            result = func(*args, **kwargs)
        except BaseException as e:
            fut.set_exception(e)
        else:
            fut.set_result(result)

    self._signallers[my_counter] = wrapper
    self.move.emit(my_counter)

    return fut

to_thread

to_thread(func: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]
Source code in vspreview/core/vsenv.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def to_thread(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
    fut = Future[T]()

    def wrapper() -> None:
        if not fut.set_running_or_notify_cancel():
            return

        try:
            result = func(*args, **kwargs)
        except BaseException as e:
            fut.set_exception(e)
        else:
            fut.set_result(result)

    QThreadPool.globalInstance().start(Runner(wrapper))

    return fut

Runner

Runner(wrapper: Callable[[], None])

Bases: QRunnable

Methods:

Attributes:

Source code in vspreview/core/vsenv.py
58
59
60
def __init__(self, wrapper: Callable[[], None]) -> None:
    super().__init__()
    self.wrapper = wrapper

wrapper instance-attribute

wrapper = wrapper

run

run() -> None
Source code in vspreview/core/vsenv.py
62
63
def run(self) -> None:
    self.wrapper()

dispose_environment

dispose_environment(env: ManagedEnvironment) -> None
Source code in vspreview/core/vsenv.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def dispose_environment(env: ManagedEnvironment) -> None:
    import logging

    if current_log:
        env.core.remove_log_handler(current_log)

    if logging.getLogger().level <= logging.DEBUG:
        from gc import get_referrers

        # There has to be the environment referencing it
        ref_count = len(get_referrers(env.core)) - 1

        if ref_count:
            logging.debug(f'Core {id(env.core)} is being held reference by {ref_count} objects!')

    env.dispose()

get_current_environment

get_current_environment() -> ManagedEnvironment
Source code in vspreview/core/vsenv.py
176
177
178
def get_current_environment() -> ManagedEnvironment:
    assert environment
    return environment

make_environment

make_environment() -> None
Source code in vspreview/core/vsenv.py
138
139
140
141
142
143
144
145
146
147
def make_environment() -> None:
    global environment, current_log
    assert policy is not None

    if environment and current_log:
        environment.core.remove_log_handler(current_log)

    environment = policy.new_environment()
    current_log = environment.core.add_log_handler(get_vs_logger())
    environment.switch()

set_vsengine_loop

set_vsengine_loop() -> None
Source code in vspreview/core/vsenv.py
131
132
def set_vsengine_loop() -> None:
    set_loop(PyQTLoop())