Source code for sdk.lusid.api.corporate_action_sources_api

# coding: utf-8

"""
    LUSID API

    FINBOURNE Technology  # noqa: E501

    Contact: info@finbourne.com
    Generated by OpenAPI Generator (https://openapi-generator.tech)

    Do not edit the class manually.
"""


import re  # noqa: F401
import io
import warnings

from pydantic.v1 import validate_arguments, ValidationError
from typing import overload, Optional, Union, Awaitable

from typing_extensions import Annotated
from datetime import datetime

from pydantic.v1 import Field, StrictInt, StrictStr, conlist, constr, validator

from typing import Optional

from lusid.models.corporate_action_source import CorporateActionSource
from lusid.models.create_corporate_action_source_request import CreateCorporateActionSourceRequest
from lusid.models.deleted_entity_response import DeletedEntityResponse
from lusid.models.paged_resource_list_of_corporate_action_source import PagedResourceListOfCorporateActionSource
from lusid.models.paged_resource_list_of_instrument_event_holder import PagedResourceListOfInstrumentEventHolder
from lusid.models.resource_list_of_corporate_action import ResourceListOfCorporateAction
from lusid.models.upsert_corporate_action_request import UpsertCorporateActionRequest
from lusid.models.upsert_corporate_actions_response import UpsertCorporateActionsResponse
from lusid.models.upsert_instrument_event_request import UpsertInstrumentEventRequest
from lusid.models.upsert_instrument_events_response import UpsertInstrumentEventsResponse

from lusid.api_client import ApiClient
from lusid.api_response import ApiResponse
from lusid.exceptions import (  # noqa: F401
    ApiTypeError,
    ApiValueError
)


