Source code for neomaril_codex.base

import os
from datetime import datetime, timedelta
from time import sleep
from typing import Optional, Union

import requests
from dotenv import load_dotenv
from loguru import logger

from neomaril_codex.__utils import *
from neomaril_codex.exceptions import *


class BaseNeomaril:
    """
    Super base class to initialize other variables and URLs for other Neomaril classes.

    Attributes
    ----------
    credentials : tuple
        ``(login, password)``. Each item comes from the constructor argument or, when that
        is empty, from the ``NEOMARIL_USER`` / ``NEOMARIL_PASSWORD`` environment variables.
    base_url : str
        Value returned by ``parse_url`` for the given URL (or for ``NEOMARIL_URL`` when the
        argument is empty).
    user_token, version
        The two values returned by ``try_login``.
    """

    # Date format used for the ``start`` / ``end`` log filters.
    _LOG_DATE_FORMAT = "%d-%m-%Y"
    # Number of days between ``start`` and ``end`` when one (or both) is missing.
    _LOG_WINDOW_DAYS = 6
    _LOG_ROUTINES = ("Run", "Host")
    _LOG_TYPES = ("Ok", "Error", "Debug", "Warning")

    def __init__(
        self,
        *,
        login: Optional[str] = None,
        password: Optional[str] = None,
        url: str = "https://neomaril.staging.datarisk.net/",
    ) -> None:
        """
        Load the ``.env`` file, resolve credentials and URL, and log in.

        Arguments
        ---------
        login : str, optional
            Login for authenticating with the client. Falls back to ``NEOMARIL_USER``.
        password : str, optional
            Password for authenticating with the client. Falls back to ``NEOMARIL_PASSWORD``.
        url : str
            URL to Neomaril Server. Falls back to ``NEOMARIL_URL`` when empty.
        """
        load_dotenv()
        logger.info("Loading .env")

        self.credentials = (
            login if login else os.getenv("NEOMARIL_USER"),
            password if password else os.getenv("NEOMARIL_PASSWORD"),
        )
        self.base_url = parse_url(url if url else os.getenv("NEOMARIL_URL"))

        # NOTE(review): the comparison uses the value returned by parse_url; confirm that
        # parse_url leaves the staging URL in exactly this form, otherwise this notice
        # never fires.
        if self.base_url == "https://neomaril.staging.datarisk.net/":
            logger.info(
                "You are using the test enviroment that will have the data cleaned from time to time. If your model is ready to use change the enviroment to Production"
            )

        self.user_token, self.version = try_login(
            self.credentials[0],
            self.credentials[1],
            self.base_url,
        )
        logger.info("Successfully connected to Neomaril")

    @staticmethod
    def _resolve_log_window(start: Optional[str], end: Optional[str]) -> tuple:
        """
        Fill in whichever of ``start`` / ``end`` is missing so they span six days.

        Both values are ``dd-mm-YYYY`` strings. With neither given the window ends today;
        with only one given the other is derived from it. Values that are both present
        are returned unchanged.

        Returns
        -------
        tuple
            ``(start, end)`` as ``dd-mm-YYYY`` strings.
        """
        fmt = BaseNeomaril._LOG_DATE_FORMAT
        span = timedelta(days=BaseNeomaril._LOG_WINDOW_DAYS)

        if not start and not end:
            # Read the clock once so both ends are derived from the same day.
            today = datetime.today()
            return (today - span).strftime(fmt), today.strftime(fmt)
        if not start:
            return (datetime.strptime(end, fmt) - span).strftime(fmt), end
        if not end:
            return start, (datetime.strptime(start, fmt) + span).strftime(fmt)
        return start, end

    def _logs(
        self,
        *,
        url,
        credentials,
        start: Optional[str] = None,
        end: Optional[str] = None,
        routine: Optional[str] = None,
        type: Optional[str] = None,
    ):
        """
        Query a logs endpoint and return its decoded JSON body.

        Arguments
        ---------
        url : str
            Full URL of the logs endpoint.
        credentials : tuple
            ``(login, password)`` forwarded to ``refresh_token``.
        start : str, optional
            First day of the window (``dd-mm-YYYY``).
        end : str, optional
            Last day of the window (``dd-mm-YYYY``).
        routine : str, optional
            One of ``'Run'`` or ``'Host'``.
        type : str, optional
            One of ``'Ok'``, ``'Error'``, ``'Debug'`` or ``'Warning'``.

        Raises
        ------
        InputError
            Invalid ``routine`` / ``type`` value, or any other non-200 client-side status
        AuthenticationError
            The server answered 401
        ServerError
            The server answered with a status >= 500

        Returns
        -------
        The JSON body of the response.
        """
        start, end = self._resolve_log_window(start, end)
        query = {"start": start, "end": end}

        # Explicit raises instead of ``assert``: asserts are removed under ``python -O``,
        # which would let invalid filters reach the server.
        if routine:
            if routine not in self._LOG_ROUTINES:
                raise InputError(
                    f"Invalid routine '{routine}'. Valid options are 'Run' and 'Host'"
                )
            query["routine"] = routine

        if type:
            if type not in self._LOG_TYPES:
                raise InputError(
                    f"Invalid type '{type}'. Valid options are 'Ok', 'Error', 'Debug' and 'Warning'"
                )
            query["type"] = type

        response = requests.get(
            url,
            params=query,
            headers={
                "Authorization": "Bearer " + refresh_token(*credentials, self.base_url)
            },
        )

        if response.status_code == 200:
            return response.json()

        logger.error(response.text)
        if response.status_code == 401:
            raise AuthenticationError("Login not authorized")
        if response.status_code >= 500:
            raise ServerError("Unexpected server error:")
        raise InputError("Bad Input. Client error")
