# coding: utf-8
"""
LUSID API
FINBOURNE Technology # noqa: E501
Contact: info@finbourne.com
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
"""
import re # noqa: F401
import io
import warnings
from pydantic.v1 import validate_arguments, ValidationError
from typing import overload, Optional, Union, Awaitable
from typing_extensions import Annotated
from datetime import datetime
from pydantic.v1 import Field, StrictStr, conint, conlist, constr, validator
from typing import Optional
from lusid.models.deleted_entity_response import DeletedEntityResponse
from lusid.models.paged_resource_list_of_participation import PagedResourceListOfParticipation
from lusid.models.participation import Participation
from lusid.models.participation_set_request import ParticipationSetRequest
from lusid.models.resource_list_of_participation import ResourceListOfParticipation
from lusid.api_client import ApiClient
from lusid.api_response import ApiResponse
from lusid.exceptions import ( # noqa: F401
ApiTypeError,
ApiValueError
)
[docs]
class ParticipationsApi:
"""NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
"""
def __init__(self, api_client=None) -> None:
    """Bind this API wrapper to an ``ApiClient``.

    :param api_client: client used to issue requests. When ``None``, the
        value returned by ``ApiClient.get_default()`` is used instead.
    """
    self.api_client = ApiClient.get_default() if api_client is None else api_client
