# coding: utf-8
"""
LUSID API
FINBOURNE Technology # noqa: E501
Contact: info@finbourne.com
Generated by OpenAPI Generator (https://openapi-generator.tech)
Do not edit the class manually.
"""
import re # noqa: F401
import io
import warnings
from pydantic.v1 import validate_arguments, ValidationError
from typing import overload, Optional, Union, Awaitable
from typing_extensions import Annotated
from datetime import datetime
from pydantic.v1 import Field, StrictStr, conint, conlist, constr, validator
from typing import Optional
from lusid.models.create_staging_rule_set_request import CreateStagingRuleSetRequest
from lusid.models.deleted_entity_response import DeletedEntityResponse
from lusid.models.paged_resource_list_of_staging_rule_set import PagedResourceListOfStagingRuleSet
from lusid.models.staging_rule_set import StagingRuleSet
from lusid.models.update_staging_rule_set_request import UpdateStagingRuleSetRequest
from lusid.api_client import ApiClient
from lusid.api_response import ApiResponse
from lusid.exceptions import ( # noqa: F401
ApiTypeError,
ApiValueError
)
[docs]
class StagingRuleSetApi:
"""NOTE: This class is auto generated by OpenAPI Generator
Ref: https://openapi-generator.tech
Do not edit the class manually.
"""
def __init__(self, api_client=None) -> None:
    """Bind this API facade to the client used to issue requests.

    :param api_client: client instance to use; when ``None`` the value of
        ``ApiClient.get_default()`` is used instead.
    """
    self.api_client = ApiClient.get_default() if api_client is None else api_client
# Typing-only stub: coroutine form, declared without an ``async_req`` parameter.
@overload
async def create_staging_rule_set(
    self,
    entity_type: Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to create the staging rule set.")],
    create_staging_rule_set_request: Annotated[CreateStagingRuleSetRequest, Field(..., description="Request to create a staging rule set.")],
    **kwargs
) -> StagingRuleSet:  # noqa: E501
    ...

# Typing-only stub: plain-function form that accepts ``async_req``.
@overload
def create_staging_rule_set(
    self,
    entity_type: Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to create the staging rule set.")],
    create_staging_rule_set_request: Annotated[CreateStagingRuleSetRequest, Field(..., description="Request to create a staging rule set.")],
    async_req: Optional[bool] = True,
    **kwargs
) -> StagingRuleSet:  # noqa: E501
    ...
[docs]
@validate_arguments
def create_staging_rule_set(
    self,
    entity_type: Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to create the staging rule set.")],
    create_staging_rule_set_request: Annotated[CreateStagingRuleSetRequest, Field(..., description="Request to create a staging rule set.")],
    async_req: Optional[bool] = None,
    **kwargs
) -> Union[StagingRuleSet, Awaitable[StagingRuleSet]]:  # noqa: E501
    """[EXPERIMENTAL] CreateStagingRuleSet: Create a StagingRuleSet

    Create a new staging rule set.

    Thin wrapper over ``create_staging_rule_set_with_http_info`` that always
    forces ``_return_http_data_only=True``. The HTTP request is synchronous
    by default; pass ``async_req=True`` to make it asynchronous:

    >>> thread = api.create_staging_rule_set(entity_type, create_staging_rule_set_request, async_req=True)
    >>> result = thread.get()

    :param entity_type: The entity type for which to create the staging rule set. (required)
    :type entity_type: str
    :param create_staging_rule_set_request: Request to create a staging rule set. (required)
    :type create_staging_rule_set_request: CreateStagingRuleSetRequest
    :param async_req: Whether to execute the request asynchronously. Only
        forwarded when it is not ``None``.
    :type async_req: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it is the total request timeout. It can also be a
        (connection, read) pair of timeouts.
    :return: Returns the result object. If the method is called
        asynchronously, returns the request thread.
    :rtype: StagingRuleSet
    :raises ValueError: if ``_preload_content`` is supplied; raw responses
        are only available via ``create_staging_rule_set_with_http_info``.
    """
    # Raw-response mode is not supported by this convenience wrapper.
    if '_preload_content' in kwargs:
        raise ValueError("Error! Please call the create_staging_rule_set_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data")  # noqa: E501

    # Force data-only responses, overriding any caller-supplied value.
    forwarded = dict(kwargs, _return_http_data_only=True)
    if async_req is not None:
        forwarded['async_req'] = async_req
    return self.create_staging_rule_set_with_http_info(entity_type, create_staging_rule_set_request, **forwarded)  # noqa: E501
