Skip to content

D2VWitch

Classes:

D2VWitch

D2VWitch(
    *,
    bin_path: SPathLike | MissingT = MISSING,
    ext: str | MissingT = MISSING,
    force: bool = True,
    default_out_folder: SPathLike | Literal[False] | None = None,
    **kwargs: Any
)

Bases: ExternalIndexer

Methods:

Attributes:

Source code in vssource/indexers/base.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def __init__(
    self,
    *,
    bin_path: SPathLike | MissingT = MISSING,
    ext: str | MissingT = MISSING,
    force: bool = True,
    default_out_folder: SPathLike | Literal[False] | None = None,
    **kwargs: Any,
) -> None:
    """
    Set up the indexer, using the class-level defaults for any omitted option.

    Args:
        bin_path: Location of the indexer binary; `self._bin_path` is used when omitted.
            The value is wrapped in `SPath` before being stored.
        ext: Extension stored on the instance; `self._ext` is used when omitted.
        force: Forwarded to the parent initializer.
        default_out_folder: Stored unchanged on the instance.
        **kwargs: Extra keyword arguments forwarded to the parent initializer.
    """
    super().__init__(force=force, **kwargs)

    # MISSING is the "not provided" sentinel; anything else is taken as an explicit choice.
    chosen_bin = self._bin_path if bin_path is MISSING else bin_path
    chosen_ext = self._ext if ext is MISSING else ext

    self.bin_path = SPath(chosen_bin)
    self.ext = chosen_ext
    self.default_out_folder = default_out_folder

bin_path instance-attribute

bin_path = SPath(bin_path)

default_out_folder instance-attribute

default_out_folder = default_out_folder

ext instance-attribute

ext = ext

force instance-attribute

force = force

indexer_kwargs instance-attribute

indexer_kwargs = kwargs

ensure_obj classmethod

ensure_obj(
    indexer: str | type[Self] | Self | None = None,
    /,
    func_except: FuncExcept | None = None,
) -> Self

Ensure that the input is an indexer instance, resolving it if necessary.

Parameters:

  • indexer

    (str | type[Self] | Self | None, default: None ) –

    Indexer identifier (string, class, or instance). Plugin namespace is also supported.

  • func_except

    (FuncExcept | None, default: None ) –

    Function returned for custom error handling.

Returns:

  • Self

    Indexer instance.

Source code in vssource/indexers/base.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
@classmethod
def ensure_obj(
    cls, indexer: str | type[Self] | Self | None = None, /, func_except: FuncExcept | None = None
) -> Self:
    """
    Ensure that the input is an indexer instance, resolving it if necessary.

    Thin wrapper: the resolution itself is delegated to `_base_ensure_obj`, called with
    this class as the first argument.

    Args:
        indexer: Indexer identifier (string, class, or instance). Plugin namespace is also supported.
        func_except: Function returned for custom error handling.

    Returns:
        Indexer instance.
    """
    return _base_ensure_obj(cls, indexer, func_except)

file_corrupted

file_corrupted(index_path: SPath) -> None
Source code in vssource/indexers/base.py
362
363
364
365
366
367
368
369
def file_corrupted(self, index_path: SPath) -> None:
    """
    React to an index file that was found to be corrupted.

    When `self.force` is truthy the index file is deleted and the method returns
    normally; otherwise an error asking the user to delete it is raised.

    Args:
        index_path: Path of the corrupted index file.

    Raises:
        CustomRuntimeError: If `self.force` is falsy, or if deleting the file fails.
    """
    if not self.force:
        raise CustomRuntimeError("Index file corrupted! Delete it and retry.", self.__class__)

    try:
        index_path.unlink()
    except OSError as err:
        # Chain explicitly so the underlying OS failure is reported as the direct cause.
        raise CustomRuntimeError(
            "Index file corrupted, tried to delete it and failed.", self.__class__
        ) from err

from_param classmethod

