# coding: utf-8
"""
LUSID API
FINBOURNE Technology # noqa: E501
Contact: info@finbourne.com
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
"""
import re # noqa: F401
import io
import warnings
from pydantic.v1 import validate_arguments, ValidationError
from typing import overload, Optional, Union, Awaitable
from typing_extensions import Annotated
from datetime import datetime
from pydantic.v1 import Field, StrictStr, conint, conlist, constr, validator
from typing import Optional
from lusid.models.paged_resource_list_of_staged_modification import PagedResourceListOfStagedModification
from lusid.models.paged_resource_list_of_staged_modifications_requested_change_interval import PagedResourceListOfStagedModificationsRequestedChangeInterval
from lusid.models.staged_modification import StagedModification
from lusid.models.staged_modification_decision_request import StagedModificationDecisionRequest
from lusid.api_client import ApiClient
from lusid.api_response import ApiResponse
from lusid.exceptions import ( # noqa: F401
ApiTypeError,
ApiValueError
)
[docs]
class StagedModificationsApi:
"""NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
"""
def __init__(self, api_client=None) -> None:
if api_client is None:
api_client = ApiClient.get_default()
self.api_client = api_client
@overload
async def add_decision(self, id : Annotated[constr(strict=True, max_length=40, min_length=30), Field(..., description="Unique Id for a staged modification..")], staged_modification_decision_request : Annotated[StagedModificationDecisionRequest, Field(..., description="The decision on the requested staged modification, \"Approve\" or \"Reject\".")], **kwargs) -> StagedModification: # noqa: E501
...
@overload
def add_decision(self, id : Annotated[constr(strict=True, max_length=40, min_length=30), Field(..., description="Unique Id for a staged modification..")], staged_modification_decision_request : Annotated[StagedModificationDecisionRequest, Field(..., description="The decision on the requested staged modification, \"Approve\" or \"Reject\".")], async_req: Optional[bool]=True, **kwargs) -> StagedModification: # noqa: E501
...
[docs]
@validate_arguments
def add_decision(self, id : Annotated[constr(strict=True, max_length=40, min_length=30), Field(..., description="Unique Id for a staged modification..")], staged_modification_decision_request : Annotated[StagedModificationDecisionRequest, Field(..., description="The decision on the requested staged modification, \"Approve\" or \"Reject\".")], async_req: Optional[bool]=None, **kwargs) -> Union[StagedModification, Awaitable[StagedModification]]: # noqa: E501
"""[EXPERIMENTAL] AddDecision: AddDecision # noqa: E501
Add decision to staged modification, Approve or Reject. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.add_decision(id, staged_modification_decision_request, async_req=True)
>>> result = thread.get()
:param id: Unique Id for a staged modification.. (required)
:type id: str
:param staged_modification_decision_request: The decision on the requested staged modification, \"Approve\" or \"Reject\". (required)
:type staged_modification_decision_request: StagedModificationDecisionRequest
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: StagedModification
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the add_decision_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.add_decision_with_http_info(id, staged_modification_decision_request, **kwargs) # noqa: E501
[docs]
@validate_arguments
def add_decision_with_http_info(self, id : Annotated[constr(strict=True, max_length=40, min_length=30), Field(..., description="Unique Id for a staged modification..")], staged_modification_decision_request : Annotated[StagedModificationDecisionRequest, Field(..., description="The decision on the requested staged modification, \"Approve\" or \"Reject\".")], **kwargs) -> ApiResponse: # noqa: E501
"""[EXPERIMENTAL] AddDecision: AddDecision # noqa: E501
Add decision to staged modification, Approve or Reject. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.add_decision_with_http_info(id, staged_modification_decision_request, async_req=True)
>>> result = thread.get()
:param id: Unique Id for a staged modification.. (required)
:type id: str
:param staged_modification_decision_request: The decision on the requested staged modification, \"Approve\" or \"Reject\". (required)
:type staged_modification_decision_request: StagedModificationDecisionRequest
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(StagedModification, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'id',
'staged_modification_decision_request'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method add_decision" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
if _params['id']:
_path_params['id'] = _params['id']
# process the query parameters
_query_params = []
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
if _params['staged_modification_decision_request'] is not None:
_body_params = _params['staged_modification_decision_request']
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# set the HTTP header `Content-Type`
_content_types_list = _params.get('_content_type',
self.api_client.select_header_content_type(
['application/json-patch+json', 'application/json', 'text/json', 'application/*+json']))
if _content_types_list:
_header_params['Content-Type'] = _content_types_list
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "StagedModification",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/stagedmodifications/{id}/decision', 'POST',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def get_staged_modification(self, id : Annotated[constr(strict=True, max_length=40, min_length=30), Field(..., description="The unique identifier for a staged modification.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staged modification. Defaults to latest if not specified.")] = None, **kwargs) -> StagedModification: # noqa: E501
...
@overload
def get_staged_modification(self, id : Annotated[constr(strict=True, max_length=40, min_length=30), Field(..., description="The unique identifier for a staged modification.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staged modification. Defaults to latest if not specified.")] = None, async_req: Optional[bool]=True, **kwargs) -> StagedModification: # noqa: E501
...
[docs]
@validate_arguments
def get_staged_modification(self, id : Annotated[constr(strict=True, max_length=40, min_length=30), Field(..., description="The unique identifier for a staged modification.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staged modification. Defaults to latest if not specified.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[StagedModification, Awaitable[StagedModification]]: # noqa: E501
"""[EXPERIMENTAL] GetStagedModification: GetStagedModification # noqa: E501
Retrieve the details of a staged modification. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.get_staged_modification(id, as_at, async_req=True)
>>> result = thread.get()
:param id: The unique identifier for a staged modification. (required)
:type id: str
:param as_at: The asAt datetime at which to retrieve the staged modification. Defaults to latest if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: StagedModification
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the get_staged_modification_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.get_staged_modification_with_http_info(id, as_at, **kwargs) # noqa: E501
[docs]
@validate_arguments
def get_staged_modification_with_http_info(self, id : Annotated[constr(strict=True, max_length=40, min_length=30), Field(..., description="The unique identifier for a staged modification.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staged modification. Defaults to latest if not specified.")] = None, **kwargs) -> ApiResponse: # noqa: E501
"""[EXPERIMENTAL] GetStagedModification: GetStagedModification # noqa: E501
Retrieve the details of a staged modification. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.get_staged_modification_with_http_info(id, as_at, async_req=True)
>>> result = thread.get()
:param id: The unique identifier for a staged modification. (required)
:type id: str
:param as_at: The asAt datetime at which to retrieve the staged modification. Defaults to latest if not specified.
:type as_at: datetime
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(StagedModification, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'id',
'as_at'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method get_staged_modification" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
if _params['id']:
_path_params['id'] = _params['id']
# process the query parameters
_query_params = []
if _params.get('as_at') is not None: # noqa: E501
if isinstance(_params['as_at'], datetime):
_query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format)))
else:
_query_params.append(('asAt', _params['as_at']))
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "StagedModification",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/stagedmodifications/{id}', 'GET',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def list_requested_changes(self, id : Annotated[constr(strict=True, max_length=40, min_length=30), Field(..., description="Unique Id for a staged modification..")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list changes. Defaults to return the latest version of each staged change if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing requested staged modification changes from a previous call to list requested staged modifications. This value is returned from the previous call. If a pagination token is provided the filter, effectiveAt and asAt fields must not have changed since the original request.")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many. Defaults to 100 if not specified.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names suffixed by \" ASC\" or \" DESC\"")] = None, **kwargs) -> PagedResourceListOfStagedModificationsRequestedChangeInterval: # noqa: E501
...
@overload
def list_requested_changes(self, id : Annotated[constr(strict=True, max_length=40, min_length=30), Field(..., description="Unique Id for a staged modification..")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list changes. Defaults to return the latest version of each staged change if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing requested staged modification changes from a previous call to list requested staged modifications. This value is returned from the previous call. If a pagination token is provided the filter, effectiveAt and asAt fields must not have changed since the original request.")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many. Defaults to 100 if not specified.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names suffixed by \" ASC\" or \" DESC\"")] = None, async_req: Optional[bool]=True, **kwargs) -> PagedResourceListOfStagedModificationsRequestedChangeInterval: # noqa: E501
...
[docs]
@validate_arguments
def list_requested_changes(self, id : Annotated[constr(strict=True, max_length=40, min_length=30), Field(..., description="Unique Id for a staged modification..")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list changes. Defaults to return the latest version of each staged change if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing requested staged modification changes from a previous call to list requested staged modifications. This value is returned from the previous call. If a pagination token is provided the filter, effectiveAt and asAt fields must not have changed since the original request.")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many. Defaults to 100 if not specified.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names suffixed by \" ASC\" or \" DESC\"")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[PagedResourceListOfStagedModificationsRequestedChangeInterval, Awaitable[PagedResourceListOfStagedModificationsRequestedChangeInterval]]: # noqa: E501
"""[EXPERIMENTAL] ListRequestedChanges: ListRequestedChanges # noqa: E501
List the requested changes for a staged modification. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.list_requested_changes(id, as_at, page, limit, filter, sort_by, async_req=True)
>>> result = thread.get()
:param id: Unique Id for a staged modification.. (required)
:type id: str
:param as_at: The asAt datetime at which to list changes. Defaults to return the latest version of each staged change if not specified.
:type as_at: datetime
:param page: The pagination token to use to continue listing requested staged modification changes from a previous call to list requested staged modifications. This value is returned from the previous call. If a pagination token is provided the filter, effectiveAt and asAt fields must not have changed since the original request.
:type page: str
:param limit: When paginating, limit the number of returned results to this many. Defaults to 100 if not specified.
:type limit: int
:param filter: Expression to filter the result set. Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.
:type filter: str
:param sort_by: A list of field names suffixed by \" ASC\" or \" DESC\"
:type sort_by: List[str]
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: PagedResourceListOfStagedModificationsRequestedChangeInterval
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the list_requested_changes_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.list_requested_changes_with_http_info(id, as_at, page, limit, filter, sort_by, **kwargs) # noqa: E501
[docs]
@validate_arguments
def list_requested_changes_with_http_info(self, id : Annotated[constr(strict=True, max_length=40, min_length=30), Field(..., description="Unique Id for a staged modification..")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list changes. Defaults to return the latest version of each staged change if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing requested staged modification changes from a previous call to list requested staged modifications. This value is returned from the previous call. If a pagination token is provided the filter, effectiveAt and asAt fields must not have changed since the original request.")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many. Defaults to 100 if not specified.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names suffixed by \" ASC\" or \" DESC\"")] = None, **kwargs) -> ApiResponse: # noqa: E501
"""[EXPERIMENTAL] ListRequestedChanges: ListRequestedChanges # noqa: E501
List the requested changes for a staged modification. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.list_requested_changes_with_http_info(id, as_at, page, limit, filter, sort_by, async_req=True)
>>> result = thread.get()
:param id: Unique Id for a staged modification.. (required)
:type id: str
:param as_at: The asAt datetime at which to list changes. Defaults to return the latest version of each staged change if not specified.
:type as_at: datetime
:param page: The pagination token to use to continue listing requested staged modification changes from a previous call to list requested staged modifications. This value is returned from the previous call. If a pagination token is provided the filter, effectiveAt and asAt fields must not have changed since the original request.
:type page: str
:param limit: When paginating, limit the number of returned results to this many. Defaults to 100 if not specified.
:type limit: int
:param filter: Expression to filter the result set. Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.
:type filter: str
:param sort_by: A list of field names suffixed by \" ASC\" or \" DESC\"
:type sort_by: List[str]
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(PagedResourceListOfStagedModificationsRequestedChangeInterval, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'id',
'as_at',
'page',
'limit',
'filter',
'sort_by'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method list_requested_changes" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
if _params['id']:
_path_params['id'] = _params['id']
# process the query parameters
_query_params = []
if _params.get('as_at') is not None: # noqa: E501
if isinstance(_params['as_at'], datetime):
_query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format)))
else:
_query_params.append(('asAt', _params['as_at']))
if _params.get('page') is not None: # noqa: E501
_query_params.append(('page', _params['page']))
if _params.get('limit') is not None: # noqa: E501
_query_params.append(('limit', _params['limit']))
if _params.get('filter') is not None: # noqa: E501
_query_params.append(('filter', _params['filter']))
if _params.get('sort_by') is not None: # noqa: E501
_query_params.append(('sortBy', _params['sort_by']))
_collection_formats['sortBy'] = 'multi'
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "PagedResourceListOfStagedModificationsRequestedChangeInterval",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/stagedmodifications/{id}/requestedChanges', 'GET',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))
@overload
async def list_staged_modifications(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list staged modifications. Defaults to return the latest version of each staged modification if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing staged modifications from a previous call to list staged modifications. This value is returned from the previous call. If a pagination token is provided the filter, effectiveAt and asAt fields must not have changed since the original request.")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many. Defaults to 100 if not specified.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names suffixed by \" ASC\" or \" DESC\"")] = None, **kwargs) -> PagedResourceListOfStagedModification: # noqa: E501
...
@overload
def list_staged_modifications(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list staged modifications. Defaults to return the latest version of each staged modification if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing staged modifications from a previous call to list staged modifications. This value is returned from the previous call. If a pagination token is provided the filter, effectiveAt and asAt fields must not have changed since the original request.")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many. Defaults to 100 if not specified.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names suffixed by \" ASC\" or \" DESC\"")] = None, async_req: Optional[bool]=True, **kwargs) -> PagedResourceListOfStagedModification: # noqa: E501
...
[docs]
@validate_arguments
def list_staged_modifications(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list staged modifications. Defaults to return the latest version of each staged modification if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing staged modifications from a previous call to list staged modifications. This value is returned from the previous call. If a pagination token is provided the filter, effectiveAt and asAt fields must not have changed since the original request.")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many. Defaults to 100 if not specified.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names suffixed by \" ASC\" or \" DESC\"")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[PagedResourceListOfStagedModification, Awaitable[PagedResourceListOfStagedModification]]: # noqa: E501
"""[EXPERIMENTAL] ListStagedModifications: ListStagedModifications # noqa: E501
List summaries of the staged modifications. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.list_staged_modifications(as_at, page, limit, filter, sort_by, async_req=True)
>>> result = thread.get()
:param as_at: The asAt datetime at which to list staged modifications. Defaults to return the latest version of each staged modification if not specified.
:type as_at: datetime
:param page: The pagination token to use to continue listing staged modifications from a previous call to list staged modifications. This value is returned from the previous call. If a pagination token is provided the filter, effectiveAt and asAt fields must not have changed since the original request.
:type page: str
:param limit: When paginating, limit the number of returned results to this many. Defaults to 100 if not specified.
:type limit: int
:param filter: Expression to filter the result set. Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.
:type filter: str
:param sort_by: A list of field names suffixed by \" ASC\" or \" DESC\"
:type sort_by: List[str]
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _request_timeout: timeout setting for this request.
If one number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: PagedResourceListOfStagedModification
"""
kwargs['_return_http_data_only'] = True
if '_preload_content' in kwargs:
message = "Error! Please call the list_staged_modifications_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501
raise ValueError(message)
if async_req is not None:
kwargs['async_req'] = async_req
return self.list_staged_modifications_with_http_info(as_at, page, limit, filter, sort_by, **kwargs) # noqa: E501
[docs]
@validate_arguments
def list_staged_modifications_with_http_info(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to list staged modifications. Defaults to return the latest version of each staged modification if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing staged modifications from a previous call to list staged modifications. This value is returned from the previous call. If a pagination token is provided the filter, effectiveAt and asAt fields must not have changed since the original request.")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many. Defaults to 100 if not specified.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names suffixed by \" ASC\" or \" DESC\"")] = None, **kwargs) -> ApiResponse: # noqa: E501
"""[EXPERIMENTAL] ListStagedModifications: ListStagedModifications # noqa: E501
List summaries of the staged modifications. # noqa: E501
This method makes a synchronous HTTP request by default. To make an
asynchronous HTTP request, please pass async_req=True
>>> thread = api.list_staged_modifications_with_http_info(as_at, page, limit, filter, sort_by, async_req=True)
>>> result = thread.get()
:param as_at: The asAt datetime at which to list staged modifications. Defaults to return the latest version of each staged modification if not specified.
:type as_at: datetime
:param page: The pagination token to use to continue listing staged modifications from a previous call to list staged modifications. This value is returned from the previous call. If a pagination token is provided the filter, effectiveAt and asAt fields must not have changed since the original request.
:type page: str
:param limit: When paginating, limit the number of returned results to this many. Defaults to 100 if not specified.
:type limit: int
:param filter: Expression to filter the result set. Read more about filtering results from LUSID here https://support.lusid.com/filtering-results-from-lusid.
:type filter: str
:param sort_by: A list of field names suffixed by \" ASC\" or \" DESC\"
:type sort_by: List[str]
:param async_req: Whether to execute the request asynchronously.
:type async_req: bool, optional
:param _preload_content: if False, the ApiResponse.data will
be set to none and raw_data will store the
HTTP response body without reading/decoding.
Default is True.
:type _preload_content: bool, optional
:param _return_http_data_only: response data instead of ApiResponse
object with status code, headers, etc
:type _return_http_data_only: bool, optional
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
:param _request_auth: set to override the auth_settings for an a single
request; this effectively ignores the authentication
in the spec for a single request.
:type _request_auth: dict, optional
:type _content_type: string, optional: force content-type for the request
:return: Returns the result object.
If the method is called asynchronously,
returns the request thread.
:rtype: tuple(PagedResourceListOfStagedModification, status_code(int), headers(HTTPHeaderDict))
"""
_params = locals()
_all_params = [
'as_at',
'page',
'limit',
'filter',
'sort_by'
]
_all_params.extend(
[
'async_req',
'_return_http_data_only',
'_preload_content',
'_request_timeout',
'_request_auth',
'_content_type',
'_headers'
]
)
# validate the arguments
for _key, _val in _params['kwargs'].items():
if _key not in _all_params:
raise ApiTypeError(
"Got an unexpected keyword argument '%s'"
" to method list_staged_modifications" % _key
)
_params[_key] = _val
del _params['kwargs']
_collection_formats = {}
# process the path parameters
_path_params = {}
# process the query parameters
_query_params = []
if _params.get('as_at') is not None: # noqa: E501
if isinstance(_params['as_at'], datetime):
_query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format)))
else:
_query_params.append(('asAt', _params['as_at']))
if _params.get('page') is not None: # noqa: E501
_query_params.append(('page', _params['page']))
if _params.get('limit') is not None: # noqa: E501
_query_params.append(('limit', _params['limit']))
if _params.get('filter') is not None: # noqa: E501
_query_params.append(('filter', _params['filter']))
if _params.get('sort_by') is not None: # noqa: E501
_query_params.append(('sortBy', _params['sort_by']))
_collection_formats['sortBy'] = 'multi'
# process the header parameters
_header_params = dict(_params.get('_headers', {}))
# process the form parameters
_form_params = []
_files = {}
# process the body parameter
_body_params = None
# set the HTTP header `Accept`
_header_params['Accept'] = self.api_client.select_header_accept(
['text/plain', 'application/json', 'text/json']) # noqa: E501
# authentication setting
_auth_settings = ['oauth2'] # noqa: E501
_response_types_map = {
'200': "PagedResourceListOfStagedModification",
'400': "LusidValidationProblemDetails",
}
return self.api_client.call_api(
'/api/stagedmodifications', 'GET',
_path_params,
_query_params,
_header_params,
body=_body_params,
post_params=_form_params,
files=_files,
response_types_map=_response_types_map,
auth_settings=_auth_settings,
async_req=_params.get('async_req'),
_return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501
_preload_content=_params.get('_preload_content', True),
_request_timeout=_params.get('_request_timeout'),
collection_formats=_collection_formats,
_request_auth=_params.get('_request_auth'))