[docs]
@validate_arguments
def create_staging_rule_set_with_http_info(
    self,
    entity_type: Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to create the staging rule set.")],
    create_staging_rule_set_request: Annotated[CreateStagingRuleSetRequest, Field(..., description="Request to create a staging rule set.")],
    **kwargs
) -> ApiResponse:  # noqa: E501
    """[EXPERIMENTAL] CreateStagingRuleSet: Create a StagingRuleSet

    Create a new staging rule set by sending
    ``POST /api/stagingrulesets/{entityType}`` through
    ``self.api_client.call_api``.

    The HTTP request is synchronous by default; pass ``async_req=True`` to
    make it asynchronous:

    >>> thread = api.create_staging_rule_set_with_http_info(entity_type, create_staging_rule_set_request, async_req=True)
    >>> result = thread.get()

    :param entity_type: The entity type for which to create the staging rule set. (required)
    :type entity_type: str
    :param create_staging_rule_set_request: Request to create a staging rule set. (required)
    :type create_staging_rule_set_request: CreateStagingRuleSetRequest
    :param async_req: Whether to execute the request asynchronously.
    :type async_req: bool, optional
    :param _preload_content: if False, the ApiResponse.data will be set to
        none and raw_data will store the HTTP response body without
        reading/decoding. Default is True.
    :type _preload_content: bool, optional
    :param _return_http_data_only: response data instead of ApiResponse
        object with status code, headers, etc
    :type _return_http_data_only: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it is the total request timeout. It can also be a
        (connection, read) pair of timeouts.
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the spec
        for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request. ``None`` is
        treated like an omitted argument, so the client's default selection
        is used.
    :type _content_type: string, optional
    :param _headers: extra request headers; ``Accept`` (and ``Content-Type``
        when one is selected) are overwritten by this method. ``None`` is
        treated like an omitted argument.
    :type _headers: dict, optional
    :return: Returns the result object. If the method is called
        asynchronously, returns the request thread.
    :rtype: tuple(StagingRuleSet, status_code(int), headers(HTTPHeaderDict))
    :raises ApiTypeError: if an unsupported keyword argument is supplied.
    """
    # Must stay the first statement so only the call arguments are captured.
    _params = locals()

    _all_params = [
        'entity_type',
        'create_staging_rule_set_request',
        'async_req',
        '_return_http_data_only',
        '_preload_content',
        '_request_timeout',
        '_request_auth',
        '_content_type',
        '_headers'
    ]

    # validate the arguments and flatten the accepted ones into _params
    for _key, _val in _params['kwargs'].items():
        if _key not in _all_params:
            raise ApiTypeError(
                "Got an unexpected keyword argument '%s'"
                " to method create_staging_rule_set" % _key
            )
        _params[_key] = _val
    del _params['kwargs']

    _collection_formats = {}

    # process the path parameters
    _path_params = {}
    if _params['entity_type']:
        _path_params['entityType'] = _params['entity_type']

    # process the query parameters
    _query_params = []

    # process the header parameters; an explicit ``_headers=None`` used to
    # crash in dict(None), so treat it the same as an omitted argument
    _header_params = dict(_params.get('_headers') or {})

    # process the form parameters
    _form_params = []
    _files = {}

    # process the body parameter
    _body_params = None
    if _params['create_staging_rule_set_request'] is not None:
        _body_params = _params['create_staging_rule_set_request']

    # set the HTTP header `Accept`
    _header_params['Accept'] = self.api_client.select_header_accept(
        ['text/plain', 'application/json', 'text/json'])  # noqa: E501

    # set the HTTP header `Content-Type`; an explicit ``_content_type=None``
    # falls back to the client's default selection, same as omitting it
    _content_type = _params.get('_content_type')
    if _content_type is None:
        _content_type = self.api_client.select_header_content_type(
            ['application/json-patch+json', 'application/json', 'text/json', 'application/*+json'])
    if _content_type:
        _header_params['Content-Type'] = _content_type

    # authentication setting
    _auth_settings = ['oauth2']  # noqa: E501

    _response_types_map = {
        '201': "StagingRuleSet",
        '400': "LusidValidationProblemDetails",
    }

    return self.api_client.call_api(
        '/api/stagingrulesets/{entityType}', 'POST',
        _path_params,
        _query_params,
        _header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        response_types_map=_response_types_map,
        auth_settings=_auth_settings,
        async_req=_params.get('async_req'),
        _return_http_data_only=_params.get('_return_http_data_only'),  # noqa: E501
        _preload_content=_params.get('_preload_content', True),
        _request_timeout=_params.get('_request_timeout'),
        collection_formats=_collection_formats,
        _request_auth=_params.get('_request_auth'))
# Typing-only stub: coroutine form, declared without an ``async_req`` parameter.
@overload
async def delete_staging_rule_set(
    self,
    entity_type: Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to delete the staging rule set.")],
    **kwargs
) -> DeletedEntityResponse:  # noqa: E501
    ...

# Typing-only stub: plain-function form that accepts ``async_req``.
@overload
def delete_staging_rule_set(
    self,
    entity_type: Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to delete the staging rule set.")],
    async_req: Optional[bool] = True,
    **kwargs
) -> DeletedEntityResponse:  # noqa: E501
    ...
