#!/usr/bin/env python
# coding: utf-8
import os
import re
import sys
from contextlib import contextmanager
from time import sleep
from typing import Any, List, Optional, Union
import cloudpickle
import numpy as np
import pandas as pd
import requests
from lazy_imports import try_import
from neomaril_codex.__utils import *
from neomaril_codex.base import *
from neomaril_codex.datasources import NeomarilDataset
from neomaril_codex.exceptions import *
from neomaril_codex.model import NeomarilModel
# First run of digits in a text; used to pull the execution id out of the
# message returned by the training upload endpoint.
patt = re.compile(r"(\d+)")
class NeomarilTrainingLogger:
    """Collect the artifacts of a training run so it can be logged on Neomaril.

    The logger keeps the training data, the fitted model, its outputs,
    metrics, parameters and extra files in memory.
    ``_processing_logging_inputs`` then writes them under ``save_path`` and
    replaces each attribute with the path of the file that was written.

    Example
    -------
    .. code-block:: python

        with training.log_train(name='Teste 1', X_train=X, y_train=y) as logger:
            pipe.fit(X, y)
            logger.save_model(pipe)

            params = pipe.get_params()
            params.pop('steps')
            params.pop('simpleimputer')
            params.pop('lgbmclassifier')
            logger.save_params(params)

            model_output = pd.DataFrame({"pred": pipe.predict(X), "proba": pipe.predict_proba(X)[:,1]})
            logger.save_model_output(model_output)

            auc = cross_val_score(pipe, X, y, cv=5, scoring="roc_auc")
            f_score = cross_val_score(pipe, X, y, cv=5, scoring="f1")
            logger.save_metric(name='auc', value=auc.mean())
            logger.save_metric(name='f1_score', value=f_score.mean())

            logger.set_python_version('3.10')
    """

    def __init__(
        self,
        *,
        name: str,
        X_train: pd.DataFrame,
        y_train: pd.DataFrame,
        description: Optional[str] = None,
        save_path: Optional[str] = None,
    ):
        """
        Initialize a new NeomarilTrainingLogger.

        Args:
            name: The name of the training run.
            X_train: The training data.
            y_train: The training labels.
            description: Optional description of the run.
            save_path: Directory where the artifacts are written. When omitted,
                ``./<name with spaces replaced by underscores>`` is used. The
                directory is created if it does not exist.
        """
        self.name = name
        self.X_train = X_train
        self.y_train = y_train
        self.description = description

        self.model_outputs = None
        self.model = None
        self.metrics = {}
        self.params = {}
        self.requirements = None
        self.python_version = f"{sys.version_info.major}.{sys.version_info.minor}"
        self.extras = []

        # An explicit save_path must be honoured: every ``__to_*`` writer reads
        # ``self.save_path``, so it has to be set on both branches.
        if not save_path:
            dir_name = self.name.replace(" ", "_")
            save_path = f"./{dir_name}"
        os.makedirs(save_path, exist_ok=True)
        self.save_path = save_path

    def save_model(self, model):
        """
        Save the trained model to the logger.

        Args:
            model: The trained model.
        """
        self.model = model

    def save_metric(self, *, name, value):
        """
        Save a metric to the logger.

        Args:
            name: The name of the metric.
            value: The value of the metric.
        """
        self.metrics[name] = value

    def save_params(self, params: dict):
        """
        Save user supplied parameters to the logger.

        These entries take precedence over the automatically collected ones
        when ``_set_params`` merges both dictionaries.

        Args:
            params: A dictionary with the parameters to log.
        """
        self.params = params

    def save_model_output(self, model_output):
        """
        Save the model output to the logger.

        Args:
            model_output: The output of the trained model.
        """
        self.model_outputs = model_output

    def set_python_version(self, version: str):
        """
        Set the Python version used to train the model.

        Args:
            version: The Python version.
        """
        self.python_version = version

    def set_requirements(self, requirements: str):
        """
        Set the project requirements.

        Args:
            requirements: The path of project requirements.
        """
        self.requirements = requirements

    def save_plot(self, *, plot: object, save_filename: str):
        """
        Save plot graphic image to the logger.

        Args:
            plot: A Matplotlib/Plotly/Seaborn graphic object.
            save_filename: A name to save the plot.

        Raises:
            ValueError: If ``plot`` is not recognised as a Plotly figure, a
                Seaborn ``FacetGrid`` or a Matplotlib figure.
        """
        filepath = f"./{save_filename}.png"

        # Each optional plotting library is probed in turn. ``try_import`` is
        # expected to suppress the ImportError of a library that is not
        # installed, in which case the rest of that ``with`` body is skipped.
        with try_import():
            import plotly

            if isinstance(plot, plotly.graph_objs.Figure):
                self.save_plotly_plot(plot=plot, filepath=filepath)
                return

        with try_import():
            import seaborn as sns

            if isinstance(plot, sns.axisgrid.FacetGrid):
                self.save_seaborn_or_matplotlib_plot(plot=plot, filepath=filepath)
                return

        with try_import():
            import matplotlib.pyplot as plt

            if isinstance(plot, plt.Figure):
                self.save_seaborn_or_matplotlib_plot(plot=plot, filepath=filepath)
                return

        raise ValueError("The plot only accepts plots of Matplotlib/Plotly/Seaborn")

    def save_plotly_plot(self, *, plot, filepath):
        """Write the image bytes of a Plotly figure to ``filepath`` and register it as an extra file."""
        image_data = plot.to_image()
        with open(filepath, "wb") as f:
            f.write(image_data)
        self.add_extra(extra=filepath)

    def save_seaborn_or_matplotlib_plot(self, *, plot, filepath):
        """Save a plot through its ``savefig`` method and register it as an extra file."""
        plot.savefig(filepath)
        self.add_extra(extra=filepath)

    def set_extra(self, extra: list):
        """
        Set the extra files list.

        Args:
            extra: A list of paths of the extra file.
        """
        self.extras = extra

    def add_extra(
        self, *, extra: Union[pd.DataFrame, str], filename: Optional[str] = None
    ):
        """
        Add an extra file in the extra file list.

        Args:
            extra: A path of an existing extra file, or a DataFrame that is
                written as parquet under ``save_path``.
            filename: The file name (without extension) used when ``extra`` is
                a DataFrame.

        Raises:
            FileNotFoundError: If ``extra`` is a path that does not exist.
            InputError: If ``extra`` is a DataFrame and no ``filename`` is given.
        """
        if isinstance(extra, str):
            if not os.path.exists(extra):
                raise FileNotFoundError("Extra file path not found!")
            self.extras.append(extra)
        elif isinstance(extra, pd.DataFrame):
            if not filename:
                raise InputError("Needs a filename to save the dataframe parquet.")
            self.extras.append(
                self.__to_parquet(output_filename=filename, input_data=extra)
            )

    def add_requirements(self, filename: str):
        """
        Add requirements file.

        Args:
            filename: The path of the requirements file.
        """
        self.requirements = filename

    def __to_parquet(self, *, output_filename: str, input_data: pd.DataFrame):
        """
        Transform dataframe to parquet.

        Args:
            output_filename: The name of output filename to save.
            input_data: A pandas dataframe to save.

        Returns:
            The path of the written file.
        """
        path = os.path.join(self.save_path, f"{output_filename}.parquet")
        input_data.to_parquet(path)
        return path

    def __to_json(self, output_filename: str, input_data: dict):
        """
        Transform dict to json.

        Args:
            output_filename: The name of output filename to save.
            input_data: A dictionary to save.

        Returns:
            The path of the written file.
        """
        # Imported here so the method does not depend on ``json`` leaking in
        # through one of the module's star imports.
        import json

        path = os.path.join(self.save_path, f"{output_filename}.json")
        with open(path, "w", encoding="utf-8") as f:
            json.dump(input_data, f)
        return path

    def __to_pickle(self, *, output_filename: str, input_data):
        """
        Transform content to pickle.

        Args:
            output_filename: The name of output filename to save.
            input_data: The content to save.

        Returns:
            The path of the written file.
        """
        path = os.path.join(self.save_path, f"{output_filename}.pkl")
        with open(path, "wb") as f:
            cloudpickle.dump(input_data, f)
        return path

    def _set_params(self):
        """Collect data and model descriptors into ``self.params``.

        Gathers the shape and missing-value summary of the features, the
        pipeline steps (or the model class name), the model hyperparameters
        and a summary of the target. Entries already present in
        ``self.params`` win over the collected ones.
        """
        # Normalise ndarray/Series features so ``isna().sum()`` is per column.
        features = self._parse_data_objects(self.X_train)
        missing = features.isna().sum()
        with_missing = missing[missing > 0]
        missing_dict = {
            k + "_missings": v
            for k, v in with_missing.describe().to_dict().items()
            if k != "count"
        }

        params = {
            "shape": features.shape,
            "cols_with_missing": len(with_missing),
            "missing_distribution": missing_dict,
        }

        try:
            params["pipeline_steps"] = list(self.model.named_steps.keys())
        except (AttributeError, TypeError):
            # Not a pipeline: fall back to the fully qualified class name.
            params["pipeline_steps"] = [
                str(self.model.__class__).replace("<class '", "").replace("'>", "")
            ]

        # Default to no hyperparameters so a model exposing neither accessor
        # (or no model at all) does not break the merge below.
        hyperparameters = {}
        if hasattr(self.model, "get_all_params"):
            hyperparameters = {
                f"hyperparam_{k}": str(v)
                for k, v in self.model.get_all_params().items()
                if k != "task_type"
            }
        elif hasattr(self.model, "get_params"):
            skipped = params["pipeline_steps"] + ["steps", "memory", "verbose"]
            hyperparameters = {
                "hyperparam_" + k: str(v)
                for k, v in self.model.get_params().items()
                if k not in skipped
            }

        params = {**params, **hyperparameters}

        counts = self.y_train.value_counts()
        if len(counts) < 10:
            # Few distinct values: report the share of each one.
            proportions = (counts / len(self.y_train)).to_dict()
            params["target_proportion"] = [
                {"target": k, "proportion": v} for k, v in proportions.items()
            ]
        else:
            params["target_distribution"] = {
                k: v
                for k, v in self.y_train.describe().to_dict().items()
                if k != "count"
            }

        self.params = {**params, **self.params}

    @staticmethod
    def _parse_data_objects(obj: Any) -> pd.DataFrame:
        """
        Transform data types to dataframe.

        Series become one-column frames, arrays become frames with string
        column names and DataFrames are returned unchanged. Any other type
        yields ``None``.
        """
        if isinstance(obj, pd.Series):
            return obj.to_frame()
        if isinstance(obj, np.ndarray):
            array_df = pd.DataFrame(obj)
            array_df.columns = [str(c) for c in array_df.columns]
            return array_df
        if isinstance(obj, pd.DataFrame):
            return obj
        return None

    def _processing_logging_inputs(self):
        """
        Write everything that will be logged to ``save_path``.

        After this call ``params``, ``X_train``, ``y_train`` and, when they
        were provided, ``model_outputs``, ``model`` and ``metrics`` hold file
        paths instead of the in-memory objects.
        """
        self._set_params()
        self.params = self.__to_json("params", self.params)

        self.X_train = self.__to_parquet(
            output_filename="features",
            input_data=self._parse_data_objects(self.X_train),
        )
        self.y_train = self.__to_parquet(
            output_filename="target",
            input_data=self._parse_data_objects(self.y_train),
        )

        # Predictions are optional; skip them when none were saved.
        if self.model_outputs is not None:
            self.model_outputs = self.__to_parquet(
                output_filename="predictions",
                input_data=self._parse_data_objects(self.model_outputs),
            )

        # ``is not None`` rather than truthiness: a model object may define
        # ``__len__``/``__bool__``.
        if self.model is not None:
            self.model = self.__to_pickle(
                output_filename="model", input_data=self.model
            )

        if self.metrics:
            self.metrics = self.__to_json("metrics", self.metrics)
