# coding: utf-8
# Copyright 2020,2021 IBM All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Tuple
from ibm_cloud_sdk_core import BaseService
from ibm_watson_openscale.base_classes.tables import Table
from .utils import *
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import Subscriptions as BaseSubscriptions
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import Asset, AssetPropertiesRequest, \
AssetDeploymentRequest, RiskEvaluationStatus
from ibm_watson_openscale.supporting_classes.enums import *
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import SparkStruct
if TYPE_CHECKING:
from .client import WatsonOpenScaleV2Adapter
from ibm_watson_openscale.base_classes.watson_open_scale_v2 import DetailedResponse
_DEFAULT_LIST_LENGTH = 50
# TODO: Add parameters validation in every method
class Subscriptions(BaseSubscriptions):
    """
    Manages Subscription instances for a Watson OpenScale service instance.
    """

    def __init__(self, ai_client: 'WatsonOpenScaleV2Adapter') -> None:
        """Bind the base Subscriptions API to the authenticated client.

        :param WatsonOpenScaleV2Adapter ai_client: authenticated OpenScale
            client (must be a ``BaseService``) used for all REST calls.
        """
        validate_type(ai_client, 'ai_client', BaseService, True)
        self._ai_client = ai_client
        super().__init__(watson_open_scale=self._ai_client)

    def show(self, limit: Optional[int] = 10,
             data_mart_id: str = None,
             service_provider_id: str = None,
             asset_asset_id: str = None,
             deployment_deployment_id: str = None,
             deployment_deployment_type: str = None,
             integration_reference_integrated_system_id: str = None,
             integration_reference_external_id: str = None,
             risk_evaluation_status_state: str = None,
             service_provider_operational_space_id: str = None,
             pre_production_reference_id: str = None,
             **kwargs) -> None:
        """
        Show subscriptions. By default 10 records will be shown.

        :param limit: maximal number of fetched rows. By default set to 10. (optional)
        :type limit: int
        :param str data_mart_id: (optional) comma-separated list of IDs.
        :param str service_provider_id: (optional) comma-separated list of IDs.
        :param str asset_asset_id: (optional) comma-separated list of IDs.
        :param str deployment_deployment_id: (optional) comma-separated list of
               IDs.
        :param str deployment_deployment_type: (optional) comma-separated list of
               types.
        :param str integration_reference_integrated_system_id: (optional)
               comma-separated list of IDs.
        :param str integration_reference_external_id: (optional) comma-separated
               list of IDs.
        :param str risk_evaluation_status_state: (optional) comma-separated list of
               states.
        :param str service_provider_operational_space_id: (optional)
               comma-separated list of operational space ids (property of service
               provider object).
        :param str pre_production_reference_id: (optional) comma-separated list of
               IDs.
        :param dict headers: A `dict` containing the request headers

        A way you might use me is:

        >>> client.subscriptions.show()
        >>> client.subscriptions.show(limit=20)
        >>> client.subscriptions.show(limit=None)
        """
        # limit may legitimately be None (meaning "no limit"), hence the
        # non-mandatory validation.
        validate_type(limit, 'limit', int, False)
        response = self.list(data_mart_id=data_mart_id,
                             service_provider_id=service_provider_id,
                             asset_asset_id=asset_asset_id,
                             deployment_deployment_id=deployment_deployment_id,
                             deployment_deployment_type=deployment_deployment_type,
                             integration_reference_integrated_system_id=integration_reference_integrated_system_id,
                             integration_reference_external_id=integration_reference_external_id,
                             risk_evaluation_status_state=risk_evaluation_status_state,
                             service_provider_operational_space_id=service_provider_operational_space_id,
                             pre_production_reference_id=pre_production_reference_id,
                             **kwargs)
        # Flatten each subscription record into one table row.
        records = [[subscription.entity.asset.asset_id,
                    subscription.entity.asset.name,
                    subscription.entity.data_mart_id,
                    subscription.entity.deployment.deployment_id,
                    subscription.entity.deployment.name,
                    subscription.entity.service_provider_id,
                    subscription.entity.status.state,
                    subscription.metadata.created_at,
                    subscription.metadata.id
                    ] for subscription in response.result.subscriptions]
        columns = ['asset_id', 'asset_name', 'data_mart_id', 'deployment_id', 'deployment_name',
                   'service_provider_id', 'status', 'created_at', 'id']
        Table(columns, records).list(
            limit=limit,
            default_limit=_DEFAULT_LIST_LENGTH,
            title="Subscriptions"
        )

    def add(self,
            data_mart_id: str,
            service_provider_id: str,
            asset: 'Asset',
            deployment: 'AssetDeploymentRequest',
            asset_properties: 'AssetPropertiesRequest' = None,
            risk_evaluation_status: 'RiskEvaluationStatus' = None,
            analytics_engine: 'AnalyticsEngine' = None,
            data_sources: List['DataSource'] = None,
            training_data_stats: dict = None,
            background_mode: bool = True,
            **kwargs) -> Union['DetailedResponse', Optional[dict]]:
        """
        Add a subscription to the model deployment.

        :param str data_mart_id: ID of the data_mart (required)
        :param str service_provider_id: ID of the service_provider (required)
        :param Asset asset: an Asset object with asset's information (required)
        :param AssetDeploymentRequest deployment: an AssetDeploymentRequest object with deployment's information (required)
        :param AssetPropertiesRequest asset_properties: (optional) Additional asset
               properties (subject of discovery if not provided when creating the
               subscription).
        :param RiskEvaluationStatus risk_evaluation_status: (optional)
        :param AnalyticsEngine analytics_engine: (optional)
        :param List[DataSource] data_sources: (optional)
        :param training_data_stats: Training statistic json generated using training stats notebook (https://github.com/IBM-Watson/aios-data-distribution/blob/master/training_statistics_notebook.ipynb)
        :param background_mode: if set to True, run will be in asynchronous mode, if set to False
               it will wait for result (optional)
        :type background_mode: bool
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `SubscriptionResponse` result

        A way you may use me:

        >>> from ibm_watson_openscale import *
        >>> added_subscription_info = client.subscriptions.add(
                data_mart_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
                service_provider_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
                asset=Asset(...),
                deployment=AssetDeploymentRequest(...),
                asset_properties=AssetPropertiesRequest(...),
            )
        """
        validate_type(data_mart_id, 'data_mart_id', str, True)
        validate_type(service_provider_id, 'service_provider_id', str, True)
        # Lite-plan instances are capped at 5 subscriptions; enforce the
        # quota client-side before issuing the create request.
        if self._ai_client.check_entitlements is True and self._ai_client.plan_name == constants.LITE_PLAN:
            total_subscriptions = self.list().result.subscriptions
            if len(total_subscriptions) >= 5:
                raise Exception("You are not allowed to create more than 5 subscriptions for lite plan.")
        if asset_properties is None and training_data_stats is None:
            raise Exception("Either asset_properties or training_data_stats has to be passed")
        # If "other" data type is present in output or input schema then drop
        # that schema from the subscription before saving it; the service will
        # rediscover it. See https://github.ibm.com/aiopenscale/tracker/issues/19160
        if asset_properties is not None:
            asset_props = asset_properties.to_dict()
            if 'input_data_schema' in asset_props:
                input_schema = asset_props['input_data_schema']
                if input_schema is not None:
                    if self._has_other_datatype(input_schema['fields']):
                        asset_properties.input_data_schema = None
            if 'output_data_schema' in asset_props:
                output_schema = asset_props['output_data_schema']
                if output_schema is not None:
                    if self._has_other_datatype(output_schema['fields']):
                        asset_properties.output_data_schema = None
        if training_data_stats is None:
            response = super().add(data_mart_id=data_mart_id, service_provider_id=service_provider_id, asset=asset,
                                   deployment=deployment, asset_properties=asset_properties,
                                   risk_evaluation_status=risk_evaluation_status,
                                   analytics_engine=analytics_engine,
                                   data_sources=data_sources)
        else:
            # Create subscription using data available in training stats.
            if len(training_data_stats) == 0:
                raise Exception("training_data_stats is empty. Please re-generate and use it")
            response = self.__create_subscription_from_training_stats(
                data_mart_id, service_provider_id, training_data_stats, kwargs, background_mode)

        subscription_id = response.result.metadata.id

        if background_mode:
            return response
        else:
            def check_state() -> str:
                # Poll the subscription until it leaves the transient states.
                details = self.get(subscription_id=subscription_id)
                return details.result.entity.status.state

            def get_result() -> Tuple[str, Union[None, str], 'DetailedResponse']:
                details = self.get(subscription_id=subscription_id)
                state = details.result.entity.status.state

                if state in [StatusStateType.ACTIVE]:
                    return "Successfully finished adding subscription", None, details
                else:
                    return "Add subscription failed with status: {}".format(state), \
                           'Reason: {}'.format(["code: {}, message: {}".format(error.code, error.message) for error in
                                                details.result.entity.status.failure.errors]), details

            return print_synchronous_run(
                'Waiting for end of adding subscription {}'.format(subscription_id),
                check_state,
                get_result=get_result,
                success_states=[StatusStateType.ACTIVE]
            )

    def delete(self,
               subscription_id: str,
               force: bool = None,
               background_mode: bool = True) -> Union['DetailedResponse', Optional[dict]]:
        """
        Delete subscription.

        :param str subscription_id: Unique subscription ID.
        :param bool force: (optional) force hard delete.
        :param background_mode: if set to True, run will be in asynchronous mode, if set to False
               it will wait for result (optional)
        :type background_mode: bool
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse

        A way you may use me:

        >>> client.subscriptions.delete(
                background_mode=False,
                subscription_id='997b1474-00d2-4g05-ac02-287ebfc603b5',
                force=True
            )
        """
        response = super().delete(subscription_id=subscription_id, force=force)

        if background_mode:
            return response
        else:
            # There is no GET for a deleted subscription, so completion is
            # detected by the ID disappearing from the list response.
            def check_state() -> str:
                details = self.list()
                if subscription_id not in str(details.result):
                    return StatusStateType.FINISHED
                else:
                    return StatusStateType.ACTIVE

            def get_result() -> Tuple[str, Union[None, str], 'DetailedResponse']:
                details = self.list()
                if subscription_id not in str(details.result):
                    state = StatusStateType.FINISHED
                else:
                    state = StatusStateType.ACTIVE

                if state in [StatusStateType.FINISHED]:
                    return "Successfully finished deleting subscription", None, response
                else:
                    return "Delete subscription failed", 'Reason: None', response  # TODO: Need to show the reason.

            return print_synchronous_run(
                'Waiting for end of deleting subscription {}'.format(subscription_id),
                check_state,
                get_result=get_result,
                success_states=[StatusStateType.FINISHED]
            )

    def _has_other_datatype(self, fields: List[dict]) -> bool:
        """Return True if any schema field has the unsupported type 'other'."""
        return any(field['type'] == 'other' for field in fields)

    # Private method to create subscription using training stats data.
    def __create_subscription_from_training_stats(self, data_mart_id, service_provider_id, training_stats_info,
                                                  params, background_mode):
        """Build and submit a subscription derived from a training-statistics JSON.

        :param str data_mart_id: ID of the data mart.
        :param str service_provider_id: ID of the service provider.
        :param dict training_stats_info: output of the training statistics notebook.
        :param dict params: extra keyword arguments forwarded from :meth:`add`
            (deployment_id, deployment_space_id, prediction_field, ...).
        :param bool background_mode: forwarded to the base ``add`` call.
        """
        deployment_id = params.get("deployment_id")
        deployment_space_id = params.get("deployment_space_id")
        # Map the notebook's problem-type string onto the SDK enum.
        model_type = training_stats_info['common_configuration']['problem_type']
        problem_type = None
        if model_type == "binary":
            problem_type = ProblemType.BINARY_CLASSIFICATION
        elif model_type == "multiclass":
            problem_type = ProblemType.MULTICLASS_CLASSIFICATION
        elif model_type == "regression":
            problem_type = ProblemType.REGRESSION

        prediction_column = params.get("prediction_field")
        probability_columns = params.get("probability_fields")
        predicted_target_column = params.get("predicted_target_field")
        validate_type(deployment_id, 'deployment_id', str, True)
        validate_type(prediction_column, 'prediction_field', str, True)
        #validate_type(predicted_target_column, 'predicted_target_field', str, True)

        model_asset_details_from_deployment = self._ai_client.service_providers.get_deployment_asset(
            data_mart_id=data_mart_id, service_provider_id=service_provider_id,
            deployment_id=deployment_id, deployment_space_id=deployment_space_id)

        input_data_schema = training_stats_info["common_configuration"]["input_data_schema"]
        label_column = training_stats_info["common_configuration"]["label_column"]
        # Remove the label column entry from the input data schema — the
        # subscription's input schema describes scoring payloads, which do
        # not carry the label.
        fields = [field for field in input_data_schema["fields"] if field["name"] != label_column]
        required_input_data_schema = {
            "type": "struct",
            "fields": fields
        }

        training_data_schema = None
        if "training_data_schema" in model_asset_details_from_deployment["entity"]["asset_properties"]:
            training_data_schema = SparkStruct.from_dict(
                model_asset_details_from_deployment["entity"]["asset_properties"]["training_data_schema"])

        return super().add(
            data_mart_id=data_mart_id,
            service_provider_id=service_provider_id,
            asset=Asset(
                asset_id=model_asset_details_from_deployment["entity"]["asset"]["asset_id"],
                name=model_asset_details_from_deployment["entity"]["asset"]["name"],
                url=model_asset_details_from_deployment["entity"]["asset"]["url"],
                asset_type=AssetTypes.MODEL,
                input_data_type=InputDataType.STRUCTURED,
                problem_type=problem_type
            ),
            deployment=AssetDeploymentRequest(
                deployment_id=model_asset_details_from_deployment['metadata']['guid'],
                name=model_asset_details_from_deployment['entity']['name'],
                deployment_type=DeploymentTypes.ONLINE,
                url=model_asset_details_from_deployment['entity']['scoring_endpoint']['url']
            ),
            asset_properties=AssetPropertiesRequest(
                label_column=label_column,
                probability_fields=probability_columns,
                prediction_field=prediction_column,
                predicted_target_field=predicted_target_column,
                feature_fields=training_stats_info["common_configuration"]["feature_fields"],
                categorical_fields=training_stats_info["common_configuration"]["categorical_fields"],
                input_data_schema=SparkStruct.from_dict(required_input_data_schema),
                training_data_schema=training_data_schema
            ),
            background_mode=background_mode
        )