# coding: utf-8
# (C) Copyright IBM Corp. 2022.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Watson OpenScale API Specification
"""
from datetime import datetime
from enum import Enum
from typing import BinaryIO, Dict, List, TextIO, Union
import json
from ibm_cloud_sdk_core import BaseService, DetailedResponse
from ibm_cloud_sdk_core.authenticators.authenticator import Authenticator
from ibm_cloud_sdk_core.get_authenticator import get_authenticator_from_environment
from ibm_cloud_sdk_core.utils import convert_list, convert_model, datetime_to_string, string_to_datetime
from .common import get_sdk_headers
##############################################################################
# Service
##############################################################################
class WatsonOpenScaleV2(BaseService):
    """Client for the Watson OpenScale V2 service."""

    # No default endpoint is baked in; the service URL is passed to BaseService as None.
    DEFAULT_SERVICE_URL = None
    DEFAULT_SERVICE_NAME = 'ai_openscale'

    @classmethod
    def new_instance(cls,
                     service_name: str = DEFAULT_SERVICE_NAME,
                     ) -> 'WatsonOpenScaleV2':
        """
        Return a new client for the Watson OpenScale service using the specified
        parameters and external configuration.

        :param str service_name: Name used both to look up the authenticator in
               the environment and to configure the created client.
        :return: The configured client.
        :rtype: WatsonOpenScaleV2
        """
        client = cls(get_authenticator_from_environment(service_name))
        client.configure_service(service_name)
        return client

    def __init__(self,
                 authenticator: Authenticator = None,
                 ) -> None:
        """
        Construct a new client for the Watson OpenScale service.

        :param Authenticator authenticator: The authenticator specifies the authentication mechanism.
               Get up to date information from https://github.com/IBM/python-sdk-core/blob/master/README.md
               about initializing the authenticator of your choice.
        """
        BaseService.__init__(self,
                             authenticator=authenticator,
                             service_url=self.DEFAULT_SERVICE_URL)
#########################
# Data Marts
#########################
class DataMarts:
    """
    Data Marts

    Groups the `/v2/data_marts` operations of the Watson OpenScale service.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a DataMarts client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def add(self,
            *,
            name: str = None,
            description: str = None,
            service_instance_crn: str = None,
            internal_database: bool = None,
            database_configuration: 'DatabaseConfigurationRequest' = None,
            database_discovery: str = None,
            force: bool = None,
            **kwargs
            ) -> DetailedResponse:
        """
        Create a new data mart.

        Create a new data mart with the given database connection.

        :param str name: (optional) Name of the data mart.
        :param str description: (optional) Description of the data mart.
        :param str service_instance_crn: (optional) Can be omitted if user token is
               used for authorization.
        :param bool internal_database: (optional) If `true` the internal database
               managed by AI OpenScale is provided for the user.
        :param DatabaseConfigurationRequest database_configuration: (optional)
               Database configuration ignored if internal database is requested
               (`internal_database` is `true`).
        :param str database_discovery: (optional) Indicates if the database was
               discovered automatically or manually added by user through UI.
        :param bool force: (optional) force update of metadata and db credentials
               (assumption is that the new database is already prepared and populated).
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataMartDatabaseResponse` result
        """

        if database_configuration is not None:
            database_configuration = convert_model(database_configuration)
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='data_marts_add')
        headers.update(sdk_headers)

        params = {
            'force': force
        }

        data = {
            'name': name,
            'description': description,
            'service_instance_crn': service_instance_crn,
            'internal_database': internal_database,
            'database_configuration': database_configuration,
            'database_discovery': database_discovery
        }
        # Leave unset optional fields out of the JSON body entirely.
        data = {k: v for (k, v) in data.items() if v is not None}
        data = json.dumps(data)
        headers['content-type'] = 'application/json'

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/data_marts'
        request = self.watson_open_scale.prepare_request(method='POST',
                                                         url=url,
                                                         headers=headers,
                                                         params=params,
                                                         data=data)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(DataMartDatabaseResponse, 'from_dict'):
            response.result = DataMartDatabaseResponse.from_dict(response.result)
        return response

    def list(self,
             **kwargs
             ) -> DetailedResponse:
        """
        List all data marts.

        The method returns the data mart configurations as an object.

        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataMartDatabaseResponseCollection` result
        """

        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='data_marts_list')
        headers.update(sdk_headers)

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/data_marts'
        request = self.watson_open_scale.prepare_request(method='GET',
                                                         url=url,
                                                         headers=headers)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(DataMartDatabaseResponseCollection, 'from_dict'):
            response.result = DataMartDatabaseResponseCollection.from_dict(response.result)
        return response

    def get(self,
            data_mart_id: str,
            **kwargs
            ) -> DetailedResponse:
        """
        Get data mart with the given id.

        :param str data_mart_id: ID of the data mart.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataMartDatabaseResponse` result
        :raises ValueError: if `data_mart_id` is None.
        """

        if data_mart_id is None:
            raise ValueError('data_mart_id must be provided')
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='data_marts_get')
        headers.update(sdk_headers)

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/data_marts/{0}'.format(
            *self.watson_open_scale.encode_path_vars(data_mart_id))
        request = self.watson_open_scale.prepare_request(method='GET',
                                                         url=url,
                                                         headers=headers)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(DataMartDatabaseResponse, 'from_dict'):
            response.result = DataMartDatabaseResponse.from_dict(response.result)
        return response

    def patch(self,
              data_mart_id: str,
              json_patch_operation: List['JsonPatchOperation'],
              **kwargs
              ) -> DetailedResponse:
        """
        Update a data mart.

        :param str data_mart_id: ID of the data mart.
        :param List[JsonPatchOperation] json_patch_operation: Patch operations
               that are serialized as the JSON request body.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataMartDatabaseResponse` result
        :raises ValueError: if `data_mart_id` or `json_patch_operation` is None.
        """

        if data_mart_id is None:
            raise ValueError('data_mart_id must be provided')
        if json_patch_operation is None:
            raise ValueError('json_patch_operation must be provided')
        json_patch_operation = [convert_model(x) for x in json_patch_operation]
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='data_marts_patch')
        headers.update(sdk_headers)

        data = json.dumps(json_patch_operation)
        headers['content-type'] = 'application/json-patch+json'

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/data_marts/{0}'.format(
            *self.watson_open_scale.encode_path_vars(data_mart_id))
        request = self.watson_open_scale.prepare_request(method='PATCH',
                                                         url=url,
                                                         headers=headers,
                                                         data=data)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(DataMartDatabaseResponse, 'from_dict'):
            response.result = DataMartDatabaseResponse.from_dict(response.result)
        return response

    def delete(self,
               data_mart_id: str,
               *,
               force: bool = None,
               **kwargs
               ) -> DetailedResponse:
        """
        Delete a data mart.

        :param str data_mart_id: ID of the data mart.
        :param bool force: (optional) force hard delete.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `data_mart_id` is None.
        """

        if data_mart_id is None:
            raise ValueError('data_mart_id must be provided')
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='data_marts_delete')
        headers.update(sdk_headers)

        params = {
            'force': force
        }

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))

        url = '/v2/data_marts/{0}'.format(
            *self.watson_open_scale.encode_path_vars(data_mart_id))
        request = self.watson_open_scale.prepare_request(method='DELETE',
                                                         url=url,
                                                         headers=headers,
                                                         params=params)

        # The response is returned as received; no model conversion is applied.
        response = self.watson_open_scale.send(request)
        return response
#########################
# Service Providers
#########################
class ServiceProviders:
    """
    Service Providers

    Groups the `/v2/service_providers` operations of the Watson OpenScale service.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a ServiceProviders client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def list(self,
             *,
             show_deleted: bool = None,
             service_type: str = None,
             instance_id: str = None,
             operational_space_id: str = None,
             deployment_space_id: str = None,
             **kwargs
             ) -> DetailedResponse:
        """
        List service providers.

        List associated Machine Learning service instances.

        :param bool show_deleted: (optional) show also resources pending delete.
        :param str service_type: (optional) Type of service.
        :param str instance_id: (optional) comma-separated list of IDs.
        :param str operational_space_id: (optional) comma-separated list of IDs.
        :param str deployment_space_id: (optional) comma-separated list of IDs.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `ServiceProviderResponseCollection` result
        """

        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='service_providers_list')
        headers.update(sdk_headers)

        params = {
            'show_deleted': show_deleted,
            'service_type': service_type,
            'instance_id': instance_id,
            'operational_space_id': operational_space_id,
            'deployment_space_id': deployment_space_id
        }

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/service_providers'
        request = self.watson_open_scale.prepare_request(method='GET',
                                                         url=url,
                                                         headers=headers,
                                                         params=params)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(ServiceProviderResponseCollection, 'from_dict'):
            response.result = ServiceProviderResponseCollection.from_dict(response.result)
        return response

    def add(self,
            name: str,
            service_type: str,
            credentials: 'MLCredentials',
            *,
            description: str = None,
            request_headers: dict = None,
            operational_space_id: str = None,
            deployment_space_id: str = None,
            group_ids: List[str] = None,
            user_ids: List[str] = None,
            **kwargs
            ) -> DetailedResponse:
        """
        Add service provider.

        Associate external Machine Learning service instance with the OpenScale DataMart.

        :param str name: Name of the ML service instance.
        :param str service_type: machine learning service type
               (azure_machine_learning_studio is a preferred alias for
               azure_machine_learning and should be used in new service bindings).
        :param MLCredentials credentials:
        :param str description: (optional)
        :param dict request_headers: (optional) map header name to header value.
        :param str operational_space_id: (optional) Reference to Operational Space.
        :param str deployment_space_id: (optional) Reference to V2 Space ID.
        :param List[str] group_ids: (optional) Access control list of group id of
               Cloud Pak for Data (Only available for OpenScale on Cloud Pak for Data >=
               4.0.6 with ENABLE_GROUP_AUTH being true).
        :param List[str] user_ids: (optional) Access control list of user id of
               Cloud Pak for Data (Only available for OpenScale on Cloud Pak for Data >=
               4.0.6 with ENABLE_GROUP_AUTH being true).
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `ServiceProviderResponse` result
        :raises ValueError: if `name`, `service_type` or `credentials` is None.
        """

        if name is None:
            raise ValueError('name must be provided')
        if service_type is None:
            raise ValueError('service_type must be provided')
        if credentials is None:
            raise ValueError('credentials must be provided')
        credentials = convert_model(credentials)
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='service_providers_add')
        headers.update(sdk_headers)

        data = {
            'name': name,
            'service_type': service_type,
            'credentials': credentials,
            'description': description,
            'request_headers': request_headers,
            'operational_space_id': operational_space_id,
            'deployment_space_id': deployment_space_id,
            'group_ids': group_ids,
            'user_ids': user_ids
        }
        # Leave unset optional fields out of the JSON body entirely.
        data = {k: v for (k, v) in data.items() if v is not None}
        data = json.dumps(data)
        headers['content-type'] = 'application/json'

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/service_providers'
        request = self.watson_open_scale.prepare_request(method='POST',
                                                         url=url,
                                                         headers=headers,
                                                         data=data)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(ServiceProviderResponse, 'from_dict'):
            response.result = ServiceProviderResponse.from_dict(response.result)
        return response

    def get(self,
            service_provider_id: str,
            **kwargs
            ) -> DetailedResponse:
        """
        Get a specific service provider.

        Get the associated Machine Learning service provider details.

        :param str service_provider_id: ID of the ML service provider.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `ServiceProviderResponse` result
        :raises ValueError: if `service_provider_id` is None.
        """

        if service_provider_id is None:
            raise ValueError('service_provider_id must be provided')
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='service_providers_get')
        headers.update(sdk_headers)

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/service_providers/{0}'.format(
            *self.watson_open_scale.encode_path_vars(service_provider_id))
        request = self.watson_open_scale.prepare_request(method='GET',
                                                         url=url,
                                                         headers=headers)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(ServiceProviderResponse, 'from_dict'):
            response.result = ServiceProviderResponse.from_dict(response.result)
        return response

    def delete(self,
               service_provider_id: str,
               *,
               force: bool = None,
               **kwargs
               ) -> DetailedResponse:
        """
        Delete a service provider.

        Detach Machine Learning service provider.

        :param str service_provider_id: ID of the ML service provider.
        :param bool force: (optional) force hard delete.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `service_provider_id` is None.
        """

        if service_provider_id is None:
            raise ValueError('service_provider_id must be provided')
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='service_providers_delete')
        headers.update(sdk_headers)

        params = {
            'force': force
        }

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))

        url = '/v2/service_providers/{0}'.format(
            *self.watson_open_scale.encode_path_vars(service_provider_id))
        request = self.watson_open_scale.prepare_request(method='DELETE',
                                                         url=url,
                                                         headers=headers,
                                                         params=params)

        # The response is returned as received; no model conversion is applied.
        response = self.watson_open_scale.send(request)
        return response

    def update(self,
               service_provider_id: str,
               patch_document: List['PatchDocument'],
               **kwargs
               ) -> DetailedResponse:
        """
        Update a service provider.

        Update existing service provider.

        :param str service_provider_id: ID of the ML service provider.
        :param List[PatchDocument] patch_document: Patch entries that are
               serialized as the JSON request body.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `ServiceProviderResponse` result
        :raises ValueError: if `service_provider_id` or `patch_document` is None.
        """

        if service_provider_id is None:
            raise ValueError('service_provider_id must be provided')
        if patch_document is None:
            raise ValueError('patch_document must be provided')
        patch_document = [convert_model(x) for x in patch_document]
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='service_providers_update')
        headers.update(sdk_headers)

        data = json.dumps(patch_document)
        headers['content-type'] = 'application/json'

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/service_providers/{0}'.format(
            *self.watson_open_scale.encode_path_vars(service_provider_id))
        request = self.watson_open_scale.prepare_request(method='PATCH',
                                                         url=url,
                                                         headers=headers,
                                                         data=data)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(ServiceProviderResponse, 'from_dict'):
            response.result = ServiceProviderResponse.from_dict(response.result)
        return response
#########################
# Subscriptions
#########################
class Subscriptions:
    """
    Subscriptions

    Groups the `/v2/subscriptions` operations of the Watson OpenScale service.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a Subscriptions client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def list(self,
             *,
             data_mart_id: str = None,
             service_provider_id: str = None,
             asset_asset_id: str = None,
             deployment_deployment_id: str = None,
             deployment_deployment_type: str = None,
             integration_reference_integrated_system_id: str = None,
             integration_reference_external_id: str = None,
             risk_evaluation_status_state: str = None,
             service_provider_operational_space_id: str = None,
             pre_production_reference_id: str = None,
             **kwargs
             ) -> DetailedResponse:
        """
        List subscriptions.

        List subscriptions.

        :param str data_mart_id: (optional) comma-separated list of IDs.
        :param str service_provider_id: (optional) comma-separated list of IDs.
        :param str asset_asset_id: (optional) comma-separated list of IDs.
        :param str deployment_deployment_id: (optional) comma-separated list of
               IDs.
        :param str deployment_deployment_type: (optional) comma-separated list of
               types.
        :param str integration_reference_integrated_system_id: (optional)
               comma-separated list of IDs.
        :param str integration_reference_external_id: (optional) comma-separated
               list of IDs.
        :param str risk_evaluation_status_state: (optional) comma-separated list of
               states.
        :param str service_provider_operational_space_id: (optional)
               comma-separated list of operational space ids (property of service provider
               object).
        :param str pre_production_reference_id: (optional) comma-separated list of
               IDs.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `SubscriptionResponseCollection` result
        """

        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='subscriptions_list')
        headers.update(sdk_headers)

        # Several query keys use dotted names, which differ from the
        # underscore-separated Python argument names.
        params = {
            'data_mart_id': data_mart_id,
            'service_provider_id': service_provider_id,
            'asset.asset_id': asset_asset_id,
            'deployment.deployment_id': deployment_deployment_id,
            'deployment.deployment_type': deployment_deployment_type,
            'integration_reference.integrated_system_id': integration_reference_integrated_system_id,
            'integration_reference.external_id': integration_reference_external_id,
            'risk_evaluation_status.state': risk_evaluation_status_state,
            'service_provider.operational_space_id': service_provider_operational_space_id,
            'pre_production_reference_id': pre_production_reference_id
        }

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/subscriptions'
        request = self.watson_open_scale.prepare_request(method='GET',
                                                         url=url,
                                                         headers=headers,
                                                         params=params)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(SubscriptionResponseCollection, 'from_dict'):
            response.result = SubscriptionResponseCollection.from_dict(response.result)
        return response

    def add(self,
            data_mart_id: str,
            service_provider_id: str,
            asset: 'Asset',
            deployment: 'AssetDeploymentRequest',
            *,
            asset_properties: 'AssetPropertiesRequest' = None,
            risk_evaluation_status: 'RiskEvaluationStatus' = None,
            analytics_engine: 'AnalyticsEngine' = None,
            data_sources: List['DataSource'] = None,
            **kwargs
            ) -> DetailedResponse:
        """
        Add a new subscription.

        Add a new subscription to the model deployment.

        :param str data_mart_id:
        :param str service_provider_id:
        :param Asset asset:
        :param AssetDeploymentRequest deployment:
        :param AssetPropertiesRequest asset_properties: (optional) Additional asset
               properties (subject of discovery if not provided when creating the
               subscription).
        :param RiskEvaluationStatus risk_evaluation_status: (optional)
        :param AnalyticsEngine analytics_engine: (optional)
        :param List[DataSource] data_sources: (optional)
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `SubscriptionResponse` result
        :raises ValueError: if `data_mart_id`, `service_provider_id`, `asset` or
                `deployment` is None.
        """

        if data_mart_id is None:
            raise ValueError('data_mart_id must be provided')
        if service_provider_id is None:
            raise ValueError('service_provider_id must be provided')
        if asset is None:
            raise ValueError('asset must be provided')
        if deployment is None:
            raise ValueError('deployment must be provided')
        asset = convert_model(asset)
        deployment = convert_model(deployment)
        if asset_properties is not None:
            asset_properties = convert_model(asset_properties)
        if risk_evaluation_status is not None:
            risk_evaluation_status = convert_model(risk_evaluation_status)
        if analytics_engine is not None:
            analytics_engine = convert_model(analytics_engine)
        if data_sources is not None:
            data_sources = [convert_model(x) for x in data_sources]
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='subscriptions_add')
        headers.update(sdk_headers)

        data = {
            'data_mart_id': data_mart_id,
            'service_provider_id': service_provider_id,
            'asset': asset,
            'deployment': deployment,
            'asset_properties': asset_properties,
            'risk_evaluation_status': risk_evaluation_status,
            'analytics_engine': analytics_engine,
            'data_sources': data_sources
        }
        # Leave unset optional fields out of the JSON body entirely.
        data = {k: v for (k, v) in data.items() if v is not None}
        data = json.dumps(data)
        headers['content-type'] = 'application/json'

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/subscriptions'
        request = self.watson_open_scale.prepare_request(method='POST',
                                                         url=url,
                                                         headers=headers,
                                                         data=data)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(SubscriptionResponse, 'from_dict'):
            response.result = SubscriptionResponse.from_dict(response.result)
        return response

    def get(self,
            subscription_id: str,
            **kwargs
            ) -> DetailedResponse:
        """
        Get a specific subscription.

        Get a specific subscription.

        :param str subscription_id: Unique subscription ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `SubscriptionResponse` result
        :raises ValueError: if `subscription_id` is None.
        """

        if subscription_id is None:
            raise ValueError('subscription_id must be provided')
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='subscriptions_get')
        headers.update(sdk_headers)

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/subscriptions/{0}'.format(
            *self.watson_open_scale.encode_path_vars(subscription_id))
        request = self.watson_open_scale.prepare_request(method='GET',
                                                         url=url,
                                                         headers=headers)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(SubscriptionResponse, 'from_dict'):
            response.result = SubscriptionResponse.from_dict(response.result)
        return response

    def update(self,
               subscription_id: str,
               patch_document: List['PatchDocument'],
               **kwargs
               ) -> DetailedResponse:
        """
        Update a subscription.

        Update existing asset (from ML service instance) subscription.

        :param str subscription_id: Unique subscription ID.
        :param List[PatchDocument] patch_document: Patch entries that are
               serialized as the JSON request body.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `SubscriptionResponse` result
        :raises ValueError: if `subscription_id` or `patch_document` is None.
        """

        if subscription_id is None:
            raise ValueError('subscription_id must be provided')
        if patch_document is None:
            raise ValueError('patch_document must be provided')
        patch_document = [convert_model(x) for x in patch_document]
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='subscriptions_update')
        headers.update(sdk_headers)

        data = json.dumps(patch_document)
        headers['content-type'] = 'application/json'

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/subscriptions/{0}'.format(
            *self.watson_open_scale.encode_path_vars(subscription_id))
        request = self.watson_open_scale.prepare_request(method='PATCH',
                                                         url=url,
                                                         headers=headers,
                                                         data=data)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(SubscriptionResponse, 'from_dict'):
            response.result = SubscriptionResponse.from_dict(response.result)
        return response

    def delete(self,
               subscription_id: str,
               *,
               force: bool = None,
               **kwargs
               ) -> DetailedResponse:
        """
        Delete a subscription.

        Delete a subscription.

        :param str subscription_id: Unique subscription ID.
        :param bool force: (optional) force hard delete. All data for records and
               metrics associated to this subscription will be deleted.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `subscription_id` is None.
        """

        if subscription_id is None:
            raise ValueError('subscription_id must be provided')
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='subscriptions_delete')
        headers.update(sdk_headers)

        params = {
            'force': force
        }

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))

        url = '/v2/subscriptions/{0}'.format(
            *self.watson_open_scale.encode_path_vars(subscription_id))
        request = self.watson_open_scale.prepare_request(method='DELETE',
                                                         url=url,
                                                         headers=headers,
                                                         params=params)

        # The response is returned as received; no model conversion is applied.
        response = self.watson_open_scale.send(request)
        return response

    def schemas(self,
                subscription_id: str,
                *,
                input_data: List['ScoreData'] = None,
                training_data_reference: 'InputDataReference' = None,
                **kwargs
                ) -> DetailedResponse:
        """
        Derive model schemas from the training data.

        Derive model schemas from the training data. Only "structured" input data type is
        supported. If the input_data_type field in the subscription (subscription ->
        entity -> asset -> input_data_type) is not "structured", an error will be
        returned.

        :param str subscription_id: Unique subscription ID.
        :param List[ScoreData] input_data: (optional) Array of score data object.
               If multiple score data objects are included, the "fields" array (if any)
               for score purposes will always be taken from the first score data object.
        :param InputDataReference training_data_reference: (optional)
               InputDataReference is the same as TrainingDataReference except that neither
               location nor connection is required. This is needed for the Schemas API and
               to avoid updating existing APIs.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `SchemaInferenceResponse` result
        :raises ValueError: if `subscription_id` is None.
        """

        if subscription_id is None:
            raise ValueError('subscription_id must be provided')
        if input_data is not None:
            input_data = [convert_model(x) for x in input_data]
        if training_data_reference is not None:
            training_data_reference = convert_model(training_data_reference)
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='subscriptions_schemas')
        headers.update(sdk_headers)

        data = {
            'input_data': input_data,
            'training_data_reference': training_data_reference
        }
        # Leave unset optional fields out of the JSON body entirely.
        data = {k: v for (k, v) in data.items() if v is not None}
        data = json.dumps(data)
        headers['content-type'] = 'application/json'

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/subscriptions/{0}/schemas'.format(
            *self.watson_open_scale.encode_path_vars(subscription_id))
        request = self.watson_open_scale.prepare_request(method='POST',
                                                         url=url,
                                                         headers=headers,
                                                         data=data)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(SchemaInferenceResponse, 'from_dict'):
            response.result = SchemaInferenceResponse.from_dict(response.result)
        return response

    def tables(self,
               subscription_id: str,
               dataset_type: str,
               unknown_base_type: object,
               **kwargs
               ) -> DetailedResponse:
        """
        Create a table for specified data set type.

        The currently supported dataset_type is feedback. The body of the request
        should be a JSON object (an empty object `{}` is fine).

        :param str subscription_id: Unique subscription ID.
        :param str dataset_type: data set type of subscription.
        :param UNKNOWN_BASE_TYPE unknown_base_type: Request body; it is passed to
               `json.dumps` unchanged, so it must be JSON-serializable.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataSetResponse` result
        :raises ValueError: if `subscription_id`, `dataset_type` or
                `unknown_base_type` is None.
        """

        if subscription_id is None:
            raise ValueError('subscription_id must be provided')
        if dataset_type is None:
            raise ValueError('dataset_type must be provided')
        if unknown_base_type is None:
            raise ValueError('unknown_base_type must be provided')
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='subscriptions_tables')
        headers.update(sdk_headers)

        data = json.dumps(unknown_base_type)
        headers['content-type'] = 'application/json'

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        url = '/v2/subscriptions/{0}/tables/{1}'.format(
            *self.watson_open_scale.encode_path_vars(subscription_id, dataset_type))
        request = self.watson_open_scale.prepare_request(method='POST',
                                                         url=url,
                                                         headers=headers,
                                                         data=data)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(DataSetResponse, 'from_dict'):
            response.result = DataSetResponse.from_dict(response.result)
        return response

    def score(self,
              subscription_id: str,
              values: List[str],
              *,
              fields: List[str] = None,
              **kwargs
              ) -> DetailedResponse:
        """
        Computes the bias mitigation/remediation for the specified model.

        Computes the bias mitigation/remediation for the specified model. The fairness
        monitoring debias request payload details must be valid.

        :param str subscription_id: Unique subscription ID.
        :param List[str] values: The values associated to the fields.
        :param List[str] fields: (optional) The fields to process debias scoring.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `FairnessMonitoringRemediation` result
        :raises ValueError: if `subscription_id` or `values` is None.
        """

        if subscription_id is None:
            raise ValueError('subscription_id must be provided')
        if values is None:
            raise ValueError('values must be provided')
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='subscriptions_score')
        headers.update(sdk_headers)

        data = {
            'values': values,
            'fields': fields
        }
        # Leave unset optional fields out of the JSON body entirely.
        data = {k: v for (k, v) in data.items() if v is not None}
        data = json.dumps(data)
        headers['content-type'] = 'application/json'

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        # Assigned after the caller's headers are merged, so this value always wins.
        headers['Accept'] = 'application/json'

        # Scoring is posted to the subscription's `predictions` resource.
        url = '/v2/subscriptions/{0}/predictions'.format(
            *self.watson_open_scale.encode_path_vars(subscription_id))
        request = self.watson_open_scale.prepare_request(method='POST',
                                                         url=url,
                                                         headers=headers,
                                                         data=data)

        response = self.watson_open_scale.send(request)
        # Replace the raw result with the model object when the model supports it.
        if hasattr(FairnessMonitoringRemediation, 'from_dict'):
            response.result = FairnessMonitoringRemediation.from_dict(response.result)
        return response