# Typing-only stub: coroutine form of delete_participation, declared without
# an async_req parameter. The body is intentionally just an ellipsis.
@overload
async def delete_participation(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The participation scope.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The participation's code. This, together with the scope uniquely identifies the participation to delete.")], **kwargs) -> DeletedEntityResponse: # noqa: E501
    ...
# Typing-only stub: form that accepts async_req. NOTE: the default declared
# here is True, whereas the decorated implementation defaults it to None.
@overload
def delete_participation(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The participation scope.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The participation's code. This, together with the scope uniquely identifies the participation to delete.")], async_req: Optional[bool]=True, **kwargs) -> DeletedEntityResponse: # noqa: E501
    ...
[docs]
@validate_arguments
def delete_participation(
    self,
    scope: Annotated[
        constr(strict=True, max_length=64, min_length=1),
        Field(..., description="The participation scope.")
    ],
    code: Annotated[
        constr(strict=True, max_length=64, min_length=1),
        Field(..., description="The participation's code. This, together with the scope uniquely identifies the participation to delete.")
    ],
    async_req: Optional[bool] = None,
    **kwargs
) -> Union[DeletedEntityResponse, Awaitable[DeletedEntityResponse]]:  # noqa: E501
    """[EARLY ACCESS] DeleteParticipation: Delete participation

    Delete a participation. Deletion will be valid from the participation's
    creation datetime. This means that the participation will no longer exist
    at any effective datetime from the asAt datetime of deletion.

    This is a thin wrapper: it forces ``_return_http_data_only=True`` and
    delegates to ``delete_participation_with_http_info``. The request is
    synchronous by default; pass ``async_req=True`` for an asynchronous one.

    >>> thread = api.delete_participation(scope, code, async_req=True)
    >>> result = thread.get()

    :param scope: The participation scope. (required)
    :type scope: str
    :param code: The participation's code. This, together with the scope
        uniquely identifies the participation to delete. (required)
    :type code: str
    :param async_req: Whether to execute the request asynchronously. Only
        forwarded to the delegate when it is not ``None``.
    :type async_req: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it will be the total request timeout. It can also be a
        pair (tuple) of (connection, read) timeouts.
    :raises ValueError: if ``_preload_content`` is supplied; callers needing
        it must use ``delete_participation_with_http_info`` directly.
    :return: Returns the result object. If the method is called
        asynchronously, returns the request thread.
    :rtype: DeletedEntityResponse
    """
    kwargs['_return_http_data_only'] = True
    if '_preload_content' in kwargs:
        raise ValueError(
            "Error! Please call the delete_participation_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data"  # noqa: E501
        )
    if async_req is not None:
        kwargs.update(async_req=async_req)
    return self.delete_participation_with_http_info(scope, code, **kwargs)  # noqa: E501
[docs]
@validate_arguments
def delete_participation_with_http_info(
    self,
    scope: Annotated[
        constr(strict=True, max_length=64, min_length=1),
        Field(..., description="The participation scope.")
    ],
    code: Annotated[
        constr(strict=True, max_length=64, min_length=1),
        Field(..., description="The participation's code. This, together with the scope uniquely identifies the participation to delete.")
    ],
    **kwargs
) -> ApiResponse:  # noqa: E501
    """[EARLY ACCESS] DeleteParticipation: Delete participation

    Delete a participation. Deletion will be valid from the participation's
    creation datetime. This means that the participation will no longer exist
    at any effective datetime from the asAt datetime of deletion.

    Issues ``DELETE /api/participations/{scope}/{code}`` through
    ``self.api_client.call_api``. The request is synchronous by default;
    pass ``async_req=True`` for an asynchronous one.

    >>> thread = api.delete_participation_with_http_info(scope, code, async_req=True)
    >>> result = thread.get()

    :param scope: The participation scope. (required)
    :type scope: str
    :param code: The participation's code. This, together with the scope
        uniquely identifies the participation to delete. (required)
    :type code: str
    :param async_req: Whether to execute the request asynchronously.
    :type async_req: bool, optional
    :param _preload_content: if False, the ApiResponse.data will be set to
        none and raw_data will store the HTTP response body without
        reading/decoding. Default is True.
    :type _preload_content: bool, optional
    :param _return_http_data_only: response data instead of ApiResponse
        object with status code, headers, etc
    :type _return_http_data_only: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it will be the total request timeout. It can also be a
        pair (tuple) of (connection, read) timeouts.
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the spec
        for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request. Accepted as a
        keyword here but not otherwise read by this method.
    :type _content_type: str, optional
    :param _headers: extra request headers; copied before the ``Accept``
        header is set.
    :raises ApiTypeError: if an unrecognised keyword argument is supplied.
    :return: whatever ``self.api_client.call_api`` returns. If the method is
        called asynchronously, returns the request thread.
    :rtype: tuple(DeletedEntityResponse, status_code(int), headers(HTTPHeaderDict))
    """
    # Snapshot the bound arguments before any other local name exists.
    _params = locals()

    _known_names = (
        'scope',
        'code',
        'async_req',
        '_return_http_data_only',
        '_preload_content',
        '_request_timeout',
        '_request_auth',
        '_content_type',
        '_headers',
    )

    # Fold recognised keyword arguments into the parameter map; anything
    # else is rejected.
    for _name, _value in _params['kwargs'].items():
        if _name not in _known_names:
            raise ApiTypeError(
                "Got an unexpected keyword argument '%s'"
                " to method delete_participation" % _name
            )
        _params[_name] = _value
    del _params['kwargs']

    # Path parameters: only truthy values are substituted into the URL.
    _path_params = {}
    for _segment in ('scope', 'code'):
        if _params[_segment]:
            _path_params[_segment] = _params[_segment]

    # Work on a copy of any caller-supplied headers, then set `Accept`.
    _header_params = dict(_params.get('_headers', {}))
    _header_params['Accept'] = self.api_client.select_header_accept(
        ['text/plain', 'application/json', 'text/json'])  # noqa: E501

    # This endpoint has no query, form, file or body parameters.
    return self.api_client.call_api(
        '/api/participations/{scope}/{code}', 'DELETE',
        _path_params,
        [],
        _header_params,
        body=None,
        post_params=[],
        files={},
        response_types_map={
            '200': "DeletedEntityResponse",
            '400': "LusidValidationProblemDetails",
        },
        auth_settings=['oauth2'],
        async_req=_params.get('async_req'),
        _return_http_data_only=_params.get('_return_http_data_only'),  # noqa: E501
        _preload_content=_params.get('_preload_content', True),
        _request_timeout=_params.get('_request_timeout'),
        collection_formats={},
        _request_auth=_params.get('_request_auth'))
# Typing-only stub: coroutine form of get_participation, declared without
# an async_req parameter. The body is intentionally just an ellipsis.
@overload
async def get_participation(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope to which the participation belongs.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The participation's unique identifier.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the participation. Defaults to return the latest version of the participation if not specified.")] = None, property_keys : Annotated[Optional[conlist(StrictStr)], Field(description="A list of property keys from the \"Participation\" domain to decorate onto the participation. These take the format {domain}/{scope}/{code} e.g. \"Participation/system/Name\".")] = None, **kwargs) -> Participation: # noqa: E501
    ...
# Typing-only stub: form that accepts async_req. NOTE: the default declared
# here is True, whereas the decorated implementation defaults it to None.
@overload
def get_participation(self, scope : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The scope to which the participation belongs.")], code : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The participation's unique identifier.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the participation. Defaults to return the latest version of the participation if not specified.")] = None, property_keys : Annotated[Optional[conlist(StrictStr)], Field(description="A list of property keys from the \"Participation\" domain to decorate onto the participation. These take the format {domain}/{scope}/{code} e.g. \"Participation/system/Name\".")] = None, async_req: Optional[bool]=True, **kwargs) -> Participation: # noqa: E501
    ...
[docs]
@validate_arguments
def get_participation(
    self,
    scope: Annotated[
        constr(strict=True, max_length=64, min_length=1),
        Field(..., description="The scope to which the participation belongs.")
    ],
    code: Annotated[
        constr(strict=True, max_length=64, min_length=1),
        Field(..., description="The participation's unique identifier.")
    ],
    as_at: Annotated[
        Optional[datetime],
        Field(description="The asAt datetime at which to retrieve the participation. Defaults to return the latest version of the participation if not specified.")
    ] = None,
    property_keys: Annotated[
        Optional[conlist(StrictStr)],
        Field(description="A list of property keys from the \"Participation\" domain to decorate onto the participation. These take the format {domain}/{scope}/{code} e.g. \"Participation/system/Name\".")
    ] = None,
    async_req: Optional[bool] = None,
    **kwargs
) -> Union[Participation, Awaitable[Participation]]:  # noqa: E501
    """[EARLY ACCESS] GetParticipation: Get Participation

    Fetch a Participation that matches the specified identifier.

    This is a thin wrapper: it forces ``_return_http_data_only=True`` and
    delegates to ``get_participation_with_http_info``. The request is
    synchronous by default; pass ``async_req=True`` for an asynchronous one.

    >>> thread = api.get_participation(scope, code, as_at, property_keys, async_req=True)
    >>> result = thread.get()

    :param scope: The scope to which the participation belongs. (required)
    :type scope: str
    :param code: The participation's unique identifier. (required)
    :type code: str
    :param as_at: The asAt datetime at which to retrieve the participation.
        Defaults to return the latest version of the participation if not
        specified.
    :type as_at: datetime
    :param property_keys: A list of property keys from the "Participation"
        domain to decorate onto the participation. These take the format
        {domain}/{scope}/{code} e.g. "Participation/system/Name" .
    :type property_keys: List[str]
    :param async_req: Whether to execute the request asynchronously. Only
        forwarded to the delegate when it is not ``None``.
    :type async_req: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it will be the total request timeout. It can also be a
        pair (tuple) of (connection, read) timeouts.
    :raises ValueError: if ``_preload_content`` is supplied; callers needing
        it must use ``get_participation_with_http_info`` directly.
    :return: Returns the result object. If the method is called
        asynchronously, returns the request thread.
    :rtype: Participation
    """
    kwargs['_return_http_data_only'] = True
    if '_preload_content' in kwargs:
        raise ValueError(
            "Error! Please call the get_participation_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data"  # noqa: E501
        )
    if async_req is not None:
        kwargs.update(async_req=async_req)
    return self.get_participation_with_http_info(scope, code, as_at, property_keys, **kwargs)  # noqa: E501
[docs]
@validate_arguments
def get_participation_with_http_info(
    self,
    scope: Annotated[
        constr(strict=True, max_length=64, min_length=1),
        Field(..., description="The scope to which the participation belongs.")
    ],
    code: Annotated[
        constr(strict=True, max_length=64, min_length=1),
        Field(..., description="The participation's unique identifier.")
    ],
    as_at: Annotated[
        Optional[datetime],
        Field(description="The asAt datetime at which to retrieve the participation. Defaults to return the latest version of the participation if not specified.")
    ] = None,
    property_keys: Annotated[
        Optional[conlist(StrictStr)],
        Field(description="A list of property keys from the \"Participation\" domain to decorate onto the participation. These take the format {domain}/{scope}/{code} e.g. \"Participation/system/Name\".")
    ] = None,
    **kwargs
) -> ApiResponse:  # noqa: E501
    """[EARLY ACCESS] GetParticipation: Get Participation

    Fetch a Participation that matches the specified identifier.

    Issues ``GET /api/participations/{scope}/{code}`` through
    ``self.api_client.call_api``. The request is synchronous by default;
    pass ``async_req=True`` for an asynchronous one.

    >>> thread = api.get_participation_with_http_info(scope, code, as_at, property_keys, async_req=True)
    >>> result = thread.get()

    :param scope: The scope to which the participation belongs. (required)
    :type scope: str
    :param code: The participation's unique identifier. (required)
    :type code: str
    :param as_at: The asAt datetime at which to retrieve the participation.
        Defaults to return the latest version of the participation if not
        specified. A ``datetime`` is rendered with the client
        configuration's ``datetime_format``; any other non-None value is
        sent unchanged.
    :type as_at: datetime
    :param property_keys: A list of property keys from the "Participation"
        domain to decorate onto the participation. These take the format
        {domain}/{scope}/{code} e.g. "Participation/system/Name" .
    :type property_keys: List[str]
    :param async_req: Whether to execute the request asynchronously.
    :type async_req: bool, optional
    :param _preload_content: if False, the ApiResponse.data will be set to
        none and raw_data will store the HTTP response body without
        reading/decoding. Default is True.
    :type _preload_content: bool, optional
    :param _return_http_data_only: response data instead of ApiResponse
        object with status code, headers, etc
    :type _return_http_data_only: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it will be the total request timeout. It can also be a
        pair (tuple) of (connection, read) timeouts.
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the spec
        for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request. Accepted as a
        keyword here but not otherwise read by this method.
    :type _content_type: str, optional
    :param _headers: extra request headers; copied before the ``Accept``
        header is set.
    :raises ApiTypeError: if an unrecognised keyword argument is supplied.
    :return: whatever ``self.api_client.call_api`` returns. If the method is
        called asynchronously, returns the request thread.
    :rtype: tuple(Participation, status_code(int), headers(HTTPHeaderDict))
    """
    # Snapshot the bound arguments before any other local name exists.
    _params = locals()

    _known_names = (
        'scope',
        'code',
        'as_at',
        'property_keys',
        'async_req',
        '_return_http_data_only',
        '_preload_content',
        '_request_timeout',
        '_request_auth',
        '_content_type',
        '_headers',
    )

    # Fold recognised keyword arguments into the parameter map; anything
    # else is rejected.
    for _name, _value in _params['kwargs'].items():
        if _name not in _known_names:
            raise ApiTypeError(
                "Got an unexpected keyword argument '%s'"
                " to method get_participation" % _name
            )
        _params[_name] = _value
    del _params['kwargs']

    # Path parameters: only truthy values are substituted into the URL.
    _path_params = {}
    for _segment in ('scope', 'code'):
        if _params[_segment]:
            _path_params[_segment] = _params[_segment]

    # Query parameters, emitted in the order asAt, propertyKeys.
    _query_params = []
    _collection_formats = {}

    _as_at = _params.get('as_at')
    if _as_at is not None:  # noqa: E501
        if isinstance(_as_at, datetime):
            _as_at = _as_at.strftime(self.api_client.configuration.datetime_format)
        _query_params.append(('asAt', _as_at))

    _property_keys = _params.get('property_keys')
    if _property_keys is not None:  # noqa: E501
        _query_params.append(('propertyKeys', _property_keys))
        _collection_formats['propertyKeys'] = 'multi'

    # Work on a copy of any caller-supplied headers, then set `Accept`.
    _header_params = dict(_params.get('_headers', {}))
    _header_params['Accept'] = self.api_client.select_header_accept(
        ['text/plain', 'application/json', 'text/json'])  # noqa: E501

    # This endpoint has no form, file or body parameters.
    return self.api_client.call_api(
        '/api/participations/{scope}/{code}', 'GET',
        _path_params,
        _query_params,
        _header_params,
        body=None,
        post_params=[],
        files={},
        response_types_map={
            '200': "Participation",
            '400': "LusidValidationProblemDetails",
        },
        auth_settings=['oauth2'],
        async_req=_params.get('async_req'),
        _return_http_data_only=_params.get('_return_http_data_only'),  # noqa: E501
        _preload_content=_params.get('_preload_content', True),
        _request_timeout=_params.get('_request_timeout'),
        collection_formats=_collection_formats,
        _request_auth=_params.get('_request_auth'))
# Typing-only stub: coroutine form of list_participations, declared without
# an async_req parameter. The body is intentionally just an ellipsis.
@overload
async def list_participations(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the participation. Defaults to return the latest version of the participation if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing participations from a previous call to list participations. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\"")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid.")] = None, property_keys : Annotated[Optional[conlist(StrictStr)], Field(description="A list of property keys from the \"Participation\" domain to decorate onto each participation. These take the format {domain}/{scope}/{code} e.g. \"Participation/system/Name\".")] = None, **kwargs) -> PagedResourceListOfParticipation: # noqa: E501
    ...