class NeomarilTrainingExecution(NeomarilExecution):
    """
    Class to manage trained models.

    Attributes
    ----------
    training_id : str
        Training id (hash) from the experiment you want to access
    group : str
        Group the training is inserted. Default is 'datarisk' (public group)
    exec_id : str
        Execution id for that specific training run
    login : str
        Login for authenticating with the client. You can also use the env variable NEOMARIL_USER to set this
    password : str
        Password for authenticating with the client. You can also use the env variable NEOMARIL_PASSWORD to set this
    environment : str
        Environment of Neomaril you are using.
    run_data : dict
        Metadata from the execution.

    Raises
    ------
    TrainingError
        When the training can't be accessed in the server
    AuthenticationError
        Invalid credentials

    Example
    -------
    .. code-block:: python

        from neomaril_codex.training import NeomarilTrainingClient
        from neomaril_codex.base import NeomarilExecution

        client = NeomarilTrainingClient('123456')
        client.create_group('ex_group', 'Group for example purpose')
        training = client.create_training_experiment('Training example', 'Classification', 'ex_group')
        print(client.get_training(training.training_id, 'ex_group').training_data)

        data_path = './samples/train/'

        run = training.run_training(run_name='First test', train_data=data_path+'dados.csv', training_reference='train_model', training_type='Custom', python_version='3.9', source_file=data_path+'app.py', requirements_file=data_path+'requirements.txt', wait_complete=True)

        print(training.get_training_execution(run.exec_id))

        print(run.download_result())

        run.promote_model('Teste notebook promoted custom', 'score', data_path+'app.py', data_path+'schema.json', 'csv')
    """

    def __init__(
        self,
        *,
        training_id: str,
        group: str,
        exec_id: str,
        login: Optional[str] = None,
        password: Optional[str] = None,
        url: str = None,
    ) -> None:
        super().__init__(
            parent_id=training_id,
            exec_type="Training",
            exec_id=exec_id,
            login=login,
            password=password,
            url=url,
            group=group,
        )

        self.training_id = training_id
        self.group = group

        # ``execution_data`` is expected to be populated by the parent class.
        self.training_type = self.execution_data["TrainingType"]
        self.name = self.execution_data["RunName"]
        self.run_data = self.execution_data["RunData"]

    def __repr__(self) -> str:
        return (
            f'Neomaril{self.exec_type}Execution(name="{self.name}",\n'
            f'exec_id="{self.exec_id}", status="{self.status}")'
        )

    def __auth_headers(self) -> dict:
        """Build the Authorization header with a freshly refreshed token."""
        return {
            "Authorization": "Bearer "
            + refresh_token(*self.credentials, self.base_url)
        }

    def __upload_model(
        self,
        *,
        model_name: str,
        model_reference: Optional[str] = None,
        source_file: Optional[str] = None,
        schema: Optional[Union[str, dict]] = None,
        extra_files: Optional[list] = None,
        env: Optional[str] = None,
        requirements_file: Optional[str] = None,
        operation: str = "Sync",
        input_type: str = None,
    ) -> str:
        """
        Upload the files to the server

        Arguments
        ---------
        model_name : str
            The name of the model, in less than 32 characters
        model_reference : str, optional
            The name of the scoring function inside the source file
        source_file : str, optional
            Path of the source file. The file must have a scoring function that accepts two parameters: data (data for the request body of the model) and model_path (absolute path of where the file is located)
        schema : Union[str, dict], optional
            Path to a JSON or XML file with a sample of the input for the entrypoint function. A dict with the sample input can be sent as well
        extra_files : list, optional
            An optional list with additional files paths that should be uploaded. If the scoring function refers to these files they will be on the same folder as the source file
        requirements_file : str, optional
            Path of the requirements file. This will override the requirements used in training. The packages versions must be fixed eg: pandas==1.0
        env : str, optional
            Path of a .env file that is uploaded along with the model
        operation : str
            Defines which kind of operation is being executed (Sync or Async). Default value is Sync
        input_type : str
            The type of the input file that should be 'json', 'csv' or 'parquet'. Currently not forwarded, see the note in the body

        Raises
        ------
        InputError
            Some input parameter is invalid

        Returns
        -------
        str
            The new model id (hash)
        """
        from contextlib import ExitStack

        url = f"{self.base_url}/training/promote/{self.group}/{self.training_id}/{self.exec_id}"
        form_data = {"name": model_name, "operation": operation, "schema": schema}

        # The schema part is named "schema.<ext>". Only an Async upload with a
        # schema given as a path takes the extension from that path; a dict
        # schema has no path to read it from.
        schema_extension = "json"
        if operation == "Async" and isinstance(schema, str):
            schema_extension = schema.split(".")[-1]

        # ExitStack closes every file opened for the upload once the request
        # has been sent, including when the request raises.
        with ExitStack() as stack:
            upload_data = []

            if self.training_type != "AutoML":
                form_data["model_reference"] = model_reference
                upload_data.append(
                    ("source", ("app.py", stack.enter_context(open(source_file, "rb"))))
                )
                if env:
                    upload_data.append(
                        ("env", (".env", stack.enter_context(open(env, "rb"))))
                    )
                if requirements_file:
                    upload_data.append(
                        (
                            "requirements",
                            (
                                "requirements.txt",
                                stack.enter_context(open(requirements_file, "rb")),
                            ),
                        )
                    )
                if extra_files:
                    upload_data += [
                        ("extra", (c.split("/")[-1], stack.enter_context(open(c, "rb"))))
                        for c in extra_files
                    ]

            upload_data.append(
                ("schema", ("schema." + schema_extension, parse_dict_or_file(schema)))
            )

            # NOTE(review): the request always sends input_type="json"; the
            # ``input_type`` argument is not forwarded and no "AutoML" marker
            # is sent. Confirm the API contract before honouring the argument.
            form_data["input_type"] = "json"

            response = requests.post(
                url,
                data=form_data,
                files=upload_data,
                headers=self.__auth_headers(),
            )

        if response.status_code == 201:
            data = response.json()
            model_id = data["ModelHash"]
            logger.info(f'{data["Message"]} - Hash: "{model_id}"')
            return model_id

        logger.error(f"Upload error: {response.text}")
        raise InputError("Invalid parameters for model creation")

    def get_status(self) -> dict:
        """
        Gets the status of the related execution.

        When the execution has succeeded, the execution description is fetched
        again and ``execution_data`` / ``run_data`` are refreshed from it.

        Raises
        ------
        ExecutionError
            Execution unavailable

        Returns
        -------
        dict
            Returns the execution status.
        """
        url = f"{self.base_url}/training/status/{self.group}/{self.exec_id}"
        response = requests.get(url, headers=self.__auth_headers())

        if response.status_code not in [200, 410]:
            logger.error(response.text)
            raise ExecutionError(f'Execution "{self.exec_id}" unavailable')

        result = response.json()
        self.status = result["Status"]
        self.execution_data["ExecutionState"] = result["Status"]

        if self.status == "Succeeded":
            url = f"{self.base_url}/training/describe/{self.group}/{self.training_id}/{self.exec_id}"
            response = requests.get(url, headers=self.__auth_headers())
            self.execution_data = response.json()["Description"]
            self.run_data = self.execution_data["RunData"]
            # "tags" is dropped from the run data when it is present.
            if isinstance(self.run_data, dict):
                self.run_data.pop("tags", None)

        return result

    def __host_model(self, *, operation: str, model_id: str) -> None:
        """
        Builds the model execution environment

        Arguments
        ---------
        operation : str
            The model operation type (Sync or Async)
        model_id : str
            The uploaded model id (hash)

        Raises
        ------
        AuthenticationError
            Login not authorized
        ServerError
            The server answered with a 5xx status
        InputError
            Some input parameter is invalid
        """
        url = f"{self.base_url}/model/{operation}/host/{self.group}/{model_id}"
        # NOTE(review): the comparison is against lowercase "sync" while the
        # documented values are "Sync"/"Async" - confirm what callers pass.
        if operation == "sync":
            url = url.replace("7070", "7071")

        response = requests.get(url, headers=self.__auth_headers())

        if response.status_code == 202:
            logger.info(f"Model host in process - Hash: {model_id}")
            return

        logger.error(response.text)
        if response.status_code == 401:
            raise AuthenticationError("Login not authorized")
        if response.status_code >= 500:
            raise ServerError("Server Error")
        raise InputError("Invalid parameters for model creation")