[docs]
@validate_arguments
def delete_staging_rule_set(
    self,
    entity_type: Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to delete the staging rule set.")],
    async_req: Optional[bool] = None,
    **kwargs
) -> Union[DeletedEntityResponse, Awaitable[DeletedEntityResponse]]:  # noqa: E501
    """[EXPERIMENTAL] DeleteStagingRuleSet: Delete a StagingRuleSet

    Delete a staging rule set of a specific entity type. Deletion will be
    valid from the staging rule set's creation datetime. This means that the
    staging rule set will no longer exist at any effective datetime from the
    asAt datetime of deletion.

    Thin wrapper over ``delete_staging_rule_set_with_http_info`` that always
    forces ``_return_http_data_only=True``. The HTTP request is synchronous
    by default; pass ``async_req=True`` to make it asynchronous:

    >>> thread = api.delete_staging_rule_set(entity_type, async_req=True)
    >>> result = thread.get()

    :param entity_type: The entity type for which to delete the staging rule set. (required)
    :type entity_type: str
    :param async_req: Whether to execute the request asynchronously. Only
        forwarded when it is not ``None``.
    :type async_req: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it is the total request timeout. It can also be a
        (connection, read) pair of timeouts.
    :return: Returns the result object. If the method is called
        asynchronously, returns the request thread.
    :rtype: DeletedEntityResponse
    :raises ValueError: if ``_preload_content`` is supplied; raw responses
        are only available via ``delete_staging_rule_set_with_http_info``.
    """
    # Raw-response mode is not supported by this convenience wrapper.
    if '_preload_content' in kwargs:
        raise ValueError("Error! Please call the delete_staging_rule_set_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data")  # noqa: E501

    # Force data-only responses, overriding any caller-supplied value.
    forwarded = dict(kwargs, _return_http_data_only=True)
    if async_req is not None:
        forwarded['async_req'] = async_req
    return self.delete_staging_rule_set_with_http_info(entity_type, **forwarded)  # noqa: E501
[docs]
@validate_arguments
def delete_staging_rule_set_with_http_info(
    self,
    entity_type: Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to delete the staging rule set.")],
    **kwargs
) -> ApiResponse:  # noqa: E501
    """[EXPERIMENTAL] DeleteStagingRuleSet: Delete a StagingRuleSet

    Delete a staging rule set of a specific entity type by sending
    ``DELETE /api/stagingrulesets/{entityType}`` through
    ``self.api_client.call_api``. Deletion will be valid from the staging
    rule set's creation datetime. This means that the staging rule set will
    no longer exist at any effective datetime from the asAt datetime of
    deletion.

    The HTTP request is synchronous by default; pass ``async_req=True`` to
    make it asynchronous:

    >>> thread = api.delete_staging_rule_set_with_http_info(entity_type, async_req=True)
    >>> result = thread.get()

    :param entity_type: The entity type for which to delete the staging rule set. (required)
    :type entity_type: str
    :param async_req: Whether to execute the request asynchronously.
    :type async_req: bool, optional
    :param _preload_content: if False, the ApiResponse.data will be set to
        none and raw_data will store the HTTP response body without
        reading/decoding. Default is True.
    :type _preload_content: bool, optional
    :param _return_http_data_only: response data instead of ApiResponse
        object with status code, headers, etc
    :type _return_http_data_only: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it is the total request timeout. It can also be a
        (connection, read) pair of timeouts.
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the spec
        for a single request.
    :type _request_auth: dict, optional
    :param _content_type: accepted for interface parity with the other
        operations; this method does not read it.
    :type _content_type: string, optional
    :param _headers: extra request headers; ``Accept`` is overwritten by
        this method. ``None`` is treated like an omitted argument.
    :type _headers: dict, optional
    :return: Returns the result object. If the method is called
        asynchronously, returns the request thread.
    :rtype: tuple(DeletedEntityResponse, status_code(int), headers(HTTPHeaderDict))
    :raises ApiTypeError: if an unsupported keyword argument is supplied.
    """
    # Must stay the first statement so only the call arguments are captured.
    _params = locals()

    _all_params = [
        'entity_type',
        'async_req',
        '_return_http_data_only',
        '_preload_content',
        '_request_timeout',
        '_request_auth',
        '_content_type',
        '_headers'
    ]

    # validate the arguments and flatten the accepted ones into _params
    for _key, _val in _params['kwargs'].items():
        if _key not in _all_params:
            raise ApiTypeError(
                "Got an unexpected keyword argument '%s'"
                " to method delete_staging_rule_set" % _key
            )
        _params[_key] = _val
    del _params['kwargs']

    _collection_formats = {}

    # process the path parameters
    _path_params = {}
    if _params['entity_type']:
        _path_params['entityType'] = _params['entity_type']

    # process the query parameters
    _query_params = []

    # process the header parameters; an explicit ``_headers=None`` used to
    # crash in dict(None), so treat it the same as an omitted argument
    _header_params = dict(_params.get('_headers') or {})

    # process the form parameters
    _form_params = []
    _files = {}

    # process the body parameter (this operation sends none)
    _body_params = None

    # set the HTTP header `Accept`
    _header_params['Accept'] = self.api_client.select_header_accept(
        ['text/plain', 'application/json', 'text/json'])  # noqa: E501

    # authentication setting
    _auth_settings = ['oauth2']  # noqa: E501

    _response_types_map = {
        '200': "DeletedEntityResponse",
        '400': "LusidValidationProblemDetails",
    }

    return self.api_client.call_api(
        '/api/stagingrulesets/{entityType}', 'DELETE',
        _path_params,
        _query_params,
        _header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        response_types_map=_response_types_map,
        auth_settings=_auth_settings,
        async_req=_params.get('async_req'),
        _return_http_data_only=_params.get('_return_http_data_only'),  # noqa: E501
        _preload_content=_params.get('_preload_content', True),
        _request_timeout=_params.get('_request_timeout'),
        collection_formats=_collection_formats,
        _request_auth=_params.get('_request_auth'))
