# coding: utf-8
"""
LUSID API
FINBOURNE Technology # noqa: E501
Contact: info@finbourne.com
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
"""
import re # noqa: F401
import io
import warnings
from pydantic.v1 import validate_arguments, ValidationError
from typing import overload, Optional, Union, Awaitable
from typing_extensions import Annotated
from datetime import datetime
from pydantic.v1 import Field, constr, validator
from typing import Optional
from lusid.models.annul_single_structured_data_response import AnnulSingleStructuredDataResponse
from lusid.models.get_cds_flow_conventions_response import GetCdsFlowConventionsResponse
from lusid.models.get_flow_conventions_response import GetFlowConventionsResponse
from lusid.models.get_index_convention_response import GetIndexConventionResponse
from lusid.models.resource_list_of_get_cds_flow_conventions_response import ResourceListOfGetCdsFlowConventionsResponse
from lusid.models.resource_list_of_get_flow_conventions_response import ResourceListOfGetFlowConventionsResponse
from lusid.models.resource_list_of_get_index_convention_response import ResourceListOfGetIndexConventionResponse
from lusid.models.upsert_cds_flow_conventions_request import UpsertCdsFlowConventionsRequest
from lusid.models.upsert_flow_conventions_request import UpsertFlowConventionsRequest
from lusid.models.upsert_index_convention_request import UpsertIndexConventionRequest
from lusid.models.upsert_single_structured_data_response import UpsertSingleStructuredDataResponse
from lusid.api_client import ApiClient
from lusid.api_response import ApiResponse
from lusid.exceptions import ( # noqa: F401
ApiTypeError,
ApiValueError
)
[docs]
class ConventionsApi:
"""NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
"""
def __init__(self, api_client=None) -> None:
if api_client is None:
api_client = ApiClient.get_default()
self.api_client = api_client
@overload
async def delete_cds_flow_conventions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the CDS Flow Conventions to delete.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The CDS Flow Conventions to delete.")], **kwargs) -> AnnulSingleStructuredDataResponse: # noqa: E501
...
@overload
def delete_cds_flow_conventions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the CDS Flow Conventions to delete.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The CDS Flow Conventions to delete.")], async_req: Optional[bool]=True, **kwargs) -> AnnulSingleStructuredDataResponse: # noqa: E501
...
[docs]
@validate_arguments
def delete_cds_flow_conventions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the CDS Flow Conventions to delete.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The CDS Flow Conventions to delete.")], async_req: Optional[bool]=None, **kwargs) -> Union[AnnulSingleStructuredDataResponse, Awaitable[AnnulSingleStructuredDataResponse]]: # noqa: E501
"""[BETA] DeleteCdsFlowConventions: Delete the CDS Flow Conventions of given scope and code, assuming that it is present. # noqa: E501
Delete the specified CDS Flow Conventions from a single scope. The response will return either detail of the deleted item, or an explanation (failure) as to why this did not succeed. It is important to always check for any unsuccessful response. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.delete_cds_flow_conventions(scope, code, async_req=True)
>>> result = thread.get()
:param scope: The scope of the CDS Flow Conventions to delete. (required)
:type scope: str
:param code: The CDS Flow Conventions to delete. (required)
:type code: str
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: AnnulSingleStructuredDataResponse
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the delete_cds_flow_conventions_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.delete_cds_flow_conventions_with_http_info(scope, code, **kwargs) # noqa: E501
[docs]
@validate_arguments
def delete_cds_flow_conventions_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the CDS Flow Conventions to delete.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The CDS Flow Conventions to delete.")], **kwargs) -> ApiResponse: # noqa: E501
"""[BETA] DeleteCdsFlowConventions: Delete the CDS Flow Conventions of given scope and code, assuming that it is present. # noqa: E501
Delete the specified CDS Flow Conventions from a single scope. The response will return either detail of the deleted item, or an explanation (failure) as to why this did not succeed. It is important to always check for any unsuccessful response. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.delete_cds_flow_conventions_with_http_info(scope, code, async_req=True)
>>> result = thread.get()
:param scope: The scope of the CDS Flow Conventions to delete. (required)
:type scope: str
:param code: The CDS Flow Conventions to delete. (required)
:type code: str
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(AnnulSingleStructuredDataResponse, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'scope',
'code'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method delete_cds_flow_conventions" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
if _params['scope']:
_path_params['scope'] = _params['scope']
if _params['code']:
_path_params['code'] = _params['code']
# process the query parameters
_query_params = []
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "AnnulSingleStructuredDataResponse",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/conventions/credit/conventions/{scope}/{code}', 'DELETE',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def delete_flow_conventions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Flow Conventions to delete.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The Flow Conventions to delete.")], **kwargs) -> AnnulSingleStructuredDataResponse: # noqa: E501
...
@overload
def delete_flow_conventions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Flow Conventions to delete.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The Flow Conventions to delete.")], async_req: Optional[bool]=True, **kwargs) -> AnnulSingleStructuredDataResponse: # noqa: E501
...
[docs]
@validate_arguments
def delete_flow_conventions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Flow Conventions to delete.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The Flow Conventions to delete.")], async_req: Optional[bool]=None, **kwargs) -> Union[AnnulSingleStructuredDataResponse, Awaitable[AnnulSingleStructuredDataResponse]]: # noqa: E501
"""[BETA] DeleteFlowConventions: Delete the Flow Conventions of given scope and code, assuming that it is present. # noqa: E501
Delete the specified conventions from a single scope. The response will return either detail of the deleted item, or an explanation (failure) as to why this did not succeed. It is important to always check for any unsuccessful response. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.delete_flow_conventions(scope, code, async_req=True)
>>> result = thread.get()
:param scope: The scope of the Flow Conventions to delete. (required)
:type scope: str
:param code: The Flow Conventions to delete. (required)
:type code: str
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: AnnulSingleStructuredDataResponse
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the delete_flow_conventions_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.delete_flow_conventions_with_http_info(scope, code, **kwargs) # noqa: E501
[docs]
@validate_arguments
def delete_flow_conventions_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Flow Conventions to delete.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The Flow Conventions to delete.")], **kwargs) -> ApiResponse: # noqa: E501
"""[BETA] DeleteFlowConventions: Delete the Flow Conventions of given scope and code, assuming that it is present. # noqa: E501
Delete the specified conventions from a single scope. The response will return either detail of the deleted item, or an explanation (failure) as to why this did not succeed. It is important to always check for any unsuccessful response. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.delete_flow_conventions_with_http_info(scope, code, async_req=True)
>>> result = thread.get()
:param scope: The scope of the Flow Conventions to delete. (required)
:type scope: str
:param code: The Flow Conventions to delete. (required)
:type code: str
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(AnnulSingleStructuredDataResponse, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'scope',
'code'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method delete_flow_conventions" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
if _params['scope']:
_path_params['scope'] = _params['scope']
if _params['code']:
_path_params['code'] = _params['code']
# process the query parameters
_query_params = []
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "AnnulSingleStructuredDataResponse",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/conventions/rates/flowconventions/{scope}/{code}', 'DELETE',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def delete_index_convention(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Index Convention to delete.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The Index Convention to delete.")], **kwargs) -> AnnulSingleStructuredDataResponse: # noqa: E501
...
@overload
def delete_index_convention(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Index Convention to delete.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The Index Convention to delete.")], async_req: Optional[bool]=True, **kwargs) -> AnnulSingleStructuredDataResponse: # noqa: E501
...
[docs]
@validate_arguments
def delete_index_convention(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Index Convention to delete.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The Index Convention to delete.")], async_req: Optional[bool]=None, **kwargs) -> Union[AnnulSingleStructuredDataResponse, Awaitable[AnnulSingleStructuredDataResponse]]: # noqa: E501
"""[BETA] DeleteIndexConvention: Delete the Index Convention of given scope and code, assuming that it is present. # noqa: E501
Delete the specified Index Convention from a single scope. The response will return either detail of the deleted item, or an explanation (failure) as to why this did not succeed. It is important to always check for any unsuccessful response. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.delete_index_convention(scope, code, async_req=True)
>>> result = thread.get()
:param scope: The scope of the Index Convention to delete. (required)
:type scope: str
:param code: The Index Convention to delete. (required)
:type code: str
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: AnnulSingleStructuredDataResponse
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the delete_index_convention_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.delete_index_convention_with_http_info(scope, code, **kwargs) # noqa: E501
[docs]
@validate_arguments
def delete_index_convention_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Index Convention to delete.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The Index Convention to delete.")], **kwargs) -> ApiResponse: # noqa: E501
"""[BETA] DeleteIndexConvention: Delete the Index Convention of given scope and code, assuming that it is present. # noqa: E501
Delete the specified Index Convention from a single scope. The response will return either detail of the deleted item, or an explanation (failure) as to why this did not succeed. It is important to always check for any unsuccessful response. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.delete_index_convention_with_http_info(scope, code, async_req=True)
>>> result = thread.get()
:param scope: The scope of the Index Convention to delete. (required)
:type scope: str
:param code: The Index Convention to delete. (required)
:type code: str
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(AnnulSingleStructuredDataResponse, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'scope',
'code'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method delete_index_convention" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
if _params['scope']:
_path_params['scope'] = _params['scope']
if _params['code']:
_path_params['code'] = _params['code']
# process the query parameters
_query_params = []
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "AnnulSingleStructuredDataResponse",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/conventions/rates/indexconventions/{scope}/{code}', 'DELETE',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def get_cds_flow_conventions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the CDS Flow Conventions to retrieve.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The name of the CDS Flow Conventions to retrieve the data for.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the CDS Flow Conventions. Defaults to return the latest version if not specified.")] = None, **kwargs) -> GetCdsFlowConventionsResponse: # noqa: E501
...
@overload
def get_cds_flow_conventions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the CDS Flow Conventions to retrieve.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The name of the CDS Flow Conventions to retrieve the data for.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the CDS Flow Conventions. Defaults to return the latest version if not specified.")] = None, async_req: Optional[bool]=True, **kwargs) -> GetCdsFlowConventionsResponse: # noqa: E501
...
[docs]
@validate_arguments
def get_cds_flow_conventions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the CDS Flow Conventions to retrieve.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The name of the CDS Flow Conventions to retrieve the data for.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the CDS Flow Conventions. Defaults to return the latest version if not specified.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[GetCdsFlowConventionsResponse, Awaitable[GetCdsFlowConventionsResponse]]: # noqa: E501
"""[BETA] GetCdsFlowConventions: Get CDS Flow Conventions # noqa: E501
Get a CDS Flow Conventions from a single scope. The response will return either the conventions that has been stored, or a failure explaining why the request was unsuccessful. It is important to always check for any unsuccessful requests (failures). # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.get_cds_flow_conventions(scope, code, as_at, async_req=True)
>>> result = thread.get()
:param scope: The scope of the CDS Flow Conventions to retrieve. (required)
:type scope: str
:param code: The name of the CDS Flow Conventions to retrieve the data for. (required)
:type code: str
:param as_at: The asAt datetime at which to retrieve the CDS Flow Conventions. Defaults to return the latest version if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: GetCdsFlowConventionsResponse
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the get_cds_flow_conventions_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.get_cds_flow_conventions_with_http_info(scope, code, as_at, **kwargs) # noqa: E501
[docs]
@validate_arguments
def get_cds_flow_conventions_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the CDS Flow Conventions to retrieve.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The name of the CDS Flow Conventions to retrieve the data for.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the CDS Flow Conventions. Defaults to return the latest version if not specified.")] = None, **kwargs) -> ApiResponse: # noqa: E501
"""[BETA] GetCdsFlowConventions: Get CDS Flow Conventions # noqa: E501
Get a CDS Flow Conventions from a single scope. The response will return either the conventions that has been stored, or a failure explaining why the request was unsuccessful. It is important to always check for any unsuccessful requests (failures). # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.get_cds_flow_conventions_with_http_info(scope, code, as_at, async_req=True)
>>> result = thread.get()
:param scope: The scope of the CDS Flow Conventions to retrieve. (required)
:type scope: str
:param code: The name of the CDS Flow Conventions to retrieve the data for. (required)
:type code: str
:param as_at: The asAt datetime at which to retrieve the CDS Flow Conventions. Defaults to return the latest version if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(GetCdsFlowConventionsResponse, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'scope',
'code',
'as_at'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method get_cds_flow_conventions" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
if _params['scope']:
_path_params['scope'] = _params['scope']
if _params['code']:
_path_params['code'] = _params['code']
# process the query parameters
_query_params = []
if _params.get('as_at') is not None: # noqa: E501
if isinstance(_params['as_at'], datetime):
_query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format)))
else:
_query_params.append(('asAt', _params['as_at']))
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "GetCdsFlowConventionsResponse",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/conventions/credit/conventions/{scope}/{code}', 'GET',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def get_flow_conventions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Flow Conventions to retrieve.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The name of the Flow Conventions to retrieve the data for.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the Flow Conventions. Defaults to return the latest version if not specified.")] = None, **kwargs) -> GetFlowConventionsResponse: # noqa: E501
...
@overload
def get_flow_conventions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Flow Conventions to retrieve.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The name of the Flow Conventions to retrieve the data for.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the Flow Conventions. Defaults to return the latest version if not specified.")] = None, async_req: Optional[bool]=True, **kwargs) -> GetFlowConventionsResponse: # noqa: E501
...
[docs]
@validate_arguments
def get_flow_conventions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Flow Conventions to retrieve.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The name of the Flow Conventions to retrieve the data for.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the Flow Conventions. Defaults to return the latest version if not specified.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[GetFlowConventionsResponse, Awaitable[GetFlowConventionsResponse]]: # noqa: E501
"""[BETA] GetFlowConventions: Get Flow Conventions # noqa: E501
Get a Flow Conventions from a single scope. The response will return either the conventions that has been stored, or a failure explaining why the request was unsuccessful. It is important to always check for any unsuccessful requests (failures). # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.get_flow_conventions(scope, code, as_at, async_req=True)
>>> result = thread.get()
:param scope: The scope of the Flow Conventions to retrieve. (required)
:type scope: str
:param code: The name of the Flow Conventions to retrieve the data for. (required)
:type code: str
:param as_at: The asAt datetime at which to retrieve the Flow Conventions. Defaults to return the latest version if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: GetFlowConventionsResponse
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the get_flow_conventions_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.get_flow_conventions_with_http_info(scope, code, as_at, **kwargs) # noqa: E501
[docs]
@validate_arguments
def get_flow_conventions_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Flow Conventions to retrieve.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The name of the Flow Conventions to retrieve the data for.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the Flow Conventions. Defaults to return the latest version if not specified.")] = None, **kwargs) -> ApiResponse: # noqa: E501
"""[BETA] GetFlowConventions: Get Flow Conventions # noqa: E501
Get a Flow Conventions from a single scope. The response will return either the conventions that has been stored, or a failure explaining why the request was unsuccessful. It is important to always check for any unsuccessful requests (failures). # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.get_flow_conventions_with_http_info(scope, code, as_at, async_req=True)
>>> result = thread.get()
:param scope: The scope of the Flow Conventions to retrieve. (required)
:type scope: str
:param code: The name of the Flow Conventions to retrieve the data for. (required)
:type code: str
:param as_at: The asAt datetime at which to retrieve the Flow Conventions. Defaults to return the latest version if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(GetFlowConventionsResponse, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'scope',
'code',
'as_at'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method get_flow_conventions" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
if _params['scope']:
_path_params['scope'] = _params['scope']
if _params['code']:
_path_params['code'] = _params['code']
# process the query parameters
_query_params = []
if _params.get('as_at') is not None: # noqa: E501
if isinstance(_params['as_at'], datetime):
_query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format)))
else:
_query_params.append(('asAt', _params['as_at']))
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "GetFlowConventionsResponse",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/conventions/rates/flowconventions/{scope}/{code}', 'GET',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def get_index_convention(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Index Convention to retrieve.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The name of the Index Convention to retrieve the data for.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the Index Convention. Defaults to return the latest version if not specified.")] = None, **kwargs) -> GetIndexConventionResponse: # noqa: E501
...
@overload
def get_index_convention(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Index Convention to retrieve.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The name of the Index Convention to retrieve the data for.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the Index Convention. Defaults to return the latest version if not specified.")] = None, async_req: Optional[bool]=True, **kwargs) -> GetIndexConventionResponse: # noqa: E501
...
[docs]
@validate_arguments
def get_index_convention(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Index Convention to retrieve.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The name of the Index Convention to retrieve the data for.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the Index Convention. Defaults to return the latest version if not specified.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[GetIndexConventionResponse, Awaitable[GetIndexConventionResponse]]: # noqa: E501
"""[BETA] GetIndexConvention: Get Index Convention # noqa: E501
Get a Index Convention from a single scope. The response will return either the conventions that has been stored, or a failure explaining why the request was unsuccessful. It is important to always check for any unsuccessful requests (failures). # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.get_index_convention(scope, code, as_at, async_req=True)
>>> result = thread.get()
:param scope: The scope of the Index Convention to retrieve. (required)
:type scope: str
:param code: The name of the Index Convention to retrieve the data for. (required)
:type code: str
:param as_at: The asAt datetime at which to retrieve the Index Convention. Defaults to return the latest version if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: GetIndexConventionResponse
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the get_index_convention_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.get_index_convention_with_http_info(scope, code, as_at, **kwargs) # noqa: E501
[docs]
@validate_arguments
def get_index_convention_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the Index Convention to retrieve.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The name of the Index Convention to retrieve the data for.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the Index Convention. Defaults to return the latest version if not specified.")] = None, **kwargs) -> ApiResponse: # noqa: E501
"""[BETA] GetIndexConvention: Get Index Convention # noqa: E501
Get a Index Convention from a single scope. The response will return either the conventions that has been stored, or a failure explaining why the request was unsuccessful. It is important to always check for any unsuccessful requests (failures). # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.get_index_convention_with_http_info(scope, code, as_at, async_req=True)
>>> result = thread.get()
:param scope: The scope of the Index Convention to retrieve. (required)
:type scope: str
:param code: The name of the Index Convention to retrieve the data for. (required)
:type code: str
:param as_at: The asAt datetime at which to retrieve the Index Convention. Defaults to return the latest version if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(GetIndexConventionResponse, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'scope',
'code',
'as_at'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method get_index_convention" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
if _params['scope']:
_path_params['scope'] = _params['scope']
if _params['code']:
_path_params['code'] = _params['code']
# process the query parameters
_query_params = []
if _params.get('as_at') is not None: # noqa: E501
if isinstance(_params['as_at'], datetime):
_query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format)))
else:
_query_params.append(('asAt', _params['as_at']))
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "GetIndexConventionResponse",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/conventions/rates/indexconventions/{scope}/{code}', 'GET',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def list_cds_flow_conventions(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list the conventions. Defaults to latest if not specified.")] = None, **kwargs) -> ResourceListOfGetCdsFlowConventionsResponse: # noqa: E501
...
@overload
def list_cds_flow_conventions(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list the conventions. Defaults to latest if not specified.")] = None, async_req: Optional[bool]=True, **kwargs) -> ResourceListOfGetCdsFlowConventionsResponse: # noqa: E501
...
[docs]
@validate_arguments
def list_cds_flow_conventions(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list the conventions. Defaults to latest if not specified.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[ResourceListOfGetCdsFlowConventionsResponse, Awaitable[ResourceListOfGetCdsFlowConventionsResponse]]: # noqa: E501
"""[BETA] ListCdsFlowConventions: List the set of CDS Flow Conventions # noqa: E501
List the set of CDS Flow Conventions at the specified date/time # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.list_cds_flow_conventions(as_at, async_req=True)
>>> result = thread.get()
:param as_at: The asAt datetime at which to list the conventions. Defaults to latest if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: ResourceListOfGetCdsFlowConventionsResponse
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the list_cds_flow_conventions_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.list_cds_flow_conventions_with_http_info(as_at, **kwargs) # noqa: E501
[docs]
@validate_arguments
def list_cds_flow_conventions_with_http_info(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list the conventions. Defaults to latest if not specified.")] = None, **kwargs) -> ApiResponse: # noqa: E501
"""[BETA] ListCdsFlowConventions: List the set of CDS Flow Conventions # noqa: E501
List the set of CDS Flow Conventions at the specified date/time # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.list_cds_flow_conventions_with_http_info(as_at, async_req=True)
>>> result = thread.get()
:param as_at: The asAt datetime at which to list the conventions. Defaults to latest if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(ResourceListOfGetCdsFlowConventionsResponse, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'as_at'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method list_cds_flow_conventions" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
# process the query parameters
_query_params = []
if _params.get('as_at') is not None: # noqa: E501
if isinstance(_params['as_at'], datetime):
_query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format)))
else:
_query_params.append(('asAt', _params['as_at']))
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "ResourceListOfGetCdsFlowConventionsResponse",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/conventions/credit/conventions', 'GET',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def list_flow_conventions(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list the conventions. Defaults to latest if not specified.")] = None, **kwargs) -> ResourceListOfGetFlowConventionsResponse: # noqa: E501
...
@overload
def list_flow_conventions(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list the conventions. Defaults to latest if not specified.")] = None, async_req: Optional[bool]=True, **kwargs) -> ResourceListOfGetFlowConventionsResponse: # noqa: E501
...
[docs]
@validate_arguments
def list_flow_conventions(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list the conventions. Defaults to latest if not specified.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[ResourceListOfGetFlowConventionsResponse, Awaitable[ResourceListOfGetFlowConventionsResponse]]: # noqa: E501
"""[BETA] ListFlowConventions: List the set of Flow Conventions # noqa: E501
List the set of Flow Conventions at the specified date/time # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.list_flow_conventions(as_at, async_req=True)
>>> result = thread.get()
:param as_at: The asAt datetime at which to list the conventions. Defaults to latest if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: ResourceListOfGetFlowConventionsResponse
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the list_flow_conventions_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.list_flow_conventions_with_http_info(as_at, **kwargs) # noqa: E501
[docs]
@validate_arguments
def list_flow_conventions_with_http_info(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list the conventions. Defaults to latest if not specified.")] = None, **kwargs) -> ApiResponse: # noqa: E501
"""[BETA] ListFlowConventions: List the set of Flow Conventions # noqa: E501
List the set of Flow Conventions at the specified date/time # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.list_flow_conventions_with_http_info(as_at, async_req=True)
>>> result = thread.get()
:param as_at: The asAt datetime at which to list the conventions. Defaults to latest if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(ResourceListOfGetFlowConventionsResponse, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'as_at'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method list_flow_conventions" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
# process the query parameters
_query_params = []
if _params.get('as_at') is not None: # noqa: E501
if isinstance(_params['as_at'], datetime):
_query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format)))
else:
_query_params.append(('asAt', _params['as_at']))
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "ResourceListOfGetFlowConventionsResponse",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/conventions/rates/flowconventions', 'GET',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def list_index_convention(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list the conventions. Defaults to latest if not specified.")] = None, **kwargs) -> ResourceListOfGetIndexConventionResponse: # noqa: E501
...
@overload
def list_index_convention(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list the conventions. Defaults to latest if not specified.")] = None, async_req: Optional[bool]=True, **kwargs) -> ResourceListOfGetIndexConventionResponse: # noqa: E501
...
[docs]
@validate_arguments
def list_index_convention(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list the conventions. Defaults to latest if not specified.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[ResourceListOfGetIndexConventionResponse, Awaitable[ResourceListOfGetIndexConventionResponse]]: # noqa: E501
"""[BETA] ListIndexConvention: List the set of Index Conventions # noqa: E501
List the set of Index Conventions at the specified date/time # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.list_index_convention(as_at, async_req=True)
>>> result = thread.get()
:param as_at: The asAt datetime at which to list the conventions. Defaults to latest if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: ResourceListOfGetIndexConventionResponse
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the list_index_convention_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.list_index_convention_with_http_info(as_at, **kwargs) # noqa: E501
[docs]
@validate_arguments
def list_index_convention_with_http_info(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list the conventions. Defaults to latest if not specified.")] = None, **kwargs) -> ApiResponse: # noqa: E501
"""[BETA] ListIndexConvention: List the set of Index Conventions # noqa: E501
List the set of Index Conventions at the specified date/time # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.list_index_convention_with_http_info(as_at, async_req=True)
>>> result = thread.get()
:param as_at: The asAt datetime at which to list the conventions. Defaults to latest if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(ResourceListOfGetIndexConventionResponse, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'as_at'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method list_index_convention" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
# process the query parameters
_query_params = []
if _params.get('as_at') is not None: # noqa: E501
if isinstance(_params['as_at'], datetime):
_query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format)))
else:
_query_params.append(('asAt', _params['as_at']))
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "ResourceListOfGetIndexConventionResponse",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/conventions/rates/indexconventions', 'GET',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def upsert_cds_flow_conventions(self, upsert_cds_flow_conventions_request : Annotated[UpsertCdsFlowConventionsRequest, Field(..., description="The CDS Flow Conventions to update or insert")], **kwargs) -> UpsertSingleStructuredDataResponse: # noqa: E501
...
@overload
def upsert_cds_flow_conventions(self, upsert_cds_flow_conventions_request : Annotated[UpsertCdsFlowConventionsRequest, Field(..., description="The CDS Flow Conventions to update or insert")], async_req: Optional[bool]=True, **kwargs) -> UpsertSingleStructuredDataResponse: # noqa: E501
...
[docs]
@validate_arguments
def upsert_cds_flow_conventions(self, upsert_cds_flow_conventions_request : Annotated[UpsertCdsFlowConventionsRequest, Field(..., description="The CDS Flow Conventions to update or insert")], async_req: Optional[bool]=None, **kwargs) -> Union[UpsertSingleStructuredDataResponse, Awaitable[UpsertSingleStructuredDataResponse]]: # noqa: E501
"""[BETA] UpsertCdsFlowConventions: Upsert a set of CDS Flow Conventions. This creates or updates the data in Lusid. # noqa: E501
Update or insert CDS Flow Conventions in a single scope. An item will be updated if it already exists and inserted if it does not. The response will return the successfully updated or inserted CDS Flow Conventions or failure message if unsuccessful It is important to always check to verify success (or failure). # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.upsert_cds_flow_conventions(upsert_cds_flow_conventions_request, async_req=True)
>>> result = thread.get()
:param upsert_cds_flow_conventions_request: The CDS Flow Conventions to update or insert (required)
:type upsert_cds_flow_conventions_request: UpsertCdsFlowConventionsRequest
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: UpsertSingleStructuredDataResponse
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the upsert_cds_flow_conventions_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.upsert_cds_flow_conventions_with_http_info(upsert_cds_flow_conventions_request, **kwargs) # noqa: E501
[docs]
@validate_arguments
def upsert_cds_flow_conventions_with_http_info(self, upsert_cds_flow_conventions_request : Annotated[UpsertCdsFlowConventionsRequest, Field(..., description="The CDS Flow Conventions to update or insert")], **kwargs) -> ApiResponse: # noqa: E501
"""[BETA] UpsertCdsFlowConventions: Upsert a set of CDS Flow Conventions. This creates or updates the data in Lusid. # noqa: E501
Update or insert CDS Flow Conventions in a single scope. An item will be updated if it already exists and inserted if it does not. The response will return the successfully updated or inserted CDS Flow Conventions or failure message if unsuccessful It is important to always check to verify success (or failure). # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.upsert_cds_flow_conventions_with_http_info(upsert_cds_flow_conventions_request, async_req=True)
>>> result = thread.get()
:param upsert_cds_flow_conventions_request: The CDS Flow Conventions to update or insert (required)
:type upsert_cds_flow_conventions_request: UpsertCdsFlowConventionsRequest
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(UpsertSingleStructuredDataResponse, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'upsert_cds_flow_conventions_request'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method upsert_cds_flow_conventions" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
# process the query parameters
_query_params = []
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
if _params['upsert_cds_flow_conventions_request'] is not None:
_body_params = _params['upsert_cds_flow_conventions_request']
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# set the HTTP header `Content-Type`
_content_types_list = _params.get('_content_type',
self.api_client.select_header_content_type(
['application/json-patch+json', 'application/json', 'text/json', 'application/*+json']))
if _content_types_list:
_header_params['Content-Type'] = _content_types_list
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "UpsertSingleStructuredDataResponse",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/conventions/credit/conventions', 'POST',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def upsert_flow_conventions(self, upsert_flow_conventions_request : Annotated[UpsertFlowConventionsRequest, Field(..., description="The Flow Conventions to update or insert")], **kwargs) -> UpsertSingleStructuredDataResponse: # noqa: E501
...
@overload
def upsert_flow_conventions(self, upsert_flow_conventions_request : Annotated[UpsertFlowConventionsRequest, Field(..., description="The Flow Conventions to update or insert")], async_req: Optional[bool]=True, **kwargs) -> UpsertSingleStructuredDataResponse: # noqa: E501
...
[docs]
@validate_arguments
def upsert_flow_conventions(self, upsert_flow_conventions_request : Annotated[UpsertFlowConventionsRequest, Field(..., description="The Flow Conventions to update or insert")], async_req: Optional[bool]=None, **kwargs) -> Union[UpsertSingleStructuredDataResponse, Awaitable[UpsertSingleStructuredDataResponse]]: # noqa: E501
"""[BETA] UpsertFlowConventions: Upsert Flow Conventions. This creates or updates the data in Lusid. # noqa: E501
Update or insert Flow Conventions in a single scope. An item will be updated if it already exists and inserted if it does not. The response will return the successfully updated or inserted Flow Conventions or failure message if unsuccessful It is important to always check to verify success (or failure). # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.upsert_flow_conventions(upsert_flow_conventions_request, async_req=True)
>>> result = thread.get()
:param upsert_flow_conventions_request: The Flow Conventions to update or insert (required)
:type upsert_flow_conventions_request: UpsertFlowConventionsRequest
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: UpsertSingleStructuredDataResponse
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the upsert_flow_conventions_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.upsert_flow_conventions_with_http_info(upsert_flow_conventions_request, **kwargs) # noqa: E501
[docs]
@validate_arguments
def upsert_flow_conventions_with_http_info(self, upsert_flow_conventions_request : Annotated[UpsertFlowConventionsRequest, Field(..., description="The Flow Conventions to update or insert")], **kwargs) -> ApiResponse: # noqa: E501
"""[BETA] UpsertFlowConventions: Upsert Flow Conventions. This creates or updates the data in Lusid. # noqa: E501
Update or insert Flow Conventions in a single scope. An item will be updated if it already exists and inserted if it does not. The response will return the successfully updated or inserted Flow Conventions or failure message if unsuccessful It is important to always check to verify success (or failure). # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.upsert_flow_conventions_with_http_info(upsert_flow_conventions_request, async_req=True)
>>> result = thread.get()
:param upsert_flow_conventions_request: The Flow Conventions to update or insert (required)
:type upsert_flow_conventions_request: UpsertFlowConventionsRequest
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(UpsertSingleStructuredDataResponse, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'upsert_flow_conventions_request'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method upsert_flow_conventions" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
# process the query parameters
_query_params = []
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
if _params['upsert_flow_conventions_request'] is not None:
_body_params = _params['upsert_flow_conventions_request']
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# set the HTTP header `Content-Type`
_content_types_list = _params.get('_content_type',
self.api_client.select_header_content_type(
['application/json-patch+json', 'application/json', 'text/json', 'application/*+json']))
if _content_types_list:
_header_params['Content-Type'] = _content_types_list
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "UpsertSingleStructuredDataResponse",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/conventions/rates/flowconventions', 'POST',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def upsert_index_convention(self, upsert_index_convention_request : Annotated[UpsertIndexConventionRequest, Field(..., description="The Index Conventions to update or insert")], **kwargs) -> UpsertSingleStructuredDataResponse: # noqa: E501
...
@overload
def upsert_index_convention(self, upsert_index_convention_request : Annotated[UpsertIndexConventionRequest, Field(..., description="The Index Conventions to update or insert")], async_req: Optional[bool]=True, **kwargs) -> UpsertSingleStructuredDataResponse: # noqa: E501
...
[docs]
@validate_arguments
def upsert_index_convention(self, upsert_index_convention_request : Annotated[UpsertIndexConventionRequest, Field(..., description="The Index Conventions to update or insert")], async_req: Optional[bool]=None, **kwargs) -> Union[UpsertSingleStructuredDataResponse, Awaitable[UpsertSingleStructuredDataResponse]]: # noqa: E501
"""[BETA] UpsertIndexConvention: Upsert a set of Index Convention. This creates or updates the data in Lusid. # noqa: E501
Update or insert Index Convention in a single scope. An item will be updated if it already exists and inserted if it does not. The response will return the successfully updated or inserted Index Convention or failure message if unsuccessful It is important to always check to verify success (or failure). # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.upsert_index_convention(upsert_index_convention_request, async_req=True)
>>> result = thread.get()
:param upsert_index_convention_request: The Index Conventions to update or insert (required)
:type upsert_index_convention_request: UpsertIndexConventionRequest
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: UpsertSingleStructuredDataResponse
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the upsert_index_convention_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.upsert_index_convention_with_http_info(upsert_index_convention_request, **kwargs) # noqa: E501
[docs]
@validate_arguments
def upsert_index_convention_with_http_info(self, upsert_index_convention_request : Annotated[UpsertIndexConventionRequest, Field(..., description="The Index Conventions to update or insert")], **kwargs) -> ApiResponse: # noqa: E501
"""[BETA] UpsertIndexConvention: Upsert a set of Index Convention. This creates or updates the data in Lusid. # noqa: E501
Update or insert Index Convention in a single scope. An item will be updated if it already exists and inserted if it does not. The response will return the successfully updated or inserted Index Convention or failure message if unsuccessful It is important to always check to verify success (or failure). # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.upsert_index_convention_with_http_info(upsert_index_convention_request, async_req=True)
>>> result = thread.get()
:param upsert_index_convention_request: The Index Conventions to update or insert (required)
:type upsert_index_convention_request: UpsertIndexConventionRequest
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(UpsertSingleStructuredDataResponse, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'upsert_index_convention_request'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method upsert_index_convention" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
# process the query parameters
_query_params = []
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
if _params['upsert_index_convention_request'] is not None:
_body_params = _params['upsert_index_convention_request']
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# set the HTTP header `Content-Type`
_content_types_list = _params.get('_content_type',
self.api_client.select_header_content_type(
['application/json-patch+json', 'application/json', 'text/json', 'application/*+json']))
if _content_types_list:
_header_params['Content-Type'] = _content_types_list
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "UpsertSingleStructuredDataResponse",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/conventions/rates/indexconventions', 'POST',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))