from_param(
    indexer: str | type[Self] | Self | None = None,
    /,
    func_except: FuncExcept | None = None,
) -> type[Self]

Resolve and return an Indexer type from a given input (string, type, or instance).

Parameters:

  • indexer

    (str | type[Self] | Self | None, default: None ) –

    Indexer identifier (string, class, or instance). Plugin namespace is also supported.

  • func_except

    (FuncExcept | None, default: None ) –

    Function returned for custom error handling.

Returns:

  • type[Self]

    Resolved indexer type.

Source code in vssource/indexers/base.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
@classmethod
def from_param(
    cls, indexer: str | type[Self] | Self | None = None, /, func_except: FuncExcept | None = None
) -> type[Self]:
    """
    Resolve and return an Indexer type from a given input (string, type, or instance).

    Thin wrapper: the resolution itself is delegated to `_base_from_param`, called with
    this class as the first argument.

    Args:
        indexer: Indexer identifier (string, class, or instance). Plugin namespace is also supported.
        func_except: Function returned for custom error handling.

    Returns:
        Resolved indexer type.
    """
    return _base_from_param(cls, indexer, func_except)

get_cmd

get_cmd(files: list[SPath], output: SPath) -> list[str]

Returns the indexer command

Source code in vssource/indexers/D2VWitch.py
27
28
def get_cmd(self, files: list[SPath], output: SPath) -> list[str]:
    """
    Returns the indexer command.

    The command is the binary path from `self._get_bin_path()`, followed by every input
    file, then ``--output`` and the output path, each converted with `str`.
    """
    argv = [self._get_bin_path(), *files, "--output", output]
    return [str(arg) for arg in argv]

get_idx_file_path

get_idx_file_path(path: SPath) -> SPath
Source code in vssource/indexers/base.py
359
360
def get_idx_file_path(self, path: SPath) -> SPath:
    """Return `path` with its suffix replaced by ``.`` followed by `self.ext`."""
    index_suffix = f".{self.ext}"
    return path.with_suffix(index_suffix)

get_info

get_info(index_path: SPath, file_idx: int = -1) -> D2VIndexFileInfo

Returns info about the indexing file