#########################
# Data Sets
#########################
class DataSets:
    """
    Data Sets

    Groups the data set operations (list, add, get, delete, update) of the
    Watson OpenScale service. Every call is delegated to the wrapped
    `WatsonOpenScaleV2` client, which prepares and sends the HTTP request.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a DataSets client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def list(self,
             *,
             target_target_id: str = None,
             target_target_type: str = None,
             type: str = None,
             managed_by: str = None,
             **kwargs
             ) -> DetailedResponse:
        """
        List all data sets specified by the parameters.

        Sends a GET request to ``/v2/data_sets``.

        :param str target_target_id: (optional) ID of the data set target (e.g.
               subscription ID, business application ID). Sent as the
               `target.target_id` query parameter.
        :param str target_target_type: (optional) type of the target. Sent as
               the `target.target_type` query parameter.
        :param str type: (optional) type of the data set.
        :param str managed_by: (optional) ID of the managing entity (e.g. business
               application ID).
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataSetResponseCollection` result
        """
        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='data_sets_list')
        headers.update(sdk_headers)

        params = {
            'target.target_id': target_target_id,
            'target.target_type': target_target_type,
            'type': type,
            'managed_by': managed_by
        }

        # Caller headers are merged first; Accept is assigned last so it wins.
        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        headers['Accept'] = 'application/json'

        url = '/v2/data_sets'
        request = self.watson_open_scale.prepare_request(method='GET',
                                                         url=url,
                                                         headers=headers,
                                                         params=params)

        response = self.watson_open_scale.send(request)
        if hasattr(DataSetResponseCollection, 'from_dict'):
            response.result = DataSetResponseCollection.from_dict(response.result)
        return response

    def add(self,
            data_mart_id: str,
            name: str,
            type: str,
            target: 'Target',
            data_schema: 'SparkStruct',
            *,
            description: str = None,
            schema_update_mode: str = None,
            location: 'LocationTableName' = None,
            managed_by: str = None,
            **kwargs
            ) -> DetailedResponse:
        """
        Create a new data set.

        Sends a POST request to ``/v2/data_sets`` with a JSON body built from
        the arguments; optional arguments left as None are omitted.

        :param str data_mart_id: sent as `data_mart_id` in the request body.
        :param str name: sent as `name` in the request body.
        :param str type: type of a data set.
        :param Target target: sent as `target` after `convert_model`.
        :param SparkStruct data_schema: sent as `data_schema` after
               `convert_model`.
        :param str description: (optional)
        :param str schema_update_mode: (optional)
        :param LocationTableName location: (optional) sent as `location` after
               `convert_model`.
        :param str managed_by: (optional)
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataSetResponse` result
        :raises ValueError: if `data_mart_id`, `name`, `type`, `target` or
                `data_schema` is None.
        """
        if data_mart_id is None:
            raise ValueError('data_mart_id must be provided')
        if name is None:
            raise ValueError('name must be provided')
        if type is None:
            raise ValueError('type must be provided')
        if target is None:
            raise ValueError('target must be provided')
        if data_schema is None:
            raise ValueError('data_schema must be provided')

        target = convert_model(target)
        data_schema = convert_model(data_schema)
        if location is not None:
            location = convert_model(location)

        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='data_sets_add')
        headers.update(sdk_headers)

        data = {
            'data_mart_id': data_mart_id,
            'name': name,
            'type': type,
            'target': target,
            'data_schema': data_schema,
            'description': description,
            'schema_update_mode': schema_update_mode,
            'location': location,
            'managed_by': managed_by
        }
        # Optional members that were not supplied are left out of the JSON body.
        data = {k: v for (k, v) in data.items() if v is not None}
        data = json.dumps(data)
        headers['content-type'] = 'application/json'

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        headers['Accept'] = 'application/json'

        url = '/v2/data_sets'
        request = self.watson_open_scale.prepare_request(method='POST',
                                                         url=url,
                                                         headers=headers,
                                                         data=data)

        response = self.watson_open_scale.send(request)
        if hasattr(DataSetResponse, 'from_dict'):
            response.result = DataSetResponse.from_dict(response.result)
        return response

    def get(self,
            data_set_id: str,
            **kwargs
            ) -> DetailedResponse:
        """
        Get data set with the given id.

        Sends a GET request to ``/v2/data_sets/{data_set_id}``.

        :param str data_set_id: ID of the data set.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataSetResponse` result
        :raises ValueError: if `data_set_id` is None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')

        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='data_sets_get')
        headers.update(sdk_headers)

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        headers['Accept'] = 'application/json'

        url = '/v2/data_sets/{0}'.format(
            *self.watson_open_scale.encode_path_vars(data_set_id))
        request = self.watson_open_scale.prepare_request(method='GET',
                                                         url=url,
                                                         headers=headers)

        response = self.watson_open_scale.send(request)
        if hasattr(DataSetResponse, 'from_dict'):
            response.result = DataSetResponse.from_dict(response.result)
        return response

    def delete(self,
               data_set_id: str,
               *,
               force: bool = None,
               **kwargs
               ) -> DetailedResponse:
        """
        Delete a data set.

        Sends a DELETE request to ``/v2/data_sets/{data_set_id}``. The response
        is returned as received; no model conversion is applied.

        :param str data_set_id: ID of the data set.
        :param bool force: (optional) force hard delete. Table associated with the
               data set will be dropped.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `data_set_id` is None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')

        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='data_sets_delete')
        headers.update(sdk_headers)

        params = {
            'force': force
        }

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))

        url = '/v2/data_sets/{0}'.format(
            *self.watson_open_scale.encode_path_vars(data_set_id))
        request = self.watson_open_scale.prepare_request(method='DELETE',
                                                         url=url,
                                                         headers=headers,
                                                         params=params)

        response = self.watson_open_scale.send(request)
        return response

    def update(self,
               data_set_id: str,
               patch_document: List['PatchDocument'],
               **kwargs
               ) -> DetailedResponse:
        """
        Update a data set.

        Sends a PATCH request to ``/v2/data_sets/{data_set_id}`` whose JSON body
        is the list of converted patch documents.

        :param str data_set_id: ID of the data set.
        :param List[PatchDocument] patch_document: patch operations; each item
               is passed through `convert_model` before serialization.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataSetResponse` result
        :raises ValueError: if `data_set_id` or `patch_document` is None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')
        if patch_document is None:
            raise ValueError('patch_document must be provided')

        patch_document = [convert_model(x) for x in patch_document]

        headers = {}
        sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                      service_version='V2',
                                      operation_id='data_sets_update')
        headers.update(sdk_headers)

        data = json.dumps(patch_document)
        headers['content-type'] = 'application/json'

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        headers['Accept'] = 'application/json'

        url = '/v2/data_sets/{0}'.format(
            *self.watson_open_scale.encode_path_vars(data_set_id))
        request = self.watson_open_scale.prepare_request(method='PATCH',
                                                         url=url,
                                                         headers=headers,
                                                         data=data)

        response = self.watson_open_scale.send(request)
        if hasattr(DataSetResponse, 'from_dict'):
            response.result = DataSetResponse.from_dict(response.result)
        return response
#########################
# Records
#########################
class Records:
    """
    Client for the data set record operations of the Watson OpenScale service.

    All requests are prepared and sent through the wrapped
    `WatsonOpenScaleV2` client.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Bind this Records client to a Watson OpenScale service client.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def add(self,
            data_set_id: str,
            request_body: Union[List[object], str, TextIO],
            *,
            content_type: str = None,
            header: bool = None,
            skip: int = None,
            limit: int = None,
            delimiter: str = None,
            on_error: str = None,
            csv_max_line_length: float = None,
            **kwargs
            ) -> DetailedResponse:
        """
        Add new data set records.

        Issues a POST to ``/v2/data_sets/{data_set_id}/records``. A `list`
        body is serialized to JSON (and, when no `content_type` was given,
        labelled ``application/json``); any other body is sent unchanged.

        :param str data_set_id: ID of the data set.
        :param List[object] request_body: the records to add, either a list
               or a string / text stream that is sent as given.
        :param str content_type: (optional) The type of the input. A character
               encoding can be specified by including a `charset` parameter. For
               example, 'text/csv;charset=utf-8'.
        :param bool header: (optional) if not provided service will attempt to
               automatically detect header in the first line.
        :param int skip: (optional) skip number of rows from input.
        :param int limit: (optional) limit for number of processed input rows.
        :param str delimiter: (optional) delimiter character for data provided
               as csv.
        :param str on_error: (optional) expected behaviour on error.
        :param float csv_max_line_length: (optional) maximum length of single
               line in bytes (default 10MB).
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `Status` result
        :raises ValueError: if `data_set_id` or `request_body` is None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')
        if request_body is None:
            raise ValueError('request_body must be provided')

        request_headers = {'Content-Type': content_type}
        request_headers.update(
            get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                            service_version='V2',
                            operation_id='records_add'))

        query = {
            'header': header,
            'skip': skip,
            'limit': limit,
            'delimiter': delimiter,
            'on_error': on_error,
            'csv_max_line_length': csv_max_line_length,
        }

        # Only list bodies are JSON-encoded here; everything else passes through.
        if not isinstance(request_body, list):
            payload = request_body
        else:
            payload = json.dumps(request_body)
            if content_type is None:
                request_headers['Content-Type'] = 'application/json'

        # Caller headers are merged first so that Accept, set last, wins.
        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = self.watson_open_scale.encode_path_vars(data_set_id)
        prepared = self.watson_open_scale.prepare_request(
            method='POST',
            url='/v2/data_sets/{0}/records'.format(*path_vars),
            headers=request_headers,
            params=query,
            data=payload)

        outcome = self.watson_open_scale.send(prepared)
        if hasattr(Status, 'from_dict'):
            outcome.result = Status.from_dict(outcome.result)
        return outcome

    def list(self,
             data_set_id: str,
             *,
             start: datetime = None,
             end: datetime = None,
             limit: int = None,
             offset: int = None,
             annotations: List[str] = None,
             exclude_annotations: bool = None,
             filter: str = None,
             include_total_count: bool = None,
             order: str = None,
             seed: float = None,
             format: str = None,
             binary_format: str = None,
             **kwargs
             ) -> DetailedResponse:
        """
        List data set records.

        Issues a GET to ``/v2/data_sets/{data_set_id}/records`` with the
        keyword arguments sent as query parameters.

        :param str data_set_id: ID of the data set.
        :param datetime start: (optional) return records with timestamp greater
               than or equal to `start` parameter.
        :param datetime end: (optional) return records with timestamp lower
               than `end` parameter.
        :param int limit: (optional) limit for number of returned records. If
               the value is greater than 1000 then it will be truncated.
        :param int offset: (optional) offset of returned records.
        :param List[str] annotations: (optional) return record annotations with
               given names. Passed through `convert_list` before sending.
        :param bool exclude_annotations: (optional) If there is no need to fetch
               annotations at all, set this parameter as true. There should be
               better performance.
        :param str filter: (optional) Only return records that match given
               filters. There are two types of filters, separated by commas:
               * normal filter (multiple are possible),
                 {field_name}:{op}:{value} — filter records directly
               * joining filter (only a single one is possible),
                 {data_set_id}.{field_name}:{op}:{value} — join a data set by
                 transaction_id (the user must ensure it's provided!) and
                 filter by this data set's records' field. Will fail if the
                 user hasn't provided transaction_id for both data sets'
                 records.
               Filters of different types can be mixed. They are partly
               compatible with the ones in
               POST /v2/data_sets/{data_set_id}/distributions.
               Available operators:
               | op      | meaning                    | example            | code equivalent         |
               |:-------:|:--------------------------:|:------------------:|:-----------------------:|
               | eq      | equality                   | field:eq:value     | field == value          |
               | gt      | greater than               | field:gt:value     | field > value           |
               | gte     | greater or equal           | field:gte:value    | field >= value          |
               | lt      | less than                  | field:lt:value     | field < value           |
               | lte     | less or equal              | field:lte:value    | field <= value          |
               | like    | matching a simple pattern* | field:like:pattern | pattern.match(field)    |
               | in      | is contained in list       | field:in:a;b;c     | [a,b,c].contains(field) |
               | null    | is null                    | field:null         | field value == null     |
               | nonnull | is not null                | field:nonnull      | field value != null     |
               * - "%" means "one or more character", "_" means "any single
               character", other characters have their usual, literal meaning
               (e.g. "|" means character "|").
        :param bool include_total_count: (optional) If total_count should be
               included. It can have performance impact if total_count is
               calculated.
        :param str order: (optional) return records in order specified.
               Currently only `random` is supported. You have to use one of
               `start`, `end`, or `filter` param with `order=random`.
        :param float seed: (optional) return repeatable result for random
               sampling. Value must be between 0.0 and 1.0, inclusive. Also you
               can get sampled records in pagination manner by specifying
               `limit` and `offset`.
        :param str format: (optional) What JSON format to use on output.
        :param str binary_format: (optional) Binary data presentation format. By
               default, the binary field value is encoded to base64 string. If
               _reference_ is chosen, every binary field is moved to the
               _references_ section with value set to an uri to the particular
               field within the record that can be GET in a separate request.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `RecordsListResponse` result
        :raises ValueError: if `data_set_id` is None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')

        request_headers = {}
        request_headers.update(
            get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                            service_version='V2',
                            operation_id='records_list'))

        query = {
            'start': start,
            'end': end,
            'limit': limit,
            'offset': offset,
            'annotations': convert_list(annotations),
            'exclude_annotations': exclude_annotations,
            'filter': filter,
            'include_total_count': include_total_count,
            'order': order,
            'seed': seed,
            'format': format,
            'binary_format': binary_format,
        }

        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = self.watson_open_scale.encode_path_vars(data_set_id)
        prepared = self.watson_open_scale.prepare_request(
            method='GET',
            url='/v2/data_sets/{0}/records'.format(*path_vars),
            headers=request_headers,
            params=query)

        outcome = self.watson_open_scale.send(prepared)
        if hasattr(RecordsListResponse, 'from_dict'):
            outcome.result = RecordsListResponse.from_dict(outcome.result)
        return outcome

    def patch(self,
              data_set_id: str,
              patch_document: List['PatchDocument'],
              **kwargs
              ) -> DetailedResponse:
        """
        Update data set records.

        Issues a PATCH to ``/v2/data_sets/{data_set_id}/records`` whose JSON
        body is the list of converted patch documents.

        :param str data_set_id: ID of the data set.
        :param List[PatchDocument] patch_document: patch operations; each item
               goes through `convert_model` before serialization.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `Status` result
        :raises ValueError: if `data_set_id` or `patch_document` is None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')
        if patch_document is None:
            raise ValueError('patch_document must be provided')

        operations = [convert_model(item) for item in patch_document]

        request_headers = {}
        request_headers.update(
            get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                            service_version='V2',
                            operation_id='records_patch'))

        payload = json.dumps(operations)
        request_headers['content-type'] = 'application/json'

        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = self.watson_open_scale.encode_path_vars(data_set_id)
        prepared = self.watson_open_scale.prepare_request(
            method='PATCH',
            url='/v2/data_sets/{0}/records'.format(*path_vars),
            headers=request_headers,
            data=payload)

        outcome = self.watson_open_scale.send(prepared)
        if hasattr(Status, 'from_dict'):
            outcome.result = Status.from_dict(outcome.result)
        return outcome

    def get(self,
            data_set_id: str,
            record_id: str,
            *,
            binary_format: str = None,
            **kwargs
            ) -> DetailedResponse:
        """
        Get a specific record in a data set.

        Issues a GET to ``/v2/data_sets/{data_set_id}/records/{record_id}``.

        :param str data_set_id: ID of the data set.
        :param str record_id: ID of the record.
        :param str binary_format: (optional) Binary data presentation format. By
               default, the binary field value is encoded to base64 string. If
               _reference_ is chosen, every binary field is moved to the
               _references_ section with value set to an uri to the particular
               field within the record that can be GET in a separate request.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataRecordResponse` result
        :raises ValueError: if `data_set_id` or `record_id` is None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')
        if record_id is None:
            raise ValueError('record_id must be provided')

        request_headers = {}
        request_headers.update(
            get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                            service_version='V2',
                            operation_id='records_get'))

        query = {'binary_format': binary_format}

        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = self.watson_open_scale.encode_path_vars(data_set_id, record_id)
        prepared = self.watson_open_scale.prepare_request(
            method='GET',
            url='/v2/data_sets/{0}/records/{1}'.format(*path_vars),
            headers=request_headers,
            params=query)

        outcome = self.watson_open_scale.send(prepared)
        if hasattr(DataRecordResponse, 'from_dict'):
            outcome.result = DataRecordResponse.from_dict(outcome.result)
        return outcome

    def update(self,
               data_set_id: str,
               record_id: str,
               patch_document: List['PatchDocument'],
               *,
               binary_format: str = None,
               **kwargs
               ) -> DetailedResponse:
        """
        Update a specific record in a data set.

        Issues a PATCH to ``/v2/data_sets/{data_set_id}/records/{record_id}``
        whose JSON body is the list of converted patch documents.

        :param str data_set_id: ID of the data set.
        :param str record_id: ID of the record.
        :param List[PatchDocument] patch_document: patch operations; each item
               goes through `convert_model` before serialization.
        :param str binary_format: (optional) Binary data presentation format. By
               default, the binary field value is encoded to base64 string. If
               _reference_ is chosen, every binary field is moved to the
               _references_ section with value set to an uri to the particular
               field within the record that can be GET in a separate request.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataRecordResponse` result
        :raises ValueError: if `data_set_id`, `record_id` or `patch_document`
                is None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')
        if record_id is None:
            raise ValueError('record_id must be provided')
        if patch_document is None:
            raise ValueError('patch_document must be provided')

        operations = [convert_model(item) for item in patch_document]

        request_headers = {}
        request_headers.update(
            get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                            service_version='V2',
                            operation_id='records_update'))

        query = {'binary_format': binary_format}

        payload = json.dumps(operations)
        request_headers['content-type'] = 'application/json'

        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = self.watson_open_scale.encode_path_vars(data_set_id, record_id)
        prepared = self.watson_open_scale.prepare_request(
            method='PATCH',
            url='/v2/data_sets/{0}/records/{1}'.format(*path_vars),
            headers=request_headers,
            params=query,
            data=payload)

        outcome = self.watson_open_scale.send(prepared)
        if hasattr(DataRecordResponse, 'from_dict'):
            outcome.result = DataRecordResponse.from_dict(outcome.result)
        return outcome

    def field(self,
              data_set_id: str,
              record_id: str,
              field_name: str,
              **kwargs
              ) -> DetailedResponse:
        """
        Get value of a field in a given record.

        Issues a GET to
        ``/v2/data_sets/{data_set_id}/records/{record_id}/{field_name}``. The
        response is returned as received; no model conversion is applied.

        :param str data_set_id: ID of the data set.
        :param str record_id: ID of the record.
        :param str field_name: field_name should map to db column name which
               value is to be retrieved.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `data_set_id`, `record_id` or `field_name` is
                None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')
        if record_id is None:
            raise ValueError('record_id must be provided')
        if field_name is None:
            raise ValueError('field_name must be provided')

        request_headers = {}
        request_headers.update(
            get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                            service_version='V2',
                            operation_id='records_field'))

        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])

        path_vars = self.watson_open_scale.encode_path_vars(
            data_set_id, record_id, field_name)
        prepared = self.watson_open_scale.prepare_request(
            method='GET',
            url='/v2/data_sets/{0}/records/{1}/{2}'.format(*path_vars),
            headers=request_headers)

        return self.watson_open_scale.send(prepared)

    def query(self,
              data_set_type: str,
              *,
              record_id: List[str] = None,
              transaction_id: List[str] = None,
              start: datetime = None,
              end: datetime = None,
              offset: int = None,
              limit: int = None,
              **kwargs
              ) -> DetailedResponse:
        """
        Get data set records with specific record_id or transaction_id.

        Issues a GET to ``/v2/data_set_records`` with the arguments sent as
        query parameters.

        :param str data_set_type: a (single) data set type.
        :param List[str] record_id: (optional) one or more record id values that
               should be matched. Passed through `convert_list` before sending.
        :param List[str] transaction_id: (optional) one or more transaction id
               values that should be matched. Passed through `convert_list`
               before sending.
        :param datetime start: (optional) beginning of the time range.
        :param datetime end: (optional) end of the time range.
        :param int offset: (optional) offset of returned explanations.
        :param int limit: (optional) Maximum number of elements returned.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataSetRecords` result
        :raises ValueError: if `data_set_type` is None.
        """
        if data_set_type is None:
            raise ValueError('data_set_type must be provided')

        request_headers = {}
        request_headers.update(
            get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                            service_version='V2',
                            operation_id='records_query'))

        query = {
            'data_set_type': data_set_type,
            'record_id': convert_list(record_id),
            'transaction_id': convert_list(transaction_id),
            'start': start,
            'end': end,
            'offset': offset,
            'limit': limit,
        }

        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        prepared = self.watson_open_scale.prepare_request(
            method='GET',
            url='/v2/data_set_records',
            headers=request_headers,
            params=query)

        outcome = self.watson_open_scale.send(prepared)
        if hasattr(DataSetRecords, 'from_dict'):
            outcome.result = DataSetRecords.from_dict(outcome.result)
        return outcome
#########################
# Requests
#########################
class Requests:
    """
    Client for the data set request-status operation of the Watson OpenScale
    service.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Bind this Requests client to a Watson OpenScale service client.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def get(self,
            data_set_id: str,
            request_id: str,
            **kwargs
            ) -> DetailedResponse:
        """
        Get status of a specific request.

        Issues a GET to ``/v2/data_sets/{data_set_id}/requests/{request_id}``.

        :param str data_set_id: ID of the data set.
        :param str request_id: ID of the request.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `Status` result
        :raises ValueError: if `data_set_id` or `request_id` is None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')
        if request_id is None:
            raise ValueError('request_id must be provided')

        request_headers = {}
        request_headers.update(
            get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                            service_version='V2',
                            operation_id='requests_get'))

        # Caller headers are merged first so that Accept, set last, wins.
        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = self.watson_open_scale.encode_path_vars(data_set_id, request_id)
        prepared = self.watson_open_scale.prepare_request(
            method='GET',
            url='/v2/data_sets/{0}/requests/{1}'.format(*path_vars),
            headers=request_headers)

        outcome = self.watson_open_scale.send(prepared)
        if hasattr(Status, 'from_dict'):
            outcome.result = Status.from_dict(outcome.result)
        return outcome
#########################
# Distributions
#########################
class Distributions:
    """
    Client for the data distribution operations of the Watson OpenScale
    service.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Bind this Distributions client to a Watson OpenScale service client.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def add(self,
            data_set_id: str,
            start: str,
            end: str,
            dataset: str,
            group: List[str],
            *,
            limit: float = None,
            filter: str = None,
            agg: List[str] = None,
            max_bins: float = None,
            nocache: bool = None,
            **kwargs
            ) -> DetailedResponse:
        """
        Add new data distributions.

        Issues a POST to ``/v2/data_sets/{data_set_id}/distributions``. The
        JSON body is built from the arguments (optional ones left as None are
        omitted); `nocache` is sent as a query parameter.

        :param str data_set_id: ID of the data set.
        :param str start: start datetime in ISO format.
        :param str end: end datetime in ISO format.
        :param str dataset: type of a data set.
        :param List[str] group: names of columns to be grouped.
        :param float limit: (optional) limit for number of rows, by default it
               is 50,000 (max possible limit is 50,000).
        :param str filter: (optional) Filters defined by user in format:
               {field_name}:{op}:{value}. Partly compatible with filters in
               "filter" parameter of GET /v2/data_sets/{data_set_id}/records.
               Possible filter operators:
               * eq - equals (numeric, string)
               * gt - greater than (numeric)
               * gte - greater than or equal (numeric)
               * lt - lower than (numeric)
               * lte - lower than or equal (numeric)
               * in - value in a set (numeric, string)
               * field:null (a no-argument filter) - value is null (any
                 nullable)
               * field:exists (a no-argument filter) - value is not null (any
                 column).
        :param List[str] agg: (optional) Definition of aggregations, by default
               'count'.
               Aggregations can be one of:
               * count
               * <column_name>:sum
               * <column_name>:min
               * <column_name>:max
               * <column_name>:avg
               * <column_name>:stddev.
        :param float max_bins: (optional) max number of bins which will be
               generated for data.
        :param bool nocache: (optional) force columns data refresh.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataDistributionResponse` result
        :raises ValueError: if `data_set_id`, `start`, `end`, `dataset` or
                `group` is None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')
        if start is None:
            raise ValueError('start must be provided')
        if end is None:
            raise ValueError('end must be provided')
        if dataset is None:
            raise ValueError('dataset must be provided')
        if group is None:
            raise ValueError('group must be provided')

        request_headers = {}
        request_headers.update(
            get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                            service_version='V2',
                            operation_id='distributions_add'))

        query = {'nocache': nocache}

        candidates = {
            'start': start,
            'end': end,
            'dataset': dataset,
            'group': group,
            'limit': limit,
            'filter': filter,
            'agg': agg,
            'max_bins': max_bins,
        }
        # Members that were not supplied are left out of the JSON body.
        payload = json.dumps({
            key: value for key, value in candidates.items() if value is not None
        })
        request_headers['content-type'] = 'application/json'

        # Caller headers are merged first so that Accept, set last, wins.
        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = self.watson_open_scale.encode_path_vars(data_set_id)
        prepared = self.watson_open_scale.prepare_request(
            method='POST',
            url='/v2/data_sets/{0}/distributions'.format(*path_vars),
            headers=request_headers,
            params=query,
            data=payload)

        outcome = self.watson_open_scale.send(prepared)
        if hasattr(DataDistributionResponse, 'from_dict'):
            outcome.result = DataDistributionResponse.from_dict(outcome.result)
        return outcome

    def delete(self,
               data_set_id: str,
               **kwargs
               ) -> DetailedResponse:
        """
        Delete data distributions.

        Issues a DELETE to ``/v2/data_sets/{data_set_id}/distributions``. The
        response is returned as received; no model conversion is applied.

        :param str data_set_id: ID of the data set.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `data_set_id` is None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')

        request_headers = {}
        request_headers.update(
            get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                            service_version='V2',
                            operation_id='distributions_delete'))

        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])

        path_vars = self.watson_open_scale.encode_path_vars(data_set_id)
        prepared = self.watson_open_scale.prepare_request(
            method='DELETE',
            url='/v2/data_sets/{0}/distributions'.format(*path_vars),
            headers=request_headers)

        return self.watson_open_scale.send(prepared)

    def get(self,
            data_set_id: str,
            data_distribution_id: str,
            **kwargs
            ) -> DetailedResponse:
        """
        Get a specific data distribution.

        Issues a GET to
        ``/v2/data_sets/{data_set_id}/distributions/{data_distribution_id}``.

        :param str data_set_id: ID of the data set.
        :param str data_distribution_id: ID of the data distribution requested
               to be calculated.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataDistributionResponse` result
        :raises ValueError: if `data_set_id` or `data_distribution_id` is None.
        """
        if data_set_id is None:
            raise ValueError('data_set_id must be provided')
        if data_distribution_id is None:
            raise ValueError('data_distribution_id must be provided')

        request_headers = {}
        request_headers.update(
            get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                            service_version='V2',
                            operation_id='distributions_get'))

        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = self.watson_open_scale.encode_path_vars(
            data_set_id, data_distribution_id)
        prepared = self.watson_open_scale.prepare_request(
            method='GET',
            url='/v2/data_sets/{0}/distributions/{1}'.format(*path_vars),
            headers=request_headers)

        outcome = self.watson_open_scale.send(prepared)
        if hasattr(DataDistributionResponse, 'from_dict'):
            outcome.result = DataDistributionResponse.from_dict(outcome.result)
        return outcome
