Skip to content

D2VWitch

Classes:

D2VWitch

D2VWitch(
    *,
    bin_path: SPathLike | MissingT = MISSING,
    ext: str | MissingT = MISSING,
    force: bool = True,
    default_out_folder: SPathLike | Literal[False] | None = None,
    **kwargs: Any
)

Bases: ExternalIndexer

Methods:

Attributes:

Source code in vssource/indexers/base.py
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def __init__(
    self,
    *,
    bin_path: SPathLike | MissingT = MISSING,
    ext: str | MissingT = MISSING,
    force: bool = True,
    default_out_folder: SPathLike | Literal[False] | None = None,
    **kwargs: Any,
) -> None:
    """
    Set up the indexer.

    Args:
        bin_path: Path of the indexer binary. When left as ``MISSING``, ``self._bin_path`` is used.
        ext: Extension of the index files. When left as ``MISSING``, ``self._ext`` is used.
        force: Forwarded to the parent initializer.
        default_out_folder: Stored unchanged as ``self.default_out_folder``.
        **kwargs: Forwarded to the parent initializer.
    """
    super().__init__(force=force, **kwargs)

    # An explicit argument wins over the value already available on the instance
    chosen_bin_path = self._bin_path if bin_path is MISSING else bin_path
    chosen_ext = self._ext if ext is MISSING else ext

    self.bin_path = SPath(chosen_bin_path)
    self.ext = chosen_ext
    self.default_out_folder = default_out_folder

bin_path instance-attribute

bin_path = SPath(bin_path)

default_out_folder instance-attribute

default_out_folder = default_out_folder

ext instance-attribute

ext = ext

force instance-attribute

force = force

indexer_kwargs instance-attribute

indexer_kwargs = kwargs

file_corrupted

file_corrupted(index_path: SPath) -> None
Source code in vssource/indexers/base.py
225
226
227
228
229
230
231
232
def file_corrupted(self, index_path: SPath) -> None:
    """
    React to a corrupted index file.

    With ``self.force`` set, the file is deleted; otherwise the caller is told to delete it.

    Args:
        index_path: Path of the corrupted index file.

    Raises:
        CustomRuntimeError: If ``self.force`` is falsy, or if deleting the file fails with an ``OSError``.
    """
    if not self.force:
        raise CustomRuntimeError("Index file corrupted! Delete it and retry.", self.__class__)

    try:
        index_path.unlink()
    except OSError:
        raise CustomRuntimeError("Index file corrupted, tried to delete it and failed.", self.__class__)

get_cmd

get_cmd(files: list[SPath], output: SPath) -> list[str]
Source code in vssource/indexers/D2VWitch.py
25
26
def get_cmd(self, files: list[SPath], output: SPath) -> list[str]:
    """
    Build the argument list used to run the indexer.

    Args:
        files: Input files, placed right after the binary path.
        output: Output path, placed after the ``--output`` flag.

    Returns:
        Every argument converted to ``str``.
    """
    arguments = [self._get_bin_path(), *files, "--output", output]

    return [str(argument) for argument in arguments]

get_idx_file_path

get_idx_file_path(path: SPath) -> SPath
Source code in vssource/indexers/base.py
222
223
def get_idx_file_path(self, path: SPath) -> SPath:
    """
    Return ``path`` with its suffix replaced by this indexer's extension.

    Args:
        path: Path whose suffix is swapped for ``.{self.ext}``.
    """
    index_suffix = f".{self.ext}"

    return path.with_suffix(index_suffix)

get_info