Source code in vssource/indexers/D2VWitch.py
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
def get_info(self, index_path: SPath, file_idx: int = -1) -> D2VIndexFileInfo:
    """
    Returns info about the indexing file.

    The file is read in full; its first two lines are treated as the head, the
    ``KEY=value`` header settings are parsed into a `D2VIndexHeader`, and the data lines
    that follow are parsed into `D2VIndexFrameData` entries until the first empty line.

    Args:
        index_path: Path of the index file to parse.
        file_idx: Selects which data lines are collected. ``-1`` collects every data line.
            A value ``>= 0`` collects only the lines whose third field equals it and stops
            at the first line with a larger value. Any other value collects nothing.

    Returns:
        A `D2VIndexFileInfo` built from `index_path`, `file_idx`, the parsed header and
        the collected frame data.
    """
    with open(index_path, "r") as f:
        file_content = f.read()

    lines = file_content.split("\n")

    head, lines = lines[:2], lines[2:]

    # NOTE(review): when `file_corrupted` returns (force mode deletes the file), parsing
    # still continues on the content already read - confirm this is intended.
    if "DGIndex" not in head[0]:
        self.file_corrupted(index_path)

    # `_split_lines` is applied twice: the second part of the first split is split again
    # into the raw header lines and the remaining data lines.
    raw_header, lines = self._split_lines(self._split_lines(lines)[1])

    header = D2VIndexHeader()

    for rlin in raw_header:
        if split_val := rlin.rstrip().split("="):
            key: str = split_val[0].upper()
            values: list[str] = ",".join(split_val[1:]).split(",")
        else:
            continue

        if key == "STREAM_TYPE":
            header.stream_type = int(values[0])
        elif key == "MPEG_TYPE":
            header.MPEG_type = int(values[0])
        elif key == "IDCT_ALGORITHM":
            header.iDCT_algorithm = int(values[0])
        elif key == "YUVRGB_SCALE":
            header.YUVRGB_scale = int(values[0])
        elif key == "LUMINANCE_FILTER":
            header.luminance_filter = tuple(map(int, values))
        elif key == "CLIPPING":
            header.clipping = list(map(int, values))
        elif key == "ASPECT_RATIO":
            header.aspect = Fraction(*list(map(int, values[0].split(":"))))
        elif key == "PICTURE_SIZE":
            header.pic_size = str(values[0])
        elif key == "FIELD_OPERATION":
            header.field_op = int(values[0])
        elif key == "FRAME_RATE":
            # Only the "num/den" fraction found after an opening parenthesis is used.
            if matches := re.search(r".*\((\d+\/\d+)", values[0]):
                header.frame_rate = Fraction(matches.group(1))
        elif key == "LOCATION":
            header.location = list(map(partial(int, base=16), values))

    frame_data = list[D2VIndexFrameData]()

    if file_idx >= -1:
        filter_by_file = file_idx >= 0

        for rawline in lines:
            if not rawline:
                break

            # Split on every space. Capping the split (maxsplit=7) would leave all the
            # trailing hexadecimal tokens glued together in one string, which cannot be
            # converted by `int(..., 16)` below.
            line = rawline.split(" ")

            if filter_by_file:
                ffile_idx = int(line[2])

                if ffile_idx < file_idx:
                    continue
                if ffile_idx > file_idx:
                    break

            frame_data.append(
                D2VIndexFrameData(
                    int(line[1]),
                    "I",
                    int(line[5]),
                    int(line[6]),
                    int(line[0], 16),
                    int(line[4]),
                    int(line[3]),
                    [int(a, 16) for a in line[7:]],
                )
            )

    return D2VIndexFileInfo(index_path, file_idx, header, frame_data)

get_joined_names classmethod

get_joined_names(files: list[SPath]) -> str
Source code in vssource/indexers/base.py
144
145
146
@classmethod
def get_joined_names(cls, files: list[SPath]) -> str:
    """Return the `name` of every given file, joined with underscores."""
    names = (entry.name for entry in files)
    return "_".join(names)

get_out_folder

get_out_folder(
    output_folder: SPathLike | Literal[False] | None = None,
    file: SPath | None = None,
) -> SPath
Source code in vssource/indexers/base.py
346
347
348
349
350
351
352
353
354
355
356
357
def get_out_folder(
    self, output_folder: SPathLike | Literal[False] | None = None, file: SPath | None = None
) -> SPath:
    """
    Resolve the folder to use for output.

    Args:
        output_folder: Explicit folder. ``None`` means "next to `file`" (or, without a
            file, the same result as passing ``False``); any other falsy value selects
            the system temporary directory.
        file: File whose folder is used when `output_folder` is ``None``.

    Returns:
        The resolved folder as an `SPath`.
    """
    if output_folder is None:
        if file:
            return SPath(file).get_folder()
        return self.get_out_folder(False)

    if output_folder:
        return SPath(output_folder)

    from tempfile import gettempdir

    return SPath(gettempdir())

get_video_idx_path

get_video_idx_path(
    folder: SPath, file_hash: str, video_name: SPathLike
) -> SPath
Source code in vssource/indexers/base.py
417
418
419
420
421
422
def get_video_idx_path(self, folder: SPath, file_hash: str, video_name: SPathLike) -> SPath:
    """
    Build the index file path for a video inside `folder`.

    The file name is ``<file_hash>_<stem of video_name>_<name of self._bin_path>``; it is
    requested from `PackageStorage(folder)` and then passed through `get_idx_file_path`.
    """
    stem = SPath(video_name).stem
    indexer_name = SPath(self._bin_path).name
    storage_file = PackageStorage(folder).get_file("_".join((file_hash, stem, indexer_name)))

    return self.get_idx_file_path(storage_file)

