#!/usr/bin/env python
# coding: utf-8
import io
import json
import os
import time
from time import sleep
from typing import Optional, Union
import requests
from neomaril_codex.__utils import *
from neomaril_codex.base import *
from neomaril_codex.exceptions import *
class NeomarilPreprocessing(BaseNeomaril):
"""
Class to manage Preprocessing scripts deployed inside Neomaril
Attributes
----------
login : str
Login for authenticating with the client. You can also use the env variable NEOMARIL_USER to set this
password : str
Password for authenticating with the client. You can also use the env variable NEOMARIL_PASSWORD to set this
preprocessing_id : str
Preprocessing script id (hash) from the script you want to access
group : str
Group the pre processing script is inserted in. Default is 'datarisk' (public group)
base_url : str
URL to Neomaril Server. Default value is https://neomaril.staging.datarisk.net, use it to test your deployment first before changing to production. You can also use the env variable NEOMARIL_URL to set this
Example
--------
Getting a pre processing script, checking its status and running it
.. code-block:: python
from neomaril_codex.preprocessing import NeomarilPreprocessingClient
from neomaril_codex.model import NeomarilModelClient
client = NeomarilPreprocessingClient('123456')
client.search_preprocessing()
preprocessing = client.get_preprocessing(preprocessing_id='S72110d87c2a4341a7ef0a0cb35e483699db1df6c5d2450f92573c093c65b062', group='ex_group')
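# A minimal continuation sketch, assuming the script is already deployed
# and 'GROUP_TOKEN' stands in for your real group token:
preprocessing.set_token('GROUP_TOKEN')
result = preprocessing.run(data={'variable': 100})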
"""
def __init__(
self,
*,
preprocessing_id: str,
login: Optional[str] = None,
password: Optional[str] = None,
group: str = "datarisk",
group_token: Optional[str] = None,
url: str = "https://neomaril.staging.datarisk.net/",
) -> None:
super().__init__(login=login, password=password, url=url)
self.preprocessing_id = preprocessing_id
self.group = group
self.__token = group_token if group_token else os.getenv("NEOMARIL_GROUP_TOKEN")
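# The group token is optional here; it can also be set later with set_token() or via the NEOMARIL_GROUP_TOKEN env variable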
url = f"{self.base_url}/preprocessing/describe/{group}/{preprocessing_id}"
response = requests.get(
url,
headers={
"Authorization": "Bearer "
+ refresh_token(*self.credentials, self.base_url)
},
)
result = response.json()['Description']
self.operation = result.get("Operation").lower()
response = self.__get_status()
self.status = response.get("Status")
self.__preprocessing_ready = self.status == "Deployed"
def __repr__(self) -> str:
return f"""NeomarilPreprocessing, group="{self.group}",
status="{self.status}",
preprocessing_id="{self.preprocessing_id}",
operation="{self.operation.title()}",
)"""
def __str__(self):
return f"NEOMARIL preprocessing (Group: {self.group}, Id: {self.preprocessing_id})"
def wait_ready(self):
"""
Waits for the pre processing to reach status 'Deployed'
Example
-------
>>> preprocessing.wait_ready()
"""
if self.status in ["Ready", "Building"]:
self.status = self.__get_status()["Status"]
while self.status == "Building":
sleep(30)
self.status = self.__get_status()["Status"]
def get_logs(
self,
*,
start: Optional[str] = None,
end: Optional[str] = None,
routine: Optional[str] = None,
type: Optional[str] = None,
):
"""
Get the logs
Parameters
-----------
start : str, optional
Start date for the filter, in the format yyyy-mm-dd
end : str, optional
End date for the filter, in the format yyyy-mm-dd
routine : str, optional
Type of routine being executed, can assume the values 'Host' or 'Run'
type : str, optional
Defines the type of the logs that are going to be filtered, can assume the values 'Ok', 'Error', 'Debug' or 'Warning'
Raises
------
ServerError
Unexpected server error
Returns
-------
dict
Logs list
Example
-------
>>> preprocessing.get_logs(start='2023-01-31', end='2023-02-24', routine='Run', type='Error')
{'Results':
[{'Hash': 'M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4',
'RegisteredAt': '2023-01-31T16:06:45.5955220Z',
'OutputType': 'Error',
'OutputData': '',
'Routine': 'Run'}]
}
"""
url = f"{self.base_url}/preprocessing/logs/{self.group}/{self.preprocessing_id}"
return self._logs(
url=url,
credentials=self.credentials,
start=start,
end=end,
routine=routine,
type=type,
)
def set_token(self, group_token: str) -> None:
"""
Saves the group token for this pre processing instance.
Arguments
---------
group_token : str
Token for executing the pre processing (shown when the group is created). You can also set this using the NEOMARIL_GROUP_TOKEN env variable
Example
-------
>>> preprocessing.set_token('6cb64889a45a45ea8749881e30c136df')
"""
self.__token = group_token
logger.info(f"Token for group {self.group} added.")
def run(
self,
*,
data: Union[dict, str],
group_token: Optional[str] = None,
wait_complete: Optional[bool] = False,
) -> Union[dict, NeomarilExecution]:
"""
Runs the current pre processing on the provided data.
Arguments
---------
data : Union[dict, str]
The same data that is used in the source file.
If the pre processing is Sync, it is a dict whose required keys are the ones listed in the `schema` attribute.
If the pre processing is Async, it is a string with the path of the file, with the same filename used in the source file.
group_token : str, optional
Token for executing the pre processing (shown when the group is created). It can be informed when getting the pre processing or when running it, or through the env variable NEOMARIL_GROUP_TOKEN
wait_complete : bool, optional
If True, waits until the execution is completed before returning. Default value is False
Raises
------
PreprocessingError
Pre processing is not available
Returns
-------
Union[dict, NeomarilExecution]
The return of the scoring function in the source file for Sync pre processing or the execution class for Async pre processing.
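Example
-------
>>> # A usage sketch; the dict keys for a Sync script depend on your schema ('variable' is illustrative):
>>> result = preprocessing.run(data={'variable': 100}, group_token='TOKEN')
>>> # For an Async script, pass the dataset file path instead:
>>> execution = preprocessing.run(data='./input.csv', wait_complete=True)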
"""
if self.__preprocessing_ready:
if (group_token is not None) or (self.__token is not None):
url = f"{self.base_url}/preprocessing/{self.operation}/run/{self.group}/{self.preprocessing_id}"
if self.__token and not group_token:
group_token = self.__token
if group_token and not self.__token:
self.__token = group_token
if self.operation == "sync":
preprocessing_input = {"Input": data}
req = requests.post(
url,
data=json.dumps(preprocessing_input),
headers={"Authorization": "Bearer " + group_token},
)
return req.json()
elif self.operation == "async":
# Open the dataset inside a context manager so the file handle is closed after the upload
with open(data, "rb") as dataset_file:
req = requests.post(
url,
files={"dataset": dataset_file},
headers={"Authorization": "Bearer " + group_token},
)
# TODO: Shouldn't both sync and async preprocessing have the same succeeded status code?
if req.status_code in [200, 202]:
message = req.json()
logger.info(message["Message"])
exec_id = message["ExecutionId"]
run = NeomarilExecution(
parent_id=self.preprocessing_id,
exec_type="AsyncPreprocessing",
exec_id=exec_id,
login=self.credentials[0],
password=self.credentials[1],
url=self.base_url,
group=self.group,
group_token=group_token,
)
response = run.get_status()
status = response["Status"]
if wait_complete:
print("Waiting the training run.", end="")
while status in ["Running", "Requested"]:
sleep(30)
print(".", end="", flush=True)
response = run.get_status()
status = response["Status"]
if status == "Failed":
logger.error(response["Message"])
raise ExecutionError("Training execution failed")
else:
raise ServerError(req.text)
else:
raise AuthenticationError("Group token not informed")
return run
else:
raise PreprocessingError("Pre processing is not available to run")
def get_preprocessing_execution(self, exec_id: str) -> NeomarilExecution:
"""
Get an execution instance for this pre processing.
Arguments
---------
exec_id: str
Execution id
Raises
------
PreprocessingError
If the user tries to get an execution from a Sync preprocessing
Example
-------
>>> preprocessing.get_preprocessing_execution('1')
"""
if self.operation == "async":
return NeomarilExecution(
parent_id=self.preprocessing_id,
exec_type="AsyncPreprocessing",
exec_id=exec_id,
login=self.credentials[0],
password=self.credentials[1],
url=self.base_url,
group_token=self.__token,
group=self.group,
)
raise PreprocessingError("Sync pre processing don't have executions")
def __get_status(self):
"""
Gets the status of the preprocessing.
Raises
-------
PreprocessingError
Execution unavailable
Returns
-------
dict
The preprocessing status JSON
"""
url = (
f"{self.base_url}/preprocessing/status/{self.group}/{self.preprocessing_id}"
)
response = requests.get(
url,
headers={
"Authorization": "Bearer "
+ refresh_token(*self.credentials, self.base_url)
},
)
if response.status_code == 200:
return response.json()
else:
raise PreprocessingError(response.text)
class NeomarilPreprocessingClient(BaseNeomarilClient):
"""
Class for client to access Neomaril and manage Preprocessing scripts
Attributes
----------
login : str
Login for authenticating with the client. You can also use the env variable NEOMARIL_USER to set this
password : str
Password for authenticating with the client. You can also use the env variable NEOMARIL_PASSWORD to set this
url : str
URL to Neomaril Server. Default value is https://neomaril.staging.datarisk.net, use it to test your deployment first before changing to production. You can also use the env variable NEOMARIL_URL to set this
Raises
------
AuthenticationError
Invalid credentials
ServerError
Server unavailable
Example
--------
Example 1: Creating and managing a Synchronous Preprocessing script
.. code-block:: python
from neomaril_codex.preprocessing import NeomarilPreprocessingClient
from neomaril_codex.model import NeomarilModelClient
client = NeomarilPreprocessingClient('123456')
PATH = './samples/syncPreprocessing/'
sync_preprocessing = client.create(
preprocessing_name='Teste preprocessing Sync',
preprocessing_reference='process', # name of the scoring function
source_file=PATH+'app.py', # path of the source file
requirements_file=PATH+'requirements.txt', # path of the requirements file
schema=PATH+'schema.json', # path of the schema file, but it could be a dict (only required for Sync preprocessings)
# env=PATH+'.env', # file for env variables (this will be encrypted in the server)
# extra_files=[PATH+'utils.py'], # list with extra file paths that should be uploaded along (they will all be in the same folder)
python_version='3.9', # can be 3.8 to 3.10
operation="Sync", # can be Sync or Async
group='datarisk' # preprocessing group (create one using the client)
)
sync_preprocessing.set_token('TOKEN')
result = sync_preprocessing.run(data={'variable': 100})
result
Example 2: Creating and deploying an Asynchronous Preprocessing script
.. code-block:: python
from neomaril_codex.preprocessing import NeomarilPreprocessingClient
from neomaril_codex.model import NeomarilModelClient
client = NeomarilPreprocessingClient('123456')
PATH = './samples/asyncPreprocessing/'
async_preprocessing = client.create(
preprocessing_name='Teste preprocessing Async',
preprocessing_reference='process', # name of the scoring function
source_file=PATH+'app.py', # path of the source file
requirements_file=PATH+'requirements.txt', # path of the requirements file
# env=PATH+'.env', # file for env variables (this will be encrypted in the server)
# extra_files=[PATH+'input.csv'], # list with extra file paths that should be uploaded along (they will all be in the same folder)
python_version='3.9', # can be 3.8 to 3.10
operation="Async", # can be Sync or Async
group='datarisk', # preprocessing group (create one using the client)
input_type='csv'
)
async_preprocessing.set_token('TOKEN')
execution = async_preprocessing.run(data=PATH+'input.csv')
execution.get_status()
execution.wait_ready()
execution.download_result()
Example 3: Using preprocessing with a Synchronous model
.. code-block:: python
from neomaril_codex.preprocessing import NeomarilPreprocessingClient
from neomaril_codex.model import NeomarilModelClient
# the sync preprocess script configuration presented before
# ...
model_client = NeomarilModelClient('123456')
sync_model = model_client.get_model(group='datarisk', model_id='M3aa182ff161478a97f4d3b2dc0e9b064d5a9e7330174daeb302e01586b9654c')
sync_model.predict(data=sync_model.schema, preprocessing=sync_preprocessing)
Example 4: Using preprocessing with an Asynchronous model
.. code-block:: python
from neomaril_codex.preprocessing import NeomarilPreprocessingClient
from neomaril_codex.model import NeomarilModelClient
# the async preprocess script configuration presented before
# ...
async_model = model_client.get_model(group='datarisk', model_id='Maa3449c7f474567b6556614a12039d8bfdad0117fec47b2a4e03fcca90b7e7c')
PATH = './samples/asyncModel/'
execution = async_model.predict(PATH+'input.csv', preprocessing=async_preprocessing)
execution.wait_ready()
execution.download_result()
"""
def __init__(
self,
*,
login: Optional[str] = None,
password: Optional[str] = None,
url: str = "https://neomaril.staging.datarisk.net/",
) -> None:
super().__init__(login=login, password=password, url=url)
def __get_preprocessing_status(self, *, preprocessing_id: str, group: str) -> dict:
"""
Gets the status of the pre processing with the hash equal to `preprocessing_id`
Parameters
----------
group : str
Group the pre processing is inserted
preprocessing_id : str
Pre processing id (hash) from the pre processing being searched
Raises
------
PreprocessingError
Pre processing unavailable
Returns
-------
dict
The pre processing status and a message if the status is 'Failed'
"""
url = f"{self.base_url}/preprocessing/status/{group}/{preprocessing_id}"
response = requests.get(
url=url,
headers={
"Authorization": "Bearer "
+ refresh_token(*self.credentials, self.base_url)
},
timeout=60,
)
if response.status_code not in [200, 410]:
raise PreprocessingError(f'Preprocessing "{preprocessing_id}" not found')
return response.json()
def get_preprocessing(
self,
*,
preprocessing_id: str,
group: str = "datarisk",
group_token: Optional[str] = None,
wait_for_ready: bool = True,
) -> NeomarilPreprocessing:
"""
Access a pre processing using its id
Arguments
---------
preprocessing_id : str
Pre processing id (hash) that needs to be accessed
group : str
Group the pre processing is inserted. Default is 'datarisk' (public group)
group_token : str, optional
Token for executing the pre processing (shown when the group is created). It can be informed when getting the pre processing or when running it, or through the env variable NEOMARIL_GROUP_TOKEN
wait_for_ready : bool
If the pre processing is being deployed, wait for it to be ready instead of failing the request. Defaults to True
Raises
------
PreprocessingError
Pre processing unavailable
ServerError
Unknown return from server
Returns
-------
NeomarilPreprocessing
A NeomarilPreprocessing instance with the pre processing hash from `preprocessing_id`
Example
-------
>>> preprocessing.get_preprocessing(preprocessing_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', group='ex_group')
"""
try:
response = self.__get_preprocessing_status(
preprocessing_id=preprocessing_id, group=group
)
except KeyError:
raise PreprocessingError("Preprocessing not found")
status = response["Status"]
if status == "Building":
if wait_for_ready:
print("Waiting for deploy to be ready.", end="")
while status == "Building":
response = self.__get_preprocessing_status(
preprocessing_id=preprocessing_id, group=group
)
status = response["Status"]
print(".", end="", flush=True)
sleep(10)
else:
logger.info("Returning preprocessing, but preprocessing is not ready.")
NeomarilPreprocessing(
preprocessing_id=preprocessing_id,
login=self.credentials[0],
password=self.credentials[1],
group=group,
url=self.base_url,
group_token=group_token,
)
if status in ["Disabled", "Ready"]:
raise PreprocessingError(
f'Preprocessing "{preprocessing_id}" unavailable (disabled or deploy process is incomplete)'
)
elif status == "Failed":
logger.error(str(response["Message"]))
raise PreprocessingError(
f'Preprocessing "{preprocessing_id}" deploy failed, so preprocessing is unavailable.'
)
elif status == "Deployed":
logger.info(
f"Preprocessing {preprocessing_id} its deployed. Fetching preprocessing."
)
return NeomarilPreprocessing(
preprocessing_id=preprocessing_id,
login=self.credentials[0],
password=self.credentials[1],
group=group,
url=self.base_url,
group_token=group_token,
)
else:
raise ServerError("Unknown preprocessing status: ", status)
def search_preprocessing(
self,
*,
name: Optional[str] = None,
state: Optional[str] = None,
group: Optional[str] = None,
only_deployed: bool = False,
) -> list:
"""
Search for pre processing scripts using the name of the pre processing
Arguments
---------
name : str, optional
Text that is expected to be in the pre processing name. It runs similar to a LIKE query on SQL
state : str, optional
Text that is expected to be in the state. It runs similar to a LIKE query on SQL
group : str, optional
Text that is expected to be in the group name. It runs similar to a LIKE query on SQL
only_deployed : bool, optional
If True, filters only pre processings ready to be used (status == "Deployed"). Defaults to False
Raises
------
ServerError
Unexpected server error
Returns
-------
list
List with the pre processing data; it works as a filter depending on the argument values
Example
-------
>>> client.search_preprocessing(group='ex_group', only_deployed=True)
"""
url = f"{self.base_url}/preprocessing/search"
query = {}
if name:
query["name"] = name
if state:
query["state"] = state
if group:
query["group"] = group
if only_deployed:
query["state"] = "Deployed"
response = requests.get(
url,
params=query,
headers={
"Authorization": "Bearer "
+ refresh_token(*self.credentials, self.base_url)
},
)
if response.status_code == 200:
results = response.json()["Results"]
return results
elif response.status_code == 401:
logger.error(response.text)
raise AuthenticationError("Login not authorized")
elif response.status_code >= 500:
logger.error(response.text)
raise ServerError("Server Error")
else:
logger.error(response.text)
raise PreprocessingError("Could not search the preprocessing script")
def get_logs(
self,
*,
preprocessing_id,
start: Optional[str] = None,
end: Optional[str] = None,
routine: Optional[str] = None,
type: Optional[str] = None,
):
"""
Get the logs
Parameters
----------
preprocessing_id : str
Pre processing id (hash)
start : str, optional
Start date for the filter, in the format yyyy-mm-dd
end : str, optional
End date for the filter, in the format yyyy-mm-dd
routine : str, optional
Type of routine being executed, can assume values 'Host' (for deployment logs) or 'Run' (for execution logs)
type : str, optional
Defines the type of the logs that are going to be filtered, can assume the values 'Ok', 'Error', 'Debug' or 'Warning'
Raises
------
ServerError
Unexpected server error
Returns
-------
dict
Logs list
Example
-------
>>> client.get_logs(preprocessing_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', routine='Run')
{'Results':
[{'Hash': 'B4c3af308c3e452e7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4',
'RegisteredAt': '2023-02-03T16:06:45.5955220Z',
'OutputType': 'Ok',
'OutputData': '',
'Routine': 'Run'}]
}
"""
url = f"{self.base_url}/preprocessing/logs/{preprocessing_id}"
return self._logs(
url=url,
credentials=self.credentials,
start=start,
end=end,
routine=routine,
type=type,
)
def __upload_preprocessing(
self,
*,
preprocessing_name: str,
preprocessing_reference: str,
source_file: str,
requirements_file: str,
schema: Optional[Union[str, dict]] = None,
group: Optional[str],
extra_files: Optional[list] = None,
env: Optional[str] = None,
python_version: str = "3.8",
operation: str = "Sync",
input_type: Optional[str] = None,
) -> str:
"""
Upload the files to the server
Arguments
---------
preprocessing_name : str
The name of the pre processing, in less than 32 characters
preprocessing_reference : str
The name of the scoring function inside the source file
source_file : str
Path of the source file. The file must have a scoring function that accepts two parameters: data (data for the request body of the preprocessing) and preprocessing_path (absolute path of where the file is located)
requirements_file : str
Path of the requirements file. The packages versions must be fixed eg: pandas==1.0
schema : Union[str, dict], optional
Path to a JSON or XML file with a sample of the input for the entrypoint function. A dict with the sample input can be sent as well
group : str, optional
Group the pre processing is inserted. If None the server uses 'datarisk' (public group)
extra_files : list, optional
An optional list with additional file paths that should be uploaded. If the scoring function refers to these files, they will be in the same folder as the source file
env : str, optional
Path to a .env file to be uploaded along with the pre processing (it will be encrypted in the server)
python_version : str, optional
Python version for the pre processing environment. Available versions are 3.8, 3.9 and 3.10. Defaults to '3.8'
operation : str
Defines which kind of operation is being executed (Sync or Async). Default value is Sync
input_type : str
The type of the input file that should be 'json', 'csv' or 'parquet'
Raises
------
InputError
Some input parameter is invalid
Returns
-------
str
The new pre processing id (hash)
"""
url = f"{self.base_url}/preprocessing/register/{group}"
file_extensions = {"py": "script.py", "ipynb": "notebook.ipynb"}
upload_data = [
(
"source",
(file_extensions[source_file.split(".")[-1]], open(source_file, "rb")),
),
("requirements", ("requirements.txt", open(requirements_file, "rb"))),
]
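# Sync pre processings always take JSON input and must ship a schema sample with the upload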
if operation == "Sync":
input_type = "json"
if schema:
if isinstance(schema, str):
schema_file = open(schema, "rb")
elif isinstance(schema, dict):
schema_file = json.dumps(schema)
upload_data.append(("schema", (schema.split("/")[-1], schema_file)))
else:
raise InputError(
"Schema file is mandatory for preprocessing, choose a input type from json, parquet or csv"
)
if env:
upload_data.append(("env", (".env", open(env, "r"))))
if extra_files:
extra_data = [
("extra", (c.split("/")[-1], open(c, "rb"))) for c in extra_files
]
upload_data += extra_data
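# The server expects the python version flag without dots, e.g. '3.9' becomes 'Python39'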
form_data = {
"name": preprocessing_name,
"script_reference": preprocessing_reference,
"operation": operation,
"python_version": "Python" + python_version.replace(".", ""),
}
response = requests.post(
url,
data=form_data,
files=upload_data,
headers={
"Authorization": "Bearer "
+ refresh_token(*self.credentials, self.base_url)
},
)
if response.status_code == 201:
data = response.json()
preprocessing_id = data["Hash"]
logger.info(
f'{data["Message"]} - Hash: "{preprocessing_id}" with response {response.text}'
)
return preprocessing_id
elif response.status_code == 401:
logger.error(response.text)
raise AuthenticationError("Login not authorized")
elif response.status_code >= 500:
logger.error(response.text)
raise ServerError("Server Error")
else:
logger.error("Upload error: " + response.text)
raise InputError("Invalid parameters for preprocessing creation")
def __host_preprocessing(
self, *, operation: str, preprocessing_id: str, group: str
) -> None:
"""
Builds the preprocessing execution environment
Arguments
---------
operation : str
The pre processing operation type (Sync or Async)
preprocessing_id : str
The uploaded pre processing id (hash)
group : str
Group the pre processing is inserted. Default is 'datarisk' (public group)
Raises
------
InputError
Some input parameters its invalid
"""
url = (
f"{self.base_url}/preprocessing/{operation}/host/{group}/{preprocessing_id}"
)
response = requests.get(
url,
headers={
"Authorization": "Bearer "
+ refresh_token(*self.credentials, self.base_url)
},
)
if response.status_code == 202:
logger.info(f"Preprocessing host in process - Hash: {preprocessing_id}")
elif response.status_code == 401:
logger.error(response.text)
raise AuthenticationError("Login not authorized")
elif response.status_code >= 500:
logger.error(response.text)
raise ServerError("Server Error")
else:
logger.error(response.text)
raise InputError("Invalid parameters for preprocessing creation")
def create(
self,
*,
preprocessing_name: str,
preprocessing_reference: str,
source_file: str,
requirements_file: str,
schema: Optional[Union[str, dict]] = None,
group: str,
extra_files: Optional[list] = None,
env: Optional[str] = None,
python_version: str = "3.8",
operation="Sync",
input_type: str = "json|csv|parquet",
wait_for_ready: bool = True,
) -> Union[NeomarilPreprocessing, str]:
"""
Deploy a new preprocessing to Neomaril.
Arguments
---------
preprocessing_name : str
The name of the pre processing, in less than 32 characters
preprocessing_reference : str
The name of the scoring function inside the source file
source_file : str
Path of the source file. The file must have a scoring function that accepts two parameters: data (data for the request body of the preprocessing) and preprocessing_path (absolute path of where the file is located)
requirements_file : str
Path of the requirements file. The packages versions must be fixed eg: pandas==1.0
schema : Union[str, dict]
Path to a JSON or XML file with a sample of the input for the entrypoint function. A dict with the sample input can be sent as well. Mandatory for Sync preprocessing
group : str
Group the pre processing is inserted in. Defaults to 'datarisk' (public group)
extra_files : list, optional
An optional list with additional file paths that should be uploaded. If the scoring function refers to these files, they will be in the same folder as the source file
env : str, optional
Path to a .env file to be uploaded along with the pre processing (it will be encrypted in the server)
python_version : str, optional
Python version for the pre processing environment. Available versions are 3.8, 3.9 and 3.10. Defaults to '3.8'
operation : str
Defines which kind of operation is being executed (Sync or Async). Default value is Sync
input_type : str
The type of the input file that should be 'json', 'csv' or 'parquet'
wait_for_ready : bool, optional
Wait for the preprocessing to be ready and return a NeomarilPreprocessing instance with the new preprocessing. Defaults to True
Raises
------
InputError
Some input parameter is invalid
Returns
-------
Union[NeomarilPreprocessing, str]
Returns the new preprocessing. If wait_for_ready=True, the deploy process runs synchronously; if False, returns after sending all the data to the server and the deploy runs asynchronously
Example
-------
>>> preprocessing = client.create(preprocessing_name='Pre processing Example Sync', preprocessing_reference='score', source_file='./samples/syncPreprocessing/app.py', requirements_file='./samples/syncPreprocessing/requirements.txt', schema='./samples/syncPreprocessing/schema.json', group=group, operation="Sync")
"""
if python_version not in ["3.8", "3.9", "3.10"]:
raise InputError(
"Invalid python version. Avaliable versions are 3.8, 3.9, 3.10"
)
if group:
group = (
group.lower()
.strip()
.replace(" ", "_")
.replace(".", "_")
.replace("-", "_")
)
groups = [g.get("Name") for g in self.list_groups()]
if group not in groups:
raise GroupError("Group dont exist. Create a group first.")
else:
group = "datarisk"
logger.info("Group not informed, using default 'datarisk' group")
preprocessing_id = self.__upload_preprocessing(
preprocessing_name=preprocessing_name,
preprocessing_reference=preprocessing_reference,
source_file=source_file,
requirements_file=requirements_file,
schema=schema,
group=group,
extra_files=extra_files,
python_version=python_version,
env=env,
operation=operation,
input_type=input_type,
)
self.__host_preprocessing(
operation=operation.lower(), preprocessing_id=preprocessing_id, group=group
)
time.sleep(1)
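# Brief pause so the host request is registered server-side before we fetch the status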
return self.get_preprocessing(
preprocessing_id=preprocessing_id,
group=group,
wait_for_ready=wait_for_ready,
)
def get_execution(
self, preprocessing_id: str, exec_id: str, group: Optional[str] = None
) -> NeomarilExecution:
"""
Get an execution instance (Async pre processing only).
Arguments
---------
preprocessing_id : str
Pre processing id (hash)
exec_id : str
Execution id
group : str, optional
Group name, default value is None
Returns
-------
NeomarilExecution
The new execution
Example
-------
>>> client.get_execution(preprocessing_id='M9c3af308c754ee7b96b2f4a273984414d40a33be90242908f9fc4aa28ba8ec4', exec_id='1')
"""
return self.get_preprocessing(
preprocessing_id=preprocessing_id, group=group
).get_preprocessing_execution(exec_id)