get_info(index_path: SPath, file_idx: int = -1) -> D2VIndexFileInfo
Source code in vssource/indexers/D2VWitch.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def get_info(self, index_path: SPath, file_idx: int = -1) -> D2VIndexFileInfo:
    """
    Parse an index file into its header settings and per-line frame data.

    Args:
        index_path: Path of the index file to read.
        file_idx: Value matched against the third column of every data line. ``-1`` keeps all lines,
            a non-negative value keeps only the lines carrying that value, and any other negative
            value yields no frame data.

    Returns:
        A ``D2VIndexFileInfo`` built from ``index_path``, ``file_idx``, the parsed header and the frame data.
    """
    with open(index_path, "r") as f:
        file_content = f.read()

    lines = file_content.split("\n")

    head, lines = lines[:2], lines[2:]

    if "DGIndex" not in head[0]:
        self.file_corrupted(index_path)

    # The first section returned by _split_lines is discarded, the second one holds the settings.
    # NOTE(review): presumably _split_lines cuts at the first blank line - confirm against its definition.
    raw_header, lines = self._split_lines(self._split_lines(lines)[1])

    header = D2VIndexHeader()

    for rlin in raw_header:
        # str.split always yields at least one item, so `key` is always defined;
        # a blank line simply produces an empty key that matches nothing below.
        key, *raw_values = rlin.rstrip().split("=")
        key = key.upper()
        values: list[str] = ",".join(raw_values).split(",")

        if key == "STREAM_TYPE":
            header.stream_type = int(values[0])
        elif key == "MPEG_TYPE":
            header.MPEG_type = int(values[0])
        elif key == "IDCT_ALGORITHM":
            header.iDCT_algorithm = int(values[0])
        elif key == "YUVRGB_SCALE":
            header.YUVRGB_scale = int(values[0])
        elif key == "LUMINANCE_FILTER":
            header.luminance_filter = tuple(map(int, values))
        elif key == "CLIPPING":
            header.clipping = list(map(int, values))
        elif key == "ASPECT_RATIO":
            header.aspect = Fraction(*list(map(int, values[0].split(":"))))
        elif key == "PICTURE_SIZE":
            header.pic_size = str(values[0])
        elif key == "FIELD_OPERATION":
            header.field_op = int(values[0])
        elif key == "FRAME_RATE":
            # Only the "num/den" fraction found after an opening parenthesis is used
            if matches := re.search(r".*\((\d+\/\d+)", values[0]):
                header.frame_rate = Fraction(matches.group(1))
        elif key == "LOCATION":
            header.location = list(map(partial(int, base=16), values))

    frame_data = list[D2VIndexFrameData]()

    # Values below -1 select nothing, exactly as before
    if file_idx >= -1:
        for rawline in lines:
            # A blank line ends the data section
            if not rawline:
                break

            # Split every column: limiting the split would leave all trailing hexadecimal values
            # glued together in one string, which int(..., 16) cannot parse.
            line = rawline.split(" ")

            if file_idx >= 0:
                ffile_idx = int(line[2])

                if ffile_idx < file_idx:
                    continue
                if ffile_idx > file_idx:
                    break

            frame_data.append(
                D2VIndexFrameData(
                    int(line[1]),
                    "I",
                    int(line[5]),
                    int(line[6]),
                    int(line[0], 16),
                    int(line[4]),
                    int(line[3]),
                    [int(a, 16) for a in line[7:]],
                )
            )

    return D2VIndexFileInfo(index_path, file_idx, header, frame_data)

get_joined_names classmethod

get_joined_names(files: list[SPath]) -> str
Source code in vssource/indexers/base.py
58
59
60
@classmethod
def get_joined_names(cls, files: list[SPath]) -> str:
    """
    Join the ``name`` of every file with underscores.

    Args:
        files: Files whose ``name`` attributes are joined, in the given order.
    """
    names = [entry.name for entry in files]

    return "_".join(names)

get_out_folder

get_out_folder(
    output_folder: SPathLike | Literal[False] | None = None,
    file: SPath | None = None,
) -> SPath
Source code in vssource/indexers/base.py
211
212
213
214
215
216
217
218
219
220
def get_out_folder(
    self, output_folder: SPathLike | Literal[False] | None = None, file: SPath | None = None
) -> SPath:
    """
    Resolve the folder index files are written to.

    Args:
        output_folder: Explicit folder. A falsy value other than ``None`` (such as ``False``) selects
            the system temporary directory; ``None`` defers to ``file``.
        file: Only used when ``output_folder`` is ``None``: its folder is returned if ``file`` is truthy.

    Returns:
        The explicit folder, the folder of ``file``, or the temporary directory.
    """
    if output_folder is not None:
        # False (or any other falsy value) means "use the temporary directory"
        return SPath(output_folder) if output_folder else SPath(gettempdir())

    if file:
        return SPath(file).get_folder()

    # Nothing to derive a folder from: take the temporary directory branch
    return self.get_out_folder(False)

get_video_idx_path

get_video_idx_path(
    folder: SPath, file_hash: str, video_name: SPathLike
) -> SPath
Source code in vssource/indexers/base.py
280
281
282
283
284
285
def get_video_idx_path(self, folder: SPath, file_hash: str, video_name: SPathLike) -> SPath:
    """
    Build the path of the index file for a video.

    The file name is ``<file_hash>_<stem of video_name>_<name of self._bin_path>``; it is resolved
    through ``PackageStorage(folder).get_file`` and then given this indexer's extension.

    Args:
        folder: Folder handed to ``PackageStorage``.
        file_hash: First component of the file name.
        video_name: Path or name whose stem is the second component.
    """
    name_parts = [file_hash, SPath(video_name).stem, SPath(self._bin_path).name]
    stored_file = PackageStorage(folder).get_file("_".join(name_parts))

    return self.get_idx_file_path(stored_file)