#########################
# Monitors
#########################
class Monitors:
"""
Monitors
"""
def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
"""
Construct a Monitors client for the Watson OpenScale service.
:param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
"""
self.watson_open_scale = watson_open_scale
def list(self,
         *,
         name: str = None,
         **kwargs
         ) -> DetailedResponse:
    """
    List available monitors.

    Issues a GET to ``/v2/monitor_definitions``.

    :param str name: (optional) comma-separated list of names.
    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse with `MonitorCollections` result
    """
    request_headers = {}
    request_headers.update(
        get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                        service_version='V2',
                        operation_id='monitors_list'))

    query = {'name': name}

    # Caller headers are merged first so that Accept, set last, wins.
    if 'headers' in kwargs:
        request_headers.update(kwargs['headers'])
    request_headers['Accept'] = 'application/json'

    prepared = self.watson_open_scale.prepare_request(
        method='GET',
        url='/v2/monitor_definitions',
        headers=request_headers,
        params=query)

    outcome = self.watson_open_scale.send(prepared)
    if hasattr(MonitorCollections, 'from_dict'):
        outcome.result = MonitorCollections.from_dict(outcome.result)
    return outcome
def add(self,
name: str,
metrics: List['MonitorMetricRequest'],
tags: List['MonitorTagRequest'],
*,
description: str = None,
applies_to: 'ApplicabilitySelection' = None,
parameters_schema: dict = None,
managed_by: str = None,
schedule: 'MonitorInstanceSchedule' = None,
schedules: 'MonitorInstanceScheduleCollection' = None,
monitor_runtime: 'MonitorRuntime' = None,
**kwargs
) -> DetailedResponse:
"""
Add custom monitor.
Add custom monitor.
:param str name: Monitor UI label (must be unique).
:param List[MonitorMetricRequest] metrics: A list of metric definition.
:param List[MonitorTagRequest] tags: Available tags.
:param str description: (optional) Long monitoring description presented in
monitoring catalog.
:param ApplicabilitySelection applies_to: (optional)
:param dict parameters_schema: (optional) JSON schema that will be used to
validate monitoring parameters when enabled.
:param str managed_by: (optional)
:param MonitorInstanceSchedule schedule: (optional) The schedule used to
control how frequently the target is monitored. The maximum frequency is
once every 30 minutes.
Defaults to once every hour if not specified.
:param MonitorInstanceScheduleCollection schedules: (optional) A set of
schedules used to control how frequently the target is monitored for online
and batch deployment type.
:param MonitorRuntime monitor_runtime: (optional) Field to specify if
scheduler should be created or not.
:param dict headers: A `dict` containing the request headers
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse with `MonitorDisplayForm` result
"""
if name is None:
raise ValueError('name must be provided')
if metrics is None:
raise ValueError('metrics must be provided')
if tags is None:
raise ValueError('tags must be provided')
metrics = [convert_model(x) for x in metrics]
tags = [convert_model(x) for x in tags]
if applies_to is not None:
applies_to = convert_model(applies_to)
if schedule is not None:
schedule = convert_model(schedule)
if schedules is not None:
schedules = convert_model(schedules)
if monitor_runtime is not None:
monitor_runtime = convert_model(monitor_runtime)
headers = {}
sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
service_version='V2',
operation_id='monitors_add')
headers.update(sdk_headers)
data = {
'name': name,
'metrics': metrics,
'tags': tags,
'description': description,
'applies_to': applies_to,
'parameters_schema': parameters_schema,
'managed_by': managed_by,
'schedule': schedule,
'schedules': schedules,
'monitor_runtime': monitor_runtime
}
data = {k: v for (k, v) in data.items() if v is not None}
data = json.dumps(data)
headers['content-type'] = 'application/json'
if 'headers' in kwargs:
headers.update(kwargs.get('headers'))
headers['Accept'] = 'application/json'
url = '/v2/monitor_definitions'
request = self.watson_open_scale.prepare_request(method='POST',
url=url,
headers=headers,
data=data)
response = self.watson_open_scale.send(request)
if hasattr(MonitorDisplayForm, 'from_dict'):
response.result = MonitorDisplayForm.from_dict(response.result)
return response
def get(self,
monitor_definition_id: str,
**kwargs
) -> DetailedResponse:
"""
Get a specific monitor definition.
Get a specific monitor definition.
:param str monitor_definition_id: Unique monitor definition ID.
:param dict headers: A `dict` containing the request headers
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse with `MonitorDisplayForm` result
"""
if monitor_definition_id is None:
raise ValueError('monitor_definition_id must be provided')
headers = {}
sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
service_version='V2',
operation_id='monitors_get')
headers.update(sdk_headers)
if 'headers' in kwargs:
headers.update(kwargs.get('headers'))
headers['Accept'] = 'application/json'
url = '/v2/monitor_definitions/{0}'.format(
*self.watson_open_scale.encode_path_vars(monitor_definition_id))
request = self.watson_open_scale.prepare_request(method='GET',
url=url,
headers=headers)
response = self.watson_open_scale.send(request)
if hasattr(MonitorDisplayForm, 'from_dict'):
response.result = MonitorDisplayForm.from_dict(response.result)
return response
def update(self,
monitor_definition_id: str,
name: str,
metrics: List['MonitorMetricRequest'],
tags: List['MonitorTagRequest'],
*,
description: str = None,
applies_to: 'ApplicabilitySelection' = None,
parameters_schema: dict = None,
managed_by: str = None,
schedule: 'MonitorInstanceSchedule' = None,
schedules: 'MonitorInstanceScheduleCollection' = None,
monitor_runtime: 'MonitorRuntime' = None,
**kwargs
) -> DetailedResponse:
"""
Update the monitor definition.
Update a monitor definition.
:param str monitor_definition_id: Unique monitor definition ID.
:param str name: Monitor UI label (must be unique).
:param List[MonitorMetricRequest] metrics: A list of metric definition.
:param List[MonitorTagRequest] tags: Available tags.
:param str description: (optional) Long monitoring description presented in
monitoring catalog.
:param ApplicabilitySelection applies_to: (optional)
:param dict parameters_schema: (optional) JSON schema that will be used to
validate monitoring parameters when enabled.
:param str managed_by: (optional)
:param MonitorInstanceSchedule schedule: (optional) The schedule used to
control how frequently the target is monitored. The maximum frequency is
once every 30 minutes.
Defaults to once every hour if not specified.
:param MonitorInstanceScheduleCollection schedules: (optional) A set of
schedules used to control how frequently the target is monitored for online
and batch deployment type.
:param MonitorRuntime monitor_runtime: (optional) Field to specify if
scheduler should be created or not.
:param dict headers: A `dict` containing the request headers
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse with `MonitorDisplayForm` result
"""
if monitor_definition_id is None:
raise ValueError('monitor_definition_id must be provided')
if name is None:
raise ValueError('name must be provided')
if metrics is None:
raise ValueError('metrics must be provided')
if tags is None:
raise ValueError('tags must be provided')
metrics = [convert_model(x) for x in metrics]
tags = [convert_model(x) for x in tags]
if applies_to is not None:
applies_to = convert_model(applies_to)
if schedule is not None:
schedule = convert_model(schedule)
if schedules is not None:
schedules = convert_model(schedules)
if monitor_runtime is not None:
monitor_runtime = convert_model(monitor_runtime)
headers = {}
sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
service_version='V2',
operation_id='monitors_update')
headers.update(sdk_headers)
data = {
'name': name,
'metrics': metrics,
'tags': tags,
'description': description,
'applies_to': applies_to,
'parameters_schema': parameters_schema,
'managed_by': managed_by,
'schedule': schedule,
'schedules': schedules,
'monitor_runtime': monitor_runtime
}
data = {k: v for (k, v) in data.items() if v is not None}
data = json.dumps(data)
headers['content-type'] = 'application/json'
if 'headers' in kwargs:
headers.update(kwargs.get('headers'))
headers['Accept'] = 'application/json'
url = '/v2/monitor_definitions/{0}'.format(
*self.watson_open_scale.encode_path_vars(monitor_definition_id))
request = self.watson_open_scale.prepare_request(method='PUT',
url=url,
headers=headers,
data=data)
response = self.watson_open_scale.send(request)
if hasattr(MonitorDisplayForm, 'from_dict'):
response.result = MonitorDisplayForm.from_dict(response.result)
return response
def patch(self,
monitor_definition_id: str,
json_patch_operation: List['JsonPatchOperation'],
**kwargs
) -> DetailedResponse:
"""
Update a monitor definition.
Update a monitor definition.
:param str monitor_definition_id: Unique monitor definition ID.
:param List[JsonPatchOperation] json_patch_operation:
:param dict headers: A `dict` containing the request headers
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse with `MonitorDisplayForm` result
"""
if monitor_definition_id is None:
raise ValueError('monitor_definition_id must be provided')
if json_patch_operation is None:
raise ValueError('json_patch_operation must be provided')
json_patch_operation = [convert_model(x) for x in json_patch_operation]
headers = {}
sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
service_version='V2',
operation_id='monitors_patch')
headers.update(sdk_headers)
data = json.dumps(json_patch_operation)
headers['content-type'] = 'application/json-patch+json'
if 'headers' in kwargs:
headers.update(kwargs.get('headers'))
headers['Accept'] = 'application/json'
url = '/v2/monitor_definitions/{0}'.format(
*self.watson_open_scale.encode_path_vars(monitor_definition_id))
request = self.watson_open_scale.prepare_request(method='PATCH',
url=url,
headers=headers,
data=data)
response = self.watson_open_scale.send(request)
if hasattr(MonitorDisplayForm, 'from_dict'):
response.result = MonitorDisplayForm.from_dict(response.result)
return response
def delete(self,
monitor_definition_id: str,
*,
force: bool = None,
**kwargs
) -> DetailedResponse:
"""
Delete a monitor definition.
Delete a monitor definition, backing up table specific to monitor.
:param str monitor_definition_id: Unique monitor definition ID.
:param bool force: (optional) force hard delete. Table specific to this
montior will dropped from data mart.
:param dict headers: A `dict` containing the request headers
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse
"""
if monitor_definition_id is None:
raise ValueError('monitor_definition_id must be provided')
headers = {}
sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
service_version='V2',
operation_id='monitors_delete')
headers.update(sdk_headers)
params = {
'force': force
}
if 'headers' in kwargs:
headers.update(kwargs.get('headers'))
url = '/v2/monitor_definitions/{0}'.format(
*self.watson_open_scale.encode_path_vars(monitor_definition_id))
request = self.watson_open_scale.prepare_request(method='DELETE',
url=url,
headers=headers,
params=params)
response = self.watson_open_scale.send(request)
return response
#########################
# Instances
#########################
class Instances:
    """
    Client for the monitor instance endpoints (`/v2/monitor_instances`).
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Bind this Instances client to a Watson OpenScale service client.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def list(
        self,
        *,
        data_mart_id: str = None,
        monitor_definition_id: str = None,
        target_target_id: str = None,
        target_target_type: str = None,
        **kwargs
    ) -> DetailedResponse:
        """
        List monitor instances.

        Issues `GET /v2/monitor_instances`. `target_target_id` and
        `target_target_type` are sent as the `target.target_id` and
        `target.target_type` query parameters.

        :param str data_mart_id: (optional) comma-separated list of IDs.
        :param str monitor_definition_id: (optional) comma-separated list of IDs.
        :param str target_target_id: (optional) comma-separated list of IDs.
        :param str target_target_type: (optional) comma-separated list of types.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitorInstanceCollection` result
        """
        service = self.watson_open_scale
        request_headers = {}
        request_headers.update(get_sdk_headers(service_name=service.DEFAULT_SERVICE_NAME,
                                               service_version='V2',
                                               operation_id='instances_list'))
        query = {
            'data_mart_id': data_mart_id,
            'monitor_definition_id': monitor_definition_id,
            'target.target_id': target_target_id,
            'target.target_type': target_target_type,
        }
        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        # Assigned after the caller's headers, so Accept is always JSON.
        request_headers['Accept'] = 'application/json'

        prepared = service.prepare_request(method='GET',
                                           url='/v2/monitor_instances',
                                           headers=request_headers,
                                           params=query)
        response = service.send(prepared)
        if hasattr(MonitorInstanceCollection, 'from_dict'):
            response.result = MonitorInstanceCollection.from_dict(response.result)
        return response

    def add(
        self,
        data_mart_id: str,
        monitor_definition_id: str,
        target: 'Target',
        *,
        parameters: dict = None,
        thresholds: List['MetricThresholdOverride'] = None,
        schedule: 'MonitorInstanceSchedule' = None,
        schedule_id: str = None,
        managed_by: str = None,
        unprocessed_records: 'RecordsCountSummary' = None,
        total_records: 'RecordsCountSummary' = None,
        skip_scheduler: bool = None,
        **kwargs
    ) -> DetailedResponse:
        """
        Create a new monitor instance.

        Issues `POST /v2/monitor_instances`. `skip_scheduler` travels as a query
        parameter; the remaining arguments form the JSON body, with `None`
        values omitted.

        :param str data_mart_id:
        :param str monitor_definition_id:
        :param Target target:
        :param dict parameters: (optional) Monitoring parameters consistent with
               the `parameters_schema` from the monitor definition.
        :param List[MetricThresholdOverride] thresholds: (optional)
        :param MonitorInstanceSchedule schedule: (optional) The schedule used to
               control how frequently the target is monitored. The maximum frequency is
               once every 30 minutes.
               Defaults to once every hour if not specified.
        :param str schedule_id: (optional)
        :param str managed_by: (optional)
        :param RecordsCountSummary unprocessed_records: (optional) Summary about
               records count.
        :param RecordsCountSummary total_records: (optional) Summary about records
               count.
        :param bool skip_scheduler: (optional) prevent schedule creation for this
               monitor instance.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitorInstanceResponse` result
        :raises ValueError: if `data_mart_id`, `monitor_definition_id` or
                `target` is `None`.
        """
        required = (
            (data_mart_id, 'data_mart_id must be provided'),
            (monitor_definition_id, 'monitor_definition_id must be provided'),
            (target, 'target must be provided'),
        )
        for value, message in required:
            if value is None:
                raise ValueError(message)

        # Turn model objects into plain structures that json.dumps can handle.
        target = convert_model(target)
        if thresholds is not None:
            thresholds = [convert_model(threshold) for threshold in thresholds]
        schedule = None if schedule is None else convert_model(schedule)
        unprocessed_records = (None if unprocessed_records is None
                               else convert_model(unprocessed_records))
        total_records = None if total_records is None else convert_model(total_records)

        service = self.watson_open_scale
        request_headers = {}
        request_headers.update(get_sdk_headers(service_name=service.DEFAULT_SERVICE_NAME,
                                               service_version='V2',
                                               operation_id='instances_add'))
        query = {'skip_scheduler': skip_scheduler}
        body = {
            'data_mart_id': data_mart_id,
            'monitor_definition_id': monitor_definition_id,
            'target': target,
            'parameters': parameters,
            'thresholds': thresholds,
            'schedule': schedule,
            'schedule_id': schedule_id,
            'managed_by': managed_by,
            'unprocessed_records': unprocessed_records,
            'total_records': total_records,
        }
        payload = json.dumps({key: value for key, value in body.items() if value is not None})
        # content-type is set before the caller's headers (overridable);
        # Accept is set after them (always JSON).
        request_headers['content-type'] = 'application/json'
        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        prepared = service.prepare_request(method='POST',
                                           url='/v2/monitor_instances',
                                           headers=request_headers,
                                           params=query,
                                           data=payload)
        response = service.send(prepared)
        if hasattr(MonitorInstanceResponse, 'from_dict'):
            response.result = MonitorInstanceResponse.from_dict(response.result)
        return response

    def get(
        self,
        monitor_instance_id: str,
        *,
        expand: str = None,
        **kwargs
    ) -> DetailedResponse:
        """
        Get monitor instance details.

        Issues `GET /v2/monitor_instances/{monitor_instance_id}`.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param str expand: (optional) comma-separated list of fields (supported
               fields are unprocessed_records and total_records).
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitorInstanceResponse` result
        :raises ValueError: if `monitor_instance_id` is `None`.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')

        service = self.watson_open_scale
        request_headers = {}
        request_headers.update(get_sdk_headers(service_name=service.DEFAULT_SERVICE_NAME,
                                               service_version='V2',
                                               operation_id='instances_get'))
        query = {'expand': expand}
        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = service.encode_path_vars(monitor_instance_id)
        endpoint = '/v2/monitor_instances/{0}'.format(*path_vars)
        prepared = service.prepare_request(method='GET',
                                           url=endpoint,
                                           headers=request_headers,
                                           params=query)
        response = service.send(prepared)
        if hasattr(MonitorInstanceResponse, 'from_dict'):
            response.result = MonitorInstanceResponse.from_dict(response.result)
        return response

    def update(
        self,
        monitor_instance_id: str,
        patch_document: List['PatchDocument'],
        *,
        update_metadata_only: bool = None,
        **kwargs
    ) -> DetailedResponse:
        """
        Update a monitor instance.

        Issues `PATCH /v2/monitor_instances/{monitor_instance_id}` with the patch
        documents serialized as a JSON array.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param List[PatchDocument] patch_document:
        :param bool update_metadata_only: (optional) Flag that allows to control if
               the underlying actions related to the monitor reconfiguration should be
               triggered.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitorInstanceResponse` result
        :raises ValueError: if `monitor_instance_id` or `patch_document` is `None`.
        """
        required = (
            (monitor_instance_id, 'monitor_instance_id must be provided'),
            (patch_document, 'patch_document must be provided'),
        )
        for value, message in required:
            if value is None:
                raise ValueError(message)
        documents = [convert_model(document) for document in patch_document]

        service = self.watson_open_scale
        request_headers = {}
        request_headers.update(get_sdk_headers(service_name=service.DEFAULT_SERVICE_NAME,
                                               service_version='V2',
                                               operation_id='instances_update'))
        query = {'update_metadata_only': update_metadata_only}
        payload = json.dumps(documents)
        request_headers['content-type'] = 'application/json'
        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = service.encode_path_vars(monitor_instance_id)
        endpoint = '/v2/monitor_instances/{0}'.format(*path_vars)
        prepared = service.prepare_request(method='PATCH',
                                           url=endpoint,
                                           headers=request_headers,
                                           params=query,
                                           data=payload)
        response = service.send(prepared)
        if hasattr(MonitorInstanceResponse, 'from_dict'):
            response.result = MonitorInstanceResponse.from_dict(response.result)
        return response

    def delete(
        self,
        monitor_instance_id: str,
        *,
        force: bool = None,
        **kwargs
    ) -> DetailedResponse:
        """
        Delete a monitor instance.

        Issues `DELETE /v2/monitor_instances/{monitor_instance_id}`; the
        response is returned without conversion to a model.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param bool force: (optional) force hard delete. All metrics for this
               monitor instance will be deleted from the data mart.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `monitor_instance_id` is `None`.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')

        service = self.watson_open_scale
        request_headers = {}
        request_headers.update(get_sdk_headers(service_name=service.DEFAULT_SERVICE_NAME,
                                               service_version='V2',
                                               operation_id='instances_delete'))
        query = {'force': force}
        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])

        path_vars = service.encode_path_vars(monitor_instance_id)
        endpoint = '/v2/monitor_instances/{0}'.format(*path_vars)
        prepared = service.prepare_request(method='DELETE',
                                           url=endpoint,
                                           headers=request_headers,
                                           params=query)
        return service.send(prepared)
#########################
# Runs
#########################
class Runs:
    """
    Client for the monitoring run endpoints
    (`/v2/monitor_instances/{monitor_instance_id}/runs`).
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Bind this Runs client to a Watson OpenScale service client.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def add(
        self,
        monitor_instance_id: str,
        *,
        triggered_by: str = None,
        parameters: dict = None,
        business_metric_context: 'MonitoringRunBusinessMetricContext' = None,
        expiration_date: datetime = None,
        **kwargs
    ) -> DetailedResponse:
        """
        Trigger monitoring run.

        Issues `POST /v2/monitor_instances/{monitor_instance_id}/runs` with a
        JSON body built from the optional arguments; `None` values are omitted.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param str triggered_by: (optional) An identifier representing the source
               that triggered the run request (optional). One of: event, scheduler, user,
               webhook.
        :param dict parameters: (optional) Monitoring parameters consistent with
               the `parameters_schema` from the monitor definition.
        :param MonitoringRunBusinessMetricContext business_metric_context:
               (optional) Properties defining the business metric context in the triggered
               run of AI metric calculation.
        :param datetime expiration_date: (optional) The timestamp when the
               monitoring run was created with expiry date (in the format
               YYYY-MM-DDTHH:mm:ssZ or YYYY-MM-DDTHH:mm:ss.sssZ, matching the date-time
               format as specified by RFC 3339).
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitoringRun` result
        :raises ValueError: if `monitor_instance_id` is `None`.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')
        business_metric_context = (None if business_metric_context is None
                                   else convert_model(business_metric_context))
        expiration_date = (None if expiration_date is None
                           else datetime_to_string(expiration_date))

        service = self.watson_open_scale
        request_headers = {}
        request_headers.update(get_sdk_headers(service_name=service.DEFAULT_SERVICE_NAME,
                                               service_version='V2',
                                               operation_id='runs_add'))
        body = {
            'triggered_by': triggered_by,
            'parameters': parameters,
            'business_metric_context': business_metric_context,
            'expiration_date': expiration_date,
        }
        payload = json.dumps({key: value for key, value in body.items() if value is not None})
        # content-type is set before the caller's headers (overridable);
        # Accept is set after them (always JSON).
        request_headers['content-type'] = 'application/json'
        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = service.encode_path_vars(monitor_instance_id)
        endpoint = '/v2/monitor_instances/{0}/runs'.format(*path_vars)
        prepared = service.prepare_request(method='POST',
                                           url=endpoint,
                                           headers=request_headers,
                                           data=payload)
        response = service.send(prepared)
        if hasattr(MonitoringRun, 'from_dict'):
            response.result = MonitoringRun.from_dict(response.result)
        return response

    def list(
        self,
        monitor_instance_id: str,
        *,
        start: str = None,
        limit: int = None,
        **kwargs
    ) -> DetailedResponse:
        """
        List monitoring runs.

        Issues `GET /v2/monitor_instances/{monitor_instance_id}/runs`.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param str start: (optional) The page token indicating where to start
               paging from.
        :param int limit: (optional) The limit of the number of items to return,
               for example limit=50. If not specified a default of 100 will be used.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitoringRunCollection` result
        :raises ValueError: if `monitor_instance_id` is `None`.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')

        service = self.watson_open_scale
        request_headers = {}
        request_headers.update(get_sdk_headers(service_name=service.DEFAULT_SERVICE_NAME,
                                               service_version='V2',
                                               operation_id='runs_list'))
        query = {'start': start, 'limit': limit}
        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = service.encode_path_vars(monitor_instance_id)
        endpoint = '/v2/monitor_instances/{0}/runs'.format(*path_vars)
        prepared = service.prepare_request(method='GET',
                                           url=endpoint,
                                           headers=request_headers,
                                           params=query)
        response = service.send(prepared)
        if hasattr(MonitoringRunCollection, 'from_dict'):
            response.result = MonitoringRunCollection.from_dict(response.result)
        return response

    def get(
        self,
        monitor_instance_id: str,
        monitoring_run_id: str,
        **kwargs
    ) -> DetailedResponse:
        """
        Get monitoring run details.

        Issues `GET /v2/monitor_instances/{monitor_instance_id}/runs/{monitoring_run_id}`.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param str monitoring_run_id: Unique monitoring run ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitoringRun` result
        :raises ValueError: if `monitor_instance_id` or `monitoring_run_id` is `None`.
        """
        required = (
            (monitor_instance_id, 'monitor_instance_id must be provided'),
            (monitoring_run_id, 'monitoring_run_id must be provided'),
        )
        for value, message in required:
            if value is None:
                raise ValueError(message)

        service = self.watson_open_scale
        request_headers = {}
        request_headers.update(get_sdk_headers(service_name=service.DEFAULT_SERVICE_NAME,
                                               service_version='V2',
                                               operation_id='runs_get'))
        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = service.encode_path_vars(monitor_instance_id, monitoring_run_id)
        endpoint = '/v2/monitor_instances/{0}/runs/{1}'.format(*path_vars)
        prepared = service.prepare_request(method='GET',
                                           url=endpoint,
                                           headers=request_headers)
        response = service.send(prepared)
        if hasattr(MonitoringRun, 'from_dict'):
            response.result = MonitoringRun.from_dict(response.result)
        return response

    def update(
        self,
        monitor_instance_id: str,
        monitoring_run_id: str,
        json_patch_operation: List['JsonPatchOperation'],
        **kwargs
    ) -> DetailedResponse:
        """
        Update existing monitoring run details.

        Issues `PATCH /v2/monitor_instances/{monitor_instance_id}/runs/{monitoring_run_id}`
        with the patch operations serialized as a JSON array.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param str monitoring_run_id: Unique monitoring run ID.
        :param List[JsonPatchOperation] json_patch_operation:
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitoringRun` result
        :raises ValueError: if `monitor_instance_id`, `monitoring_run_id` or
                `json_patch_operation` is `None`.
        """
        required = (
            (monitor_instance_id, 'monitor_instance_id must be provided'),
            (monitoring_run_id, 'monitoring_run_id must be provided'),
            (json_patch_operation, 'json_patch_operation must be provided'),
        )
        for value, message in required:
            if value is None:
                raise ValueError(message)
        operations = [convert_model(operation) for operation in json_patch_operation]

        service = self.watson_open_scale
        request_headers = {}
        request_headers.update(get_sdk_headers(service_name=service.DEFAULT_SERVICE_NAME,
                                               service_version='V2',
                                               operation_id='runs_update'))
        payload = json.dumps(operations)
        request_headers['content-type'] = 'application/json-patch+json'
        if 'headers' in kwargs:
            request_headers.update(kwargs['headers'])
        request_headers['Accept'] = 'application/json'

        path_vars = service.encode_path_vars(monitor_instance_id, monitoring_run_id)
        endpoint = '/v2/monitor_instances/{0}/runs/{1}'.format(*path_vars)
        prepared = service.prepare_request(method='PATCH',
                                           url=endpoint,
                                           headers=request_headers,
                                           data=payload)
        response = service.send(prepared)
        if hasattr(MonitoringRun, 'from_dict'):
            response.result = MonitoringRun.from_dict(response.result)
        return response