# Typing-only stub: coroutine form, declared without an ``async_req`` parameter.
@overload
async def get_staging_rule_set(
    self,
    entity_type: Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to retrieve the staging rule set.")],
    as_at: Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule set. Defaults to return the latest version of the staging rule set if not specified.")] = None,
    **kwargs
) -> StagingRuleSet:  # noqa: E501
    ...

# Typing-only stub: plain-function form that accepts ``async_req``.
@overload
def get_staging_rule_set(
    self,
    entity_type: Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to retrieve the staging rule set.")],
    as_at: Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule set. Defaults to return the latest version of the staging rule set if not specified.")] = None,
    async_req: Optional[bool] = True,
    **kwargs
) -> StagingRuleSet:  # noqa: E501
    ...
[docs]
@validate_arguments
def get_staging_rule_set(
    self,
    entity_type: Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to retrieve the staging rule set.")],
    as_at: Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule set. Defaults to return the latest version of the staging rule set if not specified.")] = None,
    async_req: Optional[bool] = None,
    **kwargs
) -> Union[StagingRuleSet, Awaitable[StagingRuleSet]]:  # noqa: E501
    """[EXPERIMENTAL] GetStagingRuleSet: Get a StagingRuleSet

    Get the staging rule set for the given entity type at the specific asAt
    time.

    Thin wrapper over ``get_staging_rule_set_with_http_info`` that always
    forces ``_return_http_data_only=True``. The HTTP request is synchronous
    by default; pass ``async_req=True`` to make it asynchronous:

    >>> thread = api.get_staging_rule_set(entity_type, as_at, async_req=True)
    >>> result = thread.get()

    :param entity_type: The entity type for which to retrieve the staging rule set. (required)
    :type entity_type: str
    :param as_at: The asAt datetime at which to retrieve the staging rule
        set. Defaults to return the latest version of the staging rule set
        if not specified.
    :type as_at: datetime
    :param async_req: Whether to execute the request asynchronously. Only
        forwarded when it is not ``None``.
    :type async_req: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it is the total request timeout. It can also be a
        (connection, read) pair of timeouts.
    :return: Returns the result object. If the method is called
        asynchronously, returns the request thread.
    :rtype: StagingRuleSet
    :raises ValueError: if ``_preload_content`` is supplied; raw responses
        are only available via ``get_staging_rule_set_with_http_info``.
    """
    # Raw-response mode is not supported by this convenience wrapper.
    if '_preload_content' in kwargs:
        raise ValueError("Error! Please call the get_staging_rule_set_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data")  # noqa: E501

    # Force data-only responses, overriding any caller-supplied value.
    forwarded = dict(kwargs, _return_http_data_only=True)
    if async_req is not None:
        forwarded['async_req'] = async_req
    return self.get_staging_rule_set_with_http_info(entity_type, as_at, **forwarded)  # noqa: E501