get_videos_hash classmethod

get_videos_hash(files: list[SPath]) -> str
Source code in vssource/indexers/base.py
62
63
64
65
66
@classmethod
def get_videos_hash(cls, files: list[SPath]) -> str:
    """
    Compute an identifier for a set of files from their total size and joined names.

    Args:
        files: Files to identify; each one is ``stat``-ed for its size.

    Returns:
        Hexadecimal MD5 digest of the summed size (32 bytes, little endian) followed by the
        encoded result of ``get_joined_names``.
    """
    total_size = sum(file.stat().st_size for file in files)
    to_hash = total_size.to_bytes(32, "little") + cls.get_joined_names(files).encode()

    # The digest is only an identifier, not a security measure: flag it as such so that
    # MD5 stays available on builds that restrict it for security use. The value is unchanged.
    return md5(to_hash, usedforsecurity=False).hexdigest()

index

index(
    files: Sequence[SPath],
    force: bool = False,
    split_files: bool = False,
    output_folder: SPathLike | Literal[False] | None = None,
    *cmd_args: str
) -> list[SPath]
Source code in vssource/indexers/base.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def index(
    self,
    files: Sequence[SPath],
    force: bool = False,
    split_files: bool = False,
    output_folder: SPathLike | Literal[False] | None = None,
    *cmd_args: str,
) -> list[SPath]:
    """
    Create, or reuse, the index file(s) for the given video files.

    Files located in different folders are indexed one folder at a time, in order of first appearance.

    Args:
        files: Video files to index.
        force: Delete an already existing index file and run the indexer again.
        split_files: Produce one index per file instead of a single index covering all of them.
        output_folder: Passed to ``get_out_folder`` together with the first file to pick the destination.
        *cmd_args: Extra arguments handed to ``_run_index``.

    Returns:
        Paths of the index files.
    """
    # dict.fromkeys removes duplicates while keeping a deterministic, first-seen order
    unique_folders = list(dict.fromkeys(f.get_folder().to_str() for f in files))

    if len(unique_folders) > 1:
        return [
            index_path
            for folder in unique_folders
            for index_path in self.index(
                [f for f in files if f.get_folder().to_str() == folder],
                force,
                split_files,
                output_folder,
                # Extra arguments must reach every per-folder run as well
                *cmd_args,
            )
        ]

    dest_folder = self.get_out_folder(output_folder, files[0])

    files = sorted(set(files))

    hash_str = self.get_videos_hash(files)

    def _index(files: list[SPath], output: SPath) -> None:
        if output.is_file():
            if output.stat().st_size == 0 or force:
                output.unlink()
            else:
                self.update_video_filenames(output, files)

                # Refreshing the file names may remove an index it considers corrupted;
                # only stop here if the index is still present.
                if output.is_file():
                    return

        self._run_index(files, output, cmd_args)

    if not split_files:
        output = self.get_video_idx_path(dest_folder, hash_str, "JOINED" if len(files) > 1 else "SINGLE")
        _index(files, output)
        return [output]

    outputs = [self.get_video_idx_path(dest_folder, hash_str, file.name) for file in files]

    for file, output in zip(files, outputs):
        _index([file], output)

    return outputs

normalize_filenames classmethod

normalize_filenames(file: SPathLike | Sequence[SPathLike]) -> list[SPath]
Source code in vssource/indexers/base.py
72
73
74
75
76
77
78
79
80
81
82
@classmethod
def normalize_filenames(cls, file: SPathLike | Sequence[SPathLike]) -> list[SPath]:
    """
    Convert one or several paths, optionally written as ``file:///`` URIs, to ``SPath`` objects.

    Args:
        file: A single path or a sequence of paths.

    Returns:
        One ``SPath`` per input, in the same order.
    """
    files = list[SPath]()

    for f in to_arr(file):
        raw = str(f)

        if raw.startswith("file:///"):
            remainder = raw[8:]

            # "file:///C:/dir/x" -> "C:/dir/x": a drive letter already makes the path absolute.
            # "file:///home/x"   -> "/home/x": without the root slash the path would become relative.
            has_drive = len(remainder) >= 2 and remainder[0].isalpha() and remainder[1] == ":"
            f = remainder if has_drive else "/" + remainder

        files.append(SPath(f))

    return files

