Preprocessing module#

Module with all classes and methods to manage the Preprocessing scripts deployed at MLOps.

MLOpsPreprocessing#

class mlops_codex.preprocessing.MLOpsPreprocessing(*, preprocessing_id: str, login: str | None = None, password: str | None = None, group: str | None = None, group_token: str | None = None, url: str | None = None)[source]#

Bases: BaseMLOps

Class to manage Preprocessing scripts deployed inside MLOps

Parameters:
  • login (str) – Login for authenticating with the client. You can also use the env variable MLOPS_USER to set this

  • password (str) – Password for authenticating with the client. You can also use the env variable MLOPS_PASSWORD to set this

  • preprocessing_id (str) – Preprocessing script id (hash) from the script you want to access

  • group (str) – Group where the preprocessing is inserted.

  • url (str) – URL to MLOps Server. Default value is https://neomaril.datarisk.net/, use it to test your deployment first before changing to production. You can also use the env variable MLOPS_URL to set this

Example

Getting a preprocessing script, checking its health and putting it to run predictions

from mlops_codex.preprocessing import MLOpsPreprocessingClient
from mlops_codex.model import MLOpsModelClient

client = MLOpsPreprocessingClient('123456')

client.search_preprocessing()

preprocessing = client.get_preprocessing(preprocessing_id='S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062', group='ex_group')
get_logs(*, start: str | None = None, end: str | None = None, routine: str | None = None, type: str | None = None)[source]#

Get the logs

Parameters:
  • start (Optional[str], optional) – Start date for the filter, in the format yyyy-mm-dd

  • end (Optional[str], optional) – End date for the filter, in the format yyyy-mm-dd

  • routine (Optional[str], optional) – Type of routine being executed; can assume the values Host or Run

  • type (Optional[str], optional) – Type of the logs to be filtered; can assume the values Ok, Error, Debug or Warning

Raises:

ServerError – Unexpected server error

Returns:

Logs list

Return type:

dict

Example

>>> preprocessing.get_logs(start='2023-01-31', end='2023-02-24', routine='Run', type='Error')
 {'Results':
    [{'Hash': 'M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4',
        'RegisteredAt': '2023-01-31T16:06:45.5955220Z',
        'OutputType': 'Error',
        'OutputData': '',
        'Routine': 'Run'}]
 }
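The response nests log entries under the 'Results' key, so summarizing it is plain dict handling. A minimal sketch that counts entries per OutputType; the summarize_logs helper is hypothetical, not part of mlops_codex:

```python
from collections import Counter

def summarize_logs(logs: dict) -> Counter:
    """Count log entries by OutputType in a get_logs response."""
    return Counter(entry["OutputType"] for entry in logs.get("Results", []))

# Sample shaped like the response above
sample = {
    "Results": [
        {"OutputType": "Error", "Routine": "Run"},
        {"OutputType": "Ok", "Routine": "Host"},
        {"OutputType": "Error", "Routine": "Run"},
    ]
}
summarize_logs(sample)  # counts: {'Error': 2, 'Ok': 1}
```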
get_preprocessing_execution(exec_id: str) MLOpsExecution[source]#

Get an execution instance for that preprocessing.

Parameters:

exec_id (str) – Execution id

Raises:

PreprocessingError – If the user tries to get an execution from a Sync preprocessing

Returns:

An execution instance for the preprocessing.

Return type:

MLOpsExecution

Example

>>> preprocessing.get_preprocessing_execution('1')
run(*, data: str | dict | List[Tuple[str, str]] | Tuple[str, str], group_token: str | None = None, wait_complete: bool | None = False)[source]#

Runs a prediction from the current preprocessing.

Parameters:
  • data (Union[dict, str, List[Tuple[str, str]]]) – The same data used in the source file. For Sync preprocessing, a dict whose required keys are the ones in the schema attribute. For Async preprocessing, a string with the path of a file with the same filename used in the source file. If you wish to send more than one input file, send a list of tuples of the form (input_file_name, input_file_path).

  • group_token (Optional[str], optional) – Token for executing the preprocessing (shown when creating a group). It can be informed when getting the preprocessing, when running predictions, or through the env variable MLOPS_GROUP_TOKEN

  • wait_complete (Optional[bool], optional) – If True, waits for the preprocessing run to complete before returning. Default value is False

Raises:

PreprocessingError – Preprocessing is not available

Returns:

The return of the scoring function in the source file for Sync preprocessing or the execution class for Async preprocessing.

Return type:

Union[dict, MLOpsExecution]
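Since run returns either the scoring output (Sync) or an execution object (Async), callers commonly branch on the result type. A sketch of that pattern, using a hypothetical run_and_collect helper that is not part of mlops_codex:

```python
from typing import Any, Union

def run_and_collect(preprocessing: Any, data: Union[dict, str], group_token: str) -> Any:
    """Normalize the two possible results of MLOpsPreprocessing.run.

    Hypothetical helper: Sync scripts return the scoring output as a
    dict, while Async scripts return an execution object that must
    finish before its result can be downloaded.
    """
    result = preprocessing.run(data=data, group_token=group_token)
    if isinstance(result, dict):
        # Sync preprocessing: the scoring function output comes back directly.
        return result
    # Async preprocessing: wait for the execution, then fetch the output file.
    result.wait_ready()
    result.download_result()
    return result
```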

set_token(group_token: str) None[source]#

Saves the group token for this preprocessing instance.

Parameters:

group_token (str) – Token for executing the preprocessing (shown when creating a group). You can also set this using the MLOPS_GROUP_TOKEN env variable

Example

>>> preprocessing.set_token('6cb64889a45a45ea8749881e30c136df')
wait_ready()[source]#

Waits for the preprocessing to reach the status ‘Deployed’

Example

>>> preprocessing.wait_ready()

MLOpsPreprocessingClient#

class mlops_codex.preprocessing.MLOpsPreprocessingClient(login: str | None = None, password: str | None = None, url: str | None = None)[source]#

Bases: BaseMLOpsClient

Class for client to access MLOps and manage Preprocessing scripts

Parameters:
  • login (str) – Login for authenticating with the client. You can also use the env variable MLOPS_USER to set this

  • password (str) – Password for authenticating with the client. You can also use the env variable MLOPS_PASSWORD to set this

  • url (str) – URL to MLOps Server. Default value is https://neomaril.datarisk.net/, use it to test your deployment first before changing to production. You can also use the env variable MLOPS_URL to set this

Example

Example 1: Creating and managing a Synchronous Preprocessing script

from mlops_codex.preprocessing import MLOpsPreprocessingClient
from mlops_codex.model import MLOpsModelClient

client = MLOpsPreprocessingClient('123456')
PATH = './samples/syncPreprocessing/'

sync_preprocessing = client.create('Teste preprocessing Sync', # model_name
                    'process', # name of the scoring function
                    PATH+'app.py', # Path of the source file
                    PATH+'requirements.txt', # Path of the requirements file,
                    schema=PATH+'schema.json', # Path of the schema file, but it could be a dict (only required for Sync models)
                    # env=PATH+'.env'  #  File for env variables (this will be encrypted in the server)
                    # extra_files=[PATH+'utils.py'], # List with extra files paths that should be uploaded along (they will be all in the same folder)
                    python_version='3.9', # Can be 3.8 to 3.10
                    operation="Sync", # Can be Sync or Async
                    group='datarisk' # Model group (create one using the client)
                    )

sync_preprocessing.set_token('TOKEN')

result = sync_preprocessing.run({'variable': 100})
result

Example 2: Creating and deploying an Asynchronous Preprocessing script

from mlops_codex.preprocessing import MLOpsPreprocessingClient
from mlops_codex.model import MLOpsModelClient

client = MLOpsPreprocessingClient('123456')
PATH = './samples/asyncPreprocessing/'

async_preprocessing = client.create('Teste preprocessing Async', # model_name
                    'process', # name of the scoring function
                    PATH+'app.py', # Path of the source file
                    PATH+'requirements.txt', # Path of the requirements file,
                    # env=PATH+'.env',  #  File for env variables (this will be encrypted in the server)
                    # extra_files=[PATH+'input.csv'], # List with extra files paths that should be uploaded along (they will be all in the same folder)
                    python_version='3.9', # Can be 3.8 to 3.10
                    operation="Async", # Can be Sync or Async
                    group='datarisk', # Model group (create one using the client)
                    input_type='csv'
                    )

async_preprocessing.set_token('TOKEN')

execution = async_preprocessing.run(PATH+'input.csv')

execution.get_status()

execution.wait_ready()

execution.download_result()

Example 3: Using preprocessing with a Synchronous model

from mlops_codex.preprocessing import MLOpsPreprocessingClient
from mlops_codex.model import MLOpsModelClient

# the sync preprocess script configuration presented before
# ...

model_client = MLOpsModelClient('123456')

sync_model = model_client.get_model(group='datarisk', model_id='M3aa182ff161478a97f4d3b2dc0e9b064d5a9e7330174daeb302e01586b9654c')

sync_model.predict(data=sync_model.schema, preprocessing=sync_preprocessing)

Example 4: Using preprocessing with an Asynchronous model

from mlops_codex.preprocessing import MLOpsPreprocessingClient
from mlops_codex.model import MLOpsModelClient

# the async preprocess script configuration presented before
# ...

async_model = model_client.get_model(group='datarisk', model_id='Maa3449c7f474567b6556614a12039d8bfdad0117fec47b2a4e03fcca90b7e7c')

PATH = './samples/asyncModel/'

execution = async_model.predict(PATH+'input.csv', preprocessing=async_preprocessing)
execution.wait_ready()

execution.download_result()
create(*, preprocessing_name: str, preprocessing_reference: str, source_file: str, requirements_file: str, group: str, schema: str | Dict | List[Tuple[str, str]] | None = None, extra_files: List | Tuple[str, str] | List[Tuple[str, str]] | None = None, env: str | None = None, python_version: str = '3.10', operation='Sync', input_type: str = 'json|csv|parquet', wait_for_ready: bool = True) MLOpsPreprocessing[source]#

Deploy a new preprocessing to MLOps.

Parameters:
  • preprocessing_name (str) – The name of the preprocessing, in less than 32 characters

  • preprocessing_reference (str) – The name of the scoring function inside the source file

  • source_file (str) – Path of the source file. The file must have a scoring function that accepts two parameters: data (data for the request body of the preprocessing) and preprocessing_path (absolute path of where the file is located)

  • requirements_file (str) – Path of the requirements file. The packages versions must be fixed eg: pandas==1.0

  • group (str) – Group where the preprocessing is inserted.

  • schema (Optional[Union[str, Dict, List[Tuple[str, str]]]]) – Path to a JSON, XML, CSV or Parquet file with a sample of the input for the entrypoint function. A dict with the sample input can be sent as well. For Async preprocessing, send a Parquet or CSV file; for Sync preprocessing, send a JSON or XML file. If you want to upload more than one file, send a list of tuples in the format (dataset_name, dataset_file_path).

  • extra_files (Optional[Union[List, Tuple[str, str], List[Tuple[str, str]]]]) – An optional list with paths of additional files that should be uploaded. If the scoring function refers to these files, they will be in the same folder as the source file. If you will use the extra files in multiple preprocessing scripts, upload a tuple in the format (extra_file_name, extra_file_path) or a list of tuples in that format.

  • env (Optional[str]) – Path of a .env file with environment variables for the preprocessing; the file will be encrypted on the server.

  • python_version (Optional[str], optional) – Python version for the preprocessing environment. Available versions are 3.8, 3.9, 3.10. Defaults to ‘3.10’

  • operation (str) – Defines which kind operation is being executed (Sync or Async). Default value is Sync

  • input_type (str) – The type of the input file that should be ‘json’, ‘csv’ or ‘parquet’

  • wait_for_ready (Optional[bool]) – Wait for preprocessing to be ready and returns a MLOpsPreprocessing instance with the new preprocessing. Defaults to True

Raises:

InputError – Some input parameter is invalid

Returns:

The new preprocessing. If wait_for_ready=True, the deployment process runs synchronously and the instance is returned. Otherwise, nothing is returned: all data is sent to the server and the deployment runs asynchronously

Return type:

MLOpsPreprocessing
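As noted for the schema parameter, a Sync preprocessing can receive its schema as an inline dict instead of a file path. A sketch of that variation on Example 1; the wrapper and the sample input key 'variable' are assumptions:

```python
def create_sync_with_dict_schema(client, path="./samples/syncPreprocessing/"):
    """Create a Sync preprocessing passing the schema as an inline dict.

    Hypothetical helper: `client` is an MLOpsPreprocessingClient (or a
    stand-in for testing); the sample input key is assumed.
    """
    return client.create(
        preprocessing_name="Teste preprocessing Sync",
        preprocessing_reference="process",            # scoring function name
        source_file=path + "app.py",
        requirements_file=path + "requirements.txt",
        schema={"variable": 100},                     # inline sample input instead of schema.json
        python_version="3.9",
        operation="Sync",
        group="datarisk",
    )
```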

get_execution(preprocessing_id: str, exec_id: str, group: str | None = None) MLOpsExecution[source]#

Get an execution instance (Async preprocessing only).

Parameters:
  • preprocessing_id (str) – Preprocessing id (hash)

  • exec_id (str) – Execution id

  • group (str, optional) – Group name, default value is None

Returns:

The new execution

Return type:

MLOpsExecution

get_logs(*, preprocessing_id, start: str | None = None, end: str | None = None, routine: str | None = None, type: str | None = None)[source]#

Get the logs

Parameters:
  • preprocessing_id (str) – Preprocessing id (hash)

  • start (Optional[str], optional) – Start date for the filter, in the format yyyy-mm-dd

  • end (Optional[str], optional) – End date for the filter, in the format yyyy-mm-dd

  • routine (Optional[str], optional) – Type of routine being executed, can assume values ‘Host’ (for deployment logs) or ‘Run’ (for execution logs)

  • type (Optional[str], optional) – Defines the type of the logs that are going to be filtered, can assume the values ‘Ok’, ‘Error’, ‘Debug’ or ‘Warning’

Raises:

ServerError – Unexpected server error

Returns:

Logs list

Return type:

dict

Example

>>> client.get_logs(preprocessing_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', routine='Run')
 {'Results':
    [{'Hash': 'B4c3af308c3e452e7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4',
        'RegisteredAt': '2023-02-03T16:06:45.5955220Z',
        'OutputType': 'Ok',
        'OutputData': '',
        'Routine': 'Run'}]
 }
get_preprocessing(*, preprocessing_id: str, group: str, group_token: str | None = None, wait_for_ready: bool | None = True) MLOpsPreprocessing[source]#

Access a preprocessing using its id

Parameters:
  • preprocessing_id (str) – Preprocessing id (hash) to be accessed.

  • group (str) – Group where the preprocessing is inserted.

  • group_token (Optional[str], optional) – Token for executing the preprocessing (shown when creating a group). It can be informed when getting the preprocessing, when running predictions, or through the env variable MLOPS_GROUP_TOKEN

  • wait_for_ready (Optional[bool], optional) – If the preprocessing is being deployed, wait for it to be ready instead of failing the request. Defaults to True.

Returns:

A MLOpsPreprocessing instance with the preprocessing hash from preprocessing_id

Return type:

MLOpsPreprocessing

Example

>>> client.get_preprocessing(preprocessing_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', group='ex_group')
search_preprocessing(*, name: str | None = None, state: str | None = None, group: str | None = None, start: str | None = None, end: str | None = None, only_deployed: bool = False) list[source]#

Search for preprocessing scripts by name and other filters

Parameters:
  • name (Optional[str]) – Text expected to be in the preprocessing name. It works like a LIKE query in SQL

  • state (Optional[str]) – Text expected to be in the state. It works like a LIKE query in SQL

  • group (Optional[str]) – Text expected to be in the group name. It works like a LIKE query in SQL

  • start (Optional[str]) – Start date to filter search record

  • end (Optional[str]) – End date to filter search record

  • only_deployed (Optional[bool]) – If it’s True, filter only preprocessing ready to be used (status == “Deployed”). Defaults to False

Raises:

ServerError – Unexpected server error

Returns:

A list with the preprocessing data, filtered according to the argument values

Return type:

list

Example

>>> client.search_preprocessing(group='ex_group', only_deployed=True)

MLOpsPreprocessingAsyncV2Client#

class mlops_codex.preprocessing.MLOpsPreprocessingAsyncV2Client(login: str | None = None, password: str | None = None, url: str | None = None)[source]#

Bases: BaseMLOpsClient

Class to operate actions in an asynchronous pre-processing.

Parameters:
  • login (str) – Login for authenticating with the client. You can also use the env variable MLOPS_USER to set this

  • password (str) – Password for authenticating with the client. You can also use the env variable MLOPS_PASSWORD to set this

  • url (str) – URL to MLOps Server. Default value is https://neomaril.datarisk.net, use it to test your deployment first before changing to production. You can also use the env variable MLOPS_URL to set this

create(*, name: str, group: str, script_path: str, entrypoint_function_name: str, requirements_path: str, python_version: str | None = '3.9', schema_files_path: Tuple[str, str] | List[Tuple[str, str]] | None = None, schema_datasets: str | List[str] | None = None, extra_files: Tuple[str, str] | List[Tuple[str, str]] | None = None, wait_read: bool = False)[source]#

Create a new preprocessing script.

Parameters:
  • name (str) – Name of the new preprocessing script

  • group (str) – Group of the new preprocessing script

  • schema_files_path (Optional[Union[Tuple[str, str], List[Tuple[str, str]]]) – Schema files path. It must be a tuple in the format (dataset_name, dataset_file_path). If you want to upload more than one file, send a list of tuples in that format.

  • schema_datasets (Optional[Union[str, List[str]]]) – Dataset to upload schema to

  • script_path (str) – Path to the python script

  • entrypoint_function_name (str) – Name of the entrypoint function in the python script

  • python_version (str) – Python version for the model environment. Available versions are 3.8, 3.9, 3.10. Defaults to ‘3.9’

  • requirements_path (str) – Path to the requirements file

  • extra_files (Union[Tuple[str, str], List[Tuple[str, str]]]) – Extra files to upload along with the preprocessing script. Each must be a tuple in the format (extra_file_name, extra_file_path). If you want to upload more than one file, send a list of tuples in that format.

  • wait_read (bool) – If True, waits for the preprocessing script to finish before returning. Defaults to False.

Returns:

Preprocessing async version of the new preprocessing script.

Return type:

MLOpsPreprocessingAsyncV2
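A sketch of calling create on the AsyncV2 client, mirroring Example 2 above; the entrypoint name 'process', the file layout, and the wrapper itself are assumptions:

```python
def create_v2_preprocessing(client, path="./samples/asyncPreprocessing/"):
    """Create a preprocessing script through the AsyncV2 client.

    Hypothetical wrapper: `client` is an MLOpsPreprocessingAsyncV2Client
    (or a stand-in for testing).
    """
    return client.create(
        name="Teste preprocessing Async V2",
        group="datarisk",
        script_path=path + "app.py",
        entrypoint_function_name="process",
        requirements_path=path + "requirements.txt",
        python_version="3.9",   # available versions: 3.8, 3.9, 3.10
        wait_read=True,         # block until the script finishes hosting
    )
```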

describe(preprocessing_script_hash: str)[source]#
describe_execution(preprocessing_script_hash: str, execution_id: int)[source]#
download(preprocessing_script_hash: str, execution_id: int, path: str | None = './')[source]#

Download the output of a preprocessing script execution

Parameters:
  • preprocessing_script_hash (str) – Preprocessing script hash

  • execution_id (int) – Execution id of the preprocessing script.

  • path (str) – Path to download the output of a preprocessing script execution.

execution_status(preprocessing_script_hash: str, execution_id: int)[source]#

Get execution status for preprocessing script execution

Parameters:
  • preprocessing_script_hash (str) – Preprocessing script hash

  • execution_id (int) – Execution id of the preprocessing script.

Returns:

Return the status of the execution. If the execution is successful, the output dataset hash is also returned.

Return type:

Tuple[ModelExecutionState, Union[str, None]]

host(preprocessing_script_hash: str, token: str) None[source]#

Host a preprocessing script to MLOps.

Parameters:
  • preprocessing_script_hash (str) – Preprocessing script hash

  • token (str) – Token to authenticate with the MLOps server

host_status(preprocessing_script_hash: str)[source]#

Get the host status for a preprocessing script.

Parameters:

preprocessing_script_hash (str) – Preprocessing script hash

Returns:

Host status for the preprocessing script and the dataset hash (None if the hash is not available).

Return type:

Tuple[ModelState, Union[str, None]]

list_preprocessing()[source]#

List preprocessing scripts

register_execution(preprocessing_script_hash: str) int[source]#

Register a new execution for a preprocessing script

Parameters:

preprocessing_script_hash (str) – Preprocessing script hash

Returns:

New execution id of the preprocessing script.

Return type:

int

run(preprocessing_script_hash: str, execution_id: int)[source]#

Run a preprocessing script execution

Parameters:
  • preprocessing_script_hash (str) – Preprocessing script hash

  • execution_id (int) – Execution id of the preprocessing script.

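The low-level methods above compose into a manual execution flow: register an execution, upload its input, trigger the run, poll the status, and download the output. A sketch of that orchestration with the sleep call made injectable; the helper and its terminal-state check are assumptions (real code compares against ModelExecutionState):

```python
import time

def run_execution(client, script_hash, input_file, poll_interval=30, sleep=time.sleep):
    """Drive one preprocessing execution end to end.

    Hypothetical helper: `client` is an MLOpsPreprocessingAsyncV2Client
    (or a stand-in for testing) and `input_file` is a tuple of the form
    (input_file_name, input_file_path).
    """
    # 1. Register a new execution and get its id.
    execution_id = client.register_execution(script_hash)
    # 2. Upload the input file; the server answers with a dataset hash.
    input_hash = client.upload_input(script_hash, execution_id, input_file=input_file)
    # 3. Trigger the run.
    client.run(script_hash, execution_id)
    # 4. Poll until the execution leaves the running state (string check is
    #    a stand-in for the real ModelExecutionState comparison).
    while True:
        state, output_hash = client.execution_status(script_hash, execution_id)
        if state != "Running":
            break
        sleep(poll_interval)
    # 5. Download the result to the current directory.
    client.download(script_hash, execution_id, path="./")
    return execution_id, input_hash, output_hash
```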
search(group: str | None = None, state: str | None = None, start: str | None = None, end: str | None = None)[source]#
upload_input(preprocessing_script_hash: str, execution_id: int, input_file: Tuple[str, str] | None = None, dataset_hash: str | None = None) str[source]#

Upload an input file for a preprocessing script execution

Parameters:
  • preprocessing_script_hash (str) – Preprocessing script hash

  • execution_id (int) – Execution id of the preprocessing script.

  • input_file (Optional[Tuple[str, str]]) – Input file name and path. It must be a tuple of the form (input_file_name, input_file_path).

  • dataset_hash (Optional[str]) – Hash of a previously uploaded or generated dataset.

Returns:

Dataset hash of the preprocessing script execution input.

Return type:

str

wait(preprocessing_script_hash: str, token: str)[source]#

Check host status for a preprocessing script every 30 seconds.

Parameters:
  • preprocessing_script_hash (str) – Preprocessing script hash

  • token (str) – Token to authenticate with the MLOps server

Returns:

Host status for the preprocessing script and the dataset hash (None if the hash is not available).

Return type:

Tuple[ModelState, Union[str, None]]


MLOpsPreprocessingAsyncV2#

class mlops_codex.preprocessing.MLOpsPreprocessingAsyncV2(*, login: str, password: str, url: str, name: str, preprocessing_hash: str, group: str, status: ModelState)[source]#

Bases: BaseModel

Preprocessing class to represent the new preprocessing

Parameters:
  • login (str) – Login for authenticating with the client.

  • password (str) – Password for authenticating with the client.

  • url (str) – URL to MLOps Server. Default value is https://neomaril.datarisk.net, use it to test your deployment first before changing to production. You can also use the env variable MLOPS_URL to set this

  • name (str) – Name of the preprocessing script.

  • group (str) – Name of the group where the script is hosted.

  • status (ModelState) – Status of the preprocessing script.

class Config[source]#

Bases: object

arbitrary_types_allowed = True#
download(execution_id: int, path: str | None = './')[source]#

Download the preprocessing script execution.

Parameters:
  • execution_id (int) – Execution id of the preprocessing script.

  • path (str, optional) – Path where to save the downloaded file.

get_execution_status(execution_id: int)[source]#

Get the status of the preprocessing script execution.

Parameters:

execution_id (int) – Execution id of the preprocessing script.

group: str#
host(wait_ready: bool = False) None[source]#

Host the preprocessing script in case it is not hosted yet.

Parameters:

wait_ready (bool) – If true, it will wait for the preprocessing script to finish before returning. Defaults to False.

login: str#
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}#

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

model_post_init(context: Any, /) None#

We need to both initialize private attributes and call the user-defined model_post_init method.

name: str#
password: str#
preprocessing_hash: str#
run(*, input_files: Tuple[str, str] | List[Tuple[str, str]] | None = None, dataset_hashes: str | List[str] | None = None, wait_read: bool | None = False)[source]#

Create a new preprocessing script execution and host it.

Parameters:
  • input_files (Union[Tuple[str, str], List[Tuple[str, str]]]) – Input file name and path. It must be a tuple of the form (input_file_name, input_file_path). If you wish to send more than one input file, send a list of tuples of that form.

  • dataset_hashes (Optional[Union[str, List[str]]]) – List of dataset hashes. If you have just one dataset hash, send a single string.

  • wait_read (Optional[bool]) – If true, it will wait for the preprocessing script execution to finish before returning. Defaults to False.

status: ModelState#
url: str#