[docs] class CorporateActionSourcesApi: """NOTE: This class is auto generated by OpenAPI Generator Ref: https://openapi-generator.tech Do not edit the class manually. """ def __init__(self, api_client=None) -> None: if api_client is None: api_client = ApiClient.get_default() self.api_client = api_client @overload async def batch_upsert_corporate_actions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of corporate action source")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source")], upsert_corporate_action_request : Annotated[Optional[conlist(UpsertCorporateActionRequest, max_items=10000)], Field(description="The corporate action definitions")] = None, **kwargs) -> UpsertCorporateActionsResponse: # noqa: E501 ... @overload def batch_upsert_corporate_actions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of corporate action source")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source")], upsert_corporate_action_request : Annotated[Optional[conlist(UpsertCorporateActionRequest, max_items=10000)], Field(description="The corporate action definitions")] = None, async_req: Optional[bool]=True, **kwargs) -> UpsertCorporateActionsResponse: # noqa: E501 ...
[docs] @validate_arguments def batch_upsert_corporate_actions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of corporate action source")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source")], upsert_corporate_action_request : Annotated[Optional[conlist(UpsertCorporateActionRequest, max_items=10000)], Field(description="The corporate action definitions")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[UpsertCorporateActionsResponse, Awaitable[UpsertCorporateActionsResponse]]: # noqa: E501 """[EARLY ACCESS] BatchUpsertCorporateActions: Batch upsert corporate actions (instrument transition events) to corporate action source. # noqa: E501 Create or update one or more corporate actions in a particular corporate action source. Failures are identified in the body of the response. If a corporate action is upserted at exactly the same effective datetime as a transaction for the same instrument, the corporate action takes precedence. Depending on the nature of the corporate action, this may mean it affects the transaction. The maximum number of corporate actions that this method can upsert per request is 10,000. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.batch_upsert_corporate_actions(scope, code, upsert_corporate_action_request, async_req=True) >>> result = thread.get() :param scope: The scope of corporate action source (required) :type scope: str :param code: The code of the corporate action source (required) :type code: str :param upsert_corporate_action_request: The corporate action definitions :type upsert_corporate_action_request: List[UpsertCorporateActionRequest] :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: UpsertCorporateActionsResponse """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the batch_upsert_corporate_actions_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.batch_upsert_corporate_actions_with_http_info(scope, code, upsert_corporate_action_request, **kwargs) # noqa: E501
[docs] @validate_arguments def batch_upsert_corporate_actions_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of corporate action source")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source")], upsert_corporate_action_request : Annotated[Optional[conlist(UpsertCorporateActionRequest, max_items=10000)], Field(description="The corporate action definitions")] = None, **kwargs) -> ApiResponse: # noqa: E501 """[EARLY ACCESS] BatchUpsertCorporateActions: Batch upsert corporate actions (instrument transition events) to corporate action source. # noqa: E501 Create or update one or more corporate actions in a particular corporate action source. Failures are identified in the body of the response. If a corporate action is upserted at exactly the same effective datetime as a transaction for the same instrument, the corporate action takes precedence. Depending on the nature of the corporate action, this may mean it affects the transaction. The maximum number of corporate actions that this method can upsert per request is 10,000. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.batch_upsert_corporate_actions_with_http_info(scope, code, upsert_corporate_action_request, async_req=True) >>> result = thread.get() :param scope: The scope of corporate action source (required) :type scope: str :param code: The code of the corporate action source (required) :type code: str :param upsert_corporate_action_request: The corporate action definitions :type upsert_corporate_action_request: List[UpsertCorporateActionRequest] :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(UpsertCorporateActionsResponse, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'scope', 'code', 'upsert_corporate_action_request' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method batch_upsert_corporate_actions" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} if _params['scope']: _path_params['scope'] = _params['scope'] if _params['code']: _path_params['code'] = _params['code'] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None if _params['upsert_corporate_action_request'] is not None: _body_params = _params['upsert_corporate_action_request'] # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # set the HTTP header `Content-Type` _content_types_list = _params.get('_content_type', self.api_client.select_header_content_type( ['application/json-patch+json', 'application/json', 'text/json', 'application/*+json'])) if _content_types_list: _header_params['Content-Type'] = _content_types_list # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '201': "UpsertCorporateActionsResponse", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/corporateactionsources/{scope}/{code}/corporateactions', 'POST', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))
@overload async def create_corporate_action_source(self, create_corporate_action_source_request : Annotated[CreateCorporateActionSourceRequest, Field(..., description="The corporate action source definition")], **kwargs) -> CorporateActionSource: # noqa: E501 ... @overload def create_corporate_action_source(self, create_corporate_action_source_request : Annotated[CreateCorporateActionSourceRequest, Field(..., description="The corporate action source definition")], async_req: Optional[bool]=True, **kwargs) -> CorporateActionSource: # noqa: E501 ...
[docs] @validate_arguments def create_corporate_action_source(self, create_corporate_action_source_request : Annotated[CreateCorporateActionSourceRequest, Field(..., description="The corporate action source definition")], async_req: Optional[bool]=None, **kwargs) -> Union[CorporateActionSource, Awaitable[CorporateActionSource]]: # noqa: E501 """[EARLY ACCESS] CreateCorporateActionSource: Create corporate action source # noqa: E501 Create a corporate action source. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.create_corporate_action_source(create_corporate_action_source_request, async_req=True) >>> result = thread.get() :param create_corporate_action_source_request: The corporate action source definition (required) :type create_corporate_action_source_request: CreateCorporateActionSourceRequest :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: CorporateActionSource """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the create_corporate_action_source_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.create_corporate_action_source_with_http_info(create_corporate_action_source_request, **kwargs) # noqa: E501
[docs] @validate_arguments def create_corporate_action_source_with_http_info(self, create_corporate_action_source_request : Annotated[CreateCorporateActionSourceRequest, Field(..., description="The corporate action source definition")], **kwargs) -> ApiResponse: # noqa: E501 """[EARLY ACCESS] CreateCorporateActionSource: Create corporate action source # noqa: E501 Create a corporate action source. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.create_corporate_action_source_with_http_info(create_corporate_action_source_request, async_req=True) >>> result = thread.get() :param create_corporate_action_source_request: The corporate action source definition (required) :type create_corporate_action_source_request: CreateCorporateActionSourceRequest :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(CorporateActionSource, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'create_corporate_action_source_request' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method create_corporate_action_source" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None if _params['create_corporate_action_source_request'] is not None: _body_params = _params['create_corporate_action_source_request'] # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # set the HTTP header `Content-Type` _content_types_list = _params.get('_content_type', self.api_client.select_header_content_type( ['application/json-patch+json', 'application/json', 'text/json', 'application/*+json'])) if _content_types_list: _header_params['Content-Type'] = _content_types_list # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '201': "CorporateActionSource", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/corporateactionsources', 'POST', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))
@overload async def delete_corporate_action_source(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source to be deleted")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source to be deleted")], **kwargs) -> DeletedEntityResponse: # noqa: E501 ... @overload def delete_corporate_action_source(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source to be deleted")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source to be deleted")], async_req: Optional[bool]=True, **kwargs) -> DeletedEntityResponse: # noqa: E501 ...
[docs] @validate_arguments def delete_corporate_action_source(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source to be deleted")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source to be deleted")], async_req: Optional[bool]=None, **kwargs) -> Union[DeletedEntityResponse, Awaitable[DeletedEntityResponse]]: # noqa: E501 """[BETA] DeleteCorporateActionSource: Delete corporate actions (instrument transition events) from the corporate action source. # noqa: E501 Deletes a single corporate action source # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.delete_corporate_action_source(scope, code, async_req=True) >>> result = thread.get() :param scope: The scope of the corporate action source to be deleted (required) :type scope: str :param code: The code of the corporate action source to be deleted (required) :type code: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: DeletedEntityResponse """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the delete_corporate_action_source_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.delete_corporate_action_source_with_http_info(scope, code, **kwargs) # noqa: E501
[docs] @validate_arguments def delete_corporate_action_source_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source to be deleted")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source to be deleted")], **kwargs) -> ApiResponse: # noqa: E501 """[BETA] DeleteCorporateActionSource: Delete corporate actions (instrument transition events) from the corporate action source. # noqa: E501 Deletes a single corporate action source # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.delete_corporate_action_source_with_http_info(scope, code, async_req=True) >>> result = thread.get() :param scope: The scope of the corporate action source to be deleted (required) :type scope: str :param code: The code of the corporate action source to be deleted (required) :type code: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(DeletedEntityResponse, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'scope', 'code' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method delete_corporate_action_source" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} if _params['scope']: _path_params['scope'] = _params['scope'] if _params['code']: _path_params['code'] = _params['code'] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '200': "DeletedEntityResponse", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/corporateactionsources/{scope}/{code}', 'DELETE', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))
@overload async def delete_corporate_actions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source")], corporate_action_ids : Annotated[conlist(StrictStr, max_items=1000), Field(..., description="The IDs of the corporate actions to delete")], **kwargs) -> DeletedEntityResponse: # noqa: E501 ... @overload def delete_corporate_actions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source")], corporate_action_ids : Annotated[conlist(StrictStr, max_items=1000), Field(..., description="The IDs of the corporate actions to delete")], async_req: Optional[bool]=True, **kwargs) -> DeletedEntityResponse: # noqa: E501 ...
[docs] @validate_arguments def delete_corporate_actions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source")], corporate_action_ids : Annotated[conlist(StrictStr, max_items=1000), Field(..., description="The IDs of the corporate actions to delete")], async_req: Optional[bool]=None, **kwargs) -> Union[DeletedEntityResponse, Awaitable[DeletedEntityResponse]]: # noqa: E501 """[EARLY ACCESS] DeleteCorporateActions: Delete corporate actions # noqa: E501 Delete one or more corporate actions from a particular corporate action source. The maximum number of corporate actions that this method can delete per request is 1,000. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.delete_corporate_actions(scope, code, corporate_action_ids, async_req=True) >>> result = thread.get() :param scope: The scope of the corporate action source (required) :type scope: str :param code: The code of the corporate action source (required) :type code: str :param corporate_action_ids: The IDs of the corporate actions to delete (required) :type corporate_action_ids: List[str] :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: DeletedEntityResponse """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the delete_corporate_actions_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.delete_corporate_actions_with_http_info(scope, code, corporate_action_ids, **kwargs) # noqa: E501
[docs] @validate_arguments def delete_corporate_actions_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source")], corporate_action_ids : Annotated[conlist(StrictStr, max_items=1000), Field(..., description="The IDs of the corporate actions to delete")], **kwargs) -> ApiResponse: # noqa: E501 """[EARLY ACCESS] DeleteCorporateActions: Delete corporate actions # noqa: E501 Delete one or more corporate actions from a particular corporate action source. The maximum number of corporate actions that this method can delete per request is 1,000. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.delete_corporate_actions_with_http_info(scope, code, corporate_action_ids, async_req=True) >>> result = thread.get() :param scope: The scope of the corporate action source (required) :type scope: str :param code: The code of the corporate action source (required) :type code: str :param corporate_action_ids: The IDs of the corporate actions to delete (required) :type corporate_action_ids: List[str] :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(DeletedEntityResponse, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'scope', 'code', 'corporate_action_ids' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method delete_corporate_actions" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} if _params['scope']: _path_params['scope'] = _params['scope'] if _params['code']: _path_params['code'] = _params['code'] # process the query parameters _query_params = [] if _params.get('corporate_action_ids') is not None: # noqa: E501 _query_params.append(('corporateActionIds', _params['corporate_action_ids'])) _collection_formats['corporateActionIds'] = 'multi' # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '200': "DeletedEntityResponse", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/corporateactionsources/{scope}/{code}/corporateactions', 'DELETE', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))
@overload async def delete_instrument_events(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source")], instrument_event_ids : Annotated[conlist(StrictStr, max_items=1000), Field(..., description="The IDs of the instrument events to delete")], **kwargs) -> DeletedEntityResponse: # noqa: E501 ... @overload def delete_instrument_events(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source")], instrument_event_ids : Annotated[conlist(StrictStr, max_items=1000), Field(..., description="The IDs of the instrument events to delete")], async_req: Optional[bool]=True, **kwargs) -> DeletedEntityResponse: # noqa: E501 ...
[docs] @validate_arguments def delete_instrument_events(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source")], instrument_event_ids : Annotated[conlist(StrictStr, max_items=1000), Field(..., description="The IDs of the instrument events to delete")], async_req: Optional[bool]=None, **kwargs) -> Union[DeletedEntityResponse, Awaitable[DeletedEntityResponse]]: # noqa: E501 """[EARLY ACCESS] DeleteInstrumentEvents: Delete corporate actions (instrument transition events) from the corporate action source. # noqa: E501 Delete one or more corporate actions from a particular corporate action source. The maximum number of instrument events that this method can delete per request is 1,000. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.delete_instrument_events(scope, code, instrument_event_ids, async_req=True) >>> result = thread.get() :param scope: The scope of the corporate action source (required) :type scope: str :param code: The code of the corporate action source (required) :type code: str :param instrument_event_ids: The IDs of the instrument events to delete (required) :type instrument_event_ids: List[str] :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: DeletedEntityResponse """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the delete_instrument_events_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.delete_instrument_events_with_http_info(scope, code, instrument_event_ids, **kwargs) # noqa: E501
[docs] @validate_arguments def delete_instrument_events_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source")], instrument_event_ids : Annotated[conlist(StrictStr, max_items=1000), Field(..., description="The IDs of the instrument events to delete")], **kwargs) -> ApiResponse: # noqa: E501 """[EARLY ACCESS] DeleteInstrumentEvents: Delete corporate actions (instrument transition events) from the corporate action source. # noqa: E501 Delete one or more corporate actions from a particular corporate action source. The maximum number of instrument events that this method can delete per request is 1,000. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.delete_instrument_events_with_http_info(scope, code, instrument_event_ids, async_req=True) >>> result = thread.get() :param scope: The scope of the corporate action source (required) :type scope: str :param code: The code of the corporate action source (required) :type code: str :param instrument_event_ids: The IDs of the instrument events to delete (required) :type instrument_event_ids: List[str] :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(DeletedEntityResponse, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'scope', 'code', 'instrument_event_ids' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method delete_instrument_events" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} if _params['scope']: _path_params['scope'] = _params['scope'] if _params['code']: _path_params['code'] = _params['code'] # process the query parameters _query_params = [] if _params.get('instrument_event_ids') is not None: # noqa: E501 _query_params.append(('instrumentEventIds', _params['instrument_event_ids'])) _collection_formats['instrumentEventIds'] = 'multi' # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '200': "DeletedEntityResponse", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/corporateactionsources/{scope}/{code}/instrumentevents', 'DELETE', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))
@overload async def get_corporate_actions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source.")], from_effective_at : Annotated[Optional[constr(strict=True, max_length=256, min_length=0)], Field(description="Optional. The start effective date of the data range.")] = None, to_effective_at : Annotated[Optional[constr(strict=True, max_length=256, min_length=0)], Field(description="Optional. The end effective date of the data range.")] = None, as_at : Annotated[Optional[datetime], Field(description="Optional. The AsAt date of the data.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="Optional. Order the results by these fields. Use use the '-' sign to denote descending order e.g. -MyFieldName")] = None, limit : Annotated[Optional[StrictInt], Field(description="Optional. When paginating, limit the results to this number.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Optional. Expression to filter the result set. For example, to filter on the Announcement Date, use \"announcementDate eq '2020-03-06'\" Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, **kwargs) -> ResourceListOfCorporateAction: # noqa: E501 ... @overload def get_corporate_actions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source.")], from_effective_at : Annotated[Optional[constr(strict=True, max_length=256, min_length=0)], Field(description="Optional. The start effective date of the data range.")] = None, to_effective_at : Annotated[Optional[constr(strict=True, max_length=256, min_length=0)], Field(description="Optional. The end effective date of the data range.")] = None, as_at : Annotated[Optional[datetime], Field(description="Optional. The AsAt date of the data.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="Optional. Order the results by these fields. Use use the '-' sign to denote descending order e.g. -MyFieldName")] = None, limit : Annotated[Optional[StrictInt], Field(description="Optional. When paginating, limit the results to this number.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Optional. Expression to filter the result set. For example, to filter on the Announcement Date, use \"announcementDate eq '2020-03-06'\" Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, async_req: Optional[bool]=True, **kwargs) -> ResourceListOfCorporateAction: # noqa: E501 ...
[docs] @validate_arguments def get_corporate_actions(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source.")], from_effective_at : Annotated[Optional[constr(strict=True, max_length=256, min_length=0)], Field(description="Optional. The start effective date of the data range.")] = None, to_effective_at : Annotated[Optional[constr(strict=True, max_length=256, min_length=0)], Field(description="Optional. The end effective date of the data range.")] = None, as_at : Annotated[Optional[datetime], Field(description="Optional. The AsAt date of the data.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="Optional. Order the results by these fields. Use use the '-' sign to denote descending order e.g. -MyFieldName")] = None, limit : Annotated[Optional[StrictInt], Field(description="Optional. When paginating, limit the results to this number.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Optional. Expression to filter the result set. For example, to filter on the Announcement Date, use \"announcementDate eq '2020-03-06'\" Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[ResourceListOfCorporateAction, Awaitable[ResourceListOfCorporateAction]]: # noqa: E501 """[EARLY ACCESS] GetCorporateActions: List corporate actions (instrument transition events) from the corporate action source. # noqa: E501 Get corporate actions from a particular corporate action source. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.get_corporate_actions(scope, code, from_effective_at, to_effective_at, as_at, sort_by, limit, filter, async_req=True) >>> result = thread.get() :param scope: The scope of the corporate action source. (required) :type scope: str :param code: The code of the corporate action source. (required) :type code: str :param from_effective_at: Optional. The start effective date of the data range. :type from_effective_at: str :param to_effective_at: Optional. The end effective date of the data range. :type to_effective_at: str :param as_at: Optional. The AsAt date of the data. :type as_at: datetime :param sort_by: Optional. Order the results by these fields. Use use the '-' sign to denote descending order e.g. -MyFieldName :type sort_by: List[str] :param limit: Optional. When paginating, limit the results to this number. :type limit: int :param filter: Optional. Expression to filter the result set. For example, to filter on the Announcement Date, use \"announcementDate eq '2020-03-06'\" Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid. :type filter: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: ResourceListOfCorporateAction """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the get_corporate_actions_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.get_corporate_actions_with_http_info(scope, code, from_effective_at, to_effective_at, as_at, sort_by, limit, filter, **kwargs) # noqa: E501
[docs] @validate_arguments def get_corporate_actions_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source.")], from_effective_at : Annotated[Optional[constr(strict=True, max_length=256, min_length=0)], Field(description="Optional. The start effective date of the data range.")] = None, to_effective_at : Annotated[Optional[constr(strict=True, max_length=256, min_length=0)], Field(description="Optional. The end effective date of the data range.")] = None, as_at : Annotated[Optional[datetime], Field(description="Optional. The AsAt date of the data.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="Optional. Order the results by these fields. Use use the '-' sign to denote descending order e.g. -MyFieldName")] = None, limit : Annotated[Optional[StrictInt], Field(description="Optional. When paginating, limit the results to this number.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Optional. Expression to filter the result set. For example, to filter on the Announcement Date, use \"announcementDate eq '2020-03-06'\" Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, **kwargs) -> ApiResponse: # noqa: E501 """[EARLY ACCESS] GetCorporateActions: List corporate actions (instrument transition events) from the corporate action source. # noqa: E501 Get corporate actions from a particular corporate action source. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.get_corporate_actions_with_http_info(scope, code, from_effective_at, to_effective_at, as_at, sort_by, limit, filter, async_req=True) >>> result = thread.get() :param scope: The scope of the corporate action source. (required) :type scope: str :param code: The code of the corporate action source. (required) :type code: str :param from_effective_at: Optional. The start effective date of the data range. :type from_effective_at: str :param to_effective_at: Optional. The end effective date of the data range. :type to_effective_at: str :param as_at: Optional. The AsAt date of the data. :type as_at: datetime :param sort_by: Optional. Order the results by these fields. Use use the '-' sign to denote descending order e.g. -MyFieldName :type sort_by: List[str] :param limit: Optional. When paginating, limit the results to this number. :type limit: int :param filter: Optional. Expression to filter the result set. For example, to filter on the Announcement Date, use \"announcementDate eq '2020-03-06'\" Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid. :type filter: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(ResourceListOfCorporateAction, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'scope', 'code', 'from_effective_at', 'to_effective_at', 'as_at', 'sort_by', 'limit', 'filter' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method get_corporate_actions" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} if _params['scope']: _path_params['scope'] = _params['scope'] if _params['code']: _path_params['code'] = _params['code'] # process the query parameters _query_params = [] if _params.get('from_effective_at') is not None: # noqa: E501 _query_params.append(('fromEffectiveAt', _params['from_effective_at'])) if _params.get('to_effective_at') is not None: # noqa: E501 _query_params.append(('toEffectiveAt', _params['to_effective_at'])) if _params.get('as_at') is not None: # noqa: E501 if isinstance(_params['as_at'], datetime): _query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format))) else: _query_params.append(('asAt', _params['as_at'])) if _params.get('sort_by') is not None: # noqa: E501 _query_params.append(('sortBy', _params['sort_by'])) _collection_formats['sortBy'] = 'multi' if _params.get('limit') is not None: # noqa: E501 _query_params.append(('limit', _params['limit'])) if _params.get('filter') is not None: # noqa: E501 _query_params.append(('filter', _params['filter'])) # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '200': "ResourceListOfCorporateAction", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/corporateactionsources/{scope}/{code}/corporateactions', 'GET', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))
@overload async def get_instrument_events(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source.")], as_at : Annotated[Optional[datetime], Field(description="Optional. The AsAt date of the data.")] = None, limit : Annotated[Optional[StrictInt], Field(description="Optional. When paginating, limit the number of returned results to this many. If not specified, a default of 1000 is used.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="Optional. The pagination token to use to continue listing items from a previous call. Page values are return from list calls, and must be supplied exactly as returned. Additionally, when specifying this value, asAt, filter and limit must not be modified.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Optional. Expression to filter the result set.")] = None, **kwargs) -> PagedResourceListOfInstrumentEventHolder: # noqa: E501 ... @overload def get_instrument_events(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source.")], as_at : Annotated[Optional[datetime], Field(description="Optional. The AsAt date of the data.")] = None, limit : Annotated[Optional[StrictInt], Field(description="Optional. When paginating, limit the number of returned results to this many. If not specified, a default of 1000 is used.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="Optional. The pagination token to use to continue listing items from a previous call. Page values are return from list calls, and must be supplied exactly as returned. Additionally, when specifying this value, asAt, filter and limit must not be modified.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Optional. Expression to filter the result set.")] = None, async_req: Optional[bool]=True, **kwargs) -> PagedResourceListOfInstrumentEventHolder: # noqa: E501 ...
[docs] @validate_arguments def get_instrument_events(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source.")], as_at : Annotated[Optional[datetime], Field(description="Optional. The AsAt date of the data.")] = None, limit : Annotated[Optional[StrictInt], Field(description="Optional. When paginating, limit the number of returned results to this many. If not specified, a default of 1000 is used.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="Optional. The pagination token to use to continue listing items from a previous call. Page values are return from list calls, and must be supplied exactly as returned. Additionally, when specifying this value, asAt, filter and limit must not be modified.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Optional. Expression to filter the result set.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[PagedResourceListOfInstrumentEventHolder, Awaitable[PagedResourceListOfInstrumentEventHolder]]: # noqa: E501 """[EARLY ACCESS] GetInstrumentEvents: Get extrinsic instrument events out of a given corporate actions source. # noqa: E501 Retrieves extrinsic corporate actions out of a corporate actions source # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.get_instrument_events(scope, code, as_at, limit, page, filter, async_req=True) >>> result = thread.get() :param scope: The scope of the corporate action source. (required) :type scope: str :param code: The code of the corporate action source. (required) :type code: str :param as_at: Optional. The AsAt date of the data. :type as_at: datetime :param limit: Optional. When paginating, limit the number of returned results to this many. If not specified, a default of 1000 is used. :type limit: int :param page: Optional. The pagination token to use to continue listing items from a previous call. Page values are return from list calls, and must be supplied exactly as returned. Additionally, when specifying this value, asAt, filter and limit must not be modified. :type page: str :param filter: Optional. Expression to filter the result set. :type filter: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: PagedResourceListOfInstrumentEventHolder """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the get_instrument_events_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.get_instrument_events_with_http_info(scope, code, as_at, limit, page, filter, **kwargs) # noqa: E501
[docs] @validate_arguments def get_instrument_events_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source.")], as_at : Annotated[Optional[datetime], Field(description="Optional. The AsAt date of the data.")] = None, limit : Annotated[Optional[StrictInt], Field(description="Optional. When paginating, limit the number of returned results to this many. If not specified, a default of 1000 is used.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="Optional. The pagination token to use to continue listing items from a previous call. Page values are return from list calls, and must be supplied exactly as returned. Additionally, when specifying this value, asAt, filter and limit must not be modified.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Optional. Expression to filter the result set.")] = None, **kwargs) -> ApiResponse: # noqa: E501 """[EARLY ACCESS] GetInstrumentEvents: Get extrinsic instrument events out of a given corporate actions source. # noqa: E501 Retrieves extrinsic corporate actions out of a corporate actions source # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.get_instrument_events_with_http_info(scope, code, as_at, limit, page, filter, async_req=True) >>> result = thread.get() :param scope: The scope of the corporate action source. (required) :type scope: str :param code: The code of the corporate action source. (required) :type code: str :param as_at: Optional. The AsAt date of the data. :type as_at: datetime :param limit: Optional. When paginating, limit the number of returned results to this many. If not specified, a default of 1000 is used. :type limit: int :param page: Optional. The pagination token to use to continue listing items from a previous call. Page values are return from list calls, and must be supplied exactly as returned. Additionally, when specifying this value, asAt, filter and limit must not be modified. :type page: str :param filter: Optional. Expression to filter the result set. :type filter: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(PagedResourceListOfInstrumentEventHolder, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'scope', 'code', 'as_at', 'limit', 'page', 'filter' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method get_instrument_events" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} if _params['scope']: _path_params['scope'] = _params['scope'] if _params['code']: _path_params['code'] = _params['code'] # process the query parameters _query_params = [] if _params.get('as_at') is not None: # noqa: E501 if isinstance(_params['as_at'], datetime): _query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format))) else: _query_params.append(('asAt', _params['as_at'])) if _params.get('limit') is not None: # noqa: E501 _query_params.append(('limit', _params['limit'])) if _params.get('page') is not None: # noqa: E501 _query_params.append(('page', _params['page'])) if _params.get('filter') is not None: # noqa: E501 _query_params.append(('filter', _params['filter'])) # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '200': "PagedResourceListOfInstrumentEventHolder", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/corporateactionsources/{scope}/{code}/instrumentevents', 'GET', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))
@overload async def list_corporate_action_sources(self, as_at : Annotated[Optional[datetime], Field(description="Optional. The AsAt date of the data")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="Optional. Order the results by these fields. Use use the '-' sign to denote descending order e.g. -MyFieldName")] = None, limit : Annotated[Optional[StrictInt], Field(description="Optional. When paginating, limit the number of returned results to this many. If not specified, a default of 100 is used.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Optional. Expression to filter the result set. For example, to filter on the Display Name, use \"displayName eq 'string'\" Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="Optional. The pagination token to use to continue listing items from a previous call. Page values are return from list calls, and must be supplied exactly as returned. Additionally, when specifying this value, the filter, asAt, and limit must not be modified.")] = None, **kwargs) -> PagedResourceListOfCorporateActionSource: # noqa: E501 ... @overload def list_corporate_action_sources(self, as_at : Annotated[Optional[datetime], Field(description="Optional. The AsAt date of the data")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="Optional. Order the results by these fields. Use use the '-' sign to denote descending order e.g. -MyFieldName")] = None, limit : Annotated[Optional[StrictInt], Field(description="Optional. When paginating, limit the number of returned results to this many. If not specified, a default of 100 is used.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Optional. Expression to filter the result set. For example, to filter on the Display Name, use \"displayName eq 'string'\" Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="Optional. The pagination token to use to continue listing items from a previous call. Page values are return from list calls, and must be supplied exactly as returned. Additionally, when specifying this value, the filter, asAt, and limit must not be modified.")] = None, async_req: Optional[bool]=True, **kwargs) -> PagedResourceListOfCorporateActionSource: # noqa: E501 ...
[docs] @validate_arguments def list_corporate_action_sources(self, as_at : Annotated[Optional[datetime], Field(description="Optional. The AsAt date of the data")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="Optional. Order the results by these fields. Use use the '-' sign to denote descending order e.g. -MyFieldName")] = None, limit : Annotated[Optional[StrictInt], Field(description="Optional. When paginating, limit the number of returned results to this many. If not specified, a default of 100 is used.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Optional. Expression to filter the result set. For example, to filter on the Display Name, use \"displayName eq 'string'\" Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="Optional. The pagination token to use to continue listing items from a previous call. Page values are return from list calls, and must be supplied exactly as returned. Additionally, when specifying this value, the filter, asAt, and limit must not be modified.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[PagedResourceListOfCorporateActionSource, Awaitable[PagedResourceListOfCorporateActionSource]]: # noqa: E501 """[EARLY ACCESS] ListCorporateActionSources: List corporate action sources # noqa: E501 Gets a list of all corporate action sources # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.list_corporate_action_sources(as_at, sort_by, limit, filter, page, async_req=True) >>> result = thread.get() :param as_at: Optional. The AsAt date of the data :type as_at: datetime :param sort_by: Optional. Order the results by these fields. Use use the '-' sign to denote descending order e.g. -MyFieldName :type sort_by: List[str] :param limit: Optional. When paginating, limit the number of returned results to this many. If not specified, a default of 100 is used. :type limit: int :param filter: Optional. Expression to filter the result set. For example, to filter on the Display Name, use \"displayName eq 'string'\" Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid. :type filter: str :param page: Optional. The pagination token to use to continue listing items from a previous call. Page values are return from list calls, and must be supplied exactly as returned. Additionally, when specifying this value, the filter, asAt, and limit must not be modified. :type page: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: PagedResourceListOfCorporateActionSource """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the list_corporate_action_sources_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.list_corporate_action_sources_with_http_info(as_at, sort_by, limit, filter, page, **kwargs) # noqa: E501
[docs] @validate_arguments def list_corporate_action_sources_with_http_info(self, as_at : Annotated[Optional[datetime], Field(description="Optional. The AsAt date of the data")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="Optional. Order the results by these fields. Use use the '-' sign to denote descending order e.g. -MyFieldName")] = None, limit : Annotated[Optional[StrictInt], Field(description="Optional. When paginating, limit the number of returned results to this many. If not specified, a default of 100 is used.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Optional. Expression to filter the result set. For example, to filter on the Display Name, use \"displayName eq 'string'\" Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="Optional. The pagination token to use to continue listing items from a previous call. Page values are return from list calls, and must be supplied exactly as returned. Additionally, when specifying this value, the filter, asAt, and limit must not be modified.")] = None, **kwargs) -> ApiResponse: # noqa: E501 """[EARLY ACCESS] ListCorporateActionSources: List corporate action sources # noqa: E501 Gets a list of all corporate action sources # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.list_corporate_action_sources_with_http_info(as_at, sort_by, limit, filter, page, async_req=True) >>> result = thread.get() :param as_at: Optional. The AsAt date of the data :type as_at: datetime :param sort_by: Optional. Order the results by these fields. Use use the '-' sign to denote descending order e.g. -MyFieldName :type sort_by: List[str] :param limit: Optional. When paginating, limit the number of returned results to this many. If not specified, a default of 100 is used. :type limit: int :param filter: Optional. Expression to filter the result set. For example, to filter on the Display Name, use \"displayName eq 'string'\" Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid. :type filter: str :param page: Optional. The pagination token to use to continue listing items from a previous call. Page values are return from list calls, and must be supplied exactly as returned. Additionally, when specifying this value, the filter, asAt, and limit must not be modified. :type page: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(PagedResourceListOfCorporateActionSource, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'as_at', 'sort_by', 'limit', 'filter', 'page' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method list_corporate_action_sources" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} # process the query parameters _query_params = [] if _params.get('as_at') is not None: # noqa: E501 if isinstance(_params['as_at'], datetime): _query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format))) else: _query_params.append(('asAt', _params['as_at'])) if _params.get('sort_by') is not None: # noqa: E501 _query_params.append(('sortBy', _params['sort_by'])) _collection_formats['sortBy'] = 'multi' if _params.get('limit') is not None: # noqa: E501 _query_params.append(('limit', _params['limit'])) if _params.get('filter') is not None: # noqa: E501 _query_params.append(('filter', _params['filter'])) if _params.get('page') is not None: # noqa: E501 _query_params.append(('page', _params['page'])) # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '200': "PagedResourceListOfCorporateActionSource", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/corporateactionsources', 'GET', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))
@overload async def upsert_instrument_events(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source.")], upsert_instrument_event_request : Annotated[Optional[conlist(UpsertInstrumentEventRequest, max_items=10000)], Field(description="The instrument event definitions.")] = None, **kwargs) -> UpsertInstrumentEventsResponse: # noqa: E501 ... @overload def upsert_instrument_events(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source.")], upsert_instrument_event_request : Annotated[Optional[conlist(UpsertInstrumentEventRequest, max_items=10000)], Field(description="The instrument event definitions.")] = None, async_req: Optional[bool]=True, **kwargs) -> UpsertInstrumentEventsResponse: # noqa: E501 ...
[docs] @validate_arguments def upsert_instrument_events(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source.")], upsert_instrument_event_request : Annotated[Optional[conlist(UpsertInstrumentEventRequest, max_items=10000)], Field(description="The instrument event definitions.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[UpsertInstrumentEventsResponse, Awaitable[UpsertInstrumentEventsResponse]]: # noqa: E501 """[EARLY ACCESS] UpsertInstrumentEvents: Upsert instrument events to the provided corporate actions source. # noqa: E501 Batch upsert instrument events to corporate action sources. The maximum number of instrument events that this method can upsert per request is 10,000. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.upsert_instrument_events(scope, code, upsert_instrument_event_request, async_req=True) >>> result = thread.get() :param scope: The scope of the corporate action source. (required) :type scope: str :param code: The code of the corporate action source. (required) :type code: str :param upsert_instrument_event_request: The instrument event definitions. :type upsert_instrument_event_request: List[UpsertInstrumentEventRequest] :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: UpsertInstrumentEventsResponse """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the upsert_instrument_events_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.upsert_instrument_events_with_http_info(scope, code, upsert_instrument_event_request, **kwargs) # noqa: E501
[docs] @validate_arguments def upsert_instrument_events_with_http_info(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope of the corporate action source.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The code of the corporate action source.")], upsert_instrument_event_request : Annotated[Optional[conlist(UpsertInstrumentEventRequest, max_items=10000)], Field(description="The instrument event definitions.")] = None, **kwargs) -> ApiResponse: # noqa: E501 """[EARLY ACCESS] UpsertInstrumentEvents: Upsert instrument events to the provided corporate actions source. # noqa: E501 Batch upsert instrument events to corporate action sources. The maximum number of instrument events that this method can upsert per request is 10,000. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.upsert_instrument_events_with_http_info(scope, code, upsert_instrument_event_request, async_req=True) >>> result = thread.get() :param scope: The scope of the corporate action source. (required) :type scope: str :param code: The code of the corporate action source. (required) :type code: str :param upsert_instrument_event_request: The instrument event definitions. :type upsert_instrument_event_request: List[UpsertInstrumentEventRequest] :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(UpsertInstrumentEventsResponse, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'scope', 'code', 'upsert_instrument_event_request' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method upsert_instrument_events" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} if _params['scope']: _path_params['scope'] = _params['scope'] if _params['code']: _path_params['code'] = _params['code'] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None if _params['upsert_instrument_event_request'] is not None: _body_params = _params['upsert_instrument_event_request'] # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # set the HTTP header `Content-Type` _content_types_list = _params.get('_content_type', self.api_client.select_header_content_type( ['application/json-patch+json', 'application/json', 'text/json', 'application/*+json'])) if _content_types_list: _header_params['Content-Type'] = _content_types_list # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '201': "UpsertInstrumentEventsResponse", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/corporateactionsources/{scope}/{code}/instrumentevents', 'POST', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))