source

source(
    file: SPathLike | Sequence[SPathLike],
    bits: int | None = None,
    *,
    matrix: MatrixLike | None = None,
    transfer: TransferLike | None = None,
    primaries: PrimariesLike | None = None,
    chroma_location: ChromaLocation | None = None,
    color_range: ColorRangeLike | None = None,
    field_based: FieldBasedLike | None = None,
    idx_props: bool = True,
    **kwargs: Any
) -> VideoNode
Source code in vssource/indexers/base.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
@inject_self
def source(
    self,
    file: SPathLike | Sequence[SPathLike],
    bits: int | None = None,
    *,
    matrix: MatrixLike | None = None,
    transfer: TransferLike | None = None,
    primaries: PrimariesLike | None = None,
    chroma_location: ChromaLocation | None = None,
    color_range: ColorRangeLike | None = None,
    field_based: FieldBasedLike | None = None,
    idx_props: bool = True,
    **kwargs: Any,
) -> vs.VideoNode:
    """
    Index the given file(s), then load the resulting index file(s) through the parent ``source``.

    Args:
        file: A single path or a sequence of paths; normalized with ``normalize_filenames`` before indexing.
        bits: Forwarded unchanged to the parent ``source``.
        matrix: Forwarded unchanged to the parent ``source``.
        transfer: Forwarded unchanged to the parent ``source``.
        primaries: Forwarded unchanged to the parent ``source``.
        chroma_location: Forwarded unchanged to the parent ``source``.
        color_range: Forwarded unchanged to the parent ``source``.
        field_based: Forwarded unchanged to the parent ``source``.
        idx_props: Forwarded unchanged to the parent ``source``.
        **kwargs: Forwarded unchanged to the parent ``source``.

    Returns:
        The clip returned by the parent ``source``.
    """
    normalized = self.normalize_filenames(file)
    index_files = self.index(normalized)

    # Named options are gathered once so the forwarding call stays short
    forwarded = dict(
        matrix=matrix,
        transfer=transfer,
        primaries=primaries,
        chroma_location=chroma_location,
        color_range=color_range,
        field_based=field_based,
        idx_props=idx_props,
    )

    return super().source(index_files, bits, **forwarded, **kwargs)

source_func classmethod

source_func(path: DataType | SPathLike, *args: Any, **kwargs: Any) -> VideoNode
Source code in vssource/indexers/base.py
68
69
70
@classmethod
def source_func(cls, path: DataType | SPathLike, *args: Any, **kwargs: Any) -> vs.VideoNode:
    """
    Call ``cls._source_func`` with ``path`` converted to a string.

    Args:
        path: Value converted with ``str`` and passed as the first argument.
        *args: Extra positional arguments, forwarded unchanged.
        **kwargs: Extra keyword arguments, forwarded unchanged.
    """
    path_str = str(path)

    return cls._source_func(path_str, *args, **kwargs)

update_video_filenames

update_video_filenames(index_path: SPath, filepaths: list[SPath]) -> None
Source code in vssource/indexers/D2VWitch.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def update_video_filenames(self, index_path: SPath, filepaths: list[SPath]) -> None:
    """
    Replace the list of video paths stored at the top of an existing index file.

    The layout relied on here is: signature on the first line (must contain ``DGIndex``), number of
    files on the second line, then the file paths, ended by the first blank line.

    If that header cannot be read, or the stored count is zero or differs from ``len(filepaths)``,
    the file is handed to ``file_corrupted`` and left untouched afterwards.

    Args:
        index_path: Index file to update in place.
        filepaths: Paths written into the file list; the file is only rewritten if they differ.
    """
    with open(index_path, "r") as file:
        lines = file.read().split("\n")

    str_filepaths = list(map(str, filepaths))

    try:
        n_files = int(lines[1])
        # The file list runs from the third line up to the first blank line
        end_videos = lines.index("")
    except (IndexError, ValueError):
        # Missing or non-numeric count, or no blank separator: the header is unusable
        header_ok = False
    else:
        header_ok = "DGIndex" in lines[0] and n_files != 0 and n_files == len(str_filepaths)

    if not header_ok:
        self.file_corrupted(index_path)
        # Never write the content of a file just reported as corrupted back to disk
        return

    if lines[2:end_videos] == str_filepaths:
        return

    lines[2:end_videos] = str_filepaths

    with open(index_path, "w") as file:
        file.write("\n".join(lines))