# Typing-only stub: form that accepts async_req. NOTE: the default declared
# here is True, whereas the decorated implementation defaults it to None.
@overload
def list_participations(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the participation. Defaults to return the latest version of the participation if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing participations from a previous call to list participations. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\"")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid.")] = None, property_keys : Annotated[Optional[conlist(StrictStr)], Field(description="A list of property keys from the \"Participation\" domain to decorate onto each participation. These take the format {domain}/{scope}/{code} e.g. \"Participation/system/Name\".")] = None, async_req: Optional[bool]=True, **kwargs) -> PagedResourceListOfParticipation: # noqa: E501
    ...
[docs]
@validate_arguments
def list_participations(
    self,
    as_at: Annotated[
        Optional[datetime],
        Field(description="The asAt datetime at which to retrieve the participation. Defaults to return the latest version of the participation if not specified.")
    ] = None,
    page: Annotated[
        Optional[constr(strict=True, max_length=500, min_length=1)],
        Field(description="The pagination token to use to continue listing participations from a previous call to list participations. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request.")
    ] = None,
    sort_by: Annotated[
        Optional[conlist(StrictStr)],
        Field(description="A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\"")
    ] = None,
    limit: Annotated[
        Optional[conint(strict=True, le=5000, ge=1)],
        Field(description="When paginating, limit the number of returned results to this many.")
    ] = None,
    filter: Annotated[
        Optional[constr(strict=True, max_length=16384, min_length=0)],
        Field(description="Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid.")
    ] = None,
    property_keys: Annotated[
        Optional[conlist(StrictStr)],
        Field(description="A list of property keys from the \"Participation\" domain to decorate onto each participation. These take the format {domain}/{scope}/{code} e.g. \"Participation/system/Name\".")
    ] = None,
    async_req: Optional[bool] = None,
    **kwargs
) -> Union[PagedResourceListOfParticipation, Awaitable[PagedResourceListOfParticipation]]:  # noqa: E501
    """[EARLY ACCESS] ListParticipations: List Participations

    Fetch the last pre-AsAt date version of each Participation in scope
    (does not fetch the entire history).

    This is a thin wrapper: it forces ``_return_http_data_only=True`` and
    delegates to ``list_participations_with_http_info``. The request is
    synchronous by default; pass ``async_req=True`` for an asynchronous one.

    >>> thread = api.list_participations(as_at, page, sort_by, limit, filter, property_keys, async_req=True)
    >>> result = thread.get()

    :param as_at: The asAt datetime at which to retrieve the participation.
        Defaults to return the latest version of the participation if not
        specified.
    :type as_at: datetime
    :param page: The pagination token to use to continue listing
        participations from a previous call to list participations. This
        value is returned from the previous call. If a pagination token is
        provided the sortBy, filter, effectiveAt, and asAt fields must not
        have changed since the original request.
    :type page: str
    :param sort_by: A list of field names to sort by, each suffixed by
        " ASC" or " DESC" .
    :type sort_by: List[str]
    :param limit: When paginating, limit the number of returned results to
        this many.
    :type limit: int
    :param filter: Expression to filter the result set. Read more about
        filtering results from LUSID here:
        https://support.lusid.com/filtering-results-from-lusid.
    :type filter: str
    :param property_keys: A list of property keys from the "Participation"
        domain to decorate onto each participation. These take the format
        {domain}/{scope}/{code} e.g. "Participation/system/Name" .
    :type property_keys: List[str]
    :param async_req: Whether to execute the request asynchronously. Only
        forwarded to the delegate when it is not ``None``.
    :type async_req: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it will be the total request timeout. It can also be a
        pair (tuple) of (connection, read) timeouts.
    :raises ValueError: if ``_preload_content`` is supplied; callers needing
        it must use ``list_participations_with_http_info`` directly.
    :return: Returns the result object. If the method is called
        asynchronously, returns the request thread.
    :rtype: PagedResourceListOfParticipation
    """
    kwargs['_return_http_data_only'] = True
    if '_preload_content' in kwargs:
        raise ValueError(
            "Error! Please call the list_participations_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data"  # noqa: E501
        )
    if async_req is not None:
        kwargs.update(async_req=async_req)
    return self.list_participations_with_http_info(as_at, page, sort_by, limit, filter, property_keys, **kwargs)  # noqa: E501
