File size: 7,152 Bytes
3133b5e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
import pyrootutils

# Locate the project root (marked by a ".project-root" indicator file), add it
# to PYTHONPATH, and load environment variables from a ".env" file if present.
# This must run before the project-local imports below so that `src` is importable.
root = pyrootutils.setup_root(
    search_from=__file__,
    indicator=[".project-root"],
    pythonpath=True,
    dotenv=True,
)

# ------------------------------------------------------------------------------------ #
# `pyrootutils.setup_root(...)` is an optional line at the top of each entry file
# that helps to make the environment more robust and convenient
#
# the main advantages are:
# - allows you to keep all entry files in "src/" without installing project as a package
# - makes paths and scripts always work no matter what your current working dir is
# - automatically loads environment variables from ".env" file if exists
#
# how it works:
# - the line above recursively searches for the ".project-root" indicator file in the
#   present and parent dirs, to determine the project root dir
# - adds root dir to the PYTHONPATH (if `pythonpath=True`), so this file can be run from
#   any place without installing project as a package
# - sets PROJECT_ROOT environment variable which is used in "configs/paths/default.yaml"
#   to make all paths always relative to the project root
# - loads environment variables from ".env" file in root dir (if `dotenv=True`)
#
# you can remove `pyrootutils.setup_root(...)` if you:
# 1. either install project as a package or move each entry file to the project root dir
# 2. simply remove PROJECT_ROOT variable from paths in "configs/paths/default.yaml"
# 3. always run entry files from the project root dir
#
# https://github.com/ashleve/pyrootutils
# ------------------------------------------------------------------------------------ #

import os
import timeit
from collections.abc import Iterable, Sequence
from typing import Any, Dict, Optional, Tuple, Union

import hydra
import pytorch_lightning as pl
from omegaconf import DictConfig, OmegaConf
from pie_datasets import Dataset, DatasetDict
from pie_modules.models import *  # noqa: F403
from pie_modules.taskmodules import *  # noqa: F403
from pytorch_ie import Document, Pipeline
from pytorch_ie.models import *  # noqa: F403
from pytorch_ie.taskmodules import *  # noqa: F403

from src import utils
from src.models import *  # noqa: F403
from src.serializer.interface import DocumentSerializer
from src.taskmodules import *  # noqa: F403

log = utils.get_pylogger(__name__)


def document_batch_iter(
    dataset: "Union[Sequence[Document], Iterable[Document]]", batch_size: int
) -> "Iterable[Sequence[Document]]":
    """Yield consecutive batches of at most ``batch_size`` documents from ``dataset``.

    Sequences are sliced directly (preserving the sequence's slice type); any other
    iterable is consumed lazily and its documents are collected into lists. The final
    batch may contain fewer than ``batch_size`` documents.

    Raises:
        ValueError: If ``dataset`` is neither a Sequence nor an Iterable.
    """
    if isinstance(dataset, Sequence):
        # Random access is available, so slicing is the cheapest way to batch.
        for start in range(0, len(dataset), batch_size):
            yield dataset[start : start + batch_size]
    elif isinstance(dataset, Iterable):
        # Stream the documents and flush the buffer whenever it reaches batch_size.
        buffer = []
        for document in dataset:
            buffer.append(document)
            if len(buffer) == batch_size:
                yield buffer
                buffer = []
        # Emit the leftover documents as a final, possibly smaller, batch.
        if buffer:
            yield buffer
    else:
        raise ValueError(f"Unsupported dataset type: {type(dataset)}")