class BaseNeomarilClient(BaseNeomaril):
    """
    Base class for Neomaril client side related classes.

    This is the class that contains some methods related to Client models administration.
    Mainly related to initialize environment and its variables, but also to generate groups.
    A group is a way to organize models clustering for different users and also to increase
    security. Each group has a unique token that should be used to run the models that
    belong to it.

    Attributes
    ----------
    login : str
        Login for authenticating with the client. You can also use the env variable NEOMARIL_USER to set this
    password : str
        Password for authenticating with the client. You can also use the env variable NEOMARIL_PASSWORD to set this
    url : str
        URL to Neomaril Server. Default value is https://neomaril.staging.datarisk.net, use it to test your deployment first before changing to production. You can also use the env variable NEOMARIL_URL to set this

    Example
    -------
    Create a group and then list the groups that already exist. All arguments are
    keyword-only.

    .. code-block:: python

        from neomaril_codex.base import BaseNeomarilClient

        def start_group(login, password):
            client = BaseNeomarilClient(login=login, password=password)
            token = client.create_group(
                name='ex_group', description='Group for example purpose'
            )
            print(client.list_groups())
            return token
    """

    def _auth_headers(self) -> dict:
        """Build the ``Authorization`` header from a freshly refreshed user token."""
        token = refresh_token(*self.credentials, self.base_url)
        return {"Authorization": "Bearer " + token}

    @staticmethod
    def _raise_for_error(response) -> None:
        """Log the response body and raise the exception matching its status code."""
        logger.error(response.text)
        if response.status_code == 401:
            raise AuthenticationError("Login not authorized")
        if response.status_code >= 500:
            raise ServerError("Server Error")
        raise InputError("Bad Input. Client error")

    def list_groups(self) -> list:
        """
        List all existing groups.

        Raises
        ------
        AuthenticationError
            The server answered 401
        ServerError
            Unexpected server error (status >= 500)
        InputError
            Any other non-200 status

        Returns
        -------
        list
            The ``"Results"`` entry of the server response
        """
        url = f"{self.base_url}/groups"
        response = requests.get(url, headers=self._auth_headers())

        if response.status_code == 200:
            return response.json()["Results"]
        self._raise_for_error(response)

    def create_group(self, *, name: str, description: str) -> Union[str, bool]:
        """
        Create a group for multiple models of the same final client.

        On success the group token is returned. You should keep this token to be able to
        run the models of that group afterwards.

        Arguments
        ---------
        name : str
            Name of the group. Must be 32 characters long and with no special characters (some parsing will be made)
        description : str
            Short description of the group

        Raises
        ------
        AuthenticationError
            The server answered 401
        ServerError
            Unexpected server error (status >= 500)
        InputError
            Any other status that is neither 201 nor 400

        Returns
        -------
        str or bool
            The group token (the ``"Token"`` entry of the response) when the server
            answers 201; ``False`` when it answers 400, which this method reports as the
            group already existing.
        """
        url = f"{self.base_url}/groups"
        response = requests.post(
            url,
            data={"name": name, "description": description},
            headers=self._auth_headers(),
        )

        if response.status_code == 201:
            logger.info(
                f"Group '{name}' inserted. Use the token for scoring. Carefully save it as we won't show it again."
            )
            return response.json()["Token"]

        if response.status_code == 400:
            logger.error(response.text)
            logger.error("Group already exist, nothing was changed.")
            return False

        self._raise_for_error(response)

    def refresh_group_token(self, *, name: str, force: bool = False) -> str:
        """
        Refresh the group token.

        If the token is still valid it won't be changed, unless you use ``force=True``.
        You should keep the returned token to be able to run the models of that group
        afterwards.

        Arguments
        ---------
        name : str
            Name of the group to have the token refreshed
        force : bool
            Force token expiration even if it is still valid (this can make multiple models integrations stop working, so use with care)

        Raises
        ------
        AuthenticationError
            The server answered 401
        ServerError
            Unexpected server error (status >= 500)
        InputError
            Any other status different from 201

        Returns
        -------
        str
            The ``"Token"`` entry of the server response.

        Example
        --------
        Suppose that you lost the token to access your group; you can create a new one
        forcing it with this method, as in the example below.

        .. code-block:: python

            from neomaril_codex.base import BaseNeomarilClient

            def update_group_token(model_client, group_name):
                token = model_client.refresh_group_token(name=group_name, force=True)
                print(model_client.list_groups())
                return token
        """
        url = f"{self.base_url}/groups/refresh/{name}"
        response = requests.get(
            url,
            # The flag is sent as the lowercase text "true" / "false".
            params={"force": str(force).lower()},
            headers=self._auth_headers(),
        )

        if response.status_code == 201:
            logger.info(f"Group '{name}' was refreshed")
            return response.json()["Token"]
        self._raise_for_error(response)
