File size: 7,152 Bytes
3133b5e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 |
import pyrootutils
# Determine the project root by searching for the ".project-root" indicator file, starting at
# this file; the root is also put on the PYTHONPATH and the ".env" file is loaded (see the
# explanation below).
root = pyrootutils.setup_root(
    search_from=__file__,
    indicator=[".project-root"],
    pythonpath=True,
    dotenv=True,
)
# ------------------------------------------------------------------------------------ #
# `pyrootutils.setup_root(...)` is an optional line at the top of each entry file
# that helps to make the environment more robust and convenient
#
# the main advantages are:
# - allows you to keep all entry files in "src/" without installing project as a package
# - makes paths and scripts always work no matter what your current working dir is
# - automatically loads environment variables from ".env" file if exists
#
# how it works:
# - the line above recursively searches for the ".project-root" indicator file in present
#   and parent dirs, to determine the project root dir
# - adds root dir to the PYTHONPATH (if `pythonpath=True`), so this file can be run from
# any place without installing project as a package
# - sets PROJECT_ROOT environment variable which is used in "configs/paths/default.yaml"
# to make all paths always relative to the project root
# - loads environment variables from ".env" file in root dir (if `dotenv=True`)
#
# you can remove `pyrootutils.setup_root(...)` if you:
# 1. either install project as a package or move each entry file to the project root dir
# 2. simply remove PROJECT_ROOT variable from paths in "configs/paths/default.yaml"
# 3. always run entry files from the project root dir
#
# https://github.com/ashleve/pyrootutils
# ------------------------------------------------------------------------------------ #
import os
import timeit
from collections.abc import Iterable, Sequence
from typing import Any, Dict, Optional, Tuple, Union
import hydra
import pytorch_lightning as pl
from omegaconf import DictConfig, OmegaConf
from pie_datasets import Dataset, DatasetDict
from pie_modules.models import * # noqa: F403
from pie_modules.taskmodules import * # noqa: F403
from pytorch_ie import Document, Pipeline
from pytorch_ie.models import * # noqa: F403
from pytorch_ie.taskmodules import * # noqa: F403
from src import utils
from src.models import * # noqa: F403
from src.serializer.interface import DocumentSerializer
from src.taskmodules import * # noqa: F403
# module-level logger, requested from the project's utils under this module's name
log = utils.get_pylogger(__name__)
def document_batch_iter(
    dataset: Union[Sequence[Document], Iterable[Document]], batch_size: int
) -> Iterable[Sequence[Document]]:
    """Yield the documents of ``dataset`` in consecutive batches of at most ``batch_size``.

    Sequences are batched by slicing, so each batch has whatever type slicing the sequence
    returns. Any other iterable is consumed lazily and each batch is collected into a list.
    The last batch may be smaller than ``batch_size``; an empty dataset yields no batches.

    Args:
        dataset: The documents to batch, either a sequence or a general iterable.
        batch_size: Maximum number of documents per batch. Must be positive.

    Yields:
        The batches, in dataset order.

    Raises:
        ValueError: If ``batch_size`` is not positive, or if ``dataset`` is neither a sequence
            nor an iterable. Because this is a generator, the error is raised when the
            iteration starts, not when the function is called.
    """
    # A non-positive batch size would otherwise be handled silently and wrongly: a negative
    # value yields no batch at all for sequences, and the "batch is full" check below never
    # fires for iterables, so everything would end up in one unbounded batch.
    if batch_size <= 0:
        raise ValueError(f"batch_size must be a positive integer, but got: {batch_size}")

    if isinstance(dataset, Sequence):
        for start in range(0, len(dataset), batch_size):
            yield dataset[start : start + batch_size]
    elif isinstance(dataset, Iterable):
        batch = []
        for doc in dataset:
            batch.append(doc)
            if len(batch) == batch_size:
                yield batch
                # start a fresh list, the previous one now belongs to the consumer
                batch = []
        # flush the remaining documents that did not fill a complete batch
        if batch:
            yield batch
    else:
        raise ValueError(f"Unsupported dataset type: {type(dataset)}")