class NeomarilTrainingExperiment(BaseNeomaril):
    """
    Class to manage models being trained inside Neomaril

    Attributes
    ----------
    login : str
        Login for authenticating with the client. You can also use the env variable NEOMARIL_USER to set this
    password : str
        Password for authenticating with the client. You can also use the env variable NEOMARIL_PASSWORD to set this
    training_id : str
        Training id (hash) from the experiment you want to access
    group : str
        Group the training is inserted. Default is 'datarisk' (public group)
    url : str
        URL of the Neomaril server. Default is https://neomaril.staging.datarisk.net/
    executions : List[int]
        Ids for the executions in that training

    Raises
    ------
    TrainingError
        When the training can't be accessed in the server
    AuthenticationError
        Invalid credentials

    Example
    -------
    .. code-block:: python

        from neomaril_codex.training import NeomarilTrainingClient
        from neomaril_codex.base import NeomarilExecution

        client = NeomarilTrainingClient('123456')
        client.create_group('ex_group', 'Group for example purpose')
        training = client.create_training_experiment('Training example', 'Classification', 'ex_group')
        print(client.get_training(training.training_id, 'ex_group').training_data)

        data_path = './samples/train/'

        run = training.run_training(run_name='First test', train_data=data_path+'dados.csv', training_reference='train_model', training_type='Custom', python_version='3.9', source_file=data_path+'app.py', requirements_file=data_path+'requirements.txt', wait_complete=True)

        print(training.get_training_execution(run.exec_id))

        print(run.download_result())
    """

    def __init__(
        self,
        *,
        training_id: str,
        login: Optional[str] = None,
        password: Optional[str] = None,
        group: str = "datarisk",
        url: str = "https://neomaril.staging.datarisk.net/",
    ) -> None:
        super().__init__(login=login, password=password, url=url)

        self.training_id = training_id
        self.group = group

        self.training_data = self.__describe()
        self.model_type = self.training_data["ModelType"]
        self.experiment_name = self.training_data["ExperimentName"]
        # Keep only the ids, the same shape ``__refresh_execution_list`` stores.
        self.executions = [c["Id"] for c in self.training_data["Executions"]]

    def __repr__(self) -> str:
        return (
            f'NeomarilTrainingExperiment(name="{self.experiment_name}",\n'
            f'group="{self.group}",\n'
            f'training_id="{self.training_id}",\n'
            f"model_type={str(self.model_type)}\n"
            ")"
        )

    def __str__(self):
        return f'NEOMARIL training experiment "{self.experiment_name} (Group: {self.group}, Id: {self.training_id})"'

    def __auth_headers(self) -> dict:
        """Build the Authorization header with a freshly refreshed token."""
        return {
            "Authorization": "Bearer "
            + refresh_token(*self.credentials, self.base_url)
        }

    def __describe(self) -> dict:
        """Fetch the experiment description from the server.

        Raises
        ------
        ModelError
            The experiment was not found (404)
        AuthenticationError
            Login not authorized (401)
        ServerError
            The server answered with a 5xx status

        Returns
        -------
        dict
            The "Description" entry of the response
        """
        url = f"{self.base_url}/training/describe/{self.group}/{self.training_id}"
        response = requests.get(url, headers=self.__auth_headers())

        if response.status_code == 404:
            raise ModelError(f'Experiment "{self.training_id}" not found.')
        if response.status_code == 401:
            logger.error(response.text)
            raise AuthenticationError("Login not authorized")
        if response.status_code >= 500:
            logger.error(response.text)
            raise ServerError(f'Unable to retrive experiment "{self.training_id}"')

        return response.json()["Description"]

    def __upload_training(
        self,
        *,
        run_name: str,
        training_type: str = "External",
        description: Optional[str] = None,
        train_data: Optional[str] = None,
        dataset: Union[str, NeomarilDataset] = None,
        training_reference: Optional[str] = None,
        python_version: str = "3.8",
        conf_dict: Optional[Union[str, dict]] = None,
        source_file: Optional[str] = None,
        requirements_file: Optional[str] = None,
        env: Optional[str] = None,
        X_train=None,
        y_train=None,
        model_outputs=None,
        model_file: Optional[str] = None,
        model_metrics: Optional[Union[str, dict]] = None,
        model_params: Optional[Union[str, dict]] = None,
        model_hash: Optional[str] = None,
        extra_files: Optional[list] = None,
    ) -> str:
        """
        Upload the files to the server

        Arguments
        ---------
        run_name : str
            The name of the model, in less than 32 characters
        train_data : str
            Path of the file with train data
        dataset : Union[str, NeomarilDataset]
            Dataset hash, or a dataset object whose ``dataset_hash`` is sent. Only used when ``train_data`` is not given
        training_type : str
            Can be Custom, AutoML or External
        description : str, optional
            Description of the experiment
        training_reference : str, optional
            The name of the training function inside the source file. Just used when training_type is Custom
        python_version : str
            Python version for the model environment. Available versions are 3.8, 3.9, 3.10. Defaults to '3.8'. Just used when training_type is Custom or External
        conf_dict : Union[str, dict], optional
            Path to a JSON file with the AutoML configuration. A dict can be sent as well. Just used when training_type is AutoML
        source_file : str, optional
            Path of the source file (.py or .ipynb). The file must have a training function that accepts one parameter: model_path (absolute path of where the file is located). Just used when training_type is Custom
        requirements_file : str, optional
            Path of the requirements file. The packages versions must be fixed eg: pandas==1.0
        env : str, optional
            .env file to be used in your training environment. This will be encrypted in the server.
        extra_files : list, optional
            An optional list with additional files paths that should be uploaded
        X_train : str, optional
            Path of the parquet file with the training data. Just used when training_type is External
        y_train : str, optional
            Path of the parquet file with the training labels. Just used when training_type is External
        model_outputs : str, optional
            Path of the parquet file with the model outputs. Just used when training_type is External
        model_file : str, optional
            The path to the trained model file.
        model_metrics : str, optional
            The path to a JSON file with the model metrics (opened as a file).
        model_params : str, optional
            The path to a JSON file with the model parameters (opened as a file).
        model_hash : str, optional
            Model hash sent along with an External run

        Raises
        ------
        InputError
            Some input parameter is invalid
        AuthenticationError
            Login not authorized
        ServerError
            The server failed or its answer carries no execution id

        Returns
        -------
        str
            The id of the new training execution
        """
        from contextlib import ExitStack

        url = f"{self.base_url}/training/upload/{self.group}/{self.training_id}"

        form_data = {"run_name": run_name, "training_type": training_type}
        if description:
            form_data["description"] = description

        # ExitStack closes every file opened for the upload once the request
        # has been sent, including when building the payload or the request
        # itself raises.
        with ExitStack() as stack:

            def attach(path):
                # Always binary so the uploaded bytes match the file on disk.
                return stack.enter_context(open(path, "rb"))

            upload_data = []

            if training_type != "External":
                if train_data:
                    upload_data.append(
                        ("train_data", (train_data.split("/")[-1], attach(train_data)))
                    )
                elif dataset:
                    form_data["dataset_hash"] = (
                        dataset if isinstance(dataset, str) else dataset.dataset_hash
                    )

            if training_type == "Custom":
                source_names = {"py": "app.py", "ipynb": "notebook.ipynb"}
                extension = source_file.split(".")[-1]
                if extension not in source_names:
                    raise InputError("source_file must be a .py or .ipynb file")

                upload_data += [
                    ("source", (source_names[extension], attach(source_file))),
                    ("requirements", ("requirements.txt", attach(requirements_file))),
                ]
                if env:
                    upload_data.append(("env", (".env", attach(env))))
                if extra_files:
                    upload_data += [
                        ("extra", (c.split("/")[-1], attach(c))) for c in extra_files
                    ]

                form_data["training_reference"] = training_reference
                form_data["python_version"] = "Python" + python_version.replace(".", "")

            elif training_type == "AutoML":
                if not conf_dict:
                    raise InputError("conf_dict is mandatory for AutoML training")
                upload_data.append(
                    ("conf_dict", ("conf.json", parse_dict_or_file(conf_dict)))
                )

            elif training_type == "External":
                if model_hash:
                    form_data["model_hash"] = model_hash

                if X_train:
                    logger.debug(f"X_train:{X_train}")
                    upload_data.append(("features", ("features.parquet", attach(X_train))))
                if y_train:
                    logger.debug(f"y_train:{y_train}")
                    upload_data.append(("target", ("target.parquet", attach(y_train))))
                if model_outputs:
                    logger.debug(f"model_outputs:{model_outputs}")
                    upload_data.append(
                        ("output", ("predictions.parquet", attach(model_outputs)))
                    )
                if model_file:
                    logger.debug(f"model_file:{model_file}")
                    upload_data.append(("model", attach(model_file)))
                if model_params:
                    logger.debug(f"model_params:{model_params}")
                    upload_data.append(("parameters", attach(model_params)))
                if model_metrics:
                    logger.debug(f"model_metrics:{model_metrics}")
                    upload_data.append(("metrics", attach(model_metrics)))
                if requirements_file:
                    upload_data.append(("requirements", attach(requirements_file)))
                if extra_files:
                    upload_data += [("extra", attach(path)) for path in extra_files]
                if python_version:
                    form_data["python_version"] = "Python" + python_version.replace(
                        ".", ""
                    )
                if env:
                    upload_data.append(("env", (".env", attach(env))))

            token = refresh_token(*self.credentials, self.base_url)
            response = requests.post(
                url,
                data=form_data,
                files=upload_data,
                headers={"Authorization": "Bearer " + token},
            )

        message = response.text

        if response.status_code == 201:
            logger.info(message)
            # The execution id is the first run of digits in the message.
            match = patt.search(message)
            if match is None:
                raise ServerError("Unable to read the execution id from the response")
            return match.group(1)

        logger.error(message)
        if response.status_code == 401:
            raise AuthenticationError("Login not authorized")
        if response.status_code >= 500:
            raise ServerError("Server Error")
        raise InputError("Bad input for training upload")

    def __execute_training(self, exec_id: str) -> None:
        """
        Builds the model execution environment

        Arguments
        ---------
        exec_id : str
            The uploaded training execution id (hash)

        Raises
        ------
        AuthenticationError
            Login not authorized
        ServerError
            The server answered with a 5xx status
        InputError
            Some input parameter is invalid
        """
        url = f"{self.base_url}/training/execute/{self.group}/{self.training_id}/{exec_id}"
        response = requests.get(url, headers=self.__auth_headers())

        if response.status_code == 200:
            logger.info(f"Model training starting - Hash: {self.training_id}")
            return

        logger.error(response.text)
        if response.status_code == 401:
            raise AuthenticationError("Login not authorized")
        if response.status_code >= 500:
            raise ServerError("Server Error")
        raise InputError("Invalid parameters for training execution")

    def __refresh_execution_list(self):
        """Reload the experiment description and the list of execution ids."""
        self.training_data = self.__describe()
        self.executions = [c["Id"] for c in self.training_data["Executions"]]

    def run_training(
        self,
        *,
        run_name: str,
        training_type: str = "External",
        description: Optional[str] = None,
        train_data: Optional[str] = None,
        dataset: Union[str, NeomarilDataset] = None,
        training_reference: Optional[str] = None,
        python_version: str = "3.8",
        conf_dict: Optional[Union[str, dict]] = None,
        source_file: Optional[str] = None,
        requirements_file: Optional[str] = None,
        extra_files: Optional[list] = None,
        env: Optional[str] = None,
        X_train=None,
        y_train=None,
        model_outputs=None,
        model_file: Optional[str] = None,
        model_metrics: Optional[Union[str, dict]] = None,
        model_params: Optional[Union[str, dict]] = None,
        model_hash: Optional[str] = None,
        wait_complete: Optional[bool] = False,
    ) -> Union[dict, NeomarilExecution]:
        """
        Uploads a training run, starts it and returns its execution.

        Arguments
        ---------
        run_name : str
            The name of the model, in less than 32 characters
        train_data : str
            Path of the file with train data. For Custom and AutoML either train_data or dataset is required
        dataset : Union[str, NeomarilDataset]
            Dataset hash or dataset object used instead of train_data
        training_reference : str, optional
            The name of the training function inside the source file. Just used when training_type is Custom
        training_type : str
            Can be Custom, AutoML or External
        description : str, optional
            Description of the experiment
        python_version : str, optional
            Python version for the model environment. Available versions are 3.8, 3.9, 3.10. Defaults to '3.8'
        conf_dict : Union[str, dict]
            Path to a JSON file with the AutoML configuration. A dict can be sent as well. Just used when training_type is AutoML
        source_file : str, optional
            Path of the source file. The file must have a training function that accepts one parameter: model_path (absolute path of where the file is located). Just used when training_type is Custom
        requirements_file : str
            Path of the requirements file. The packages versions must be fixed eg: pandas==1.0
        env : str, optional
            .env file to be used in your training environment. This will be encrypted in the server.
        extra_files : list, optional
            An optional list with additional files paths that should be uploaded
        X_train, y_train, model_outputs : str, optional
            Paths of the parquet files with features, target and predictions. X_train and y_train are required when training_type is External
        model_file, model_metrics, model_params : str, optional
            Paths of the model, metrics and parameters files. Just used when training_type is External
        model_hash : str, optional
            Model hash sent along with an External run
        wait_complete : bool, optional
            When True, polls the status every 30 seconds until the run leaves the Running/Requested states. Default value is False

        Raises
        ------
        InputError
            Some input parameter is invalid
        ExecutionError
            The training execution failed

        Returns
        -------
        NeomarilTrainingExecution
            The execution of the uploaded run

        Example
        -------
        >>> execution = training.run_training(run_name='First test', train_data=data_path+'dados.csv', training_type='Custom', source_file=data_path+'app.py', training_reference='train_model', python_version='3.9', requirements_file=data_path+'requirements.txt', wait_complete=True)
        """
        # Custom and AutoML accept either an uploaded file or a dataset.
        has_data = train_data or dataset

        if training_type == "Custom":
            input_validator = (
                has_data
                and source_file
                and requirements_file
                and run_name
                and training_reference
                and python_version
            )
            fields_required = "train_data or dataset, source_file, requirements_file, run_name, training_reference, python_version"
        elif training_type == "AutoML":
            input_validator = has_data and conf_dict and run_name and python_version
            fields_required = "train_data or dataset, conf_dict, run_name, python_version"
        elif training_type == "External":
            input_validator = run_name and python_version and X_train and y_train
            fields_required = "run_name, python_version, X_train, y_train"
        else:
            raise InputError("Training type needs be: Custom, AutoML or External.")

        if not input_validator:
            raise InputError(
                f"The parameters {fields_required} it's mandatory on {training_type} training."
            )

        if python_version not in ["3.8", "3.9", "3.10"]:
            raise InputError(
                "Invalid python version. Avaliable versions are 3.8, 3.9, 3.10"
            )

        if training_type == "Custom":
            exec_id = self.__upload_training(
                run_name=run_name,
                training_type=training_type,
                description=description,
                train_data=train_data,
                dataset=dataset,
                training_reference=training_reference,
                python_version=python_version,
                source_file=source_file,
                env=env,
                requirements_file=requirements_file,
                extra_files=extra_files,
            )
        elif training_type == "AutoML":
            exec_id = self.__upload_training(
                run_name=run_name,
                training_type=training_type,
                description=description,
                train_data=train_data,
                dataset=dataset,
                conf_dict=conf_dict,
            )
        else:
            exec_id = self.__upload_training(
                run_name=run_name,
                training_type=training_type,
                description=description,
                python_version=python_version,
                requirements_file=requirements_file,
                extra_files=extra_files,
                X_train=X_train,
                y_train=y_train,
                model_outputs=model_outputs,
                model_file=model_file,
                model_metrics=model_metrics,
                model_params=model_params,
                model_hash=model_hash,
            )

        if not exec_id:
            return None

        self.__execute_training(exec_id)
        self.__refresh_execution_list()

        run = NeomarilTrainingExecution(
            training_id=self.training_id,
            group=self.group,
            exec_id=exec_id,
            login=self.credentials[0],
            password=self.credentials[1],
            url=self.base_url,
        )
        response = run.get_status()
        status = response["Status"]

        if wait_complete:
            print("Waiting the training run.", end="")
            while status in ["Running", "Requested"]:
                sleep(30)
                print(".", end="", flush=True)
                response = run.get_status()
                status = response["Status"]

        if status == "Failed":
            logger.error(response.get("Message"))
            raise ExecutionError("Training execution failed")

        return run

    def get_training_execution(
        self, exec_id: Optional[str] = None
    ) -> NeomarilTrainingExecution:
        """
        Get an execution instance.

        Arguments
        ---------
        exec_id : str, optional
            Execution id. If not informed we get the last execution.

        Raises
        ------
        InputError
            The execution id is not valid, or no id was informed and the training has no execution yet

        Returns
        -------
        NeomarilTrainingExecution
            The chosen execution
        """
        invalid_id_message = "Unvalid execution Id informed or this training dont have a successful execution yet."

        if not exec_id:
            self.__refresh_execution_list()
            logger.info("Execution id not informed. Getting last execution")
            if not self.executions:
                raise InputError(invalid_id_message)
            exec_id = max(self.executions)

        try:
            int(exec_id)
        except (TypeError, ValueError):
            raise InputError(invalid_id_message) from None

        execution = NeomarilTrainingExecution(
            training_id=self.training_id,
            group=self.group,
            exec_id=exec_id,
            login=self.credentials[0],
            password=self.credentials[1],
            url=self.base_url,
        )
        execution.get_status()

        return execution

    def get_all_training_executions(self) -> List[NeomarilTrainingExecution]:
        """
        Get all executions from that experiment.

        Returns
        -------
        List[NeomarilTrainingExecution]
            All executions from that training
        """
        self.__refresh_execution_list()
        return [self.get_training_execution(e) for e in self.executions]

    @contextmanager
    def log_train(
        self,
        *,
        name,
        X_train,
        y_train,
        description: Optional[str] = None,
        save_path: Optional[str] = None,
    ):
        """
        Context manager that logs an External training run.

        Yields a ``NeomarilTrainingLogger``. When the ``with`` body finishes
        without raising, the logged artifacts are written to disk and uploaded
        through ``run_training``. If the body raises, nothing is uploaded and
        the exception propagates unchanged.

        Arguments
        ---------
        name : str
            The name of the training run
        X_train
            The training data
        y_train
            The training labels
        description : str, optional
            Description of the run
        save_path : str, optional
            Directory where the logger writes its files
        """
        self.trainer = NeomarilTrainingLogger(
            name=name,
            X_train=X_train,
            y_train=y_train,
            description=description,
            save_path=save_path,
        )

        yield self.trainer

        # Only reached when the with-body completed: uploading from a
        # ``finally`` would send a partial run and could mask the body's error.
        self.trainer._processing_logging_inputs()
        self.run_training(
            run_name=self.trainer.name,
            description=self.trainer.description,
            training_type="External",
            python_version=self.trainer.python_version,
            requirements_file=self.trainer.requirements,
            extra_files=self.trainer.extras,
            X_train=self.trainer.X_train,
            y_train=self.trainer.y_train,
            model_outputs=self.trainer.model_outputs,
            model_file=self.trainer.model,
            model_metrics=self.trainer.metrics,
            model_params=self.trainer.params,
        )