#########################
# Measurements
#########################
class Measurements:
"""
Measurements
"""
def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
    """
    Bind this Measurements client to a Watson OpenScale service client.

    The service client is stored on `self.watson_open_scale`, which every
    other method of this class uses to prepare and send requests.

    :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
    """
    self.watson_open_scale = watson_open_scale
def add(
    self,
    monitor_instance_id: str,
    monitor_measurement_request: List['MonitorMeasurementRequest'],
    **kwargs
) -> DetailedResponse:
    """
    Publish measurement data to OpenScale.

    Issues `POST /v2/monitor_instances/{monitor_instance_id}/measurements`
    with the measurement requests serialized as a JSON array. No Accept
    header is set and the response is returned without model conversion.

    :param str monitor_instance_id: Unique monitor instance ID.
    :param List[MonitorMeasurementRequest] monitor_measurement_request:
    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse
    :raises ValueError: if `monitor_instance_id` or
            `monitor_measurement_request` is `None`.
    """
    required = (
        (monitor_instance_id, 'monitor_instance_id must be provided'),
        (monitor_measurement_request, 'monitor_measurement_request must be provided'),
    )
    for value, message in required:
        if value is None:
            raise ValueError(message)
    measurements = [convert_model(entry) for entry in monitor_measurement_request]

    service = self.watson_open_scale
    request_headers = {}
    request_headers.update(get_sdk_headers(service_name=service.DEFAULT_SERVICE_NAME,
                                           service_version='V2',
                                           operation_id='measurements_add'))
    payload = json.dumps(measurements)
    # Set before merging the caller's headers, so callers may override it.
    request_headers['content-type'] = 'application/json'
    if 'headers' in kwargs:
        request_headers.update(kwargs['headers'])

    path_vars = service.encode_path_vars(monitor_instance_id)
    endpoint = '/v2/monitor_instances/{0}/measurements'.format(*path_vars)
    prepared = service.prepare_request(method='POST',
                                       url=endpoint,
                                       headers=request_headers,
                                       data=payload)
    return service.send(prepared)
def list(self,
         monitor_instance_id: str,
         start: datetime,
         end: datetime,
         run_id: str,
         *,
         filter: str = None,
         limit: int = None,
         offset: int = None,
         **kwargs
         ) -> DetailedResponse:
    """
    Query measurements from OpenScale DataMart.

    Issues `GET /v2/monitor_instances/{monitor_instance_id}/measurements`.
    It is required to either provide a `start end` or `run_id` parameter.

    NOTE(review): the sentence above describes `start`/`end` and `run_id` as
    alternatives, yet this method raises `ValueError` when any of the three
    is `None`. The validation is kept unchanged; confirm the intended
    contract against the service API.

    :param str monitor_instance_id: Unique monitor instance ID.
    :param datetime start: Beginning of the time range. `datetime` values are
           serialized with `datetime_to_string`; other values are sent as given.
    :param datetime end: End of the time range. Serialized like `start`.
    :param str run_id: Comma delimited list of measurement run_id.
    :param str filter: (optional) Filter expression can consist of any metric
           tag or a common column of string type followed by filter name and
           optionally a value, all delimited by colon. Supported filters are: `in`,
           `eq`, `null` and `exists`. Sample filters are:
           `filter=region:in:[us,pl],segment:eq:sales` or
           `filter=region:null,segment:exists`.
    :param int limit: (optional) Maximum number of measurements returned.
    :param int offset: (optional) Offset of measurements returned.
    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse with `MonitorMeasurementResponseCollection` result
    :raises ValueError: if `monitor_instance_id`, `start`, `end` or `run_id`
            is `None`.
    """
    if monitor_instance_id is None:
        raise ValueError('monitor_instance_id must be provided')
    if start is None:
        raise ValueError('start must be provided')
    if end is None:
        raise ValueError('end must be provided')
    if run_id is None:
        raise ValueError('run_id must be provided')
    # Serialize datetimes explicitly, the same way request-body timestamps
    # are handled elsewhere in this file; left as-is they would presumably
    # reach the query string as str(datetime) (space separator, no zone
    # designator). Non-datetime values (e.g. pre-formatted strings) pass
    # through unchanged for backward compatibility.
    if isinstance(start, datetime):
        start = datetime_to_string(start)
    if isinstance(end, datetime):
        end = datetime_to_string(end)
    headers = {}
    sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                                  service_version='V2',
                                  operation_id='measurements_list')
    headers.update(sdk_headers)
    params = {
        'start': start,
        'end': end,
        'run_id': run_id,
        'filter': filter,
        'limit': limit,
        'offset': offset
    }
    if 'headers' in kwargs:
        headers.update(kwargs.get('headers'))
    # Assigned after the caller's headers, so Accept is always JSON.
    headers['Accept'] = 'application/json'
    url = '/v2/monitor_instances/{0}/measurements'.format(
        *self.watson_open_scale.encode_path_vars(monitor_instance_id))
    request = self.watson_open_scale.prepare_request(method='GET',
                                                     url=url,
                                                     headers=headers,
                                                     params=params)
    response = self.watson_open_scale.send(request)
    # Convert the raw result into the model type when it offers `from_dict`.
    if hasattr(MonitorMeasurementResponseCollection, 'from_dict'):
        response.result = MonitorMeasurementResponseCollection.from_dict(response.result)
    return response
def get(
    self,
    monitor_instance_id: str,
    measurement_id: str,
    *,
    metric_id: str = None,
    **kwargs
) -> DetailedResponse:
    """
    Get measurement data from OpenScale DataMart.

    Issues `GET /v2/monitor_instances/{monitor_instance_id}/measurements/{measurement_id}`.
    If metric_id is specified, sources associated with given metric id are
    filtered.

    :param str monitor_instance_id: Unique monitor instance ID.
    :param str measurement_id: Unique measurement ID.
    :param str metric_id: (optional) Comma delimited list of metric_id.
    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse with `MonitorMeasurementResponse` result
    :raises ValueError: if `monitor_instance_id` or `measurement_id` is `None`.
    """
    required = (
        (monitor_instance_id, 'monitor_instance_id must be provided'),
        (measurement_id, 'measurement_id must be provided'),
    )
    for value, message in required:
        if value is None:
            raise ValueError(message)

    service = self.watson_open_scale
    request_headers = {}
    request_headers.update(get_sdk_headers(service_name=service.DEFAULT_SERVICE_NAME,
                                           service_version='V2',
                                           operation_id='measurements_get'))
    query = {'metric_id': metric_id}
    if 'headers' in kwargs:
        request_headers.update(kwargs['headers'])
    # Assigned after the caller's headers, so Accept is always JSON.
    request_headers['Accept'] = 'application/json'

    path_vars = service.encode_path_vars(monitor_instance_id, measurement_id)
    endpoint = '/v2/monitor_instances/{0}/measurements/{1}'.format(*path_vars)
    prepared = service.prepare_request(method='GET',
                                       url=endpoint,
                                       headers=request_headers,
                                       params=query)
    response = service.send(prepared)
    if hasattr(MonitorMeasurementResponse, 'from_dict'):
        response.result = MonitorMeasurementResponse.from_dict(response.result)
    return response
def query(self,
          *,
          target_id: str = None,
          target_type: str = None,
          monitor_definition_id: str = None,
          filter: str = None,
          recent_count: int = None,
          format: str = None,
          **kwargs
          ) -> DetailedResponse:
    """
    Query for the recent measurement.

    Query for the recent measurement grouped by the monitoring target
    (subscription or business application).

    :param str target_id: (optional) Comma separated ID of the monitoring
           target (subscription or business application).
    :param str target_type: (optional) Type of the monitoring target
           (subscription or business application).
    :param str monitor_definition_id: (optional) Comma separated ID of the
           monitor definition.
    :param str filter: (optional) Filter expression can consist of any metric
           tag or a common column of string type followed by filter name and
           optionally a value, all delimited by colon and prepended with
           `monitor_definition_id.` string. Supported filters are: `in`, `eq`,
           `null` and `exists`. Sample filters are:
           `monitor_definition_id.filter=region:in:[us,pl],monitor_definition_id.segment:eq:sales`
           or
           `filter=monitor_definition_id.region:null,monitor_definition_id.segment:exists`.
           Every monitor_definition_id can have own set of filters.
    :param int recent_count: (optional) Number of measurements (per target) to
           be returned.
    :param str format: (optional) Format of the returned data. `full` format
           compared to `compact` is additive and contains `sources` part.
    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse with `MeasurementsResponseCollection` result
    """
    client = self.watson_open_scale

    # SDK headers go in first, caller-supplied headers may override them,
    # and 'Accept' is always set last.
    request_headers = dict(get_sdk_headers(
        service_name=client.DEFAULT_SERVICE_NAME,
        service_version='V2',
        operation_id='measurements_query'))
    query_params = {
        'target_id': target_id,
        'target_type': target_type,
        'monitor_definition_id': monitor_definition_id,
        'filter': filter,
        'recent_count': recent_count,
        'format': format,
    }
    request_headers.update(kwargs.get('headers', {}))
    request_headers['Accept'] = 'application/json'

    prepared = client.prepare_request(method='GET',
                                      url='/v2/measurements',
                                      headers=request_headers,
                                      params=query_params)
    detailed_response = client.send(prepared)

    # Replace the raw result with the typed model when the model supports it.
    if hasattr(MeasurementsResponseCollection, 'from_dict'):
        detailed_response.result = MeasurementsResponseCollection.from_dict(
            detailed_response.result)
    return detailed_response