[docs]
@validate_arguments
def get_staging_rule_set_with_http_info(
    self,
    entity_type: Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to retrieve the staging rule set.")],
    as_at: Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule set. Defaults to return the latest version of the staging rule set if not specified.")] = None,
    **kwargs
) -> ApiResponse:  # noqa: E501
    """[EXPERIMENTAL] GetStagingRuleSet: Get a StagingRuleSet

    Get the staging rule set for the given entity type at the specific asAt
    time by sending ``GET /api/stagingrulesets/{entityType}`` through
    ``self.api_client.call_api``.

    The HTTP request is synchronous by default; pass ``async_req=True`` to
    make it asynchronous:

    >>> thread = api.get_staging_rule_set_with_http_info(entity_type, as_at, async_req=True)
    >>> result = thread.get()

    :param entity_type: The entity type for which to retrieve the staging rule set. (required)
    :type entity_type: str
    :param as_at: The asAt datetime at which to retrieve the staging rule
        set. Defaults to return the latest version of the staging rule set
        if not specified. ``datetime`` values are rendered with
        ``self.api_client.configuration.datetime_format``.
    :type as_at: datetime
    :param async_req: Whether to execute the request asynchronously.
    :type async_req: bool, optional
    :param _preload_content: if False, the ApiResponse.data will be set to
        none and raw_data will store the HTTP response body without
        reading/decoding. Default is True.
    :type _preload_content: bool, optional
    :param _return_http_data_only: response data instead of ApiResponse
        object with status code, headers, etc
    :type _return_http_data_only: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it is the total request timeout. It can also be a
        (connection, read) pair of timeouts.
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the spec
        for a single request.
    :type _request_auth: dict, optional
    :param _content_type: accepted for interface parity with the other
        operations; this method does not read it.
    :type _content_type: string, optional
    :param _headers: extra request headers; ``Accept`` is overwritten by
        this method. ``None`` is treated like an omitted argument.
    :type _headers: dict, optional
    :return: Returns the result object. If the method is called
        asynchronously, returns the request thread.
    :rtype: tuple(StagingRuleSet, status_code(int), headers(HTTPHeaderDict))
    :raises ApiTypeError: if an unsupported keyword argument is supplied.
    """
    # Must stay the first statement so only the call arguments are captured.
    _params = locals()

    _all_params = [
        'entity_type',
        'as_at',
        'async_req',
        '_return_http_data_only',
        '_preload_content',
        '_request_timeout',
        '_request_auth',
        '_content_type',
        '_headers'
    ]

    # validate the arguments and flatten the accepted ones into _params
    for _key, _val in _params['kwargs'].items():
        if _key not in _all_params:
            raise ApiTypeError(
                "Got an unexpected keyword argument '%s'"
                " to method get_staging_rule_set" % _key
            )
        _params[_key] = _val
    del _params['kwargs']

    _collection_formats = {}

    # process the path parameters
    _path_params = {}
    if _params['entity_type']:
        _path_params['entityType'] = _params['entity_type']

    # process the query parameters
    _query_params = []
    _as_at = _params.get('as_at')
    if _as_at is not None:  # noqa: E501
        # datetimes are serialized with the client's configured format;
        # any other value is passed through unchanged
        if isinstance(_as_at, datetime):
            _as_at = _as_at.strftime(self.api_client.configuration.datetime_format)
        _query_params.append(('asAt', _as_at))

    # process the header parameters; an explicit ``_headers=None`` used to
    # crash in dict(None), so treat it the same as an omitted argument
    _header_params = dict(_params.get('_headers') or {})

    # process the form parameters
    _form_params = []
    _files = {}

    # process the body parameter (this operation sends none)
    _body_params = None

    # set the HTTP header `Accept`
    _header_params['Accept'] = self.api_client.select_header_accept(
        ['text/plain', 'application/json', 'text/json'])  # noqa: E501

    # authentication setting
    _auth_settings = ['oauth2']  # noqa: E501

    _response_types_map = {
        '200': "StagingRuleSet",
        '400': "LusidValidationProblemDetails",
    }

    return self.api_client.call_api(
        '/api/stagingrulesets/{entityType}', 'GET',
        _path_params,
        _query_params,
        _header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        response_types_map=_response_types_map,
        auth_settings=_auth_settings,
        async_req=_params.get('async_req'),
        _return_http_data_only=_params.get('_return_http_data_only'),  # noqa: E501
        _preload_content=_params.get('_preload_content', True),
        _request_timeout=_params.get('_request_timeout'),
        collection_formats=_collection_formats,
        _request_auth=_params.get('_request_auth'))
# Typing-only stub: coroutine form, declared without an ``async_req`` parameter.
@overload
async def list_staging_rule_sets(
    self,
    as_at: Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule sets. Defaults to return the latest version of the staging rule sets if not specified.")] = None,
    page: Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing staging rule sets from a previous call to list staging rule sets. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request.")] = None,
    sort_by: Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\"")] = None,
    limit: Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many.")] = None,
    filter: Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid.")] = None,
    **kwargs
) -> PagedResourceListOfStagingRuleSet:  # noqa: E501
    ...

# Typing-only stub: plain-function form that accepts ``async_req``.
@overload
def list_staging_rule_sets(
    self,
    as_at: Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule sets. Defaults to return the latest version of the staging rule sets if not specified.")] = None,
    page: Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing staging rule sets from a previous call to list staging rule sets. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request.")] = None,
    sort_by: Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\"")] = None,
    limit: Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many.")] = None,
    filter: Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid.")] = None,
    async_req: Optional[bool] = True,
    **kwargs
) -> PagedResourceListOfStagingRuleSet:  # noqa: E501
    ...
[docs]
@validate_arguments
def list_staging_rule_sets(
    self,
    as_at: Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule sets. Defaults to return the latest version of the staging rule sets if not specified.")] = None,
    page: Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing staging rule sets from a previous call to list staging rule sets. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request.")] = None,
    sort_by: Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\"")] = None,
    limit: Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many.")] = None,
    filter: Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid.")] = None,
    async_req: Optional[bool] = None,
    **kwargs
) -> Union[PagedResourceListOfStagingRuleSet, Awaitable[PagedResourceListOfStagingRuleSet]]:  # noqa: E501
    """[EXPERIMENTAL] ListStagingRuleSets: List StagingRuleSets

    List all the staging rule sets matching particular criteria.

    Thin wrapper over ``list_staging_rule_sets_with_http_info`` that always
    forces ``_return_http_data_only=True``. The HTTP request is synchronous
    by default; pass ``async_req=True`` to make it asynchronous:

    >>> thread = api.list_staging_rule_sets(as_at, page, sort_by, limit, filter, async_req=True)
    >>> result = thread.get()

    :param as_at: The asAt datetime at which to retrieve the staging rule
        sets. Defaults to return the latest version of the staging rule sets
        if not specified.
    :type as_at: datetime
    :param page: The pagination token to use to continue listing staging
        rule sets from a previous call to list staging rule sets. This value
        is returned from the previous call. If a pagination token is
        provided the sortBy, filter, effectiveAt, and asAt fields must not
        have changed since the original request.
    :type page: str
    :param sort_by: A list of field names to sort by, each suffixed by
        " ASC" or " DESC"
    :type sort_by: List[str]
    :param limit: When paginating, limit the number of returned results to
        this many.
    :type limit: int
    :param filter: Expression to filter the result set. Read more about
        filtering results from LUSID here:
        https://support.lusid.com/filtering-results-from-lusid.
    :type filter: str
    :param async_req: Whether to execute the request asynchronously. Only
        forwarded when it is not ``None``.
    :type async_req: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it is the total request timeout. It can also be a
        (connection, read) pair of timeouts.
    :return: Returns the result object. If the method is called
        asynchronously, returns the request thread.
    :rtype: PagedResourceListOfStagingRuleSet
    :raises ValueError: if ``_preload_content`` is supplied; raw responses
        are only available via ``list_staging_rule_sets_with_http_info``.
    """
    # Raw-response mode is not supported by this convenience wrapper.
    if '_preload_content' in kwargs:
        raise ValueError("Error! Please call the list_staging_rule_sets_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data")  # noqa: E501

    # Force data-only responses, overriding any caller-supplied value.
    forwarded = dict(kwargs, _return_http_data_only=True)
    if async_req is not None:
        forwarded['async_req'] = async_req
    return self.list_staging_rule_sets_with_http_info(as_at, page, sort_by, limit, filter, **forwarded)  # noqa: E501