class NeomarilTrainingClient(BaseNeomarilClient):
    """
    Client for accessing Neomaril and managing training experiments.

    Attributes
    ----------
    login : str
        Login for authenticating with the client. You can also use the env variable NEOMARIL_USER to set this
    password : str
        Password for authenticating with the client. You can also use the env variable NEOMARIL_PASSWORD to set this
    url : str
        URL to Neomaril Server. Default value is https://neomaril.staging.datarisk.net, use it to test your deployment first before changing to production. You can also use the env variable NEOMARIL_URL to set this

    Raises
    ------
    AuthenticationError
        Invalid credentials
    ServerError
        Server unavailable

    Example
    -------
    .. code-block:: python

        from neomaril_codex.training import NeomarilTrainingClient

        client = NeomarilTrainingClient('123456')
        client.create_group('ex_group', 'Group for example purpose')
        training = client.create_training_experiment(
            experiment_name='Training example',
            model_type='Classification',
            group='ex_group',
        )
        print(client.get_training(training_id=training.training_id, group='ex_group').training_data)
    """

    def __repr__(self) -> str:
        return f'API version {self.version} - NeomarilTrainingClient(url="{self.base_url}", Token="{self.user_token}")'

    def __str__(self):
        return f"NEOMARIL {self.base_url} Training client:{self.user_token}"

    @staticmethod
    def __check_response(response: "requests.Response", expected_status: int) -> None:
        """Log the response body and raise the matching error unless the status is `expected_status`.

        Args:
            response: response object returned by `requests`
            expected_status (int): the only status code treated as success

        Raises:
            InputError: status code 400
            AuthenticationError: status code 401
            ServerError: status code 500 or above
            Exception: any other status code different from `expected_status`
        """
        status = response.status_code
        if status == expected_status:
            return

        logger.error(response.text)
        if status == 400:
            raise InputError("Bad Input")
        if status == 401:
            raise AuthenticationError("Login not authorized")
        if status >= 500:
            raise ServerError("Server Error")
        raise Exception("Unexpected error!")

    def get_training(
        self, *, training_id: str, group: str = "datarisk"
    ) -> NeomarilTrainingExperiment:
        """
        Access a training experiment using its id

        Arguments
        ---------
        training_id : str
            Training id (hash) that needs to be accessed
        group : str
            Group the training is inserted. Default is 'datarisk' (public group)

        Raises
        ------
        TrainingError
            Training unavailable
        ServerError
            Unknown return from server

        Returns
        -------
        NeomarilTrainingExperiment
            A NeomarilTrainingExperiment instance with the training hash from `training_id`

        Example
        -------
        >>> training = client.get_training(training_id='Tfb3274827a24dc39d5b78603f348aee8d3dbfe791574dc4a6681a7e2a6622fa')
        """
        return NeomarilTrainingExperiment(
            training_id=training_id,
            login=self.credentials[0],
            password=self.credentials[1],
            group=group,
            url=self.base_url,
        )

    def __get_repeated_thash(
        self, model_type: str, experiment_name: str, group: str
    ) -> Optional[str]:
        """Look for a previously registered training experiment with the same attributes.

        The search endpoint is queried without filters and the results are matched
        locally on experiment name, group name and model type.

        Args:
            model_type (str): type of the model being trained (Classification, Regression or Unsupervised)
            experiment_name (str): name given to the training (compared case-sensitively)
            group (str): name of the group where the training is inserted

        Raises:
            InputError: some part of the data is incorrect
            AuthenticationError: user has insufficient permissions
            ServerError: server is not available
            Exception: the response status is different from 200

        Returns:
            Optional[str]: training hash of the first match, or None when there is no match
        """
        url = f"{self.base_url}/training/search"
        response = requests.get(
            url,
            headers={
                "Authorization": "Bearer "
                + refresh_token(*self.credentials, self.base_url)
            },
        )
        self.__check_response(response, 200)

        # A payload without "Results" (or with a null value) means nothing to match.
        results = response.json().get("Results") or []
        for result in results:
            is_same_experiment = (
                result["ExperimentName"] == experiment_name
                and result["GroupName"] == group
                and result["ModelType"] == model_type
            )
            if is_same_experiment:
                logger.info("Found experiment with same attributes...")
                return result["TrainingHash"]
        return None

    def __create(self, experiment_name: str, model_type: str, group: str) -> str:
        """Creates a train experiment. A train experiment can aggregate multiple training runs (also called executions).
        Each execution can eventually become a deployed model or not.

        Args:
            experiment_name (str): name given to the training, should be not null, case-sensitive, have between 3 and 32 characters,
                that could be alphanumeric including accentuation (for example: 'é', à', 'ç','ñ') and space,
                without blank spaces and special characters
            model_type (str): type of the model being trained. It can be
                Classification: for ML algorithms related to classification (predicts discrete class labels) problems;
                Regression: the ones that will use regression (predict a continuous quantity) algorithms;
                Unsupervised: for training that will use ML algorithms without supervision.
            group (str): name of the group, previous created, where the training will be inserted

        Raises:
            InputError: some part of the data is incorrect
            AuthenticationError: user has insufficient permissions
            ServerError: server is not available
            Exception: generated exception in case of the response to the request is different than 201

        Returns:
            str: training hash of the experiment
        """
        url = f"{self.base_url}/training/register/{group}"
        data = {"experiment_name": experiment_name, "model_type": model_type}
        response = requests.post(
            url,
            data=data,
            headers={
                "Authorization": "Bearer "
                + refresh_token(*self.credentials, self.base_url)
            },
        )
        self.__check_response(response, 201)

        response_data = response.json()
        logger.info(response_data["Message"])
        return response_data["TrainingHash"]

    def create_training_experiment(
        self, *, experiment_name: str, model_type: str, group: str = "datarisk", force: bool = False
    ) -> NeomarilTrainingExperiment:
        """
        Create a new training experiment on Neomaril.

        If an experiment with the same name, group and model type already exists it
        is reused, unless `force` is set, in which case a new one is registered.

        Arguments
        ---------
        experiment_name : str
            The name of the experiment, in less than 32 characters
        model_type : str
            Type of the model being trained. One of 'Classification', 'Regression' or 'Unsupervised'
        group : str
            Group the model is inserted. Default to 'datarisk' (public group)
        force: bool
            Forces to create a new training with the same model_type, experiment_name, group

        Raises
        ------
        InputError
            Some input parameter is invalid
        GroupError
            The informed group does not exist
        AuthenticationError
            Login not authorized
        ServerError
            Unknown internal server error

        Returns
        -------
        NeomarilTrainingExperiment
            A NeomarilTrainingExperiment instance with the training hash from `training_id`

        Example
        -------
        >>> training = client.create_training_experiment(experiment_name='Training example', model_type='Classification', group='ex_group')
        """
        if group:
            # Normalise the group name before comparing it with the registered groups.
            group = (
                group.lower()
                .strip()
                .replace(" ", "_")
                .replace(".", "_")
                .replace("-", "_")
            )
            groups = [g.get("Name") for g in self.list_groups()]
            if group not in groups:
                raise GroupError("Group dont exist. Create a group first.")
        else:
            group = "datarisk"
            logger.info("Group not informed, using default 'datarisk' group")

        if model_type not in ["Classification", "Regression", "Unsupervised"]:
            raise InputError(
                f"Invalid model_type {model_type}. Should be one of the following: Classification, Regression or "
                f"Unsupervised"
            )

        logger.info("Trying to load experiment...")
        training_id = self.__get_repeated_thash(
            model_type=model_type, experiment_name=experiment_name, group=group
        )

        # Pick the log message from what the search actually found, so the
        # "identical to an existing one" notice only appears when a duplicate exists.
        if training_id is None:
            logger.info("Could not find experiment. Creating a new one...")
        elif force:
            logger.info(
                "The experiment you're creating has identical name, group, and model type attributes to an existing one. "
                "Since forced creation is active, we will continue with the process as specified"
            )

        if force or training_id is None:
            training_id = self.__create(
                experiment_name=experiment_name, model_type=model_type, group=group
            )

        return NeomarilTrainingExperiment(
            training_id=training_id,
            login=self.credentials[0],
            password=self.credentials[1],
            group=group,
            url=self.base_url,
        )