Source code for ibm_watson_openscale.monitor_instances

# coding: utf-8

# Copyright 2020,2021 IBM All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import inspect

from typing import Tuple, Any

from ibm_cloud_sdk_core import BaseService

from ibm_watson_openscale.base_classes.tables import Table
from .utils import *
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import Instances as BaseInstances
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import Runs, Measurements, Metrics,ExplanationTasks, DriftService, Target
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import get_sdk_headers
from ibm_watson_openscale.supporting_classes.enums import TargetTypes

if TYPE_CHECKING:
    from .client import WatsonOpenScaleV2Adapter
    from ibm_watson_openscale.base_classes.watson_open_scale_v2 import DetailedResponse, Target, \
        MonitorInstanceParameters, MetricThresholdOverride, \
        MonitorInstanceSchedule

_DEFAULT_LIST_LENGTH = 50


[docs]class MonitorInstances(BaseInstances): """ Manages Monitor Instances. """ def __init__(self, ai_client: 'WatsonOpenScaleV2Adapter') -> None: validate_type(ai_client, 'ai_client', BaseService, True) self._ai_client = ai_client self.runs = Runs(self._ai_client) self.measurements = Measurements(ai_client) self.explanation = ExplanationTasks(ai_client) self.drift = DriftService(ai_client) super().__init__(watson_open_scale=self._ai_client) ################################################ # Hidden methods from base monitor class # ################################################ @property def get_instance(self, *args, **kwargs) -> None: raise AttributeError( f"{inspect.currentframe().f_code.co_name}() is not implemented, please use " f"client.monitor_definitions.{inspect.currentframe().f_code.co_name.split('_')[0]}() instead") @property def add(self, *args, **kwargs) -> None: raise AttributeError( f"{inspect.currentframe().f_code.co_name}() is not implemented, please use " f"client.monitor_definitions.create() instead") @property def update_instance(self, *args, **kwargs) -> None: raise AttributeError( f"{inspect.currentframe().f_code.co_name}() is not implemented, please use " f"client.monitor_definitions.{inspect.currentframe().f_code.co_name.split('_')[0]}() instead") @property def delete_instance(self, *args, **kwargs) -> None: raise AttributeError( f"{inspect.currentframe().f_code.co_name}() is not implemented, please use " f"client.monitor_definitions.{inspect.currentframe().f_code.co_name.split('_')[0]}() instead") @property def list_instances(self, *args, **kwargs) -> None: raise AttributeError( f"{inspect.currentframe().f_code.co_name}() is not implemented, please use " f"client.monitor_definitions.{inspect.currentframe().f_code.co_name.split('_')[0]}() instead") @property def add_instance(self, *args, **kwargs) -> None: raise AttributeError( f"{inspect.currentframe().f_code.co_name}() is not implemented, please use " f"client.monitor_definitions.create() instead") @property def run_instance(self, *args, **kwargs) -> None: raise AttributeError( f"{inspect.currentframe().f_code.co_name}() is not implemented, please use " f"client.monitor_definitions.{inspect.currentframe().f_code.co_name.split('_')[0]}() instead") ################################################# # Overridden methods for monitor instances # #################################################
[docs] def run( self, monitor_instance_id: str, triggered_by: str = None, parameters = None, business_metric_context = None, background_mode: bool = True) -> Any: """ Trigger monitoring run. :param str monitor_instance_id: Unique monitor instance ID. :param str triggered_by: (optional) An identifier representing the source that triggered the run request (optional). One of: event, scheduler, user, webhook. :param MonitorInstanceParameters parameters: (optional) Monitoring parameters consistent with the `parameters_schema` from the monitor definition. :param MonitoringRunBusinessMetricContext business_metric_context: (optional) Properties defining the business metric context in the triggered run of AI metric calculation. :param background_mode: if set to True, run will be in asynchronous mode, if set to False it will wait for result (optional) :type background_mode: bool :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `MonitoringRun` result A way you may use me: >>> monitor_instance_run_info = client.monitor_instances.run( background_mode=False, monitor_instance_id='997b1474-00d2-4g05-ac02-287ebfc603b5' ) """ #runs = Runs(watson_open_scale=self._ai_client) response = self.runs.add(monitor_instance_id=monitor_instance_id, triggered_by=triggered_by, parameters=parameters, business_metric_context=business_metric_context) run_id = response.result.metadata.id if background_mode: return response else: def check_state() -> dict: details = self.get_run_details(monitor_instance_id=monitor_instance_id, monitoring_run_id=run_id) return details.result.entity.status.state def get_result() -> Union[Tuple[str, Union[None, str], 'DetailedResponse']]: details = self.get_run_details(monitor_instance_id=monitor_instance_id, monitoring_run_id=run_id) state = details.result.entity.status.state if state in [StatusStateType.FINISHED]: return "Successfully finished run", None, details else: return "Run failed with status: {}".format(state), \ 'Reason: {}'.format(["code: {}, message: {}".format(error.code, error.message) for error in details.result.entity.status.failure.errors]), details return print_synchronous_run( 'Waiting for end of monitoring run {}'.format(run_id), check_state, get_result=get_result, success_states=[StatusStateType.FINISHED] )
[docs] def list(self, data_mart_id: str = None, monitor_definition_id: str = None, target_target_id: str = None, target_target_type: str = None) -> 'DetailedResponse': """ List monitor instances. :param str data_mart_id: (optional) comma-separeted list of IDs. :param str monitor_definition_id: (optional) comma-separeted list of IDs. :param str target_target_id: (optional) comma-separeted list of IDs. :param str target_target_type: (optional) comma-separeted list of types. :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `MonitorInstanceCollection` result A way you may use me: >>> monitor_instances_info = client.monitor_instances.list( data_mart_id='997b1474-00d2-4g05-ac02-287ebfc603b5', ) """ response = super().list(data_mart_id=data_mart_id, monitor_definition_id=monitor_definition_id, target_target_id=target_target_id, target_target_type=target_target_type) return response
[docs] def show(self, limit: int = 10, data_mart_id: str = None, monitor_definition_id: str = None, target_target_id: str = None, target_target_type: str = None) -> None: """ Show monitor instances. By default 10 records will be shown. :param limit: Maximal number of fetched rows. By default set to 10. (optional) :type limit: int :param str data_mart_id: (optional) comma-separeted list of IDs. :param str monitor_definition_id: (optional) comma-separeted list of IDs. :param str target_target_id: (optional) comma-separeted list of IDs. :param str target_target_type: (optional) comma-separeted list of types. :return: None A way you might use me is: >>> client.monitor_instances.show() >>> client.monitor_instances.show(limit=20) >>> client.monitor_instances.show(20) """ validate_type(limit, u'limit', int, False) response = self.list(data_mart_id=data_mart_id,monitor_definition_id=monitor_definition_id,target_target_id=target_target_id,target_target_type=target_target_type) records = [[instances.entity.data_mart_id, instances.entity.status.state, instances.entity.target.target_id, instances.entity.target.target_type, instances.entity.monitor_definition_id, instances.metadata.created_at, instances.metadata.id] for instances in response.result.monitor_instances] columns = ['data_mart_id', 'status', 'target_id', 'target_type', 'monitor_definition_id', 'created_at', 'id'] Table(columns, records).list( limit=limit, default_limit=_DEFAULT_LIST_LENGTH, title="Monitor instances" )
[docs] def show_metrics(self, monitor_instance_id: str, limit: int = 10): """ Show metrics for monitor instance. :param monitor_instance_id: ID of the monitor instance. :type monitor_instance_id: str :param limit: maximal number of fetched rows. By default set to 10. (optional) :type limit: int :return: None A way you might use me is: >>> client.monitor_instances.show_metrics(monitor_instance_id='997b1474-00d2-4g05-ac02-287ebfc603b5') """ from datetime import timedelta, datetime start_time = datetime.now() - timedelta(days=7) end_time = datetime.now() #runs = Runs(watson_open_scale=self._ai_client) response = self.runs.list(monitor_instance_id=monitor_instance_id) records = [] measurements = Measurements(watson_open_scale=self._ai_client) for run_id in [run.metadata.id for run in response.result.runs]: response = measurements.list( monitor_instance_id=monitor_instance_id, start=start_time, end=end_time, run_id=run_id ) records.extend([[measurement.entity.timestamp, metric.id, measurement.metadata.id, metric.value, metric.lower_limit, metric.upper_limit, ['{}:{}'.format(tag.id, tag.value) for tag in value.tags], measurement.entity.monitor_definition_id, measurement.entity.monitor_instance_id, measurement.entity.run_id, measurement.entity.target.target_type, measurement.entity.target.target_id] for measurement in response.result.measurements for value in measurement.entity.values for metric in value.metrics]) columns = ['ts', 'id', 'measurement_id', 'value', 'lower_limit', 'upper_limit', 'tags', 'monitor_definition_id', 'monitor_instance_id', 'run_id', 'target_type', 'target_id'] Table(columns, records).list( limit=limit, default_limit=_DEFAULT_LIST_LENGTH, title="{} Monitor Runs Metrics from: {} till: {}".format(monitor_instance_id, start_time, end_time) )
[docs] def create(self, monitor_definition_id: str, target: 'Target', data_mart_id: str = None, parameters: 'MonitorInstanceParameters' = None, thresholds: 'List[MetricThresholdOverride]' = None, schedule: 'MonitorInstanceSchedule' = None, managed_by: str = None, training_data_stats: dict = None, background_mode: bool = True, **kwargs) -> Union['DetailedResponse', Optional[dict]]: """ Create monitor instance. :param str data_mart_id: :param str monitor_definition_id: :param Target target: :param MonitorInstanceParameters parameters: (optional) Monitoring parameters consistent with the `parameters_schema` from the monitor definition. :param list[MetricThresholdOverride] thresholds: (optional) :param MonitorInstanceSchedule schedule: (optional) The schedule used to control how frequently the target is monitored. The maximum frequency is once every 30 minutes. Defaults to once every hour if not specified. :param str managed_by: (optional) :param training_data_stats: Training statistic json generated using training stats notebook (https://github.com/IBM-Watson/aios-data-distribution/blob/master/training_statistics_notebook.ipynb) :param background_mode: if set to True, run will be in asynchronous mode, if set to False it will wait for result (optional) :type background_mode: bool :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `MonitorInstanceResponse` result A way you might use me is: >>> from ibm_watson_openscale import * >>> target = Target( target_type=TargetTypes.SUBSCRIPTION, target_id='997b1474-00d2-4g05-ac02-287ebfc603b5' ) >>> parameters = MonitorInstanceParameters( min_feedback_data_size=50 ) >>> thresholds = [ MetricThresholdOverride( metric_id=client.monitor_definitions.MONITORS.QUALITY.METRIC.ACCURACY, type=MetricThresholdTypes.LOWER_LIMIT, value=0.7 ) ] >>> monitor_instances_info = client.monitor_instances.create( data_mart_id='997b1474-00d2-4g05-ac02-287ebfc603b5', background_mode=False, monitor_definition_id=client.monitor_definitions.MONITORS.QUALITY.ID, target=target, parameters=parameters, thresholds=thresholds ) """ if training_data_stats is None: response = super().add(data_mart_id=data_mart_id, monitor_definition_id=monitor_definition_id, target=target, parameters=parameters, thresholds=thresholds, schedule=schedule, managed_by=managed_by) monitor_instance_id = response.result.metadata.id return self.__check_for_finished_status(background_mode, monitor_instance_id) else: if len(training_data_stats) == 0: raise Exception("training_data_stats is empty. Please re-generate and use it") return self.__create_monitor_from_training_stats(data_mart_id,training_data_stats, kwargs, background_mode)
[docs] def delete(self, monitor_instance_id: str, background_mode: bool = True) -> Union['DetailedResponse', Optional[dict]]: """ Delete monitor instance. :param str monitor_instance_id: Unique monitor instance ID. :param background_mode: if set to True, run will be in asynchronous mode, if set to False it will wait for result (optional) :type background_mode: bool :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse A way you might use me is: >>> ai_client_v2.monitor_instances.delete( background_mode=False, monitor_instance_id='997b1474-00d2-4g05-ac02-287ebfc603b5' ) """ response = super().delete(monitor_instance_id=monitor_instance_id) if background_mode: return response else: def check_state() -> dict: details = self.list() if monitor_instance_id not in str(details.result): return StatusStateType.FINISHED else: return StatusStateType.ACTIVE def get_result() -> Union[Tuple[str, Union[None, str], 'DetailedResponse']]: details = self.list() if monitor_instance_id not in str(details.result): state = StatusStateType.FINISHED else: state = StatusStateType.ACTIVE if state in [StatusStateType.FINISHED]: return "Successfully finished deleting monitor instance", None, response else: return "Delete monitor instance failed", 'Reason: None', response # TODO: Need to show the reason. return print_synchronous_run( 'Waiting for end of deleting monitor instance {}'.format(monitor_instance_id), check_state, get_result=get_result, success_states=[StatusStateType.FINISHED] )
[docs] def get(self, monitor_instance_id: str) -> 'DetailedResponse': """ Get monitor instance details. :param str monitor_instance_id: Unique monitor instance ID. :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `MonitorInstanceResponse` result A way you might use me is: >>> monitor_instance_info = client.monitor_instances.get( monitor_instance_id='997b1474-00d2-4g05-ac02-287ebfc603b5' ) """ response = super().get(monitor_instance_id=monitor_instance_id) return response
[docs] def update(self, monitor_instance_id: str, patch_document: List['PatchDocument'], update_metadata_only: bool = None) -> 'DetailedResponse': """ Update monitor instance. :param str monitor_instance_id: Unique monitor instance ID. :param List[PatchDocument] patch_document: :param bool update_metadata_only: (optional) Flag that allows to control if the underlaying actions related to the monitor reconfiguration should be triggered. :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `MonitorInstanceResponse` result """ response = super().update(monitor_instance_id=monitor_instance_id, patch_document=patch_document, update_metadata_only=update_metadata_only) return response
def _verify_table(self, data_set_id: str = None) -> None: """ Verify if particular dataset/table exists and is loaded with any data. :param data_set_id: ID of the data set to check """ validate_type(data_set_id, 'data_set_id', str, True) if self._ai_client.data_sets.get_records_count(data_set_id=data_set_id) == 0: raise MissingPayload(self.__class__.__name__, '{} data set table is empty.'.format(data_set_id))
[docs] def get_metrics_count(self, monitor_instance_id: str, start: 'datetime', end: 'datetime', interval: str = None, filter: str = None, group: str = None): """ Get the count of the metrics. :param str monitor_instance_id: Unique monitor instance ID. :param datetime start: Calculations **inclusive**, internally floored to achieve full interval. If interval is vulnerable to time zone, the calculated value depends on a backend db engine: PostgreSQL respects time zone and DB2 use UTC time. Calculated value is returned in response. :param datetime end: Calculations **exclusive**, internally ceiled to achieve full interval. If interval is vulnerable to time zone, the calculated value depends on a backend db engine: PostgreSQL respects time zone and DB2 use UTC time. Calculated value is returned in response. :param str agg: Comma delimited function list constructed from metric name and function, e.g. `agg=metric_name:count,:last` that defines aggregations. :param str interval: (optional) Time unit in which metrics are grouped and aggregated, interval by interval. :param str filter: (optional) Filter expression can consist of any metric tag or a common column of string type followed by filter name and optionally a value, all delimited by colon. Supported filters are: `in`, `eq`, `null` and `exists`. Sample filters are: `filter=region:in:[us,pl],segment:eq:sales` or `filter=region:null,segment:exists`. :param str group: (optional) Comma delimited list constructed from metric tags, e.g. `group=region,segment` to group metrics before aggregations. :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `DataMartGetMonitorInstanceMetrics` result A way you might use me is: >>> metrics_count = client.monitor_instances.get_metrics_count( monitor_instance_id='997b1474-00d2-4g05-ac02-287ebfc603b5', start=start_time, end=end_time, ) """ metrics = Metrics(self._ai_client) response = metrics.list(monitor_instance_id=monitor_instance_id, start=start, end=end, agg='count', interval=interval, filter=filter, group=group) return response
[docs] def get_metrics(self, monitor_instance_id: str, start: 'datetime', end: 'datetime', interval: str = None, filter: str = None, group: str = None): """ Get the count of the metrics. :param str monitor_instance_id: Unique monitor instance ID. :param datetime start: Calculations **inclusive**, internally floored to achieve full interval. If interval is vulnerable to time zone, the calculated value depends on a backend db engine: PostgreSQL respects time zone and DB2 use UTC time. Calculated value is returned in response. :param datetime end: Calculations **exclusive**, internally ceiled to achieve full interval. If interval is vulnerable to time zone, the calculated value depends on a backend db engine: PostgreSQL respects time zone and DB2 use UTC time. Calculated value is returned in response. :param str agg: Comma delimited function list constructed from metric name and function, e.g. `agg=metric_name:count,:last` that defines aggregations. :param str interval: (optional) Time unit in which metrics are grouped and aggregated, interval by interval. :param str filter: (optional) Filter expression can consist of any metric tag or a common column of string type followed by filter name and optionally a value, all delimited by colon. Supported filters are: `in`, `eq`, `null` and `exists`. Sample filters are: `filter=region:in:[us,pl],segment:eq:sales` or `filter=region:null,segment:exists`. :param str group: (optional) Comma delimited list constructed from metric tags, e.g. `group=region,segment` to group metrics before aggregations. :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `DataMartGetMonitorInstanceMetrics` result A way you might use me is: >>> metrics_count = client.monitor_instances.get_metrics_count( monitor_instance_id='997b1474-00d2-4g05-ac02-287ebfc603b5', start=start_time, end=end_time, ) """ metrics = Metrics(self._ai_client) response = metrics.list(monitor_instance_id=monitor_instance_id, start=start, end=end, agg='last', interval=interval, filter=filter, group=group) return response
[docs] def upload_drift_model(self, model_path: str, data_mart_id: str, subscription_id: str, archive_name: str = "user_drift.tar.gz", enable_data_drift: bool = True, enable_model_drift: bool = True) -> 'DetailedResponse': """ Upload a Drift model to be able to compute Drift monitor. :param str model_path: (required) path to the drift model :param str data_mart_id: (required) data_mart ID :param str subscription_id: (required) subscription ID :param str archive_name: (optional) Archive name to use while storing tarball in datamart :param str enable_data_drift: (optional) If data drift is expected to be enabled for this subscription. If set to True, archive is verified to contain a valid data constraints json file. :param str enable_model_drift: (optional) If model drift is expected to be enabled for this subscription. If set to True, archive is verified to contain a valid drift model pickle file. :return: A `DetailedResponse` containing the result, headers and HTTP status code. A way you might use me is: >>> client.monitor_instances.upload_drift_model( model_path='drift_models/drift_detection_model.tar.gz', data_mart_id='997b1474-00d2-4g05-ac02-287ebfc603b5', subscription_id='997b1474-00d2-4g05-ac02-287ebfc603b5', ) """ drift_model = open(model_path, mode="rb").read() response = self.drift.drift_archive_post(data_mart_id, subscription_id, drift_model, archive_name=archive_name, enable_data_drift=enable_data_drift, enable_model_drift=enable_model_drift) return response
[docs] def download_drift_model(self, monitor_instance_id: str) -> "DetailedResponse": """ Downloads the model. :param str monitor_instance_id: (required) The Drift monitor instance ID. :return: A `DetailedResponse` containing the result, headers and HTTP status code. A way you might use me is: >>> client.monitor_instances.download_drift_model( monitor_instance_id="0a9a4b85-f2bb-46de-850b-f033d7d37d9a" ) """ response = self.drift.drift_archive_get(monitor_instance_id) return response
[docs] def download_drift_model_metadata(self, monitor_instance_id: str) -> "DetailedResponse": """ Downloads the model metadata. :param str monitor_instance_id: (required) The Drift monitor instance ID. :return: A `DetailedResponse` containing the result, headers and HTTP status code. A way you might use me is: >>> client.monitor_instances.download_drift_model_metadata( monitor_instance_id="0a9a4b85-f2bb-46de-850b-f033d7d37d9a" ) """ response = self.drift.drift_archive_head(monitor_instance_id) return response
[docs] def list_runs(self, monitor_instance_id: str, start: str = None,limit: int = None): """ List monitoring runs. List monitoring runs. :param str monitor_instance_id: Unique monitor instance ID. :param str start: (optional) The page token indicating where to start paging from. :param int limit: (optional) The limit of the number of items to return, for example limit=50. If not specified a default of 100 will be used. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `MonitoringRunCollection` result """ #runs = Runs(watson_open_scale=self._ai_client) return self.runs.list(monitor_instance_id = monitor_instance_id, start=start,limit=limit)
[docs] def get_run_details(self, monitor_instance_id: str, monitoring_run_id: str, **kwargs ): """ Get monitoring run details. :param str monitor_instance_id: Unique monitor instance ID. :param str monitoring_run_id: Unique monitoring run ID. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `MonitoringRun` result """ #runs = Runs(watson_open_scale=self._ai_client) return self.runs.get(monitor_instance_id=monitor_instance_id, monitoring_run_id=monitoring_run_id)
[docs] def explanation_tasks(self, scoring_ids: List[str], input_rows: List[dict] = None, explanation_types: List[str] = None,**kwargs ): """ Compute explanations. Submit tasks for computing explanation of predictions. :param List[str] scoring_ids: IDs of the scoring transaction. :param List[dict] input_rows: (optional) List of scoring transactions. :param List[str] explanation_types: (optional) Types of explanations to generate. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `PostExplanationTaskResponse` result """ return self.explanation.add(scoring_ids=scoring_ids, input_rows=input_rows, explanation_types=explanation_types, **kwargs)
[docs] def get_explanation_tasks(self, explanation_task_id: str, **kwargs ): """ Get explanation. Get explanation for the given explanation task id. :param str explanation_task_id: ID of the explanation task. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `GetExplanationTaskResponse` result """ return self.explanation.get(explanation_task_id=explanation_task_id)
[docs] def get_measurement_details(self, monitor_instance_id: str,measurement_id: str, **kwargs ): """ Get Measurement details. Get Measurement info for the given measurement and monitor instance id. :param str monitor_instance_id: ID of the monitor instance. :param str measurement_id: ID of the measurement. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `MonitorMeasurementResponse` result """ return self.measurements.get(monitor_instance_id=monitor_instance_id, measurement_id=measurement_id, **kwargs)
#private method to check for finished state def __check_for_finished_status(self, background_mode, monitor_instance_id): if background_mode: return response else: def check_state() -> dict: details = self.get(monitor_instance_id=monitor_instance_id) return details.result.entity.status.state def get_result() -> Union[Tuple[str, Union[None, str], 'DetailedResponse']]: details = self.get(monitor_instance_id=monitor_instance_id) state = details.result.entity.status.state if state in [StatusStateType.ACTIVE]: return "Monitor instance successfully created", None, details else: return "Monitor instance creation failed with status: {}".format(state), \ 'Reason: {}'.format(["code: {}, message: {}".format(error.code, error.message) for error in details.result.entity.status.failure.errors]), details return print_synchronous_run( 'Waiting for end of monitor instance creation {}'.format(monitor_instance_id), check_state, get_result=get_result, success_states=[StatusStateType.ACTIVE] ) #private method to create monitors from training stats def __create_monitor_from_training_stats(self, data_mart_id, training_stats_info, params, background_mode): subscription_id = params.get("subscription_id") validate_type(data_mart_id, 'data_mart_id', str, True) validate_type(subscription_id, 'subscription_id', str, True) responses = {} if 'fairness_configuration' in training_stats_info: target = Target( target_type=TargetTypes.SUBSCRIPTION, target_id=subscription_id ) parameters = training_stats_info["fairness_configuration"]["parameters"] parameters["model_type"] = training_stats_info['common_configuration']['problem_type'] thresholds = training_stats_info["fairness_configuration"]["thresholds"] response = super().add( data_mart_id=data_mart_id, background_mode=False, monitor_definition_id=self._ai_client.monitor_definitions.MONITORS.FAIRNESS.ID, target=target, parameters=parameters, thresholds=thresholds) mon_resp = self.__check_for_finished_status(background_mode, response.result.metadata.id) responses[self._ai_client.monitor_definitions.MONITORS.FAIRNESS.ID] = mon_resp.result.to_dict() if 'explainability_configuration' in training_stats_info: target = Target( target_type=TargetTypes.SUBSCRIPTION, target_id=subscription_id ) parameters = { "enabled": True, "training_statistics": training_stats_info["explainability_configuration"] } response = super().add( data_mart_id=data_mart_id, background_mode=False, monitor_definition_id=self._ai_client.monitor_definitions.MONITORS.EXPLAINABILITY.ID, target=target, parameters=parameters ) mon_resp = self.__check_for_finished_status(background_mode, response.result.metadata.id) responses[self._ai_client.monitor_definitions.MONITORS.EXPLAINABILITY.ID] = mon_resp.result.to_dict() return responses