@utils.task_wrapper
def predict(cfg: DictConfig) -> Tuple[dict, dict]:
    """Contains minimal example of the prediction pipeline. Uses a pretrained model to annotate
    documents from a dataset and serializes them.

    The dataset is always instantiated. The pipeline (i.e. the inference step) and the
    serializer are optional: each is only instantiated if its config entry defines a
    ``_target_``.

    Args:
        cfg (DictConfig): Configuration composed by Hydra.

    Returns:
        Tuple[dict, dict]: A dict with the results and a dict with the instantiated objects.
        The result dict contains ``"serializer"`` (result of the last serializer call),
        ``"prediction_time"`` (accumulated time spent in the pipeline calls) and ``"config"``
        (path of the saved config); each entry is only present if the respective step was
        executed. The object dict contains ``cfg``, ``dataset``, ``pipeline`` and
        ``serializer``.
    """

    # Set seed for random number generators in pytorch, numpy and python.random.
    # Compare against None explicitly so that a seed of 0 is honored as well.
    seed = cfg.get("seed")
    if seed is not None:
        pl.seed_everything(seed, workers=True)

    # Init pytorch-ie dataset
    log.info(f"Instantiating dataset <{cfg.dataset._target_}>")
    dataset: DatasetDict = hydra.utils.instantiate(cfg.dataset, _convert_="partial")

    # Init pytorch-ie pipeline
    # The pipeline, and therefore the inference step, is optional to allow for easy testing
    # of the dataset creation and processing.
    pipeline: Optional[Pipeline] = None
    if cfg.get("pipeline") and cfg.pipeline.get("_target_"):
        log.info(f"Instantiating pipeline <{cfg.pipeline._target_}> from {cfg.model_name_or_path}")
        pipeline = hydra.utils.instantiate(cfg.pipeline, _convert_="partial")

        # Per default, the model is loaded with .from_pretrained() which already loads the weights.
        # However, ckpt_path can be used to load different weights from any checkpoint.
        if cfg.ckpt_path is not None:
            pipeline.model = pipeline.model.load_from_checkpoint(checkpoint_path=cfg.ckpt_path).to(
                pipeline.device
            )

        # auto-convert the dataset via the taskmodule of the pipeline
        dataset = pipeline.taskmodule.convert_dataset(dataset)

    # Init the serializer
    serializer: Optional[DocumentSerializer] = None
    if cfg.get("serializer") and cfg.serializer.get("_target_"):
        log.info(f"Instantiating serializer <{cfg.serializer._target_}>")
        serializer = hydra.utils.instantiate(cfg.serializer, _convert_="partial")

    # select the dataset split for prediction
    dataset_predict = dataset[cfg.dataset_split]

    object_dict = {
        "cfg": cfg,
        "dataset": dataset,
        "pipeline": pipeline,
        "serializer": serializer,
    }
    result: Dict[str, Any] = {}

    # prediction_time stays None if there is no pipeline, so no timing is reported in that case
    if pipeline is not None:
        log.info("Starting inference!")
        prediction_time = 0.0
    else:
        log.warning("No prediction pipeline is defined, skip inference!")
        prediction_time = None

    # Without a (non-zero) document_batch_size, the whole split is processed as a single batch.
    document_batch_size = cfg.get("document_batch_size", None)
    for docs_batch in (
        document_batch_iter(dataset_predict, document_batch_size)
        if document_batch_size
        else [dataset_predict]
    ):
        if pipeline is not None:
            # only the pipeline call itself is timed, not the serialization
            t_start = timeit.default_timer()
            docs_batch = pipeline(docs_batch, inplace=False)
            prediction_time += timeit.default_timer() - t_start  # type: ignore

        # serialize the documents
        if serializer is not None:
            # the serializer should not return the serialized documents, but write them to disk
            # and instead return some metadata such as the path to the serialized documents
            serializer_result = serializer(docs_batch)
            if "serializer" in result and result["serializer"] != serializer_result:
                log.warning(
                    f"serializer result changed from {result['serializer']} to {serializer_result}"
                    " during prediction. Only the last result is returned."
                )
            result["serializer"] = serializer_result

    if prediction_time is not None:
        result["prediction_time"] = prediction_time

    # save the config to the requested path
    config_out_path = cfg.get("config_out_path")
    if config_out_path:
        config_out_dir = os.path.dirname(config_out_path)
        # The dirname of a bare file name is "", and os.makedirs("") raises FileNotFoundError,
        # so only create the directory if the path actually has a directory component.
        if config_out_dir:
            os.makedirs(config_out_dir, exist_ok=True)
        OmegaConf.save(config=cfg, f=config_out_path)
        result["config"] = config_out_path

    return result, object_dict
@hydra.main(version_base="1.2", config_path=str(root / "configs"), config_name="predict.yaml")
def main(cfg: DictConfig) -> Dict[str, Any]:
    """Hydra entry point: run ``predict`` with the composed config and return its result dict.

    Args:
        cfg (DictConfig): Configuration composed by Hydra, by default from "predict.yaml" in
            the "configs" directory below the project root.

    Returns:
        The first element of the tuple returned by ``predict`` (the result dict); the second
        element (the instantiated objects) is discarded.
    """
    result_dict, _ = predict(cfg)
    return result_dict
if __name__ == "__main__":
    # NOTE(review): presumably replaces command line arguments that reference files with the
    # values from these files before Hydra parses sys.argv — confirm against src.utils
    utils.replace_sys_args_with_values_from_files()
    # NOTE(review): presumably sets up OmegaConf (e.g. custom resolvers) before the config is
    # composed — confirm against src.utils
    utils.prepare_omegaconf()
    # called without arguments: the hydra.main decorator supplies the composed config
    main()
|