@utils.task_wrapper
def predict(cfg: DictConfig) -> Tuple[dict, dict]:
    """Contains minimal example of the prediction pipeline. Uses a pretrained model to annotate
    documents from a dataset and serializes them.

    Args:
        cfg (DictConfig): Configuration composed by Hydra.

    Returns:
        Tuple[dict, dict]: A result dict — may contain the serializer output under
        "serializer", the accumulated inference time under "prediction_time", and the
        path of the saved config under "config" — and an object dict holding the
        instantiated "cfg", "dataset", "pipeline" and "serializer".
    """

    # Set seed for random number generators in pytorch, numpy and python.random
    if cfg.get("seed"):
        pl.seed_everything(cfg.seed, workers=True)

    # Init pytorch-ie dataset
    log.info(f"Instantiating dataset <{cfg.dataset._target_}>")
    dataset: DatasetDict = hydra.utils.instantiate(cfg.dataset, _convert_="partial")

    # Init pytorch-ie pipeline
    # The pipeline, and therefore the inference step, is optional to allow for easy testing
    # of the dataset creation and processing.
    pipeline: Optional[Pipeline] = None
    if cfg.get("pipeline") and cfg.pipeline.get("_target_"):
        log.info(f"Instantiating pipeline <{cfg.pipeline._target_}> from {cfg.model_name_or_path}")
        pipeline = hydra.utils.instantiate(cfg.pipeline, _convert_="partial")

        # Per default, the model is loaded with .from_pretrained() which already loads the weights.
        # However, ckpt_path can be used to load different weights from any checkpoint.
        if cfg.ckpt_path is not None:
            pipeline.model = pipeline.model.load_from_checkpoint(checkpoint_path=cfg.ckpt_path).to(
                pipeline.device
            )

        # auto-convert the dataset if the taskmodule specifies a document type
        dataset = pipeline.taskmodule.convert_dataset(dataset)

    # Init the serializer
    serializer: Optional[DocumentSerializer] = None
    if cfg.get("serializer") and cfg.serializer.get("_target_"):
        log.info(f"Instantiating serializer <{cfg.serializer._target_}>")
        serializer = hydra.utils.instantiate(cfg.serializer, _convert_="partial")

    # select the dataset split for prediction
    dataset_predict = dataset[cfg.dataset_split]

    object_dict = {
        "cfg": cfg,
        "dataset": dataset,
        "pipeline": pipeline,
        "serializer": serializer,
    }
    result: Dict[str, Any] = {}
    if pipeline is not None:
        log.info("Starting inference!")
        prediction_time = 0.0
    else:
        log.warning("No prediction pipeline is defined, skip inference!")
        prediction_time = None
    # If document_batch_size is unset (or 0), process the whole split in one pass.
    document_batch_size = cfg.get("document_batch_size", None)
    for docs_batch in (
        document_batch_iter(dataset_predict, document_batch_size)
        if document_batch_size
        else [dataset_predict]
    ):
        if pipeline is not None:
            # Only the pipeline call itself is timed, not the serialization.
            t_start = timeit.default_timer()
            docs_batch = pipeline(docs_batch, inplace=False)
            prediction_time += timeit.default_timer() - t_start  # type: ignore

        # serialize the documents
        if serializer is not None:
            # the serializer should not return the serialized documents, but write them to disk
            # and instead return some metadata such as the path to the serialized documents
            serializer_result = serializer(docs_batch)
            if "serializer" in result and result["serializer"] != serializer_result:
                log.warning(
                    f"serializer result changed from {result['serializer']} to {serializer_result}"
                    " during prediction. Only the last result is returned."
                )
            result["serializer"] = serializer_result

    if prediction_time is not None:
        result["prediction_time"] = prediction_time

    # serialize config with resolved paths
    if cfg.get("config_out_path"):
        config_out_dir = os.path.dirname(cfg.config_out_path)
        os.makedirs(config_out_dir, exist_ok=True)
        OmegaConf.save(config=cfg, f=cfg.config_out_path)
        result["config"] = cfg.config_out_path

    return result, object_dict


@hydra.main(version_base="1.2", config_path=str(root / "configs"), config_name="predict.yaml")
def main(cfg: DictConfig) -> dict:
    """Hydra entry point: run the prediction task and return its result dict.

    The previous annotation claimed ``-> None`` although a value is returned; the
    result dict is handed back to Hydra's wrapper.
    """
    result_dict, _ = predict(cfg)
    return result_dict


if __name__ == "__main__":
    # Pre-process sys.argv and set up OmegaConf before handing control to Hydra
    # (see src.utils for the exact behavior of these helpers).
    utils.replace_sys_args_with_values_from_files()
    utils.prepare_omegaconf()
    main()