[docs]
@validate_arguments
def list_staging_rule_sets_with_http_info(
    self,
    as_at: Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule sets. Defaults to return the latest version of the staging rule sets if not specified.")] = None,
    page: Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing staging rule sets from a previous call to list staging rule sets. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request.")] = None,
    sort_by: Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\"")] = None,
    limit: Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many.")] = None,
    filter: Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid.")] = None,
    **kwargs
) -> ApiResponse:  # noqa: E501
    """[EXPERIMENTAL] ListStagingRuleSets: List StagingRuleSets

    List all the staging rule sets matching particular criteria by sending
    ``GET /api/stagingrulesets`` through ``self.api_client.call_api``.

    The HTTP request is synchronous by default; pass ``async_req=True`` to
    make it asynchronous:

    >>> thread = api.list_staging_rule_sets_with_http_info(as_at, page, sort_by, limit, filter, async_req=True)
    >>> result = thread.get()

    :param as_at: The asAt datetime at which to retrieve the staging rule
        sets. Defaults to return the latest version of the staging rule sets
        if not specified. ``datetime`` values are rendered with
        ``self.api_client.configuration.datetime_format``.
    :type as_at: datetime
    :param page: The pagination token to use to continue listing staging
        rule sets from a previous call to list staging rule sets. This value
        is returned from the previous call. If a pagination token is
        provided the sortBy, filter, effectiveAt, and asAt fields must not
        have changed since the original request.
    :type page: str
    :param sort_by: A list of field names to sort by, each suffixed by
        " ASC" or " DESC"
    :type sort_by: List[str]
    :param limit: When paginating, limit the number of returned results to
        this many.
    :type limit: int
    :param filter: Expression to filter the result set. Read more about
        filtering results from LUSID here:
        https://support.lusid.com/filtering-results-from-lusid.
    :type filter: str
    :param async_req: Whether to execute the request asynchronously.
    :type async_req: bool, optional
    :param _preload_content: if False, the ApiResponse.data will be set to
        none and raw_data will store the HTTP response body without
        reading/decoding. Default is True.
    :type _preload_content: bool, optional
    :param _return_http_data_only: response data instead of ApiResponse
        object with status code, headers, etc
    :type _return_http_data_only: bool, optional
    :param _request_timeout: timeout setting for this request. If one number
        is provided, it is the total request timeout. It can also be a
        (connection, read) pair of timeouts.
    :param _request_auth: set to override the auth_settings for a single
        request; this effectively ignores the authentication in the spec
        for a single request.
    :type _request_auth: dict, optional
    :param _content_type: accepted for interface parity with the other
        operations; this method does not read it.
    :type _content_type: string, optional
    :param _headers: extra request headers; ``Accept`` is overwritten by
        this method. ``None`` is treated like an omitted argument.
    :type _headers: dict, optional
    :return: Returns the result object. If the method is called
        asynchronously, returns the request thread.
    :rtype: tuple(PagedResourceListOfStagingRuleSet, status_code(int), headers(HTTPHeaderDict))
    :raises ApiTypeError: if an unsupported keyword argument is supplied.
    """
    # Must stay the first statement so only the call arguments are captured.
    _params = locals()

    _all_params = [
        'as_at',
        'page',
        'sort_by',
        'limit',
        'filter',
        'async_req',
        '_return_http_data_only',
        '_preload_content',
        '_request_timeout',
        '_request_auth',
        '_content_type',
        '_headers'
    ]

    # validate the arguments and flatten the accepted ones into _params
    for _key, _val in _params['kwargs'].items():
        if _key not in _all_params:
            raise ApiTypeError(
                "Got an unexpected keyword argument '%s'"
                " to method list_staging_rule_sets" % _key
            )
        _params[_key] = _val
    del _params['kwargs']

    _collection_formats = {}

    # process the path parameters (this operation has none)
    _path_params = {}

    # process the query parameters; only values that were supplied are sent
    _query_params = []
    _as_at = _params.get('as_at')
    if _as_at is not None:  # noqa: E501
        # datetimes are serialized with the client's configured format;
        # any other value is passed through unchanged
        if isinstance(_as_at, datetime):
            _as_at = _as_at.strftime(self.api_client.configuration.datetime_format)
        _query_params.append(('asAt', _as_at))

    if _params.get('page') is not None:  # noqa: E501
        _query_params.append(('page', _params['page']))

    if _params.get('sort_by') is not None:  # noqa: E501
        _query_params.append(('sortBy', _params['sort_by']))
        # list-valued parameter: registered with the 'multi' collection format
        _collection_formats['sortBy'] = 'multi'

    if _params.get('limit') is not None:  # noqa: E501
        _query_params.append(('limit', _params['limit']))

    if _params.get('filter') is not None:  # noqa: E501
        _query_params.append(('filter', _params['filter']))

    # process the header parameters; an explicit ``_headers=None`` used to
    # crash in dict(None), so treat it the same as an omitted argument
    _header_params = dict(_params.get('_headers') or {})

    # process the form parameters
    _form_params = []
    _files = {}

    # process the body parameter (this operation sends none)
    _body_params = None

    # set the HTTP header `Accept`
    _header_params['Accept'] = self.api_client.select_header_accept(
        ['text/plain', 'application/json', 'text/json'])  # noqa: E501

    # authentication setting
    _auth_settings = ['oauth2']  # noqa: E501

    _response_types_map = {
        '200': "PagedResourceListOfStagingRuleSet",
        '400': "LusidValidationProblemDetails",
    }

    return self.api_client.call_api(
        '/api/stagingrulesets', 'GET',
        _path_params,
        _query_params,
        _header_params,
        body=_body_params,
        post_params=_form_params,
        files=_files,
        response_types_map=_response_types_map,
        auth_settings=_auth_settings,
        async_req=_params.get('async_req'),
        _return_http_data_only=_params.get('_return_http_data_only'),  # noqa: E501
        _preload_content=_params.get('_preload_content', True),
        _request_timeout=_params.get('_request_timeout'),
        collection_formats=_collection_formats,
        _request_auth=_params.get('_request_auth'))