[docs]
@validate_arguments
def list_participations_with_http_info(
    self,
    as_at: Annotated[
        Optional[datetime],
        Field(description="The asAt datetime at which to retrieve the participation. Defaults to return the latest version of the participation if not specified.")
    ] = None,
    page: Annotated[
        Optional[constr(strict=True, max_length=500, min_length=1)],
        Field(description="The pagination token to use to continue listing participations from a previous call to list participations. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request.")
    ] = None,
    sort_by: Annotated[
        Optional[conlist(StrictStr)],
        Field(description="A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\"")
    ] = None,
    limit: Annotated[
        Optional[conint(strict=True, le=5000, ge=1)],
        Field(description="When paginating, limit the number of returned results to this many.")
    ] = None,
    filter: Annotated[
        Optional[constr(strict=True, max_length=16384, min_length=0)],
        Field(description="Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid.")
    ] = None,
    property_keys: Annotated[
        Optional[conlist(StrictStr)],
        Field(description="A list of property keys from the \"Participation\" domain to decorate onto each participation. These take the format {domain}/{scope}/{code} e.g. \"Participation/system/Name\".")
    ] = None,
    **kwargs
) -> ApiResponse:  # noqa: E501
    """[EARLY ACCESS] ListParticipations: List Participations

    Fetch the last pre-AsAt date version of each Participation in scope
    (does not fetch the entire history).

    Issues ``GET /api/participations`` through
    ``self.api_client.call_api``. The request is synchronous by default;
    pass ``async_req=True`` for an asynchronous one.

    >>> thread = api.list_participations_with_http_info(as_at, page, sort_by, limit, filter, property_keys, async_req=True)
    >>> result = thread.get()

    :param as_at: The asAt datetime at which to retrieve the participation.
        Defaults to return the latest version of the participation if not
        specified. A ``datetime`` is rendered with the client
        configuration's ``datetime_format``; any other non-None value is
        sent unchanged.
    :type as_at: datetime
    :param page: The pagination token to use to continue listing
        participations from a previous call to list participations. This
        value is returned from the previous call. If a pagination token is
        provided the sortBy, filter, effectiveAt, and asAt fields must not
        have changed since the original request.
    :type page: str
    :param sort_by: A list of field names to sort by, each suffixed by
        " ASC" or " DESC" .
    :type sort_by: List[str]
    :param limit: When paginating, limit the number of returned results to
        this many.
    :type limit: int
    :param filter: Expression to filter the result set. Read more about
        filtering results from LUSID here:
        https://support.lusid.com/filtering-results-from-lusid.
    :type filter: str
    :param property_keys: A list of property keys from the "Participation"
        domain to decorate onto each participation. These take the format
        {domain}/{scope}/{code} e.g. "Participation/system/Name" .
    :type property_keys: List[str]
    :param async_req: Whether to execute the request asynchronously.
    :type async_req: bool, optional
    :param _preload_content: if False, the ApiResponse.data will be set to
        none and raw_data will store the HTTP response body without
        reading/decoding. Default is True.
    :type _preload_content: bool, optional
    :param _return_http_data_only: response data instead of ApiResponse
        object with status code, headers, etc
    :type _return_http_data_only: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it will be the total request timeout. It can also be a
        pair (tuple) of (connection, read) timeouts.
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the spec
        for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request. Accepted as a
        keyword here but not otherwise read by this method.
    :type _content_type: str, optional
    :param _headers: extra request headers; copied before the ``Accept``
        header is set.
    :raises ApiTypeError: if an unrecognised keyword argument is supplied.
    :return: whatever ``self.api_client.call_api`` returns. If the method is
        called asynchronously, returns the request thread.
    :rtype: tuple(PagedResourceListOfParticipation, status_code(int), headers(HTTPHeaderDict))
    """
    # Snapshot the bound arguments before any other local name exists.
    _params = locals()

    _known_names = (
        'as_at',
        'page',
        'sort_by',
        'limit',
        'filter',
        'property_keys',
        'async_req',
        '_return_http_data_only',
        '_preload_content',
        '_request_timeout',
        '_request_auth',
        '_content_type',
        '_headers',
    )

    # Fold recognised keyword arguments into the parameter map; anything
    # else is rejected.
    for _name, _value in _params['kwargs'].items():
        if _name not in _known_names:
            raise ApiTypeError(
                "Got an unexpected keyword argument '%s'"
                " to method list_participations" % _name
            )
        _params[_name] = _value
    del _params['kwargs']

    # Query parameters, emitted in the order
    # asAt, page, sortBy, limit, filter, propertyKeys.
    _query_params = []
    _collection_formats = {}

    _as_at = _params.get('as_at')
    if _as_at is not None:  # noqa: E501
        if isinstance(_as_at, datetime):
            _as_at = _as_at.strftime(self.api_client.configuration.datetime_format)
        _query_params.append(('asAt', _as_at))

    # (argument name, wire name, sent as a 'multi' collection?)
    for _arg_name, _wire_name, _is_multi in (
        ('page', 'page', False),
        ('sort_by', 'sortBy', True),
        ('limit', 'limit', False),
        ('filter', 'filter', False),
        ('property_keys', 'propertyKeys', True),
    ):
        _arg_value = _params.get(_arg_name)
        if _arg_value is None:
            continue
        _query_params.append((_wire_name, _arg_value))
        if _is_multi:
            _collection_formats[_wire_name] = 'multi'

    # Work on a copy of any caller-supplied headers, then set `Accept`.
    _header_params = dict(_params.get('_headers', {}))
    _header_params['Accept'] = self.api_client.select_header_accept(
        ['text/plain', 'application/json', 'text/json'])  # noqa: E501

    # This endpoint has no path, form, file or body parameters.
    return self.api_client.call_api(
        '/api/participations', 'GET',
        {},
        _query_params,
        _header_params,
        body=None,
        post_params=[],
        files={},
        response_types_map={
            '200': "PagedResourceListOfParticipation",
            '400': "LusidValidationProblemDetails",
        },
        auth_settings=['oauth2'],
        async_req=_params.get('async_req'),
        _return_http_data_only=_params.get('_return_http_data_only'),  # noqa: E501
        _preload_content=_params.get('_preload_content', True),
        _request_timeout=_params.get('_request_timeout'),
        collection_formats=_collection_formats,
        _request_auth=_params.get('_request_auth'))
