Preprocessing module#
Module with all classes and methods to manage the Preprocessing scripts deployed at Neomaril.
neomaril_codex.preprocessing.NeomarilPreprocessing#
- class neomaril_codex.preprocessing.NeomarilPreprocessing(*, preprocessing_id: str, login: str | None = None, password: str | None = None, group: str = 'datarisk', group_token: str | None = None, url: str = 'https://neomaril.staging.datarisk.net/')[source]#
Bases:
BaseNeomaril
Class to manage Preprocessing scripts deployed inside Neomaril
- login#
Login for authenticating with the client. You can also use the env variable NEOMARIL_USER to set this
- Type:
str
- password#
Password for authenticating with the client. You can also use the env variable NEOMARIL_PASSWORD to set this
- Type:
str
- preprocessing_id#
Preprocessing script id (hash) from the script you want to access
- Type:
str
- group#
Group the preprocessing script belongs to. Default is ‘datarisk’ (public group)
- Type:
str
- base_url#
URL to the Neomaril server. The default value is https://neomaril.staging.datarisk.net; use it to test your deployment before switching to production. You can also use the env variable NEOMARIL_URL to set this
- Type:
str
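The attributes above can each fall back to an environment variable. The following is a minimal sketch of that fallback, assuming an explicit argument takes precedence over the environment. The helper name `resolve_settings` is hypothetical and not part of neomaril_codex.

```python
import os

DEFAULT_URL = "https://neomaril.staging.datarisk.net/"


def resolve_settings(login=None, password=None, url=None, environ=None):
    """Hypothetical helper: take each setting from the argument, then the env variable."""
    env = os.environ if environ is None else environ
    return {
        "login": login or env.get("NEOMARIL_USER"),
        "password": password or env.get("NEOMARIL_PASSWORD"),
        # Assumption: the staging URL is used when neither source provides one
        "url": url or env.get("NEOMARIL_URL", DEFAULT_URL),
    }


settings = resolve_settings(environ={"NEOMARIL_USER": "me@example.com"})
print(settings["login"], settings["url"])
```

Passing `environ` explicitly keeps the sketch testable without touching the real process environment.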
Example
Searching for preprocessing scripts and getting one by its id
from neomaril_codex.preprocessing import NeomarilPreprocessingClient
from neomaril_codex.model import NeomarilModelClient

client = NeomarilPreprocessingClient('123456')
client.search_preprocessing()
preprocessing = client.get_preprocessing(preprocessing_id='S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062', group='ex_group')
- get_logs(*, start: str | None = None, end: str | None = None, routine: str | None = None, type: str | None = None)[source]#
Get the logs
- Parameters:
start (str, optional) – Start date for the filter, in the format yyyy-mm-dd
end (str, optional) – End date for the filter, in the format yyyy-mm-dd
routine (str, optional) – Type of routine being executed; can be Host or Run
type (str, optional) – Type of the logs to filter; can be Ok, Error, Debug or Warning
- Raises:
ServerError – Unexpected server error
- Returns:
Logs list
- Return type:
json
Example
>>> preprocessing.get_logs(start='2023-01-31', end='2023-02-24', routine='Run', type='Error')
{'Results':
    [{'Hash': 'M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4',
      'RegisteredAt': '2023-01-31T16:06:45.5955220Z',
      'OutputType': 'Error',
      'OutputData': '',
      'Routine': 'Run'}]
}
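The filters accepted by get_logs have a fixed date format and a closed set of values, so they can be checked locally before the request is sent. This is a sketch only: `build_log_filters` is a hypothetical helper, not part of neomaril_codex, and the server may apply its own validation.

```python
from datetime import datetime

ROUTINES = {"Host", "Run"}
LOG_TYPES = {"Ok", "Error", "Debug", "Warning"}


def build_log_filters(start=None, end=None, routine=None, type=None):
    """Hypothetical helper: validate the filters and return kwargs for get_logs."""
    for value in (start, end):
        if value is not None:
            # strptime raises ValueError when the value is not a year-month-day date
            datetime.strptime(value, "%Y-%m-%d")
    if routine is not None and routine not in ROUTINES:
        raise ValueError(f"routine must be one of {sorted(ROUTINES)}")
    if type is not None and type not in LOG_TYPES:
        raise ValueError(f"type must be one of {sorted(LOG_TYPES)}")
    filters = {"start": start, "end": end, "routine": routine, "type": type}
    # Drop unset filters so only the requested ones are passed on
    return {key: value for key, value in filters.items() if value is not None}


filters = build_log_filters(start="2023-01-31", end="2023-02-24", routine="Run", type="Error")
# With a real instance: preprocessing.get_logs(**filters)
```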
- get_preprocessing_execution(exec_id: str) NeomarilExecution [source]#
Get an execution instance for this preprocessing.
- Parameters:
exec_id (str) – Execution id
- Raises:
PreprocessingError – If the user tries to get an execution from a Sync preprocessing
Example
>>> preprocessing.get_preprocessing_execution('1')
- run(*, data: dict | str, group_token: str | None = None, wait_complete: bool | None = False) dict | NeomarilExecution [source]#
Runs a prediction from the current pre processing.
- Parameters:
data (Union[dict, str]) – The same data that is used in the source file. For Sync, a dict whose keys are the ones in the schema attribute. For Async, a string with the path to a file with the same filename used in the source file.
group_token (str, optional) – Token for executing the pre processing (shown when creating a group). It can be provided when getting the preprocessing, when running predictions, or through the env variable NEOMARIL_GROUP_TOKEN
wait_complete (bool, optional) – Whether to wait for the pre processing execution to complete (True) or not (False). Default value is False
- Raises:
PreprocessingError – Pre processing is not available
- Returns:
The return of the scoring function in the source file for Sync pre processing or the execution class for Async pre processing.
- Return type:
Union[dict, NeomarilExecution]
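Because run returns either a dict (Sync) or an execution object (Async), calling code usually needs to branch on the result. The sketch below shows one way to do that. `collect_result` and `FakeExecution` are hypothetical names; the stand-in class only mimics the wait_ready and download_result calls used in the examples on this page, and what the real download_result returns is not specified here.

```python
def collect_result(result):
    """Hypothetical helper: normalize the two return shapes of run()."""
    if isinstance(result, dict):
        # Sync: the scoring function output is returned directly
        return result
    # Async: block until the execution finishes, then fetch its output
    result.wait_ready()
    return result.download_result()


class FakeExecution:
    """Stand-in for NeomarilExecution so the sketch runs without a server."""

    def __init__(self):
        self.ready = False

    def wait_ready(self):
        self.ready = True

    def download_result(self):
        # Placeholder value; the real method's return value is an assumption
        return "output.zip" if self.ready else None


print(collect_result({"variable": 100}))
print(collect_result(FakeExecution()))
```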
- set_token(group_token: str) None [source]#
Saves the group token for this pre processing instance.
- Parameters:
group_token (str) – Token for executing the pre processing (shown when creating a group). You can set this using the NEOMARIL_GROUP_TOKEN env variable
Example
>>> preprocessing.set_token('6cb64889a45a45ea8749881e30c136df')
neomaril_codex.preprocessing.NeomarilPreprocessingClient#
- class neomaril_codex.preprocessing.NeomarilPreprocessingClient(*, login: str | None = None, password: str | None = None, url: str = 'https://neomaril.staging.datarisk.net/')[source]#
Bases:
BaseNeomarilClient
Class for client to access Neomaril and manage Preprocessing scripts
- login#
Login for authenticating with the client. You can also use the env variable NEOMARIL_USER to set this
- Type:
str
- password#
Password for authenticating with the client. You can also use the env variable NEOMARIL_PASSWORD to set this
- Type:
str
- url#
URL to the Neomaril server. The default value is https://neomaril.staging.datarisk.net; use it to test your deployment before switching to production. You can also use the env variable NEOMARIL_URL to set this
- Type:
str
- Raises:
AuthenticationError – Invalid credentials
ServerError – Server unavailable
Example
Example 1: Creating and managing a Synchronous Preprocess script
from neomaril_codex.preprocessing import NeomarilPreprocessingClient
from neomaril_codex.model import NeomarilModelClient

client = NeomarilPreprocessingClient('123456')
PATH = './samples/syncPreprocessing/'

sync_preprocessing = client.create(
    preprocessing_name='Teste preprocessing Sync',  # Name of the preprocessing script
    preprocessing_reference='process',  # Name of the scoring function
    source_file=PATH+'app.py',  # Path of the source file
    requirements_file=PATH+'requirements.txt',  # Path of the requirements file
    schema=PATH+'schema.json',  # Path of the schema file, but it could be a dict (only required for Sync)
    # env=PATH+'.env',  # File for env variables (this will be encrypted in the server)
    # extra_files=[PATH+'utils.py'],  # List with extra file paths that should be uploaded along (they will all be in the same folder)
    python_version='3.9',  # Can be 3.8 to 3.10
    operation="Sync",  # Can be Sync or Async
    group='datarisk'  # Group (create one using the client)
)

sync_preprocessing.set_token('TOKEN')

result = sync_preprocessing.run(data={'variable': 100})
result
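Example 1 refers to a `process` function inside `app.py`. The sketch below shows what such a source file might contain, based on the two-parameter signature (data and preprocessing_path) described for the source_file parameter of create. The payload shape (a dict or JSON string with a `variable` key), the doubling logic and the returned dict are assumptions made purely for illustration.

```python
import json


def process(data, preprocessing_path):
    """Illustrative scoring function: doubles the 'variable' field.

    data: request payload (assumed here to be a dict or a JSON string)
    preprocessing_path: absolute path of where the source file is located
    """
    # Accept both shapes so the sketch works regardless of how the payload arrives
    payload = json.loads(data) if isinstance(data, str) else dict(data)
    payload["variable"] = payload["variable"] * 2
    return payload


print(process({"variable": 100}, "/tmp"))
```

The second parameter is unused here; a real script would typically use it to locate files uploaded through extra_files.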
Example 2: creation and deployment of an Asynchronous Preprocess script
from neomaril_codex.preprocessing import NeomarilPreprocessingClient
from neomaril_codex.model import NeomarilModelClient

client = NeomarilPreprocessingClient('123456')
PATH = './samples/asyncPreprocessing/'

async_preprocessing = client.create(
    preprocessing_name='Teste preprocessing Async',  # Name of the preprocessing script
    preprocessing_reference='process',  # Name of the scoring function
    source_file=PATH+'app.py',  # Path of the source file
    requirements_file=PATH+'requirements.txt',  # Path of the requirements file
    # env=PATH+'.env',  # File for env variables (this will be encrypted in the server)
    # extra_files=[PATH+'input.csv'],  # List with extra file paths that should be uploaded along (they will all be in the same folder)
    python_version='3.9',  # Can be 3.8 to 3.10
    operation="Async",  # Can be Sync or Async
    group='datarisk',  # Group (create one using the client)
    input_type='csv'
)

async_preprocessing.set_token('TOKEN')

execution = async_preprocessing.run(data=PATH+'input.csv')
execution.get_status()
execution.wait_ready()
execution.download_result()
Example 3: Using preprocessing with a Synchronous model
from neomaril_codex.preprocessing import NeomarilPreprocessingClient
from neomaril_codex.model import NeomarilModelClient

# the sync preprocess script configuration presented before
# ...

model_client = NeomarilModelClient('123456')
sync_model = model_client.get_model(group='datarisk', model_id='M3aa182ff161478a97f4d3b2dc0e9b064d5a9e7330174daeb302e01586b9654c')
sync_model.predict(data=sync_model.schema, preprocessing=sync_preprocessing)
Example 4: Using preprocessing with an Asynchronous model
from neomaril_codex.preprocessing import NeomarilPreprocessingClient
from neomaril_codex.model import NeomarilModelClient

# the async preprocess script configuration presented before
# ...

async_model = model_client.get_model(group='datarisk', model_id='Maa3449c7f474567b6556614a12039d8bfdad0117fec47b2a4e03fcca90b7e7c')
PATH = './samples/asyncModel/'

execution = async_model.predict(PATH+'input.csv', preprocessing=async_preprocessing)
execution.wait_ready()
execution.download_result()
- create(*, preprocessing_name: str, preprocessing_reference: str, source_file: str, requirements_file: str, schema: str | dict | None = None, group: str, extra_files: list | None = None, env: str | None = None, python_version: str = '3.8', operation='Sync', input_type: str = 'json|csv|parquet', wait_for_ready: bool = True) NeomarilPreprocessing | str [source]#
Deploy a new preprocessing to Neomaril.
- Parameters:
preprocessing_name (str) – The name of the pre processing, with fewer than 32 characters
preprocessing_reference (str) – The name of the scoring function inside the source file
source_file (str) – Path of the source file. The file must have a scoring function that accepts two parameters: data (data for the request body of the preprocessing) and preprocessing_path (absolute path of where the file is located)
requirements_file (str) – Path of the requirements file. Package versions must be pinned, e.g. pandas==1.0
schema (Union[str, dict]) – Path to a JSON or XML file with a sample of the input for the entrypoint function. A dict with the sample input can be sent as well. Mandatory for Sync preprocessing
group (str) – Group the pre processing belongs to. Defaults to ‘datarisk’ (public group)
extra_files (list, optional) – An optional list of additional file paths that should be uploaded. If the scoring function refers to these files, they will be in the same folder as the source file
env (str, optional) – Path to a .env file with environment variables for the pre processing. The file is encrypted in the server
python_version (str, optional) – Python version for the pre processing environment. Available versions are 3.8, 3.9, 3.10. Defaults to ‘3.8’
operation (str) – Defines which kind of operation is being executed (Sync or Async). Default value is Sync
input_type (str) – The type of the input file that should be ‘json’, ‘csv’ or ‘parquet’
wait_for_ready (bool, optional) – Wait for the preprocessing to be ready and return a NeomarilPreprocessing instance with the new preprocessing. Defaults to True
- Raises:
InputError – Some input parameter is invalid
- Returns:
If wait_for_ready=True, the deploy process runs synchronously and the new preprocessing is returned. If False, the call returns right after sending all the data to the server and the deploy runs asynchronously
- Return type:
Union[NeomarilPreprocessing, str]
Example
>>> preprocessing = client.create(preprocessing_name='Pre processing Example Sync', preprocessing_reference='score', source_file='./samples/syncPreprocessing/app.py', requirements_file='./samples/syncPreprocessing/requirements.txt', schema='./samples/syncPreprocessing/schema.json', group=group, operation="Sync")
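Several constraints in the parameter list for create (name length, pinned requirements, supported Python versions, schema required for Sync) can be checked locally before uploading anything. The sketch below is one possible pre-flight check. `unpinned_requirements` and `check_create_args` are hypothetical helpers, not part of neomaril_codex, and the server remains the authority on what it accepts.

```python
PYTHON_VERSIONS = {"3.8", "3.9", "3.10"}
OPERATIONS = {"Sync", "Async"}
INPUT_TYPES = {"json", "csv", "parquet"}


def unpinned_requirements(lines):
    """Return requirement lines that do not pin an exact version with '=='."""
    unpinned = []
    for line in lines:
        spec = line.split("#", 1)[0].strip()  # ignore comments and blank lines
        if spec and "==" not in spec:
            unpinned.append(spec)
    return unpinned


def check_create_args(preprocessing_name, requirements, python_version="3.8",
                      operation="Sync", schema=None, input_type=None):
    """Hypothetical pre-flight check; returns a list of problems found."""
    problems = []
    if len(preprocessing_name) >= 32:
        problems.append("preprocessing_name must have fewer than 32 characters")
    unpinned = unpinned_requirements(requirements)
    if unpinned:
        problems.append("unpinned requirements: " + ", ".join(unpinned))
    if python_version not in PYTHON_VERSIONS:
        problems.append("python_version must be 3.8, 3.9 or 3.10")
    if operation not in OPERATIONS:
        problems.append("operation must be Sync or Async")
    if operation == "Sync" and schema is None:
        problems.append("schema is mandatory for Sync preprocessing")
    if input_type is not None and input_type not in INPUT_TYPES:
        problems.append("input_type must be json, csv or parquet")
    return problems


print(check_create_args("Pre processing Example Sync", ["pandas==1.0"],
                        schema={"variable": 100}))
```

The pin check is deliberately simple: it flags anything without `==`, including lines such as `-r other.txt`.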
- get_execution(preprocessing_id: str, exec_id: str, group: str | None = None) NeomarilExecution [source]#
Get an execution instance (Async pre processing only).
- Parameters:
preprocessing_id (str) – Pre processing id (hash)
exec_id (str) – Execution id
group (str, optional) – Group name, default value is None
- Returns:
The new execution
- Return type:
NeomarilExecution
Example
>>> client.get_execution(preprocessing_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', exec_id='1')
- get_logs(*, preprocessing_id, start: str | None = None, end: str | None = None, routine: str | None = None, type: str | None = None)[source]#
Get the logs
- Parameters:
preprocessing_id (str) – Pre processing id (hash)
start (str, optional) – Start date for the filter, in the format yyyy-mm-dd
end (str, optional) – End date for the filter, in the format yyyy-mm-dd
routine (str, optional) – Type of routine being executed, can assume values ‘Host’ (for deployment logs) or ‘Run’ (for execution logs)
type (str, optional) – Defines the type of the logs that are going to be filtered, can assume the values ‘Ok’, ‘Error’, ‘Debug’ or ‘Warning’
- Raises:
ServerError – Unexpected server error
- Returns:
Logs list
- Return type:
json
Example
>>> preprocessing.get_logs(routine='Run')
{'Results':
    [{'Hash': 'B4c3af308c3e452e7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4',
      'RegisteredAt': '2023-02-03T16:06:45.5955220Z',
      'OutputType': 'Ok',
      'OutputData': '',
      'Routine': 'Run'}]
}
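The response in the example above is a dict with a Results list, so it can be summarized with plain Python. The sketch below counts entries per OutputType and finds the latest timestamp. It assumes RegisteredAt is a UTC timestamp (the trailing Z) whose fractional part may be longer than six digits, as in the sample; `parse_registered_at` and `summarize_logs` are hypothetical helper names.

```python
from collections import Counter
from datetime import datetime, timezone


def parse_registered_at(value):
    """Parse a timestamp like '2023-02-03T16:06:45.5955220Z' into a UTC datetime."""
    body = value[:-1] if value.endswith("Z") else value
    if "." in body:
        head, fraction = body.split(".", 1)
        # %f accepts at most six digits, so trim the fraction to microseconds
        body = f"{head}.{fraction[:6]}"
        fmt = "%Y-%m-%dT%H:%M:%S.%f"
    else:
        fmt = "%Y-%m-%dT%H:%M:%S"
    return datetime.strptime(body, fmt).replace(tzinfo=timezone.utc)


def summarize_logs(response):
    """Count log entries per OutputType and return the most recent timestamp."""
    entries = response.get("Results", [])
    counts = Counter(entry["OutputType"] for entry in entries)
    latest = max((parse_registered_at(entry["RegisteredAt"]) for entry in entries),
                 default=None)
    return counts, latest


response = {"Results": [{
    "Hash": "B4c3af308c3e452e7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4",
    "RegisteredAt": "2023-02-03T16:06:45.5955220Z",
    "OutputType": "Ok",
    "OutputData": "",
    "Routine": "Run",
}]}
counts, latest = summarize_logs(response)
print(counts, latest)
```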
- get_preprocessing(*, preprocessing_id: str, group: str = 'datarisk', group_token: str | None = None, wait_for_ready: bool = True) NeomarilPreprocessing [source]#
Access a pre processing using its id
- Parameters:
preprocessing_id (str) – Pre processing id (hash) that needs to be accessed
group (str) – Group the pre processing belongs to. Default is ‘datarisk’ (public group)
group_token (str, optional) – Token for executing the pre processing (shown when creating a group). It can be provided when getting the preprocessing, when running predictions, or through the env variable NEOMARIL_GROUP_TOKEN
wait_for_ready (bool) – If the pre processing is being deployed, wait for it to be ready instead of failing the request. Defaults to True
- Raises:
PreprocessingError – Pre processing unavailable
ServerError – Unknown return from server
- Returns:
A NeomarilPreprocessing instance with the pre processing hash from preprocessing_id
- Return type:
NeomarilPreprocessing
Example
>>> client.get_preprocessing(preprocessing_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', group='ex_group')
- search_preprocessing(*, name: str | None = None, state: str | None = None, group: str | None = None, only_deployed: bool = False) list [source]#
Search for pre processing scripts by name, state or group
- Parameters:
name (str, optional) – Text that is expected to be in the pre processing name. It works like a LIKE query in SQL
state (str, optional) – Text that is expected to be in the state. It works like a LIKE query in SQL
group (str, optional) – Text that is expected to be in the group name. It works like a LIKE query in SQL
only_deployed (bool, optional) – If True, return only pre processing scripts that are ready to be used (status == “Deployed”). Defaults to False
- Raises:
ServerError – Unexpected server error
- Returns:
List with the pre processing data. The arguments work as filters on this list
- Return type:
list
Example
>>> client.search_preprocessing(group='ex_group', only_deployed=True)
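To make the LIKE-style matching concrete, the sketch below mimics the filters locally on a list of dicts. It is an illustration only: the real search runs on the Neomaril server, the record keys (`Name`, `Status`, `Group`), the `Building` status and the case-insensitive substring rule are all assumptions, and `filter_records` is a hypothetical helper.

```python
def like(text, pattern):
    """Case-insensitive substring test, loosely mirroring SQL LIKE '%pattern%'."""
    return pattern.lower() in text.lower()


def filter_records(records, name=None, state=None, group=None, only_deployed=False):
    """Hypothetical local filter that combines all given criteria with AND."""
    selected = []
    for record in records:
        if name is not None and not like(record["Name"], name):
            continue
        if state is not None and not like(record["Status"], state):
            continue
        if group is not None and not like(record["Group"], group):
            continue
        if only_deployed and record["Status"] != "Deployed":
            continue
        selected.append(record)
    return selected


# Made-up records for illustration; real results may use different keys
records = [
    {"Name": "Teste preprocessing Sync", "Status": "Deployed", "Group": "ex_group"},
    {"Name": "Teste preprocessing Async", "Status": "Building", "Group": "ex_group"},
    {"Name": "Other script", "Status": "Deployed", "Group": "datarisk"},
]
print(filter_records(records, group="ex_", only_deployed=True))
```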