Skip to content

coros

This module and original idea is by cid-chan (Sarah cid@cid-chan.moe)

Classes:

Functions:

UNWRAP_NAME module-attribute

UNWRAP_NAME = '__vspyplugin_unwrap'

core module-attribute

core = core

Atom

Atom()

Bases: Generic[T]

Methods:

Attributes:

Source code
30
31
def __init__(self) -> None:
    self.value = None

value instance-attribute

value: T | None = None

set

set(value: T) -> None
Source code
33
34
def set(self, value: T) -> None:
    self.value = value

unset

unset() -> None
Source code
36
37
def unset(self) -> None:
    self.value = None

GatherRequests

GatherRequests(coroutines: tuple[AnyCoroutine[T0, T], ...])

Bases: Generic[T], FrameRequest

Methods:

Attributes:

Source code
60
61
62
63
64
def __init__(self, coroutines: tuple[AnyCoroutine[T0, T], ...]) -> None:
    if len(coroutines) <= 1:
        raise CustomValueError('You need to pass at least 2 coroutines!', self.__class__)

    self.coroutines = coroutines

coroutines instance-attribute

coroutines = coroutines

build_frame_eval

build_frame_eval(
    clip: VideoNode,
    frame_no: int,
    continuation: Callable[[tuple[VideoFrame | T | None, ...]], VideoNode],
) -> VideoNode
Source code
86
87
88
89
90
91
92
93
94
95
def build_frame_eval(
    self, clip: vs.VideoNode, frame_no: int,
    continuation: Callable[[tuple[vs.VideoFrame | T | None, ...]], vs.VideoNode]
) -> vs.VideoNode:
    clips, atoms = self.unwrap_coros(clip, frame_no)

    def _apply(n: int, f: list[vs.VideoFrame]) -> vs.VideoNode:
        return continuation(self.wrap_frames(f, atoms))

    return clip.std.FrameEval(_apply, clips)

unwrap_coros

unwrap_coros(
    clip: VideoNode, frame_no: int
) -> tuple[list[VideoNode], list[Atom[T]]]
Source code
76
77
78
79
def unwrap_coros(self, clip: vs.VideoNode, frame_no: int) -> tuple[list[vs.VideoNode], list[Atom[T]]]:
    return zip(*[  # type: ignore
        _coro2node_wrapped(clip, frame_no, coro) for coro in self.coroutines
    ])

wrap_frames

wrap_frames(
    frames: list[VideoFrame], atoms: list[Atom[T]]
) -> tuple[VideoFrame | T | None, ...]
Source code
81
82
83
84
def wrap_frames(self, frames: list[vs.VideoFrame], atoms: list[Atom[T]]) -> tuple[vs.VideoFrame | T | None, ...]:
    return tuple(
        self._unwrap(frame, atom) for frame, atom in zip(frames, atoms)
    )

SingleFrameRequest

SingleFrameRequest(clip: VideoNode, frame_no: int)

Bases: FrameRequest

Methods:

Attributes:

Source code
41
42
43
def __init__(self, clip: vs.VideoNode, frame_no: int) -> None:
    self.clip = clip
    self.frame_no = frame_no

clip instance-attribute

clip = clip

frame_no instance-attribute

frame_no = frame_no

build_frame_eval

build_frame_eval(
    clip: VideoNode, frame_no: int, continuation: Callable[[Any], VideoNode]
) -> VideoNode
Source code
48
49
50
51
52
53
54
55
56
def build_frame_eval(
    self, clip: vs.VideoNode, frame_no: int,
    continuation: Callable[[Any], vs.VideoNode]
) -> vs.VideoNode:
    req_clip = self.clip[self.frame_no]

    return clip.std.FrameEval(
        lambda n, f: continuation(f), [req_clip]
    )

coro2node

coro2node(
    base_clip: VideoNode,
    frameno: int,
    coro: AnyCoroutine[T0, T],
    wrap: Atom[T] | None = None,
) -> VideoNode
Source code
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
def coro2node(
    base_clip: vs.VideoNode, frameno: int, coro: AnyCoroutine[T0, T], wrap: Atom[T] | None = None
) -> vs.VideoNode:
    assert base_clip.format

    props_clip = base_clip.std.BlankClip()
    blank_clip = core.std.BlankClip(
        length=1, fpsnum=1, fpsden=1, keep=True,
        width=base_clip.width, height=base_clip.height,
        format=base_clip.format.id
    )

    _wrap_frame = _wrapped_modify_frame(blank_clip)

    def _continue(wrapped_value: T0 | None) -> vs.VideoNode:
        if wrap:
            wrap.unset()

        try:
            next_request = coro.send(wrapped_value)
        except StopIteration as e:
            value = e.value

            if isinstance(value, vs.VideoNode):
                return value  # type: ignore

            if isinstance(value, vs.VideoFrame):
                return _wrap_frame(value)

            if not wrap:
                raise CustomTypeError('You can only return a VideoFrame or VideoNode!', coro2node)

            wrap.set(value)

            return props_clip.std.SetFrameProp(UNWRAP_NAME, intval=True)
        except Exception as e:
            raise e

        return next_request.build_frame_eval(base_clip, frameno, _continue)

    return _continue(None)