Source code for neomaril_codex.model

#!/usr/bin/env python
# coding: utf-8

import io
import json
import os
from time import sleep
from typing import Optional, Union

import requests

from neomaril_codex.__utils import *
from neomaril_codex.base import *
from neomaril_codex.datasources import NeomarilDataset
from neomaril_codex.exceptions import *
from neomaril_codex.preprocessing import *

from neomaril_codex.__model_states import ModelState


class NeomarilModel(BaseNeomaril):
    """
    Class to manage Models deployed inside Neomaril.

    Attributes
    ----------
    login : str
        Login for authenticating with the client. You can also use the env
        variable NEOMARIL_USER to set this
    password : str
        Password for authenticating with the client. You can also use the env
        variable NEOMARIL_PASSWORD to set this
    model_id : str
        Model id (hash) from the model you want to access
    group : str
        Group the model is inserted. Default is 'datarisk' (public group)
    group_token : str
        Token for executing the model (shown when creating a group). It can be
        informed when getting the model or when running predictions, or using
        the env variable NEOMARIL_GROUP_TOKEN
    url : str
        URL to Neomaril Server. Default value is
        https://neomaril.staging.datarisk.net, use it to test your deployment
        first before changing to production. You can also use the env variable
        NEOMARIL_URL to set this
    docs : str
        URL for the model Swagger page

    Raises
    ------
    ModelError
        When the model can't be accessed in the server
    AuthenticationError
        Invalid credentials

    Example
    --------
    Getting a model, testing its health and running the prediction

    .. code-block:: python

        from neomaril_codex.model import NeomarilModelClient
        from neomaril_codex.model import NeomarilModel

        client = NeomarilModelClient('123456')
        model = client.get_model(model_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4',
                                 group='ex_group')

        if model.health() == 'OK':
            model.wait_ready()
            model.predict(model.schema)
        else:
            model.restart_model(False)
            model.wait_ready()
            model.predict(model.schema)
    """

    def __init__(
        self,
        *,
        model_id: str,
        login: Optional[str] = None,
        password: Optional[str] = None,
        group: str = "datarisk",
        group_token: Optional[str] = None,
        url: str = "https://neomaril.staging.datarisk.net/",
    ) -> None:
        super().__init__(login=login, password=password, url=url)
        self.model_id = model_id
        self.group = group
        # Explicit token wins over the environment variable.
        self.__token = group_token if group_token else os.getenv("NEOMARIL_GROUP_TOKEN")

        # Fetch the model description up front so invalid ids fail fast.
        url = f"{self.base_url}/model/describe/{self.group}/{self.model_id}"
        response = requests.get(
            url,
            headers={
                "Authorization": "Bearer "
                + refresh_token(*self.credentials, self.base_url)
            },
        )
        if response.status_code == 401:
            logger.error(response.text)
            raise AuthenticationError("Login not authorized")
        elif response.status_code == 404:
            logger.error(response.text)
            raise ModelError(f'Model "{model_id}" not found.')
        elif response.status_code >= 500:
            logger.error(response.text)
            raise ServerError("Server Error")

        # Cache the server-side description and derive convenience fields.
        self.model_data = response.json()["Description"]
        self.name = self.model_data["Name"]
        self.status = ModelState[self.model_data["Status"]]
        self.operation = self.model_data["Operation"].lower()
        self.docs = (
            f"{self.base_url}/model/{self.operation}/docs/{self.group}/{self.model_id}"
        )
        # Predictions are only allowed once the model reaches Deployed.
        self.__model_ready = self.status == ModelState.Deployed

    def __repr__(self) -> str:
        status = self.__get_status()
        return f"""NeomarilModel(name="{self.name}", group="{self.group}", 
                                status="{status}",
                                model_id="{self.model_id}",
                                operation="{self.operation.title()}",
                                )"""

    def __str__(self):
        return (
            f'NEOMARIL model "{self.name} (Group: {self.group}, Id: {self.model_id})"'
        )

    def __get_status(self):
        """
        Gets the status of the model.

        Raises
        -------
        ModelError
            Execution unavailable

        Returns
        -------
        str
            The model status
        """
        url = f"{self.base_url}/model/status/{self.group}/{self.model_id}"
        response = requests.get(
            url,
            headers={
                "Authorization": "Bearer "
                + refresh_token(*self.credentials, self.base_url)
            },
        )
        if response.status_code == 200:
            return ModelState[response.json().get("Status")]

        if response.status_code == 401:
            logger.error(response.text)
            raise AuthenticationError("Login not authorized")

        if response.status_code >= 500:
            logger.error(response.text)
            raise ServerError("Server Error")

        logger.error(response.text)
        raise ModelError("Could not get the status of the model")