@overload
async def update_staging_rule_set(
    self,
    entity_type: Annotated[
        constr(strict=True, max_length=64, min_length=1),
        Field(..., description="The entity type for which to update the staging rule set."),  # noqa: E501
    ],
    update_staging_rule_set_request: Annotated[
        UpdateStagingRuleSetRequest,
        Field(..., description="Request to update a staging rule set."),
    ],
    **kwargs
) -> StagingRuleSet:  # noqa: E501
    """Typing-only overload: coroutine form, declared without ``async_req``."""
    ...
@overload
def update_staging_rule_set(
    self,
    entity_type: Annotated[
        constr(strict=True, max_length=64, min_length=1),
        Field(..., description="The entity type for which to update the staging rule set."),  # noqa: E501
    ],
    update_staging_rule_set_request: Annotated[
        UpdateStagingRuleSetRequest,
        Field(..., description="Request to update a staging rule set."),
    ],
    async_req: Optional[bool] = True,
    **kwargs
) -> StagingRuleSet:  # noqa: E501
    """Typing-only overload: plain (non-coroutine) form that accepts ``async_req``."""
    ...
[docs]
@validate_arguments
def update_staging_rule_set(
    self,
    entity_type: Annotated[
        constr(strict=True, max_length=64, min_length=1),
        Field(..., description="The entity type for which to update the staging rule set."),  # noqa: E501
    ],
    update_staging_rule_set_request: Annotated[
        UpdateStagingRuleSetRequest,
        Field(..., description="Request to update a staging rule set."),
    ],
    async_req: Optional[bool] = None,
    **kwargs
) -> Union[StagingRuleSet, Awaitable[StagingRuleSet]]:  # noqa: E501
    """[EXPERIMENTAL] UpdateStagingRuleSet: Update a StagingRuleSet

    Update an existing staging rule set.

    Convenience wrapper around ``update_staging_rule_set_with_http_info``:
    it always sets ``_return_http_data_only=True`` and forwards every other
    argument unchanged.

    The request is synchronous by default; pass ``async_req=True`` for an
    asynchronous request:

    >>> thread = api.update_staging_rule_set(entity_type, update_staging_rule_set_request, async_req=True)
    >>> result = thread.get()

    :param entity_type: The entity type for which to update the staging rule set. (required)
    :type entity_type: str
    :param update_staging_rule_set_request: Request to update a staging rule set. (required)
    :type update_staging_rule_set_request: UpdateStagingRuleSetRequest
    :param async_req: Whether to execute the request asynchronously.
                      Only forwarded when it is not None.
    :type async_req: bool, optional
    :param _request_timeout: timeout setting for this request.
           If one number provided, it will be total request
           timeout. It can also be a pair (tuple) of
           (connection, read) timeouts.
    :raises ValueError: if ``_preload_content`` is passed; that option is
            only supported by ``update_staging_rule_set_with_http_info``.
    :return: Returns the result object.
             If the method is called asynchronously,
             returns the request thread.
    :rtype: StagingRuleSet
    """
    # Raw-response access is only offered by the *_with_http_info variant.
    if '_preload_content' in kwargs:
        raise ValueError(
            "Error! Please call the update_staging_rule_set_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data"  # noqa: E501
        )

    # This wrapper always asks for the deserialised payload only; async_req
    # is passed along solely when the caller set it explicitly.
    overrides = {'_return_http_data_only': True}
    if async_req is not None:
        overrides['async_req'] = async_req
    kwargs.update(overrides)

    return self.update_staging_rule_set_with_http_info(
        entity_type,
        update_staging_rule_set_request,
        **kwargs
    )  # noqa: E501