#########################
# Metrics
#########################
class Metrics:
    """
    Client for the metrics operations of the Watson OpenScale service.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a Metrics client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def list(self,
             monitor_instance_id: str,
             start: datetime,
             end: datetime,
             agg: str,
             *,
             interval: str = None,
             filter: str = None,
             group: str = None,
             **kwargs
             ) -> DetailedResponse:
        """
        Query monitor instance metrics from OpenScale DataMart.

        See <a
        href="https://github.ibm.com/aiopenscale/aios-datamart-service-api/wiki/1.3.-Metrics-Query-Language">Metrics
        Query Language documentation</a>.

        `start` and `end` are serialized with `datetime_to_string` before being
        placed in the query string, so `datetime` objects are sent in ISO 8601
        form rather than Python's default `str(datetime)` form. Values that are
        not `datetime` instances are passed through unchanged.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param datetime start: Calculations **inclusive**, internally floored to
               achieve full interval. If interval is vulnerable to time zone, the
               calculated value depends on a backend db engine: PostgreSQL respects
               time zone and DB2 use UTC time. Calculated value is returned in
               response.
        :param datetime end: Calculations **exclusive**, internally ceiled to
               achieve full interval. If interval is vulnerable to time zone, the
               calculated value depends on a backend db engine: PostgreSQL respects
               time zone and DB2 use UTC time. Calculated value is returned in
               response.
        :param str agg: Comma delimited function list constructed from metric name
               and function, e.g. `agg=metric_name:count,:last` that defines
               aggregations.
        :param str interval: (optional) Time unit in which metrics are grouped and
               aggregated, interval by interval.
        :param str filter: (optional) Filter expression can consist of any metric
               tag or a common column of string type followed by filter name and
               optionally a value, all delimited by colon. Supported filters are:
               `in`, `eq`, `null` and `exists`. Sample filters are:
               `filter=region:in:[us,pl],segment:eq:sales` or
               `filter=region:null,segment:exists`.
        :param str group: (optional) Comma delimited list constructed from metric
               tags, e.g. `group=region,segment` to group metrics before
               aggregations.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataMartGetMonitorInstanceMetrics` result
        :raises ValueError: if `monitor_instance_id`, `start`, `end` or `agg` is None.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')
        if start is None:
            raise ValueError('start must be provided')
        if end is None:
            raise ValueError('end must be provided')
        if agg is None:
            raise ValueError('agg must be provided')

        client = self.watson_open_scale

        # SDK headers go in first, caller-supplied headers may override them,
        # and 'Accept' is always set last.
        headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='metrics_list'))

        params = {
            # Raw datetime objects would be URL-encoded via str(), which yields
            # a space-separated timestamp; convert them to ISO 8601 explicitly.
            'start': datetime_to_string(start),
            'end': datetime_to_string(end),
            'agg': agg,
            'interval': interval,
            'filter': filter,
            'group': group,
        }

        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        headers['Accept'] = 'application/json'

        url = '/v2/monitor_instances/{0}/metrics'.format(
            *client.encode_path_vars(monitor_instance_id))
        request = client.prepare_request(method='GET',
                                         url=url,
                                         headers=headers,
                                         params=params)

        response = client.send(request)
        # Replace the raw result with the typed model when the model supports it.
        if hasattr(DataMartGetMonitorInstanceMetrics, 'from_dict'):
            response.result = DataMartGetMonitorInstanceMetrics.from_dict(response.result)
        return response
#########################
# Integrated Systems
#########################
class IntegratedSystems:
    """
    Client for the integrated systems operations of the Watson OpenScale service.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a IntegratedSystems client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def list(self,
             *,
             type: str = None,
             **kwargs
             ) -> DetailedResponse:
        """
        List integrated systems.

        :param str type: (optional) comma-separated list of type for the
               integrated system.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `IntegratedSystemCollection` result
        """
        client = self.watson_open_scale

        # SDK headers first, then caller overrides, then the forced 'Accept'.
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='integrated_systems_list'))
        query_params = {'type': type}
        request_headers.update(kwargs.get('headers', {}))
        request_headers['Accept'] = 'application/json'

        prepared = client.prepare_request(method='GET',
                                          url='/v2/integrated_systems',
                                          headers=request_headers,
                                          params=query_params)
        detailed_response = client.send(prepared)
        if hasattr(IntegratedSystemCollection, 'from_dict'):
            detailed_response.result = IntegratedSystemCollection.from_dict(
                detailed_response.result)
        return detailed_response

    def add(self,
            name: str,
            type: str,
            description: str,
            credentials: dict,
            *,
            connection: object = None,
            group_ids: List[str] = None,
            user_ids: List[str] = None,
            **kwargs
            ) -> DetailedResponse:
        """
        Create a new integrated system.

        Fields whose value is None are left out of the JSON request body.

        :param str name: The name of the Integrated System.
        :param str type: The type of the Integrated System.
        :param str description: The description of the Integrated System.
        :param dict credentials: The credentials for the Integrated System.
        :param object connection: (optional) The additional connection information
               for the Integrated System.
        :param List[str] group_ids: (optional) Access control list of group id of
               Cloud Pak for Data (Only available for open_pages type and OpenScale
               on Cloud Pak for Data >= 4.0.6 with ENABLE_GROUP_AUTH being true).
        :param List[str] user_ids: (optional) Access control list of user id of
               Cloud Pak for Data (Only available for open_pages type and OpenScale
               on Cloud Pak for Data >= 4.0.6 with ENABLE_GROUP_AUTH being true).
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `IntegratedSystemResponse` result
        :raises ValueError: if `name`, `type`, `description` or `credentials` is None.
        """
        if name is None:
            raise ValueError('name must be provided')
        if type is None:
            raise ValueError('type must be provided')
        if description is None:
            raise ValueError('description must be provided')
        if credentials is None:
            raise ValueError('credentials must be provided')

        client = self.watson_open_scale
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='integrated_systems_add'))

        fields = {
            'name': name,
            'type': type,
            'description': description,
            'credentials': credentials,
            'connection': connection,
            'group_ids': group_ids,
            'user_ids': user_ids,
        }
        # Only send the fields the caller actually supplied.
        payload = json.dumps(
            {key: value for key, value in fields.items() if value is not None})

        request_headers['content-type'] = 'application/json'
        request_headers.update(kwargs.get('headers', {}))
        request_headers['Accept'] = 'application/json'

        prepared = client.prepare_request(method='POST',
                                          url='/v2/integrated_systems',
                                          headers=request_headers,
                                          data=payload)
        detailed_response = client.send(prepared)
        if hasattr(IntegratedSystemResponse, 'from_dict'):
            detailed_response.result = IntegratedSystemResponse.from_dict(
                detailed_response.result)
        return detailed_response

    def get(self,
            integrated_system_id: str,
            **kwargs
            ) -> DetailedResponse:
        """
        Get a specific integrated system.

        :param str integrated_system_id: Unique integrated system ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `IntegratedSystemResponse` result
        :raises ValueError: if `integrated_system_id` is None.
        """
        if integrated_system_id is None:
            raise ValueError('integrated_system_id must be provided')

        client = self.watson_open_scale
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='integrated_systems_get'))
        request_headers.update(kwargs.get('headers', {}))
        request_headers['Accept'] = 'application/json'

        path_vars = client.encode_path_vars(integrated_system_id)
        endpoint = '/v2/integrated_systems/{0}'.format(*path_vars)

        prepared = client.prepare_request(method='GET',
                                          url=endpoint,
                                          headers=request_headers)
        detailed_response = client.send(prepared)
        if hasattr(IntegratedSystemResponse, 'from_dict'):
            detailed_response.result = IntegratedSystemResponse.from_dict(
                detailed_response.result)
        return detailed_response

    def update(self,
               integrated_system_id: str,
               json_patch_operation: List['JsonPatchOperation'],
               **kwargs
               ) -> DetailedResponse:
        """
        Update an integrated system.

        The patch operations are sent as a JSON array with content type
        `application/json-patch+json`.

        :param str integrated_system_id: Unique integrated system ID.
        :param List[JsonPatchOperation] json_patch_operation: Patch operations
               to send; each item is passed through `convert_model` before being
               serialized.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `IntegratedSystemResponse` result
        :raises ValueError: if `integrated_system_id` or `json_patch_operation`
                is None.
        """
        if integrated_system_id is None:
            raise ValueError('integrated_system_id must be provided')
        if json_patch_operation is None:
            raise ValueError('json_patch_operation must be provided')

        operations = [convert_model(operation) for operation in json_patch_operation]

        client = self.watson_open_scale
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='integrated_systems_update'))

        payload = json.dumps(operations)
        request_headers['content-type'] = 'application/json-patch+json'
        request_headers.update(kwargs.get('headers', {}))
        request_headers['Accept'] = 'application/json'

        path_vars = client.encode_path_vars(integrated_system_id)
        endpoint = '/v2/integrated_systems/{0}'.format(*path_vars)

        prepared = client.prepare_request(method='PATCH',
                                          url=endpoint,
                                          headers=request_headers,
                                          data=payload)
        detailed_response = client.send(prepared)
        if hasattr(IntegratedSystemResponse, 'from_dict'):
            detailed_response.result = IntegratedSystemResponse.from_dict(
                detailed_response.result)
        return detailed_response

    def delete(self,
               integrated_system_id: str,
               **kwargs
               ) -> DetailedResponse:
        """
        Delete an integrated system.

        :param str integrated_system_id: Unique integrated system ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `integrated_system_id` is None.
        """
        if integrated_system_id is None:
            raise ValueError('integrated_system_id must be provided')

        client = self.watson_open_scale
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='integrated_systems_delete'))
        request_headers.update(kwargs.get('headers', {}))

        path_vars = client.encode_path_vars(integrated_system_id)
        endpoint = '/v2/integrated_systems/{0}'.format(*path_vars)

        prepared = client.prepare_request(method='DELETE',
                                          url=endpoint,
                                          headers=request_headers)
        return client.send(prepared)
#########################
# Operational Spaces
#########################
class OperationalSpaces:
    """
    Client for the operational spaces operations of the Watson OpenScale service.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a OperationalSpaces client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def list(self,
             **kwargs
             ) -> DetailedResponse:
        """
        List Operational Spaces.

        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `OperationalSpaceCollection` result
        """
        client = self.watson_open_scale

        # SDK headers first, then caller overrides, then the forced 'Accept'.
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='operational_spaces_list'))
        request_headers.update(kwargs.get('headers', {}))
        request_headers['Accept'] = 'application/json'

        prepared = client.prepare_request(method='GET',
                                          url='/v2/operational_spaces',
                                          headers=request_headers)
        detailed_response = client.send(prepared)
        if hasattr(OperationalSpaceCollection, 'from_dict'):
            detailed_response.result = OperationalSpaceCollection.from_dict(
                detailed_response.result)
        return detailed_response

    def add(self,
            name: str,
            *,
            description: str = None,
            **kwargs
            ) -> DetailedResponse:
        """
        Create an operational space.

        Fields whose value is None are left out of the JSON request body.

        :param str name: The name of the Operational Space.
        :param str description: (optional) The description of the Operational
               Space.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `OperationalSpaceResponse` result
        :raises ValueError: if `name` is None.
        """
        if name is None:
            raise ValueError('name must be provided')

        client = self.watson_open_scale
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='operational_spaces_add'))

        fields = {
            'name': name,
            'description': description,
        }
        # Only send the fields the caller actually supplied.
        payload = json.dumps(
            {key: value for key, value in fields.items() if value is not None})

        request_headers['content-type'] = 'application/json'
        request_headers.update(kwargs.get('headers', {}))
        request_headers['Accept'] = 'application/json'

        prepared = client.prepare_request(method='POST',
                                          url='/v2/operational_spaces',
                                          headers=request_headers,
                                          data=payload)
        detailed_response = client.send(prepared)
        if hasattr(OperationalSpaceResponse, 'from_dict'):
            detailed_response.result = OperationalSpaceResponse.from_dict(
                detailed_response.result)
        return detailed_response

    def get(self,
            operational_space_id: str,
            **kwargs
            ) -> DetailedResponse:
        """
        Get an operational space.

        :param str operational_space_id: Unique Operational Space ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `OperationalSpaceResponse` result
        :raises ValueError: if `operational_space_id` is None.
        """
        if operational_space_id is None:
            raise ValueError('operational_space_id must be provided')

        client = self.watson_open_scale
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='operational_spaces_get'))
        request_headers.update(kwargs.get('headers', {}))
        request_headers['Accept'] = 'application/json'

        path_vars = client.encode_path_vars(operational_space_id)
        endpoint = '/v2/operational_spaces/{0}'.format(*path_vars)

        prepared = client.prepare_request(method='GET',
                                          url=endpoint,
                                          headers=request_headers)
        detailed_response = client.send(prepared)
        if hasattr(OperationalSpaceResponse, 'from_dict'):
            detailed_response.result = OperationalSpaceResponse.from_dict(
                detailed_response.result)
        return detailed_response

    def update(self,
               operational_space_id: str,
               json_patch_operation: List['JsonPatchOperation'],
               **kwargs
               ) -> DetailedResponse:
        """
        Update an operational space.

        The patch operations are sent as a JSON array with content type
        `application/json-patch+json`.

        :param str operational_space_id: Unique Operational Space ID.
        :param List[JsonPatchOperation] json_patch_operation: Patch operations
               to send; each item is passed through `convert_model` before being
               serialized.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `OperationalSpaceResponse` result
        :raises ValueError: if `operational_space_id` or `json_patch_operation`
                is None.
        """
        if operational_space_id is None:
            raise ValueError('operational_space_id must be provided')
        if json_patch_operation is None:
            raise ValueError('json_patch_operation must be provided')

        operations = [convert_model(operation) for operation in json_patch_operation]

        client = self.watson_open_scale
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='operational_spaces_update'))

        payload = json.dumps(operations)
        request_headers['content-type'] = 'application/json-patch+json'
        request_headers.update(kwargs.get('headers', {}))
        request_headers['Accept'] = 'application/json'

        path_vars = client.encode_path_vars(operational_space_id)
        endpoint = '/v2/operational_spaces/{0}'.format(*path_vars)

        prepared = client.prepare_request(method='PATCH',
                                          url=endpoint,
                                          headers=request_headers,
                                          data=payload)
        detailed_response = client.send(prepared)
        if hasattr(OperationalSpaceResponse, 'from_dict'):
            detailed_response.result = OperationalSpaceResponse.from_dict(
                detailed_response.result)
        return detailed_response

    def delete(self,
               operational_space_id: str,
               **kwargs
               ) -> DetailedResponse:
        """
        Delete an operational space.

        :param str operational_space_id: Unique Operational Space ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `operational_space_id` is None.
        """
        if operational_space_id is None:
            raise ValueError('operational_space_id must be provided')

        client = self.watson_open_scale
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='operational_spaces_delete'))
        request_headers.update(kwargs.get('headers', {}))

        path_vars = client.encode_path_vars(operational_space_id)
        endpoint = '/v2/operational_spaces/{0}'.format(*path_vars)

        prepared = client.prepare_request(method='DELETE',
                                          url=endpoint,
                                          headers=request_headers)
        return client.send(prepared)
#########################
# User Preferences
#########################
class UserPreferences:
    """
    Client for the user preferences operations of the Watson OpenScale service.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a UserPreferences client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def list(self,
             **kwargs
             ) -> DetailedResponse:
        """
        Get User Preferences.

        The response result is returned as received; no model conversion is
        applied.

        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        """
        client = self.watson_open_scale

        # SDK headers first, then caller overrides, then the forced 'Accept'.
        headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='user_preferences_list'))
        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        headers['Accept'] = 'application/json'

        request = client.prepare_request(method='GET',
                                         url='/v2/user_preferences',
                                         headers=headers)
        return client.send(request)

    def patch(self,
              json_patch_operation: List['JsonPatchOperation'],
              **kwargs
              ) -> DetailedResponse:
        """
        Update User Preferences.

        The patch operations are sent as a JSON array with content type
        `application/json-patch+json`.

        :param List[JsonPatchOperation] json_patch_operation: Patch operations
               to send; each item is passed through `convert_model` before being
               serialized.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `json_patch_operation` is None.
        """
        if json_patch_operation is None:
            raise ValueError('json_patch_operation must be provided')
        json_patch_operation = [convert_model(x) for x in json_patch_operation]

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='user_preferences_patch'))

        data = json.dumps(json_patch_operation)
        headers['content-type'] = 'application/json-patch+json'
        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        headers['Accept'] = 'application/json'

        request = client.prepare_request(method='PATCH',
                                         url='/v2/user_preferences',
                                         headers=headers,
                                         data=data)
        return client.send(request)

    def get(self,
            user_preference_key: str,
            **kwargs
            ) -> DetailedResponse:
        """
        Get a specific user preference.

        :param str user_preference_key: key in user preferences.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `UserPreferencesGetResponse` result
        :raises ValueError: if `user_preference_key` is None.
        """
        if user_preference_key is None:
            raise ValueError('user_preference_key must be provided')

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='user_preferences_get'))
        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        headers['Accept'] = 'application/json'

        url = '/v2/user_preferences/{0}'.format(
            *client.encode_path_vars(user_preference_key))
        request = client.prepare_request(method='GET',
                                         url=url,
                                         headers=headers)

        response = client.send(request)
        # Replace the raw result with the typed model when the model supports it.
        if hasattr(UserPreferencesGetResponse, 'from_dict'):
            response.result = UserPreferencesGetResponse.from_dict(response.result)
        return response

    def update(self,
               user_preference_key: str,
               user_preferences_update_request: 'UserPreferencesUpdateRequest',
               **kwargs
               ) -> DetailedResponse:
        """
        Update the user preference.

        The request value is passed through `convert_model` and then serialized
        as the JSON body of a PUT request.

        :param str user_preference_key: key in user preferences.
        :param UserPreferencesUpdateRequest user_preferences_update_request:
               New value for the preference; may be a model instance or an
               already JSON-serializable value.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `user_preference_key` or
                `user_preferences_update_request` is None.
        """
        if user_preference_key is None:
            raise ValueError('user_preference_key must be provided')
        if user_preferences_update_request is None:
            raise ValueError('user_preferences_update_request must be provided')

        # json.dumps cannot serialize SDK model instances directly. convert_model
        # turns objects exposing `to_dict` into plain dicts and returns any other
        # value unchanged, so callers passing plain values keep working.
        user_preferences_update_request = convert_model(user_preferences_update_request)

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='user_preferences_update'))

        data = json.dumps(user_preferences_update_request)
        headers['content-type'] = 'application/json'
        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        headers['Accept'] = 'application/json'

        url = '/v2/user_preferences/{0}'.format(
            *client.encode_path_vars(user_preference_key))
        request = client.prepare_request(method='PUT',
                                         url=url,
                                         headers=headers,
                                         data=data)
        return client.send(request)

    def delete(self,
               user_preference_key: str,
               **kwargs
               ) -> DetailedResponse:
        """
        Delete the user preference.

        :param str user_preference_key: key in user preferences.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `user_preference_key` is None.
        """
        if user_preference_key is None:
            raise ValueError('user_preference_key must be provided')

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='user_preferences_delete'))
        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))

        url = '/v2/user_preferences/{0}'.format(
            *client.encode_path_vars(user_preference_key))
        request = client.prepare_request(method='DELETE',
                                         url=url,
                                         headers=headers)
        return client.send(request)
#########################
# Explanation Tasks
#########################
class ExplanationTasks:
    """
    Client for the explanation tasks operations of the Watson OpenScale service.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a ExplanationTasks client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def add(self,
            *,
            scoring_ids: List[str] = None,
            input_rows: List[dict] = None,
            explanation_types: List[str] = None,
            subscription_id: str = None,
            **kwargs
            ) -> DetailedResponse:
        """
        Compute explanations.

        Submit tasks for computing explanation of predictions. Fields whose
        value is None are left out of the JSON request body.

        :param List[str] scoring_ids: (optional) IDs of the scoring transaction.
        :param List[dict] input_rows: (optional) List of scoring transactions.
        :param List[str] explanation_types: (optional) Types of explanations to
               generate.
        :param str subscription_id: (optional) Unique subscription ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `PostExplanationTaskResponse` result
        """
        client = self.watson_open_scale

        # SDK headers first, then content type, caller overrides, and 'Accept'.
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='explanation_tasks_add'))

        fields = {
            'scoring_ids': scoring_ids,
            'input_rows': input_rows,
            'explanation_types': explanation_types,
            'subscription_id': subscription_id,
        }
        # Only send the fields the caller actually supplied.
        payload = json.dumps(
            {key: value for key, value in fields.items() if value is not None})

        request_headers['content-type'] = 'application/json'
        request_headers.update(kwargs.get('headers', {}))
        request_headers['Accept'] = 'application/json'

        prepared = client.prepare_request(method='POST',
                                          url='/v2/explanation_tasks',
                                          headers=request_headers,
                                          data=payload)
        detailed_response = client.send(prepared)
        if hasattr(PostExplanationTaskResponse, 'from_dict'):
            detailed_response.result = PostExplanationTaskResponse.from_dict(
                detailed_response.result)
        return detailed_response

    def list(self,
             subscription_id: str,
             *,
             offset: int = None,
             limit: int = None,
             scoring_id: str = None,
             status: str = None,
             **kwargs
             ) -> DetailedResponse:
        """
        List all explanations.

        List of all the computed explanations.

        :param str subscription_id: Unique subscription ID.
        :param int offset: (optional) offset of the explanations to return.
        :param int limit: (optional) Maximum number of explanations to return.
        :param str scoring_id: (optional) ID of the scoring transaction.
        :param str status: (optional) Status of the explanation task.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `GetExplanationTasksResponse` result
        :raises ValueError: if `subscription_id` is None.
        """
        if subscription_id is None:
            raise ValueError('subscription_id must be provided')

        client = self.watson_open_scale
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='explanation_tasks_list'))
        query_params = {
            'subscription_id': subscription_id,
            'offset': offset,
            'limit': limit,
            'scoring_id': scoring_id,
            'status': status,
        }
        request_headers.update(kwargs.get('headers', {}))
        request_headers['Accept'] = 'application/json'

        prepared = client.prepare_request(method='GET',
                                          url='/v2/explanation_tasks',
                                          headers=request_headers,
                                          params=query_params)
        detailed_response = client.send(prepared)
        if hasattr(GetExplanationTasksResponse, 'from_dict'):
            detailed_response.result = GetExplanationTasksResponse.from_dict(
                detailed_response.result)
        return detailed_response

    def get(self,
            explanation_task_id: str,
            *,
            subscription_id: str = None,
            **kwargs
            ) -> DetailedResponse:
        """
        Get explanation.

        Get explanation for the given explanation task id.

        :param str explanation_task_id: ID of the explanation task.
        :param str subscription_id: (optional) Unique subscription ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `GetExplanationTaskResponse` result
        :raises ValueError: if `explanation_task_id` is None.
        """
        if explanation_task_id is None:
            raise ValueError('explanation_task_id must be provided')

        client = self.watson_open_scale
        request_headers = dict(get_sdk_headers(
            service_name=client.DEFAULT_SERVICE_NAME,
            service_version='V2',
            operation_id='explanation_tasks_get'))
        query_params = {'subscription_id': subscription_id}
        request_headers.update(kwargs.get('headers', {}))
        request_headers['Accept'] = 'application/json'

        path_vars = client.encode_path_vars(explanation_task_id)
        endpoint = '/v2/explanation_tasks/{0}'.format(*path_vars)

        prepared = client.prepare_request(method='GET',
                                          url=endpoint,
                                          headers=request_headers,
                                          params=query_params)
        detailed_response = client.send(prepared)
        if hasattr(GetExplanationTaskResponse, 'from_dict'):
            detailed_response.result = GetExplanationTaskResponse.from_dict(
                detailed_response.result)
        return detailed_response
#########################
# Drift Service
#########################
class DriftService:
"""
Drift Service
"""
def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
    """
    Construct a DriftService client for the Watson OpenScale service.

    The given service client is stored on the instance as
    `watson_open_scale`.

    :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
    """
    self.watson_open_scale = watson_open_scale
def drift_archive_head(self,
                       monitor_instance_id: str,
                       **kwargs
                       ) -> DetailedResponse:
    """
    Retrieves the Drift archive metadata.

    Sends a HEAD request for the archives of the given monitor instance.

    :param str monitor_instance_id: Unique monitor instance ID.
    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse
    :raises ValueError: if `monitor_instance_id` is None.
    """
    if monitor_instance_id is None:
        raise ValueError('monitor_instance_id must be provided')

    client = self.watson_open_scale

    # SDK headers first; caller-supplied headers may override them.
    request_headers = dict(get_sdk_headers(
        service_name=client.DEFAULT_SERVICE_NAME,
        service_version='V2',
        operation_id='drift_archive_head'))
    request_headers.update(kwargs.get('headers', {}))

    path_vars = client.encode_path_vars(monitor_instance_id)
    endpoint = '/v2/monitoring_services/drift/monitor_instances/{0}/archives'.format(
        *path_vars)

    prepared = client.prepare_request(method='HEAD',
                                      url=endpoint,
                                      headers=request_headers)
    return client.send(prepared)
def drift_archive_post(self,
data_mart_id: str,
subscription_id: str,
body: BinaryIO,
*,
archive_name: str = None,
enable_data_drift: bool = None,
enable_model_drift: bool = None,
**kwargs
) -> DetailedResponse:
"""
Upload Drift archives.
API to upload drift archive such as the Drift Detection Model.
:param str data_mart_id: ID of the data mart.
:param str subscription_id: Unique subscription ID.
:param BinaryIO body:
:param str archive_name: (optional) The name of the archive being uploaded.
:param bool enable_data_drift: (optional) Flag to enable/disable data
drift.
:param bool enable_model_drift: (optional) Flag to enable/disable model
drift.
:param dict headers: A `dict` containing the request headers
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse
"""
if data_mart_id is None:
raise ValueError('data_mart_id must be provided')
if subscription_id is None:
raise ValueError('subscription_id must be provided')
if body is None:
raise ValueError('body must be provided')
headers = {}
sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
service_version='V2',
operation_id='drift_archive_post')
headers.update(sdk_headers)
params = {
'archive_name': archive_name,
'enable_data_drift': enable_data_drift,
'enable_model_drift': enable_model_drift
}
data = body
headers['content-type'] = 'application/octet-stream'
if 'headers' in kwargs:
headers.update(kwargs.get('headers'))
url = '/v2/monitoring_services/drift/data_marts/{0}/subscriptions/{1}/archives'.format(
*self.watson_open_scale.encode_path_vars(data_mart_id, subscription_id))
request = self.watson_open_scale.prepare_request(method='POST',
url=url,
headers=headers,
params=params,
data=data)
response = self.watson_open_scale.send(request)
return response
def drift_archive_get(self,
monitor_instance_id: str,
**kwargs
) -> DetailedResponse:
"""
Retrieves the Drift archives.
API to retrieve the Drift archives.
:param str monitor_instance_id: Unique monitor instance ID.
:param dict headers: A `dict` containing the request headers
:return: A `DetailedResponse` containing the result, headers and HTTP status code.
:rtype: DetailedResponse
"""
if monitor_instance_id is None:
raise ValueError('monitor_instance_id must be provided')
headers = {}
sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
service_version='V2',
operation_id='drift_archive_get')
headers.update(sdk_headers)
if 'headers' in kwargs:
headers.update(kwargs.get('headers'))
headers['Accept'] = 'application/octet-stream'
url = '/v2/monitoring_services/drift/monitor_instances/{0}/archives'.format(
*self.watson_open_scale.encode_path_vars(monitor_instance_id))
request = self.watson_open_scale.prepare_request(method='GET',
url=url,
headers=headers)
response = self.watson_open_scale.send(request)
return response
#########################
# Explainability Service
#########################
class ExplainabilityService:
    """
    Client wrapper for the Explainability archive endpoints of Watson OpenScale.

    Every method builds its request through the wrapped `WatsonOpenScaleV2`
    client and returns whatever that client's `send` returns.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a ExplainabilityService client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def explainability_archive_put(self,
                                   subscription_id: str,
                                   body: BinaryIO,
                                   **kwargs
                                   ) -> DetailedResponse:
        """
        Upload explainability configuration archive.

        API to upload explainability configuration archive containing the explainability
        artifacts. The api can also be used to update the archive. This variant
        issues a PUT request.

        :param str subscription_id: Unique subscription ID.
        :param BinaryIO body: The archive content to upload.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `subscription_id` or `body` is None.
        """
        if subscription_id is None:
            raise ValueError('subscription_id must be provided')
        if body is None:
            raise ValueError('body must be provided')

        client = self.watson_open_scale
        headers = {}
        headers.update(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='explainability_archive_put'))
        query = {
            'subscription_id': subscription_id
        }
        # Default content type is set first so caller headers may replace it.
        headers['content-type'] = 'application/octet-stream'
        if 'headers' in kwargs:
            headers.update(kwargs['headers'])

        request = client.prepare_request(method='PUT',
                                         url='/v2/monitoring_services/explainability/archives',
                                         headers=headers,
                                         params=query,
                                         data=body)
        return client.send(request)

    def explainability_archive_post(self,
                                    subscription_id: str,
                                    body: BinaryIO,
                                    **kwargs
                                    ) -> DetailedResponse:
        """
        Upload explainability configuration archive.

        API to upload explainability configuration archive containing the explainability
        artifacts. The api can also be used to update the archive. This variant
        issues a POST request.

        :param str subscription_id: Unique subscription ID.
        :param BinaryIO body: The archive content to upload.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `subscription_id` or `body` is None.
        """
        if subscription_id is None:
            raise ValueError('subscription_id must be provided')
        if body is None:
            raise ValueError('body must be provided')

        client = self.watson_open_scale
        headers = {}
        headers.update(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='explainability_archive_post'))
        query = {
            'subscription_id': subscription_id
        }
        # Default content type is set first so caller headers may replace it.
        headers['content-type'] = 'application/octet-stream'
        if 'headers' in kwargs:
            headers.update(kwargs['headers'])

        request = client.prepare_request(method='POST',
                                         url='/v2/monitoring_services/explainability/archives',
                                         headers=headers,
                                         params=query,
                                         data=body)
        return client.send(request)

    def explainability_archive_get(self,
                                   subscription_id: str,
                                   **kwargs
                                   ) -> DetailedResponse:
        """
        Download the Explainability configuration archive.

        Sends a GET request for the archive of the given subscription, asking for
        an `application/octet-stream` response.

        :param str subscription_id: Unique subscription ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `subscription_id` is None.
        """
        if subscription_id is None:
            raise ValueError('subscription_id must be provided')

        client = self.watson_open_scale
        headers = {}
        headers.update(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='explainability_archive_get'))
        query = {
            'subscription_id': subscription_id
        }
        if 'headers' in kwargs:
            headers.update(kwargs['headers'])
        # Accept is assigned after the merge, so it always wins over caller headers.
        headers['Accept'] = 'application/octet-stream'

        request = client.prepare_request(method='GET',
                                         url='/v2/monitoring_services/explainability/archives',
                                         headers=headers,
                                         params=query)
        return client.send(request)
#########################
# Schema Utility
#########################
class SchemaUtility:
    """
    Client wrapper for the schema utility endpoint of Watson OpenScale.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a SchemaUtility client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def spark_schemas_post(self,
                           body: Union[str, TextIO],
                           **kwargs
                           ) -> DetailedResponse:
        """
        Parse a given file and extract the schema in Spark StructType format.

        API to consume a file such as CSV and returns the schema in Spark StructType
        format. The body is posted as `text/csv` (unless the caller overrides the
        content type) and a JSON response is requested.

        :param str body: The file content to parse.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `SparkStruct` result
        :raises ValueError: if `body` is None.
        """
        if body is None:
            raise ValueError('body must be provided')

        client = self.watson_open_scale
        headers = {}
        headers.update(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='spark_schemas_post'))
        # Default content type is set first so caller headers may replace it.
        headers['content-type'] = 'text/csv'
        if 'headers' in kwargs:
            headers.update(kwargs['headers'])
        # Accept is assigned after the merge, so it always wins over caller headers.
        headers['Accept'] = 'application/json'

        request = client.prepare_request(method='POST',
                                         url='/v2/spark_schemas',
                                         headers=headers,
                                         data=body)
        response = client.send(request)
        # Replace the raw result with a SparkStruct model when the model can build one.
        if hasattr(SparkStruct, 'from_dict'):
            response.result = SparkStruct.from_dict(response.result)
        return response
class SubscriptionsTablesEnums:
    """
    Enumerations of the values accepted by subscriptions_tables parameters.
    """

    class DatasetType(str, Enum):
        """
        Data set type of the subscription.
        """
        FEEDBACK = 'feedback'
class DataSetsListEnums:
    """
    Enumerations of the values accepted by data_sets_list parameters.
    """

    class TargetTargetType(str, Enum):
        """
        Type of the target.
        """
        SUBSCRIPTION = 'subscription'
        BUSINESS_APPLICATION = 'business_application'
        INSTANCE = 'instance'
        DATA_MART = 'data_mart'

    class Type(str, Enum):
        """
        Type of the data set.
        """
        MANUAL_LABELING = 'manual_labeling'
        PAYLOAD_LOGGING = 'payload_logging'
        FEEDBACK = 'feedback'
        BUSINESS_PAYLOAD = 'business_payload'
        EXPLANATIONS = 'explanations'
        EXPLANATIONS_WHATIF = 'explanations_whatif'
        TRAINING = 'training'
        PAYLOAD_LOGGING_ERROR = 'payload_logging_error'
        CUSTOM = 'custom'
class RecordsAddEnums:
    """
    Enumerations of the values accepted by records_add parameters.
    """

    class ContentType(str, Enum):
        """
        The type of the input. A character encoding can be specified by including a
        `charset` parameter. For example, 'text/csv;charset=utf-8'.
        """
        APPLICATION_JSON = 'application/json'
        TEXT_CSV = 'text/csv'

    class OnError(str, Enum):
        """
        Expected behaviour on error.
        """
        STOP = 'stop'
        CONTINUE = 'continue'
class RecordsListEnums:
    """
    Enumerations of the values accepted by records_list parameters.
    """

    class Order(str, Enum):
        """
        Return records in the order specified. Currently only `random` is supported.
        You have to use one of `start`, `end`, or `filter` param with `order=random`.
        """
        RANDOM = 'random'

    class Format(str, Enum):
        """
        What JSON format to use on output.
        """
        DICT = 'dict'
        LIST = 'list'

    class BinaryFormat(str, Enum):
        """
        Binary data presentation format. By default, the binary field value is encoded
        to base64 string. If _reference_ is chosen, every binary field is moved to the
        _references_ section with value set to an uri to the particular field within
        the record that can be GET in a separate request.
        """
        REFERENCE = 'reference'
class RecordsGetEnums:
    """
    Enumerations of the values accepted by records_get parameters.
    """

    class BinaryFormat(str, Enum):
        """
        Binary data presentation format. By default, the binary field value is encoded
        to base64 string. If _reference_ is chosen, every binary field is moved to the
        _references_ section with value set to an uri to the particular field within
        the record that can be GET in a separate request.
        """
        REFERENCE = 'reference'
class RecordsUpdateEnums:
    """
    Enumerations of the values accepted by records_update parameters.
    """

    class BinaryFormat(str, Enum):
        """
        Binary data presentation format. By default, the binary field value is encoded
        to base64 string. If _reference_ is chosen, every binary field is moved to the
        _references_ section with value set to an uri to the particular field within
        the record that can be GET in a separate request.
        """
        REFERENCE = 'reference'
class RecordsQueryEnums:
    """
    Enumerations of the values accepted by records_query parameters.
    """

    class DataSetType(str, Enum):
        """
        A (single) data set type.
        """
        MANUAL_LABELING = 'manual_labeling'
        PAYLOAD_LOGGING = 'payload_logging'
        FEEDBACK = 'feedback'
        BUSINESS_PAYLOAD = 'business_payload'
        EXPLANATIONS = 'explanations'
        EXPLANATIONS_WHATIF = 'explanations_whatif'
        TRAINING = 'training'
        PAYLOAD_LOGGING_ERROR = 'payload_logging_error'
        CUSTOM = 'custom'
class MeasurementsQueryEnums:
    """
    Enumerations of the values accepted by measurements_query parameters.
    """

    class TargetType(str, Enum):
        """
        Type of the monitoring target (subscription or business application).
        """
        SUBSCRIPTION = 'subscription'
        BUSINESS_APPLICATION = 'business_application'
        INSTANCE = 'instance'
        DATA_MART = 'data_mart'

    class Format(str, Enum):
        """
        Format of the returned data. `full` format compared to `compact` is additive
        and contains `sources` part.
        """
        COMPACT = 'compact'
        FULL = 'full'
class MetricsListEnums:
    """
    Enumerations of the values accepted by metrics_list parameters.
    """

    class Agg(str, Enum):
        """
        Comma delimited function list constructed from metric name and function, e.g.
        `agg=metric_name:count,:last` that defines aggregations.
        """
        LAST = 'last'
        FIRST = 'first'
        MAX = 'max'
        MIN = 'min'
        SUM = 'sum'
        AVG = 'avg'
        COUNT = 'count'
        STDDEV = 'stddev'

    class Interval(str, Enum):
        """
        Time unit in which metrics are grouped and aggregated, interval by interval.
        """
        MINUTE = 'minute'
        HOUR = 'hour'
        DAY = 'day'
        WEEK = 'week'
        MONTH = 'month'
        YEAR = 'year'
class IntegratedSystemsListEnums:
    """
    Enumerations of the values accepted by integrated_systems_list parameters.
    """

    class Type(str, Enum):
        """
        Comma-separated list of type for the integrated system.
        """
        OPEN_PAGES = 'open_pages'
        SLACK = 'slack'
        HIVE = 'hive'
        SPARK = 'spark'
        JDBC = 'jdbc'
        CUSTOM_METRICS_PROVIDER = 'custom_metrics_provider'
        WATSON_KNOWLEDGE_CATALOG = 'watson_knowledge_catalog'
class ExplanationTasksListEnums:
    """
    Enumerations of the values accepted by explanation_tasks_list parameters.
    """

    class Status(str, Enum):
        """
        Status of the explanation task.
        """
        IN_PROGRESS = 'in_progress'
        FINISHED = 'finished'
        ERROR = 'error'
##############################################################################
# Models
##############################################################################
class AnalyticsEngine:
    """
    AnalyticsEngine.

    :attr str type: Type of analytics engine. e.g. spark.
    :attr str integrated_system_id: (optional) id of the Integrated System.
    :attr object credentials: (optional) Credentials to override credentials in
          integration_reference.
    :attr object parameters: (optional) Additional parameters (e.g.
          max_num_executors, min_num_executors, executor_cores, executor_memory,
          driver_cores, driver_memory).
    """

    def __init__(self,
                 type: str,
                 *,
                 integrated_system_id: str = None,
                 credentials: object = None,
                 parameters: object = None) -> None:
        """
        Initialize a AnalyticsEngine object.

        :param str type: Type of analytics engine. e.g. spark.
        :param str integrated_system_id: (optional) id of the Integrated System.
        :param object credentials: (optional) Credentials to override credentials
               in integration_reference.
        :param object parameters: (optional) Additional parameters (e.g.
               max_num_executors, min_num_executors, executor_cores, executor_memory,
               driver_cores, driver_memory).
        """
        self.type = type
        self.integrated_system_id = integrated_system_id
        self.credentials = credentials
        self.parameters = parameters

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'AnalyticsEngine':
        """
        Initialize a AnalyticsEngine object from a json dictionary.

        :raises ValueError: if the required `type` key is absent.
        """
        if 'type' not in _dict:
            raise ValueError('Required property \'type\' not present in AnalyticsEngine JSON')
        init_kwargs = {'type': _dict.get('type')}
        # Optional properties are copied only when the key is present.
        for key in ('integrated_system_id', 'credentials', 'parameters'):
            if key in _dict:
                init_kwargs[key] = _dict.get(key)
        return cls(**init_kwargs)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a AnalyticsEngine object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Return a json dictionary representing this model; unset (None) attributes are omitted."""
        serialized = {}
        for name in ('type', 'integrated_system_id', 'credentials', 'parameters'):
            value = getattr(self, name, None)
            if value is not None:
                serialized[name] = value
        return serialized

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this AnalyticsEngine object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'AnalyticsEngine') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'AnalyticsEngine') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not (self == other)
class ApplicabilitySelection:
    """
    ApplicabilitySelection.

    :attr List[str] input_data_type: (optional)
    :attr List[str] problem_type: (optional)
    :attr List[str] target_type: (optional)
    """

    def __init__(self,
                 *,
                 input_data_type: List[str] = None,
                 problem_type: List[str] = None,
                 target_type: List[str] = None) -> None:
        """
        Initialize a ApplicabilitySelection object.

        :param List[str] input_data_type: (optional)
        :param List[str] problem_type: (optional)
        :param List[str] target_type: (optional)
        """
        self.input_data_type = input_data_type
        self.problem_type = problem_type
        self.target_type = target_type

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'ApplicabilitySelection':
        """Initialize a ApplicabilitySelection object from a json dictionary."""
        # Every property is optional; copy only the keys that are present.
        init_kwargs = {}
        for key in ('input_data_type', 'problem_type', 'target_type'):
            if key in _dict:
                init_kwargs[key] = _dict.get(key)
        return cls(**init_kwargs)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a ApplicabilitySelection object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Return a json dictionary representing this model; unset (None) attributes are omitted."""
        serialized = {}
        for name in ('input_data_type', 'problem_type', 'target_type'):
            value = getattr(self, name, None)
            if value is not None:
                serialized[name] = value
        return serialized

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this ApplicabilitySelection object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'ApplicabilitySelection') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'ApplicabilitySelection') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not (self == other)

    class InputDataTypeEnum(str, Enum):
        """
        input_data_type.
        """
        STRUCTURED = 'structured'
        UNSTRUCTURED_IMAGE = 'unstructured_image'
        UNSTRUCTURED_TEXT = 'unstructured_text'
        UNSTRUCTURED_VIDEO = 'unstructured_video'
        UNSTRUCTURED_AUDIO = 'unstructured_audio'

    class ProblemTypeEnum(str, Enum):
        """
        problem_type.
        """
        BINARY = 'binary'
        REGRESSION = 'regression'
        MULTICLASS = 'multiclass'

    class TargetTypeEnum(str, Enum):
        """
        Type of the target (e.g. subscription, business application, ...).
        """
        SUBSCRIPTION = 'subscription'
        BUSINESS_APPLICATION = 'business_application'
        INSTANCE = 'instance'
        DATA_MART = 'data_mart'
class Asset():
    """
    Asset.

    :attr str asset_id:
    :attr str url: (optional)
    :attr str name: (optional)
    :attr str asset_type:
    :attr str asset_rn: (optional) Asset Resource Name (used for integration with
          3rd party ML engines).
    :attr str created_at: (optional)
    :attr str problem_type: (optional)
    :attr str model_type: (optional)
    :attr str runtime_environment: (optional)
    :attr str input_data_type: (optional)
    """

    def __init__(self,
                 asset_id: str,
                 asset_type: str,
                 *,
                 url: str = None,
                 name: str = None,
                 asset_rn: str = None,
                 created_at: str = None,
                 problem_type: str = None,
                 model_type: str = None,
                 runtime_environment: str = None,
                 input_data_type: str = None) -> None:
        """
        Initialize a Asset object.

        :param str asset_id:
        :param str asset_type:
        :param str url: (optional)
        :param str name: (optional)
        :param str asset_rn: (optional) Asset Resource Name (used for integration
               with 3rd party ML engines).
        :param str created_at: (optional)
        :param str problem_type: (optional)
        :param str model_type: (optional)
        :param str runtime_environment: (optional)
        :param str input_data_type: (optional)
        """
        self.asset_id = asset_id
        self.url = url
        self.name = name
        self.asset_type = asset_type
        self.asset_rn = asset_rn
        self.created_at = created_at
        self.problem_type = problem_type
        self.model_type = model_type
        self.runtime_environment = runtime_environment
        self.input_data_type = input_data_type

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'Asset':
        """
        Initialize a Asset object from a json dictionary.

        :param dict _dict: JSON dictionary holding the asset properties.
        :return: A new `Asset` populated from the keys present in `_dict`.
        :raises ValueError: if the required `asset_id` or `asset_type` key is absent.
        """
        args = {}
        if 'asset_id' in _dict:
            args['asset_id'] = _dict.get('asset_id')
        else:
            raise ValueError('Required property \'asset_id\' not present in Asset JSON')
        if 'url' in _dict:
            args['url'] = _dict.get('url')
        if 'name' in _dict:
            args['name'] = _dict.get('name')
        if 'asset_type' in _dict:
            args['asset_type'] = _dict.get('asset_type')
        else:
            raise ValueError('Required property \'asset_type\' not present in Asset JSON')
        if 'asset_rn' in _dict:
            args['asset_rn'] = _dict.get('asset_rn')
        if 'created_at' in _dict:
            args['created_at'] = _dict.get('created_at')
        if 'problem_type' in _dict:
            args['problem_type'] = _dict.get('problem_type')
        if 'model_type' in _dict:
            args['model_type'] = _dict.get('model_type')
        if 'runtime_environment' in _dict:
            args['runtime_environment'] = _dict.get('runtime_environment')
        if 'input_data_type' in _dict:
            args['input_data_type'] = _dict.get('input_data_type')
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a Asset object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Return a json dictionary representing this model; unset (None) attributes are omitted."""
        _dict = {}
        if hasattr(self, 'asset_id') and self.asset_id is not None:
            _dict['asset_id'] = self.asset_id
        if hasattr(self, 'url') and self.url is not None:
            _dict['url'] = self.url
        if hasattr(self, 'name') and self.name is not None:
            _dict['name'] = self.name
        if hasattr(self, 'asset_type') and self.asset_type is not None:
            _dict['asset_type'] = self.asset_type
        if hasattr(self, 'asset_rn') and self.asset_rn is not None:
            _dict['asset_rn'] = self.asset_rn
        if hasattr(self, 'created_at') and self.created_at is not None:
            _dict['created_at'] = self.created_at
        if hasattr(self, 'problem_type') and self.problem_type is not None:
            _dict['problem_type'] = self.problem_type
        if hasattr(self, 'model_type') and self.model_type is not None:
            _dict['model_type'] = self.model_type
        if hasattr(self, 'runtime_environment') and self.runtime_environment is not None:
            _dict['runtime_environment'] = self.runtime_environment
        if hasattr(self, 'input_data_type') and self.input_data_type is not None:
            _dict['input_data_type'] = self.input_data_type
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this Asset object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'Asset') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'Asset') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other

    class AssetTypeEnum(str, Enum):
        """
        asset_type.
        """
        MODEL = 'model'
        FUNCTION = 'function'

    class ProblemTypeEnum(str, Enum):
        """
        problem_type.
        """
        BINARY = 'binary'
        REGRESSION = 'regression'
        MULTICLASS = 'multiclass'

    class InputDataTypeEnum(str, Enum):
        """
        input_data_type.
        """
        STRUCTURED = 'structured'
        UNSTRUCTURED_IMAGE = 'unstructured_image'
        UNSTRUCTURED_TEXT = 'unstructured_text'
        UNSTRUCTURED_VIDEO = 'unstructured_video'
        UNSTRUCTURED_AUDIO = 'unstructured_audio'
class AssetDeployment():
    """
    AssetDeployment.

    :attr str deployment_id: (optional)
    :attr str deployment_rn: (optional) Deployment Resource Name (used for
          integration with 3rd party ML engines).
    :attr str url: (optional)
    :attr str name: (optional)
    :attr str description: (optional)
    :attr str deployment_type: (optional) Deployment type, e.g. online, batch.
    :attr str created_at: (optional)
    :attr ScoringEndpoint scoring_endpoint: (optional) Definition of scoring
          endpoint in custom_machine_learning.
    """

    def __init__(self,
                 *,
                 deployment_id: str = None,
                 deployment_rn: str = None,
                 url: str = None,
                 name: str = None,
                 description: str = None,
                 deployment_type: str = None,
                 created_at: str = None,
                 scoring_endpoint: 'ScoringEndpoint' = None) -> None:
        """
        Initialize a AssetDeployment object.

        :param str deployment_id: (optional)
        :param str deployment_rn: (optional) Deployment Resource Name (used for
               integration with 3rd party ML engines).
        :param str url: (optional)
        :param str name: (optional)
        :param str description: (optional)
        :param str deployment_type: (optional) Deployment type, e.g. online, batch.
        :param str created_at: (optional)
        :param ScoringEndpoint scoring_endpoint: (optional) Definition of scoring
               endpoint in custom_machine_learning.
        """
        self.deployment_id = deployment_id
        self.deployment_rn = deployment_rn
        self.url = url
        self.name = name
        self.description = description
        self.deployment_type = deployment_type
        self.created_at = created_at
        self.scoring_endpoint = scoring_endpoint

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'AssetDeployment':
        """
        Initialize a AssetDeployment object from a json dictionary.

        A `scoring_endpoint` key whose value is JSON null is treated as absent
        instead of being handed to `ScoringEndpoint.from_dict`.

        :param dict _dict: JSON dictionary holding the deployment properties.
        :return: A new `AssetDeployment` populated from the keys present in `_dict`.
        """
        args = {}
        for key in ('deployment_id', 'deployment_rn', 'url', 'name',
                    'description', 'deployment_type', 'created_at'):
            if key in _dict:
                args[key] = _dict.get(key)
        endpoint = _dict.get('scoring_endpoint')
        # Only a real nested object is converted; null would crash the nested parser.
        if endpoint is not None:
            args['scoring_endpoint'] = ScoringEndpoint.from_dict(endpoint)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a AssetDeployment object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are None are omitted. A `scoring_endpoint` supplied as a
        plain dict is emitted as-is; otherwise its `to_dict()` is used.
        """
        _dict = {}
        for name in ('deployment_id', 'deployment_rn', 'url', 'name',
                     'description', 'deployment_type', 'created_at'):
            value = getattr(self, name, None)
            if value is not None:
                _dict[name] = value
        endpoint = getattr(self, 'scoring_endpoint', None)
        if endpoint is not None:
            if isinstance(endpoint, dict):
                _dict['scoring_endpoint'] = endpoint
            else:
                _dict['scoring_endpoint'] = endpoint.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this AssetDeployment object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'AssetDeployment') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'AssetDeployment') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class AssetDeploymentRequest():
    """
    AssetDeploymentRequest.

    :attr str deployment_id:
    :attr str deployment_rn: (optional) Deployment Resource Name (used for
          integration with 3rd party ML engines).
    :attr str url: (optional)
    :attr str name:
    :attr str description: (optional)
    :attr str deployment_type: Deployment type, e.g. online, batch.
    :attr str created_at: (optional)
    :attr ScoringEndpointRequest scoring_endpoint: (optional) Definition of scoring
          endpoint in custom_machine_learning.
    """

    def __init__(self,
                 deployment_id: str,
                 name: str,
                 deployment_type: str,
                 *,
                 deployment_rn: str = None,
                 url: str = None,
                 description: str = None,
                 created_at: str = None,
                 scoring_endpoint: 'ScoringEndpointRequest' = None) -> None:
        """
        Initialize a AssetDeploymentRequest object.

        :param str deployment_id:
        :param str name:
        :param str deployment_type: Deployment type, e.g. online, batch.
        :param str deployment_rn: (optional) Deployment Resource Name (used for
               integration with 3rd party ML engines).
        :param str url: (optional)
        :param str description: (optional)
        :param str created_at: (optional)
        :param ScoringEndpointRequest scoring_endpoint: (optional) Definition of
               scoring endpoint in custom_machine_learning.
        """
        self.deployment_id = deployment_id
        self.deployment_rn = deployment_rn
        self.url = url
        self.name = name
        self.description = description
        self.deployment_type = deployment_type
        self.created_at = created_at
        self.scoring_endpoint = scoring_endpoint

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'AssetDeploymentRequest':
        """
        Initialize a AssetDeploymentRequest object from a json dictionary.

        A `scoring_endpoint` key whose value is JSON null is treated as absent
        instead of being handed to `ScoringEndpointRequest.from_dict`.

        :param dict _dict: JSON dictionary holding the deployment request properties.
        :return: A new `AssetDeploymentRequest` populated from the keys present in `_dict`.
        :raises ValueError: if the required `deployment_id`, `name` or
                `deployment_type` key is absent.
        """
        args = {}
        if 'deployment_id' in _dict:
            args['deployment_id'] = _dict.get('deployment_id')
        else:
            raise ValueError('Required property \'deployment_id\' not present in AssetDeploymentRequest JSON')
        if 'deployment_rn' in _dict:
            args['deployment_rn'] = _dict.get('deployment_rn')
        if 'url' in _dict:
            args['url'] = _dict.get('url')
        if 'name' in _dict:
            args['name'] = _dict.get('name')
        else:
            raise ValueError('Required property \'name\' not present in AssetDeploymentRequest JSON')
        if 'description' in _dict:
            args['description'] = _dict.get('description')
        if 'deployment_type' in _dict:
            args['deployment_type'] = _dict.get('deployment_type')
        else:
            raise ValueError('Required property \'deployment_type\' not present in AssetDeploymentRequest JSON')
        if 'created_at' in _dict:
            args['created_at'] = _dict.get('created_at')
        endpoint = _dict.get('scoring_endpoint')
        # Only a real nested object is converted; null would crash the nested parser.
        if endpoint is not None:
            args['scoring_endpoint'] = ScoringEndpointRequest.from_dict(endpoint)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a AssetDeploymentRequest object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are None are omitted. A `scoring_endpoint` supplied as a
        plain dict is emitted as-is; otherwise its `to_dict()` is used.
        """
        _dict = {}
        for name in ('deployment_id', 'deployment_rn', 'url', 'name',
                     'description', 'deployment_type', 'created_at'):
            value = getattr(self, name, None)
            if value is not None:
                _dict[name] = value
        endpoint = getattr(self, 'scoring_endpoint', None)
        if endpoint is not None:
            if isinstance(endpoint, dict):
                _dict['scoring_endpoint'] = endpoint
            else:
                _dict['scoring_endpoint'] = endpoint.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this AssetDeploymentRequest object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'AssetDeploymentRequest') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'AssetDeploymentRequest') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class AssetProperties():
    """
    Additional asset properties (subject of discovery if not provided when creating the
    subscription).

    :attr str problem_type: (optional)
    :attr str model_type: (optional)
    :attr str runtime_environment: (optional)
    :attr SecretCleaned training_data_reference: (optional)
    :attr SparkStruct training_data_schema: (optional)
    :attr SparkStruct input_data_schema: (optional)
    :attr SparkStruct output_data_schema: (optional)
    :attr str label_column: (optional)
    :attr str predicted_target_field: (optional) Field with this name will be given
          modeling_role `decoded-target`.
    :attr str prediction_field: (optional) Field with this name will be given
          modeling_role `prediction`.
    :attr str transaction_id_field: (optional) Field with this name will have
          `transaction_id_key` metadata set to true.
    :attr str input_data_type: (optional)
    :attr dict dashboard_configuration: (optional)
    :attr List[str] feature_fields: (optional) Fields to be given modeling_role
          feature.
    :attr List[str] categorical_fields: (optional) Fields to be given metadata
          `measure` of value `discrete`.
    :attr List[str] probability_fields: (optional) Fields to be given modeling_role
          `class_probability` (for columns of double data type) or `probability` (for
          column of array data type).
    """

    def __init__(self,
                 *,
                 problem_type: str = None,
                 model_type: str = None,
                 runtime_environment: str = None,
                 training_data_reference: 'SecretCleaned' = None,
                 training_data_schema: 'SparkStruct' = None,
                 input_data_schema: 'SparkStruct' = None,
                 output_data_schema: 'SparkStruct' = None,
                 label_column: str = None,
                 predicted_target_field: str = None,
                 prediction_field: str = None,
                 transaction_id_field: str = None,
                 input_data_type: str = None,
                 dashboard_configuration: dict = None,
                 feature_fields: List[str] = None,
                 categorical_fields: List[str] = None,
                 probability_fields: List[str] = None) -> None:
        """
        Initialize an AssetProperties object.

        :param str problem_type: (optional)
        :param str model_type: (optional)
        :param str runtime_environment: (optional)
        :param SecretCleaned training_data_reference: (optional)
        :param SparkStruct training_data_schema: (optional)
        :param SparkStruct input_data_schema: (optional)
        :param SparkStruct output_data_schema: (optional)
        :param str label_column: (optional)
        :param str predicted_target_field: (optional) Field with this name will be
               given modeling_role `decoded-target`.
        :param str prediction_field: (optional) Field with this name will be given
               modeling_role `prediction`.
        :param str transaction_id_field: (optional) Field with this name will have
               `transaction_id_key` metadata set to true.
        :param str input_data_type: (optional)
        :param dict dashboard_configuration: (optional)
        :param List[str] feature_fields: (optional) Fields to be given
               modeling_role feature.
        :param List[str] categorical_fields: (optional) Fields to be given metadata
               `measure` of value `discrete`.
        :param List[str] probability_fields: (optional) Fields to be given
               modeling_role `class_probability` (for columns of double data type) or
               `probability` (for column of array data type).
        """
        self.problem_type = problem_type
        self.model_type = model_type
        self.runtime_environment = runtime_environment
        self.training_data_reference = training_data_reference
        self.training_data_schema = training_data_schema
        self.input_data_schema = input_data_schema
        self.output_data_schema = output_data_schema
        self.label_column = label_column
        self.predicted_target_field = predicted_target_field
        self.prediction_field = prediction_field
        self.transaction_id_field = transaction_id_field
        self.input_data_type = input_data_type
        self.dashboard_configuration = dashboard_configuration
        self.feature_fields = feature_fields
        self.categorical_fields = categorical_fields
        self.probability_fields = probability_fields

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'AssetProperties':
        """Initialize an AssetProperties object from a json dictionary.

        Plain properties are copied whenever their key is present. Nested model
        properties are parsed only when their value is not None, so an explicit
        JSON ``null`` leaves the attribute unset instead of failing inside the
        nested model's parser.

        :param Dict _dict: dictionary decoded from the JSON representation.
        :return: a new AssetProperties instance.
        """
        args = {}
        plain_keys = (
            'problem_type',
            'model_type',
            'runtime_environment',
            'label_column',
            'predicted_target_field',
            'prediction_field',
            'transaction_id_field',
            'input_data_type',
            'dashboard_configuration',
            'feature_fields',
            'categorical_fields',
            'probability_fields',
        )
        for key in plain_keys:
            if key in _dict:
                args[key] = _dict.get(key)
        reference = _dict.get('training_data_reference')
        if reference is not None:
            args['training_data_reference'] = SecretCleaned.from_dict(reference)
        for key in ('training_data_schema', 'input_data_schema', 'output_data_schema'):
            schema = _dict.get(key)
            if schema is not None:
                args[key] = SparkStruct.from_dict(schema)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize an AssetProperties object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Return a json dictionary representing this model.

        Attributes that are missing or None are omitted. Nested model attributes
        are serialized through their own `to_dict`; a nested attribute that is
        already a plain dict is emitted unchanged.
        """
        nested_models = (
            'training_data_reference',
            'training_data_schema',
            'input_data_schema',
            'output_data_schema',
        )
        # Listed in the order the keys are emitted.
        ordered_fields = (
            'problem_type',
            'model_type',
            'runtime_environment',
            'training_data_reference',
            'training_data_schema',
            'input_data_schema',
            'output_data_schema',
            'label_column',
            'predicted_target_field',
            'prediction_field',
            'transaction_id_field',
            'input_data_type',
            'dashboard_configuration',
            'feature_fields',
            'categorical_fields',
            'probability_fields',
        )
        _dict = {}
        for name in ordered_fields:
            value = getattr(self, name, None)
            if value is None:
                continue
            if name in nested_models and not isinstance(value, dict):
                value = value.to_dict()
            _dict[name] = value
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this AssetProperties object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'AssetProperties') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'AssetProperties') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other

    class ProblemTypeEnum(str, Enum):
        """
        problem_type.
        """
        BINARY = 'binary'
        REGRESSION = 'regression'
        MULTICLASS = 'multiclass'

    class InputDataTypeEnum(str, Enum):
        """
        input_data_type.
        """
        STRUCTURED = 'structured'
        UNSTRUCTURED_IMAGE = 'unstructured_image'
        UNSTRUCTURED_TEXT = 'unstructured_text'
        UNSTRUCTURED_VIDEO = 'unstructured_video'
        UNSTRUCTURED_AUDIO = 'unstructured_audio'
class AssetPropertiesRequest():
    """
    Additional asset properties (subject of discovery if not provided when creating the
    subscription).

    :attr TrainingDataReference training_data_reference: (optional)
    :attr SparkStruct training_data_schema: (optional)
    :attr SparkStruct input_data_schema: (optional)
    :attr SparkStruct output_data_schema: (optional)
    :attr str label_column: (optional)
    :attr List[str] labels: (optional)
    :attr str predicted_target_field: (optional) Field with this name will be given
          modeling_role `decoded-target`.
    :attr str prediction_field: (optional) Field with this name will be given
          modeling_role `prediction`.
    :attr str transaction_id_field: (optional) Field with this name will have
          `transaction_id_key` metadata set to true.
    :attr dict dashboard_configuration: (optional)
    :attr List[str] feature_fields: (optional) Fields to be given modeling_role
          feature.
    :attr List[str] categorical_fields: (optional) Fields to be given metadata
          `measure` of value `discrete`.
    :attr List[str] probability_fields: (optional) Fields to be given modeling_role
          `class_probability` (for columns of double data type) or `probability` (for
          column of array data type).
    """

    def __init__(self,
                 *,
                 training_data_reference: 'TrainingDataReference' = None,
                 training_data_schema: 'SparkStruct' = None,
                 input_data_schema: 'SparkStruct' = None,
                 output_data_schema: 'SparkStruct' = None,
                 label_column: str = None,
                 labels: List[str] = None,
                 predicted_target_field: str = None,
                 prediction_field: str = None,
                 transaction_id_field: str = None,
                 dashboard_configuration: dict = None,
                 feature_fields: List[str] = None,
                 categorical_fields: List[str] = None,
                 probability_fields: List[str] = None) -> None:
        """
        Initialize an AssetPropertiesRequest object.

        :param TrainingDataReference training_data_reference: (optional)
        :param SparkStruct training_data_schema: (optional)
        :param SparkStruct input_data_schema: (optional)
        :param SparkStruct output_data_schema: (optional)
        :param str label_column: (optional)
        :param List[str] labels: (optional)
        :param str predicted_target_field: (optional) Field with this name will be
               given modeling_role `decoded-target`.
        :param str prediction_field: (optional) Field with this name will be given
               modeling_role `prediction`.
        :param str transaction_id_field: (optional) Field with this name will have
               `transaction_id_key` metadata set to true.
        :param dict dashboard_configuration: (optional)
        :param List[str] feature_fields: (optional) Fields to be given
               modeling_role feature.
        :param List[str] categorical_fields: (optional) Fields to be given metadata
               `measure` of value `discrete`.
        :param List[str] probability_fields: (optional) Fields to be given
               modeling_role `class_probability` (for columns of double data type) or
               `probability` (for column of array data type).
        """
        self.training_data_reference = training_data_reference
        self.training_data_schema = training_data_schema
        self.input_data_schema = input_data_schema
        self.output_data_schema = output_data_schema
        self.label_column = label_column
        self.labels = labels
        self.predicted_target_field = predicted_target_field
        self.prediction_field = prediction_field
        self.transaction_id_field = transaction_id_field
        self.dashboard_configuration = dashboard_configuration
        self.feature_fields = feature_fields
        self.categorical_fields = categorical_fields
        self.probability_fields = probability_fields

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'AssetPropertiesRequest':
        """Initialize an AssetPropertiesRequest object from a json dictionary.

        Plain properties are copied whenever their key is present. Nested model
        properties are parsed only when their value is not None, so an explicit
        JSON ``null`` leaves the attribute unset instead of failing inside the
        nested model's parser.

        :param Dict _dict: dictionary decoded from the JSON representation.
        :return: a new AssetPropertiesRequest instance.
        """
        args = {}
        plain_keys = (
            'label_column',
            'labels',
            'predicted_target_field',
            'prediction_field',
            'transaction_id_field',
            'dashboard_configuration',
            'feature_fields',
            'categorical_fields',
            'probability_fields',
        )
        for key in plain_keys:
            if key in _dict:
                args[key] = _dict.get(key)
        reference = _dict.get('training_data_reference')
        if reference is not None:
            args['training_data_reference'] = TrainingDataReference.from_dict(reference)
        for key in ('training_data_schema', 'input_data_schema', 'output_data_schema'):
            schema = _dict.get(key)
            if schema is not None:
                args[key] = SparkStruct.from_dict(schema)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize an AssetPropertiesRequest object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Return a json dictionary representing this model.

        Attributes that are missing or None are omitted. Nested model attributes
        are serialized through their own `to_dict`; a nested attribute that is
        already a plain dict is emitted unchanged.
        """
        nested_models = (
            'training_data_reference',
            'training_data_schema',
            'input_data_schema',
            'output_data_schema',
        )
        # Listed in the order the keys are emitted.
        ordered_fields = (
            'training_data_reference',
            'training_data_schema',
            'input_data_schema',
            'output_data_schema',
            'label_column',
            'labels',
            'predicted_target_field',
            'prediction_field',
            'transaction_id_field',
            'dashboard_configuration',
            'feature_fields',
            'categorical_fields',
            'probability_fields',
        )
        _dict = {}
        for name in ordered_fields:
            value = getattr(self, name, None)
            if value is None:
                continue
            if name in nested_models and not isinstance(value, dict):
                value = value.to_dict()
            _dict[name] = value
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this AssetPropertiesRequest object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'AssetPropertiesRequest') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'AssetPropertiesRequest') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class AzureWorkspaceCredentials():
    """
    Model holding a workspace identifier together with a token.

    :attr str workspace_id:
    :attr str token:
    """

    def __init__(self,
                 workspace_id: str,
                 token: str) -> None:
        """
        Create an AzureWorkspaceCredentials object.

        :param str workspace_id:
        :param str token:
        """
        self.workspace_id = workspace_id
        self.token = token

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'AzureWorkspaceCredentials':
        """Build an AzureWorkspaceCredentials object from a json dictionary.

        :raises ValueError: when `workspace_id` or `token` is absent.
        """
        if 'workspace_id' not in _dict:
            raise ValueError('Required property \'workspace_id\' not present in AzureWorkspaceCredentials JSON')
        if 'token' not in _dict:
            raise ValueError('Required property \'token\' not present in AzureWorkspaceCredentials JSON')
        return cls(workspace_id=_dict.get('workspace_id'), token=_dict.get('token'))

    @classmethod
    def _from_dict(cls, _dict):
        """Build an AzureWorkspaceCredentials object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Serialize this model into a json dictionary, skipping unset attributes."""
        candidates = (
            ('workspace_id', getattr(self, 'workspace_id', None)),
            ('token', getattr(self, 'token', None)),
        )
        return {key: value for key, value in candidates if value is not None}

    def _to_dict(self):
        """Serialize this model into a json dictionary."""
        return self.to_dict()

    def __str__(self) -> str:
        """Render this AzureWorkspaceCredentials object as indented JSON text."""
        as_dict = self.to_dict()
        return json.dumps(as_dict, indent=2)

    def __eq__(self, other: 'AzureWorkspaceCredentials') -> bool:
        """Return True when other has the same class and the same attributes."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'AzureWorkspaceCredentials') -> bool:
        """Return True when self and other are not equal."""
        are_equal = self == other
        return not are_equal
class CollectionUrlModel():
    """
    Model wrapping a single resource URI.

    :attr str url: URI of a resource.
    """

    def __init__(self,
                 url: str) -> None:
        """
        Create a CollectionUrlModel object.

        :param str url: URI of a resource.
        """
        self.url = url

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'CollectionUrlModel':
        """Build a CollectionUrlModel object from a json dictionary.

        :raises ValueError: when `url` is absent.
        """
        if 'url' not in _dict:
            raise ValueError('Required property \'url\' not present in CollectionUrlModel JSON')
        return cls(url=_dict.get('url'))

    @classmethod
    def _from_dict(cls, _dict):
        """Build a CollectionUrlModel object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Serialize this model into a json dictionary, skipping an unset `url`."""
        url_value = getattr(self, 'url', None)
        if url_value is None:
            return {}
        return {'url': url_value}

    def _to_dict(self):
        """Serialize this model into a json dictionary."""
        return self.to_dict()

    def __str__(self) -> str:
        """Render this CollectionUrlModel object as indented JSON text."""
        as_dict = self.to_dict()
        return json.dumps(as_dict, indent=2)

    def __eq__(self, other: 'CollectionUrlModel') -> bool:
        """Return True when other has the same class and the same attributes."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'CollectionUrlModel') -> bool:
        """Return True when self and other are not equal."""
        are_equal = self == other
        return not are_equal
class DataDistributionResponse():
    """
    Data distribution details response.

    :attr Metadata metadata:
    :attr DataDistributionResponseEntity entity: The computed data distribution
          against specified data set.
    """

    def __init__(self,
                 metadata: 'Metadata',
                 entity: 'DataDistributionResponseEntity') -> None:
        """
        Create a DataDistributionResponse object.

        :param Metadata metadata:
        :param DataDistributionResponseEntity entity: The computed data
               distribution against specified data set.
        """
        self.metadata = metadata
        self.entity = entity

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataDistributionResponse':
        """Build a DataDistributionResponse object from a json dictionary.

        :raises ValueError: when `metadata` or `entity` is absent.
        """
        # Each property is checked and parsed in turn so that failures surface
        # in a fixed order: metadata first, then entity.
        if 'metadata' not in _dict:
            raise ValueError('Required property \'metadata\' not present in DataDistributionResponse JSON')
        parsed_metadata = Metadata.from_dict(_dict.get('metadata'))
        if 'entity' not in _dict:
            raise ValueError('Required property \'entity\' not present in DataDistributionResponse JSON')
        parsed_entity = DataDistributionResponseEntity.from_dict(_dict.get('entity'))
        return cls(metadata=parsed_metadata, entity=parsed_entity)

    @classmethod
    def _from_dict(cls, _dict):
        """Build a DataDistributionResponse object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Serialize this model into a json dictionary, skipping unset attributes."""
        serialized = {}
        for attr_name in ('metadata', 'entity'):
            nested_model = getattr(self, attr_name, None)
            if nested_model is not None:
                serialized[attr_name] = nested_model.to_dict()
        return serialized

    def _to_dict(self):
        """Serialize this model into a json dictionary."""
        return self.to_dict()

    def __str__(self) -> str:
        """Render this DataDistributionResponse object as indented JSON text."""
        as_dict = self.to_dict()
        return json.dumps(as_dict, indent=2)

    def __eq__(self, other: 'DataDistributionResponse') -> bool:
        """Return True when other has the same class and the same attributes."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataDistributionResponse') -> bool:
        """Return True when self and other are not equal."""
        are_equal = self == other
        return not are_equal
class DataDistributionResponseEntity():
    """
    The computed data distribution against specified data set.

    :attr str start: start datetime in ISO format.
    :attr str end: end datetime in ISO format.
    :attr str dataset: type of a data set.
    :attr float limit: (optional) limit for number of rows, by default it is 50,000
          (max possible limit is 50,000).
    :attr List[str] group: names of columns to be grouped.
    :attr str filter: (optional) Filters defined by user in format:
          {field_name}:{op}:{value}. Partly compatible with filters in "filter" parameter
          of GET /v2/data_sets/{data_set_id}/records.
          Possible filter operators:
          * eq - equals (numeric, string)
          * gt - greater than (numeric)
          * gte - greater than or equal (numeric)
          * lt - lower than (numeric)
          * lte - lower than or equal (numeric)
          * in - value in a set (numeric, string)
          * field:null (a no-argument filter) - value is null (any nullable)
          * field:exists (a no-argument filter) - value is not null (any column).
    :attr List[str] agg: (optional) Definition of aggregations, by default 'count'.
          Aggregations can be one of:
          * count
          * <column_name>:sum
          * <column_name>:min
          * <column_name>:max
          * <column_name>:avg
          * <column_name>:stddev.
    :attr float max_bins: (optional) max number of bins which will be generated for
          data.
    :attr MonitoringRunStatus status: (optional) The status information for the
          monitoring run.
    :attr float processed_records: (optional) number of processed records.
    :attr bool limited_data: (optional) was the limit used on data.
    :attr DataDistributionResponseEntityDistribution distribution: (optional)
    """

    def __init__(self,
                 start: str,
                 end: str,
                 dataset: str,
                 group: List[str],
                 *,
                 limit: float = None,
                 filter: str = None,
                 agg: List[str] = None,
                 max_bins: float = None,
                 status: 'MonitoringRunStatus' = None,
                 processed_records: float = None,
                 limited_data: bool = None,
                 distribution: 'DataDistributionResponseEntityDistribution' = None) -> None:
        """
        Initialize a DataDistributionResponseEntity object.

        :param str start: start datetime in ISO format.
        :param str end: end datetime in ISO format.
        :param str dataset: type of a data set.
        :param List[str] group: names of columns to be grouped.
        :param float limit: (optional) limit for number of rows, by default it is
               50,000 (max possible limit is 50,000).
        :param str filter: (optional) Filters defined by user in format:
               {field_name}:{op}:{value}. Partly compatible with filters in "filter"
               parameter of GET /v2/data_sets/{data_set_id}/records.
               Possible filter operators:
               * eq - equals (numeric, string)
               * gt - greater than (numeric)
               * gte - greater than or equal (numeric)
               * lt - lower than (numeric)
               * lte - lower than or equal (numeric)
               * in - value in a set (numeric, string)
               * field:null (a no-argument filter) - value is null (any nullable)
               * field:exists (a no-argument filter) - value is not null (any column).
        :param List[str] agg: (optional) Definition of aggregations, by default
               'count'.
               Aggregations can be one of:
               * count
               * <column_name>:sum
               * <column_name>:min
               * <column_name>:max
               * <column_name>:avg
               * <column_name>:stddev.
        :param float max_bins: (optional) max number of bins which will be
               generated for data.
        :param MonitoringRunStatus status: (optional) The status information for
               the monitoring run.
        :param float processed_records: (optional) number of processed records.
        :param bool limited_data: (optional) was the limit used on data.
        :param DataDistributionResponseEntityDistribution distribution: (optional)
        """
        self.start = start
        self.end = end
        self.dataset = dataset
        self.limit = limit
        self.group = group
        self.filter = filter
        self.agg = agg
        self.max_bins = max_bins
        self.status = status
        self.processed_records = processed_records
        self.limited_data = limited_data
        self.distribution = distribution

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataDistributionResponseEntity':
        """Initialize a DataDistributionResponseEntity object from a json dictionary.

        Required properties must be present as keys. Optional nested models
        (`status`, `distribution`) are parsed only when their value is not None,
        so an explicit JSON ``null`` leaves the attribute unset instead of
        failing inside the nested model's parser.

        :param Dict _dict: dictionary decoded from the JSON representation.
        :return: a new DataDistributionResponseEntity instance.
        :raises ValueError: when `start`, `end`, `dataset` or `group` is absent.
        """
        args = {}
        for key in ('start', 'end', 'dataset', 'group'):
            if key not in _dict:
                raise ValueError(
                    'Required property \'{}\' not present in DataDistributionResponseEntity JSON'.format(key))
            args[key] = _dict.get(key)
        for key in ('limit', 'filter', 'agg', 'max_bins', 'processed_records', 'limited_data'):
            if key in _dict:
                args[key] = _dict.get(key)
        status = _dict.get('status')
        if status is not None:
            args['status'] = MonitoringRunStatus.from_dict(status)
        distribution = _dict.get('distribution')
        if distribution is not None:
            args['distribution'] = DataDistributionResponseEntityDistribution.from_dict(distribution)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataDistributionResponseEntity object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Return a json dictionary representing this model.

        Attributes that are missing or None are omitted. Nested model attributes
        are serialized through their own `to_dict`; a nested attribute that is
        already a plain dict is emitted unchanged.
        """
        nested_models = ('status', 'distribution')
        # Listed in the order the keys are emitted.
        ordered_fields = (
            'start',
            'end',
            'dataset',
            'limit',
            'group',
            'filter',
            'agg',
            'max_bins',
            'status',
            'processed_records',
            'limited_data',
            'distribution',
        )
        _dict = {}
        for name in ordered_fields:
            value = getattr(self, name, None)
            if value is None:
                continue
            if name in nested_models and not isinstance(value, dict):
                value = value.to_dict()
            _dict[name] = value
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataDistributionResponseEntity object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataDistributionResponseEntity') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataDistributionResponseEntity') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other

    class DatasetEnum(str, Enum):
        """
        type of a data set.
        """
        MANUAL_LABELING = 'manual_labeling'
        PAYLOAD_LOGGING = 'payload_logging'
        FEEDBACK = 'feedback'
        BUSINESS_PAYLOAD = 'business_payload'
        EXPLANATIONS = 'explanations'
        EXPLANATIONS_WHATIF = 'explanations_whatif'
        TRAINING = 'training'
        PAYLOAD_LOGGING_ERROR = 'payload_logging_error'
        CUSTOM = 'custom'
class DataDistributionResponseEntityDistribution():
    """
    Field names and rows of a data distribution.

    :attr List[str] fields: names of the data distribution fields.
    :attr List[object] values: data distribution rows.
    """

    def __init__(self,
                 fields: List[str],
                 values: List[object]) -> None:
        """
        Create a DataDistributionResponseEntityDistribution object.

        :param List[str] fields: names of the data distribution fields.
        :param List[object] values: data distribution rows.
        """
        self.fields = fields
        self.values = values

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataDistributionResponseEntityDistribution':
        """Build a DataDistributionResponseEntityDistribution object from a json dictionary.

        :raises ValueError: when `fields` or `values` is absent.
        """
        if 'fields' not in _dict:
            raise ValueError('Required property \'fields\' not present in DataDistributionResponseEntityDistribution JSON')
        if 'values' not in _dict:
            raise ValueError('Required property \'values\' not present in DataDistributionResponseEntityDistribution JSON')
        return cls(fields=_dict.get('fields'), values=_dict.get('values'))

    @classmethod
    def _from_dict(cls, _dict):
        """Build a DataDistributionResponseEntityDistribution object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Serialize this model into a json dictionary, skipping unset attributes."""
        candidates = (
            ('fields', getattr(self, 'fields', None)),
            ('values', getattr(self, 'values', None)),
        )
        return {key: value for key, value in candidates if value is not None}

    def _to_dict(self):
        """Serialize this model into a json dictionary."""
        return self.to_dict()

    def __str__(self) -> str:
        """Render this DataDistributionResponseEntityDistribution object as indented JSON text."""
        as_dict = self.to_dict()
        return json.dumps(as_dict, indent=2)

    def __eq__(self, other: 'DataDistributionResponseEntityDistribution') -> bool:
        """Return True when other has the same class and the same attributes."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataDistributionResponseEntityDistribution') -> bool:
        """Return True when self and other are not equal."""
        are_equal = self == other
        return not are_equal
class DataMartDatabaseResponse():
    """
    Response model pairing a metadata section with an entity section.

    :attr Metadata metadata:
    :attr DataMartDatabaseResponseEntity entity:
    """

    def __init__(self,
                 metadata: 'Metadata',
                 entity: 'DataMartDatabaseResponseEntity') -> None:
        """
        Create a DataMartDatabaseResponse object.

        :param Metadata metadata:
        :param DataMartDatabaseResponseEntity entity:
        """
        self.metadata = metadata
        self.entity = entity

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartDatabaseResponse':
        """Build a DataMartDatabaseResponse object from a json dictionary.

        :raises ValueError: when `metadata` or `entity` is absent.
        """
        # Each property is checked and parsed in turn so that failures surface
        # in a fixed order: metadata first, then entity.
        if 'metadata' not in _dict:
            raise ValueError('Required property \'metadata\' not present in DataMartDatabaseResponse JSON')
        parsed_metadata = Metadata.from_dict(_dict.get('metadata'))
        if 'entity' not in _dict:
            raise ValueError('Required property \'entity\' not present in DataMartDatabaseResponse JSON')
        parsed_entity = DataMartDatabaseResponseEntity.from_dict(_dict.get('entity'))
        return cls(metadata=parsed_metadata, entity=parsed_entity)

    @classmethod
    def _from_dict(cls, _dict):
        """Build a DataMartDatabaseResponse object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Serialize this model into a json dictionary, skipping unset attributes."""
        serialized = {}
        for attr_name in ('metadata', 'entity'):
            nested_model = getattr(self, attr_name, None)
            if nested_model is not None:
                serialized[attr_name] = nested_model.to_dict()
        return serialized

    def _to_dict(self):
        """Serialize this model into a json dictionary."""
        return self.to_dict()

    def __str__(self) -> str:
        """Render this DataMartDatabaseResponse object as indented JSON text."""
        as_dict = self.to_dict()
        return json.dumps(as_dict, indent=2)

    def __eq__(self, other: 'DataMartDatabaseResponse') -> bool:
        """Return True when other has the same class and the same attributes."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartDatabaseResponse') -> bool:
        """Return True when self and other are not equal."""
        are_equal = self == other
        return not are_equal
class DataMartDatabaseResponseCollection:
    """
    DataMartDatabaseResponseCollection: a list of DataMartDatabaseResponse objects.

    :attr List[DataMartDatabaseResponse] data_marts:
    """

    def __init__(self,
                 data_marts: List['DataMartDatabaseResponse']) -> None:
        """
        Initialize a DataMartDatabaseResponseCollection object.

        :param List[DataMartDatabaseResponse] data_marts:
        """
        self.data_marts = data_marts

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartDatabaseResponseCollection':
        """
        Initialize a DataMartDatabaseResponseCollection object from a json dictionary.

        :param Dict _dict: parsed JSON object with the key 'data_marts'.
        :raises ValueError: if 'data_marts' is missing.
        """
        if 'data_marts' not in _dict:
            raise ValueError("Required property 'data_marts' not present in DataMartDatabaseResponseCollection JSON")
        parsed = [DataMartDatabaseResponse.from_dict(item) for item in _dict.get('data_marts')]
        return cls(data_marts=parsed)

    @classmethod
    def _from_dict(cls, _dict):
        """Underscore-prefixed alias of `from_dict`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Every element of `data_marts` is serialized through its `to_dict()`;
        the key is omitted when the attribute is missing or None.
        """
        result = {}
        data_marts = getattr(self, 'data_marts', None)
        if data_marts is not None:
            result['data_marts'] = [item.to_dict() for item in data_marts]
        return result

    def _to_dict(self):
        """Underscore-prefixed alias of `to_dict`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataMartDatabaseResponseCollection object."""
        payload = self.to_dict()
        return json.dumps(payload, indent=2)

    def __eq__(self, other: 'DataMartDatabaseResponseCollection') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartDatabaseResponseCollection') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        are_equal = self == other
        return not are_equal
class DataMartDatabaseResponseEntity():
    """
    DataMartDatabaseResponseEntity.

    :attr str name: (optional)
    :attr str description: (optional)
    :attr str service_instance_crn: (optional)
    :attr bool internal_database: (optional) If `true` the internal database managed
          by AI OpenScale is provided for the user.
    :attr DatabaseConfiguration database_configuration: (optional) Database
          configuration ignored if internal database is requested (`internal_database` is
          `true`).
    :attr str database_discovery: (optional) Used by UI to check if database
          discovery was automatic or manual.
    :attr Status status: (optional)
    """

    def __init__(self,
                 *,
                 name: str = None,
                 description: str = None,
                 service_instance_crn: str = None,
                 internal_database: bool = None,
                 database_configuration: 'DatabaseConfiguration' = None,
                 database_discovery: str = None,
                 status: 'Status' = None) -> None:
        """
        Initialize a DataMartDatabaseResponseEntity object.

        :param str name: (optional)
        :param str description: (optional)
        :param str service_instance_crn: (optional)
        :param bool internal_database: (optional) If `true` the internal database
               managed by AI OpenScale is provided for the user.
        :param DatabaseConfiguration database_configuration: (optional) Database
               configuration ignored if internal database is requested
               (`internal_database` is `true`).
        :param str database_discovery: (optional) Used by UI to check if database
               discovery was automatic or manual.
        :param Status status: (optional)
        """
        self.name = name
        self.description = description
        self.service_instance_crn = service_instance_crn
        self.internal_database = internal_database
        self.database_configuration = database_configuration
        self.database_discovery = database_discovery
        self.status = status

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartDatabaseResponseEntity':
        """
        Initialize a DataMartDatabaseResponseEntity object from a json dictionary.

        Every property is optional. A nested object whose JSON value is null
        is treated the same as an absent key, so the attribute stays None.

        :param Dict _dict: parsed JSON object.
        :return: the populated DataMartDatabaseResponseEntity.
        """
        plain_keys = ('name', 'description', 'service_instance_crn',
                      'internal_database', 'database_discovery')
        args = {key: _dict.get(key) for key in plain_keys if key in _dict}
        # A null nested object must not be handed to the nested from_dict,
        # which expects a dictionary.
        database_configuration = _dict.get('database_configuration')
        if database_configuration is not None:
            args['database_configuration'] = DatabaseConfiguration.from_dict(database_configuration)
        status = _dict.get('status')
        if status is not None:
            args['status'] = Status.from_dict(status)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Underscore-prefixed alias of `from_dict`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are missing or None are omitted; nested objects are
        serialized through their own `to_dict()`.
        """
        _dict = {}
        if hasattr(self, 'name') and self.name is not None:
            _dict['name'] = self.name
        if hasattr(self, 'description') and self.description is not None:
            _dict['description'] = self.description
        if hasattr(self, 'service_instance_crn') and self.service_instance_crn is not None:
            _dict['service_instance_crn'] = self.service_instance_crn
        if hasattr(self, 'internal_database') and self.internal_database is not None:
            _dict['internal_database'] = self.internal_database
        if hasattr(self, 'database_configuration') and self.database_configuration is not None:
            _dict['database_configuration'] = self.database_configuration.to_dict()
        if hasattr(self, 'database_discovery') and self.database_discovery is not None:
            _dict['database_discovery'] = self.database_discovery
        if hasattr(self, 'status') and self.status is not None:
            _dict['status'] = self.status.to_dict()
        return _dict

    def _to_dict(self):
        """Underscore-prefixed alias of `to_dict`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataMartDatabaseResponseEntity object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataMartDatabaseResponseEntity') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartDatabaseResponseEntity') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other

    class DatabaseDiscoveryEnum(str, Enum):
        """
        Used by UI to check if database discovery was automatic or manual.
        """
        AUTOMATIC = 'automatic'
        MANUAL = 'manual'
class DataMartGetMonitorInstanceMetrics():
    """
    DataMartGetMonitorInstanceMetrics.

    :attr datetime start: (optional) Floored to full interval.
    :attr datetime end: (optional) Ceiled to full interval.
    :attr str interval: (optional)
    :attr str monitor_definition_id: (optional)
    :attr List[DataMartGetMonitorInstanceMetricsGroupsItem] groups: (optional)
    """

    def __init__(self,
                 *,
                 start: datetime = None,
                 end: datetime = None,
                 interval: str = None,
                 monitor_definition_id: str = None,
                 groups: List['DataMartGetMonitorInstanceMetricsGroupsItem'] = None) -> None:
        """
        Initialize a DataMartGetMonitorInstanceMetrics object.

        :param datetime start: (optional) Floored to full interval.
        :param datetime end: (optional) Ceiled to full interval.
        :param str interval: (optional)
        :param str monitor_definition_id: (optional)
        :param List[DataMartGetMonitorInstanceMetricsGroupsItem] groups: (optional)
        """
        self.start = start
        self.end = end
        self.interval = interval
        self.monitor_definition_id = monitor_definition_id
        self.groups = groups

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartGetMonitorInstanceMetrics':
        """
        Initialize a DataMartGetMonitorInstanceMetrics object from a json dictionary.

        Every property is optional. For 'start', 'end' and 'groups' a JSON
        null is treated the same as an absent key, so the attribute stays None.

        :param Dict _dict: parsed JSON object.
        :return: the populated DataMartGetMonitorInstanceMetrics.
        """
        args = {}
        # Only real timestamp strings are converted; None is never passed to
        # string_to_datetime.
        start = _dict.get('start')
        if start is not None:
            args['start'] = string_to_datetime(start)
        end = _dict.get('end')
        if end is not None:
            args['end'] = string_to_datetime(end)
        if 'interval' in _dict:
            args['interval'] = _dict.get('interval')
        if 'monitor_definition_id' in _dict:
            args['monitor_definition_id'] = _dict.get('monitor_definition_id')
        # Iterating a null 'groups' would raise TypeError.
        groups = _dict.get('groups')
        if groups is not None:
            args['groups'] = [DataMartGetMonitorInstanceMetricsGroupsItem.from_dict(x) for x in groups]
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Underscore-prefixed alias of `from_dict`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are missing or None are omitted. 'start' and 'end' go
        through `datetime_to_string`; each group through its own `to_dict()`.
        """
        _dict = {}
        if hasattr(self, 'start') and self.start is not None:
            _dict['start'] = datetime_to_string(self.start)
        if hasattr(self, 'end') and self.end is not None:
            _dict['end'] = datetime_to_string(self.end)
        if hasattr(self, 'interval') and self.interval is not None:
            _dict['interval'] = self.interval
        if hasattr(self, 'monitor_definition_id') and self.monitor_definition_id is not None:
            _dict['monitor_definition_id'] = self.monitor_definition_id
        if hasattr(self, 'groups') and self.groups is not None:
            _dict['groups'] = [x.to_dict() for x in self.groups]
        return _dict

    def _to_dict(self):
        """Underscore-prefixed alias of `to_dict`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataMartGetMonitorInstanceMetrics object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataMartGetMonitorInstanceMetrics') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartGetMonitorInstanceMetrics') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class DataMartGetMonitorInstanceMetricsGroupsItem():
    """
    DataMartGetMonitorInstanceMetricsGroupsItem.

    :attr List[DataMartGetMonitorInstanceMetricsGroupsItemTagsItem] tags: (optional)
    :attr List[DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem] metrics:
          (optional)
    """

    def __init__(self,
                 *,
                 tags: List['DataMartGetMonitorInstanceMetricsGroupsItemTagsItem'] = None,
                 metrics: List['DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem'] = None) -> None:
        """
        Initialize a DataMartGetMonitorInstanceMetricsGroupsItem object.

        :param List[DataMartGetMonitorInstanceMetricsGroupsItemTagsItem] tags:
               (optional)
        :param List[DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem]
               metrics: (optional)
        """
        self.tags = tags
        self.metrics = metrics

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartGetMonitorInstanceMetricsGroupsItem':
        """
        Initialize a DataMartGetMonitorInstanceMetricsGroupsItem object from a json dictionary.

        Both lists are optional; a JSON null is treated the same as an absent
        key, so the attribute stays None.

        :param Dict _dict: parsed JSON object.
        :return: the populated DataMartGetMonitorInstanceMetricsGroupsItem.
        """
        args = {}
        # Iterating a null list would raise TypeError, so null is skipped.
        tags = _dict.get('tags')
        if tags is not None:
            args['tags'] = [DataMartGetMonitorInstanceMetricsGroupsItemTagsItem.from_dict(x) for x in tags]
        metrics = _dict.get('metrics')
        if metrics is not None:
            args['metrics'] = [DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem.from_dict(x) for x in metrics]
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Underscore-prefixed alias of `from_dict`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Each element of 'tags' and 'metrics' is serialized through its own
        `to_dict()`; a list that is missing or None is omitted.
        """
        _dict = {}
        if hasattr(self, 'tags') and self.tags is not None:
            _dict['tags'] = [x.to_dict() for x in self.tags]
        if hasattr(self, 'metrics') and self.metrics is not None:
            _dict['metrics'] = [x.to_dict() for x in self.metrics]
        return _dict

    def _to_dict(self):
        """Underscore-prefixed alias of `to_dict`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataMartGetMonitorInstanceMetricsGroupsItem object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItem') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItem') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem():
    """
    DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem.

    :attr str id:
    :attr float lower_limit: (optional)
    :attr float upper_limit: (optional)
    :attr DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast last:
          (optional)
    """

    def __init__(self,
                 id: str,
                 *,
                 lower_limit: float = None,
                 upper_limit: float = None,
                 last: 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast' = None) -> None:
        """
        Initialize a DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem object.

        :param str id:
        :param float lower_limit: (optional)
        :param float upper_limit: (optional)
        :param DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast last:
               (optional)
        """
        self.id = id
        self.lower_limit = lower_limit
        self.upper_limit = upper_limit
        self.last = last

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem':
        """
        Initialize a DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem object from a json dictionary.

        'id' is required. For the optional nested 'last' object a JSON null is
        treated the same as an absent key, so the attribute stays None.

        :param Dict _dict: parsed JSON object.
        :raises ValueError: if 'id' is missing.
        """
        args = {}
        if 'id' in _dict:
            args['id'] = _dict.get('id')
        else:
            raise ValueError('Required property \'id\' not present in DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem JSON')
        if 'lower_limit' in _dict:
            args['lower_limit'] = _dict.get('lower_limit')
        if 'upper_limit' in _dict:
            args['upper_limit'] = _dict.get('upper_limit')
        # The nested from_dict performs `in` tests on its argument, which
        # raise TypeError for None; a null 'last' is therefore skipped.
        last = _dict.get('last')
        if last is not None:
            args['last'] = DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast.from_dict(last)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Underscore-prefixed alias of `from_dict`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are missing or None are omitted; 'last' is serialized
        through its own `to_dict()`.
        """
        _dict = {}
        if hasattr(self, 'id') and self.id is not None:
            _dict['id'] = self.id
        if hasattr(self, 'lower_limit') and self.lower_limit is not None:
            _dict['lower_limit'] = self.lower_limit
        if hasattr(self, 'upper_limit') and self.upper_limit is not None:
            _dict['upper_limit'] = self.upper_limit
        if hasattr(self, 'last') and self.last is not None:
            _dict['last'] = self.last.to_dict()
        return _dict

    def _to_dict(self):
        """Underscore-prefixed alias of `to_dict`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast:
    """
    DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast.

    :attr List[float] value: (optional)
    :attr List[str] measurement_id: (optional)
    """

    def __init__(self,
                 *,
                 value: List[float] = None,
                 measurement_id: List[str] = None) -> None:
        """
        Initialize a DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast object.

        :param List[float] value: (optional)
        :param List[str] measurement_id: (optional)
        """
        self.value = value
        self.measurement_id = measurement_id

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast':
        """
        Initialize a DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast object from a json dictionary.

        Both properties are optional and copied as-is when present.
        """
        kwargs = {key: _dict.get(key) for key in ('value', 'measurement_id') if key in _dict}
        return cls(**kwargs)

    @classmethod
    def _from_dict(cls, _dict):
        """Underscore-prefixed alias of `from_dict`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are missing or None are omitted.
        """
        result = {}
        for key in ('value', 'measurement_id'):
            item = getattr(self, key, None)
            if item is not None:
                result[key] = item
        return result

    def _to_dict(self):
        """Underscore-prefixed alias of `to_dict`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast object."""
        payload = self.to_dict()
        return json.dumps(payload, indent=2)

    def __eq__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        are_equal = self == other
        return not are_equal
class DataMartGetMonitorInstanceMetricsGroupsItemTagsItem:
    """
    DataMartGetMonitorInstanceMetricsGroupsItemTagsItem: an `id` / `value` pair.

    :attr str id:
    :attr str value:
    """

    def __init__(self,
                 id: str,
                 value: str) -> None:
        """
        Initialize a DataMartGetMonitorInstanceMetricsGroupsItemTagsItem object.

        :param str id:
        :param str value:
        """
        self.id = id
        self.value = value

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartGetMonitorInstanceMetricsGroupsItemTagsItem':
        """
        Initialize a DataMartGetMonitorInstanceMetricsGroupsItemTagsItem object from a json dictionary.

        :param Dict _dict: parsed JSON object with the keys 'id' and 'value'.
        :raises ValueError: if 'id' or 'value' is missing.
        """
        if 'id' not in _dict:
            raise ValueError("Required property 'id' not present in DataMartGetMonitorInstanceMetricsGroupsItemTagsItem JSON")
        if 'value' not in _dict:
            raise ValueError("Required property 'value' not present in DataMartGetMonitorInstanceMetricsGroupsItemTagsItem JSON")
        return cls(id=_dict.get('id'), value=_dict.get('value'))

    @classmethod
    def _from_dict(cls, _dict):
        """Underscore-prefixed alias of `from_dict`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are missing or None are omitted.
        """
        result = {}
        for key in ('id', 'value'):
            item = getattr(self, key, None)
            if item is not None:
                result[key] = item
        return result

    def _to_dict(self):
        """Underscore-prefixed alias of `to_dict`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataMartGetMonitorInstanceMetricsGroupsItemTagsItem object."""
        payload = self.to_dict()
        return json.dumps(payload, indent=2)

    def __eq__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItemTagsItem') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItemTagsItem') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        are_equal = self == other
        return not are_equal
class DataRecord:
    """
    DataRecord.

    :attr object values: Fields and values of the entity matches JSON object's
          fields and values.
    :attr dict annotations: (optional) Any JSON object representing annotations.
    """

    def __init__(self,
                 values: object,
                 *,
                 annotations: dict = None) -> None:
        """
        Initialize a DataRecord object.

        :param object values: Fields and values of the entity matches JSON object's
               fields and values.
        :param dict annotations: (optional) Any JSON object representing
               annotations.
        """
        self.values = values
        self.annotations = annotations

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataRecord':
        """
        Initialize a DataRecord object from a json dictionary.

        :param Dict _dict: parsed JSON object; 'values' is required,
               'annotations' is optional.
        :raises ValueError: if 'values' is missing.
        """
        if 'values' not in _dict:
            raise ValueError("Required property 'values' not present in DataRecord JSON")
        kwargs = {'values': _dict.get('values')}
        if 'annotations' in _dict:
            kwargs['annotations'] = _dict.get('annotations')
        return cls(**kwargs)

    @classmethod
    def _from_dict(cls, _dict):
        """Underscore-prefixed alias of `from_dict`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are missing or None are omitted.
        """
        result = {}
        for key in ('values', 'annotations'):
            item = getattr(self, key, None)
            if item is not None:
                result[key] = item
        return result

    def _to_dict(self):
        """Underscore-prefixed alias of `to_dict`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataRecord object."""
        payload = self.to_dict()
        return json.dumps(payload, indent=2)

    def __eq__(self, other: 'DataRecord') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataRecord') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        are_equal = self == other
        return not are_equal
class DataRecordResponse:
    """
    DataRecordResponse: a `metadata` object together with a DataRecord `entity`.

    :attr Metadata metadata:
    :attr DataRecord entity:
    """

    def __init__(self,
                 metadata: 'Metadata',
                 entity: 'DataRecord') -> None:
        """
        Initialize a DataRecordResponse object.

        :param Metadata metadata:
        :param DataRecord entity:
        """
        self.metadata = metadata
        self.entity = entity

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataRecordResponse':
        """
        Initialize a DataRecordResponse object from a json dictionary.

        :param Dict _dict: parsed JSON object with the keys 'metadata' and 'entity'.
        :raises ValueError: if 'metadata' or 'entity' is missing.
        """
        # Each property is validated and parsed before the next one is looked at.
        if 'metadata' not in _dict:
            raise ValueError("Required property 'metadata' not present in DataRecordResponse JSON")
        metadata = Metadata.from_dict(_dict.get('metadata'))
        if 'entity' not in _dict:
            raise ValueError("Required property 'entity' not present in DataRecordResponse JSON")
        entity = DataRecord.from_dict(_dict.get('entity'))
        return cls(metadata=metadata, entity=entity)

    @classmethod
    def _from_dict(cls, _dict):
        """Underscore-prefixed alias of `from_dict`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Nested objects are serialized through their own `to_dict()`;
        attributes that are missing or None are omitted.
        """
        result = {}
        metadata = getattr(self, 'metadata', None)
        if metadata is not None:
            result['metadata'] = metadata.to_dict()
        entity = getattr(self, 'entity', None)
        if entity is not None:
            result['entity'] = entity.to_dict()
        return result

    def _to_dict(self):
        """Underscore-prefixed alias of `to_dict`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataRecordResponse object."""
        payload = self.to_dict()
        return json.dumps(payload, indent=2)

    def __eq__(self, other: 'DataRecordResponse') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataRecordResponse') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        are_equal = self == other
        return not are_equal
class DataRecordResponseList:
    """
    DataRecordResponseList.

    :attr List[str] fields: Fields' names are listed in order in 'fields'.
    :attr List[List[object]] values: Rows organized per fields as described in
          'fields'.
    :attr List[dict] annotations: (optional) List of annotations objects.
    """

    def __init__(self,
                 fields: List[str],
                 values: List[List[object]],
                 *,
                 annotations: List[dict] = None) -> None:
        """
        Initialize a DataRecordResponseList object.

        :param List[str] fields: Fields' names are listed in order in 'fields'.
        :param List[List[object]] values: Rows organized per fields as described in
               'fields'.
        :param List[dict] annotations: (optional) List of annotations objects.
        """
        self.fields = fields
        self.values = values
        self.annotations = annotations

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataRecordResponseList':
        """
        Initialize a DataRecordResponseList object from a json dictionary.

        :param Dict _dict: parsed JSON object; 'fields' and 'values' are
               required, 'annotations' is optional.
        :raises ValueError: if 'fields' or 'values' is missing.
        """
        if 'fields' not in _dict:
            raise ValueError("Required property 'fields' not present in DataRecordResponseList JSON")
        if 'values' not in _dict:
            raise ValueError("Required property 'values' not present in DataRecordResponseList JSON")
        kwargs = {'fields': _dict.get('fields'), 'values': _dict.get('values')}
        if 'annotations' in _dict:
            kwargs['annotations'] = _dict.get('annotations')
        return cls(**kwargs)

    @classmethod
    def _from_dict(cls, _dict):
        """Underscore-prefixed alias of `from_dict`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are missing or None are omitted.
        """
        result = {}
        for key in ('fields', 'values', 'annotations'):
            item = getattr(self, key, None)
            if item is not None:
                result[key] = item
        return result

    def _to_dict(self):
        """Underscore-prefixed alias of `to_dict`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataRecordResponseList object."""
        payload = self.to_dict()
        return json.dumps(payload, indent=2)

    def __eq__(self, other: 'DataRecordResponseList') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataRecordResponseList') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        are_equal = self == other
        return not are_equal
class DataSetObject():
    """
    DataSetObject.

    :attr str data_mart_id:
    :attr str name:
    :attr str description: (optional)
    :attr str type: type of a data set.
    :attr Target target:
    :attr str schema_update_mode: (optional)
    :attr SparkStruct data_schema:
    :attr LocationTableName location: (optional)
    :attr str managed_by: (optional)
    :attr Status status:
    """

    def __init__(self,
                 data_mart_id: str,
                 name: str,
                 type: str,
                 target: 'Target',
                 data_schema: 'SparkStruct',
                 status: 'Status',
                 *,
                 description: str = None,
                 schema_update_mode: str = None,
                 location: 'LocationTableName' = None,
                 managed_by: str = None) -> None:
        """
        Initialize a DataSetObject object.

        :param str data_mart_id:
        :param str name:
        :param str type: type of a data set.
        :param Target target:
        :param SparkStruct data_schema:
        :param Status status:
        :param str description: (optional)
        :param str schema_update_mode: (optional)
        :param LocationTableName location: (optional)
        :param str managed_by: (optional)
        """
        self.data_mart_id = data_mart_id
        self.name = name
        self.description = description
        self.type = type
        self.target = target
        self.schema_update_mode = schema_update_mode
        self.data_schema = data_schema
        self.location = location
        self.managed_by = managed_by
        self.status = status

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataSetObject':
        """
        Initialize a DataSetObject object from a json dictionary.

        Required keys: 'data_mart_id', 'name', 'type', 'target', 'data_schema'
        and 'status'. For the optional nested 'location' object a JSON null is
        treated the same as an absent key, so the attribute stays None.

        :param Dict _dict: parsed JSON object.
        :raises ValueError: if any required key is missing; keys are checked
                in declaration order and the first missing one is reported.
        """
        args = {}
        if 'data_mart_id' in _dict:
            args['data_mart_id'] = _dict.get('data_mart_id')
        else:
            raise ValueError('Required property \'data_mart_id\' not present in DataSetObject JSON')
        if 'name' in _dict:
            args['name'] = _dict.get('name')
        else:
            raise ValueError('Required property \'name\' not present in DataSetObject JSON')
        if 'description' in _dict:
            args['description'] = _dict.get('description')
        if 'type' in _dict:
            args['type'] = _dict.get('type')
        else:
            raise ValueError('Required property \'type\' not present in DataSetObject JSON')
        if 'target' in _dict:
            args['target'] = Target.from_dict(_dict.get('target'))
        else:
            raise ValueError('Required property \'target\' not present in DataSetObject JSON')
        if 'schema_update_mode' in _dict:
            args['schema_update_mode'] = _dict.get('schema_update_mode')
        if 'data_schema' in _dict:
            args['data_schema'] = SparkStruct.from_dict(_dict.get('data_schema'))
        else:
            raise ValueError('Required property \'data_schema\' not present in DataSetObject JSON')
        # A null optional nested object must not be handed to the nested
        # from_dict, which expects a dictionary.
        location = _dict.get('location')
        if location is not None:
            args['location'] = LocationTableName.from_dict(location)
        if 'managed_by' in _dict:
            args['managed_by'] = _dict.get('managed_by')
        if 'status' in _dict:
            args['status'] = Status.from_dict(_dict.get('status'))
        else:
            raise ValueError('Required property \'status\' not present in DataSetObject JSON')
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Underscore-prefixed alias of `from_dict`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are missing or None are omitted; 'target',
        'data_schema', 'location' and 'status' are serialized through their
        own `to_dict()`.
        """
        _dict = {}
        if hasattr(self, 'data_mart_id') and self.data_mart_id is not None:
            _dict['data_mart_id'] = self.data_mart_id
        if hasattr(self, 'name') and self.name is not None:
            _dict['name'] = self.name
        if hasattr(self, 'description') and self.description is not None:
            _dict['description'] = self.description
        if hasattr(self, 'type') and self.type is not None:
            _dict['type'] = self.type
        if hasattr(self, 'target') and self.target is not None:
            _dict['target'] = self.target.to_dict()
        if hasattr(self, 'schema_update_mode') and self.schema_update_mode is not None:
            _dict['schema_update_mode'] = self.schema_update_mode
        if hasattr(self, 'data_schema') and self.data_schema is not None:
            _dict['data_schema'] = self.data_schema.to_dict()
        if hasattr(self, 'location') and self.location is not None:
            _dict['location'] = self.location.to_dict()
        if hasattr(self, 'managed_by') and self.managed_by is not None:
            _dict['managed_by'] = self.managed_by
        if hasattr(self, 'status') and self.status is not None:
            _dict['status'] = self.status.to_dict()
        return _dict

    def _to_dict(self):
        """Underscore-prefixed alias of `to_dict`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataSetObject object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataSetObject') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataSetObject') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other

    class TypeEnum(str, Enum):
        """
        type of a data set.
        """
        MANUAL_LABELING = 'manual_labeling'
        PAYLOAD_LOGGING = 'payload_logging'
        FEEDBACK = 'feedback'
        BUSINESS_PAYLOAD = 'business_payload'
        EXPLANATIONS = 'explanations'
        EXPLANATIONS_WHATIF = 'explanations_whatif'
        TRAINING = 'training'
        PAYLOAD_LOGGING_ERROR = 'payload_logging_error'
        CUSTOM = 'custom'

    class SchemaUpdateModeEnum(str, Enum):
        """
        schema_update_mode.
        """
        NONE = 'none'
        AUTO = 'auto'
class DataSetRecords:
    """
    DataSetRecords: a list of DataSetRecordsDataSetRecordsItem objects.

    :attr List[DataSetRecordsDataSetRecordsItem] data_set_records:
    """

    def __init__(self,
                 data_set_records: List['DataSetRecordsDataSetRecordsItem']) -> None:
        """
        Initialize a DataSetRecords object.

        :param List[DataSetRecordsDataSetRecordsItem] data_set_records:
        """
        self.data_set_records = data_set_records

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataSetRecords':
        """
        Initialize a DataSetRecords object from a json dictionary.

        :param Dict _dict: parsed JSON object with the key 'data_set_records'.
        :raises ValueError: if 'data_set_records' is missing.
        """
        if 'data_set_records' not in _dict:
            raise ValueError("Required property 'data_set_records' not present in DataSetRecords JSON")
        parsed = [DataSetRecordsDataSetRecordsItem.from_dict(item) for item in _dict.get('data_set_records')]
        return cls(data_set_records=parsed)

    @classmethod
    def _from_dict(cls, _dict):
        """Underscore-prefixed alias of `from_dict`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Every element of `data_set_records` is serialized through its
        `to_dict()`; the key is omitted when the attribute is missing or None.
        """
        result = {}
        records = getattr(self, 'data_set_records', None)
        if records is not None:
            result['data_set_records'] = [item.to_dict() for item in records]
        return result

    def _to_dict(self):
        """Underscore-prefixed alias of `to_dict`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataSetRecords object."""
        payload = self.to_dict()
        return json.dumps(payload, indent=2)

    def __eq__(self, other: 'DataSetRecords') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataSetRecords') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        are_equal = self == other
        return not are_equal
class DataSetRecordsDataSetRecordsItem():
    """
    DataSetRecordsDataSetRecordsItem.

    :attr DataSetRecordsDataSetRecordsItemDataSet data_set:
    :attr DataSetRecordsDataSetRecordsItemRecords records:
    """

    def __init__(self,
                 data_set: 'DataSetRecordsDataSetRecordsItemDataSet',
                 records: 'DataSetRecordsDataSetRecordsItemRecords') -> None:
        """
        Initialize a DataSetRecordsDataSetRecordsItem object.

        :param DataSetRecordsDataSetRecordsItemDataSet data_set:
        :param DataSetRecordsDataSetRecordsItemRecords records:
        """
        self.data_set = data_set
        self.records = records

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataSetRecordsDataSetRecordsItem':
        """
        Initialize a DataSetRecordsDataSetRecordsItem object from a json dictionary.

        :param dict _dict: Decoded JSON object holding `data_set` and `records`.
        :return: The populated DataSetRecordsDataSetRecordsItem instance.
        :raises ValueError: If `data_set` or `records` is absent or null.
        """
        args = {}
        # A null nested object cannot be deserialized (the nested from_dict does
        # membership tests on it), so it is reported like a missing property.
        data_set = _dict.get('data_set')
        if data_set is None:
            raise ValueError('Required property \'data_set\' not present in DataSetRecordsDataSetRecordsItem JSON')
        args['data_set'] = DataSetRecordsDataSetRecordsItemDataSet.from_dict(data_set)
        records = _dict.get('records')
        if records is None:
            raise ValueError('Required property \'records\' not present in DataSetRecordsDataSetRecordsItem JSON')
        args['records'] = DataSetRecordsDataSetRecordsItemRecords.from_dict(records)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataSetRecordsDataSetRecordsItem object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        A nested value that is already a plain dictionary is emitted unchanged;
        otherwise it is serialized through its own `to_dict()`.
        """
        _dict = {}
        if hasattr(self, 'data_set') and self.data_set is not None:
            _dict['data_set'] = self.data_set if isinstance(self.data_set, dict) else self.data_set.to_dict()
        if hasattr(self, 'records') and self.records is not None:
            _dict['records'] = self.records if isinstance(self.records, dict) else self.records.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataSetRecordsDataSetRecordsItem object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataSetRecordsDataSetRecordsItem') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataSetRecordsDataSetRecordsItem') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class DataSetRecordsDataSetRecordsItemDataSet():
    """
    DataSetRecordsDataSetRecordsItemDataSet.

    :attr str data_mart_id:
    :attr str type: type of a data set.
    :attr Target target:
    :attr SparkStruct data_schema:
    """

    def __init__(self,
                 data_mart_id: str,
                 type: str,
                 target: 'Target',
                 data_schema: 'SparkStruct') -> None:
        """
        Initialize a DataSetRecordsDataSetRecordsItemDataSet object.

        :param str data_mart_id:
        :param str type: type of a data set.
        :param Target target:
        :param SparkStruct data_schema:
        """
        self.data_mart_id = data_mart_id
        self.type = type
        self.target = target
        self.data_schema = data_schema

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataSetRecordsDataSetRecordsItemDataSet':
        """
        Initialize a DataSetRecordsDataSetRecordsItemDataSet object from a json dictionary.

        :param dict _dict: Decoded JSON object holding `data_mart_id`, `type`,
               `target` and `data_schema`.
        :return: The populated DataSetRecordsDataSetRecordsItemDataSet instance.
        :raises ValueError: If any of the four properties is absent, or if
                `target` or `data_schema` is null.
        """
        args = {}
        if 'data_mart_id' in _dict:
            args['data_mart_id'] = _dict.get('data_mart_id')
        else:
            raise ValueError('Required property \'data_mart_id\' not present in DataSetRecordsDataSetRecordsItemDataSet JSON')
        if 'type' in _dict:
            args['type'] = _dict.get('type')
        else:
            raise ValueError('Required property \'type\' not present in DataSetRecordsDataSetRecordsItemDataSet JSON')
        # A null nested object cannot be turned into a model, so it is reported
        # like a missing property rather than handed to the nested from_dict.
        target = _dict.get('target')
        if target is None:
            raise ValueError('Required property \'target\' not present in DataSetRecordsDataSetRecordsItemDataSet JSON')
        args['target'] = Target.from_dict(target)
        data_schema = _dict.get('data_schema')
        if data_schema is None:
            raise ValueError('Required property \'data_schema\' not present in DataSetRecordsDataSetRecordsItemDataSet JSON')
        args['data_schema'] = SparkStruct.from_dict(data_schema)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataSetRecordsDataSetRecordsItemDataSet object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        `target` and `data_schema` are emitted unchanged when they are already
        plain dictionaries; otherwise they are serialized through `to_dict()`.
        Attributes that are `None` are omitted.
        """
        _dict = {}
        if hasattr(self, 'data_mart_id') and self.data_mart_id is not None:
            _dict['data_mart_id'] = self.data_mart_id
        if hasattr(self, 'type') and self.type is not None:
            _dict['type'] = self.type
        if hasattr(self, 'target') and self.target is not None:
            _dict['target'] = self.target if isinstance(self.target, dict) else self.target.to_dict()
        if hasattr(self, 'data_schema') and self.data_schema is not None:
            _dict['data_schema'] = (
                self.data_schema if isinstance(self.data_schema, dict) else self.data_schema.to_dict()
            )
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataSetRecordsDataSetRecordsItemDataSet object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataSetRecordsDataSetRecordsItemDataSet') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataSetRecordsDataSetRecordsItemDataSet') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other

    class TypeEnum(str, Enum):
        """
        type of a data set.
        """
        MANUAL_LABELING = 'manual_labeling'
        PAYLOAD_LOGGING = 'payload_logging'
        FEEDBACK = 'feedback'
        BUSINESS_PAYLOAD = 'business_payload'
        EXPLANATIONS = 'explanations'
        EXPLANATIONS_WHATIF = 'explanations_whatif'
        TRAINING = 'training'
        PAYLOAD_LOGGING_ERROR = 'payload_logging_error'
        CUSTOM = 'custom'
class DataSetRecordsDataSetRecordsItemRecords():
    """
    DataSetRecordsDataSetRecordsItemRecords.

    :attr Metadata metadata:
    :attr DataSetRecordsDataSetRecordsItemRecordsEntity entity:
    """

    def __init__(self,
                 metadata: 'Metadata',
                 entity: 'DataSetRecordsDataSetRecordsItemRecordsEntity') -> None:
        """
        Initialize a DataSetRecordsDataSetRecordsItemRecords object.

        :param Metadata metadata:
        :param DataSetRecordsDataSetRecordsItemRecordsEntity entity:
        """
        self.metadata = metadata
        self.entity = entity

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataSetRecordsDataSetRecordsItemRecords':
        """
        Initialize a DataSetRecordsDataSetRecordsItemRecords object from a json dictionary.

        :param dict _dict: Decoded JSON object holding `metadata` and `entity`.
        :return: The populated DataSetRecordsDataSetRecordsItemRecords instance.
        :raises ValueError: If `metadata` or `entity` is absent or null.
        """
        args = {}
        # A null nested object cannot be turned into a model, so it is reported
        # like a missing property rather than handed to the nested from_dict.
        metadata = _dict.get('metadata')
        if metadata is None:
            raise ValueError('Required property \'metadata\' not present in DataSetRecordsDataSetRecordsItemRecords JSON')
        args['metadata'] = Metadata.from_dict(metadata)
        entity = _dict.get('entity')
        if entity is None:
            raise ValueError('Required property \'entity\' not present in DataSetRecordsDataSetRecordsItemRecords JSON')
        args['entity'] = DataSetRecordsDataSetRecordsItemRecordsEntity.from_dict(entity)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataSetRecordsDataSetRecordsItemRecords object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        A nested value that is already a plain dictionary is emitted unchanged;
        otherwise it is serialized through its own `to_dict()`.
        """
        _dict = {}
        if hasattr(self, 'metadata') and self.metadata is not None:
            _dict['metadata'] = self.metadata if isinstance(self.metadata, dict) else self.metadata.to_dict()
        if hasattr(self, 'entity') and self.entity is not None:
            _dict['entity'] = self.entity if isinstance(self.entity, dict) else self.entity.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataSetRecordsDataSetRecordsItemRecords object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataSetRecordsDataSetRecordsItemRecords') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataSetRecordsDataSetRecordsItemRecords') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class DataSetRecordsDataSetRecordsItemRecordsEntity():
    """
    DataSetRecordsDataSetRecordsItemRecordsEntity.

    :attr object values: Fields and values of the entity matches JSON object's
          fields and values.
    """

    def __init__(self,
                 values: object) -> None:
        """
        Initialize a DataSetRecordsDataSetRecordsItemRecordsEntity object.

        :param object values: Fields and values of the entity matches JSON object's
               fields and values.
        """
        self.values = values

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataSetRecordsDataSetRecordsItemRecordsEntity':
        """
        Build a DataSetRecordsDataSetRecordsItemRecordsEntity from a json dictionary.

        The `values` entry is stored as-is; a `ValueError` is raised when the key
        is not present in `_dict`.
        """
        if 'values' not in _dict:
            raise ValueError('Required property \'values\' not present in DataSetRecordsDataSetRecordsItemRecordsEntity JSON')
        return cls(values=_dict.get('values'))

    @classmethod
    def _from_dict(cls, _dict):
        """Alias of `from_dict()`."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Serialize this model to a json dictionary, omitting `values` when it is `None`."""
        payload = {}
        current = getattr(self, 'values', None)
        if current is not None:
            payload['values'] = current
        return payload

    def _to_dict(self):
        """Alias of `to_dict()`."""
        return self.to_dict()

    def __str__(self) -> str:
        """Render this DataSetRecordsDataSetRecordsItemRecordsEntity object as indented JSON."""
        as_json = self.to_dict()
        return json.dumps(as_json, indent=2)

    def __eq__(self, other: 'DataSetRecordsDataSetRecordsItemRecordsEntity') -> bool:
        """Return `true` only when other is an instance of this class with identical attributes."""
        same_kind = isinstance(other, self.__class__)
        return same_kind and self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataSetRecordsDataSetRecordsItemRecordsEntity') -> bool:
        """Return `true` when self and other differ, i.e. the negation of `==`."""
        are_equal = self == other
        return not are_equal
class DataSetResponse():
    """
    DataSetResponse.

    :attr Metadata metadata:
    :attr DataSetObject entity:
    """

    def __init__(self,
                 metadata: 'Metadata',
                 entity: 'DataSetObject') -> None:
        """
        Initialize a DataSetResponse object.

        :param Metadata metadata:
        :param DataSetObject entity:
        """
        self.metadata = metadata
        self.entity = entity

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataSetResponse':
        """
        Initialize a DataSetResponse object from a json dictionary.

        :param dict _dict: Decoded JSON object holding `metadata` and `entity`.
        :return: The populated DataSetResponse instance.
        :raises ValueError: If `metadata` or `entity` is absent or null.
        """
        args = {}
        # A null nested object cannot be turned into a model, so it is reported
        # like a missing property rather than handed to the nested from_dict.
        metadata = _dict.get('metadata')
        if metadata is None:
            raise ValueError('Required property \'metadata\' not present in DataSetResponse JSON')
        args['metadata'] = Metadata.from_dict(metadata)
        entity = _dict.get('entity')
        if entity is None:
            raise ValueError('Required property \'entity\' not present in DataSetResponse JSON')
        args['entity'] = DataSetObject.from_dict(entity)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataSetResponse object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        A nested value that is already a plain dictionary is emitted unchanged;
        otherwise it is serialized through its own `to_dict()`.
        """
        _dict = {}
        if hasattr(self, 'metadata') and self.metadata is not None:
            _dict['metadata'] = self.metadata if isinstance(self.metadata, dict) else self.metadata.to_dict()
        if hasattr(self, 'entity') and self.entity is not None:
            _dict['entity'] = self.entity if isinstance(self.entity, dict) else self.entity.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataSetResponse object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataSetResponse') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataSetResponse') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class DataSetResponseCollection():
    """
    DataSetResponseCollection.

    :attr List[DataSetResponse] data_sets:
    """

    def __init__(self,
                 data_sets: List['DataSetResponse']) -> None:
        """
        Initialize a DataSetResponseCollection object.

        :param List[DataSetResponse] data_sets:
        """
        self.data_sets = data_sets

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataSetResponseCollection':
        """
        Initialize a DataSetResponseCollection object from a json dictionary.

        :param dict _dict: Decoded JSON object holding a `data_sets` array.
        :return: A DataSetResponseCollection whose items were built with
                 `DataSetResponse.from_dict`.
        :raises ValueError: If `data_sets` is absent or null.
        """
        raw_items = _dict.get('data_sets')
        if raw_items is None:
            # A JSON null cannot be iterated; report it the same way as a missing
            # property instead of failing with a bare TypeError.
            raise ValueError('Required property \'data_sets\' not present in DataSetResponseCollection JSON')
        return cls(data_sets=[DataSetResponse.from_dict(item) for item in raw_items])

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataSetResponseCollection object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Items that are already plain dictionaries are emitted unchanged; every
        other item is serialized through its own `to_dict()`.
        """
        _dict = {}
        if hasattr(self, 'data_sets') and self.data_sets is not None:
            _dict['data_sets'] = [
                item if isinstance(item, dict) else item.to_dict()
                for item in self.data_sets
            ]
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataSetResponseCollection object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataSetResponseCollection') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataSetResponseCollection') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class DataSource():
"""
DataSource.
:attr str type: (optional) Type of data source. e.g. payload, feedback,
drift,explain.
:attr DataSourceConnection connection: (optional)
:attr str database_name: (optional) database name.
:attr str schema_name: (optional) schema name.
:attr str table_name: (optional) table name.
:attr DataSourceEndpoint endpoint: (optional)
:attr object parameters: (optional) Additional parameters.
:attr bool auto_create: (optional) Set true for automatically creating the
table.
:attr DataSourceStatus status: (optional)
"""
def __init__(self,
*,
type: str = None,
connection: 'DataSourceConnection' = None,
database_name: str = None,
schema_name: str = None,