Preprocessing module#

Module with all classes and methods to manage the Preprocessing scripts deployed at Neomaril.

neomaril_codex.preprocessing.NeomarilPreprocessing#

class neomaril_codex.preprocessing.NeomarilPreprocessing(*, preprocessing_id: str, login: str | None = None, password: str | None = None, group: str = 'datarisk', group_token: str | None = None, url: str = 'https://neomaril.staging.datarisk.net/')[source]#

Bases: BaseNeomaril

Class to manage Preprocessing scripts deployed inside Neomaril

login#

Login for authenticating with the client. You can also use the env variable NEOMARIL_USER to set this

Type:

str

password#

Password for authenticating with the client. You can also use the env variable NEOMARIL_PASSWORD to set this

Type:

str

preprocessing_id#

Preprocessing script id (hash) from the script you want to access

Type:

str

group#

Group the preprocessing belongs to. Default is ‘datarisk’ (public group)

Type:

str

base_url#

URL of the Neomaril server. The default value is https://neomaril.staging.datarisk.net; use it to test your deployment before switching to production. You can also use the env variable NEOMARIL_URL to set this

Type:

str
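The attributes above can fall back to environment variables. A minimal sketch of how such a fallback could be resolved, assuming that explicit arguments take precedence over the environment. The helper name resolve_credentials and the precedence order are assumptions for illustration, not part of neomaril_codex:

```python
import os

DEFAULT_URL = 'https://neomaril.staging.datarisk.net/'


def resolve_credentials(login=None, password=None, url=None, environ=None):
    """Hypothetical helper: fill missing values from the NEOMARIL_* env variables."""
    environ = os.environ if environ is None else environ
    return {
        'login': login or environ.get('NEOMARIL_USER'),
        'password': password or environ.get('NEOMARIL_PASSWORD'),
        'url': url or environ.get('NEOMARIL_URL', DEFAULT_URL),
    }


# A plain dict stands in for the real environment so the sketch is reproducible
settings = resolve_credentials(
    login='user@example.com',
    environ={'NEOMARIL_PASSWORD': 'secret', 'NEOMARIL_URL': 'https://example.invalid/'},
)
```

Passing environ explicitly keeps the helper easy to test; in normal use it would read os.environ.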

Example

Searching for preprocessing scripts and getting one by its id

from neomaril_codex.preprocessing import NeomarilPreprocessingClient

# Credentials are read from the NEOMARIL_USER and NEOMARIL_PASSWORD env variables
client = NeomarilPreprocessingClient()

client.search_preprocessing()

preprocessing = client.get_preprocessing(preprocessing_id='S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062', group='ex_group')
get_logs(*, start: str | None = None, end: str | None = None, routine: str | None = None, type: str | None = None)[source]#

Get the logs

Parameters:
  • start (str, optional) – Start date of the filter, in the format yyyy-mm-dd

  • end (str, optional) – End date of the filter, in the format yyyy-mm-dd

  • routine (str, optional) – Type of routine being executed. Can be ‘Host’ (for deployment logs) or ‘Run’ (for execution logs)

  • type (str, optional) – Type of the logs to filter. Can be ‘Ok’, ‘Error’, ‘Debug’ or ‘Warning’

Raises:

ServerError – Unexpected server error

Returns:

Logs list

Return type:

json

Example

>>> preprocessing.get_logs(start='2023-01-31', end='2023-02-24', routine='Run', type='Error')
 {'Results':
    [{'Hash': 'M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4',
        'RegisteredAt': '2023-01-31T16:06:45.5955220Z',
        'OutputType': 'Error',
        'OutputData': '',
        'Routine': 'Run'}]
 }
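The filter values can be checked locally before the request is sent. A minimal sketch, assuming a hypothetical helper build_log_filters (not part of neomaril_codex) that validates the documented formats and drops unset filters:

```python
from datetime import datetime

ROUTINES = {'Host', 'Run'}
LOG_TYPES = {'Ok', 'Error', 'Debug', 'Warning'}


def _parse_date(value):
    # Raises ValueError if the string cannot be parsed as yyyy-mm-dd
    return None if value is None else datetime.strptime(value, '%Y-%m-%d').date()


def build_log_filters(start=None, end=None, routine=None, type=None):
    """Hypothetical helper: validate the filters and keep only the ones that are set."""
    start_date, end_date = _parse_date(start), _parse_date(end)
    if start_date and end_date and start_date > end_date:
        raise ValueError('start must not be after end')
    if routine is not None and routine not in ROUTINES:
        raise ValueError(f'routine must be one of {sorted(ROUTINES)}')
    if type is not None and type not in LOG_TYPES:
        raise ValueError(f'type must be one of {sorted(LOG_TYPES)}')
    filters = {'start': start, 'end': end, 'routine': routine, 'type': type}
    return {key: value for key, value in filters.items() if value is not None}


filters = build_log_filters(start='2023-01-31', end='2023-02-24', routine='Run', type='Error')
# The result could then be unpacked into the call: preprocessing.get_logs(**filters)
```

The parameter name type mirrors the signature above, even though it shadows the Python builtin inside the helper.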
get_preprocessing_execution(exec_id: str) NeomarilExecution[source]#

Get an execution instance for this preprocessing.

Parameters:

exec_id (str) – Execution id

Raises:

PreprocessingError – If the user tries to get an execution from a Sync preprocessing

Example

>>> preprocessing.get_preprocessing_execution('1')
run(*, data: dict | str, group_token: str | None = None, wait_complete: bool | None = False) dict | NeomarilExecution[source]#

Runs the current preprocessing with the given data.

Parameters:
  • data (Union[dict, str]) – The same data that is used in the source file. For Sync, a dict whose keys are the ones in the schema attribute. For Async, a string with the path to a file with the same filename used in the source file

  • group_token (str, optional) – Token for executing the preprocessing (shown when creating a group). It can be provided when getting the preprocessing, when running it, or through the env variable NEOMARIL_GROUP_TOKEN

  • wait_complete (bool, optional) – Whether to wait for the preprocessing execution to complete (True) or not (False). Default value is False

Raises:

PreprocessingError – Pre processing is not available

Returns:

The return of the scoring function in the source file for Sync pre processing or the execution class for Async pre processing.

Return type:

Union[dict, NeomarilExecution]
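Because run can return two different types, calling code usually has to branch on the result. A minimal sketch, assuming the Async result exposes wait_ready() and download_result() as used in the examples on this page. The helper resolve_run_result and the FakeExecution stand-in are hypothetical and exist only so the sketch runs without a server:

```python
def resolve_run_result(result):
    """Hypothetical helper: handle both possible return values of run()."""
    if isinstance(result, dict):
        # Sync: the output of the scoring function is returned directly
        return result
    # Async: assume an execution-like object, wait for it and fetch the result
    result.wait_ready()
    return result.download_result()


class FakeExecution:
    """Stand-in for NeomarilExecution; records the calls it receives."""

    def __init__(self):
        self.calls = []

    def wait_ready(self):
        self.calls.append('wait_ready')

    def download_result(self):
        self.calls.append('download_result')
        return 'placeholder'  # the real return value is not documented here


sync_output = resolve_run_result({'variable': 100})
fake = FakeExecution()
async_output = resolve_run_result(fake)
```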

set_token(group_token: str) None[source]#

Saves the group token for this pre processing instance.

Parameters:

group_token (str) – Token for executing the preprocessing (shown when creating a group). You can also set this using the NEOMARIL_GROUP_TOKEN env variable

Example

>>> preprocessing.set_token('6cb64889a45a45ea8749881e30c136df')
wait_ready()[source]#

Waits for the preprocessing to reach the ‘Deployed’ status

Example

>>> preprocessing.wait_ready()
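Waiting for a status is typically a polling loop. The sketch below is not the library's implementation. It assumes a hypothetical get_status callable that returns the current status as a string, and the status names other than ‘Deployed’ (‘Building’, ‘Failed’) are assumptions:

```python
import time


def wait_for_status(get_status, target='Deployed', failed=('Failed',),
                    interval=1.0, timeout=60.0, sleep=time.sleep):
    """Hypothetical polling loop: call get_status until it returns the target."""
    deadline = time.monotonic() + timeout
    while True:
        status = get_status()
        if status == target:
            return status
        if status in failed:
            raise RuntimeError(f'deployment ended with status {status!r}')
        if time.monotonic() >= deadline:
            raise TimeoutError(f'status is still {status!r} after {timeout} seconds')
        sleep(interval)


# Simulated status sequence standing in for requests to the server
statuses = iter(['Building', 'Building', 'Deployed'])
final = wait_for_status(lambda: next(statuses), interval=0, sleep=lambda _: None)
```

The timeout and the explicit failure statuses keep the loop from waiting forever on a deployment that will never become ready.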

neomaril_codex.preprocessing.NeomarilPreprocessingClient#

class neomaril_codex.preprocessing.NeomarilPreprocessingClient(*, login: str | None = None, password: str | None = None, url: str = 'https://neomaril.staging.datarisk.net/')[source]#

Bases: BaseNeomarilClient

Class for client to access Neomaril and manage Preprocessing scripts

login#

Login for authenticating with the client. You can also use the env variable NEOMARIL_USER to set this

Type:

str

password#

Password for authenticating with the client. You can also use the env variable NEOMARIL_PASSWORD to set this

Type:

str

url#

URL of the Neomaril server. The default value is https://neomaril.staging.datarisk.net; use it to test your deployment before switching to production. You can also use the env variable NEOMARIL_URL to set this

Type:

str

Raises:

Example

Example 1: Creating and managing a synchronous preprocessing script

from neomaril_codex.preprocessing import NeomarilPreprocessingClient

# Credentials are read from the NEOMARIL_USER and NEOMARIL_PASSWORD env variables
client = NeomarilPreprocessingClient()
PATH = './samples/syncPreprocessing/'

sync_preprocessing = client.create(preprocessing_name='Teste preprocessing Sync', # Name of the preprocessing
                    preprocessing_reference='process', # Name of the scoring function
                    source_file=PATH+'app.py', # Path of the source file
                    requirements_file=PATH+'requirements.txt', # Path of the requirements file
                    schema=PATH+'schema.json', # Path of the schema file, but it could be a dict (only required for Sync preprocessing)
                    # env=PATH+'.env', # File with env variables (this will be encrypted on the server)
                    # extra_files=[PATH+'utils.py'], # List of extra file paths to upload (they will all be in the same folder)
                    python_version='3.9', # Can be 3.8 to 3.10
                    operation="Sync", # Can be Sync or Async
                    group='datarisk' # Preprocessing group (create one using the client)
                    )

sync_preprocessing.set_token('TOKEN')

# run() only accepts keyword arguments
result = sync_preprocessing.run(data={'variable' : 100})
result

Example 2: Creating and deploying an asynchronous preprocessing script

from neomaril_codex.preprocessing import NeomarilPreprocessingClient

# Credentials are read from the NEOMARIL_USER and NEOMARIL_PASSWORD env variables
client = NeomarilPreprocessingClient()
PATH = './samples/asyncPreprocessing/'

async_preprocessing = client.create(preprocessing_name='Teste preprocessing Async', # Name of the preprocessing
                    preprocessing_reference='process', # Name of the scoring function
                    source_file=PATH+'app.py', # Path of the source file
                    requirements_file=PATH+'requirements.txt', # Path of the requirements file
                    # env=PATH+'.env', # File with env variables (this will be encrypted on the server)
                    # extra_files=[PATH+'input.csv'], # List of extra file paths to upload (they will all be in the same folder)
                    python_version='3.9', # Can be 3.8 to 3.10
                    operation="Async", # Can be Sync or Async
                    group='datarisk', # Preprocessing group (create one using the client)
                    input_type='csv'
                    )

async_preprocessing.set_token('TOKEN')

# run() only accepts keyword arguments
execution = async_preprocessing.run(data=PATH+'input.csv')

execution.get_status()

execution.wait_ready()

execution.download_result()

Example 3: Using preprocessing with a Synchronous model

from neomaril_codex.preprocessing import NeomarilPreprocessingClient
from neomaril_codex.model import NeomarilModelClient

# the sync preprocess script configuration presented before
# ...

model_client = NeomarilModelClient('123456')

sync_model = model_client.get_model(group='datarisk', model_id='M3aa182ff161478a97f4d3b2dc0e9b064d5a9e7330174daeb302e01586b9654c')

sync_model.predict(data=sync_model.schema, preprocessing=sync_preprocessing)

Example 4: Using preprocessing with an Asynchronous model

from neomaril_codex.preprocessing import NeomarilPreprocessingClient
from neomaril_codex.model import NeomarilModelClient

# the async preprocess script configuration presented before
# ...

async_model = model_client.get_model(group='datarisk', model_id='Maa3449c7f474567b6556614a12039d8bfdad0117fec47b2a4e03fcca90b7e7c')

PATH = './samples/asyncModel/'

execution = async_model.predict(PATH+'input.csv', preprocessing=async_preprocessing)
execution.wait_ready()

execution.download_result()
create(*, preprocessing_name: str, preprocessing_reference: str, source_file: str, requirements_file: str, schema: str | dict | None = None, group: str, extra_files: list | None = None, env: str | None = None, python_version: str = '3.8', operation='Sync', input_type: str = 'json|csv|parquet', wait_for_ready: bool = True) NeomarilPreprocessing | str[source]#

Deploy a new preprocessing to Neomaril.

Parameters:
  • preprocessing_name (str) – Name of the preprocessing, with fewer than 32 characters

  • preprocessing_reference (str) – Name of the scoring function inside the source file

  • source_file (str) – Path of the source file. The file must have a scoring function that accepts two parameters: data (data for the request body of the preprocessing) and preprocessing_path (absolute path of where the file is located)

  • requirements_file (str) – Path of the requirements file. Package versions must be pinned, e.g. pandas==1.0

  • schema (Union[str, dict]) – Path to a JSON or XML file with a sample of the input for the entrypoint function. A dict with the sample input can be sent as well. Mandatory for Sync preprocessing

  • group (str) – Group the preprocessing belongs to. ‘datarisk’ is the public group

  • extra_files (list, optional) – An optional list of additional file paths that should be uploaded. If the scoring function refers to these files, they will be in the same folder as the source file

  • env (str, optional) – Path to a file with environment variables for the preprocessing (it will be encrypted on the server). Defaults to None

  • python_version (str, optional) – Python version for the preprocessing environment. Available versions are 3.8, 3.9 and 3.10. Defaults to ‘3.8’

  • operation (str) – Defines which kind of operation is being executed (Sync or Async). Default value is Sync

  • input_type (str) – The type of the input file, which should be ‘json’, ‘csv’ or ‘parquet’

  • wait_for_ready (bool, optional) – Wait for the preprocessing to be ready and return a NeomarilPreprocessing instance with the new preprocessing. Defaults to True
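The source_file parameter expects a scoring function with two parameters. A minimal sketch of what such a file could contain for a Sync preprocessing, assuming the function is named process (as in the examples on this page) and that data arrives either as a JSON string or as a dict. The exact type passed by the server and the expected return type are assumptions:

```python
import json


def process(data, preprocessing_path):
    """Hypothetical scoring function for a Sync preprocessing.

    data: request body, accepted here as a JSON string or an already parsed dict.
    preprocessing_path: absolute path of the folder holding the uploaded files.
    """
    record = json.loads(data) if isinstance(data, (str, bytes)) else dict(data)
    # Illustrative transformation: add a derived field to the input
    record['variable_squared'] = record['variable'] ** 2
    return record


print(process('{"variable": 100}', '/tmp'))
# {'variable': 100, 'variable_squared': 10000}
```

Files listed in extra_files would be opened relative to preprocessing_path, since they are uploaded to the same folder as the source file.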

Raises:

InputError – Some input parameter is invalid

Returns:

If wait_for_ready=True, runs the deploy process synchronously and returns the new preprocessing. If it is False, returns nothing after sending all the data to the server and runs the deploy asynchronously

Return type:

Union[NeomarilPreprocessing, str]
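Several of the constraints documented for create can be checked locally before uploading anything. A minimal sketch, assuming a hypothetical helper check_create_arguments (not part of neomaril_codex). The server may apply different or additional rules:

```python
import re

PYTHON_VERSIONS = {'3.8', '3.9', '3.10'}
OPERATIONS = {'Sync', 'Async'}
INPUT_TYPES = {'json', 'csv', 'parquet'}
# A requirement counts as pinned when it has the form name==version
PINNED = re.compile(r'^[A-Za-z0-9._\-\[\],]+==[^=\s]+$')


def check_create_arguments(preprocessing_name, requirements_lines, schema=None,
                           python_version='3.8', operation='Sync', input_type=None):
    """Hypothetical pre-flight check; returns a list of human-readable problems."""
    problems = []
    if len(preprocessing_name) >= 32:
        problems.append('preprocessing_name must have fewer than 32 characters')
    if python_version not in PYTHON_VERSIONS:
        problems.append(f'unsupported python_version: {python_version}')
    if operation not in OPERATIONS:
        problems.append(f'unsupported operation: {operation}')
    if operation == 'Sync' and schema is None:
        problems.append('schema is mandatory for Sync preprocessing')
    if input_type is not None and input_type not in INPUT_TYPES:
        problems.append(f'unsupported input_type: {input_type}')
    for line in requirements_lines:
        requirement = line.split('#', 1)[0].strip()  # drop comments and whitespace
        if requirement and not PINNED.match(requirement):
            problems.append(f'requirement is not pinned with ==: {requirement}')
    return problems


problems = check_create_arguments(
    preprocessing_name='Teste preprocessing Sync',
    requirements_lines=['pandas==1.0', 'numpy>=1.20  # not pinned'],
    schema=None,
    python_version='3.11',
)
for problem in problems:
    print(problem)
```

Returning a list instead of raising on the first problem lets the caller report every issue at once.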

Example

>>> preprocessing = client.create(preprocessing_name='Pre processing Example Sync', preprocessing_reference='score', source_file='./samples/syncPreprocessing/app.py', requirements_file='./samples/syncPreprocessing/requirements.txt', schema='./samples/syncPreprocessing/schema.json', group=group, operation="Sync")
get_execution(preprocessing_id: str, exec_id: str, group: str | None = None) NeomarilExecution[source]#

Get an execution instance (Async preprocessing only).

Parameters:
  • preprocessing_id (str) – Pre processing id (hash)

  • exec_id (str) – Execution id

  • group (str, optional) – Group name, default value is None

Returns:

The new execution

Return type:

NeomarilExecution

Example

>>> client.get_execution(preprocessing_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', exec_id='1')
get_logs(*, preprocessing_id, start: str | None = None, end: str | None = None, routine: str | None = None, type: str | None = None)[source]#

Get the logs

Parameters:
  • preprocessing_id (str) – Pre processing id (hash)

  • start (str, optional) – Start date of the filter, in the format yyyy-mm-dd

  • end (str, optional) – End date of the filter, in the format yyyy-mm-dd

  • routine (str, optional) – Type of routine being executed, can assume values ‘Host’ (for deployment logs) or ‘Run’ (for execution logs)

  • type (str, optional) – Defines the type of the logs that are going to be filtered, can assume the values ‘Ok’, ‘Error’, ‘Debug’ or ‘Warning’

Raises:

ServerError – Unexpected server error

Returns:

Logs list

Return type:

json

Example

>>> client.get_logs(preprocessing_id='B4c3af308c3e452e7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', routine='Run')
 {'Results':
    [{'Hash': 'B4c3af308c3e452e7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4',
        'RegisteredAt': '2023-02-03T16:06:45.5955220Z',
        'OutputType': 'Ok',
        'OutputData': '',
        'Routine': 'Run'}]
 }
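The response shown above is a dict with a ‘Results’ list, so it can be summarised with plain Python. A minimal sketch, assuming the response keeps that shape. The helper count_by_output_type and the sample entries are illustrative only:

```python
from collections import Counter


def count_by_output_type(logs):
    """Hypothetical helper: tally log entries per OutputType."""
    return Counter(entry['OutputType'] for entry in logs.get('Results', []))


# Sample shaped like the response in the example; the values are made up
sample = {'Results': [
    {'RegisteredAt': '2023-02-03T16:06:45.5955220Z', 'OutputType': 'Ok',
     'OutputData': '', 'Routine': 'Run'},
    {'RegisteredAt': '2023-02-03T16:07:10.0000000Z', 'OutputType': 'Error',
     'OutputData': 'illustrative message', 'Routine': 'Run'},
    {'RegisteredAt': '2023-02-03T16:08:02.0000000Z', 'OutputType': 'Ok',
     'OutputData': '', 'Routine': 'Run'},
]}

counts = count_by_output_type(sample)
errors = [entry for entry in sample['Results'] if entry['OutputType'] == 'Error']
```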
get_preprocessing(*, preprocessing_id: str, group: str = 'datarisk', group_token: str | None = None, wait_for_ready: bool = True) NeomarilPreprocessing[source]#

Access a pre processing using its id

Parameters:
  • preprocessing_id (str) – Pre processing id (hash) that needs to be accessed

  • group (str) – Group the preprocessing belongs to. Default is ‘datarisk’ (public group)

  • group_token (str, optional) – Token for executing the preprocessing (shown when creating a group). It can be provided when getting the preprocessing, when running it, or through the env variable NEOMARIL_GROUP_TOKEN

  • wait_for_ready (bool) – If the pre processing is being deployed, wait for it to be ready instead of failing the request. Defaults to True

Raises:
Returns:

A NeomarilPreprocessing instance with the pre processing hash from preprocessing_id

Return type:

NeomarilPreprocessing

Example

>>> client.get_preprocessing(preprocessing_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', group='ex_group')
search_preprocessing(*, name: str | None = None, state: str | None = None, group: str | None = None, only_deployed: bool = False) list[source]#

Search for preprocessing scripts by name, state or group

Parameters:
  • name (str, optional) – Text that is expected to be in the preprocessing name. It works like a LIKE query in SQL

  • state (str, optional) – Text that is expected to be in the state. It works like a LIKE query in SQL

  • group (str, optional) – Text that is expected to be in the group name. It works like a LIKE query in SQL

  • only_deployed (bool, optional) – If True, return only preprocessing scripts that are ready to be used (status == “Deployed”). Defaults to False

Raises:

ServerError – Unexpected server error

Returns:

List with the preprocessing data. The argument values work as filters on this list

Return type:

list

Example

>>> client.search_preprocessing(group='ex_group', only_deployed=True)
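The LIKE-style matching described above can be illustrated locally. The sketch below approximates it as a case-insensitive substring test over a list of dicts. The keys ‘Name’, ‘Status’ and ‘Group’, the status ‘Building’ and the case-insensitivity are assumptions, and the real filtering happens on the server:

```python
def like(text, pattern):
    """Approximate SQL LIKE '%pattern%' as a case-insensitive substring test."""
    return pattern is None or pattern.lower() in text.lower()


def filter_preprocessing(items, name=None, state=None, group=None, only_deployed=False):
    """Hypothetical local filter mirroring the search_preprocessing arguments."""
    return [
        item for item in items
        if like(item['Name'], name)
        and like(item['Status'], state)
        and like(item['Group'], group)
        and (not only_deployed or item['Status'] == 'Deployed')
    ]


catalog = [
    {'Name': 'Teste preprocessing Sync', 'Status': 'Deployed', 'Group': 'ex_group'},
    {'Name': 'Teste preprocessing Async', 'Status': 'Building', 'Group': 'ex_group'},
    {'Name': 'Other script', 'Status': 'Deployed', 'Group': 'datarisk'},
]
matches = filter_preprocessing(catalog, group='ex_group', only_deployed=True)
```

With substring matching, name='sync' selects both the Sync and the Async entries, so a more specific text is needed to tell them apart.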