[docs]
@validate_arguments
def update_staging_rule_set_with_http_info(
    self,
    entity_type: Annotated[
        constr(strict=True, max_length=64, min_length=1),
        Field(..., description="The entity type for which to update the staging rule set."),  # noqa: E501
    ],
    update_staging_rule_set_request: Annotated[
        UpdateStagingRuleSetRequest,
        Field(..., description="Request to update a staging rule set."),
    ],
    **kwargs
) -> ApiResponse:  # noqa: E501
    """[EXPERIMENTAL] UpdateStagingRuleSet: Update a StagingRuleSet

    Update an existing staging rule set.

    Builds a ``PUT /api/stagingrulesets/{entityType}`` request and hands it
    to ``self.api_client.call_api``. The request is synchronous by default;
    pass ``async_req=True`` for an asynchronous request:

    >>> thread = api.update_staging_rule_set_with_http_info(entity_type, update_staging_rule_set_request, async_req=True)
    >>> result = thread.get()

    :param entity_type: The entity type for which to update the staging rule set. (required)
    :type entity_type: str
    :param update_staging_rule_set_request: Request to update a staging rule set. (required)
    :type update_staging_rule_set_request: UpdateStagingRuleSetRequest
    :param async_req: Whether to execute the request asynchronously.
    :type async_req: bool, optional
    :param _preload_content: if False, the ApiResponse.data will
                             be set to none and raw_data will store the
                             HTTP response body without reading/decoding.
                             Default is True.
    :type _preload_content: bool, optional
    :param _return_http_data_only: response data instead of ApiResponse
                                   object with status code, headers, etc
    :type _return_http_data_only: bool, optional
    :param _request_timeout: timeout setting for this request. If one
                             number provided, it will be total request
                             timeout. It can also be a pair (tuple) of
                             (connection, read) timeouts.
    :param _request_auth: set to override the auth_settings for a single
                          request; this effectively ignores the authentication
                          in the spec for a single request.
    :type _request_auth: dict, optional
    :param _content_type: force content-type for the request
    :type _content_type: string, optional
    :param _headers: extra request headers; copied before ``Accept`` and
                     ``Content-Type`` are set on top of them.
    :raises ApiTypeError: if an unrecognised keyword argument is supplied.
    :return: Whatever ``self.api_client.call_api`` returns (that method is
             defined outside this block). Documented by the generator as the
             result object, or the request thread when called asynchronously.
    :rtype: tuple(StagingRuleSet, status_code(int), headers(HTTPHeaderDict))
    """
    accepted_names = (
        'entity_type',
        'update_staging_rule_set_request',
        'async_req',
        '_return_http_data_only',
        '_preload_content',
        '_request_timeout',
        '_request_auth',
        '_content_type',
        '_headers',
    )

    # Fail on the first keyword that is not part of the method's contract.
    for name in kwargs:
        if name not in accepted_names:
            raise ApiTypeError(
                "Got an unexpected keyword argument '%s'"
                " to method update_staging_rule_set" % name
            )

    # Single lookup table holding the named arguments plus the accepted
    # optional keywords.
    call_args = {
        'entity_type': entity_type,
        'update_staging_rule_set_request': update_staging_rule_set_request,
    }
    call_args.update(kwargs)

    # Path parameters: only populated for a truthy entity type.
    path_params = {}
    if call_args['entity_type']:
        path_params['entityType'] = call_args['entity_type']

    # Header parameters start from a copy of any caller-supplied headers.
    header_params = dict(call_args.get('_headers', {}))

    # The request model is the body (None stays None).
    body = call_args['update_staging_rule_set_request']

    # HTTP header `Accept`
    header_params['Accept'] = self.api_client.select_header_accept(
        ['text/plain', 'application/json', 'text/json'])  # noqa: E501

    # HTTP header `Content-Type`: the client-selected value is computed up
    # front and used unless the caller forced one via `_content_type`.
    negotiated_content_type = self.api_client.select_header_content_type(
        ['application/json-patch+json', 'application/json', 'text/json', 'application/*+json'])  # noqa: E501
    content_type = call_args.get('_content_type', negotiated_content_type)
    if content_type:
        header_params['Content-Type'] = content_type

    return self.api_client.call_api(
        '/api/stagingrulesets/{entityType}', 'PUT',
        path_params,
        [],  # this endpoint has no query parameters
        header_params,
        body=body,
        post_params=[],
        files={},
        response_types_map={
            '200': "StagingRuleSet",
            '400': "LusidValidationProblemDetails",
        },
        auth_settings=['oauth2'],
        async_req=call_args.get('async_req'),
        _return_http_data_only=call_args.get('_return_http_data_only'),  # noqa: E501
        _preload_content=call_args.get('_preload_content', True),
        _request_timeout=call_args.get('_request_timeout'),
        collection_formats={},
        _request_auth=call_args.get('_request_auth'))