get_videos_hash classmethod

get_videos_hash(files: list[SPath]) -> str
Source code in vssource/indexers/base.py
148
149
150
151
152
153
154
@classmethod
def get_videos_hash(cls, files: list[SPath]) -> str:
    """
    Return an MD5 hex digest identifying a set of files.

    The digest covers the summed file sizes (as a 32-byte little-endian integer) followed
    by the encoded result of `get_joined_names`; file contents are not read.
    """
    from hashlib import md5

    total_size = 0
    for entry in files:
        total_size += entry.stat().st_size

    digest = md5(total_size.to_bytes(32, "little"))
    digest.update(cls.get_joined_names(files).encode())
    return digest.hexdigest()

index

index(
    files: Sequence[SPath],
    force: bool = False,
    split_files: bool = False,
    output_folder: SPathLike | Literal[False] | None = None,
    *cmd_args: str
) -> list[SPath]
Source code in vssource/indexers/base.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
def index(
    self,
    files: Sequence[SPath],
    force: bool = False,
    split_files: bool = False,
    output_folder: SPathLike | Literal[False] | None = None,
    *cmd_args: str,
) -> list[SPath]:
    """
    Create (or reuse) index files for the given input files.

    Files located in different folders are indexed folder by folder and the results are
    concatenated. Within one folder the files are de-duplicated and sorted, and a hash
    from `get_videos_hash` is used in the index file names.

    Args:
        files: Input files to index. The first element is used to resolve the output
            folder, so an empty sequence is not supported.
        force: Delete an existing index file and index again instead of reusing it.
        split_files: Produce one index file per input file instead of a single index
            for all of them.
        output_folder: Passed to `get_out_folder` together with the first file.
        *cmd_args: Extra arguments passed on to `_run_index`.

    Returns:
        Paths of the index files, one per input file when `split_files` is set,
        otherwise a single path per folder.
    """
    # Group in one pass; a dict keeps folders in first-seen order, so the result order
    # is deterministic.
    by_folder: dict[str, list[SPath]] = {}
    for f in files:
        by_folder.setdefault(f.get_folder().to_str(), []).append(f)

    if len(by_folder) > 1:
        # Forward cmd_args too, otherwise extra indexer arguments would be silently
        # dropped whenever the inputs span more than one folder.
        return [
            c
            for group in by_folder.values()
            for c in self.index(group, force, split_files, output_folder, *cmd_args)
        ]

    dest_folder = self.get_out_folder(output_folder, files[0])

    files = sorted(set(files))

    hash_str = self.get_videos_hash(files)

    def _index(files: list[SPath], output: SPath) -> None:
        if output.is_file():
            # An empty index file is treated as unusable and is regenerated, just like
            # an explicit `force`.
            if output.stat().st_size == 0 or force:
                output.unlink()
            else:
                return self.update_video_filenames(output, files)
        return self._run_index(files, output, cmd_args)

    if not split_files:
        output = self.get_video_idx_path(dest_folder, hash_str, "JOINED" if len(files) > 1 else "SINGLE")
        _index(files, output)
        return [output]

    outputs = [self.get_video_idx_path(dest_folder, hash_str, file.name) for file in files]

    for file, output in zip(files, outputs):
        _index([file], output)

    return outputs

normalize_filenames classmethod

normalize_filenames(file: SPathLike | Iterable[SPathLike]) -> list[SPath]
Source code in vssource/indexers/base.py
161
162
163
164
165
166
167
168
169
170
171
@classmethod
def normalize_filenames(cls, file: SPathLike | Iterable[SPathLike]) -> list[SPath]:
    """
    Convert one or more path-like inputs into a list of `SPath` objects.

    Inputs whose string form starts with ``file:///`` have that prefix removed first.
    """
    uri_prefix = "file:///"
    normalized = list[SPath]()

    for entry in to_arr(file):
        as_text = str(entry)

        # NOTE(review): removing all 8 characters also removes the slash that starts an
        # absolute POSIX path ("file:///home/x" -> "home/x") - confirm this is intended.
        if as_text.startswith(uri_prefix):
            entry = as_text[8:]

        normalized.append(SPath(entry))

    return normalized

