import os
import yaml
import json
from loguru import logger
from typing import Optional
from datetime import datetime
from neomaril_codex.exceptions import *
from neomaril_codex.training import *
from neomaril_codex.model import *
[docs]class NeomarilPipeline(BaseNeomaril):
"""
Class to construct and orchestrates the flow data of the models inside Neomaril.
Atributtes
----------
login : str
Login for authenticating with the client. You can also use the env variable NEOMARIL_USER to set this
password : str
Password for authenticating with the client. You can also use the env variable NEOMARIL_PASSWORD to set this
group : str
Group the model is inserted
url : str
URL to Neomaril Server. Default value is https://neomaril.staging.datarisk.net, use it to test your deployment first before changing to production. You can also use the env variable NEOMARIL_URL to set this
python_version : str
Python version for the model environment. Avaliable versions are 3.8, 3.9, 3.10. Defaults to '3.9'
Example
--------
.. code-block:: python
from neomaril_codex.pipeline import NeomarilPipeline
pipeline = NeomarilPipeline.from_config_file('./samples/pipeline.yml')
pipeline.register_monitoring_config(directory = "./samples/monitoring", preprocess = "preprocess.py", preprocess_function = "score", shap_function = "score", config = "configuration.json", packages = "requirements.txt")
pipeline.start()
pipeline.run_monitoring('2', 'Mb29d61da4324a39a8bc2e0946f213b4959643916d354bf39940de2124f1e9d8')
"""
def __init__(self, *, group:str, login:Optional[str]=None, password:Optional[str]=None, url:str='https://neomaril.staging.datarisk.net/', python_version:float=3.9) -> None:
super().__init__(login=login, password=password, url=url)
self.group = group
self.python_version = python_version
self.train_config = None
self.deploy_config = None
self.monitoring_config = None
[docs] def register_train_config(self, **kwargs)-> dict:
"""
Set the files for configure the training
Parameters
----------
kwargs : list or dict
List or dictionary with the necessary files for training
"""
self.train_config = kwargs
[docs] def register_deploy_config(self, **kwargs) -> dict:
"""
Set the files for configure the deploy
Parameters
----------
kwargs : list or dict
List or dictionary with the necessary files for deploy
"""
self.deploy_config = kwargs
[docs] def register_monitoring_config(self, **kwargs) -> dict:
"""
Set the files for configure the monitoring
Parameters
----------
kwargs : list or dict
List or dictionary with the necessary files for monitoring
Example
-------
>>> pipeline.register_monitoring_config(directory = "./samples/monitoring", preprocess = "preprocess.py", preprocess_function = "score", shap_function = "score", config = "configuration.json", packages = "requirements.txt")
"""
self.monitoring_config = kwargs
[docs] @staticmethod
def from_config_file(path):
"""
Load the configuration files for orchestrate the model
Parameters
----------
path : str
Path of the configuration file, but it could be a dict
Raises
------
PipelineError
Undefined credentials
Returns
-------
NeomarilPipeline
The new pipeline
Example
--------
>>> pipeline = NeomarilPipeline.from_config_file('./samples/pipeline-just-model.yml')
>>> pipeline.register_monitoring_config(directory = "./samples/monitoring", preprocess = "preprocess.py", preprocess_function = "score", shap_function = "score", config = "configuration.json", packages = "requirements.txt")
>>> pipeline.start()
"""
with open(path, 'rb') as stream:
try:
conf=yaml.safe_load(stream)
except yaml.YAMLError as exc:
print(exc)
load_dotenv()
logger.info('Loading .env')
login = os.getenv("NEOMARIL_USER")
if not login:
raise PipelineError("When using a config file the environment variable NEOMARIL_USER must be defined")
password = os.getenv("NEOMARIL_PASSWORD")
if not password:
raise PipelineError("When using a config file the environment variable NEOMARIL_PASSWORD must be defined")
url = os.getenv('NEOMARIL_URL', conf.get('url'))
pipeline = NeomarilPipeline(
group=conf['group'],
login=login,
password=password,
url=url,
python_version=conf['python_version']
)
if 'training' in conf.keys():
pipeline.register_train_config(**conf['training'])
if 'deploy' in conf.keys():
pipeline.register_deploy_config(**conf['deploy'])
if 'monitoring' in conf.keys():
pipeline.register_monitoring_config(**conf['monitoring'])
return pipeline
[docs] def run_training(self) -> tuple[str, str]:
"""
Run the training process
Raises
------
TrainingError
Training has failed
Returns
-------
tuple[str, str]
A tuple with the 'training_id' and the 'exec_id'
Example
-------
>>> pipeline.run_training()
"""
logger.info('Running training')
client = NeomarilTrainingClient(login=self.credentials[0], password=self.credentials[1], url=self.base_url)
client.create_group(name=self.group, description=self.group)
conf = self.train_config
training = client.create_training_experiment(
experiment_name=conf['experiment_name'],
model_type=conf['model_type'],
group=self.group
)
PATH = conf['directory']
run_name = conf.get('run_name', 'Pipeline run '+str(datetime.now()))
extra_files = conf.get('extra')
if conf['training_type'] == 'Custom':
run = training.run_training(
run_name=run_name,
training_type=os.path.join(PATH, conf['data']),
source_file=os.path.join(PATH, conf['source']),
requirements_file=os.path.join(PATH, conf['packages']),
training_reference=conf['train_function'],
extra_files=[os.path.join(PATH,e) for e in extra_files] if extra_files else None,
python_version=str(self.python_version),
wait_complete=True
)
elif conf['training_type'] == 'AutoML':
run = training.run_training(
run_name=run_name,
training_type=os.path.join(PATH, conf['data']),
conf_dict=os.path.join(PATH, conf['config']),
wait_complete=True
)
status = run.get_status()
if status['Status'] == "Succeeded":
logger.info('Model training finished')
return training.training_id, run.exec_id
else:
raise TrainingError('Training failed: '+status['Message'])
[docs] def run_deploy(self, training_id:Optional[str]=None) -> str:
"""
Run the deploy process
Arguments
----------
training_id : str, optional
The id for the training process that you want to deploy now
Raises
------
ModelError
Deploy has failed
Returns
-------
str
The new Model id (hash)
Example
-------
>>> pipeline.run_deploy('Mb29d61da4324a39a8bc2e0946f213b4959643916d354bf39940de2124f1e9d8')
"""
conf = self.deploy_config
PATH = conf['directory']
extra_files = conf.get('extra')
if training_id:
logger.info('Deploying scorer from training')
training_run = NeomarilTrainingExecution(
training_id=training_id[0],
group=self.group,
exec_id=training_id[1],
login=self.credentials[0],
password=self.credentials[1],
url=self.base_url,
)
model_name = conf.get('name', training_run.execution_data.get('ExperimentName', ''))
if training_run.execution_data['TrainingType'] == 'Custom':
model = training_run.promote_model(
model_name=model_name,
model_reference=conf['score_function'],
source_file=os.path.join(PATH, conf['source']),
extra_files=[os.path.join(PATH,e) for e in extra_files] if extra_files else None,
env=os.path.join(PATH, conf['env']) if conf.get('env') else None,
schema=os.path.join(PATH, conf['schema']) if conf.get('schema') else None,
operation=conf['operation']
)
elif training_run.execution_data['TrainingType'] == 'AutoML':
model = training_run.promote_model(model_name=model_name, operation=conf['operation'])
else:
logger.info('Deploying scorer')
client = NeomarilModelClient(
login=self.credentials[0],
password=self.credentials[1],
url=self.base_url,
)
client.create_group(name=self.group, description=self.group)
model = client.create_model(
model_name=conf.get('name'),
model_reference=conf['score_function'],
source_file=os.path.join(PATH, conf['source']),
model_file=os.path.join(PATH, conf['model']),
requirements_file=os.path.join(PATH, conf['packages']),
extra_files=[os.path.join(PATH,e) for e in extra_files] if extra_files else None,
env=os.path.join(PATH, conf['env']) if conf.get('env') else None,
schema=os.path.join(PATH, conf['schema']) if conf.get('schema') else None,
operation=conf['operation'],
input_type=conf['input_type'],
group=self.group
)
while model.status == 'Building':
model.wait_ready()
if model.status == 'Deployed':
logger.info('Model deployement finished')
return model.model_id
else:
raise ModelError("Model deployement failed: "+ model.get_logs(routine='Host')[0])
[docs] def run_monitoring(self, *, training_exec_id:Optional[str]=None, model_id:Optional[str]=None):
"""
Run the monitoring process
Arguments
----------
training_exec_id : str, optional
The id for the training execution process that you want to monitore now
model_id : str, optional
Example
-------
>>> pipeline.run_monitoring('2', 'Mb29d61da4324a39a8bc2e0946f213b4959643916d354bf39940de2124f1e9d8')
"""
logger.info('Configuring monitoring')
conf = self.monitoring_config
PATH = conf['directory']
if training_exec_id:
with open(os.path.join(PATH, conf['config']), 'r+') as f:
conf_dict = json.load(f)
f.seek(0)
conf_dict['TrainData']['NeomarilTrainingExecution'] = training_exec_id
json.dump(conf_dict, f)
f.truncate()
model = NeomarilModel(
model_id=model_id,
login=self.credentials[0],
password=self.credentials[1],
group=self.group,
group_token=os.getenv('NEOMARIL_GROUP_TOKEN'),
url=self.base_url,
)
model.register_monitoring(
preprocess_reference=conf['preprocess_function'],
shap_reference=conf['shap_function'],
configuration_file=os.path.join(PATH, conf['config']),
preprocess_file=os.path.join(PATH, conf['preprocess']),
requirements_file=(os.path.join(PATH, conf['packages']) if conf.get('packages') else None)
)
[docs] def start(self):
"""
Start the pipeline for the model orchestration
Raises
------
PipelineError
Cannot start pipeline without configuration
Example
-------
>>> pipeline = NeomarilPipeline.from_config_file('./samples/pipeline.yml').start()
"""
if (not self.train_config) and (not self.deploy_config) and (not self.monitoring_config):
raise PipelineError("Cannot start pipeline without configuration")
if self.train_config:
training_id = self.run_training()
else:
training_id = None
if self.deploy_config:
model_id = self.run_deploy(training_id=training_id)
else:
model_id = None
if self.monitoring_config:
self.run_monitoring(training_exec_id=(training_id[1] if training_id else None), model_id=model_id)