# Typing-only stub: coroutine form of upsert_participations, declared without
# an async_req parameter. The body is intentionally just an ellipsis.
@overload
async def upsert_participations(self, participation_set_request : Annotated[Optional[ParticipationSetRequest], Field(description="The collection of participation requests.")] = None, **kwargs) -> ResourceListOfParticipation: # noqa: E501
    ...
# Typing-only stub: form that accepts async_req. NOTE: the default declared
# here is True, whereas the decorated implementation defaults it to None.
@overload
def upsert_participations(self, participation_set_request : Annotated[Optional[ParticipationSetRequest], Field(description="The collection of participation requests.")] = None, async_req: Optional[bool]=True, **kwargs) -> ResourceListOfParticipation: # noqa: E501
    ...
[docs]
@validate_arguments
def upsert_participations(
    self,
    participation_set_request: Annotated[Optional[ParticipationSetRequest], Field(description="The collection of participation requests.")] = None,  # noqa: E501
    async_req: Optional[bool] = None,
    **kwargs
) -> Union[ResourceListOfParticipation, Awaitable[ResourceListOfParticipation]]:  # noqa: E501
    """[EARLY ACCESS] UpsertParticipations: Upsert Participation

    Upsert; update existing participations with given ids, or create new
    participations otherwise.

    Thin convenience wrapper: it forwards to
    ``upsert_participations_with_http_info`` with ``_return_http_data_only``
    forced to ``True``.

    This method makes a synchronous HTTP request by default. To make an
    asynchronous HTTP request, please pass async_req=True

    >>> thread = api.upsert_participations(participation_set_request, async_req=True)
    >>> result = thread.get()

    :param participation_set_request: The collection of participation requests.
    :type participation_set_request: ParticipationSetRequest
    :param async_req: Whether to execute the request asynchronously. Only
                      forwarded when it is not None.
    :type async_req: bool, optional
    :param _request_timeout: timeout setting for this request.
           If one number provided, it will be total request
           timeout. It can also be a pair (tuple) of
           (connection, read) timeouts.
    :return: Returns the result object.
             If the method is called asynchronously,
             returns the request thread.
    :rtype: ResourceListOfParticipation
    :raises ValueError: if ``_preload_content`` is supplied; raw responses
            must be requested through the ``_with_http_info`` variant.
    """
    # Guard first: raw (non-preloaded) access is only offered by the
    # *_with_http_info variant.
    if '_preload_content' in kwargs:
        raise ValueError(
            "Error! Please call the upsert_participations_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data"  # noqa: E501
        )

    # Copy the caller's extras and force data-only responses.
    forwarded = dict(kwargs, _return_http_data_only=True)
    if async_req is not None:
        forwarded['async_req'] = async_req

    return self.upsert_participations_with_http_info(participation_set_request, **forwarded)  # noqa: E501