class NeomarilExecution(BaseNeomaril):
    """
    Base class for Neomaril asynchronous model executions.

    With this class you can visualize the status of an execution and download the results
    after an execution has finished.

    Attributes
    ----------
    exec_type : str
        Flag that contains which type of execution you use. It can be 'AsyncModel', 'Training' or 'AsyncPreprocessing'
    group : str, optional
        Group the model is inserted
    exec_id : str, optional
        Execution id
    status : str
        Last known execution state
    execution_data : dict
        ``"Description"`` entry returned by the describe endpoint (empty for 'AsyncPreprocessing')

    Raises
    ------
    InputError
        Invalid execution type
    ModelError
        If the execution id was not found or it wasn't possible to retrieve it

    Example
    -------
    Wait for an existing execution and download its results. All arguments are
    keyword-only.

    .. code-block:: python

        from neomaril_codex.base import NeomarilExecution

        def fetch_result(model_id, group, exec_id, group_token):
            execution = NeomarilExecution(
                parent_id=model_id,
                exec_type='AsyncModel',
                group=group,
                exec_id=exec_id,
                group_token=group_token,
            )
            execution.wait_ready()
            execution.download_result(path='./', filename='output.zip')
    """

    # Maps each accepted execution type to its URL path segment.
    _URL_PATHS = {
        "AsyncModel": "model/async",
        "Training": "training",
        "AsyncPreprocessing": "preprocessing/async",
    }

    def __init__(
        self,
        *,
        parent_id: str,
        exec_type: str,
        group: Optional[str] = None,
        exec_id: Optional[str] = None,
        login: Optional[str] = None,
        password: Optional[str] = None,
        url: Optional[str] = None,
        group_token: Optional[str] = None,
    ) -> None:
        """
        Log in and load the current description of the execution.

        Arguments
        ---------
        parent_id : str
            Model id (hash) from the model you want to access
        exec_type : str
            'AsyncModel', 'Training' or 'AsyncPreprocessing'
        group : str, optional
            Group the model is inserted
        exec_id : str, optional
            Execution id
        login : str, optional
            Login for authenticating with the client. You can also use the env variable NEOMARIL_USER to set this
        password : str, optional
            Password for authenticating with the client. You can also use the env variable NEOMARIL_PASSWORD to set this
        url : str, optional
            URL to Neomaril Server. You can also use the env variable NEOMARIL_URL to set this
        group_token : str, optional
            Token of the group. You can also use the env variable NEOMARIL_GROUP_TOKEN to set this

        Raises
        ------
        InputError
            Invalid execution type, or an unhandled client-error status from the describe endpoint
        AuthenticationError
            The describe endpoint answered 401
        ModelError
            The describe endpoint answered 404
        ServerError
            The describe endpoint answered with a status >= 500
        """
        super().__init__(login=login, password=password, url=url)

        load_dotenv()
        logger.info("Loading .env")

        self.exec_type = exec_type
        self.exec_id = exec_id
        self.status = "Requested"
        self.group = group
        self.__token = group_token if group_token else os.getenv("NEOMARIL_GROUP_TOKEN")

        if exec_type not in self._URL_PATHS:
            # The message lists every accepted type, including 'AsyncPreprocessing'.
            raise InputError(
                f"Invalid execution type '{exec_type}'. Valid options are 'AsyncModel', 'Training' and 'AsyncPreprocessing'"
            )
        self.__url_path = self._URL_PATHS[exec_type]

        if exec_type == "AsyncPreprocessing":
            # CHANGEME when add describe execution for preprocessing
            self.execution_data = {}
            self.status = "Running"
            return

        # The describe endpoint path has no '/async' suffix.
        describe_path = self.__url_path.replace("/async", "")
        url = f"{self.base_url}/{describe_path}/describe/{group}/{parent_id}/{exec_id}"
        response = requests.get(
            url,
            headers={
                "Authorization": "Bearer "
                + refresh_token(*self.credentials, self.base_url)
            },
        )

        if response.status_code == 401:
            logger.error(response.text)
            raise AuthenticationError("Login not authorized")
        elif response.status_code == 404:
            logger.error(f'Unable to retrive execution "{exec_id}"\n{response.text}')
            raise ModelError(f'Execution "{exec_id}" not found.')
        elif response.status_code >= 500:
            logger.error(response.text)
            raise ServerError("Server Error")
        elif response.status_code >= 400:
            # Remaining client errors used to fall through and fail while reading the
            # body; report them the same way the other request helpers do.
            logger.error(response.text)
            raise InputError("Bad Input. Client error")

        self.execution_data = response.json()["Description"]
        self.status = self.execution_data["ExecutionState"]

    def __repr__(self) -> str:
        return f"""Neomaril{self.exec_type}Execution(exec_id="{self.exec_id}", status="{self.status}")"""

    def __str__(self):
        # The original text ended with a stray double quote; it is dropped here.
        return f"NEOMARIL {self.exec_type}Execution :{self.exec_id} (Status: {self.status})"

    def get_status(self) -> dict:
        """
        Gets the status of the related execution.

        Also stores the returned ``"Status"`` in ``self.status`` and in
        ``self.execution_data["ExecutionState"]``.

        Raises
        ------
        ExecutionError
            Execution unavailable (status other than 200 or 410)

        Returns
        -------
        dict
            Returns the execution status.
        """
        url = f"{self.base_url}/{self.__url_path}/status/{self.group}/{self.exec_id}"
        response = requests.get(
            url, headers={"Authorization": "Bearer " + self.__token}
        )

        if response.status_code not in [200, 410]:
            logger.error(response.text)
            raise ExecutionError(f'Execution "{self.exec_id}" unavailable')

        result = response.json()
        self.status = result["Status"]
        self.execution_data["ExecutionState"] = result["Status"]
        return result

    def wait_ready(self):
        """
        Waits the execution until it is no longer running.

        The status is polled every 30 seconds while it is ``"Running"``.

        Example
        -------
        >>> model.wait_ready()
        """
        if self.status in ["Requested", "Running"]:
            self.status = self.get_status()["Status"]

            # NOTE(review): a status that is still "Requested" after the first poll ends
            # the wait immediately; confirm whether that is intended.
            while self.status == "Running":
                sleep(30)
                self.status = self.get_status()["Status"]

    def download_result(
        self, *, path: Optional[str] = "./", filename: Optional[str] = "output.zip"
    ) -> None:
        """
        Gets the output of the execution and writes it to ``path``/``filename``.

        Nothing is downloaded (only an INFO message is logged) when the execution has not
        succeeded yet.

        Arguments
        ---------
        path : str
            Path of the result file. Default value is './'
        filename : str
            Name of the result file. Default value is 'output.zip'

        Raises
        ------
        ExecutionError
            Execution failed, or the result endpoint answered with a status other than 200 or 410
        """
        if self.status in ["Running", "Requested"]:
            self.status = self.get_status()["Status"]

        if self.status == "Failed":
            raise ExecutionError("Execution failed")
        if self.status != "Succeeded":
            logger.info(f"Execution not ready. Status is {self.status}")
            return

        # Training results are fetched with the user token, the others with the group
        # token. The token is resolved only once a download will actually happen.
        if self.exec_type == "Training":
            token = refresh_token(*self.credentials, self.base_url)
        else:
            token = self.__token

        url = f"{self.base_url}/{self.__url_path}/result/{self.group}/{self.exec_id}"
        response = requests.get(url, headers={"Authorization": "Bearer " + token})

        # NOTE(review): a 410 answer is accepted and its body is written to disk as
        # well; confirm that this is the intended handling.
        if response.status_code not in [200, 410]:
            logger.error(response.text)
            raise ExecutionError(f'Execution "{self.exec_id}" unavailable')

        # os.path.join keeps an empty ``path`` relative instead of producing
        # "/<filename>" at the filesystem root.
        target = os.path.join(path or "./", filename or "output.zip")
        with open(target, "wb") as f:
            f.write(response.content)

        logger.info(f"Output saved in {target}")