source

source(
    file: SPathLike | Iterable[SPathLike],
    bits: int | None = None,
    *,
    matrix: MatrixLike | None = None,
    transfer: TransferLike | None = None,
    primaries: PrimariesLike | None = None,
    chroma_location: ChromaLocation | None = None,
    color_range: ColorRangeLike | None = None,
    field_based: FieldBasedLike | None = None,
    idx_props: bool = True,
    **kwargs: Any
) -> VideoNode

Load one or more input files using the indexer and return a processed clip.

The returned clip is passed through initialize_clip to apply bit depth conversion and frame props initialization.

Source code in vssource/indexers/base.py
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
@inject_self
def source(
    self,
    file: SPathLike | Iterable[SPathLike],
    bits: int | None = None,
    *,
    matrix: MatrixLike | None = None,
    transfer: TransferLike | None = None,
    primaries: PrimariesLike | None = None,
    chroma_location: ChromaLocation | None = None,
    color_range: ColorRangeLike | None = None,
    field_based: FieldBasedLike | None = None,
    idx_props: bool = True,
    **kwargs: Any,
) -> vs.VideoNode:
    """
    Index the given file(s) and load the resulting index file(s) through the parent `source`.

    The input is normalized with `normalize_filenames` and indexed with `index` using its
    default options; `bits` and every keyword argument are forwarded unchanged to the
    parent implementation, which produces the returned clip.
    """
    # The parent receives the index files, not the original media files.
    index_files = self.index(self.normalize_filenames(file))

    return super().source(
        index_files,
        bits,
        matrix=matrix,
        transfer=transfer,
        primaries=primaries,
        chroma_location=chroma_location,
        color_range=color_range,
        field_based=field_based,
        idx_props=idx_props,
        **kwargs,
    )

source_func classmethod

source_func(path: SPathLike, **kwargs: Any) -> VideoNode
Source code in vssource/indexers/base.py
156
157
158
159
@classmethod
def source_func(cls, path: SPathLike, **kwargs: Any) -> vs.VideoNode:
    """
    Log the call at debug level and forward it to `cls._source_func`.

    `path` is converted with `str` before being passed on; `kwargs` are forwarded unchanged.
    """
    log.debug("%s: indexing %r; arguments: %r", cls, path, kwargs)
    return cls._source_func(str(path), **kwargs)

update_video_filenames

update_video_filenames(index_path: SPath, filepaths: list[SPath]) -> None
Source code in vssource/indexers/D2VWitch.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def update_video_filenames(self, index_path: SPath, filepaths: list[SPath]) -> None:
    """
    Rewrite the file list stored in an index file so it matches `filepaths`.

    The index is expected to hold a line containing ``DGIndex``, then the number of
    files, then one path per line up to the first empty line. The file is only written
    back when the stored paths differ from the requested ones.

    Args:
        index_path: Index file to check and, if needed, rewrite in place.
        filepaths: Paths that the index should list, in order.
    """
    with open(index_path, "r") as handle:
        lines = handle.read().split("\n")

    wanted = [str(path) for path in filepaths]

    # NOTE(review): when `file_corrupted` returns (force mode deletes the file), execution
    # continues below and may write the file again - confirm this is intended.
    if "DGIndex" not in lines[0]:
        self.file_corrupted(index_path)

    declared_count = int(lines[1])
    if declared_count == 0 or declared_count != len(wanted):
        self.file_corrupted(index_path)

    # The stored path list ends at the first empty line.
    list_end = lines.index("")

    if lines[2:list_end] != wanted:
        lines[2:list_end] = wanted

        with open(index_path, "w") as handle:
            handle.write("\n".join(lines))