[docs]
@validate_arguments
def upsert_participations_with_http_info(
    self,
    participation_set_request: Annotated[Optional[ParticipationSetRequest], Field(description="The collection of participation requests.")] = None,  # noqa: E501
    **kwargs
) -> ApiResponse:  # noqa: E501
    """[EARLY ACCESS] UpsertParticipations: Upsert Participation

    Upsert; update existing participations with given ids, or create new
    participations otherwise.

    Builds a ``POST /api/participations`` request (body, ``Accept`` and
    ``Content-Type`` headers, ``oauth2`` auth setting) and hands it to
    ``self.api_client.call_api``.

    This method makes a synchronous HTTP request by default. To make an
    asynchronous HTTP request, please pass async_req=True

    >>> thread = api.upsert_participations_with_http_info(participation_set_request, async_req=True)
    >>> result = thread.get()

    :param participation_set_request: The collection of participation requests.
                                      Sent as the request body.
    :type participation_set_request: ParticipationSetRequest
    :param async_req: Whether to execute the request asynchronously.
    :type async_req: bool, optional
    :param _preload_content: if False, the ApiResponse.data will
                             be set to none and raw_data will store the
                             HTTP response body without reading/decoding.
                             Default is True.
    :type _preload_content: bool, optional
    :param _return_http_data_only: response data instead of ApiResponse
                                   object with status code, headers, etc
    :type _return_http_data_only: bool, optional
    :param _request_timeout: timeout setting for this request. If one
                             number provided, it will be total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts.
    :param _request_auth: set to override the auth_settings for an a single
                          request; this effectively ignores the authentication
                          in the spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request
    :type _content_type: string, optional
    :param _headers: extra headers copied into the outgoing header dict.
    :return: Whatever ``self.api_client.call_api`` returns for this request.
             If the method is called asynchronously,
             returns the request thread.
    :rtype: ApiResponse
    :raises ApiTypeError: if an unrecognised keyword argument is supplied.
    """
    # Must stay the first statement so the snapshot holds only the call
    # arguments (self, participation_set_request, kwargs).
    request_args = locals()
    extras = request_args.pop('kwargs')

    accepted = (
        'participation_set_request',
        'async_req',
        '_return_http_data_only',
        '_preload_content',
        '_request_timeout',
        '_request_auth',
        '_content_type',
        '_headers',
    )

    # Reject the first unknown keyword, then merge the extras in one step.
    for name in extras:
        if name not in accepted:
            raise ApiTypeError(
                "Got an unexpected keyword argument '%s'"
                " to method upsert_participations" % name
            )
    request_args.update(extras)

    # Header parameters: start from caller-supplied headers, then add Accept.
    headers = dict(request_args.get('_headers', {}))
    headers['Accept'] = self.api_client.select_header_accept(
        ['text/plain', 'application/json', 'text/json'])  # noqa: E501

    # Content-Type: an explicit `_content_type` wins over the client's pick.
    # The client's selection is still computed unconditionally.
    negotiated = self.api_client.select_header_content_type(
        ['application/json-patch+json', 'application/json', 'text/json', 'application/*+json'])  # noqa: E501
    content_type = request_args.get('_content_type', negotiated)
    if content_type:
        headers['Content-Type'] = content_type

    # This endpoint has no path, query, form or file parameters.
    path_params = {}
    query_params = []
    form_params = []
    attachments = {}
    collection_formats = {}

    response_types = {
        '201': "ResourceListOfParticipation",
        '400': "LusidValidationProblemDetails",
    }

    return self.api_client.call_api(
        '/api/participations', 'POST',
        path_params,
        query_params,
        headers,
        body=request_args['participation_set_request'],
        post_params=form_params,
        files=attachments,
        response_types_map=response_types,
        auth_settings=['oauth2'],  # noqa: E501
        async_req=request_args.get('async_req'),
        _return_http_data_only=request_args.get('_return_http_data_only'),  # noqa: E501
        _preload_content=request_args.get('_preload_content', True),
        _request_timeout=request_args.get('_request_timeout'),
        collection_formats=collection_formats,
        _request_auth=request_args.get('_request_auth'))