# Source code for ibm_watson_openscale.base_classes.watson_open_scale_v2

# coding: utf-8

# (C) Copyright IBM Corp. 2021.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Watson OpenScale API Specification
"""

from datetime import datetime
from enum import Enum
from typing import BinaryIO, Dict, List, TextIO, Union
import json

from ibm_cloud_sdk_core import BaseService, DetailedResponse
from ibm_cloud_sdk_core.authenticators.authenticator import Authenticator
from ibm_cloud_sdk_core.get_authenticator import get_authenticator_from_environment
from ibm_cloud_sdk_core.utils import convert_list, convert_model, datetime_to_string, string_to_datetime

from .common import get_sdk_headers

##############################################################################
# Service
##############################################################################

class WatsonOpenScaleV2(BaseService):
    """Client for the Watson OpenScale V2 service."""

    DEFAULT_SERVICE_URL = None
    DEFAULT_SERVICE_NAME = 'ai_openscale'

    @classmethod
    def new_instance(cls, service_name: str = DEFAULT_SERVICE_NAME) -> 'WatsonOpenScaleV2':
        """
        Build a Watson OpenScale client from external configuration.

        The authenticator is obtained with `get_authenticator_from_environment`
        and the new client is then configured with `configure_service`, both
        keyed by `service_name`.

        :param str service_name: name used to look up the external configuration.
        :return: the configured client.
        :rtype: WatsonOpenScaleV2
        """
        client = cls(get_authenticator_from_environment(service_name))
        client.configure_service(service_name)
        return client

    def __init__(self, authenticator: Authenticator = None) -> None:
        """
        Construct a new client for the Watson OpenScale service.

        :param Authenticator authenticator: The authenticator specifies the authentication mechanism.
               Get up to date information from https://github.com/IBM/python-sdk-core/blob/master/README.md
               about initializing the authenticator of your choice.
        """
        BaseService.__init__(
            self,
            service_url=self.DEFAULT_SERVICE_URL,
            authenticator=authenticator,
        )


#########################
# Data Marts
#########################

class DataMarts:
    """
    Data Marts
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Create the DataMarts operations group.

        :param WatsonOpenScaleV2 watson_open_scale: Watson OpenScale service
               client; it is stored on the instance and used by every method
               of this class to prepare and send requests.
        """
        self.watson_open_scale = watson_open_scale


    def add(
        self,
        *,
        name: str = None,
        description: str = None,
        service_instance_crn: str = None,
        internal_database: bool = None,
        database_configuration: 'DatabaseConfigurationRequest' = None,
        database_discovery: str = None,
        force: bool = None,
        **kwargs
    ) -> DetailedResponse:
        """
        Create a new data mart.

        Sends `POST /v2/data_marts` with a JSON body built from the arguments
        that are not `None`; `force` is passed as a query parameter.

        :param str name: (optional) Name of the data mart.
        :param str description: (optional) Description of the data mart.
        :param str service_instance_crn: (optional) Can be omitted if user token is
               used for authorization.
        :param bool internal_database: (optional) If `true` the internal database
               managed by AI OpenScale is provided for the user.
        :param DatabaseConfigurationRequest database_configuration: (optional)
               Database configuration ignored if internal database is requested
               (`internal_database` is `true`).
        :param str database_discovery: (optional) Indicates if the database was
               discovered automatically or manually added by user through UI.
        :param bool force: (optional) force update of metadata and db credentials
               (assumption is that the new database is already prepared and populated).
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataMartDatabaseResponse` result
        """
        # Convert the model argument before it goes into the JSON body.
        if database_configuration is not None:
            database_configuration = convert_model(database_configuration)

        headers = {}
        headers.update(
            get_sdk_headers(
                service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME,
                service_version='V2',
                operation_id='data_marts_add',
            )
        )

        # Only fields that were actually supplied are serialized.
        body_fields = {
            'name': name,
            'description': description,
            'service_instance_crn': service_instance_crn,
            'internal_database': internal_database,
            'database_configuration': database_configuration,
            'database_discovery': database_discovery,
        }
        payload = json.dumps(
            {key: value for key, value in body_fields.items() if value is not None}
        )
        headers['content-type'] = 'application/json'

        # Caller headers are merged after the defaults (so they can override
        # them); `Accept` is assigned last and therefore always wins.
        if 'headers' in kwargs:
            headers.update(kwargs.get('headers'))
        headers['Accept'] = 'application/json'

        request = self.watson_open_scale.prepare_request(
            method='POST',
            url='/v2/data_marts',
            headers=headers,
            params={'force': force},
            data=payload,
        )

        response = self.watson_open_scale.send(request)
        # Wrap the raw result in the model class when it offers `from_dict`.
        if hasattr(DataMartDatabaseResponse, 'from_dict'):
            response.result = DataMartDatabaseResponse.from_dict(response.result)
        return response


[docs] def list(self, **kwargs ) -> DetailedResponse: """ List all data marts. The method returns the data mart confugrations as an object. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `DataMartDatabaseResponseCollection` result """ headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='data_marts_list') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_marts' request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) if hasattr(DataMartDatabaseResponseCollection, 'from_dict'): response.result = DataMartDatabaseResponseCollection.from_dict(response.result) return response
[docs] def get(self, data_mart_id: str, **kwargs ) -> DetailedResponse: """ Get data mart with the given id. :param str data_mart_id: ID of the data mart. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `DataMartDatabaseResponse` result """ if data_mart_id is None: raise ValueError('data_mart_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='data_marts_get') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_marts/{0}'.format( *self.watson_open_scale.encode_path_vars(data_mart_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) if hasattr(DataMartDatabaseResponse, 'from_dict'): response.result = DataMartDatabaseResponse.from_dict(response.result) return response
[docs] def patch(self, data_mart_id: str, json_patch_operation: List['JsonPatchOperation'], **kwargs ) -> DetailedResponse: """ Update a data mart. :param str data_mart_id: ID of the data mart. :param List[JsonPatchOperation] json_patch_operation: :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `DataMartDatabaseResponse` result """ if data_mart_id is None: raise ValueError('data_mart_id must be provided') if json_patch_operation is None: raise ValueError('json_patch_operation must be provided') json_patch_operation = [convert_model(x) for x in json_patch_operation] headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='data_marts_patch') headers.update(sdk_headers) data = json.dumps(json_patch_operation) headers['content-type'] = 'application/json-patch+json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_marts/{0}'.format( *self.watson_open_scale.encode_path_vars(data_mart_id)) request = self.watson_open_scale.prepare_request(method='PATCH', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(DataMartDatabaseResponse, 'from_dict'): response.result = DataMartDatabaseResponse.from_dict(response.result) return response
def delete(self, data_mart_id: str, *, force: bool = None, **kwargs ) -> DetailedResponse: """ Delete a data mart. :param str data_mart_id: ID of the data mart. :param bool force: (optional) force hard delete. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse """ if data_mart_id is None: raise ValueError('data_mart_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='data_marts_delete') headers.update(sdk_headers) params = { 'force': force } if 'headers' in kwargs: headers.update(kwargs.get('headers')) url = '/v2/data_marts/{0}'.format( *self.watson_open_scale.encode_path_vars(data_mart_id)) request = self.watson_open_scale.prepare_request(method='DELETE', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) return response ######################### # Service Providers ######################### class ServiceProviders: """ Service Providers """ def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None: """ Construct a ServiceProviders client for the Watson OpenScale service. :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service. """ self.watson_open_scale = watson_open_scale
[docs] def list(self, *, show_deleted: bool = None, service_type: str = None, instance_id: str = None, operational_space_id: str = None, deployment_space_id: str = None, **kwargs ) -> DetailedResponse: """ List service providers. List assosiated Machine Learning service instances. :param bool show_deleted: (optional) show also resources pending delete. :param str service_type: (optional) Type of service. :param str instance_id: (optional) comma-separeted list of IDs. :param str operational_space_id: (optional) comma-separeted list of IDs. :param str deployment_space_id: (optional) comma-separeted list of IDs. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `ServiceProviderResponseCollection` result """ headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='service_providers_list') headers.update(sdk_headers) params = { 'show_deleted': show_deleted, 'service_type': service_type, 'instance_id': instance_id, 'operational_space_id': operational_space_id, 'deployment_space_id': deployment_space_id } if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/service_providers' request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) if hasattr(ServiceProviderResponseCollection, 'from_dict'): response.result = ServiceProviderResponseCollection.from_dict(response.result) return response
def add(self, name: str, service_type: str, credentials: 'MLCredentials', *, description: str = None, request_headers: dict = None, operational_space_id: str = None, deployment_space_id: str = None, **kwargs ) -> DetailedResponse: """ Add service provider. Assosiate external Machine Learning service instance with the OpenScale DataMart. :param str name: Name of the ML service instance. :param str service_type: machine learning service type (azure_machine_learning_studio is a prefered alias for azure_machine_learning and should be used in new service bindings). :param MLCredentials credentials: :param str description: (optional) :param dict request_headers: (optional) map header name to header value. :param str operational_space_id: (optional) Reference to Operational Space. :param str deployment_space_id: (optional) Reference to V2 Space ID. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `ServiceProviderResponse` result """ if name is None: raise ValueError('name must be provided') if service_type is None: raise ValueError('service_type must be provided') if credentials is None: raise ValueError('credentials must be provided') credentials = convert_model(credentials) headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='service_providers_add') headers.update(sdk_headers) data = { 'name': name, 'service_type': service_type, 'credentials': credentials, 'description': description, 'request_headers': request_headers, 'operational_space_id': operational_space_id, 'deployment_space_id': deployment_space_id } data = {k: v for (k, v) in data.items() if v is not None} data = json.dumps(data) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/service_providers' 
request = self.watson_open_scale.prepare_request(method='POST', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(ServiceProviderResponse, 'from_dict'): response.result = ServiceProviderResponse.from_dict(response.result) return response
[docs] def get(self, service_provider_id: str, **kwargs ) -> DetailedResponse: """ Get a specific service provider. Get the assosiated Machine Learning service provider details. :param str service_provider_id: ID of the ML service provider. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `ServiceProviderResponse` result """ if service_provider_id is None: raise ValueError('service_provider_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='service_providers_get') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/service_providers/{0}'.format( *self.watson_open_scale.encode_path_vars(service_provider_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) if hasattr(ServiceProviderResponse, 'from_dict'): response.result = ServiceProviderResponse.from_dict(response.result) return response
def delete(self, service_provider_id: str, *, force: bool = None, **kwargs ) -> DetailedResponse: """ Delete a service provider. Detach Machine Learning service provider. :param str service_provider_id: ID of the ML service provider. :param bool force: (optional) force hard delete. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse """ if service_provider_id is None: raise ValueError('service_provider_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='service_providers_delete') headers.update(sdk_headers) params = { 'force': force } if 'headers' in kwargs: headers.update(kwargs.get('headers')) url = '/v2/service_providers/{0}'.format( *self.watson_open_scale.encode_path_vars(service_provider_id)) request = self.watson_open_scale.prepare_request(method='DELETE', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) return response
[docs] def update(self, service_provider_id: str, patch_document: List['PatchDocument'], **kwargs ) -> DetailedResponse: """ Update a service provider. Update existing service provider. :param str service_provider_id: ID of the ML service provider. :param List[PatchDocument] patch_document: :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `ServiceProviderResponse` result """ if service_provider_id is None: raise ValueError('service_provider_id must be provided') if patch_document is None: raise ValueError('patch_document must be provided') patch_document = [convert_model(x) for x in patch_document] headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='service_providers_update') headers.update(sdk_headers) data = json.dumps(patch_document) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/service_providers/{0}'.format( *self.watson_open_scale.encode_path_vars(service_provider_id)) request = self.watson_open_scale.prepare_request(method='PATCH', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(ServiceProviderResponse, 'from_dict'): response.result = ServiceProviderResponse.from_dict(response.result) return response
#########################
# Subscriptions
#########################

class Subscriptions:
    """
    Subscriptions
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a Subscriptions client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

[docs] def list(self, *, data_mart_id: str = None, service_provider_id: str = None, asset_asset_id: str = None, deployment_deployment_id: str = None, deployment_deployment_type: str = None, integration_reference_integrated_system_id: str = None, integration_reference_external_id: str = None, risk_evaluation_status_state: str = None, service_provider_operational_space_id: str = None, pre_production_reference_id: str = None, **kwargs ) -> DetailedResponse: """ List subscriptions. List subscriptions. :param str data_mart_id: (optional) comma-separeted list of IDs. :param str service_provider_id: (optional) comma-separeted list of IDs. :param str asset_asset_id: (optional) comma-separeted list of IDs. :param str deployment_deployment_id: (optional) comma-separeted list of IDs. :param str deployment_deployment_type: (optional) comma-separeted list of types. :param str integration_reference_integrated_system_id: (optional) comma-separeted list of IDs. :param str integration_reference_external_id: (optional) comma-separeted list of IDs. :param str risk_evaluation_status_state: (optional) comma-separeted list of states. :param str service_provider_operational_space_id: (optional) comma-separeted list of operational space ids (property of service provider object). :param str pre_production_reference_id: (optional) comma-separeted list of IDs. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `SubscriptionResponseCollection` result """ headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='subscriptions_list') headers.update(sdk_headers) params = { 'data_mart_id': data_mart_id, 'service_provider_id': service_provider_id, 'asset.asset_id': asset_asset_id, 'deployment.deployment_id': deployment_deployment_id, 'deployment.deployment_type': deployment_deployment_type, 'integration_reference.integrated_system_id': integration_reference_integrated_system_id, 'integration_reference.external_id': integration_reference_external_id, 'risk_evaluation_status.state': risk_evaluation_status_state, 'service_provider.operational_space_id': service_provider_operational_space_id, 'pre_production_reference_id': pre_production_reference_id } if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/subscriptions' request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) if hasattr(SubscriptionResponseCollection, 'from_dict'): response.result = SubscriptionResponseCollection.from_dict(response.result) return response
def add(self, data_mart_id: str, service_provider_id: str, asset: 'Asset', deployment: 'AssetDeploymentRequest', *, asset_properties: 'AssetPropertiesRequest' = None, risk_evaluation_status: 'RiskEvaluationStatus' = None, analytics_engine: 'AnalyticsEngine' = None, data_sources: List['DataSource'] = None, **kwargs ) -> DetailedResponse: """ Add a new subscription. Add a new subscription to the model deployment. :param str data_mart_id: :param str service_provider_id: :param Asset asset: :param AssetDeploymentRequest deployment: :param AssetPropertiesRequest asset_properties: (optional) Additional asset properties (subject of discovery if not provided when creating the subscription). :param RiskEvaluationStatus risk_evaluation_status: (optional) :param AnalyticsEngine analytics_engine: (optional) :param List[DataSource] data_sources: (optional) :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `SubscriptionResponse` result """ if data_mart_id is None: raise ValueError('data_mart_id must be provided') if service_provider_id is None: raise ValueError('service_provider_id must be provided') if asset is None: raise ValueError('asset must be provided') if deployment is None: raise ValueError('deployment must be provided') asset = convert_model(asset) deployment = convert_model(deployment) if asset_properties is not None: asset_properties = convert_model(asset_properties) if risk_evaluation_status is not None: risk_evaluation_status = convert_model(risk_evaluation_status) if analytics_engine is not None: analytics_engine = convert_model(analytics_engine) if data_sources is not None: data_sources = [convert_model(x) for x in data_sources] headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='subscriptions_add') headers.update(sdk_headers) data = { 'data_mart_id': 
data_mart_id, 'service_provider_id': service_provider_id, 'asset': asset, 'deployment': deployment, 'asset_properties': asset_properties, 'risk_evaluation_status': risk_evaluation_status, 'analytics_engine': analytics_engine, 'data_sources': data_sources } data = {k: v for (k, v) in data.items() if v is not None} data = json.dumps(data) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/subscriptions' request = self.watson_open_scale.prepare_request(method='POST', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(SubscriptionResponse, 'from_dict'): response.result = SubscriptionResponse.from_dict(response.result) return response
[docs] def get(self, subscription_id: str, **kwargs ) -> DetailedResponse: """ Get a specific subscription. Get a specific subscription. :param str subscription_id: Unique subscription ID. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `SubscriptionResponse` result """ if subscription_id is None: raise ValueError('subscription_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='subscriptions_get') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/subscriptions/{0}'.format( *self.watson_open_scale.encode_path_vars(subscription_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) if hasattr(SubscriptionResponse, 'from_dict'): response.result = SubscriptionResponse.from_dict(response.result) return response
[docs] def update(self, subscription_id: str, patch_document: List['PatchDocument'], **kwargs ) -> DetailedResponse: """ Update a subscription. Update existing asset (from ML service instance) subscription. :param str subscription_id: Unique subscription ID. :param List[PatchDocument] patch_document: :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `SubscriptionResponse` result """ if subscription_id is None: raise ValueError('subscription_id must be provided') if patch_document is None: raise ValueError('patch_document must be provided') patch_document = [convert_model(x) for x in patch_document] headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='subscriptions_update') headers.update(sdk_headers) data = json.dumps(patch_document) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/subscriptions/{0}'.format( *self.watson_open_scale.encode_path_vars(subscription_id)) request = self.watson_open_scale.prepare_request(method='PATCH', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(SubscriptionResponse, 'from_dict'): response.result = SubscriptionResponse.from_dict(response.result) return response
def delete(self, subscription_id: str, *, force: bool = None, **kwargs ) -> DetailedResponse: """ Delete a subscription. Delete a subscription. :param str subscription_id: Unique subscription ID. :param bool force: (optional) force hard delete. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse """ if subscription_id is None: raise ValueError('subscription_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='subscriptions_delete') headers.update(sdk_headers) params = { 'force': force } if 'headers' in kwargs: headers.update(kwargs.get('headers')) url = '/v2/subscriptions/{0}'.format( *self.watson_open_scale.encode_path_vars(subscription_id)) request = self.watson_open_scale.prepare_request(method='DELETE', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) return response
[docs] def schemas(self, subscription_id: str, *, input_data: List['ScoreData'] = None, training_data_reference: 'InputDataReference' = None, **kwargs ) -> DetailedResponse: """ Derive model schemas from the training data. Derive model schemas from the training data. Only "structured" input data type is supported. If the input_data_type field in the subscription (subscription -> entity -> asset -> input_data_type) is not "structured", an error will be returned. :param str subscription_id: Unique subscription ID. :param List[ScoreData] input_data: (optional) Array of score data object. If multiple score data objects are included, the "fields" array (if any) for score purposes will always be taken from the first score data object. :param InputDataReference training_data_reference: (optional) InputDataReference is the same as TrainingDataReference except that neither location nor connection is required. This is needed for the Schemas API and to avoid updating existing APIs. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `SchemaInferenceResponse` result """ if subscription_id is None: raise ValueError('subscription_id must be provided') if input_data is not None: input_data = [convert_model(x) for x in input_data] if training_data_reference is not None: training_data_reference = convert_model(training_data_reference) headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='subscriptions_schemas') headers.update(sdk_headers) data = { 'input_data': input_data, 'training_data_reference': training_data_reference } data = {k: v for (k, v) in data.items() if v is not None} data = json.dumps(data) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/subscriptions/{0}/schemas'.format( *self.watson_open_scale.encode_path_vars(subscription_id)) request = self.watson_open_scale.prepare_request(method='POST', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(SchemaInferenceResponse, 'from_dict'): response.result = SchemaInferenceResponse.from_dict(response.result) return response
[docs] def score(self, subscription_id: str, values: List[str], *, fields: List[str] = None, **kwargs ) -> DetailedResponse: """ Computes the bias mitigation/remediation for the specified model. Computes the bias mitigation/remediation for the specified model. The fairness monitoring debias request payload details must be valid. :param str subscription_id: Unique subscription ID. :param List[str] values: The values associated to the fields. :param List[str] fields: (optional) The fields to process debias scoring. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `FairnessMonitoringRemediation` result """ if subscription_id is None: raise ValueError('subscription_id must be provided') if values is None: raise ValueError('values must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='subscriptions_score') headers.update(sdk_headers) data = { 'values': values, 'fields': fields } data = {k: v for (k, v) in data.items() if v is not None} data = json.dumps(data) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/subscriptions/{0}/predictions'.format( *self.watson_open_scale.encode_path_vars(subscription_id)) request = self.watson_open_scale.prepare_request(method='POST', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(FairnessMonitoringRemediation, 'from_dict'): response.result = FairnessMonitoringRemediation.from_dict(response.result) return response
#########################
# Data Sets
#########################

class DataSets:
    """
    Data Sets
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a DataSets client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

[docs] def list(self, *, target_target_id: str = None, target_target_type: str = None, type: str = None, managed_by: str = None, **kwargs ) -> DetailedResponse: """ List all data sets specified by the parameters. :param str target_target_id: (optional) ID of the data set target (e.g. subscription ID, business application ID). :param str target_target_type: (optional) type of the target. :param str type: (optional) type of the data set. :param str managed_by: (optional) ID of the managing entity (e.g. business application ID). :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `DataSetResponseCollection` result """ headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='data_sets_list') headers.update(sdk_headers) params = { 'target.target_id': target_target_id, 'target.target_type': target_target_type, 'type': type, 'managed_by': managed_by } if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_sets' request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) if hasattr(DataSetResponseCollection, 'from_dict'): response.result = DataSetResponseCollection.from_dict(response.result) return response
def add(self, data_mart_id: str, name: str, type: str, target: 'Target', data_schema: 'SparkStruct', *, description: str = None, schema_update_mode: str = None, location: 'LocationTableName' = None, managed_by: str = None, **kwargs ) -> DetailedResponse: """ Create a new data set. Create a new data set. :param str data_mart_id: :param str name: :param str type: type of a data set. :param Target target: :param SparkStruct data_schema: :param str description: (optional) :param str schema_update_mode: (optional) :param LocationTableName location: (optional) :param str managed_by: (optional) :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `DataSetResponse` result """ if data_mart_id is None: raise ValueError('data_mart_id must be provided') if name is None: raise ValueError('name must be provided') if type is None: raise ValueError('type must be provided') if target is None: raise ValueError('target must be provided') if data_schema is None: raise ValueError('data_schema must be provided') target = convert_model(target) data_schema = convert_model(data_schema) if location is not None: location = convert_model(location) headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='data_sets_add') headers.update(sdk_headers) data = { 'data_mart_id': data_mart_id, 'name': name, 'type': type, 'target': target, 'data_schema': data_schema, 'description': description, 'schema_update_mode': schema_update_mode, 'location': location, 'managed_by': managed_by } data = {k: v for (k, v) in data.items() if v is not None} data = json.dumps(data) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_sets' request = self.watson_open_scale.prepare_request(method='POST', url=url, 
headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(DataSetResponse, 'from_dict'): response.result = DataSetResponse.from_dict(response.result) return response
[docs] def get(self, data_set_id: str, **kwargs ) -> DetailedResponse: """ Get data set with the given id. :param str data_set_id: ID of the data set. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `DataSetResponse` result """ if data_set_id is None: raise ValueError('data_set_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='data_sets_get') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_sets/{0}'.format( *self.watson_open_scale.encode_path_vars(data_set_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) if hasattr(DataSetResponse, 'from_dict'): response.result = DataSetResponse.from_dict(response.result) return response
def delete(self, data_set_id: str, **kwargs ) -> DetailedResponse: """ Delete a data set. :param str data_set_id: ID of the data set. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse """ if data_set_id is None: raise ValueError('data_set_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='data_sets_delete') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) url = '/v2/data_sets/{0}'.format( *self.watson_open_scale.encode_path_vars(data_set_id)) request = self.watson_open_scale.prepare_request(method='DELETE', url=url, headers=headers) response = self.watson_open_scale.send(request) return response
[docs] def update(self, data_set_id: str, patch_document: List['PatchDocument'], **kwargs ) -> DetailedResponse: """ Update a data set. Update the data set. :param str data_set_id: ID of the data set. :param List[PatchDocument] patch_document: :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `DataSetResponse` result """ if data_set_id is None: raise ValueError('data_set_id must be provided') if patch_document is None: raise ValueError('patch_document must be provided') patch_document = [convert_model(x) for x in patch_document] headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='data_sets_update') headers.update(sdk_headers) data = json.dumps(patch_document) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_sets/{0}'.format( *self.watson_open_scale.encode_path_vars(data_set_id)) request = self.watson_open_scale.prepare_request(method='PATCH', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(DataSetResponse, 'from_dict'): response.result = DataSetResponse.from_dict(response.result) return response
######################### # Records ######################### class Records: """ Records """ def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None: """ Construct a Records client for the Watson OpenScale service. :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service. """ self.watson_open_scale = watson_open_scale def add(self, data_set_id: str, request_body: Union[List[object], str, TextIO], *, content_type: str = None, header: bool = None, skip: int = None, limit: int = None, delimiter: str = None, on_error: str = None, csv_max_line_length: float = None, **kwargs ) -> DetailedResponse: """ Add new data set records. Add new data set records. :param str data_set_id: ID of the data set. :param List[object] request_body: :param str content_type: (optional) The type of the input. A character encoding can be specified by including a `charset` parameter. For example, 'text/csv;charset=utf-8'. :param bool header: (optional) if not provided service will attempt to automatically detect header in the first line. :param int skip: (optional) skip number of rows from input. :param int limit: (optional) limit for number of processed input rows. :param str delimiter: (optional) delimiter character for data provided as csv. :param str on_error: (optional) expected behaviour on error. :param float csv_max_line_length: (optional) maximum length of single line in bytes (default 10MB). :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `Status` result """ if data_set_id is None: raise ValueError('data_set_id must be provided') if request_body is None: raise ValueError('request_body must be provided') headers = { 'Content-Type': content_type } sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='records_add') headers.update(sdk_headers) params = { 'header': header, 'skip': skip, 'limit': limit, 'delimiter': delimiter, 'on_error': on_error, 'csv_max_line_length': csv_max_line_length } if isinstance(request_body, list): data = json.dumps(request_body) if content_type is None: headers['Content-Type'] = 'application/json' else: data = request_body if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_sets/{0}/records'.format( *self.watson_open_scale.encode_path_vars(data_set_id)) request = self.watson_open_scale.prepare_request(method='POST', url=url, headers=headers, params=params, data=data) response = self.watson_open_scale.send(request) if hasattr(Status, 'from_dict'): response.result = Status.from_dict(response.result) return response def list(self, data_set_id: str, *, start: datetime = None, end: datetime = None, limit: int = None, offset: int = None, annotations: List[str] = None, filter: str = None, include_total_count: bool = None, format: str = None, binary_format: str = None, **kwargs ) -> DetailedResponse: """ List data set records. List data set records. :param str data_set_id: ID of the data set. :param datetime start: (optional) return records with timestamp grather then or equal to `start` parameter. :param datetime end: (optional) return records with timestamp lower then `end` parameter. :param int limit: (optional) limit for number of returned records. If the value is greater than 1000 than it will be truncated. :param int offset: (optional) offset of returned records. 
:param List[str] annotations: (optional) return record annotations with given names. :param str filter: (optional) Only return records that match given filters. There are two types of filters, separated by commas: * normal filter (multiple are possible), {field_name}:{op}:{value} — filter records directly * joining filter (only a single one is possible), {data_set_id}.{field_name}:{op}:{value} — join a data set by transaction_id (the user must ensure it's provided!) and filter by this data set's records' field. Will fail if the user hasn't provided transaction_id for both data sets' records. Filters of different types can be mixed. They are partly compatible with the ones in POST /v2/data_sets/{data_set_id}/distributions. Available operators: | op | meaning | example | code equivalent | |:----:|:---------------------------:|:------------------:|:------------------------:| | eq | equality | field:eq:value | field == value | | gt | greater than | field:gt:value | field > value | | gte | greater or equal | field:gte:value | field >= value | | lt | less than | field:lt:value | field < value | | lte | less or equal | field:lte:value | field <= value | | like | matching a simple pattern* | field:like:pattern | pattern.match(field) | | in | is contained in list | field:in:a;b;c | [a,b,c].contains(field) | * - "%" means "one or more character", "_" means "any single character", other characters have their usual, literal meaning (e.g. "|" means character "|"). :param bool include_total_count: (optional) If total_count should be included. It can have performance impact if total_count is calculated. :param str format: (optional) What JSON format to use on output. :param str binary_format: (optional) Binary data presentation format. By default, the binary field value is encoded to base64 string. 
If _reference_ is chosen, every binary field is moved to the _references_ section with value set to an uri to the particular field within the record that can be GET in a separate request. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `RecordsListResponse` result """ if data_set_id is None: raise ValueError('data_set_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='records_list') headers.update(sdk_headers) params = { 'start': start, 'end': end, 'limit': limit, 'offset': offset, 'annotations': convert_list(annotations), 'filter': filter, 'include_total_count': include_total_count, 'format': format, 'binary_format': binary_format } if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_sets/{0}/records'.format( *self.watson_open_scale.encode_path_vars(data_set_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) if hasattr(RecordsListResponse, 'from_dict'): response.result = RecordsListResponse.from_dict(response.result) return response def patch(self, data_set_id: str, patch_document: List['PatchDocument'], **kwargs ) -> DetailedResponse: """ Update data set records. Update data set records. :param str data_set_id: ID of the data set. :param List[PatchDocument] patch_document: :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `Status` result """ if data_set_id is None: raise ValueError('data_set_id must be provided') if patch_document is None: raise ValueError('patch_document must be provided') patch_document = [convert_model(x) for x in patch_document] headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='records_patch') headers.update(sdk_headers) data = json.dumps(patch_document) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_sets/{0}/records'.format( *self.watson_open_scale.encode_path_vars(data_set_id)) request = self.watson_open_scale.prepare_request(method='PATCH', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(Status, 'from_dict'): response.result = Status.from_dict(response.result) return response def get(self, data_set_id: str, record_id: str, *, binary_format: str = None, **kwargs ) -> DetailedResponse: """ Get a specific data set record with the given id. Get a specific record in a data set. :param str data_set_id: ID of the data set. :param str record_id: ID of the record. :param str binary_format: (optional) Binary data presentation format. By default, the binary field value is encoded to base64 string. If _reference_ is chosen, every binary field is moved to the _references_ section with value set to an uri to the particular field within the record that can be GET in a separate request. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `DataRecordResponse` result """ if data_set_id is None: raise ValueError('data_set_id must be provided') if record_id is None: raise ValueError('record_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='records_get') headers.update(sdk_headers) params = { 'binary_format': binary_format } if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_sets/{0}/records/{1}'.format( *self.watson_open_scale.encode_path_vars(data_set_id, record_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) if hasattr(DataRecordResponse, 'from_dict'): response.result = DataRecordResponse.from_dict(response.result) return response def update(self, data_set_id: str, record_id: str, patch_document: List['PatchDocument'], *, binary_format: str = None, **kwargs ) -> DetailedResponse: """ Update a specific record in a data set. Update a specific record in a data set. :param str data_set_id: ID of the data set. :param str record_id: ID of the record. :param List[PatchDocument] patch_document: :param str binary_format: (optional) Binary data presentation format. By default, the binary field value is encoded to base64 string. If _reference_ is chosen, every binary field is moved to the _references_ section with value set to an uri to the particular field within the record that can be GET in a separate request. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `DataRecordResponse` result """ if data_set_id is None: raise ValueError('data_set_id must be provided') if record_id is None: raise ValueError('record_id must be provided') if patch_document is None: raise ValueError('patch_document must be provided') patch_document = [convert_model(x) for x in patch_document] headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='records_update') headers.update(sdk_headers) params = { 'binary_format': binary_format } data = json.dumps(patch_document) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_sets/{0}/records/{1}'.format( *self.watson_open_scale.encode_path_vars(data_set_id, record_id)) request = self.watson_open_scale.prepare_request(method='PATCH', url=url, headers=headers, params=params, data=data) response = self.watson_open_scale.send(request) if hasattr(DataRecordResponse, 'from_dict'): response.result = DataRecordResponse.from_dict(response.result) return response def field(self, data_set_id: str, record_id: str, field_name: str, **kwargs ) -> DetailedResponse: """ Get value of a field in a given record. Get value of a field in a given record. :param str data_set_id: ID of the data set. :param str record_id: ID of the record. :param str field_name: field_name should map to db column name which value is to be retrieved. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse """ if data_set_id is None: raise ValueError('data_set_id must be provided') if record_id is None: raise ValueError('record_id must be provided') if field_name is None: raise ValueError('field_name must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='records_field') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) url = '/v2/data_sets/{0}/records/{1}/{2}'.format( *self.watson_open_scale.encode_path_vars(data_set_id, record_id, field_name)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) return response def query(self, data_set_type: str, *, record_id: List[str] = None, transaction_id: List[str] = None, start: datetime = None, end: datetime = None, offset: int = None, limit: int = None, **kwargs ) -> DetailedResponse: """ Get data set records using record_id or transaction_id. Get data set records with specific record_id or transaction_id. :param str data_set_type: a (single) data set type. :param List[str] record_id: (optional) one or more record id values that should be matched. :param List[str] transaction_id: (optional) one or more transaction id values that should be matched. :param datetime start: (optional) beginning of the time range. :param datetime end: (optional) end of the time range. :param int offset: (optional) offset of returned explanations. :param int limit: (optional) Maximum number of elements returned. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `DataSetRecords` result """ if data_set_type is None: raise ValueError('data_set_type must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='records_query') headers.update(sdk_headers) params = { 'data_set_type': data_set_type, 'record_id': convert_list(record_id), 'transaction_id': convert_list(transaction_id), 'start': start, 'end': end, 'offset': offset, 'limit': limit } if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_set_records' request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) if hasattr(DataSetRecords, 'from_dict'): response.result = DataSetRecords.from_dict(response.result) return response ######################### # Requests ######################### class Requests: """ Requests """ def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None: """ Construct a Requests client for the Watson OpenScale service. :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service. """ self.watson_open_scale = watson_open_scale def get(self, data_set_id: str, request_id: str, **kwargs ) -> DetailedResponse: """ Get status of a specific request. Get status of a specific request. :param str data_set_id: ID of the data set. :param str request_id: ID of the request. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `Status` result """ if data_set_id is None: raise ValueError('data_set_id must be provided') if request_id is None: raise ValueError('request_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='requests_get') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_sets/{0}/requests/{1}'.format( *self.watson_open_scale.encode_path_vars(data_set_id, request_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) if hasattr(Status, 'from_dict'): response.result = Status.from_dict(response.result) return response ######################### # Distributions ######################### class Distributions: """ Distributions """ def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None: """ Construct a Distributions client for the Watson OpenScale service. :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service. """ self.watson_open_scale = watson_open_scale def add(self, data_set_id: str, start: str, end: str, dataset: str, group: List[str], *, limit: float = None, filter: str = None, agg: List[str] = None, max_bins: float = None, nocache: bool = None, **kwargs ) -> DetailedResponse: """ add new data disbributions. add new data disbributions. :param str data_set_id: ID of the data set. :param str start: start datetime in ISO format. :param str end: end datetime in ISO format. :param str dataset: type of a data set. :param List[str] group: names of columns to be grouped. :param float limit: (optional) limit for number of rows, by default it is 50,000 (max possible limit is 50,000). :param str filter: (optional) Filters defined by user in format: {field_name}:{op}:{value}. 
Partly compatible with filters in "filter" parameter of GET /v2/data_sets/{data_set_id}/records. Possible filter operators: * eq - equals (numeric, string) * gt - greater than (numeric) * gte - greater than or equal (numeric) * lt - lower than (numeric) * lte - lower than or equal (numeric) * in - value in a set (numeric, string) * field:null (a no-argument filter) - value is null (any nullable) * field:exists (a no-argument filter) - value is not null (any column). :param List[str] agg: (optional) Definition of aggregations, by default 'count'. Aggregations can be one of: * count * <column_name>:sum * <column_name>:min * <column_name>:max * <column_name>:avg * <column_name>:stddev. :param float max_bins: (optional) max number of bins which will be generated for data. :param bool nocache: (optional) force columns data refresh. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `DataDistributionResponse` result """ if data_set_id is None: raise ValueError('data_set_id must be provided') if start is None: raise ValueError('start must be provided') if end is None: raise ValueError('end must be provided') if dataset is None: raise ValueError('dataset must be provided') if group is None: raise ValueError('group must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='distributions_add') headers.update(sdk_headers) params = { 'nocache': nocache } data = { 'start': start, 'end': end, 'dataset': dataset, 'group': group, 'limit': limit, 'filter': filter, 'agg': agg, 'max_bins': max_bins } data = {k: v for (k, v) in data.items() if v is not None} data = json.dumps(data) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = 
'/v2/data_sets/{0}/distributions'.format( *self.watson_open_scale.encode_path_vars(data_set_id)) request = self.watson_open_scale.prepare_request(method='POST', url=url, headers=headers, params=params, data=data) response = self.watson_open_scale.send(request) if hasattr(DataDistributionResponse, 'from_dict'): response.result = DataDistributionResponse.from_dict(response.result) return response def delete(self, data_set_id: str, **kwargs ) -> DetailedResponse: """ Delete data distributions. Delete data distribution. :param str data_set_id: ID of the data set. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse """ if data_set_id is None: raise ValueError('data_set_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='distributions_delete') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) url = '/v2/data_sets/{0}/distributions'.format( *self.watson_open_scale.encode_path_vars(data_set_id)) request = self.watson_open_scale.prepare_request(method='DELETE', url=url, headers=headers) response = self.watson_open_scale.send(request) return response def get(self, data_set_id: str, data_distribution_id: str, **kwargs ) -> DetailedResponse: """ Get a specific data distribution. Get a specific data distribution. :param str data_set_id: ID of the data set. :param str data_distribution_id: ID of the data distribution requested to be calculated. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `DataDistributionResponse` result """ if data_set_id is None: raise ValueError('data_set_id must be provided') if data_distribution_id is None: raise ValueError('data_distribution_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='distributions_get') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/data_sets/{0}/distributions/{1}'.format( *self.watson_open_scale.encode_path_vars(data_set_id, data_distribution_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) if hasattr(DataDistributionResponse, 'from_dict'): response.result = DataDistributionResponse.from_dict(response.result) return response ######################### # Monitors ######################### class Monitors: """ Monitors """ def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None: """ Construct a Monitors client for the Watson OpenScale service. :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service. """ self.watson_open_scale = watson_open_scale def list(self, *, name: str = None, **kwargs ) -> DetailedResponse: """ List available monitors. List available monitors. :param str name: (optional) comma-separeted list of names. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `MonitorCollections` result """ headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='monitors_list') headers.update(sdk_headers) params = { 'name': name } if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/monitor_definitions' request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) if hasattr(MonitorCollections, 'from_dict'): response.result = MonitorCollections.from_dict(response.result) return response def add(self, name: str, metrics: List['MonitorMetricRequest'], tags: List['MonitorTagRequest'], *, description: str = None, applies_to: 'ApplicabilitySelection' = None, parameters_schema: dict = None, managed_by: str = None, schedule: 'MonitorInstanceSchedule' = None, schedules: 'MonitorInstanceScheduleCollection' = None, **kwargs ) -> DetailedResponse: """ Add custom monitor. Add custom monitor. :param str name: Monitor UI label (must be unique). :param List[MonitorMetricRequest] metrics: A list of metric definition. :param List[MonitorTagRequest] tags: Available tags. :param str description: (optional) Long monitoring description presented in monitoring catalog. :param ApplicabilitySelection applies_to: (optional) :param dict parameters_schema: (optional) JSON schema that will be used to validate monitoring parameters when enabled. :param str managed_by: (optional) :param MonitorInstanceSchedule schedule: (optional) The schedule used to control how frequently the target is monitored. The maximum frequency is once every 30 minutes. Defaults to once every hour if not specified. :param MonitorInstanceScheduleCollection schedules: (optional) A set of schedules used to control how frequently the target is monitored for online and batch deployment type. 
:param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `MonitorDisplayForm` result """ if name is None: raise ValueError('name must be provided') if metrics is None: raise ValueError('metrics must be provided') if tags is None: raise ValueError('tags must be provided') metrics = [convert_model(x) for x in metrics] tags = [convert_model(x) for x in tags] if applies_to is not None: applies_to = convert_model(applies_to) if schedule is not None: schedule = convert_model(schedule) if schedules is not None: schedules = convert_model(schedules) headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='monitors_add') headers.update(sdk_headers) data = { 'name': name, 'metrics': metrics, 'tags': tags, 'description': description, 'applies_to': applies_to, 'parameters_schema': parameters_schema, 'managed_by': managed_by, 'schedule': schedule, 'schedules': schedules } data = {k: v for (k, v) in data.items() if v is not None} data = json.dumps(data) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/monitor_definitions' request = self.watson_open_scale.prepare_request(method='POST', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(MonitorDisplayForm, 'from_dict'): response.result = MonitorDisplayForm.from_dict(response.result) return response def get(self, monitor_definition_id: str, **kwargs ) -> DetailedResponse: """ Get a specific monitor definition. Get a specific monitor definition. :param str monitor_definition_id: Unique monitor definition ID. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
def update(self,
           monitor_definition_id: str,
           name: str,
           metrics: List['MonitorMetricRequest'],
           tags: List['MonitorTagRequest'],
           *,
           description: str = None,
           applies_to: 'ApplicabilitySelection' = None,
           parameters_schema: dict = None,
           managed_by: str = None,
           schedule: 'MonitorInstanceSchedule' = None,
           schedules: 'MonitorInstanceScheduleCollection' = None,
           **kwargs
          ) -> DetailedResponse:
    """
    Update the monitor definition.

    Issues a PUT to `/v2/monitor_definitions/{monitor_definition_id}`. The
    JSON body contains every body argument whose value is not None.

    :param str monitor_definition_id: Unique monitor definition ID.
    :param str name: Monitor UI label (must be unique).
    :param List[MonitorMetricRequest] metrics: A list of metric definition.
    :param List[MonitorTagRequest] tags: Available tags.
    :param str description: (optional) Long monitoring description presented
           in monitoring catalog.
    :param ApplicabilitySelection applies_to: (optional)
    :param dict parameters_schema: (optional) JSON schema that will be used to
           validate monitoring parameters when enabled.
    :param str managed_by: (optional)
    :param MonitorInstanceSchedule schedule: (optional) The schedule used to
           control how frequently the target is monitored. The maximum
           frequency is once every 30 minutes. Defaults to once every hour if
           not specified.
    :param MonitorInstanceScheduleCollection schedules: (optional) A set of
           schedules used to control how frequently the target is monitored
           for online and batch deployment type.
    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse with `MonitorDisplayForm` result
    :raises ValueError: if `monitor_definition_id`, `name`, `metrics` or
            `tags` is None.
    """
    # Required arguments are checked in declaration order, before any
    # model conversion or network activity takes place.
    for value, message in ((monitor_definition_id, 'monitor_definition_id must be provided'),
                           (name, 'name must be provided'),
                           (metrics, 'metrics must be provided'),
                           (tags, 'tags must be provided')):
        if value is None:
            raise ValueError(message)

    # Model objects are turned into plain structures so json.dumps can handle them.
    payload = {
        'name': name,
        'metrics': [convert_model(item) for item in metrics],
        'tags': [convert_model(item) for item in tags],
        'description': description,
        'applies_to': None if applies_to is None else convert_model(applies_to),
        'parameters_schema': parameters_schema,
        'managed_by': managed_by,
        'schedule': None if schedule is None else convert_model(schedule),
        'schedules': None if schedules is None else convert_model(schedules),
    }

    client = self.watson_open_scale
    headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                   service_version='V2',
                                   operation_id='monitors_update'))
    body = json.dumps({key: value for key, value in payload.items() if value is not None})
    headers['content-type'] = 'application/json'
    # Caller-supplied headers may override the SDK ones, but never 'Accept'.
    headers.update(kwargs.get('headers', {}))
    headers['Accept'] = 'application/json'

    encoded = client.encode_path_vars(monitor_definition_id)
    request = client.prepare_request(method='PUT',
                                     url='/v2/monitor_definitions/{0}'.format(*encoded),
                                     headers=headers,
                                     data=body)
    response = client.send(request)
    if hasattr(MonitorDisplayForm, 'from_dict'):
        response.result = MonitorDisplayForm.from_dict(response.result)
    return response
def patch(self,
          monitor_definition_id: str,
          json_patch_operation: List['JsonPatchOperation'],
          **kwargs
         ) -> DetailedResponse:
    """
    Update a monitor definition.

    Issues a PATCH to `/v2/monitor_definitions/{monitor_definition_id}` with
    the converted operations serialised as a JSON array and sent as
    `application/json-patch+json`.

    :param str monitor_definition_id: Unique monitor definition ID.
    :param List[JsonPatchOperation] json_patch_operation:
    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse with `MonitorDisplayForm` result
    :raises ValueError: if `monitor_definition_id` or `json_patch_operation`
            is None.
    """
    for value, message in ((monitor_definition_id, 'monitor_definition_id must be provided'),
                           (json_patch_operation, 'json_patch_operation must be provided')):
        if value is None:
            raise ValueError(message)

    operations = [convert_model(item) for item in json_patch_operation]

    client = self.watson_open_scale
    headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                   service_version='V2',
                                   operation_id='monitors_patch'))
    body = json.dumps(operations)
    headers['content-type'] = 'application/json-patch+json'
    # Caller-supplied headers may override the SDK ones, but never 'Accept'.
    headers.update(kwargs.get('headers', {}))
    headers['Accept'] = 'application/json'

    encoded = client.encode_path_vars(monitor_definition_id)
    request = client.prepare_request(method='PATCH',
                                     url='/v2/monitor_definitions/{0}'.format(*encoded),
                                     headers=headers,
                                     data=body)
    response = client.send(request)
    if hasattr(MonitorDisplayForm, 'from_dict'):
        response.result = MonitorDisplayForm.from_dict(response.result)
    return response


def delete(self, monitor_definition_id: str, **kwargs) -> DetailedResponse:
    """
    Delete a monitor definition.

    Issues a DELETE to `/v2/monitor_definitions/{monitor_definition_id}`. The
    response is returned as received; no result model is applied.

    :param str monitor_definition_id: Unique monitor definition ID.
    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse
    :raises ValueError: if `monitor_definition_id` is None.
    """
    if monitor_definition_id is None:
        raise ValueError('monitor_definition_id must be provided')

    client = self.watson_open_scale
    headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                   service_version='V2',
                                   operation_id='monitors_delete'))
    headers.update(kwargs.get('headers', {}))

    encoded = client.encode_path_vars(monitor_definition_id)
    request = client.prepare_request(method='DELETE',
                                     url='/v2/monitor_definitions/{0}'.format(*encoded),
                                     headers=headers)
    return client.send(request)
#########################
# Instances
#########################


class Instances:
    """
    Client for the `/v2/monitor_instances` operations of Watson OpenScale.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a Instances client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def list(self,
             *,
             data_mart_id: str = None,
             monitor_definition_id: str = None,
             target_target_id: str = None,
             target_target_type: str = None,
             **kwargs
            ) -> DetailedResponse:
        """
        List monitor instances.

        Issues a GET to `/v2/monitor_instances` with the filters below as
        query parameters.

        :param str data_mart_id: (optional) comma-separated list of IDs.
        :param str monitor_definition_id: (optional) comma-separated list of IDs.
        :param str target_target_id: (optional) comma-separated list of IDs;
               sent as the `target.target_id` query parameter.
        :param str target_target_type: (optional) comma-separated list of
               types; sent as the `target.target_type` query parameter.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitorInstanceCollection` result
        """
        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='instances_list'))
        query = {
            'data_mart_id': data_mart_id,
            'monitor_definition_id': monitor_definition_id,
            'target.target_id': target_target_id,
            'target.target_type': target_target_type,
        }
        # Caller-supplied headers may override the SDK ones, but never 'Accept'.
        headers.update(kwargs.get('headers', {}))
        headers['Accept'] = 'application/json'

        request = client.prepare_request(method='GET',
                                         url='/v2/monitor_instances',
                                         headers=headers,
                                         params=query)
        response = client.send(request)
        if hasattr(MonitorInstanceCollection, 'from_dict'):
            response.result = MonitorInstanceCollection.from_dict(response.result)
        return response

    def add(self,
            data_mart_id: str,
            monitor_definition_id: str,
            target: 'Target',
            *,
            parameters: dict = None,
            thresholds: List['MetricThresholdOverride'] = None,
            schedule: 'MonitorInstanceSchedule' = None,
            schedules: 'MonitorInstanceScheduleCollection' = None,
            schedule_id: str = None,
            managed_by: str = None,
            unprocessed_records: 'RecordsCountSummary' = None,
            total_records: 'RecordsCountSummary' = None,
            skip_scheduler: bool = None,
            **kwargs
           ) -> DetailedResponse:
        """
        Create a new monitor instance.

        Issues a POST to `/v2/monitor_instances`. The JSON body contains every
        body argument whose value is not None; `skip_scheduler` travels as a
        query parameter.

        :param str data_mart_id:
        :param str monitor_definition_id:
        :param Target target:
        :param dict parameters: (optional) Monitoring parameters consistent
               with the `parameters_schema` from the monitor definition.
        :param List[MetricThresholdOverride] thresholds: (optional)
        :param MonitorInstanceSchedule schedule: (optional) The schedule used
               to control how frequently the target is monitored. The maximum
               frequency is once every 30 minutes. Defaults to once every hour
               if not specified.
        :param MonitorInstanceScheduleCollection schedules: (optional) A set
               of schedules used to control how frequently the target is
               monitored for online and batch deployment type.
        :param str schedule_id: (optional)
        :param str managed_by: (optional)
        :param RecordsCountSummary unprocessed_records: (optional) Summary
               about records count.
        :param RecordsCountSummary total_records: (optional) Summary about
               records count.
        :param bool skip_scheduler: (optional) prevent schedule creation for
               this monitor instance.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitorInstanceResponse` result
        :raises ValueError: if `data_mart_id`, `monitor_definition_id` or
                `target` is None.
        """
        for value, message in ((data_mart_id, 'data_mart_id must be provided'),
                               (monitor_definition_id, 'monitor_definition_id must be provided'),
                               (target, 'target must be provided')):
            if value is None:
                raise ValueError(message)

        # Model objects are turned into plain structures so json.dumps can handle them.
        payload = {
            'data_mart_id': data_mart_id,
            'monitor_definition_id': monitor_definition_id,
            'target': convert_model(target),
            'parameters': parameters,
            'thresholds': (None if thresholds is None
                           else [convert_model(item) for item in thresholds]),
            'schedule': None if schedule is None else convert_model(schedule),
            'schedules': None if schedules is None else convert_model(schedules),
            'schedule_id': schedule_id,
            'managed_by': managed_by,
            'unprocessed_records': (None if unprocessed_records is None
                                    else convert_model(unprocessed_records)),
            'total_records': None if total_records is None else convert_model(total_records),
        }

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='instances_add'))
        query = {'skip_scheduler': skip_scheduler}
        body = json.dumps({key: value for key, value in payload.items() if value is not None})
        headers['content-type'] = 'application/json'
        # Caller-supplied headers may override the SDK ones, but never 'Accept'.
        headers.update(kwargs.get('headers', {}))
        headers['Accept'] = 'application/json'

        request = client.prepare_request(method='POST',
                                         url='/v2/monitor_instances',
                                         headers=headers,
                                         params=query,
                                         data=body)
        response = client.send(request)
        if hasattr(MonitorInstanceResponse, 'from_dict'):
            response.result = MonitorInstanceResponse.from_dict(response.result)
        return response

    def get(self,
            monitor_instance_id: str,
            *,
            expand: str = None,
            **kwargs
           ) -> DetailedResponse:
        """
        Get monitor instance details.

        Issues a GET to `/v2/monitor_instances/{monitor_instance_id}`.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param str expand: (optional) comma-separated list of fields
               (supported fields are unprocessed_records and total_records).
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitorInstanceResponse` result
        :raises ValueError: if `monitor_instance_id` is None.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='instances_get'))
        query = {'expand': expand}
        # Caller-supplied headers may override the SDK ones, but never 'Accept'.
        headers.update(kwargs.get('headers', {}))
        headers['Accept'] = 'application/json'

        encoded = client.encode_path_vars(monitor_instance_id)
        request = client.prepare_request(method='GET',
                                         url='/v2/monitor_instances/{0}'.format(*encoded),
                                         headers=headers,
                                         params=query)
        response = client.send(request)
        if hasattr(MonitorInstanceResponse, 'from_dict'):
            response.result = MonitorInstanceResponse.from_dict(response.result)
        return response

    def update(self,
               monitor_instance_id: str,
               patch_document: List['PatchDocument'],
               *,
               update_metadata_only: bool = None,
               **kwargs
              ) -> DetailedResponse:
        """
        Update a monitor instance.

        Issues a PATCH to `/v2/monitor_instances/{monitor_instance_id}` with
        the converted patch documents serialised as a JSON array.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param List[PatchDocument] patch_document:
        :param bool update_metadata_only: (optional) Flag that allows to
               control if the underlying actions related to the monitor
               reconfiguration should be triggered.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitorInstanceResponse` result
        :raises ValueError: if `monitor_instance_id` or `patch_document` is None.
        """
        for value, message in ((monitor_instance_id, 'monitor_instance_id must be provided'),
                               (patch_document, 'patch_document must be provided')):
            if value is None:
                raise ValueError(message)

        documents = [convert_model(item) for item in patch_document]

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='instances_update'))
        query = {'update_metadata_only': update_metadata_only}
        body = json.dumps(documents)
        headers['content-type'] = 'application/json'
        # Caller-supplied headers may override the SDK ones, but never 'Accept'.
        headers.update(kwargs.get('headers', {}))
        headers['Accept'] = 'application/json'

        encoded = client.encode_path_vars(monitor_instance_id)
        request = client.prepare_request(method='PATCH',
                                         url='/v2/monitor_instances/{0}'.format(*encoded),
                                         headers=headers,
                                         params=query,
                                         data=body)
        response = client.send(request)
        if hasattr(MonitorInstanceResponse, 'from_dict'):
            response.result = MonitorInstanceResponse.from_dict(response.result)
        return response

    def delete(self, monitor_instance_id: str, **kwargs) -> DetailedResponse:
        """
        Delete a monitor instance.

        Issues a DELETE to `/v2/monitor_instances/{monitor_instance_id}`. The
        response is returned as received; no result model is applied.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `monitor_instance_id` is None.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='instances_delete'))
        headers.update(kwargs.get('headers', {}))

        encoded = client.encode_path_vars(monitor_instance_id)
        request = client.prepare_request(method='DELETE',
                                         url='/v2/monitor_instances/{0}'.format(*encoded),
                                         headers=headers)
        return client.send(request)
#########################
# Runs
#########################


class Runs:
    """
    Client for the `/v2/monitor_instances/{id}/runs` operations of Watson OpenScale.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a Runs client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def add(self,
            monitor_instance_id: str,
            *,
            triggered_by: str = None,
            parameters: dict = None,
            business_metric_context: 'MonitoringRunBusinessMetricContext' = None,
            expiration_date: datetime = None,
            **kwargs
           ) -> DetailedResponse:
        """
        Trigger monitoring run.

        Issues a POST to `/v2/monitor_instances/{monitor_instance_id}/runs`.
        The JSON body contains every body argument whose value is not None.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param str triggered_by: (optional) An identifier representing the
               source that triggered the run request (optional). One of: event,
               scheduler, user, webhook.
        :param dict parameters: (optional) Monitoring parameters consistent
               with the `parameters_schema` from the monitor definition.
        :param MonitoringRunBusinessMetricContext business_metric_context:
               (optional) Properties defining the business metric context in
               the triggered run of AI metric calculation.
        :param datetime expiration_date: (optional) The timestamp when the
               monitoring run was created with expiry date (in the format
               YYYY-MM-DDTHH:mm:ssZ or YYYY-MM-DDTHH:mm:ss.sssZ, matching the
               date-time format as specified by RFC 3339).
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitoringRun` result
        :raises ValueError: if `monitor_instance_id` is None.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')

        # The model and the datetime are converted to JSON-friendly values here.
        payload = {
            'triggered_by': triggered_by,
            'parameters': parameters,
            'business_metric_context': (None if business_metric_context is None
                                        else convert_model(business_metric_context)),
            'expiration_date': (None if expiration_date is None
                                else datetime_to_string(expiration_date)),
        }

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='runs_add'))
        body = json.dumps({key: value for key, value in payload.items() if value is not None})
        headers['content-type'] = 'application/json'
        # Caller-supplied headers may override the SDK ones, but never 'Accept'.
        headers.update(kwargs.get('headers', {}))
        headers['Accept'] = 'application/json'

        encoded = client.encode_path_vars(monitor_instance_id)
        request = client.prepare_request(method='POST',
                                         url='/v2/monitor_instances/{0}/runs'.format(*encoded),
                                         headers=headers,
                                         data=body)
        response = client.send(request)
        if hasattr(MonitoringRun, 'from_dict'):
            response.result = MonitoringRun.from_dict(response.result)
        return response

    def list(self,
             monitor_instance_id: str,
             *,
             start: str = None,
             limit: int = None,
             **kwargs
            ) -> DetailedResponse:
        """
        List monitoring runs.

        Issues a GET to `/v2/monitor_instances/{monitor_instance_id}/runs`.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param str start: (optional) The page token indicating where to start
               paging from.
        :param int limit: (optional) The limit of the number of items to
               return, for example limit=50. If not specified a default of 100
               will be used.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitoringRunCollection` result
        :raises ValueError: if `monitor_instance_id` is None.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='runs_list'))
        query = {'start': start, 'limit': limit}
        # Caller-supplied headers may override the SDK ones, but never 'Accept'.
        headers.update(kwargs.get('headers', {}))
        headers['Accept'] = 'application/json'

        encoded = client.encode_path_vars(monitor_instance_id)
        request = client.prepare_request(method='GET',
                                         url='/v2/monitor_instances/{0}/runs'.format(*encoded),
                                         headers=headers,
                                         params=query)
        response = client.send(request)
        if hasattr(MonitoringRunCollection, 'from_dict'):
            response.result = MonitoringRunCollection.from_dict(response.result)
        return response

    def get(self,
            monitor_instance_id: str,
            monitoring_run_id: str,
            **kwargs
           ) -> DetailedResponse:
        """
        Get monitoring run details.

        Issues a GET to
        `/v2/monitor_instances/{monitor_instance_id}/runs/{monitoring_run_id}`.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param str monitoring_run_id: Unique monitoring run ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitoringRun` result
        :raises ValueError: if `monitor_instance_id` or `monitoring_run_id` is None.
        """
        for value, message in ((monitor_instance_id, 'monitor_instance_id must be provided'),
                               (monitoring_run_id, 'monitoring_run_id must be provided')):
            if value is None:
                raise ValueError(message)

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='runs_get'))
        # Caller-supplied headers may override the SDK ones, but never 'Accept'.
        headers.update(kwargs.get('headers', {}))
        headers['Accept'] = 'application/json'

        encoded = client.encode_path_vars(monitor_instance_id, monitoring_run_id)
        request = client.prepare_request(method='GET',
                                         url='/v2/monitor_instances/{0}/runs/{1}'.format(*encoded),
                                         headers=headers)
        response = client.send(request)
        if hasattr(MonitoringRun, 'from_dict'):
            response.result = MonitoringRun.from_dict(response.result)
        return response

    def update(self,
               monitor_instance_id: str,
               monitoring_run_id: str,
               json_patch_operation: List['JsonPatchOperation'],
               **kwargs
              ) -> DetailedResponse:
        """
        Update existing monitoring run details.

        Issues a PATCH to
        `/v2/monitor_instances/{monitor_instance_id}/runs/{monitoring_run_id}`
        with the converted operations serialised as a JSON array and sent as
        `application/json-patch+json`.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param str monitoring_run_id: Unique monitoring run ID.
        :param List[JsonPatchOperation] json_patch_operation:
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitoringRun` result
        :raises ValueError: if `monitor_instance_id`, `monitoring_run_id` or
                `json_patch_operation` is None.
        """
        for value, message in ((monitor_instance_id, 'monitor_instance_id must be provided'),
                               (monitoring_run_id, 'monitoring_run_id must be provided'),
                               (json_patch_operation, 'json_patch_operation must be provided')):
            if value is None:
                raise ValueError(message)

        operations = [convert_model(item) for item in json_patch_operation]

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='runs_update'))
        body = json.dumps(operations)
        headers['content-type'] = 'application/json-patch+json'
        # Caller-supplied headers may override the SDK ones, but never 'Accept'.
        headers.update(kwargs.get('headers', {}))
        headers['Accept'] = 'application/json'

        encoded = client.encode_path_vars(monitor_instance_id, monitoring_run_id)
        request = client.prepare_request(method='PATCH',
                                         url='/v2/monitor_instances/{0}/runs/{1}'.format(*encoded),
                                         headers=headers,
                                         data=body)
        response = client.send(request)
        if hasattr(MonitoringRun, 'from_dict'):
            response.result = MonitoringRun.from_dict(response.result)
        return response
#########################
# Measurements
#########################


class Measurements:
    """
    Client for the measurement operations of Watson OpenScale
    (`/v2/monitor_instances/{id}/measurements` and `/v2/measurements`).
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a Measurements client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def add(self,
            monitor_instance_id: str,
            monitor_measurement_request: List['MonitorMeasurementRequest'],
            **kwargs
           ) -> DetailedResponse:
        """
        Publish measurement data to OpenScale.

        Issues a POST to
        `/v2/monitor_instances/{monitor_instance_id}/measurements` with the
        converted measurements serialised as a JSON array. The response is
        returned as received; no result model is applied.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param List[MonitorMeasurementRequest] monitor_measurement_request:
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `monitor_instance_id` or
                `monitor_measurement_request` is None.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')
        if monitor_measurement_request is None:
            raise ValueError('monitor_measurement_request must be provided')

        measurements = [convert_model(item) for item in monitor_measurement_request]

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='measurements_add'))
        body = json.dumps(measurements)
        headers['content-type'] = 'application/json'
        headers.update(kwargs.get('headers', {}))

        encoded = client.encode_path_vars(monitor_instance_id)
        request = client.prepare_request(method='POST',
                                         url='/v2/monitor_instances/{0}/measurements'.format(*encoded),
                                         headers=headers,
                                         data=body)
        return client.send(request)

    def list(self,
             monitor_instance_id: str,
             start: datetime = None,
             end: datetime = None,
             run_id: str = None,
             *,
             filter: str = None,
             limit: int = None,
             **kwargs
            ) -> DetailedResponse:
        """
        Query measurements from OpenScale DataMart.

        Issues a GET to
        `/v2/monitor_instances/{monitor_instance_id}/measurements`. It is
        required to provide either both `start` and `end`, or `run_id`.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param datetime start: Beginning of the time range. `datetime` values
               are sent as RFC 3339 strings; other values are sent unchanged.
        :param datetime end: End of the time range. Converted like `start`.
        :param str run_id: Comma delimited list of measurement run_id.
        :param str filter: (optional) Filter expression can consist of any
               metric tag or a common column of string type followed by filter
               name and optionally a value, all delimited by colon. Supported
               filters are: `in`, `eq`, `null` and `exists`. Sample filters are:
               `filter=region:in:[us,pl],segment:eq:sales` or
               `filter=region:null,segment:exists`.
        :param int limit: (optional) Maximum number of elements returned.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitorMeasurementResponseCollection` result
        :raises ValueError: if `monitor_instance_id` is None, or if `run_id`
                is None and `start`/`end` are not both given.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')
        # The documented contract is "time range OR run_id"; demanding all
        # three made run_id-only and range-only queries impossible.
        if run_id is None and (start is None or end is None):
            raise ValueError('either run_id or both start and end must be provided')

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='measurements_list'))
        # A raw datetime in the query string would be rendered with str(),
        # which uses a space separator instead of the RFC 3339 'T'.
        query = {
            'start': datetime_to_string(start) if isinstance(start, datetime) else start,
            'end': datetime_to_string(end) if isinstance(end, datetime) else end,
            'run_id': run_id,
            'filter': filter,
            'limit': limit,
        }
        # Caller-supplied headers may override the SDK ones, but never 'Accept'.
        headers.update(kwargs.get('headers', {}))
        headers['Accept'] = 'application/json'

        encoded = client.encode_path_vars(monitor_instance_id)
        request = client.prepare_request(method='GET',
                                         url='/v2/monitor_instances/{0}/measurements'.format(*encoded),
                                         headers=headers,
                                         params=query)
        response = client.send(request)
        if hasattr(MonitorMeasurementResponseCollection, 'from_dict'):
            response.result = MonitorMeasurementResponseCollection.from_dict(response.result)
        return response

    def delete(self, monitor_instance_id: str, **kwargs) -> DetailedResponse:
        """
        Delete measurements for a given monitor instance.

        Issues a DELETE to
        `/v2/monitor_instances/{monitor_instance_id}/measurements`. The
        response is returned as received; no result model is applied.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse
        :raises ValueError: if `monitor_instance_id` is None.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='measurements_delete'))
        headers.update(kwargs.get('headers', {}))

        encoded = client.encode_path_vars(monitor_instance_id)
        request = client.prepare_request(method='DELETE',
                                         url='/v2/monitor_instances/{0}/measurements'.format(*encoded),
                                         headers=headers)
        return client.send(request)

    def get(self,
            monitor_instance_id: str,
            measurement_id: str,
            **kwargs
           ) -> DetailedResponse:
        """
        Get measurement data from OpenScale DataMart.

        Issues a GET to
        `/v2/monitor_instances/{monitor_instance_id}/measurements/{measurement_id}`.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param str measurement_id: Unique measurement ID.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MonitorMeasurementResponse` result
        :raises ValueError: if `monitor_instance_id` or `measurement_id` is None.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')
        if measurement_id is None:
            raise ValueError('measurement_id must be provided')

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='measurements_get'))
        # Caller-supplied headers may override the SDK ones, but never 'Accept'.
        headers.update(kwargs.get('headers', {}))
        headers['Accept'] = 'application/json'

        encoded = client.encode_path_vars(monitor_instance_id, measurement_id)
        request = client.prepare_request(method='GET',
                                         url='/v2/monitor_instances/{0}/measurements/{1}'.format(*encoded),
                                         headers=headers)
        response = client.send(request)
        if hasattr(MonitorMeasurementResponse, 'from_dict'):
            response.result = MonitorMeasurementResponse.from_dict(response.result)
        return response

    def query(self,
              *,
              target_id: str = None,
              target_type: str = None,
              monitor_definition_id: str = None,
              filter: str = None,
              recent_count: int = None,
              format: str = None,
              **kwargs
             ) -> DetailedResponse:
        """
        Query for the recent measurement.

        Query for the recent measurement grouped by the monitoring target
        (subscription or business application). Issues a GET to
        `/v2/measurements`.

        :param str target_id: (optional) Comma separated ID of the monitoring
               target (subscription or business application).
        :param str target_type: (optional) Type of the monitoring target
               (subscription or business application).
        :param str monitor_definition_id: (optional) Comma separated ID of the
               monitor definition.
        :param str filter: (optional) Filter expression can consist of any
               metric tag or a common column of string type followed by filter
               name and optionally a value, all delimited by colon and
               prepended with `monitor_definition_id.` string. Supported
               filters are: `in`, `eq`, `null` and `exists`. Sample filters are:
               `monitor_definition_id.filter=region:in:[us,pl],monitor_definition_id.segment:eq:sales`
               or
               `filter=monitor_definition_id.region:null,monitor_definition_id.segment:exists`.
               Every monitor_definition_id can have own set of filters.
        :param int recent_count: (optional) Number of measurements (per
               target) to be returned.
        :param str format: (optional) Format of the returned data. `full`
               format compared to `compact` is additive and contains `sources`
               part.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `MeasurementsResponseCollection` result
        """
        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='measurements_query'))
        query = {
            'target_id': target_id,
            'target_type': target_type,
            'monitor_definition_id': monitor_definition_id,
            'filter': filter,
            'recent_count': recent_count,
            'format': format,
        }
        # Caller-supplied headers may override the SDK ones, but never 'Accept'.
        headers.update(kwargs.get('headers', {}))
        headers['Accept'] = 'application/json'

        request = client.prepare_request(method='GET',
                                         url='/v2/measurements',
                                         headers=headers,
                                         params=query)
        response = client.send(request)
        if hasattr(MeasurementsResponseCollection, 'from_dict'):
            response.result = MeasurementsResponseCollection.from_dict(response.result)
        return response
#########################
# Metrics
#########################


class Metrics:
    """
    Client for the `/v2/monitor_instances/{id}/metrics` operation of Watson OpenScale.
    """

    def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None:
        """
        Construct a Metrics client for the Watson OpenScale service.

        :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service.
        """
        self.watson_open_scale = watson_open_scale

    def list(self,
             monitor_instance_id: str,
             start: datetime,
             end: datetime,
             agg: str,
             *,
             interval: str = None,
             filter: str = None,
             group: str = None,
             **kwargs
            ) -> DetailedResponse:
        """
        Query monitor instance metrics from OpenScale DataMart.

        Issues a GET to `/v2/monitor_instances/{monitor_instance_id}/metrics`.
        See <a
        href="https://github.ibm.com/aiopenscale/aios-datamart-service-api/wiki/1.3.-Metrics-Query-Language">Metrics
        Query Language documentation</a>.

        :param str monitor_instance_id: Unique monitor instance ID.
        :param datetime start: Calculations **inclusive**, internally floored
               to achieve full interval. If interval is vulnerable to time
               zone, the calculated value depends on a backend db engine:
               PostgreSQL respects time zone and DB2 use UTC time. Calculated
               value is returned in response. `datetime` values are sent as
               RFC 3339 strings; other values are sent unchanged.
        :param datetime end: Calculations **exclusive**, internally ceiled to
               achieve full interval. If interval is vulnerable to time zone,
               the calculated value depends on a backend db engine: PostgreSQL
               respects time zone and DB2 use UTC time. Calculated value is
               returned in response. Converted like `start`.
        :param str agg: Comma delimited function list constructed from metric
               name and function, e.g. `agg=metric_name:count,:last` that
               defines aggregations.
        :param str interval: (optional) Time unit in which metrics are grouped
               and aggregated, interval by interval.
        :param str filter: (optional) Filter expression can consist of any
               metric tag or a common column of string type followed by filter
               name and optionally a value, all delimited by colon. Supported
               filters are: `in`, `eq`, `null` and `exists`. Sample filters are:
               `filter=region:in:[us,pl],segment:eq:sales` or
               `filter=region:null,segment:exists`.
        :param str group: (optional) Comma delimited list constructed from
               metric tags, e.g. `group=region,segment` to group metrics before
               aggregations.
        :param dict headers: A `dict` containing the request headers
        :return: A `DetailedResponse` containing the result, headers and HTTP status code.
        :rtype: DetailedResponse with `DataMartGetMonitorInstanceMetrics` result
        :raises ValueError: if `monitor_instance_id`, `start`, `end` or `agg`
                is None.
        """
        if monitor_instance_id is None:
            raise ValueError('monitor_instance_id must be provided')
        if start is None:
            raise ValueError('start must be provided')
        if end is None:
            raise ValueError('end must be provided')
        if agg is None:
            raise ValueError('agg must be provided')

        client = self.watson_open_scale
        headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                       service_version='V2',
                                       operation_id='metrics_list'))
        # A raw datetime in the query string would be rendered with str(),
        # which uses a space separator instead of the RFC 3339 'T'.
        query = {
            'start': datetime_to_string(start) if isinstance(start, datetime) else start,
            'end': datetime_to_string(end) if isinstance(end, datetime) else end,
            'agg': agg,
            'interval': interval,
            'filter': filter,
            'group': group,
        }
        # Caller-supplied headers may override the SDK ones, but never 'Accept'.
        headers.update(kwargs.get('headers', {}))
        headers['Accept'] = 'application/json'

        encoded = client.encode_path_vars(monitor_instance_id)
        request = client.prepare_request(method='GET',
                                         url='/v2/monitor_instances/{0}/metrics'.format(*encoded),
                                         headers=headers,
                                         params=query)
        response = client.send(request)
        if hasattr(DataMartGetMonitorInstanceMetrics, 'from_dict'):
            response.result = DataMartGetMonitorInstanceMetrics.from_dict(response.result)
        return response
:param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `DataMartGetMonitorInstanceMetrics` result """ if monitor_instance_id is None: raise ValueError('monitor_instance_id must be provided') if start is None: raise ValueError('start must be provided') if end is None: raise ValueError('end must be provided') if agg is None: raise ValueError('agg must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='metrics_list') headers.update(sdk_headers) params = { 'start': start, 'end': end, 'agg': agg, 'interval': interval, 'filter': filter, 'group': group } if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/monitor_instances/{0}/metrics'.format( *self.watson_open_scale.encode_path_vars(monitor_instance_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) if hasattr(DataMartGetMonitorInstanceMetrics, 'from_dict'): response.result = DataMartGetMonitorInstanceMetrics.from_dict(response.result) return response ######################### # Business Applications ######################### class BusinessApplications: """ Business Applications """ def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None: """ Construct a BusinessApplications client for the Watson OpenScale service. :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service. 
""" self.watson_open_scale = watson_open_scale def add(self, name: str, description: str, payload_fields: List['PayloadField'], business_metrics: List['BusinessMetric'], *, subscription_ids: List[str] = None, business_metrics_monitor_definition_id: str = None, business_metrics_monitor_instance_id: str = None, correlation_monitor_instance_id: str = None, business_payload_data_set_id: str = None, transaction_batches_data_set_id: str = None, **kwargs ) -> DetailedResponse: """ Add business application. Add the new business application. :param str name: name fo the business application. :param str description: description fo the business application. :param List[PayloadField] payload_fields: :param List[BusinessMetric] business_metrics: :param List[str] subscription_ids: (optional) :param str business_metrics_monitor_definition_id: (optional) :param str business_metrics_monitor_instance_id: (optional) :param str correlation_monitor_instance_id: (optional) :param str business_payload_data_set_id: (optional) Unique identifier of the data set (like scoring, feedback or business payload). :param str transaction_batches_data_set_id: (optional) Unique identifier of the data set (like scoring, feedback or business payload). :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `BusinessApplicationResponse` result """ if name is None: raise ValueError('name must be provided') if description is None: raise ValueError('description must be provided') if payload_fields is None: raise ValueError('payload_fields must be provided') if business_metrics is None: raise ValueError('business_metrics must be provided') payload_fields = [convert_model(x) for x in payload_fields] business_metrics = [convert_model(x) for x in business_metrics] headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='business_applications_add') headers.update(sdk_headers) data = { 'name': name, 'description': description, 'payload_fields': payload_fields, 'business_metrics': business_metrics, 'subscription_ids': subscription_ids, 'business_metrics_monitor_definition_id': business_metrics_monitor_definition_id, 'business_metrics_monitor_instance_id': business_metrics_monitor_instance_id, 'correlation_monitor_instance_id': correlation_monitor_instance_id, 'business_payload_data_set_id': business_payload_data_set_id, 'transaction_batches_data_set_id': transaction_batches_data_set_id } data = {k: v for (k, v) in data.items() if v is not None} data = json.dumps(data) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/business_applications' request = self.watson_open_scale.prepare_request(method='POST', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(BusinessApplicationResponse, 'from_dict'): response.result = BusinessApplicationResponse.from_dict(response.result) return response
def list(self, **kwargs) -> DetailedResponse:
    """
    List business applications.

    Issue a GET against `/v2/business_applications` and return the
    configured business applications.

    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse with `BusinessApplicationsCollection` result
    """
    client = self.watson_open_scale

    # Start from the SDK headers; caller-supplied headers are merged on top.
    request_headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                           service_version='V2',
                                           operation_id='business_applications_list'))
    if 'headers' in kwargs:
        request_headers.update(kwargs.get('headers'))
    # Set last, so the Accept header is always JSON.
    request_headers['Accept'] = 'application/json'

    prepared = client.prepare_request(method='GET',
                                      url='/v2/business_applications',
                                      headers=request_headers)
    response = client.send(prepared)

    # Wrap the raw result in the model class when it offers a converter.
    if hasattr(BusinessApplicationsCollection, 'from_dict'):
        response.result = BusinessApplicationsCollection.from_dict(response.result)
    return response
def get(self, business_application_id: str, **kwargs) -> DetailedResponse:
    """
    Get application details.

    Issue a GET against `/v2/business_applications/{id}` for a single
    business application.

    :param str business_application_id: ID of the business application.
    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse with `BusinessApplicationResponse` result
    :raises ValueError: if `business_application_id` is None.
    """
    if business_application_id is None:
        raise ValueError('business_application_id must be provided')

    client = self.watson_open_scale

    # Start from the SDK headers; caller-supplied headers are merged on top.
    request_headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                           service_version='V2',
                                           operation_id='business_applications_get'))
    if 'headers' in kwargs:
        request_headers.update(kwargs.get('headers'))
    # Set last, so the Accept header is always JSON.
    request_headers['Accept'] = 'application/json'

    # The ID goes through encode_path_vars before it is placed in the path.
    path_values = client.encode_path_vars(business_application_id)
    endpoint = '/v2/business_applications/{0}'.format(*path_values)

    prepared = client.prepare_request(method='GET',
                                      url=endpoint,
                                      headers=request_headers)
    response = client.send(prepared)

    # Wrap the raw result in the model class when it offers a converter.
    if hasattr(BusinessApplicationResponse, 'from_dict'):
        response.result = BusinessApplicationResponse.from_dict(response.result)
    return response
def delete(self, business_application_id: str, **kwargs) -> DetailedResponse:
    """
    Delete business application.

    Issue a DELETE against `/v2/business_applications/{id}`. The response
    is returned as received; no model conversion is applied.

    :param str business_application_id: ID of the business application.
    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse
    :raises ValueError: if `business_application_id` is None.
    """
    if business_application_id is None:
        raise ValueError('business_application_id must be provided')

    client = self.watson_open_scale

    # Start from the SDK headers; caller-supplied headers are merged on top.
    # No Accept header is set for this operation.
    request_headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                           service_version='V2',
                                           operation_id='business_applications_delete'))
    if 'headers' in kwargs:
        request_headers.update(kwargs.get('headers'))

    # The ID goes through encode_path_vars before it is placed in the path.
    path_values = client.encode_path_vars(business_application_id)
    endpoint = '/v2/business_applications/{0}'.format(*path_values)

    prepared = client.prepare_request(method='DELETE',
                                      url=endpoint,
                                      headers=request_headers)
    return client.send(prepared)
def update(self,
           business_application_id: str,
           patch_document: List['PatchDocument'],
           **kwargs) -> DetailedResponse:
    """
    Update business application.

    Issue a PATCH against `/v2/business_applications/{id}` whose body is
    the JSON-serialized list of patch documents.

    :param str business_application_id: ID of the business application.
    :param List[PatchDocument] patch_document: Patch entries to send; each
           item is passed through `convert_model` before serialization.
    :param dict headers: A `dict` containing the request headers
    :return: A `DetailedResponse` containing the result, headers and HTTP status code.
    :rtype: DetailedResponse with `BusinessApplicationResponse` result
    :raises ValueError: if `business_application_id` or `patch_document` is None.
    """
    if business_application_id is None:
        raise ValueError('business_application_id must be provided')
    if patch_document is None:
        raise ValueError('patch_document must be provided')

    client = self.watson_open_scale

    # Convert each patch entry with convert_model before building the request.
    serializable_ops = [convert_model(entry) for entry in patch_document]

    request_headers = dict(get_sdk_headers(service_name=client.DEFAULT_SERVICE_NAME,
                                           service_version='V2',
                                           operation_id='business_applications_update'))
    body = json.dumps(serializable_ops)
    request_headers['content-type'] = 'application/json'

    # Caller-supplied headers may override the defaults above (including
    # content-type); Accept is set afterwards and is always JSON.
    if 'headers' in kwargs:
        request_headers.update(kwargs.get('headers'))
    request_headers['Accept'] = 'application/json'

    # The ID goes through encode_path_vars before it is placed in the path.
    path_values = client.encode_path_vars(business_application_id)
    endpoint = '/v2/business_applications/{0}'.format(*path_values)

    prepared = client.prepare_request(method='PATCH',
                                      url=endpoint,
                                      headers=request_headers,
                                      data=body)
    response = client.send(prepared)

    # Wrap the raw result in the model class when it offers a converter.
    if hasattr(BusinessApplicationResponse, 'from_dict'):
        response.result = BusinessApplicationResponse.from_dict(response.result)
    return response
######################### # Integrated Systems ######################### class IntegratedSystems: """ Integrated Systems """ def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None: """ Construct a IntegratedSystems client for the Watson OpenScale service. :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service. """ self.watson_open_scale = watson_open_scale def list(self, **kwargs ) -> DetailedResponse: """ List integrated systems. List integrated systems. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse with `IntegratedSystemCollection` result """ headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='integrated_systems_list') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/integrated_systems' request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) if hasattr(IntegratedSystemCollection, 'from_dict'): response.result = IntegratedSystemCollection.from_dict(response.result) return response def add(self, name: str, type: str, description: str, credentials: dict, *, connection: object = None, **kwargs ) -> DetailedResponse: """ Create a new integrated system. Create a new integrated system. :param str name: The name of the Integrated System. :param str type: :param str description: The description of the Integrated System. :param dict credentials: The credentials for the Integrated System. :param object connection: (optional) The additional connection information for the Integrated System. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `IntegratedSystemResponse` result """ if name is None: raise ValueError('name must be provided') if type is None: raise ValueError('type must be provided') if description is None: raise ValueError('description must be provided') if credentials is None: raise ValueError('credentials must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='integrated_systems_add') headers.update(sdk_headers) data = { 'name': name, 'type': type, 'description': description, 'credentials': credentials, 'connection': connection } data = {k: v for (k, v) in data.items() if v is not None} data = json.dumps(data) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/integrated_systems' request = self.watson_open_scale.prepare_request(method='POST', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(IntegratedSystemResponse, 'from_dict'): response.result = IntegratedSystemResponse.from_dict(response.result) return response def get(self, integrated_system_id: str, **kwargs ) -> DetailedResponse: """ Get a specific integrated system. Get a specific integrated system. :param str integrated_system_id: Unique integrated system ID. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `IntegratedSystemResponse` result """ if integrated_system_id is None: raise ValueError('integrated_system_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='integrated_systems_get') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/integrated_systems/{0}'.format( *self.watson_open_scale.encode_path_vars(integrated_system_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) if hasattr(IntegratedSystemResponse, 'from_dict'): response.result = IntegratedSystemResponse.from_dict(response.result) return response def update(self, integrated_system_id: str, json_patch_operation: List['JsonPatchOperation'], **kwargs ) -> DetailedResponse: """ Update an integrated system. Update an integrated system. :param str integrated_system_id: Unique integrated system ID. :param List[JsonPatchOperation] json_patch_operation: :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `IntegratedSystemResponse` result """ if integrated_system_id is None: raise ValueError('integrated_system_id must be provided') if json_patch_operation is None: raise ValueError('json_patch_operation must be provided') json_patch_operation = [convert_model(x) for x in json_patch_operation] headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='integrated_systems_update') headers.update(sdk_headers) data = json.dumps(json_patch_operation) headers['content-type'] = 'application/json-patch+json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/integrated_systems/{0}'.format( *self.watson_open_scale.encode_path_vars(integrated_system_id)) request = self.watson_open_scale.prepare_request(method='PATCH', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(IntegratedSystemResponse, 'from_dict'): response.result = IntegratedSystemResponse.from_dict(response.result) return response def delete(self, integrated_system_id: str, **kwargs ) -> DetailedResponse: """ Delete an integrated system. Delete an integrated system. :param str integrated_system_id: Unique integrated system ID. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse """ if integrated_system_id is None: raise ValueError('integrated_system_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='integrated_systems_delete') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) url = '/v2/integrated_systems/{0}'.format( *self.watson_open_scale.encode_path_vars(integrated_system_id)) request = self.watson_open_scale.prepare_request(method='DELETE', url=url, headers=headers) response = self.watson_open_scale.send(request) return response ######################### # Operational Spaces ######################### class OperationalSpaces: """ Operational Spaces """ def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None: """ Construct a OperationalSpaces client for the Watson OpenScale service. :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service. """ self.watson_open_scale = watson_open_scale def list(self, **kwargs ) -> DetailedResponse: """ List Operational Spaces. List Operational Spaces. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `OperationalSpaceCollection` result """ headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='operational_spaces_list') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/operational_spaces' request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) if hasattr(OperationalSpaceCollection, 'from_dict'): response.result = OperationalSpaceCollection.from_dict(response.result) return response def add(self, name: str, *, description: str = None, **kwargs ) -> DetailedResponse: """ Create an operational space. Create an operational space. :param str name: The name of the Operational Space. :param str description: (optional) The description of the Operational Space. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `OperationalSpaceResponse` result """ if name is None: raise ValueError('name must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='operational_spaces_add') headers.update(sdk_headers) data = { 'name': name, 'description': description } data = {k: v for (k, v) in data.items() if v is not None} data = json.dumps(data) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/operational_spaces' request = self.watson_open_scale.prepare_request(method='POST', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(OperationalSpaceResponse, 'from_dict'): response.result = OperationalSpaceResponse.from_dict(response.result) return response def get(self, operational_space_id: str, **kwargs ) -> DetailedResponse: """ Get an operational space. Get an operational space. :param str operational_space_id: Unique Operational Space ID. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `OperationalSpaceResponse` result """ if operational_space_id is None: raise ValueError('operational_space_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='operational_spaces_get') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/operational_spaces/{0}'.format( *self.watson_open_scale.encode_path_vars(operational_space_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) if hasattr(OperationalSpaceResponse, 'from_dict'): response.result = OperationalSpaceResponse.from_dict(response.result) return response def update(self, operational_space_id: str, json_patch_operation: List['JsonPatchOperation'], **kwargs ) -> DetailedResponse: """ Update an operational space. Update an operational space. :param str operational_space_id: Unique Operational Space ID. :param List[JsonPatchOperation] json_patch_operation: :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `OperationalSpaceResponse` result """ if operational_space_id is None: raise ValueError('operational_space_id must be provided') if json_patch_operation is None: raise ValueError('json_patch_operation must be provided') json_patch_operation = [convert_model(x) for x in json_patch_operation] headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='operational_spaces_update') headers.update(sdk_headers) data = json.dumps(json_patch_operation) headers['content-type'] = 'application/json-patch+json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/operational_spaces/{0}'.format( *self.watson_open_scale.encode_path_vars(operational_space_id)) request = self.watson_open_scale.prepare_request(method='PATCH', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(OperationalSpaceResponse, 'from_dict'): response.result = OperationalSpaceResponse.from_dict(response.result) return response def delete(self, operational_space_id: str, **kwargs ) -> DetailedResponse: """ Delete an operational space. Delete an operational space. :param str operational_space_id: Unique Operational Space ID. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse """ if operational_space_id is None: raise ValueError('operational_space_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='operational_spaces_delete') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) url = '/v2/operational_spaces/{0}'.format( *self.watson_open_scale.encode_path_vars(operational_space_id)) request = self.watson_open_scale.prepare_request(method='DELETE', url=url, headers=headers) response = self.watson_open_scale.send(request) return response ######################### # User Preferences ######################### class UserPreferences: """ User Preferences """ def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None: """ Construct a UserPreferences client for the Watson OpenScale service. :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service. """ self.watson_open_scale = watson_open_scale def list(self, **kwargs ) -> DetailedResponse: """ Get User Preferences. Get User Preferences. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse """ headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='user_preferences_list') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/user_preferences' request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) return response def patch(self, json_patch_operation: List['JsonPatchOperation'], **kwargs ) -> DetailedResponse: """ Update User Preferences. Update User Preferences. 
:param List[JsonPatchOperation] json_patch_operation: :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. :rtype: DetailedResponse """ if json_patch_operation is None: raise ValueError('json_patch_operation must be provided') json_patch_operation = [convert_model(x) for x in json_patch_operation] headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='user_preferences_patch') headers.update(sdk_headers) data = json.dumps(json_patch_operation) headers['content-type'] = 'application/json-patch+json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/user_preferences' request = self.watson_open_scale.prepare_request(method='PATCH', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) return response def get(self, user_preference_key: str, **kwargs ) -> DetailedResponse: """ Get a specific user prefrence. Get a specific user preference. :param str user_preference_key: key in user preferences. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `UserPreferencesGetResponse` result """ if user_preference_key is None: raise ValueError('user_preference_key must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='user_preferences_get') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/user_preferences/{0}'.format( *self.watson_open_scale.encode_path_vars(user_preference_key)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) if hasattr(UserPreferencesGetResponse, 'from_dict'): response.result = UserPreferencesGetResponse.from_dict(response.result) return response def update(self, user_preference_key: str, user_preferences_update_request: 'UserPreferencesUpdateRequest', **kwargs ) -> DetailedResponse: """ Update the user preference. Update the user preference. :param str user_preference_key: key in user preferences. :param UserPreferencesUpdateRequest user_preferences_update_request: :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse """ if user_preference_key is None: raise ValueError('user_preference_key must be provided') if user_preferences_update_request is None: raise ValueError('user_preferences_update_request must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='user_preferences_update') headers.update(sdk_headers) data = json.dumps(user_preferences_update_request) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/user_preferences/{0}'.format( *self.watson_open_scale.encode_path_vars(user_preference_key)) request = self.watson_open_scale.prepare_request(method='PUT', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) return response def delete(self, user_preference_key: str, **kwargs ) -> DetailedResponse: """ Delete the user preference. Delete the user preference. :param str user_preference_key: key in user preferences. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse """ if user_preference_key is None: raise ValueError('user_preference_key must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='user_preferences_delete') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) url = '/v2/user_preferences/{0}'.format( *self.watson_open_scale.encode_path_vars(user_preference_key)) request = self.watson_open_scale.prepare_request(method='DELETE', url=url, headers=headers) response = self.watson_open_scale.send(request) return response ######################### # Explanation Tasks ######################### class ExplanationTasks: """ Explanation Tasks """ def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None: """ Construct a ExplanationTasks client for the Watson OpenScale service. :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service. """ self.watson_open_scale = watson_open_scale def add(self, *, scoring_ids: List[str] = None, input_rows: List[dict] = None, explanation_types: List[str] = None, subscription_id: str = None, **kwargs ) -> DetailedResponse: """ Compute explanations. Submit tasks for computing explanation of predictions. :param List[str] scoring_ids: (optional) IDs of the scoring transaction. :param List[dict] input_rows: (optional) List of scoring transactions. :param List[str] explanation_types: (optional) Types of explanations to generate. :param str subscription_id: (optional) Unique subscription ID. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `PostExplanationTaskResponse` result """ headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='explanation_tasks_add') headers.update(sdk_headers) data = { 'scoring_ids': scoring_ids, 'input_rows': input_rows, 'explanation_types': explanation_types, 'subscription_id': subscription_id } data = {k: v for (k, v) in data.items() if v is not None} data = json.dumps(data) headers['content-type'] = 'application/json' if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/explanation_tasks' request = self.watson_open_scale.prepare_request(method='POST', url=url, headers=headers, data=data) response = self.watson_open_scale.send(request) if hasattr(PostExplanationTaskResponse, 'from_dict'): response.result = PostExplanationTaskResponse.from_dict(response.result) return response def list(self, *, offset: int = None, limit: int = None, subscription_id: str = None, scoring_id: str = None, status: str = None, **kwargs ) -> DetailedResponse: """ List all explanations. List of all the computed explanations. :param int offset: (optional) offset of the explanations to return. :param int limit: (optional) Maximum number of explanations to return. :param str subscription_id: (optional) Unique subscription ID. :param str scoring_id: (optional) ID of the scoring transaction. :param str status: (optional) Status of the explanation task. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `GetExplanationTasksResponse` result """ headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='explanation_tasks_list') headers.update(sdk_headers) params = { 'offset': offset, 'limit': limit, 'subscription_id': subscription_id, 'scoring_id': scoring_id, 'status': status } if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/explanation_tasks' request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) if hasattr(GetExplanationTasksResponse, 'from_dict'): response.result = GetExplanationTasksResponse.from_dict(response.result) return response def get(self, explanation_task_id: str, *, subscription_id: str = None, **kwargs ) -> DetailedResponse: """ Get explanation. Get explanation for the given explanation task id. :param str explanation_task_id: ID of the explanation task. :param str subscription_id: (optional) Unique subscription ID. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse with `GetExplanationTaskResponse` result """ if explanation_task_id is None: raise ValueError('explanation_task_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='explanation_tasks_get') headers.update(sdk_headers) params = { 'subscription_id': subscription_id } if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/json' url = '/v2/explanation_tasks/{0}'.format( *self.watson_open_scale.encode_path_vars(explanation_task_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers, params=params) response = self.watson_open_scale.send(request) if hasattr(GetExplanationTaskResponse, 'from_dict'): response.result = GetExplanationTaskResponse.from_dict(response.result) return response ######################### # Drift Service ######################### class DriftService: """ Drift Service """ def __init__(self, watson_open_scale: WatsonOpenScaleV2) -> None: """ Construct a DriftService client for the Watson OpenScale service. :param WatsonOpenScaleV2 watson_open_scale: client for the Watson OpenScale service. """ self.watson_open_scale = watson_open_scale def drift_archive_head(self, monitor_instance_id: str, **kwargs ) -> DetailedResponse: """ Retrieves the Drift archive metadata. API to retrieve the Drift archive metadata. :param str monitor_instance_id: Unique monitor instance ID. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse """ if monitor_instance_id is None: raise ValueError('monitor_instance_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='drift_archive_head') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) url = '/v2/monitoring_services/drift/monitor_instances/{0}/archives'.format( *self.watson_open_scale.encode_path_vars(monitor_instance_id)) request = self.watson_open_scale.prepare_request(method='HEAD', url=url, headers=headers) response = self.watson_open_scale.send(request) return response def drift_archive_post(self, data_mart_id: str, subscription_id: str, body: BinaryIO, *, archive_name: str = None, enable_data_drift: bool = None, enable_model_drift: bool = None, **kwargs ) -> DetailedResponse: """ Upload Drift archives. API to upload drift archive such as the Drift Detection Model. :param str data_mart_id: ID of the data mart. :param str subscription_id: Unique subscription ID. :param BinaryIO body: :param str archive_name: (optional) The name of the archive being uploaded. :param bool enable_data_drift: (optional) Flag to enable/disable data drift. :param bool enable_model_drift: (optional) Flag to enable/disable model drift. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse """ if data_mart_id is None: raise ValueError('data_mart_id must be provided') if subscription_id is None: raise ValueError('subscription_id must be provided') if body is None: raise ValueError('body must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='drift_archive_post') headers.update(sdk_headers) params = { 'archive_name': archive_name, 'enable_data_drift': enable_data_drift, 'enable_model_drift': enable_model_drift } data = body headers['content-type'] = 'application/octet-stream' if 'headers' in kwargs: headers.update(kwargs.get('headers')) url = '/v2/monitoring_services/drift/data_marts/{0}/subscriptions/{1}/archives'.format( *self.watson_open_scale.encode_path_vars(data_mart_id, subscription_id)) request = self.watson_open_scale.prepare_request(method='POST', url=url, headers=headers, params=params, data=data) response = self.watson_open_scale.send(request) return response def drift_archive_get(self, monitor_instance_id: str, **kwargs ) -> DetailedResponse: """ Retrieves the Drift archives. API to retrieve the Drift archives. :param str monitor_instance_id: Unique monitor instance ID. :param dict headers: A `dict` containing the request headers :return: A `DetailedResponse` containing the result, headers and HTTP status code. 
:rtype: DetailedResponse """ if monitor_instance_id is None: raise ValueError('monitor_instance_id must be provided') headers = {} sdk_headers = get_sdk_headers(service_name=self.watson_open_scale.DEFAULT_SERVICE_NAME, service_version='V2', operation_id='drift_archive_get') headers.update(sdk_headers) if 'headers' in kwargs: headers.update(kwargs.get('headers')) headers['Accept'] = 'application/octet-stream' url = '/v2/monitoring_services/drift/monitor_instances/{0}/archives'.format( *self.watson_open_scale.encode_path_vars(monitor_instance_id)) request = self.watson_open_scale.prepare_request(method='GET', url=url, headers=headers) response = self.watson_open_scale.send(request) return response class DataSetsListEnums: """ Enums for data_sets_list parameters. """ class TargetTargetType(str, Enum): """ type of the target. """ SUBSCRIPTION = 'subscription' BUSINESS_APPLICATION = 'business_application' INSTANCE = 'instance' DATA_MART = 'data_mart' class Type(str, Enum): """ type of the data set. """ MANUAL_LABELING = 'manual_labeling' PAYLOAD_LOGGING = 'payload_logging' FEEDBACK = 'feedback' BUSINESS_PAYLOAD = 'business_payload' EXPLANATIONS = 'explanations' EXPLANATIONS_WHATIF = 'explanations_whatif' TRAINING = 'training' CUSTOM = 'custom' class RecordsAddEnums: """ Enums for records_add parameters. """ class ContentType(str, Enum): """ The type of the input. A character encoding can be specified by including a `charset` parameter. For example, 'text/csv;charset=utf-8'. """ APPLICATION_JSON = 'application/json' TEXT_CSV = 'text/csv' class OnError(str, Enum): """ expected behaviour on error. """ STOP = 'stop' CONTINUE = 'continue' class RecordsListEnums: """ Enums for records_list parameters. """ class Format(str, Enum): """ What JSON format to use on output. """ DICT = 'dict' LIST = 'list' class BinaryFormat(str, Enum): """ Binary data presentation format. By default, the binary field value is encoded to base64 string. 
If _reference_ is chosen, every binary field is moved to the _references_ section with value set to an uri to the particular field within the record that can be GET in a separate request. """ REFERENCE = 'reference' class RecordsGetEnums: """ Enums for records_get parameters. """ class BinaryFormat(str, Enum): """ Binary data presentation format. By default, the binary field value is encoded to base64 string. If _reference_ is chosen, every binary field is moved to the _references_ section with value set to an uri to the particular field within the record that can be GET in a separate request. """ REFERENCE = 'reference' class RecordsUpdateEnums: """ Enums for records_update parameters. """ class BinaryFormat(str, Enum): """ Binary data presentation format. By default, the binary field value is encoded to base64 string. If _reference_ is chosen, every binary field is moved to the _references_ section with value set to an uri to the particular field within the record that can be GET in a separate request. """ REFERENCE = 'reference' class RecordsQueryEnums: """ Enums for records_query parameters. """ class DataSetType(str, Enum): """ a (single) data set type. """ MANUAL_LABELING = 'manual_labeling' PAYLOAD_LOGGING = 'payload_logging' FEEDBACK = 'feedback' BUSINESS_PAYLOAD = 'business_payload' EXPLANATIONS = 'explanations' EXPLANATIONS_WHATIF = 'explanations_whatif' TRAINING = 'training' CUSTOM = 'custom' class MeasurementsQueryEnums: """ Enums for measurements_query parameters. """ class TargetType(str, Enum): """ Type of the monitoring target (subscription or business application). """ SUBSCRIPTION = 'subscription' BUSINESS_APPLICATION = 'business_application' INSTANCE = 'instance' DATA_MART = 'data_mart' class Format(str, Enum): """ Format of the returned data. `full` format compared to `compact` is additive and contains `sources` part. """ COMPACT = 'compact' FULL = 'full' class MetricsListEnums: """ Enums for metrics_list parameters. 
""" class Agg(str, Enum): """ Comma delimited function list constructed from metric name and function, e.g. `agg=metric_name:count,:last` that defines aggregations. """ LAST = 'last' FIRST = 'first' MAX = 'max' MIN = 'min' SUM = 'sum' AVG = 'avg' COUNT = 'count' STDDEV = 'stddev' class Interval(str, Enum): """ Time unit in which metrics are grouped and aggregated, interval by interval. """ MINUTE = 'minute' HOUR = 'hour' DAY = 'day' WEEK = 'week' MONTH = 'month' YEAR = 'year' class ExplanationTasksListEnums: """ Enums for explanation_tasks_list parameters. """ class Status(str, Enum): """ Status of the explanation task. """ IN_PROGRESS = 'in_progress' FINISHED = 'finished' ERROR = 'error' ############################################################################## # Models ############################################################################## class AnalyticsEngine(): """ AnalyticsEngine. :attr str type: (optional) Type of analytics engine. e.g. spark. :attr str integrated_system_id: (optional) id of the Integrated System. :attr object credentials: (optional) Credentials to override credentials in integration_reference. :attr object parameters: (optional) Additional parameters (e.g. max_num_executors, min_num_executors, executor_cores, executor_memory, driver_cores, driver_memory). """ def __init__(self, *, type: str = None, integrated_system_id: str = None, credentials: object = None, parameters: object = None) -> None: """ Initialize a AnalyticsEngine object. :param str type: (optional) Type of analytics engine. e.g. spark. :param str integrated_system_id: (optional) id of the Integrated System. :param object credentials: (optional) Credentials to override credentials in integration_reference. :param object parameters: (optional) Additional parameters (e.g. max_num_executors, min_num_executors, executor_cores, executor_memory, driver_cores, driver_memory). 
""" self.type = type self.integrated_system_id = integrated_system_id self.credentials = credentials self.parameters = parameters @classmethod def from_dict(cls, _dict: Dict) -> 'AnalyticsEngine': """Initialize a AnalyticsEngine object from a json dictionary.""" args = {} if 'type' in _dict: args['type'] = _dict.get('type') if 'integrated_system_id' in _dict: args['integrated_system_id'] = _dict.get('integrated_system_id') if 'credentials' in _dict: args['credentials'] = _dict.get('credentials') if 'parameters' in _dict: args['parameters'] = _dict.get('parameters') return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a AnalyticsEngine object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'type') and self.type is not None: _dict['type'] = self.type if hasattr(self, 'integrated_system_id') and self.integrated_system_id is not None: _dict['integrated_system_id'] = self.integrated_system_id if hasattr(self, 'credentials') and self.credentials is not None: _dict['credentials'] = self.credentials if hasattr(self, 'parameters') and self.parameters is not None: _dict['parameters'] = self.parameters return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this AnalyticsEngine object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'AnalyticsEngine') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ def __ne__(self, other: 'AnalyticsEngine') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class ApplicabilitySelection(): """ ApplicabilitySelection. 
:attr List[str] input_data_type: (optional) :attr List[str] problem_type: (optional) :attr List[str] target_type: (optional) """ def __init__(self, *, input_data_type: List[str] = None, problem_type: List[str] = None, target_type: List[str] = None) -> None: """ Initialize a ApplicabilitySelection object. :param List[str] input_data_type: (optional) :param List[str] problem_type: (optional) :param List[str] target_type: (optional) """ self.input_data_type = input_data_type self.problem_type = problem_type self.target_type = target_type @classmethod def from_dict(cls, _dict: Dict) -> 'ApplicabilitySelection': """Initialize a ApplicabilitySelection object from a json dictionary.""" args = {} if 'input_data_type' in _dict: args['input_data_type'] = _dict.get('input_data_type') if 'problem_type' in _dict: args['problem_type'] = _dict.get('problem_type') if 'target_type' in _dict: args['target_type'] = _dict.get('target_type') return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a ApplicabilitySelection object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'input_data_type') and self.input_data_type is not None: _dict['input_data_type'] = self.input_data_type if hasattr(self, 'problem_type') and self.problem_type is not None: _dict['problem_type'] = self.problem_type if hasattr(self, 'target_type') and self.target_type is not None: _dict['target_type'] = self.target_type return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this ApplicabilitySelection object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'ApplicabilitySelection') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ 
def __ne__(self, other: 'ApplicabilitySelection') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class InputDataTypeEnum(str, Enum): """ input_data_type. """ STRUCTURED = 'structured' UNSTRUCTURED_IMAGE = 'unstructured_image' UNSTRUCTURED_TEXT = 'unstructured_text' UNSTRUCTURED_VIDEO = 'unstructured_video' UNSTRUCTURED_AUDIO = 'unstructured_audio' class ProblemTypeEnum(str, Enum): """ problem_type. """ BINARY = 'binary' REGRESSION = 'regression' MULTICLASS = 'multiclass' class TargetTypeEnum(str, Enum): """ Type of the target (e.g. subscription, business application, ...). """ SUBSCRIPTION = 'subscription' BUSINESS_APPLICATION = 'business_application' INSTANCE = 'instance' DATA_MART = 'data_mart'
class Asset():
    """
    Asset.

    :attr str asset_id:
    :attr str url:
    :attr str name: (optional)
    :attr str asset_type:
    :attr str asset_rn: (optional) Asset Resource Name (used for integration with
          3rd party ML engines).
    :attr str created_at: (optional)
    :attr str problem_type: (optional)
    :attr str model_type: (optional)
    :attr str runtime_environment: (optional)
    :attr str input_data_type: (optional)
    """

    # JSON property names in the order `to_dict` emits them. Every property is
    # stored on the instance under an attribute of the same name.
    _PROPERTIES = (
        'asset_id',
        'url',
        'name',
        'asset_type',
        'asset_rn',
        'created_at',
        'problem_type',
        'model_type',
        'runtime_environment',
        'input_data_type',
    )
    # Properties `from_dict` refuses to build an Asset without.
    _REQUIRED = ('asset_id', 'url', 'asset_type')

    def __init__(self,
                 asset_id: str,
                 url: str,
                 asset_type: str,
                 *,
                 name: str = None,
                 asset_rn: str = None,
                 created_at: str = None,
                 problem_type: str = None,
                 model_type: str = None,
                 runtime_environment: str = None,
                 input_data_type: str = None) -> None:
        """
        Initialize a Asset object.

        :param str asset_id:
        :param str url:
        :param str asset_type:
        :param str name: (optional)
        :param str asset_rn: (optional) Asset Resource Name (used for integration
               with 3rd party ML engines).
        :param str created_at: (optional)
        :param str problem_type: (optional)
        :param str model_type: (optional)
        :param str runtime_environment: (optional)
        :param str input_data_type: (optional)
        """
        self.asset_id = asset_id
        self.url = url
        self.name = name
        self.asset_type = asset_type
        self.asset_rn = asset_rn
        self.created_at = created_at
        self.problem_type = problem_type
        self.model_type = model_type
        self.runtime_environment = runtime_environment
        self.input_data_type = input_data_type

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'Asset':
        """
        Initialize a Asset object from a json dictionary.

        :param Dict _dict: mapping of JSON property names to values.
        :return: a new Asset built from the properties present in `_dict`.
        :raises ValueError: if `asset_id`, `url` or `asset_type` is missing.
        """
        args = {}
        # Walking the properties in declaration order keeps the "first missing
        # required property" reported by the error stable.
        for prop in cls._PROPERTIES:
            if prop in _dict:
                args[prop] = _dict.get(prop)
            elif prop in cls._REQUIRED:
                raise ValueError(
                    'Required property \'{0}\' not present in Asset JSON'.format(prop))
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a Asset object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are unset or `None` are omitted from the result.
        """
        _dict = {}
        for prop in self._PROPERTIES:
            value = getattr(self, prop, None)
            if value is not None:
                _dict[prop] = value
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this Asset object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'Asset') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'Asset') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other

    class AssetTypeEnum(str, Enum):
        """
        asset_type.
        """
        MODEL = 'model'
        FUNCTION = 'function'

    class ProblemTypeEnum(str, Enum):
        """
        problem_type.
        """
        BINARY = 'binary'
        REGRESSION = 'regression'
        MULTICLASS = 'multiclass'

    class InputDataTypeEnum(str, Enum):
        """
        input_data_type.
        """
        STRUCTURED = 'structured'
        UNSTRUCTURED_IMAGE = 'unstructured_image'
        UNSTRUCTURED_TEXT = 'unstructured_text'
        UNSTRUCTURED_VIDEO = 'unstructured_video'
        UNSTRUCTURED_AUDIO = 'unstructured_audio'
class AssetDeployment():
    """
    AssetDeployment.

    :attr str deployment_id: (optional)
    :attr str deployment_rn: (optional) Deployment Resource Name (used for
          integration with 3rd party ML engines).
    :attr str url: (optional)
    :attr str name: (optional)
    :attr str description: (optional)
    :attr str deployment_type: (optional) Deployment type, e.g. online, batch.
    :attr str created_at: (optional)
    :attr ScoringEndpoint scoring_endpoint: (optional) Definition of scoring
          endpoint in custom_machine_learning.
    """

    # Plain-valued JSON properties in the order `to_dict` emits them. The nested
    # `scoring_endpoint` model needs conversion and is handled separately; it
    # is always serialized last.
    _SIMPLE_PROPERTIES = (
        'deployment_id',
        'deployment_rn',
        'url',
        'name',
        'description',
        'deployment_type',
        'created_at',
    )

    def __init__(self,
                 *,
                 deployment_id: str = None,
                 deployment_rn: str = None,
                 url: str = None,
                 name: str = None,
                 description: str = None,
                 deployment_type: str = None,
                 created_at: str = None,
                 scoring_endpoint: 'ScoringEndpoint' = None) -> None:
        """
        Initialize a AssetDeployment object.

        :param str deployment_id: (optional)
        :param str deployment_rn: (optional) Deployment Resource Name (used for
               integration with 3rd party ML engines).
        :param str url: (optional)
        :param str name: (optional)
        :param str description: (optional)
        :param str deployment_type: (optional) Deployment type, e.g. online,
               batch.
        :param str created_at: (optional)
        :param ScoringEndpoint scoring_endpoint: (optional) Definition of scoring
               endpoint in custom_machine_learning.
        """
        self.deployment_id = deployment_id
        self.deployment_rn = deployment_rn
        self.url = url
        self.name = name
        self.description = description
        self.deployment_type = deployment_type
        self.created_at = created_at
        self.scoring_endpoint = scoring_endpoint

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'AssetDeployment':
        """
        Initialize a AssetDeployment object from a json dictionary.

        :param Dict _dict: mapping of JSON property names to values.
        :return: a new AssetDeployment built from the properties present in
                 `_dict`; a non-null `scoring_endpoint` is converted with
                 `ScoringEndpoint.from_dict`.
        """
        args = {prop: _dict.get(prop) for prop in cls._SIMPLE_PROPERTIES if prop in _dict}
        # A JSON null for the nested model is treated as "not provided" so it is
        # never handed to ScoringEndpoint.from_dict, which expects a dictionary.
        scoring_endpoint = _dict.get('scoring_endpoint')
        if scoring_endpoint is not None:
            args['scoring_endpoint'] = ScoringEndpoint.from_dict(scoring_endpoint)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a AssetDeployment object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are unset or `None` are omitted. `scoring_endpoint` is
        serialized through its own `to_dict()`, unless it is already a plain
        dict, in which case it is emitted as-is.
        """
        _dict = {}
        for prop in self._SIMPLE_PROPERTIES:
            value = getattr(self, prop, None)
            if value is not None:
                _dict[prop] = value
        scoring_endpoint = getattr(self, 'scoring_endpoint', None)
        if scoring_endpoint is not None:
            if isinstance(scoring_endpoint, dict):
                # Caller supplied raw JSON instead of a model instance.
                _dict['scoring_endpoint'] = scoring_endpoint
            else:
                _dict['scoring_endpoint'] = scoring_endpoint.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this AssetDeployment object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'AssetDeployment') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'AssetDeployment') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class AssetDeploymentRequest():
    """
    AssetDeploymentRequest.

    :attr str deployment_id:
    :attr str deployment_rn: (optional) Deployment Resource Name (used for
          integration with 3rd party ML engines).
    :attr str url: (optional)
    :attr str name:
    :attr str description: (optional)
    :attr str deployment_type: Deployment type, e.g. online, batch.
    :attr str created_at: (optional)
    :attr ScoringEndpointRequest scoring_endpoint: (optional) Definition of
          scoring endpoint in custom_machine_learning.
    """

    # Plain-valued JSON properties in the order `to_dict` emits them. The nested
    # `scoring_endpoint` model needs conversion and is handled separately; it
    # is always serialized last.
    _SIMPLE_PROPERTIES = (
        'deployment_id',
        'deployment_rn',
        'url',
        'name',
        'description',
        'deployment_type',
        'created_at',
    )
    # Properties `from_dict` refuses to build a request without.
    _REQUIRED = ('deployment_id', 'name', 'deployment_type')

    def __init__(self,
                 deployment_id: str,
                 name: str,
                 deployment_type: str,
                 *,
                 deployment_rn: str = None,
                 url: str = None,
                 description: str = None,
                 created_at: str = None,
                 scoring_endpoint: 'ScoringEndpointRequest' = None) -> None:
        """
        Initialize a AssetDeploymentRequest object.

        :param str deployment_id:
        :param str name:
        :param str deployment_type: Deployment type, e.g. online, batch.
        :param str deployment_rn: (optional) Deployment Resource Name (used for
               integration with 3rd party ML engines).
        :param str url: (optional)
        :param str description: (optional)
        :param str created_at: (optional)
        :param ScoringEndpointRequest scoring_endpoint: (optional) Definition of
               scoring endpoint in custom_machine_learning.
        """
        self.deployment_id = deployment_id
        self.deployment_rn = deployment_rn
        self.url = url
        self.name = name
        self.description = description
        self.deployment_type = deployment_type
        self.created_at = created_at
        self.scoring_endpoint = scoring_endpoint

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'AssetDeploymentRequest':
        """
        Initialize a AssetDeploymentRequest object from a json dictionary.

        :param Dict _dict: mapping of JSON property names to values.
        :return: a new AssetDeploymentRequest built from the properties present
                 in `_dict`; a non-null `scoring_endpoint` is converted with
                 `ScoringEndpointRequest.from_dict`.
        :raises ValueError: if `deployment_id`, `name` or `deployment_type` is
                missing.
        """
        args = {}
        # Walking the properties in declaration order keeps the "first missing
        # required property" reported by the error stable.
        for prop in cls._SIMPLE_PROPERTIES:
            if prop in _dict:
                args[prop] = _dict.get(prop)
            elif prop in cls._REQUIRED:
                raise ValueError(
                    'Required property \'{0}\' not present in AssetDeploymentRequest JSON'.format(prop))
        # A JSON null for the nested model is treated as "not provided" so it is
        # never handed to ScoringEndpointRequest.from_dict, which expects a
        # dictionary.
        scoring_endpoint = _dict.get('scoring_endpoint')
        if scoring_endpoint is not None:
            args['scoring_endpoint'] = ScoringEndpointRequest.from_dict(scoring_endpoint)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a AssetDeploymentRequest object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are unset or `None` are omitted. `scoring_endpoint` is
        serialized through its own `to_dict()`, unless it is already a plain
        dict, in which case it is emitted as-is.
        """
        _dict = {}
        for prop in self._SIMPLE_PROPERTIES:
            value = getattr(self, prop, None)
            if value is not None:
                _dict[prop] = value
        scoring_endpoint = getattr(self, 'scoring_endpoint', None)
        if scoring_endpoint is not None:
            if isinstance(scoring_endpoint, dict):
                # Caller supplied raw JSON instead of a model instance.
                _dict['scoring_endpoint'] = scoring_endpoint
            else:
                _dict['scoring_endpoint'] = scoring_endpoint.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this AssetDeploymentRequest object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'AssetDeploymentRequest') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'AssetDeploymentRequest') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class AssetProperties():
    """
    Additional asset properties (subject of discovery if not provided when creating the
    subscription).

    :attr str problem_type: (optional)
    :attr str model_type: (optional)
    :attr str runtime_environment: (optional)
    :attr SecretCleaned training_data_reference: (optional)
    :attr SparkStruct training_data_schema: (optional)
    :attr SparkStruct input_data_schema: (optional)
    :attr SparkStruct output_data_schema: (optional)
    :attr str label_column: (optional)
    :attr str predicted_target_field: (optional) Field with this name will be
          given modeling_role `decoded-target`.
    :attr str prediction_field: (optional) Field with this name will be given
          modeling_role `prediction`.
    :attr str transaction_id_field: (optional) Field with this name will have
          `transaction_id_key` metadata set to true.
    :attr str input_data_type: (optional)
    :attr dict dashboard_configuration: (optional)
    :attr List[str] feature_fields: (optional) Fields to be given modeling_role
          feature.
    :attr List[str] categorical_fields: (optional) Fields to be given metadata
          `measure` of value `discrete`.
    :attr List[str] probability_fields: (optional) Fields to be given
          modeling_role `class_probability` (for columns of double data type) or
          `probability` (for column of array data type).
    """

    # JSON property names, in the order `to_dict` emits them.
    _PROPERTY_NAMES = (
        'problem_type',
        'model_type',
        'runtime_environment',
        'training_data_reference',
        'training_data_schema',
        'input_data_schema',
        'output_data_schema',
        'label_column',
        'predicted_target_field',
        'prediction_field',
        'transaction_id_field',
        'input_data_type',
        'dashboard_configuration',
        'feature_fields',
        'categorical_fields',
        'probability_fields',
    )

    # Nested properties that are all parsed with `SparkStruct.from_dict`.
    _SCHEMA_PROPERTY_NAMES = (
        'training_data_schema',
        'input_data_schema',
        'output_data_schema',
    )

    # Properties holding nested models (serialized through their own `to_dict`).
    _MODEL_PROPERTY_NAMES = frozenset(
        ('training_data_reference',) + _SCHEMA_PROPERTY_NAMES
    )

    def __init__(self,
                 *,
                 problem_type: str = None,
                 model_type: str = None,
                 runtime_environment: str = None,
                 training_data_reference: 'SecretCleaned' = None,
                 training_data_schema: 'SparkStruct' = None,
                 input_data_schema: 'SparkStruct' = None,
                 output_data_schema: 'SparkStruct' = None,
                 label_column: str = None,
                 predicted_target_field: str = None,
                 prediction_field: str = None,
                 transaction_id_field: str = None,
                 input_data_type: str = None,
                 dashboard_configuration: dict = None,
                 feature_fields: List[str] = None,
                 categorical_fields: List[str] = None,
                 probability_fields: List[str] = None) -> None:
        """
        Initialize a AssetProperties object.

        :param str problem_type: (optional)
        :param str model_type: (optional)
        :param str runtime_environment: (optional)
        :param SecretCleaned training_data_reference: (optional)
        :param SparkStruct training_data_schema: (optional)
        :param SparkStruct input_data_schema: (optional)
        :param SparkStruct output_data_schema: (optional)
        :param str label_column: (optional)
        :param str predicted_target_field: (optional) Field with this name will
               be given modeling_role `decoded-target`.
        :param str prediction_field: (optional) Field with this name will be
               given modeling_role `prediction`.
        :param str transaction_id_field: (optional) Field with this name will
               have `transaction_id_key` metadata set to true.
        :param str input_data_type: (optional)
        :param dict dashboard_configuration: (optional)
        :param List[str] feature_fields: (optional) Fields to be given
               modeling_role feature.
        :param List[str] categorical_fields: (optional) Fields to be given
               metadata `measure` of value `discrete`.
        :param List[str] probability_fields: (optional) Fields to be given
               modeling_role `class_probability` (for columns of double data type)
               or `probability` (for column of array data type).
        """
        self.problem_type = problem_type
        self.model_type = model_type
        self.runtime_environment = runtime_environment
        self.training_data_reference = training_data_reference
        self.training_data_schema = training_data_schema
        self.input_data_schema = input_data_schema
        self.output_data_schema = output_data_schema
        self.label_column = label_column
        self.predicted_target_field = predicted_target_field
        self.prediction_field = prediction_field
        self.transaction_id_field = transaction_id_field
        self.input_data_type = input_data_type
        self.dashboard_configuration = dashboard_configuration
        self.feature_fields = feature_fields
        self.categorical_fields = categorical_fields
        self.probability_fields = probability_fields

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'AssetProperties':
        """
        Initialize a AssetProperties object from a json dictionary.

        Plain properties are copied over when their key is present. Nested model
        properties are parsed with the matching model's `from_dict`; a nested
        property that is missing or explicitly `null` is left unset instead of
        being handed to the nested parser.

        :param dict _dict: the json dictionary to read the properties from.
        :return: the new AssetProperties instance.
        """
        args = {
            name: _dict[name]
            for name in cls._PROPERTY_NAMES
            if name in _dict and name not in cls._MODEL_PROPERTY_NAMES
        }
        # The nested model classes are looked up only when a value is actually
        # present, so absent/null entries never reach their parsers.
        reference = _dict.get('training_data_reference')
        if reference is not None:
            args['training_data_reference'] = SecretCleaned.from_dict(reference)
        for name in cls._SCHEMA_PROPERTY_NAMES:
            schema = _dict.get(name)
            if schema is not None:
                args[name] = SparkStruct.from_dict(schema)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a AssetProperties object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are missing or `None` are omitted. Nested model
        properties are serialized through their own `to_dict`, unless the caller
        stored a plain dictionary, which is emitted unchanged.

        :return: dictionary holding every property that is set, in declaration
                 order.
        """
        _dict = {}
        for name in self._PROPERTY_NAMES:
            value = getattr(self, name, None)
            if value is None:
                continue
            if name in self._MODEL_PROPERTY_NAMES and not isinstance(value, dict):
                value = value.to_dict()
            _dict[name] = value
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this AssetProperties object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'AssetProperties') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'AssetProperties') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other

    class ProblemTypeEnum(str, Enum):
        """
        problem_type.
        """
        BINARY = 'binary'
        REGRESSION = 'regression'
        MULTICLASS = 'multiclass'

    class InputDataTypeEnum(str, Enum):
        """
        input_data_type.
        """
        STRUCTURED = 'structured'
        UNSTRUCTURED_IMAGE = 'unstructured_image'
        UNSTRUCTURED_TEXT = 'unstructured_text'
        UNSTRUCTURED_VIDEO = 'unstructured_video'
        UNSTRUCTURED_AUDIO = 'unstructured_audio'
class AssetPropertiesRequest():
    """
    Additional asset properties (subject of discovery if not provided when creating the
    subscription).

    :attr TrainingDataReference training_data_reference: (optional)
    :attr SparkStruct training_data_schema: (optional)
    :attr SparkStruct input_data_schema: (optional)
    :attr SparkStruct output_data_schema: (optional)
    :attr str label_column: (optional)
    :attr List[str] labels: (optional)
    :attr str predicted_target_field: (optional) Field with this name will be
          given modeling_role `decoded-target`.
    :attr str prediction_field: (optional) Field with this name will be given
          modeling_role `prediction`.
    :attr str transaction_id_field: (optional) Field with this name will have
          `transaction_id_key` metadata set to true.
    :attr dict dashboard_configuration: (optional)
    :attr List[str] feature_fields: (optional) Fields to be given modeling_role
          feature.
    :attr List[str] categorical_fields: (optional) Fields to be given metadata
          `measure` of value `discrete`.
    :attr List[str] probability_fields: (optional) Fields to be given
          modeling_role `class_probability` (for columns of double data type) or
          `probability` (for column of array data type).
    """

    # JSON property names, in the order `to_dict` emits them.
    _PROPERTY_NAMES = (
        'training_data_reference',
        'training_data_schema',
        'input_data_schema',
        'output_data_schema',
        'label_column',
        'labels',
        'predicted_target_field',
        'prediction_field',
        'transaction_id_field',
        'dashboard_configuration',
        'feature_fields',
        'categorical_fields',
        'probability_fields',
    )

    # Nested properties that are all parsed with `SparkStruct.from_dict`.
    _SCHEMA_PROPERTY_NAMES = (
        'training_data_schema',
        'input_data_schema',
        'output_data_schema',
    )

    # Properties holding nested models (serialized through their own `to_dict`).
    _MODEL_PROPERTY_NAMES = frozenset(
        ('training_data_reference',) + _SCHEMA_PROPERTY_NAMES
    )

    def __init__(self,
                 *,
                 training_data_reference: 'TrainingDataReference' = None,
                 training_data_schema: 'SparkStruct' = None,
                 input_data_schema: 'SparkStruct' = None,
                 output_data_schema: 'SparkStruct' = None,
                 label_column: str = None,
                 labels: List[str] = None,
                 predicted_target_field: str = None,
                 prediction_field: str = None,
                 transaction_id_field: str = None,
                 dashboard_configuration: dict = None,
                 feature_fields: List[str] = None,
                 categorical_fields: List[str] = None,
                 probability_fields: List[str] = None) -> None:
        """
        Initialize a AssetPropertiesRequest object.

        :param TrainingDataReference training_data_reference: (optional)
        :param SparkStruct training_data_schema: (optional)
        :param SparkStruct input_data_schema: (optional)
        :param SparkStruct output_data_schema: (optional)
        :param str label_column: (optional)
        :param List[str] labels: (optional)
        :param str predicted_target_field: (optional) Field with this name will
               be given modeling_role `decoded-target`.
        :param str prediction_field: (optional) Field with this name will be
               given modeling_role `prediction`.
        :param str transaction_id_field: (optional) Field with this name will
               have `transaction_id_key` metadata set to true.
        :param dict dashboard_configuration: (optional)
        :param List[str] feature_fields: (optional) Fields to be given
               modeling_role feature.
        :param List[str] categorical_fields: (optional) Fields to be given
               metadata `measure` of value `discrete`.
        :param List[str] probability_fields: (optional) Fields to be given
               modeling_role `class_probability` (for columns of double data type)
               or `probability` (for column of array data type).
        """
        self.training_data_reference = training_data_reference
        self.training_data_schema = training_data_schema
        self.input_data_schema = input_data_schema
        self.output_data_schema = output_data_schema
        self.label_column = label_column
        self.labels = labels
        self.predicted_target_field = predicted_target_field
        self.prediction_field = prediction_field
        self.transaction_id_field = transaction_id_field
        self.dashboard_configuration = dashboard_configuration
        self.feature_fields = feature_fields
        self.categorical_fields = categorical_fields
        self.probability_fields = probability_fields

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'AssetPropertiesRequest':
        """
        Initialize a AssetPropertiesRequest object from a json dictionary.

        Plain properties are copied over when their key is present. Nested model
        properties are parsed with the matching model's `from_dict`; a nested
        property that is missing or explicitly `null` is left unset instead of
        being handed to the nested parser.

        :param dict _dict: the json dictionary to read the properties from.
        :return: the new AssetPropertiesRequest instance.
        """
        args = {
            name: _dict[name]
            for name in cls._PROPERTY_NAMES
            if name in _dict and name not in cls._MODEL_PROPERTY_NAMES
        }
        # The nested model classes are looked up only when a value is actually
        # present, so absent/null entries never reach their parsers.
        reference = _dict.get('training_data_reference')
        if reference is not None:
            args['training_data_reference'] = TrainingDataReference.from_dict(reference)
        for name in cls._SCHEMA_PROPERTY_NAMES:
            schema = _dict.get(name)
            if schema is not None:
                args[name] = SparkStruct.from_dict(schema)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a AssetPropertiesRequest object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are missing or `None` are omitted. Nested model
        properties are serialized through their own `to_dict`, unless the caller
        stored a plain dictionary, which is emitted unchanged.

        :return: dictionary holding every property that is set, in declaration
                 order.
        """
        _dict = {}
        for name in self._PROPERTY_NAMES:
            value = getattr(self, name, None)
            if value is None:
                continue
            if name in self._MODEL_PROPERTY_NAMES and not isinstance(value, dict):
                value = value.to_dict()
            _dict[name] = value
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this AssetPropertiesRequest object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'AssetPropertiesRequest') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'AssetPropertiesRequest') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class AzureWorkspaceCredentials(): """ AzureWorkspaceCredentials. :attr str workspace_id: :attr str token: """ def __init__(self, workspace_id: str, token: str) -> None: """ Initialize a AzureWorkspaceCredentials object. :param str workspace_id: :param str token: """ self.workspace_id = workspace_id self.token = token @classmethod def from_dict(cls, _dict: Dict) -> 'AzureWorkspaceCredentials': """Initialize a AzureWorkspaceCredentials object from a json dictionary.""" args = {} if 'workspace_id' in _dict: args['workspace_id'] = _dict.get('workspace_id') else: raise ValueError('Required property \'workspace_id\' not present in AzureWorkspaceCredentials JSON') if 'token' in _dict: args['token'] = _dict.get('token') else: raise ValueError('Required property \'token\' not present in AzureWorkspaceCredentials JSON') return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a AzureWorkspaceCredentials object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'workspace_id') and self.workspace_id is not None: _dict['workspace_id'] = self.workspace_id if hasattr(self, 'token') and self.token is not None: _dict['token'] = self.token return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this AzureWorkspaceCredentials object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'AzureWorkspaceCredentials') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ def __ne__(self, other: 'AzureWorkspaceCredentials') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class BusinessApplicationResponse(): """ BusinessApplicationResponse. 
:attr Metadata metadata: :attr BusinessApplicationResponseEntity entity: business application payload. """ def __init__(self, metadata: 'Metadata', entity: 'BusinessApplicationResponseEntity') -> None: """ Initialize a BusinessApplicationResponse object. :param Metadata metadata: :param BusinessApplicationResponseEntity entity: business application payload. """ self.metadata = metadata self.entity = entity @classmethod def from_dict(cls, _dict: Dict) -> 'BusinessApplicationResponse': """Initialize a BusinessApplicationResponse object from a json dictionary.""" args = {} if 'metadata' in _dict: args['metadata'] = Metadata.from_dict(_dict.get('metadata')) else: raise ValueError('Required property \'metadata\' not present in BusinessApplicationResponse JSON') if 'entity' in _dict: args['entity'] = BusinessApplicationResponseEntity.from_dict(_dict.get('entity')) else: raise ValueError('Required property \'entity\' not present in BusinessApplicationResponse JSON') return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a BusinessApplicationResponse object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'metadata') and self.metadata is not None: _dict['metadata'] = self.metadata.to_dict() if hasattr(self, 'entity') and self.entity is not None: _dict['entity'] = self.entity.to_dict() return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this BusinessApplicationResponse object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'BusinessApplicationResponse') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ def __ne__(self, other: 'BusinessApplicationResponse') -> bool: """Return `true` when 
self and other are not equal, false otherwise.""" return not self == other class BusinessApplicationResponseEntity(): """ business application payload. :attr str name: name fo the business application. :attr str description: description fo the business application. :attr List[PayloadField] payload_fields: :attr List[BusinessMetric] business_metrics: :attr List[str] subscription_ids: (optional) :attr str business_metrics_monitor_definition_id: (optional) :attr str business_metrics_monitor_instance_id: (optional) :attr str correlation_monitor_instance_id: (optional) :attr str business_payload_data_set_id: (optional) Unique identifier of the data set (like scoring, feedback or business payload). :attr str transaction_batches_data_set_id: (optional) Unique identifier of the data set (like scoring, feedback or business payload). :attr Status status: (optional) """ def __init__(self, name: str, description: str, payload_fields: List['PayloadField'], business_metrics: List['BusinessMetric'], *, subscription_ids: List[str] = None, business_metrics_monitor_definition_id: str = None, business_metrics_monitor_instance_id: str = None, correlation_monitor_instance_id: str = None, business_payload_data_set_id: str = None, transaction_batches_data_set_id: str = None, status: 'Status' = None) -> None: """ Initialize a BusinessApplicationResponseEntity object. :param str name: name fo the business application. :param str description: description fo the business application. :param List[PayloadField] payload_fields: :param List[BusinessMetric] business_metrics: :param List[str] subscription_ids: (optional) :param str business_metrics_monitor_definition_id: (optional) :param str business_metrics_monitor_instance_id: (optional) :param str correlation_monitor_instance_id: (optional) :param str business_payload_data_set_id: (optional) Unique identifier of the data set (like scoring, feedback or business payload). 
:param str transaction_batches_data_set_id: (optional) Unique identifier of the data set (like scoring, feedback or business payload). :param Status status: (optional) """ self.name = name self.description = description self.payload_fields = payload_fields self.business_metrics = business_metrics self.subscription_ids = subscription_ids self.business_metrics_monitor_definition_id = business_metrics_monitor_definition_id self.business_metrics_monitor_instance_id = business_metrics_monitor_instance_id self.correlation_monitor_instance_id = correlation_monitor_instance_id self.business_payload_data_set_id = business_payload_data_set_id self.transaction_batches_data_set_id = transaction_batches_data_set_id self.status = status @classmethod def from_dict(cls, _dict: Dict) -> 'BusinessApplicationResponseEntity': """Initialize a BusinessApplicationResponseEntity object from a json dictionary.""" args = {} if 'name' in _dict: args['name'] = _dict.get('name') else: raise ValueError('Required property \'name\' not present in BusinessApplicationResponseEntity JSON') if 'description' in _dict: args['description'] = _dict.get('description') else: raise ValueError('Required property \'description\' not present in BusinessApplicationResponseEntity JSON') if 'payload_fields' in _dict: args['payload_fields'] = [PayloadField.from_dict(x) for x in _dict.get('payload_fields')] else: raise ValueError('Required property \'payload_fields\' not present in BusinessApplicationResponseEntity JSON') if 'business_metrics' in _dict: args['business_metrics'] = [BusinessMetric.from_dict(x) for x in _dict.get('business_metrics')] else: raise ValueError('Required property \'business_metrics\' not present in BusinessApplicationResponseEntity JSON') if 'subscription_ids' in _dict: args['subscription_ids'] = _dict.get('subscription_ids') if 'business_metrics_monitor_definition_id' in _dict: args['business_metrics_monitor_definition_id'] = _dict.get('business_metrics_monitor_definition_id') if 
'business_metrics_monitor_instance_id' in _dict: args['business_metrics_monitor_instance_id'] = _dict.get('business_metrics_monitor_instance_id') if 'correlation_monitor_instance_id' in _dict: args['correlation_monitor_instance_id'] = _dict.get('correlation_monitor_instance_id') if 'business_payload_data_set_id' in _dict: args['business_payload_data_set_id'] = _dict.get('business_payload_data_set_id') if 'transaction_batches_data_set_id' in _dict: args['transaction_batches_data_set_id'] = _dict.get('transaction_batches_data_set_id') if 'status' in _dict: args['status'] = Status.from_dict(_dict.get('status')) return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a BusinessApplicationResponseEntity object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'name') and self.name is not None: _dict['name'] = self.name if hasattr(self, 'description') and self.description is not None: _dict['description'] = self.description if hasattr(self, 'payload_fields') and self.payload_fields is not None: _dict['payload_fields'] = [x.to_dict() for x in self.payload_fields] if hasattr(self, 'business_metrics') and self.business_metrics is not None: _dict['business_metrics'] = [x.to_dict() for x in self.business_metrics] if hasattr(self, 'subscription_ids') and self.subscription_ids is not None: _dict['subscription_ids'] = self.subscription_ids if hasattr(self, 'business_metrics_monitor_definition_id') and self.business_metrics_monitor_definition_id is not None: _dict['business_metrics_monitor_definition_id'] = self.business_metrics_monitor_definition_id if hasattr(self, 'business_metrics_monitor_instance_id') and self.business_metrics_monitor_instance_id is not None: _dict['business_metrics_monitor_instance_id'] = self.business_metrics_monitor_instance_id if hasattr(self, 'correlation_monitor_instance_id') and self.correlation_monitor_instance_id is 
not None: _dict['correlation_monitor_instance_id'] = self.correlation_monitor_instance_id if hasattr(self, 'business_payload_data_set_id') and self.business_payload_data_set_id is not None: _dict['business_payload_data_set_id'] = self.business_payload_data_set_id if hasattr(self, 'transaction_batches_data_set_id') and self.transaction_batches_data_set_id is not None: _dict['transaction_batches_data_set_id'] = self.transaction_batches_data_set_id if hasattr(self, 'status') and self.status is not None: _dict['status'] = self.status.to_dict() return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this BusinessApplicationResponseEntity object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'BusinessApplicationResponseEntity') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ def __ne__(self, other: 'BusinessApplicationResponseEntity') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class BusinessApplicationsCollection(): """ BusinessApplicationsCollection. :attr List[BusinessApplicationResponse] business_applications: """ def __init__(self, business_applications: List['BusinessApplicationResponse']) -> None: """ Initialize a BusinessApplicationsCollection object. 
:param List[BusinessApplicationResponse] business_applications: """ self.business_applications = business_applications @classmethod def from_dict(cls, _dict: Dict) -> 'BusinessApplicationsCollection': """Initialize a BusinessApplicationsCollection object from a json dictionary.""" args = {} if 'business_applications' in _dict: args['business_applications'] = [BusinessApplicationResponse.from_dict(x) for x in _dict.get('business_applications')] else: raise ValueError('Required property \'business_applications\' not present in BusinessApplicationsCollection JSON') return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a BusinessApplicationsCollection object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'business_applications') and self.business_applications is not None: _dict['business_applications'] = [x.to_dict() for x in self.business_applications] return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this BusinessApplicationsCollection object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'BusinessApplicationsCollection') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ def __ne__(self, other: 'BusinessApplicationsCollection') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class BusinessMetric(): """ BusinessMetric. :attr str id: (optional) :attr str name: unique name used by UI instead of id (must be unique in scope of the monitor defition across both metrics and tags). :attr str description: (optional) Description of the metrics. 
:attr List[MetricThreshold] thresholds: (optional) :attr bool required: (optional) :attr str expected_direction: (optional) the indicator specifying the expected direction of the monotonic metric values. :attr CalculationMeta calculation_metadata: """ def __init__(self, name: str, calculation_metadata: 'CalculationMeta', *, id: str = None, description: str = None, thresholds: List['MetricThreshold'] = None, required: bool = None, expected_direction: str = None) -> None: """ Initialize a BusinessMetric object. :param str name: unique name used by UI instead of id (must be unique in scope of the monitor defition across both metrics and tags). :param CalculationMeta calculation_metadata: :param str id: (optional) :param str description: (optional) Description of the metrics. :param List[MetricThreshold] thresholds: (optional) :param bool required: (optional) :param str expected_direction: (optional) the indicator specifying the expected direction of the monotonic metric values. """ self.id = id self.name = name self.description = description self.thresholds = thresholds self.required = required self.expected_direction = expected_direction self.calculation_metadata = calculation_metadata @classmethod def from_dict(cls, _dict: Dict) -> 'BusinessMetric': """Initialize a BusinessMetric object from a json dictionary.""" args = {} if 'id' in _dict: args['id'] = _dict.get('id') if 'name' in _dict: args['name'] = _dict.get('name') else: raise ValueError('Required property \'name\' not present in BusinessMetric JSON') if 'description' in _dict: args['description'] = _dict.get('description') if 'thresholds' in _dict: args['thresholds'] = [MetricThreshold.from_dict(x) for x in _dict.get('thresholds')] if 'required' in _dict: args['required'] = _dict.get('required') if 'expected_direction' in _dict: args['expected_direction'] = _dict.get('expected_direction') if 'calculation_metadata' in _dict: args['calculation_metadata'] = 
CalculationMeta.from_dict(_dict.get('calculation_metadata')) else: raise ValueError('Required property \'calculation_metadata\' not present in BusinessMetric JSON') return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a BusinessMetric object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'id') and self.id is not None: _dict['id'] = self.id if hasattr(self, 'name') and self.name is not None: _dict['name'] = self.name if hasattr(self, 'description') and self.description is not None: _dict['description'] = self.description if hasattr(self, 'thresholds') and self.thresholds is not None: _dict['thresholds'] = [x.to_dict() for x in self.thresholds] if hasattr(self, 'required') and self.required is not None: _dict['required'] = self.required if hasattr(self, 'expected_direction') and self.expected_direction is not None: _dict['expected_direction'] = self.expected_direction if hasattr(self, 'calculation_metadata') and self.calculation_metadata is not None: _dict['calculation_metadata'] = self.calculation_metadata.to_dict() return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this BusinessMetric object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'BusinessMetric') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ def __ne__(self, other: 'BusinessMetric') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class ExpectedDirectionEnum(str, Enum): """ the indicator specifying the expected direction of the monotonic metric values. 
class CalculationMeta():
    """
    CalculationMeta.

    :attr str field_name:
    :attr str aggregation: (optional)
    :attr TimeFrame time_frame: (optional)
    """

    def __init__(self,
                 field_name: str,
                 *,
                 aggregation: str = None,
                 time_frame: 'TimeFrame' = None) -> None:
        """
        Initialize a CalculationMeta object.

        :param str field_name:
        :param str aggregation: (optional)
        :param TimeFrame time_frame: (optional)
        """
        self.field_name = field_name
        self.aggregation = aggregation
        self.time_frame = time_frame

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'CalculationMeta':
        """
        Initialize a CalculationMeta object from a json dictionary.

        :param Dict _dict: decoded JSON object describing the model.
        :return: a new CalculationMeta built from the recognised keys.
        :raises ValueError: if the required 'field_name' key is absent.
        """
        args = {}
        if 'field_name' in _dict:
            args['field_name'] = _dict.get('field_name')
        else:
            raise ValueError('Required property \'field_name\' not present in CalculationMeta JSON')
        if 'aggregation' in _dict:
            args['aggregation'] = _dict.get('aggregation')
        # An explicit JSON null is handled like a missing key: None is not a
        # dictionary, so it must not be forwarded to the nested from_dict.
        time_frame = _dict.get('time_frame')
        if time_frame is not None:
            args['time_frame'] = TimeFrame.from_dict(time_frame)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a CalculationMeta object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Attributes that are unset (None) are left out of the result.
        """
        _dict = {}
        if hasattr(self, 'field_name') and self.field_name is not None:
            _dict['field_name'] = self.field_name
        if hasattr(self, 'aggregation') and self.aggregation is not None:
            _dict['aggregation'] = self.aggregation
        if hasattr(self, 'time_frame') and self.time_frame is not None:
            # Accept a nested value that was supplied as a plain dict.
            if isinstance(self.time_frame, dict):
                _dict['time_frame'] = self.time_frame
            else:
                _dict['time_frame'] = self.time_frame.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this CalculationMeta object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'CalculationMeta') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'CalculationMeta') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other

    class AggregationEnum(str, Enum):
        """
        aggregation.
        """
        SUM = 'sum'
        MIN = 'min'
        MAX = 'max'
        AVG = 'avg'
class CollectionUrlModel():
    """
    CollectionUrlModel.

    :attr str url: URI of a resource.
    """

    def __init__(self, url: str) -> None:
        """
        Initialize a CollectionUrlModel object.

        :param str url: URI of a resource.
        """
        self.url = url

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'CollectionUrlModel':
        """
        Initialize a CollectionUrlModel object from a json dictionary.

        :raises ValueError: if the 'url' key is not present.
        """
        # Guard clause first: the only field of this model is mandatory.
        if 'url' not in _dict:
            raise ValueError('Required property \'url\' not present in CollectionUrlModel JSON')
        return cls(url=_dict.get('url'))

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a CollectionUrlModel object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Return a json dictionary representing this model."""
        serialized = {}
        # getattr with a None default covers both "attribute missing" and
        # "attribute unset" in a single lookup.
        url = getattr(self, 'url', None)
        if url is not None:
            serialized['url'] = url
        return serialized

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this CollectionUrlModel object."""
        payload = self.to_dict()
        return json.dumps(payload, indent=2)

    def __eq__(self, other: 'CollectionUrlModel') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'CollectionUrlModel') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        are_equal = self == other
        return not are_equal
class DataDistributionResponse():
    """
    Data distribution details response.

    :attr Metadata metadata:
    :attr DataDistributionResponseEntity entity: The status information for the
          monitoring run.
    """

    def __init__(self,
                 metadata: 'Metadata',
                 entity: 'DataDistributionResponseEntity') -> None:
        """
        Initialize a DataDistributionResponse object.

        :param Metadata metadata:
        :param DataDistributionResponseEntity entity: The status information
               for the monitoring run.
        """
        self.metadata = metadata
        self.entity = entity

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataDistributionResponse':
        """
        Initialize a DataDistributionResponse object from a json dictionary.

        :param Dict _dict: decoded JSON object describing the model.
        :return: a new DataDistributionResponse.
        :raises ValueError: if 'metadata' or 'entity' is absent or null.
        """
        args = {}
        # Both properties are required nested objects; a JSON null cannot be
        # deserialized, so it is reported the same way as a missing key.
        metadata = _dict.get('metadata')
        if metadata is not None:
            args['metadata'] = Metadata.from_dict(metadata)
        else:
            raise ValueError('Required property \'metadata\' not present in DataDistributionResponse JSON')
        entity = _dict.get('entity')
        if entity is not None:
            args['entity'] = DataDistributionResponseEntity.from_dict(entity)
        else:
            raise ValueError('Required property \'entity\' not present in DataDistributionResponse JSON')
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataDistributionResponse object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Nested values given as plain dicts are emitted unchanged; anything
        else is serialized through its own to_dict().
        """
        _dict = {}
        if hasattr(self, 'metadata') and self.metadata is not None:
            if isinstance(self.metadata, dict):
                _dict['metadata'] = self.metadata
            else:
                _dict['metadata'] = self.metadata.to_dict()
        if hasattr(self, 'entity') and self.entity is not None:
            if isinstance(self.entity, dict):
                _dict['entity'] = self.entity
            else:
                _dict['entity'] = self.entity.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataDistributionResponse object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataDistributionResponse') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataDistributionResponse') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class DataDistributionResponseEntity():
    """
    The status information for the monitoring run.

    :attr str start: start datetime in ISO format.
    :attr str end: end datetime in ISO format.
    :attr str dataset: type of a data set.
    :attr float limit: (optional) limit for number of rows, by default it is
          50,000 (max possible limit is 50,000).
    :attr List[str] group: names of columns to be grouped.
    :attr str filter: (optional) Filters defined by user in format:
          {field_name}:{op}:{value}. Partly compatible with filters in "filter"
          parameter of GET /v2/data_sets/{data_set_id}/records.
          Possible filter operators:
          * eq - equals (numeric, string)
          * gt - greater than (numeric)
          * gte - greater than or equal (numeric)
          * lt - lower than (numeric)
          * lte - lower than or equal (numeric)
          * in - value in a set (numeric, string)
          * field:null (a no-argument filter) - value is null (any nullable)
          * field:exists (a no-argument filter) - value is not null (any column).
    :attr List[str] agg: (optional) Definition of aggregations, by default
          'count'.
          Aggregations can be one of:
          * count
          * <column_name>:sum
          * <column_name>:min
          * <column_name>:max
          * <column_name>:avg
          * <column_name>:stddev.
    :attr float max_bins: (optional) max number of bins which will be generated
          for data.
    :attr DataDistributionStatus status: (optional) The status information for
          the monitoring run.
    :attr float processed_records: (optional) number of processed records.
    :attr bool limited_data: (optional) was the limit used on data.
    :attr DataDistributionResponseEntityDistribution distribution: (optional)
    """

    def __init__(self,
                 start: str,
                 end: str,
                 dataset: str,
                 group: List[str],
                 *,
                 limit: float = None,
                 filter: str = None,
                 agg: List[str] = None,
                 max_bins: float = None,
                 status: 'DataDistributionStatus' = None,
                 processed_records: float = None,
                 limited_data: bool = None,
                 distribution: 'DataDistributionResponseEntityDistribution' = None) -> None:
        """
        Initialize a DataDistributionResponseEntity object.

        :param str start: start datetime in ISO format.
        :param str end: end datetime in ISO format.
        :param str dataset: type of a data set.
        :param List[str] group: names of columns to be grouped.
        :param float limit: (optional) limit for number of rows, by default it
               is 50,000 (max possible limit is 50,000).
        :param str filter: (optional) Filters defined by user in format:
               {field_name}:{op}:{value}. Partly compatible with filters in
               "filter" parameter of GET /v2/data_sets/{data_set_id}/records.
               Possible filter operators:
               * eq - equals (numeric, string)
               * gt - greater than (numeric)
               * gte - greater than or equal (numeric)
               * lt - lower than (numeric)
               * lte - lower than or equal (numeric)
               * in - value in a set (numeric, string)
               * field:null (a no-argument filter) - value is null (any
               nullable)
               * field:exists (a no-argument filter) - value is not null (any
               column).
        :param List[str] agg: (optional) Definition of aggregations, by default
               'count'.
               Aggregations can be one of:
               * count
               * <column_name>:sum
               * <column_name>:min
               * <column_name>:max
               * <column_name>:avg
               * <column_name>:stddev.
        :param float max_bins: (optional) max number of bins which will be
               generated for data.
        :param DataDistributionStatus status: (optional) The status information
               for the monitoring run.
        :param float processed_records: (optional) number of processed records.
        :param bool limited_data: (optional) was the limit used on data.
        :param DataDistributionResponseEntityDistribution distribution:
               (optional)
        """
        self.start = start
        self.end = end
        self.dataset = dataset
        self.limit = limit
        self.group = group
        self.filter = filter
        self.agg = agg
        self.max_bins = max_bins
        self.status = status
        self.processed_records = processed_records
        self.limited_data = limited_data
        self.distribution = distribution

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataDistributionResponseEntity':
        """
        Initialize a DataDistributionResponseEntity object from a json dictionary.

        :param Dict _dict: decoded JSON object describing the model.
        :return: a new DataDistributionResponseEntity.
        :raises ValueError: if 'start', 'end', 'dataset' or 'group' is absent.
        """
        args = {}
        if 'start' in _dict:
            args['start'] = _dict.get('start')
        else:
            raise ValueError('Required property \'start\' not present in DataDistributionResponseEntity JSON')
        if 'end' in _dict:
            args['end'] = _dict.get('end')
        else:
            raise ValueError('Required property \'end\' not present in DataDistributionResponseEntity JSON')
        if 'dataset' in _dict:
            args['dataset'] = _dict.get('dataset')
        else:
            raise ValueError('Required property \'dataset\' not present in DataDistributionResponseEntity JSON')
        if 'limit' in _dict:
            args['limit'] = _dict.get('limit')
        if 'group' in _dict:
            args['group'] = _dict.get('group')
        else:
            raise ValueError('Required property \'group\' not present in DataDistributionResponseEntity JSON')
        if 'filter' in _dict:
            args['filter'] = _dict.get('filter')
        if 'agg' in _dict:
            args['agg'] = _dict.get('agg')
        if 'max_bins' in _dict:
            args['max_bins'] = _dict.get('max_bins')
        # Optional nested objects: a JSON null means "not provided" and must
        # not be forwarded to the nested from_dict, which expects a dict.
        status = _dict.get('status')
        if status is not None:
            args['status'] = DataDistributionStatus.from_dict(status)
        if 'processed_records' in _dict:
            args['processed_records'] = _dict.get('processed_records')
        if 'limited_data' in _dict:
            args['limited_data'] = _dict.get('limited_data')
        distribution = _dict.get('distribution')
        if distribution is not None:
            args['distribution'] = DataDistributionResponseEntityDistribution.from_dict(distribution)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataDistributionResponseEntity object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Unset (None) attributes are omitted. Nested values given as plain
        dicts are emitted unchanged.
        """
        _dict = {}
        if hasattr(self, 'start') and self.start is not None:
            _dict['start'] = self.start
        if hasattr(self, 'end') and self.end is not None:
            _dict['end'] = self.end
        if hasattr(self, 'dataset') and self.dataset is not None:
            _dict['dataset'] = self.dataset
        if hasattr(self, 'limit') and self.limit is not None:
            _dict['limit'] = self.limit
        if hasattr(self, 'group') and self.group is not None:
            _dict['group'] = self.group
        if hasattr(self, 'filter') and self.filter is not None:
            _dict['filter'] = self.filter
        if hasattr(self, 'agg') and self.agg is not None:
            _dict['agg'] = self.agg
        if hasattr(self, 'max_bins') and self.max_bins is not None:
            _dict['max_bins'] = self.max_bins
        if hasattr(self, 'status') and self.status is not None:
            if isinstance(self.status, dict):
                _dict['status'] = self.status
            else:
                _dict['status'] = self.status.to_dict()
        if hasattr(self, 'processed_records') and self.processed_records is not None:
            _dict['processed_records'] = self.processed_records
        if hasattr(self, 'limited_data') and self.limited_data is not None:
            _dict['limited_data'] = self.limited_data
        if hasattr(self, 'distribution') and self.distribution is not None:
            if isinstance(self.distribution, dict):
                _dict['distribution'] = self.distribution
            else:
                _dict['distribution'] = self.distribution.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataDistributionResponseEntity object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataDistributionResponseEntity') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataDistributionResponseEntity') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other

    class DatasetEnum(str, Enum):
        """
        type of a data set.
        """
        MANUAL_LABELING = 'manual_labeling'
        PAYLOAD_LOGGING = 'payload_logging'
        FEEDBACK = 'feedback'
        BUSINESS_PAYLOAD = 'business_payload'
        EXPLANATIONS = 'explanations'
        EXPLANATIONS_WHATIF = 'explanations_whatif'
        TRAINING = 'training'
        CUSTOM = 'custom'
class DataDistributionResponseEntityDistribution():
    """
    DataDistributionResponseEntityDistribution.

    :attr List[str] fields: names of the data distribution fields.
    :attr List[object] values: data distribution rows.
    """

    def __init__(self,
                 fields: List[str],
                 values: List[object]) -> None:
        """
        Initialize a DataDistributionResponseEntityDistribution object.

        :param List[str] fields: names of the data distribution fields.
        :param List[object] values: data distribution rows.
        """
        self.fields = fields
        self.values = values

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataDistributionResponseEntityDistribution':
        """
        Initialize a DataDistributionResponseEntityDistribution object from a json dictionary.

        :raises ValueError: if 'fields' or 'values' is not present.
        """
        # Guard clauses, checked in the same order as the fields are declared.
        if 'fields' not in _dict:
            raise ValueError('Required property \'fields\' not present in DataDistributionResponseEntityDistribution JSON')
        if 'values' not in _dict:
            raise ValueError('Required property \'values\' not present in DataDistributionResponseEntityDistribution JSON')
        return cls(fields=_dict.get('fields'), values=_dict.get('values'))

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataDistributionResponseEntityDistribution object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """Return a json dictionary representing this model."""
        serialized = {}
        # Walk the attributes in declaration order; keep only those that are set.
        for attr_name in ('fields', 'values'):
            attr_value = getattr(self, attr_name, None)
            if attr_value is not None:
                serialized[attr_name] = attr_value
        return serialized

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataDistributionResponseEntityDistribution object."""
        payload = self.to_dict()
        return json.dumps(payload, indent=2)

    def __eq__(self, other: 'DataDistributionResponseEntityDistribution') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataDistributionResponseEntityDistribution') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        are_equal = self == other
        return not are_equal
class DataDistributionStatus():
    """
    The status information for the monitoring run.

    :attr str state: (optional)
    :attr datetime started_at: (optional) The timestamp when the monitoring run
          was started running (in the format YYYY-MM-DDTHH:mm:ssZ or
          YYYY-MM-DDTHH:mm:ss.sssZ, matching the date-time format as specified by
          RFC 3339).
    :attr datetime updated_at: (optional) The timestamp when the monitoring run
          was last updated (in the format YYYY-MM-DDTHH:mm:ssZ or
          YYYY-MM-DDTHH:mm:ss.sssZ, matching the date-time format as specified by
          RFC 3339).
    :attr datetime completed_at: (optional) The timestamp when the monitoring run
          finished running (in the format YYYY-MM-DDTHH:mm:ssZ or
          YYYY-MM-DDTHH:mm:ss.sssZ, matching the date-time format as specified by
          RFC 3339).
    :attr GenericErrorResponse failure: (optional)
    """

    def __init__(self,
                 *,
                 state: str = None,
                 started_at: datetime = None,
                 updated_at: datetime = None,
                 completed_at: datetime = None,
                 failure: 'GenericErrorResponse' = None) -> None:
        """
        Initialize a DataDistributionStatus object.

        :param str state: (optional)
        :param datetime started_at: (optional) The timestamp when the
               monitoring run was started running (in the format
               YYYY-MM-DDTHH:mm:ssZ or YYYY-MM-DDTHH:mm:ss.sssZ, matching the
               date-time format as specified by RFC 3339).
        :param datetime updated_at: (optional) The timestamp when the
               monitoring run was last updated (in the format
               YYYY-MM-DDTHH:mm:ssZ or YYYY-MM-DDTHH:mm:ss.sssZ, matching the
               date-time format as specified by RFC 3339).
        :param datetime completed_at: (optional) The timestamp when the
               monitoring run finished running (in the format
               YYYY-MM-DDTHH:mm:ssZ or YYYY-MM-DDTHH:mm:ss.sssZ, matching the
               date-time format as specified by RFC 3339).
        :param GenericErrorResponse failure: (optional)
        """
        self.state = state
        self.started_at = started_at
        self.updated_at = updated_at
        self.completed_at = completed_at
        self.failure = failure

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataDistributionStatus':
        """
        Initialize a DataDistributionStatus object from a json dictionary.

        Timestamp strings are converted with string_to_datetime. Timestamp and
        nested values that are absent or JSON null are left unset.

        :param Dict _dict: decoded JSON object describing the model.
        :return: a new DataDistributionStatus.
        """
        args = {}
        if 'state' in _dict:
            args['state'] = _dict.get('state')
        # Only non-null values are converted: a null timestamp (e.g. a run
        # that has not completed yet) has nothing to parse.
        started_at = _dict.get('started_at')
        if started_at is not None:
            args['started_at'] = string_to_datetime(started_at)
        updated_at = _dict.get('updated_at')
        if updated_at is not None:
            args['updated_at'] = string_to_datetime(updated_at)
        completed_at = _dict.get('completed_at')
        if completed_at is not None:
            args['completed_at'] = string_to_datetime(completed_at)
        failure = _dict.get('failure')
        if failure is not None:
            args['failure'] = GenericErrorResponse.from_dict(failure)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataDistributionStatus object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Unset (None) attributes are omitted; timestamps are rendered with
        datetime_to_string.
        """
        _dict = {}
        if hasattr(self, 'state') and self.state is not None:
            _dict['state'] = self.state
        if hasattr(self, 'started_at') and self.started_at is not None:
            _dict['started_at'] = datetime_to_string(self.started_at)
        if hasattr(self, 'updated_at') and self.updated_at is not None:
            _dict['updated_at'] = datetime_to_string(self.updated_at)
        if hasattr(self, 'completed_at') and self.completed_at is not None:
            _dict['completed_at'] = datetime_to_string(self.completed_at)
        if hasattr(self, 'failure') and self.failure is not None:
            # Accept a nested value that was supplied as a plain dict.
            if isinstance(self.failure, dict):
                _dict['failure'] = self.failure
            else:
                _dict['failure'] = self.failure.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataDistributionStatus object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataDistributionStatus') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataDistributionStatus') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other

    class StateEnum(str, Enum):
        """
        state.
        """
        INITIALIZING = 'initializing'
        RUNNING = 'running'
        COMPLETED = 'completed'
        FAILED = 'failed'
class DataMartDatabaseResponse():
    """
    DataMartDatabaseResponse.

    :attr Metadata metadata:
    :attr DataMartDatabaseResponseEntity entity:
    """

    def __init__(self,
                 metadata: 'Metadata',
                 entity: 'DataMartDatabaseResponseEntity') -> None:
        """
        Initialize a DataMartDatabaseResponse object.

        :param Metadata metadata:
        :param DataMartDatabaseResponseEntity entity:
        """
        self.metadata = metadata
        self.entity = entity

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartDatabaseResponse':
        """
        Initialize a DataMartDatabaseResponse object from a json dictionary.

        :param Dict _dict: decoded JSON object describing the model.
        :return: a new DataMartDatabaseResponse.
        :raises ValueError: if 'metadata' or 'entity' is absent or null.
        """
        args = {}
        # Both properties are required nested objects; a JSON null cannot be
        # deserialized, so it is reported the same way as a missing key.
        metadata = _dict.get('metadata')
        if metadata is not None:
            args['metadata'] = Metadata.from_dict(metadata)
        else:
            raise ValueError('Required property \'metadata\' not present in DataMartDatabaseResponse JSON')
        entity = _dict.get('entity')
        if entity is not None:
            args['entity'] = DataMartDatabaseResponseEntity.from_dict(entity)
        else:
            raise ValueError('Required property \'entity\' not present in DataMartDatabaseResponse JSON')
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataMartDatabaseResponse object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Nested values given as plain dicts are emitted unchanged; anything
        else is serialized through its own to_dict().
        """
        _dict = {}
        if hasattr(self, 'metadata') and self.metadata is not None:
            if isinstance(self.metadata, dict):
                _dict['metadata'] = self.metadata
            else:
                _dict['metadata'] = self.metadata.to_dict()
        if hasattr(self, 'entity') and self.entity is not None:
            if isinstance(self.entity, dict):
                _dict['entity'] = self.entity
            else:
                _dict['entity'] = self.entity.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataMartDatabaseResponse object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataMartDatabaseResponse') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartDatabaseResponse') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class DataMartDatabaseResponseCollection():
    """
    DataMartDatabaseResponseCollection.

    :attr List[DataMartDatabaseResponse] data_marts:
    """

    def __init__(self,
                 data_marts: List['DataMartDatabaseResponse']) -> None:
        """
        Initialize a DataMartDatabaseResponseCollection object.

        :param List[DataMartDatabaseResponse] data_marts:
        """
        self.data_marts = data_marts

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartDatabaseResponseCollection':
        """
        Initialize a DataMartDatabaseResponseCollection object from a json dictionary.

        :param Dict _dict: decoded JSON object describing the model.
        :return: a new DataMartDatabaseResponseCollection.
        :raises ValueError: if 'data_marts' is absent or null.
        """
        args = {}
        # A JSON null cannot be iterated; report it like a missing required
        # property instead of failing inside the comprehension.
        data_marts = _dict.get('data_marts')
        if data_marts is not None:
            args['data_marts'] = [DataMartDatabaseResponse.from_dict(x) for x in data_marts]
        else:
            raise ValueError('Required property \'data_marts\' not present in DataMartDatabaseResponseCollection JSON')
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataMartDatabaseResponseCollection object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        List items given as plain dicts are emitted unchanged; other items
        are serialized through their own to_dict().
        """
        _dict = {}
        if hasattr(self, 'data_marts') and self.data_marts is not None:
            _dict['data_marts'] = [
                x if isinstance(x, dict) else x.to_dict()
                for x in self.data_marts
            ]
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataMartDatabaseResponseCollection object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataMartDatabaseResponseCollection') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartDatabaseResponseCollection') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class DataMartDatabaseResponseEntity():
    """
    DataMartDatabaseResponseEntity.

    :attr str name: (optional)
    :attr str description: (optional)
    :attr str service_instance_crn: (optional)
    :attr bool internal_database: (optional) If `true` the internal database
          managed by AI OpenScale is provided for the user.
    :attr DatabaseConfiguration database_configuration: (optional) Database
          configuration ignored if internal database is requested
          (`internal_database` is `true`).
    :attr str database_discovery: (optional) Used by UI to check if database
          discovery was automatic or manual.
    :attr Status status: (optional)
    """

    def __init__(self,
                 *,
                 name: str = None,
                 description: str = None,
                 service_instance_crn: str = None,
                 internal_database: bool = None,
                 database_configuration: 'DatabaseConfiguration' = None,
                 database_discovery: str = None,
                 status: 'Status' = None) -> None:
        """
        Initialize a DataMartDatabaseResponseEntity object.

        :param str name: (optional)
        :param str description: (optional)
        :param str service_instance_crn: (optional)
        :param bool internal_database: (optional) If `true` the internal
               database managed by AI OpenScale is provided for the user.
        :param DatabaseConfiguration database_configuration: (optional)
               Database configuration ignored if internal database is requested
               (`internal_database` is `true`).
        :param str database_discovery: (optional) Used by UI to check if
               database discovery was automatic or manual.
        :param Status status: (optional)
        """
        self.name = name
        self.description = description
        self.service_instance_crn = service_instance_crn
        self.internal_database = internal_database
        self.database_configuration = database_configuration
        self.database_discovery = database_discovery
        self.status = status

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartDatabaseResponseEntity':
        """
        Initialize a DataMartDatabaseResponseEntity object from a json dictionary.

        Every property is optional. Nested objects that are absent or JSON
        null are left unset.

        :param Dict _dict: decoded JSON object describing the model.
        :return: a new DataMartDatabaseResponseEntity.
        """
        args = {}
        if 'name' in _dict:
            args['name'] = _dict.get('name')
        if 'description' in _dict:
            args['description'] = _dict.get('description')
        if 'service_instance_crn' in _dict:
            args['service_instance_crn'] = _dict.get('service_instance_crn')
        if 'internal_database' in _dict:
            args['internal_database'] = _dict.get('internal_database')
        # A JSON null is not a dictionary, so it must not be forwarded to the
        # nested from_dict.
        database_configuration = _dict.get('database_configuration')
        if database_configuration is not None:
            args['database_configuration'] = DatabaseConfiguration.from_dict(database_configuration)
        if 'database_discovery' in _dict:
            args['database_discovery'] = _dict.get('database_discovery')
        status = _dict.get('status')
        if status is not None:
            args['status'] = Status.from_dict(status)
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataMartDatabaseResponseEntity object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Unset (None) attributes are omitted. Nested values given as plain
        dicts are emitted unchanged.
        """
        _dict = {}
        if hasattr(self, 'name') and self.name is not None:
            _dict['name'] = self.name
        if hasattr(self, 'description') and self.description is not None:
            _dict['description'] = self.description
        if hasattr(self, 'service_instance_crn') and self.service_instance_crn is not None:
            _dict['service_instance_crn'] = self.service_instance_crn
        if hasattr(self, 'internal_database') and self.internal_database is not None:
            _dict['internal_database'] = self.internal_database
        if hasattr(self, 'database_configuration') and self.database_configuration is not None:
            if isinstance(self.database_configuration, dict):
                _dict['database_configuration'] = self.database_configuration
            else:
                _dict['database_configuration'] = self.database_configuration.to_dict()
        if hasattr(self, 'database_discovery') and self.database_discovery is not None:
            _dict['database_discovery'] = self.database_discovery
        if hasattr(self, 'status') and self.status is not None:
            if isinstance(self.status, dict):
                _dict['status'] = self.status
            else:
                _dict['status'] = self.status.to_dict()
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataMartDatabaseResponseEntity object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataMartDatabaseResponseEntity') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartDatabaseResponseEntity') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other

    class DatabaseDiscoveryEnum(str, Enum):
        """
        Used by UI to check if database discovery was automatic or manual.
        """
        AUTOMATIC = 'automatic'
        MANUAL = 'manual'
class DataMartGetMonitorInstanceMetrics():
    """
    DataMartGetMonitorInstanceMetrics.

    :attr datetime start: (optional) Floored to full interval.
    :attr datetime end: (optional) Ceiled to full interval.
    :attr str interval: (optional)
    :attr str monitor_definition_id: (optional)
    :attr List[DataMartGetMonitorInstanceMetricsGroupsItem] groups: (optional)
    """

    def __init__(self,
                 *,
                 start: datetime = None,
                 end: datetime = None,
                 interval: str = None,
                 monitor_definition_id: str = None,
                 groups: List['DataMartGetMonitorInstanceMetricsGroupsItem'] = None) -> None:
        """
        Initialize a DataMartGetMonitorInstanceMetrics object.

        :param datetime start: (optional) Floored to full interval.
        :param datetime end: (optional) Ceiled to full interval.
        :param str interval: (optional)
        :param str monitor_definition_id: (optional)
        :param List[DataMartGetMonitorInstanceMetricsGroupsItem] groups:
               (optional)
        """
        self.start = start
        self.end = end
        self.interval = interval
        self.monitor_definition_id = monitor_definition_id
        self.groups = groups

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartGetMonitorInstanceMetrics':
        """
        Initialize a DataMartGetMonitorInstanceMetrics object from a json dictionary.

        Timestamp strings are converted with string_to_datetime. Timestamps
        and the groups list are left unset when absent or JSON null.

        :param Dict _dict: decoded JSON object describing the model.
        :return: a new DataMartGetMonitorInstanceMetrics.
        """
        args = {}
        # Only non-null values are converted; a null has nothing to parse
        # and cannot be iterated.
        start = _dict.get('start')
        if start is not None:
            args['start'] = string_to_datetime(start)
        end = _dict.get('end')
        if end is not None:
            args['end'] = string_to_datetime(end)
        if 'interval' in _dict:
            args['interval'] = _dict.get('interval')
        if 'monitor_definition_id' in _dict:
            args['monitor_definition_id'] = _dict.get('monitor_definition_id')
        groups = _dict.get('groups')
        if groups is not None:
            args['groups'] = [DataMartGetMonitorInstanceMetricsGroupsItem.from_dict(x) for x in groups]
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataMartGetMonitorInstanceMetrics object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        Unset (None) attributes are omitted; timestamps are rendered with
        datetime_to_string. Group items given as plain dicts are emitted
        unchanged.
        """
        _dict = {}
        if hasattr(self, 'start') and self.start is not None:
            _dict['start'] = datetime_to_string(self.start)
        if hasattr(self, 'end') and self.end is not None:
            _dict['end'] = datetime_to_string(self.end)
        if hasattr(self, 'interval') and self.interval is not None:
            _dict['interval'] = self.interval
        if hasattr(self, 'monitor_definition_id') and self.monitor_definition_id is not None:
            _dict['monitor_definition_id'] = self.monitor_definition_id
        if hasattr(self, 'groups') and self.groups is not None:
            _dict['groups'] = [
                x if isinstance(x, dict) else x.to_dict()
                for x in self.groups
            ]
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataMartGetMonitorInstanceMetrics object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataMartGetMonitorInstanceMetrics') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartGetMonitorInstanceMetrics') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
class DataMartGetMonitorInstanceMetricsGroupsItem():
    """
    DataMartGetMonitorInstanceMetricsGroupsItem.

    :attr List[DataMartGetMonitorInstanceMetricsGroupsItemTagsItem] tags:
          (optional)
    :attr List[DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem] metrics:
          (optional)
    """

    def __init__(self,
                 *,
                 tags: List['DataMartGetMonitorInstanceMetricsGroupsItemTagsItem'] = None,
                 metrics: List['DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem'] = None) -> None:
        """
        Initialize a DataMartGetMonitorInstanceMetricsGroupsItem object.

        :param List[DataMartGetMonitorInstanceMetricsGroupsItemTagsItem] tags:
               (optional)
        :param List[DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem]
               metrics: (optional)
        """
        self.tags = tags
        self.metrics = metrics

    @classmethod
    def from_dict(cls, _dict: Dict) -> 'DataMartGetMonitorInstanceMetricsGroupsItem':
        """
        Initialize a DataMartGetMonitorInstanceMetricsGroupsItem object from a json dictionary.

        Both lists are optional and are left unset when absent or JSON null.

        :param Dict _dict: decoded JSON object describing the model.
        :return: a new DataMartGetMonitorInstanceMetricsGroupsItem.
        """
        args = {}
        # A JSON null cannot be iterated, so it is treated like a missing key.
        tags = _dict.get('tags')
        if tags is not None:
            args['tags'] = [DataMartGetMonitorInstanceMetricsGroupsItemTagsItem.from_dict(x) for x in tags]
        metrics = _dict.get('metrics')
        if metrics is not None:
            args['metrics'] = [DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem.from_dict(x) for x in metrics]
        return cls(**args)

    @classmethod
    def _from_dict(cls, _dict):
        """Initialize a DataMartGetMonitorInstanceMetricsGroupsItem object from a json dictionary."""
        return cls.from_dict(_dict)

    def to_dict(self) -> Dict:
        """
        Return a json dictionary representing this model.

        List items given as plain dicts are emitted unchanged; other items
        are serialized through their own to_dict().
        """
        _dict = {}
        if hasattr(self, 'tags') and self.tags is not None:
            _dict['tags'] = [
                x if isinstance(x, dict) else x.to_dict()
                for x in self.tags
            ]
        if hasattr(self, 'metrics') and self.metrics is not None:
            _dict['metrics'] = [
                x if isinstance(x, dict) else x.to_dict()
                for x in self.metrics
            ]
        return _dict

    def _to_dict(self):
        """Return a json dictionary representing this model."""
        return self.to_dict()

    def __str__(self) -> str:
        """Return a `str` version of this DataMartGetMonitorInstanceMetricsGroupsItem object."""
        return json.dumps(self.to_dict(), indent=2)

    def __eq__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItem') -> bool:
        """Return `true` when self and other are equal, false otherwise."""
        if not isinstance(other, self.__class__):
            return False
        return self.__dict__ == other.__dict__

    def __ne__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItem') -> bool:
        """Return `true` when self and other are not equal, false otherwise."""
        return not self == other
and self.metrics is not None: _dict['metrics'] = [x.to_dict() for x in self.metrics] return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this DataMartGetMonitorInstanceMetricsGroupsItem object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItem') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ def __ne__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItem') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem(): """ DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem. :attr str id: :attr float lower_limit: (optional) :attr float upper_limit: (optional) :attr DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast last: (optional) """ def __init__(self, id: str, *, lower_limit: float = None, upper_limit: float = None, last: 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast' = None) -> None: """ Initialize a DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem object. 
:param str id: :param float lower_limit: (optional) :param float upper_limit: (optional) :param DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast last: (optional) """ self.id = id self.lower_limit = lower_limit self.upper_limit = upper_limit self.last = last @classmethod def from_dict(cls, _dict: Dict) -> 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem': """Initialize a DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem object from a json dictionary.""" args = {} if 'id' in _dict: args['id'] = _dict.get('id') else: raise ValueError('Required property \'id\' not present in DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem JSON') if 'lower_limit' in _dict: args['lower_limit'] = _dict.get('lower_limit') if 'upper_limit' in _dict: args['upper_limit'] = _dict.get('upper_limit') if 'last' in _dict: args['last'] = DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast.from_dict(_dict.get('last')) return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'id') and self.id is not None: _dict['id'] = self.id if hasattr(self, 'lower_limit') and self.lower_limit is not None: _dict['lower_limit'] = self.lower_limit if hasattr(self, 'upper_limit') and self.upper_limit is not None: _dict['upper_limit'] = self.upper_limit if hasattr(self, 'last') and self.last is not None: _dict['last'] = self.last.to_dict() return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem') -> bool: """Return `true` when self and other 
are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ def __ne__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItem') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast(): """ DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast. :attr List[float] value: (optional) :attr List[str] measurement_id: (optional) """ def __init__(self, *, value: List[float] = None, measurement_id: List[str] = None) -> None: """ Initialize a DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast object. :param List[float] value: (optional) :param List[str] measurement_id: (optional) """ self.value = value self.measurement_id = measurement_id @classmethod def from_dict(cls, _dict: Dict) -> 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast': """Initialize a DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast object from a json dictionary.""" args = {} if 'value' in _dict: args['value'] = _dict.get('value') if 'measurement_id' in _dict: args['measurement_id'] = _dict.get('measurement_id') return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'value') and self.value is not None: _dict['value'] = self.value if hasattr(self, 'measurement_id') and self.measurement_id is not None: _dict['measurement_id'] = self.measurement_id return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast object.""" return json.dumps(self.to_dict(), indent=2) def 
__eq__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ def __ne__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItemMetricsItemLast') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class DataMartGetMonitorInstanceMetricsGroupsItemTagsItem(): """ DataMartGetMonitorInstanceMetricsGroupsItemTagsItem. :attr str id: :attr str value: """ def __init__(self, id: str, value: str) -> None: """ Initialize a DataMartGetMonitorInstanceMetricsGroupsItemTagsItem object. :param str id: :param str value: """ self.id = id self.value = value @classmethod def from_dict(cls, _dict: Dict) -> 'DataMartGetMonitorInstanceMetricsGroupsItemTagsItem': """Initialize a DataMartGetMonitorInstanceMetricsGroupsItemTagsItem object from a json dictionary.""" args = {} if 'id' in _dict: args['id'] = _dict.get('id') else: raise ValueError('Required property \'id\' not present in DataMartGetMonitorInstanceMetricsGroupsItemTagsItem JSON') if 'value' in _dict: args['value'] = _dict.get('value') else: raise ValueError('Required property \'value\' not present in DataMartGetMonitorInstanceMetricsGroupsItemTagsItem JSON') return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a DataMartGetMonitorInstanceMetricsGroupsItemTagsItem object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'id') and self.id is not None: _dict['id'] = self.id if hasattr(self, 'value') and self.value is not None: _dict['value'] = self.value return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this 
DataMartGetMonitorInstanceMetricsGroupsItemTagsItem object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItemTagsItem') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ def __ne__(self, other: 'DataMartGetMonitorInstanceMetricsGroupsItemTagsItem') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class DataRecord(): """ DataRecord. :attr object values: Fields and values of the entity matches JSON object's fields and values. :attr dict annotations: (optional) Any JSON object representing annotations. """ def __init__(self, values: object, *, annotations: dict = None) -> None: """ Initialize a DataRecord object. :param object values: Fields and values of the entity matches JSON object's fields and values. :param dict annotations: (optional) Any JSON object representing annotations. 
""" self.values = values self.annotations = annotations @classmethod def from_dict(cls, _dict: Dict) -> 'DataRecord': """Initialize a DataRecord object from a json dictionary.""" args = {} if 'values' in _dict: args['values'] = _dict.get('values') else: raise ValueError('Required property \'values\' not present in DataRecord JSON') if 'annotations' in _dict: args['annotations'] = _dict.get('annotations') return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a DataRecord object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'values') and self.values is not None: _dict['values'] = self.values if hasattr(self, 'annotations') and self.annotations is not None: _dict['annotations'] = self.annotations return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this DataRecord object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'DataRecord') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ def __ne__(self, other: 'DataRecord') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class DataRecordResponse(): """ DataRecordResponse. :attr Metadata metadata: :attr DataRecord entity: """ def __init__(self, metadata: 'Metadata', entity: 'DataRecord') -> None: """ Initialize a DataRecordResponse object. 
:param Metadata metadata: :param DataRecord entity: """ self.metadata = metadata self.entity = entity @classmethod def from_dict(cls, _dict: Dict) -> 'DataRecordResponse': """Initialize a DataRecordResponse object from a json dictionary.""" args = {} if 'metadata' in _dict: args['metadata'] = Metadata.from_dict(_dict.get('metadata')) else: raise ValueError('Required property \'metadata\' not present in DataRecordResponse JSON') if 'entity' in _dict: args['entity'] = DataRecord.from_dict(_dict.get('entity')) else: raise ValueError('Required property \'entity\' not present in DataRecordResponse JSON') return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a DataRecordResponse object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'metadata') and self.metadata is not None: _dict['metadata'] = self.metadata.to_dict() if hasattr(self, 'entity') and self.entity is not None: _dict['entity'] = self.entity.to_dict() return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this DataRecordResponse object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'DataRecordResponse') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ == other.__dict__ def __ne__(self, other: 'DataRecordResponse') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class DataRecordResponseList(): """ DataRecordResponseList. :attr List[str] fields: Fields' names are listed in order in 'fields'. :attr List[List[object]] values: Rows organized per fields as described in 'fields'. :attr List[dict] annotations: (optional) List of annotations objects. 
""" def __init__(self, fields: List[str], values: List[List[object]], *, annotations: List[dict] = None) -> None: """ Initialize a DataRecordResponseList object. :param List[str] fields: Fields' names are listed in order in 'fields'. :param List[List[object]] values: Rows organized per fields as described in 'fields'. :param List[dict] annotations: (optional) List of annotations objects. """ self.fields = fields self.values = values self.annotations = annotations @classmethod def from_dict(cls, _dict: Dict) -> 'DataRecordResponseList': """Initialize a DataRecordResponseList object from a json dictionary.""" args = {} if 'fields' in _dict: args['fields'] = _dict.get('fields') else: raise ValueError('Required property \'fields\' not present in DataRecordResponseList JSON') if 'values' in _dict: args['values'] = _dict.get('values') else: raise ValueError('Required property \'values\' not present in DataRecordResponseList JSON') if 'annotations' in _dict: args['annotations'] = _dict.get('annotations') return cls(**args) @classmethod def _from_dict(cls, _dict): """Initialize a DataRecordResponseList object from a json dictionary.""" return cls.from_dict(_dict) def to_dict(self) -> Dict: """Return a json dictionary representing this model.""" _dict = {} if hasattr(self, 'fields') and self.fields is not None: _dict['fields'] = self.fields if hasattr(self, 'values') and self.values is not None: _dict['values'] = self.values if hasattr(self, 'annotations') and self.annotations is not None: _dict['annotations'] = self.annotations return _dict def _to_dict(self): """Return a json dictionary representing this model.""" return self.to_dict() def __str__(self) -> str: """Return a `str` version of this DataRecordResponseList object.""" return json.dumps(self.to_dict(), indent=2) def __eq__(self, other: 'DataRecordResponseList') -> bool: """Return `true` when self and other are equal, false otherwise.""" if not isinstance(other, self.__class__): return False return self.__dict__ 
== other.__dict__ def __ne__(self, other: 'DataRecordResponseList') -> bool: """Return `true` when self and other are not equal, false otherwise.""" return not self == other class DataSetObject(): """ DataSetObject. :attr str data_mart_id: :attr str name: :attr str description: (optional) :attr str type: type of a data set. :attr Target target: :attr str schema_update_mode: (optional) :attr SparkStruct data_schema: :attr LocationTableName location: (optional) :attr str managed_by: (optional) :attr Status status: """ def __init__(self, data_mart_id: str, name: str, type: str, target: 'Target', data_schema: 'SparkStruct', status: 'Status', *, description: str = None, schema_update_mode: str = None, location: 'LocationTableName' = None, managed_by: str = None) -> None: """ Initialize a DataSetObject object. :param str data_mart_id: :param str name: :param str type: type of a data set. :param Target target: :param SparkStruct data_schema: :param Status status: :param str description: (optional) :param str schema_update_mode: (optional) :param LocationTableName location: (optional) :param str managed_by: (optional) """ self.data_mart_id = data_mart_id self.name = name self.description = description self.type = type self.target = target self.schema_update_mode = schema_update_mode self.data_schema = data_schema self.location = location self.managed_by = managed_by self.status = status @classmethod def from_dict(cls, _dict: Dict) -> 'DataSetObject': """Initialize a DataSetObject object from a json dictionary.""" args = {} if 'data_mart_id' in _dict: args['data_mart_id'] = _dict.get('data_mart_id') else: raise ValueError('Required property \'data_mart_id\' not present in DataSetObject JSON') if 'name' in _dict: args['name'] = _dict.get('name') else: raise ValueError('Required property \'name\' not present in DataSetObject JSON'