[docs] def wait_ready(self): """ Waits the model to be with status 'Deployed' Example ------- >>> model.wait_ready() """ if self.status in [ModelState.Ready, ModelState.Building]: self.status = self.__get_status() while self.status == ModelState.Building: sleep(30) self.status = self.__get_status()
[docs] def health(self) -> str: """ Get the model deployment process health state. Returns ------- str OK - if the it is possible to get the health state NOK - if an exception occurs Example ------- >>> model.health() 'OK' """ if self.operation == "async": try: try_login( *self.credentials, self.base_url, ) return "OK" except Exception as e: logger.error("Server error: " + e) return "NOK" elif self.operation == "sync": url = f"{self.base_url}/model/sync/health/{self.group}/{self.model_id}" response = requests.get( url, headers={"Authorization": "Bearer " + self.__token} ) if response.status_code == 200: return response.json()["Message"] elif response.status_code == 401: logger.error(response.text) raise AuthenticationError("Login not authorized") elif response.status_code >= 500: logger.error(response.text) raise ServerError("Server Error") else: logger.error(response.text) raise ModelError("Could not get the health of the model")
[docs] def restart_model(self, *, wait_for_ready: bool = True): """ Restart a model deployment process health state. Be sure your model is one of these states: - Deployed; - Disabled; - DisabledRecovery; - FailedRecovery. Parameters ----------- wait_for_ready : bool If the model is being deployed, wait for it to be ready instead of failing the request. Defaults to True Example ------- >>> model.restart_model() """ url = f"{self.base_url}/model/restart/{self.group}/{self.model_id}" response = requests.get( url, headers={ "Authorization": "Bearer " + refresh_token(*self.credentials, self.base_url) }, ) if response.status_code == 401: logger.error(response.text) raise AuthenticationError("Login not authorized") if response.status_code >= 500: logger.error(response.text) raise ServerError("Server Error") if response.status_code != 200: logger.error(response.text) raise ModelError("Could not restart the model") logger.info("Model is restarting") self.status = self.__get_status() if wait_for_ready: print("Waiting for deploy to be ready.", end="") while self.status == ModelState.Building: sleep(30) self.status = self.__get_status() print(".", end="", flush=True) print("Model is deployed", flush=True)
[docs] def get_logs( self, *, start: Optional[str] = None, end: Optional[str] = None, routine: Optional[str] = None, type: Optional[str] = None, ): """ Get the logs Parameters ----------- start : str, optional Date to start filter. At the format aaaa-mm-dd end : str, optional Date to end filter. At the format aaaa-mm-dd routine : str, optional Type of routine beeing executed, can assume values Host or Run type : str, optional Defines the type of the logs that are going to be filtered, can assume the values Ok, Error, Debug or Warning Raises ------ ServerError Unexpected server error Returns ------- json Logs list Example ------- >>> model.get_logs(start='2023-01-31', end='2023-02-24', routine='Run', type='Error') {'Results': [{'ModelHash': 'M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', 'RegisteredAt': '2023-01-31T16:06:45.5955220Z', 'OutputType': 'Error', 'OutputData': '', 'Routine': 'Run'}] } """ url = f"{self.base_url}/model/logs/{self.group}/{self.model_id}" return self._logs( url=url, credentials=self.credentials, start=start, end=end, routine=routine, type=type, )
[docs] def delete(self): """ Deletes the current model. IMPORTANT! For now this is irreversible, if you want to use the model again later you will need to upload again (and it will have a new ID). Raises ------ ServerError Model deleting failed Returns ------- str If model is at status=Deployed deletes the model and return a json with his information. If it isn't Deployed it returns the message that the model is under another state Example ------- >>> model.delete() """ token = refresh_token(*self.credentials, self.base_url) req = requests.delete( f"{self.base_url}/model/delete/{self.group}/{self.model_id}", headers={"Authorization": "Bearer " + token}, ) if req.status_code == 401: logger.error(req.text) raise AuthenticationError("Login not authorized") if req.status_code >= 500: logger.error(req.text) raise ServerError("Server Error") if req.status_code != 200: logger.error(req.text) raise ModelError("Failed to delete model.") response = requests.get( f"{self.base_url}/model/describe/{self.group}/{self.model_id}", headers={"Authorization": "Bearer " + token}, ) self.model_data = response.json()["Description"] self.status = ModelState[self.model_data["Status"]] self.__model_ready = False return req.json()
[docs] def disable(self): """ Disables a model. It means that you won't be able to perform some operations in the model Please, check with your team if you're allowed to perform this operation Raises ------ ServerError Model deleting failed Returns ------- str status=Deployed: disables the model and return a json. If it isn't Deployed it returns the message that the model is under another state Example ------- >>> model.disable() """ token = refresh_token(*self.credentials, self.base_url) req = requests.post( f"{self.base_url}/model/disable/{self.group}/{self.model_id}", headers={"Authorization": "Bearer " + token}, ) if req.status_code == 401: logger.error(req.text) raise AuthenticationError("Login not authorized") if req.status_code >= 500: logger.error(req.text) raise ServerError("Server Error") if req.status_code != 200: logger.error(req.text) raise ModelError("Failed to delete model.") response = requests.get( f"{self.base_url}/model/describe/{self.group}/{self.model_id}", headers={"Authorization": "Bearer " + token}, ) self.model_data = response.json()["Description"] self.status = ModelState[self.model_data["Status"]] self.__model_ready = False print(f"The model {self.model_id} was disabled") return req.json()
[docs] def set_token(self, group_token: str) -> None: """ Saves the group token for this model instance. Arguments --------- group_token : str Token for executing the model (show when creating a group). You can set this using the NEOMARIL_GROUP_TOKEN env variable Example ------- >>> model.set_token('6cb64889a45a45ea8749881e30c136df') """ self.__token = group_token logger.info(f"Token for group {self.group} added.")
[docs] def predict( self, *, data: Optional[Union[dict, str, NeomarilExecution]] = None, dataset: Union[str, NeomarilDataset] = None, preprocessing: Optional[NeomarilPreprocessing] = None, group_token: Optional[str] = None, wait_complete: Optional[bool] = False, ) -> Union[dict, NeomarilExecution]: """ Runs a prediction from the current model. Arguments --------- data : Union[dict, str] The same data that is used in the source file. If Sync is a dict, the keys that are needed inside this dict are the ones in the `schema` atribute. If Async is a string with the file path with the same filename used in the source file. group_token : str, optional Token for executing the model (show when creating a group). It can be informed when getting the model or when running predictions, or using the env variable NEOMARIL_GROUP_TOKEN wait_complete: bool, optional Boolean that informs if a model training is completed (True) or not (False). Default value is False Raises ------ ModelError Model is not available InputError Model requires a dataset or a data input Returns ------- Union[dict, NeomarilExecution] The return of the scoring function in the source file for Sync models or the execution class for Async models. """ if not (data or dataset): raise InputError( "Invalid data input. 
Run training requires a data or dataset" ) if self.__model_ready: if (group_token is not None) | (self.__token is not None): url = f"{self.base_url}/model/{self.operation}/run/{self.group}/{self.model_id}" if self.__token and not group_token: group_token = self.__token if group_token and not self.__token: self.__token = group_token if self.operation == "sync": model_input = {"Input": data} if preprocessing: model_input["ScriptHash"] = preprocessing.preprocessing_id req = requests.post( url, data=json.dumps(model_input), headers={"Authorization": "Bearer " + group_token}, ) return req.json() elif self.operation == "async": if preprocessing: if preprocessing.operation == "async": preprocessing.set_token(group_token) pre_run = preprocessing.run(data=data) pre_run.wait_ready() if pre_run.status != "Succeeded": logger.error( "Preprocessing failed, we wont send any data to it" ) logger.info("Returning Preprocessing run instead.") return pre_run data = "./result_preprocessing" pre_run.download_result( path="./", filename="result_preprocessing" ) else: raise PreprocessingError( "Can only use async preprocessing with async models" ) form_data = {} if data: files = [("input", (data.split("/")[-1], open(data, "rb")))] elif dataset: dataset_hash = ( dataset if isinstance(dataset, str) else dataset.dataset_hash ) form_data["dataset_hash"] = dataset_hash req = requests.post( url, files=files, data=form_data, headers={"Authorization": "Bearer " + group_token}, ) if req.status_code == 202: message = req.json() logger.info(message["Message"]) exec_id = message["ExecutionId"] run = NeomarilExecution( parent_id=self.model_id, exec_type="AsyncModel", group=self.group, exec_id=exec_id, login=self.credentials[0], password=self.credentials[1], url=self.base_url, group_token=group_token, ) response = run.get_status() status = response["Status"] if wait_complete: print("Waiting the training run.", end="") while status in ["Running", "Requested"]: sleep(30) print(".", end="", flush=True) 
response = run.get_status() status = response["Status"] if status == "Failed": logger.error(response["Message"]) raise ExecutionError("Training execution failed") return run elif req.status_code >= 500: raise ServerError("Unexpected server error: ", req.text) else: logger.error(req.text) raise ServerError("Server Error") else: raise InputError("Group token not informed") else: url = f"{self.base_url}/model/describe/{self.group}/{self.model_id}" response = requests.get( url, headers={ "Authorization": "Bearer " + refresh_token(*self.credentials, self.base_url) }, ).json()["Description"] if response["Status"] == "Deployed": self.model_data = response self.status = ModelState[response["Status"]] self.__model_ready = True return self.predict( data=data, dataset=dataset, preprocessing=preprocessing, group_token=group_token, wait_complete=wait_complete, ) else: raise ModelError("Model is not available to predictions")
    def generate_predict_code(self, *, language: str = "curl") -> str:
        """
        Generates predict code for the model to be used outside Neomaril Codex.

        Arguments
        ---------
        language : str
            The generated code language. Supported languages are 'curl',
            'python' or 'javascript'

        Raises
        ------
        InputError
            Unsupported language

        Returns
        -------
        str
            The generated code.
        """
        # NOTE(review): "Suported" typo below is in a user-facing runtime
        # string; left untouched here, fix separately if desired.
        if language not in ["curl", "python", "javascript"]:
            raise InputError("Suported languages are curl, python or javascript")

        if self.operation == "sync":
            # Sync models take a JSON body; the placeholder payload mirrors
            # the {"Input": ...} envelope used by predict().
            payload = json.dumps({"Input": {"DATA": "DATA"}})
            base_url = self.base_url
            if language == "curl":
                return f"""curl --request POST \\
                    --url {base_url}/model/sync/run/{self.group}/{self.model_id} \\
                    --header 'Authorization: Bearer TOKEN' \\
                    --header 'Content-Type: application/json' \\
                    --data '{payload}'
                """
            if language == "python":
                return f"""
                    import requests

                    url = "{base_url}/model/sync/run/{self.group}/{self.model_id}"

                    payload = {payload}
                    headers = {{
                        "Content-Type": "application/json",
                        "Authorization": "Bearer TOKEN"
                    }}

                    response = requests.request("POST", url, json=payload, headers=headers)

                    print(response.text)
                """
            if language == "javascript":
                return f"""
                    const options = {{
                        method: 'POST',
                        headers: {{'Content-Type': 'application/json', Authorization: 'Bearer TOKEN'}},
                        body: '{payload}'
                    }};

                    fetch('{base_url}/model/sync/run/{self.group}/{self.model_id}', options)
                        .then(response => response.json())
                        .then(response => console.log(response))
                        .catch(err => console.error(err));
                """
        if self.operation == "async":
            # Async models take a multipart upload of the input file.
            if language == "curl":
                return f"""
                    curl --request POST \
                    --url {self.base_url}/model/async/run/{self.group}/{self.model_id} \\
                    --header 'Authorization: Bearer TOKEN' \\
                    --header 'Content-Type: multipart/form-data' \\
                    --form "input=@/path/to/file"
                """
            if language == "python":
                return f"""
                    import requests

                    url = "{self.base_url}/model/async/run/{self.group}/{self.model_id}"

                    upload_data = [
                        ("input", ('filename', open('/path/to/file', 'rb'))),
                    ]

                    headers = {{
                        "Content-Type": "multipart/form-data",
                        "Authorization": "Bearer TOKEN"
                    }}

                    response = requests.request("POST", url, files=upload_data, headers=headers)

                    print(response.text)
                """
            if language == "javascript":
                return f"""
                    const form = new FormData();
                    form.append("input", "/path/to/file");

                    const options = {{
                        method: 'POST',
                        headers: {{
                            'Content-Type': 'multipart/form-data',
                            Authorization: 'Bearer TOKEN'
                        }}
                    }};

                    options.body = form;

                    fetch('{self.base_url}/model/async/run/{self.group}/{self.model_id}', options)
                        .then(response => response.json())
                        .then(response => console.log(response))
                        .catch(err => console.error(err));
                """
def __call__(self, data: dict) -> dict: return self.predict(data=data)
    def get_model_execution(self, exec_id: str) -> NeomarilExecution:
        """
        Get an execution instance for that model.

        Arguments
        ---------
        exec_id : str
            Execution id

        Raises
        ------
        ModelError
            If the user tries to get an execution from a Sync model

        Returns
        -------
        NeomarilExecution
            The execution instance (annotation fixed: the original said
            ``-> None`` but the method returns the execution object)

        Example
        -------
        >>> model.get_model_execution('1')
        """
        if self.operation == "async":
            run = NeomarilExecution(
                parent_id=self.model_id,
                exec_type="AsyncModel",
                group=self.group,
                exec_id=exec_id,
                login=self.credentials[0],
                password=self.credentials[1],
                url=self.base_url,
                group_token=self.__token,
            )
            # Populate the execution's current status before handing it back.
            run.get_status()
            return run
        else:
            raise ModelError("Sync models don't have executions")
    def __host_monitoring_status(self, *, group: str, model_id: str):
        """
        Get the host status for the monitoring configuration.

        Polls until the host leaves the 'Validating' state (recursing every
        30 seconds).

        Arguments
        ---------
        group : str
            Group the model is inserted. Default is 'datarisk' (public group)
        model_id : str
            The uploaded model id (hash)

        Raises
        ------
        ExecutionError
            Monitoring host failed
        ServerError
            Unexpected server error
        """
        url = f"{self.base_url}/monitoring/status/{group}/{model_id}"
        response = requests.get(
            url,
            headers={
                "Authorization": "Bearer "
                + refresh_token(*self.credentials, self.base_url)
            },
        )
        if response.status_code == 200:
            message = response.json()
            status = message["Status"]
            if status == "Validating":
                logger.info("Waiting the monitoring host.")
                sleep(30)
                # Recurse until the host finishes validating.
                self.__host_monitoring_status(
                    group=group, model_id=model_id
                )  # recursive
            if status == "Validated":
                logger.info(f'Model monitoring host validated - Hash: "{model_id}"')
            if status == "Invalidated":
                res_message = message["Message"]
                logger.error(f"Model monitoring host message: {res_message}")
                raise ExecutionError("Monitoring host failed")
        elif response.status_code == 401:
            logger.error(response.text)
            raise AuthenticationError("Login not authorized")
        elif response.status_code >= 500:
            logger.error(response.text)
            raise ServerError("Server Error")
        else:
            logger.error(response.text)
            raise ModelError("Could not get host monitoring status")

    def __host_monitoring(self, *, group: str, model_id: str):
        """
        Host the monitoring configuration.

        Arguments
        ---------
        group : str
            Group the model is inserted. Default is 'datarisk' (public group)
        model_id : str
            The uploaded model id (hash)

        Raises
        ------
        InputError
            Monitoring host error
        """
        url = f"{self.base_url}/monitoring/host/{group}/{model_id}"
        response = requests.get(
            url,
            headers={
                "Authorization": "Bearer "
                + refresh_token(*self.credentials, self.base_url)
            },
        )
        if response.status_code == 200:
            logger.info(f'Model monitoring host started - Hash: "{model_id}"')
        elif response.status_code == 401:
            logger.error(response.text)
            raise AuthenticationError("Login not authorized")
        elif response.status_code >= 500:
            logger.error(response.text)
            raise ServerError("Server Error")
        else:
            logger.error("Model monitoring host error: " + response.text)
            raise InputError("Monitoring host error")
[docs] def register_monitoring( self, *, preprocess_reference: str, shap_reference: str, configuration_file: Union[str, dict], preprocess_file: Optional[str] = None, requirements_file: Optional[str] = None, ) -> str: """ Register the model monitoring configuration at the database Arguments --------- preprocess_reference : str Name of the preprocess reference shap_reference : str Name of the preprocess function configuration_file : str or dict Path of the configuration file, but it could be a dict preprocess_file : str, optional Path of the preprocess script requirements_file : str Path of the requirements file Raises ------ InputError Invalid parameters for model creation Returns ------- str Model id (hash) Example ------- >>> model.register_monitoring('parse', 'get_shap', configuration_file=PATH+'configuration.json', preprocess_file=PATH+'preprocess.py', requirements_file=PATH+'requirements.txt') """ url = f"{self.base_url}/monitoring/register/{self.group}/{self.model_id}" if isinstance(configuration_file, str): conf = open(configuration_file, "rb") elif isinstance(configuration_file, dict): conf = json.dumps(configuration_file) upload_data = [ ("configuration", ('configuration.json', conf)), ] form_data = { "preprocess_reference": preprocess_reference, "shap_reference": shap_reference, } if preprocess_file: upload_data.append( ( "source", ( "preprocess." 
+ preprocess_file.split(".")[-1], open(preprocess_file, "rb"), ), ) ) if preprocess_file.endswith("py"): form_data["type"] = "PythonScript" elif preprocess_file.endswith("ipynb"): form_data["type"] = "PythonNotebook" else: form_data["type"] = "ModelScript" if requirements_file: upload_data.append( ("requirements", ("requirements.txt", open(requirements_file, "rb"))) ) response = requests.post( url, data=form_data, files=upload_data, headers={ "Authorization": "Bearer " + refresh_token(*self.credentials, self.base_url) }, ) if response.status_code == 201: data = response.json() model_id = data["ModelHash"] logger.info(f'{data["Message"]} - Hash: "{model_id}"') self.__host_monitoring(group=self.group, model_id=model_id) self.__host_monitoring_status(group=self.group, model_id=model_id) return model_id elif response.status_code == 401: logger.error(response.text) raise AuthenticationError("Login not authorized") elif response.status_code >= 500: logger.error(response.text) raise ServerError("Server Error") else: logger.error("Upload error: " + response.text) raise InputError("Invalid parameters for model creation")
class NeomarilModelClient(BaseNeomarilClient):
    """
    Class for client to access Neomaril and manage models.

    Attributes
    ----------
    login : str
        Login for authenticating with the client. You can also use the env
        variable NEOMARIL_USER to set this
    password : str
        Password for authenticating with the client. You can also use the env
        variable NEOMARIL_PASSWORD to set this
    url : str
        URL to Neomaril Server. Default value is
        https://neomaril.staging.datarisk.net, use it to test your deployment
        first before changing to production. You can also use the env variable
        NEOMARIL_URL to set this

    Raises
    ------
    AuthenticationError
        Invalid credentials
    ServerError
        Server unavailable

    Example
    --------
    Example 1: Creation and managing a Synchronous Model

    .. code-block:: python

        from neomaril_codex.model import NeomarilModelClient
        from neomaril_codex.model import NeomarilModel

        def new_sync_model(client, group, data_path):
            model = client.create_model('Model Example Sync',
                                'score',
                                data_path+'app.py',
                                data_path+'model.pkl',
                                data_path+'requirements.txt',
                                data_path+'schema.json',
                                group=group,
                                operation="Sync"
                                )

            model.register_monitoring('parse',
                            'get_shap',
                            configuration_file=data_path+'configuration.json',
                            preprocess_file=data_path+'preprocess.py',
                            requirements_file=data_path+'requirements.txt'
                            )

            return model.model_id

        client = NeomarilModelClient('123456')
        client.create_group('ex_group', 'Group for example purpose')

        data_path = './samples/syncModel/'

        model_id = new_sync_model(client, 'ex_group', data_path)

        model_list = client.search_models()
        print(model_list)

        model = client.get_model(model_id, 'ex_group')

        print(model.health())

        model.wait_ready()
        model.predict(model.schema)

        print(model.get_logs(routine='Run'))

    Example 2: creation and deployment of an Asynchronous Model

    .. code-block:: python

        from neomaril_codex.model import NeomarilModelClient
        from neomaril_codex.model import NeomarilModel

        def new_async_model(client, group, data_path):
            model = client.create_model('Teste notebook Async',
                            'score',
                            data_path+'app.py',
                            data_path+'model.pkl',
                            data_path+'requirements.txt',
                            group=group,
                            python_version='3.9',
                            operation="Async",
                            input_type='csv'
                            )

            return model.model_id

        def run_model(client, model_id, data_path):
            model = client.get_model(model_id, 'ex_group')

            execution = model.predict(data_path+'input.csv')

            return execution

        client = NeomarilModelClient('123456')
        client.create_group('ex_group', 'Group for example purpose')

        data_path = './samples/asyncModel/'

        model_id = new_async_model(client, 'ex_group', data_path)

        execution = run_model(client, model_id, data_path)

        execution.get_status()

        execution.download_result()
    """

    def __repr__(self) -> str:
        return f'API version {self.version} - NeomarilModelClient(url="{self.base_url}", Token="{self.user_token}")'

    def __str__(self):
        return f"NEOMARIL {self.base_url} Model client:{self.user_token}"

    def __get_model_status(self, model_id: str, group: str) -> dict:
        """
        Gets the status of the model with the hash equal to `model_id`.

        Parameters
        ----------
        group : str
            Group the model is inserted
        model_id : str
            Model id (hash) from the model being searched

        Raises
        ------
        ModelError
            Model unavailable

        Returns
        -------
        dict
            The model status and a message if the status is 'Failed'
        """
        url = f"{self.base_url}/model/status/{group}/{model_id}"
        response = requests.get(
            url,
            headers={
                "Authorization": "Bearer "
                + refresh_token(*self.credentials, self.base_url)
            },
        )
        # 410 (Gone) is tolerated here: the caller still gets the payload.
        if response.status_code not in [200, 410]:
            raise ModelError(f'Model "{model_id}" not found')

        return response.json()
[docs] def get_model( self, *, model_id: str, group: str = "datarisk", group_token: Optional[str] = None, wait_for_ready: bool = True, ) -> NeomarilModel: """ Acess a model using its id Arguments --------- model_id : str Model id (hash) that needs to be acessed group : str Group the model is inserted. Default is 'datarisk' (public group) group_token : str, optional Token for executing the model (show when creating a group). It can be informed when getting the model or when running predictions, or using the env variable NEOMARIL_GROUP_TOKEN wait_for_ready : bool If the model is being deployed, wait for it to be ready instead of failing the request. Defaults to True Raises ------ ModelError Model unavailable ServerError Unknown return from server Returns ------- NeomarilModel A NeomarilModel instance with the model hash from `model_id` Example ------- >>> model.get_model(model_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', group='ex_group') """ try: response = self.__get_model_status(model_id, group) except KeyError: raise ModelError("Model not found") status = response["Status"] if status == "Building": if wait_for_ready: print("Waiting for deploy to be ready.", end="") while status == "Building": response = self.__get_model_status(model_id, group) status = response["Status"] print(".", end="", flush=True) sleep(10) else: logger.info("Returning model, but model is not ready.") NeomarilModel( model_id=model_id, login=self.credentials[0], password=self.credentials[1], group=group, url=self.base_url, group_token=group_token, ) if status in ["Disabled", "Ready"]: raise ModelError( f'Model "{model_id}" unavailable (disabled or deploy process is incomplete)' ) elif status == "Failed": logger.error(str(response["Message"])) raise ModelError( f'Model "{model_id}" deploy failed, so model is unavailable.' ) elif status == "Deployed": logger.info(f"Model {model_id} its deployed. 
Fetching model.") return NeomarilModel( model_id=model_id, login=self.credentials[0], password=self.credentials[1], group=group, url=self.base_url, group_token=group_token, ) else: raise ServerError("Unknown model status: ", status)
[docs] def search_models( self, *, name: Optional[str] = None, state: Optional[str] = None, group: Optional[str] = None, only_deployed: bool = False, ) -> list: """ Search for models using the name of the model Arguments --------- name : str, optional Text that its expected to be on the model name. It runs similar to a LIKE query on SQL state : str, optional Text that its expected to be on the state. It runs similar to a LIKE query on SQL group : str, optional Text that its expected to be on the group name. It runs similar to a LIKE query on SQL only_deployed : bool, optional If its True, filter only models ready to be used (status == "Deployed"). Defaults to False Raises ------ ServerError Unexpected server error Returns ------- list List with the models data, it can works like a filter depending on the arguments values Example ------- >>> client.search_models(group='ex_group', only_deployed=True) """ url = f"{self.base_url}/model/search" query = {} if name: query["name"] = name if state: query["state"] = state if group: query["group"] = group if only_deployed: query["state"] = "Deployed" response = requests.get( url, params=query, headers={ "Authorization": "Bearer " + refresh_token(*self.credentials, self.base_url) }, ) if response.status_code == 200: results = response.json()["Results"] parsed_results = [] for r in results: if schema := r.get("Schema"): r["Schema"] = json.loads(schema) parsed_results.append(r) return [NeomarilModel( model_id=m['ModelHash'], login=self.credentials[0], password=self.credentials[1], group=m['Group'], url=self.base_url) for m in parsed_results] elif response.status_code == 401: logger.error(response.text) raise AuthenticationError("Login not authorized") elif response.status_code >= 500: logger.error(response.text) raise ServerError("Server Error") else: logger.error(response.text) raise ModelError("Could not search the model")
[docs] def get_logs( self, *, model_id, start: Optional[str] = None, end: Optional[str] = None, routine: Optional[str] = None, type: Optional[str] = None, ): """ Get the logs Parameters ---------- model_id : str Model id (hash) start : str, optional Date to start filter. At the format aaaa-mm-dd end : str, optional Date to end filter. At the format aaaa-mm-dd routine : str, optional Type of routine being executed, can assume values 'Host' (for deployment logs) or 'Run' (for execution logs) type : str, optional Defines the type of the logs that are going to be filtered, can assume the values 'Ok', 'Error', 'Debug' or 'Warning' Raises ------ ServerError Unexpected server error Returns ------- json Logs list Example ------- >>> model.get_logs(routine='Run') {'Results': [{'ModelHash': 'B4c3af308c3e452e7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', 'RegisteredAt': '2023-02-03T16:06:45.5955220Z', 'OutputType': 'Ok', 'OutputData': '', 'Routine': 'Run'}] } """ url = f"{self.base_url}/model/logs/{model_id}" return self._logs( url=url, credentials=self.credentials, start=start, end=end, routine=routine, type=type, )
def __upload_model(
    self,
    *,
    model_name: str,
    model_reference: str,
    source_file: str,
    model_file: str,
    requirements_file: str,
    schema: Optional[Union[str, dict]] = None,
    group: Optional[str] = None,
    extra_files: Optional[list] = None,
    env: Optional[str] = None,
    python_version: str = "3.8",
    operation: str = "Sync",
    input_type: Optional[str] = None,
) -> str:
    """Upload the model files to the server.

    Arguments
    ---------
    model_name : str
        The name of the model, in less than 32 characters
    model_reference : str
        The name of the scoring function inside the source file
    source_file : str
        Path of the source file. The file must have a scoring function that
        accepts two parameters: data (data for the request body of the model)
        and model_path (absolute path of where the file is located)
    model_file : str
        Path of the model pkl file
    requirements_file : str
        Path of the requirements file. The package versions must be fixed
        eg: pandas==1.0
    schema : Union[str, dict], optional
        Path to a JSON or XML file with a sample of the input for the
        entrypoint function. A dict with the sample input can be sent as well
    group : str, optional
        Group the model is inserted. If None the server uses 'datarisk'
        (public group)
    extra_files : list, optional
        Optional list of additional file paths to upload. If the scoring
        function refers to these files they will be in the same folder as
        the source file
    env : str, optional
        Path of a .env file to be used in the model environment. This will
        be encrypted in the server
    python_version : str, optional
        Python version for the model environment. Available versions are
        3.8, 3.9, 3.10. Defaults to '3.8'
    operation : str
        Defines which kind of operation is being executed (Sync or Async).
        Default value is Sync
    input_type : str, optional
        The type of the input file: 'json', 'csv', 'parquet', 'txt',
        'xls' or 'xlsx'. Ignored for Sync models (forced to 'json')

    Raises
    ------
    InputError
        Some input parameter is invalid

    Returns
    -------
    str
        The new model id (hash)
    """
    # Local import: only this method needs it, keeps the module header stable.
    from contextlib import ExitStack

    url = f"{self.base_url}/model/upload/{group}"
    file_extensions = {"py": "script.py", "ipynb": "notebook.ipynb"}

    # All file handles are registered on the stack so they are closed
    # after the request completes — and also on every error path (the
    # original code leaked them).
    with ExitStack() as stack:
        def _open(path: str, mode: str = "rb"):
            return stack.enter_context(open(path, mode))

        upload_data = [
            (
                "source",
                (file_extensions[source_file.split(".")[-1]], _open(source_file)),
            ),
            ("model", (model_file.split("/")[-1], _open(model_file))),
            ("requirements", ("requirements.txt", _open(requirements_file))),
        ]

        if schema:
            upload_data.append(("schema", (schema, parse_dict_or_file(schema))))
        else:
            raise InputError("Schema file is mandatory")

        if operation == "Sync":
            # Sync models only accept JSON request bodies.
            input_type = "json"
        else:
            # The sentinel default means the caller never picked a type.
            if input_type == "json|csv|parquet":
                raise InputError("Choose a input type from " + input_type)

        if env:
            upload_data.append(("env", (".env", _open(env, "r"))))

        if extra_files:
            upload_data += [
                ("extra", (path.split("/")[-1], _open(path)))
                for path in extra_files
            ]

        form_data = {
            "name": model_name,
            "model_reference": model_reference,
            "operation": operation,
            "input_type": input_type,
            "python_version": "Python" + python_version.replace(".", ""),
        }

        response = requests.post(
            url,
            data=form_data,
            files=upload_data,
            headers={
                "Authorization": "Bearer "
                + refresh_token(*self.credentials, self.base_url)
            },
        )

    if response.status_code == 201:
        data = response.json()
        model_id = data["ModelHash"]
        logger.info(f'{data["Message"]} - Hash: "{model_id}"')
        return model_id
    if response.status_code == 401:
        logger.error(response.text)
        raise AuthenticationError("Login not authorized")
    if response.status_code >= 500:
        logger.error(response.text)
        raise ServerError("Server Error")
    logger.error("Upload error: " + response.text)
    raise InputError("Invalid parameters for model creation")

def __host_model(self, *, operation: str, model_id: str, group: str) -> None:
    """Trigger the build of the model execution environment.

    Arguments
    ---------
    operation : str
        The model operation type (Sync or Async)
    model_id : str
        The uploaded model id (hash)
    group : str
        Group the model is inserted. Default is 'datarisk' (public group)

    Raises
    ------
    InputError
        Some input parameter is invalid
    """
    url = f"{self.base_url}/model/{operation}/host/{group}/{model_id}"
    response = requests.get(
        url,
        headers={
            "Authorization": "Bearer "
            + refresh_token(*self.credentials, self.base_url)
        },
    )
    # 202: the host process was accepted and runs asynchronously server-side.
    if response.status_code == 202:
        logger.info(f"Model host in process - Hash: {model_id}")
    elif response.status_code == 401:
        logger.error(response.text)
        raise AuthenticationError("Login not authorized")
    elif response.status_code >= 500:
        logger.error(response.text)
        raise ServerError("Server Error")
    else:
        logger.error(response.text)
        raise InputError("Invalid parameters for model creation")
def create_model(
    self,
    *,
    model_name: str,
    model_reference: str,
    source_file: str,
    model_file: str,
    requirements_file: str,
    schema: Optional[Union[str, dict]] = None,
    group: str = None,
    extra_files: Optional[list] = None,
    env: Optional[str] = None,
    python_version: str = "3.8",
    operation="Sync",
    input_type: str = "json|csv|parquet",
    wait_for_ready: bool = True,
) -> Union[NeomarilModel, str]:
    """Deploy a new model to Neomaril.

    Arguments
    ---------
    model_name : str
        The name of the model, in less than 32 characters
    model_reference : str
        The name of the scoring function inside the source file
    source_file : str
        Path of the source file. The file must have a scoring function that
        accepts two parameters: data (data for the request body of the model)
        and model_path (absolute path of where the file is located)
    model_file : str
        Path of the model pkl file
    requirements_file : str
        Path of the requirements file. The package versions must be fixed
        eg: pandas==1.0
    schema : Union[str, dict]
        Path to a JSON or XML file with a sample of the input for the
        entrypoint function. A dict with the sample input can be sent as
        well. Mandatory for Sync models
    group : str
        Group the model is inserted. Defaults to 'datarisk' (public group)
    extra_files : list, optional
        Optional list of additional file paths to upload. If the scoring
        function refers to these files they will be in the same folder as
        the source file
    env : str, optional
        .env file to be used in the model environment. This will be
        encrypted in the server
    python_version : str, optional
        Python version for the model environment. Available versions are
        3.8, 3.9, 3.10. Defaults to '3.8'
    operation : str
        Defines which kind of operation is being executed (Sync or Async).
        Default value is Sync
    input_type : str
        The type of the input file: 'json', 'csv' or 'parquet'
    wait_for_ready : bool, optional
        Wait for the model to be ready and return a NeomarilModel instance
        for the new model. Defaults to True

    Raises
    ------
    InputError
        Some input parameter is invalid
    GroupError
        The informed group does not exist

    Returns
    -------
    Union[NeomarilModel, str]
        The new model. If wait_for_ready=True the deploy runs
        synchronously; if False, returns right after sending all the data
        to the server and the deploy runs asynchronously

    Example
    -------
    >>> model = client.create_model('Model Example Sync', 'score', './samples/syncModel/app.py', './samples/syncModel/model.pkl', './samples/syncModel/requirements.txt', './samples/syncModel/schema.json', group=group, operation="Sync")
    """
    supported_versions = ("3.8", "3.9", "3.10")
    if python_version not in supported_versions:
        raise InputError(
            "Invalid python version. Avaliable versions are 3.8, 3.9, 3.10"
        )

    if group:
        # Normalize the group name to the server-side naming convention.
        group = group.lower().strip()
        for separator in (" ", ".", "-"):
            group = group.replace(separator, "_")
        known_groups = [g.get("Name") for g in self.list_groups()]
        if group not in known_groups:
            raise GroupError("Group dont exist. Create a group first.")
    else:
        group = "datarisk"
        logger.info("Group not informed, using default 'datarisk' group")

    model_id = self.__upload_model(
        model_name=model_name,
        model_reference=model_reference,
        source_file=source_file,
        model_file=model_file,
        requirements_file=requirements_file,
        schema=schema,
        group=group,
        extra_files=extra_files,
        python_version=python_version,
        env=env,
        operation=operation,
        input_type=input_type,
    )

    self.__host_model(operation=operation.lower(), model_id=model_id, group=group)

    return self.get_model(
        model_id=model_id, group=group, wait_for_ready=wait_for_ready
    )
def get_model_execution(
    self, *, model_id: str, exec_id: str, group: Optional[str] = None
) -> NeomarilExecution:
    """Get an execution instance (Async model only).

    Arguments
    ---------
    model_id : str
        Model id (hash)
    exec_id : str
        Execution id
    group : str, optional
        Group name, default value is None

    Returns
    -------
    NeomarilExecution
        The new execution

    Example
    -------
    >>> model.get_model_execution(model_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', exec_id='1')
    """
    model = self.get_model(model_id=model_id, group=group)
    return model.get_model_execution(exec_id)