Preprocessing module#
Module with all classes and methods to manage the preprocessing scripts deployed in MLOps.
MLOpsPreprocessing#
- class mlops_codex.preprocessing.MLOpsPreprocessing(*, preprocessing_id: str, login: str | None = None, password: str | None = None, group: str | None = None, group_token: str | None = None, url: str | None = None)[source]#
Bases:
BaseMLOps
Class to manage Preprocessing scripts deployed inside MLOps
- Parameters:
login (str) – Login for authenticating with the client. You can also use the env variable MLOPS_USER to set this
password (str) – Password for authenticating with the client. You can also use the env variable MLOPS_PASSWORD to set this
preprocessing_id (str) – Preprocessing script id (hash) from the script you want to access
group (str) – Group the preprocessing is inserted in.
group_token (str) – Token for executing the preprocessing (shown when the group is created). You can also use the env variable MLOPS_GROUP_TOKEN to set this
url (str) – URL to MLOps Server. Default value is https://neomaril.datarisk.net/, use it to test your deployment first before changing to production. You can also use the env variable MLOPS_URL to set this
Example
Getting a preprocessing script and preparing it to run predictions
from mlops_codex.preprocessing import MLOpsPreprocessingClient
from mlops_codex.model import MLOpsModelClient

client = MLOpsPreprocessingClient('123456')
client.search_preprocessing()

preprocessing = client.get_preprocessing(
    preprocessing_id='S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062',
    group='ex_group',
)
- get_logs(*, start: str | None = None, end: str | None = None, routine: str | None = None, type: str | None = None)[source]#
Get the logs
- Parameters:
start (Optional[str], optional) – Start date for the filter, in the format yyyy-mm-dd
end (Optional[str], optional) – End date for the filter, in the format yyyy-mm-dd
routine (Optional[str], optional) – Type of routine being executed; can assume the values Host or Run
type (Optional[str], optional) – Type of logs to be filtered; can assume the values Ok, Error, Debug or Warning
- Raises:
ServerError – Unexpected server error
- Returns:
Logs list
- Return type:
dict
Example
>>> preprocessing.get_logs(start='2023-01-31', end='2023-02-24', routine='Run', type='Error')
{'Results': [{'Hash': 'M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4',
              'RegisteredAt': '2023-01-31T16:06:45.5955220Z',
              'OutputType': 'Error',
              'OutputData': '',
              'Routine': 'Run'}]}
- get_preprocessing_execution(exec_id: str) MLOpsExecution [source]#
Get an execution instance for that preprocessing.
- Parameters:
exec_id (str) – Execution id
- Raises:
PreprocessingError – If the user tries to get an execution from a Sync preprocessing
- Returns:
An execution instance for the preprocessing.
- Return type:
MLOpsExecution
Example
>>> preprocessing.get_preprocessing_execution('1')
- run(*, data: str | dict | List[Tuple[str, str]] | Tuple[str, str], group_token: str | None = None, wait_complete: bool | None = False)[source]#
Runs the preprocessing script on the given data.
- Parameters:
data (Union[dict, str, List[Tuple[str, str]]]) – The same data used in the source file. For Sync preprocessing, a dict whose keys are the ones in the schema attribute. For Async preprocessing, a string with the file path, using the same filename used in the source file. If you wish to send more than one input file, send a list of tuples of the form (input_file_name, input_file_path).
group_token (Optional[str], optional) – Token for executing the preprocessing (shown when the group is created). It can be informed when getting the preprocessing or when running predictions, or through the env variable MLOPS_GROUP_TOKEN
wait_complete (Optional[bool], optional) – If True, waits until the preprocessing run is complete before returning. Default value is False
- Raises:
PreprocessingError – Preprocessing is not available
- Returns:
The return of the scoring function in the source file for Sync preprocessing or the execution class for Async preprocessing.
- Return type:
Union[dict, MLOpsExecution]
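Example
A minimal sketch; the payload, file path and token below are illustrative and depend on your own preprocessing script:
>>> # Sync preprocessing: a dict whose keys match the schema attribute
>>> result = preprocessing.run(data={'variable': 100}, group_token='TOKEN')
>>> # Async preprocessing: a file path; returns an MLOpsExecution
>>> execution = preprocessing.run(data='./input.csv', wait_complete=True)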
- set_token(group_token: str) None [source]#
Saves the group token for this preprocessing instance.
- Parameters:
group_token (str) – Token for executing the preprocessing (shown when the group is created). You can also set this using the MLOPS_GROUP_TOKEN env variable
Example
>>> preprocessing.set_token('6cb64889a45a45ea8749881e30c136df')
MLOpsPreprocessingClient#
- class mlops_codex.preprocessing.MLOpsPreprocessingClient(login: str | None = None, password: str | None = None, url: str | None = None)[source]#
Bases:
BaseMLOpsClient
Class for client to access MLOps and manage Preprocessing scripts
- Parameters:
login (str) – Login for authenticating with the client. You can also use the env variable MLOPS_USER to set this
password (str) – Password for authenticating with the client. You can also use the env variable MLOPS_PASSWORD to set this
url (str) – URL to MLOps Server. Default value is https://neomaril.datarisk.net/, use it to test your deployment first before changing to production. You can also use the env variable MLOPS_URL to set this
- Raises:
AuthenticationError – Invalid credentials
ServerError – Server unavailable
Example
Example 1: Creating and managing a synchronous preprocessing script
from mlops_codex.preprocessing import MLOpsPreprocessingClient
from mlops_codex.model import MLOpsModelClient

client = MLOpsPreprocessingClient('123456')

PATH = './samples/syncPreprocessing/'

sync_preprocessing = client.create(
    preprocessing_name='Teste preprocessing Sync',
    preprocessing_reference='process',  # name of the scoring function
    source_file=PATH+'app.py',  # path of the source file
    requirements_file=PATH+'requirements.txt',  # path of the requirements file
    schema=PATH+'schema.json',  # path of the schema file, but it could be a dict (only required for Sync preprocessing)
    # env=PATH+'.env',  # file with env variables (this will be encrypted in the server)
    # extra_files=[PATH+'utils.py'],  # list with extra file paths that should be uploaded along (they will all be in the same folder)
    python_version='3.9',  # can be 3.8 to 3.10
    operation='Sync',  # can be Sync or Async
    group='datarisk',  # preprocessing group (create one using the client)
)

sync_preprocessing.set_token('TOKEN')

result = sync_preprocessing.run(data={'variable': 100})
result
Example 2: Creating and deploying an asynchronous preprocessing script
from mlops_codex.preprocessing import MLOpsPreprocessingClient
from mlops_codex.model import MLOpsModelClient

client = MLOpsPreprocessingClient('123456')

PATH = './samples/asyncPreprocessing/'

async_preprocessing = client.create(
    preprocessing_name='Teste preprocessing Async',
    preprocessing_reference='process',  # name of the scoring function
    source_file=PATH+'app.py',  # path of the source file
    requirements_file=PATH+'requirements.txt',  # path of the requirements file
    # env=PATH+'.env',  # file with env variables (this will be encrypted in the server)
    # extra_files=[PATH+'input.csv'],  # list with extra file paths that should be uploaded along (they will all be in the same folder)
    python_version='3.9',  # can be 3.8 to 3.10
    operation='Async',  # can be Sync or Async
    group='datarisk',  # preprocessing group (create one using the client)
    input_type='csv',
)

async_preprocessing.set_token('TOKEN')

execution = async_preprocessing.run(data=PATH+'input.csv')

execution.get_status()
execution.wait_ready()
execution.download_result()
Example 3: Using preprocessing with a Synchronous model
from mlops_codex.preprocessing import MLOpsPreprocessingClient
from mlops_codex.model import MLOpsModelClient

# the sync preprocessing script configuration presented before
# ...

model_client = MLOpsModelClient('123456')

sync_model = model_client.get_model(
    group='datarisk',
    model_id='M3aa182ff161478a97f4d3b2dc0e9b064d5a9e7330174daeb302e01586b9654c',
)

sync_model.predict(data=sync_model.schema, preprocessing=sync_preprocessing)
Example 4: Using preprocessing with an Asynchronous model
from mlops_codex.preprocessing import MLOpsPreprocessingClient
from mlops_codex.model import MLOpsModelClient

# the async preprocessing script configuration presented before
# ...

async_model = model_client.get_model(
    group='datarisk',
    model_id='Maa3449c7f474567b6556614a12039d8bfdad0117fec47b2a4e03fcca90b7e7c',
)

PATH = './samples/asyncModel/'

execution = async_model.predict(PATH+'input.csv', preprocessing=async_preprocessing)
execution.wait_ready()
execution.download_result()
- create(*, preprocessing_name: str, preprocessing_reference: str, source_file: str, requirements_file: str, group: str, schema: str | Dict | List[Tuple[str, str]] | None = None, extra_files: List | Tuple[str, str] | List[Tuple[str, str]] | None = None, env: str | None = None, python_version: str = '3.10', operation='Sync', input_type: str = 'json|csv|parquet', wait_for_ready: bool = True) MLOpsPreprocessing [source]#
Deploy a new preprocessing to MLOps.
- Parameters:
preprocessing_name (str) – The name of the preprocessing, in less than 32 characters
preprocessing_reference (str) – The name of the scoring function inside the source file
source_file (str) – Path of the source file. The file must have a scoring function that accepts two parameters: data (data for the request body of the preprocessing) and preprocessing_path (absolute path of where the file is located)
requirements_file (str) – Path of the requirements file. The packages versions must be fixed eg: pandas==1.0
group (str) – Group the preprocessing is inserted in.
schema (Optional[Union[str, Dict, List[Tuple[str, str]]]]) – Path to a JSON, XML, CSV or PARQUET file with a sample of the input for the entrypoint function. A dict with the sample input can be sent as well. For Async preprocessing, send a parquet or csv file; for Sync preprocessing, send a json or xml file. If you want to upload more than one file, send a list of tuples in the format (dataset_name, dataset_file_path).
extra_files (Optional[Union[List, Tuple[str, str], List[Tuple[str, str]]]]) – An optional list with additional file paths that should be uploaded. If the scoring function refers to these files, they will be in the same folder as the source file. If you will use the extra files in multiple preprocessing scripts, you must upload a tuple in the format (extra_file_name, extra_file_path) or a list of tuples in that format.
env (Optional[str]) – Path of a .env file with environment variables to upload along with the preprocessing (it will be encrypted in the server)
python_version (Optional[str], optional) – Python version for the preprocessing environment. Available versions are 3.8, 3.9, 3.10. Defaults to ‘3.10’
operation (str) – Defines which kind of operation is being executed (Sync or Async). Default value is Sync
input_type (str) – The type of the input file; must be ‘json’, ‘csv’ or ‘parquet’
wait_for_ready (Optional[bool]) – Wait for preprocessing to be ready and returns a MLOpsPreprocessing instance with the new preprocessing. Defaults to True
- Raises:
InputError – Some input parameter is invalid
- Returns:
Returns the new preprocessing. If wait_for_ready=True, the deployment process runs synchronously; otherwise, it returns right after sending all the data to the server and the deployment runs asynchronously
- Return type:
MLOpsPreprocessing
- get_execution(preprocessing_id: str, exec_id: str, group: str | None = None) MLOpsExecution [source]#
Get an execution instance (Async preprocessing only).
- Parameters:
preprocessing_id (str) – Preprocessing id (hash)
exec_id (str) – Execution id
group (str, optional) – Group name, default value is None
- Returns:
The new execution
- Return type:
MLOpsExecution
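Example
A minimal sketch; the preprocessing hash and execution id below are illustrative:
>>> execution = client.get_execution(preprocessing_id='S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062', exec_id='1')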
- get_logs(*, preprocessing_id, start: str | None = None, end: str | None = None, routine: str | None = None, type: str | None = None)[source]#
Get the logs
- Parameters:
preprocessing_id (str) – Preprocessing id (hash)
start (Optional[str], optional) – Start date for the filter, in the format yyyy-mm-dd
end (Optional[str], optional) – End date for the filter, in the format yyyy-mm-dd
routine (Optional[str], optional) – Type of routine being executed, can assume values ‘Host’ (for deployment logs) or ‘Run’ (for execution logs)
type (Optional[str], optional) – Defines the type of the logs that are going to be filtered, can assume the values ‘Ok’, ‘Error’, ‘Debug’ or ‘Warning’
- Raises:
ServerError – Unexpected server error
- Returns:
Logs list
- Return type:
dict
Example
>>> client.get_logs(preprocessing_id='S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062', routine='Run')
{'Results': [{'Hash': 'B4c3af308c3e452e7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4',
              'RegisteredAt': '2023-02-03T16:06:45.5955220Z',
              'OutputType': 'Ok',
              'OutputData': '',
              'Routine': 'Run'}]}
- get_preprocessing(*, preprocessing_id: str, group: str, group_token: str | None = None, wait_for_ready: bool | None = True) MLOpsPreprocessing [source]#
Access a preprocessing using its id
- Parameters:
preprocessing_id (str) – Preprocessing id (hash) that needs to be accessed.
group (str) – Group the preprocessing is inserted in.
group_token (Optional[str], optional) – Token for executing the preprocessing (shown when the group is created). It can be informed when getting the preprocessing or when running predictions, or through the env variable MLOPS_GROUP_TOKEN
wait_for_ready (Optional[bool], optional) – If the preprocessing is being deployed, wait for it to be ready instead of failing the request. Defaults to True.
- Raises:
PreprocessingError – Preprocessing unavailable
ServerError – Unknown return from server
- Returns:
A MLOpsPreprocessing instance with the preprocessing hash from preprocessing_id
- Return type:
MLOpsPreprocessing
Example
>>> client.get_preprocessing(preprocessing_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', group='ex_group')
- search_preprocessing(*, name: str | None = None, state: str | None = None, group: str | None = None, start: str | None = None, end: str | None = None, only_deployed: bool = False) list [source]#
Search for preprocessing scripts using the name of the preprocessing and other optional filters
- Parameters:
name (Optional[str]) – Text expected to be in the preprocessing name. It runs similarly to a LIKE query in SQL
state (Optional[str]) – Text expected to be in the state. It runs similarly to a LIKE query in SQL
group (Optional[str]) – Text expected to be in the group name. It runs similarly to a LIKE query in SQL
start (Optional[str]) – Start date to filter the search records
end (Optional[str]) – End date to filter the search records
only_deployed (Optional[bool]) – If True, returns only preprocessing scripts ready to be used (status == “Deployed”). Defaults to False
- Raises:
ServerError – Unexpected server error
- Returns:
A list with the preprocessing data; it works like a filter depending on the argument values
- Return type:
list
Example
>>> client.search_preprocessing(group='ex_group', only_deployed=True)
MLOpsPreprocessingAsyncV2Client#
- class mlops_codex.preprocessing.MLOpsPreprocessingAsyncV2Client(login: str | None = None, password: str | None = None, url: str | None = None)[source]#
Bases:
BaseMLOpsClient
Class to operate actions on an asynchronous preprocessing.
- Parameters:
login (str) – Login for authenticating with the client. You can also use the env variable MLOPS_USER to set this
password (str) – Password for authenticating with the client. You can also use the env variable MLOPS_PASSWORD to set this
url (str) – URL to MLOps Server. Default value is https://neomaril.datarisk.net, use it to test your deployment first before changing to production. You can also use the env variable MLOPS_URL to set this
- create(*, name: str, group: str, script_path: str, entrypoint_function_name: str, requirements_path: str, python_version: str | None = '3.9', schema_files_path: Tuple[str, str] | List[Tuple[str, str]] | None = None, schema_datasets: str | List[str] | None = None, extra_files: Tuple[str, str] | List[Tuple[str, str]] | None = None, wait_read: bool = False)[source]#
Create a new preprocessing script.
- Parameters:
name (str) – Name of the new preprocessing script
group (str) – Group of the new preprocessing script
schema_files_path (Optional[Union[Tuple[str, str], List[Tuple[str, str]]]]) – Schema file paths. It must be a tuple in the format (dataset_name, dataset_file_path). If you want to upload more than one file, send a list of tuples in that format.
schema_datasets (Optional[Union[str, List[str]]]) – Hash (or list of hashes) of previously uploaded datasets to use as the schema
script_path (str) – Path to the python script
entrypoint_function_name (str) – Name of the entrypoint function in the python script
python_version (str) – Python version for the model environment. Available versions are 3.8, 3.9, 3.10. Defaults to ‘3.9’
requirements_path (str) – Path to the requirements file
extra_files (Union[Tuple[str, str], List[Tuple[str, str]]]) – Extra files to upload with the preprocessing script. It must be a tuple in the format (extra_file_name, extra_file_path). If you want to upload more than one file, send a list of tuples in that format.
wait_read (bool) – If True, waits for the preprocessing script to be ready before returning. Defaults to False.
- Returns:
An instance representing the new preprocessing script.
- Return type:
MLOpsPreprocessingAsyncV2
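Example
A minimal sketch; the login, paths, names and group below are illustrative:
>>> client = MLOpsPreprocessingAsyncV2Client('123456')
>>> preprocessing = client.create(
...     name='Teste preprocessing Async',
...     group='datarisk',
...     script_path='./samples/asyncPreprocessing/app.py',
...     entrypoint_function_name='process',
...     requirements_path='./samples/asyncPreprocessing/requirements.txt',
...     schema_files_path=('input', './samples/asyncPreprocessing/input.csv'),
...     wait_read=True,
... )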
- download(preprocessing_script_hash: str, execution_id: int, path: str | None = './')[source]#
Download the output of a preprocessing script execution.
- Parameters:
preprocessing_script_hash (str) – Preprocessing script hash
execution_id (int) – Execution id of the preprocessing script.
path (str) – Path to download the output of a preprocessing script execution.
- Raises:
AuthenticationError – Raised if there is an authentication issue.
PreprocessingError – Raised if the preprocessing script hash could not be found.
ServerError – Raised if the server encounters an issue.
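Example
A minimal sketch; the hash and execution id below are illustrative:
>>> client.download(
...     preprocessing_script_hash='S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062',
...     execution_id=1,
...     path='./output/',
... )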
- execution_status(preprocessing_script_hash: str, execution_id: int)[source]#
Get execution status for preprocessing script execution
- Parameters:
preprocessing_script_hash (str) – Preprocessing script hash
execution_id (int) – Execution id of the preprocessing script.
- Returns:
Return the status of the execution. If the execution is successful, the output dataset hash is also returned.
- Return type:
Tuple[ModelExecutionState, Union[str, None]]
- Raises:
AuthenticationError – Raised if there is an authentication issue.
PreprocessingError – Raised if the preprocessing script hash could not be found.
ServerError – Raised if the server encounters an issue.
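Example
A minimal sketch that unpacks the documented return tuple; the hash and execution id are illustrative:
>>> state, output_dataset_hash = client.execution_status(
...     preprocessing_script_hash='S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062',
...     execution_id=1,
... )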
- host(preprocessing_script_hash: str, token: str) None [source]#
Host a preprocessing script to MLOps.
- Parameters:
preprocessing_script_hash (str) – Preprocessing script hash
token (str) – Token to authenticate with the MLOps server
- Raises:
AuthenticationError – Raised if there is an authentication issue.
PreprocessingError – Raised if the preprocessing script hash could not be found.
ServerError – Raised if the server encounters an issue.
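Example
A minimal sketch; the hash and token below are illustrative:
>>> client.host(
...     preprocessing_script_hash='S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062',
...     token='GROUP_TOKEN',
... )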
- host_status(preprocessing_script_hash: str)[source]#
Get the host status for a preprocessing script.
- Parameters:
preprocessing_script_hash (str) – Preprocessing script hash
- Returns:
Host status for a preprocessing script and dataset hash (if it is not available, it will return None).
- Return type:
Tuple[ModelState, Union[str, None]]
- Raises:
AuthenticationError – Raised if there is an authentication issue.
PreprocessingError – Raised if the preprocessing script hash could not be found.
ServerError – Raised if the server encounters an issue.
- register_execution(preprocessing_script_hash: str) int [source]#
Register a new execution for a preprocessing script
- Parameters:
preprocessing_script_hash (str) – Preprocessing script hash
- Returns:
New execution id of the preprocessing script.
- Return type:
int
- Raises:
AuthenticationError – Raised if there is an authentication issue.
PreprocessingError – Raised if the preprocessing script hash could not be found.
ServerError – Raised if the server encounters an issue.
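Example
A minimal sketch; the hash below is illustrative:
>>> execution_id = client.register_execution('S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062')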
- run(preprocessing_script_hash: str, execution_id: int)[source]#
Run a preprocessing script execution.
- Parameters:
preprocessing_script_hash (str) – Preprocessing script hash
execution_id (int) – Execution id of the preprocessing script.
- Raises:
AuthenticationError – Raised if there is an authentication issue.
PreprocessingError – Raised if the preprocessing script hash could not be found.
ServerError – Raised if the server encounters an issue.
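Example
A minimal sketch of the low-level flow; the hash and file path are illustrative:
>>> preprocessing_hash = 'S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062'
>>> execution_id = client.register_execution(preprocessing_hash)
>>> client.upload_input(preprocessing_hash, execution_id, input_file=('input', './input.csv'))
>>> client.run(preprocessing_hash, execution_id)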
- search(group: str | None = None, state: str | None = None, start: str | None = None, end: str | None = None)[source]#
- upload_input(preprocessing_script_hash: str, execution_id: int, input_file: Tuple[str, str] | None = None, dataset_hash: str | None = None) str [source]#
Upload an input file for a preprocessing script execution
- Parameters:
preprocessing_script_hash (str) – Preprocessing script hash
execution_id (int) – Execution id of the preprocessing script.
input_file (Optional[Tuple[str, str]]) – Input file name and path. It must be a tuple of the form (input_file_name, input_file_path).
dataset_hash (Optional[str]) – Hash of a previously uploaded or generated dataset.
- Returns:
Dataset hash of the preprocessing script execution input.
- Return type:
str
- Raises:
AuthenticationError – Raised if there is an authentication issue.
PreprocessingError – Raised if the preprocessing script hash could not be found.
ServerError – Raised if the server encounters an issue.
- wait(preprocessing_script_hash: str, token: str)[source]#
Check host status for a preprocessing script every 30 seconds.
- Parameters:
preprocessing_script_hash (str) – Preprocessing script hash
token (str) – Token to authenticate with the MLOps server
- Returns:
Host status for a preprocessing script and dataset hash (if it is not available, it will return None).
- Return type:
Tuple[ModelState, Union[str, None]]
- Raises:
AuthenticationError – Raised if there is an authentication issue.
PreprocessingError – Raised if the preprocessing script hash could not be found.
ServerError – Raised if the server encounters an issue.
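Example
A minimal sketch; blocks until the script is hosted (the hash and token are illustrative):
>>> state, dataset_hash = client.wait(
...     preprocessing_script_hash='S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062',
...     token='GROUP_TOKEN',
... )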
MLOpsPreprocessingAsyncV2#
- class mlops_codex.preprocessing.MLOpsPreprocessingAsyncV2(*, login: str, password: str, url: str, name: str, preprocessing_hash: str, group: str, status: ModelState)[source]#
Bases:
BaseModel
Class representing an asynchronous (V2) preprocessing script.
- Parameters:
login (str) – Login for authenticating with the client.
password (str) – Password for authenticating with the client.
url (str) – URL to MLOps Server. Default value is https://neomaril.datarisk.net, use it to test your deployment first before changing to production. You can also use the env variable MLOPS_URL to set this
name (str) – Name of the preprocessing script.
preprocessing_hash (str) – Hash of the preprocessing script.
group (str) – Name of the group where the script is hosted.
status (ModelState) – Status of the preprocessing script.
- download(execution_id: int, path: str | None = './')[source]#
Download the preprocessing script execution.
- Parameters:
execution_id (int) – Execution id of the preprocessing script.
path (str, optional) – Path where to save the downloaded file.
- get_execution_status(execution_id: int)[source]#
Get the status of the preprocessing script execution.
- Parameters:
execution_id (int) – Execution id of the preprocessing script.
- group: str#
- host(wait_ready: bool = False) None [source]#
Host the preprocessing script in case it is not hosted yet.
- Parameters:
wait_ready (bool) – If True, waits for the preprocessing script to be ready before returning. Defaults to False.
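Example
A minimal sketch:
>>> preprocessing.host(wait_ready=True)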
- login: str#
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}#
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- model_post_init(context: Any, /) None #
We need to both initialize private attributes and call the user-defined model_post_init method.
- name: str#
- password: str#
- preprocessing_hash: str#
- run(*, input_files: Tuple[str, str] | List[Tuple[str, str]] | None = None, dataset_hashes: str | List[str] | None = None, wait_read: bool | None = False)[source]#
Create a new preprocessing script execution and run it.
- Parameters:
input_files (Optional[Union[Tuple[str, str], List[Tuple[str, str]]]]) – Input file name and path. It must be a tuple of the form (input_file_name, input_file_path). If you wish to send more than one input file, send a list of tuples of that form.
dataset_hashes (Optional[Union[str, List[str]]]) – List of dataset hashes. If you have just one dataset hash, you can send a single string.
wait_read (Optional[bool]) – If true, it will wait for the preprocessing script execution to finish before returning. Defaults to False.
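Example
A minimal sketch; the input file name and path below are illustrative:
>>> preprocessing.run(input_files=('input', './input.csv'), wait_read=True)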
- status: ModelState#
- url: str#