Source code for sdk.lusid.api.staging_rule_set_api

# coding: utf-8

"""
    LUSID API

    FINBOURNE Technology  # noqa: E501

    Contact: info@finbourne.com
    Generated by OpenAPI Generator (https://openapi-generator.tech)

    Do not edit the class manually.
"""


import re  # noqa: F401
import io
import warnings

from pydantic.v1 import validate_arguments, ValidationError
from typing import overload, Optional, Union, Awaitable

from typing_extensions import Annotated
from datetime import datetime

from pydantic.v1 import Field, StrictStr, conint, conlist, constr, validator

from typing import Optional

from lusid.models.create_staging_rule_set_request import CreateStagingRuleSetRequest
from lusid.models.deleted_entity_response import DeletedEntityResponse
from lusid.models.paged_resource_list_of_staging_rule_set import PagedResourceListOfStagingRuleSet
from lusid.models.staging_rule_set import StagingRuleSet
from lusid.models.update_staging_rule_set_request import UpdateStagingRuleSetRequest

from lusid.api_client import ApiClient
from lusid.api_response import ApiResponse
from lusid.exceptions import (  # noqa: F401
    ApiTypeError,
    ApiValueError
)


[docs] class StagingRuleSetApi: """NOTE: This class is auto generated by OpenAPI Generator Ref: https://openapi-generator.tech Do not edit the class manually. """ def __init__(self, api_client=None) -> None: if api_client is None: api_client = ApiClient.get_default() self.api_client = api_client @overload async def create_staging_rule_set(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to create the staging rule set.")], create_staging_rule_set_request : Annotated[CreateStagingRuleSetRequest, Field(..., description="Request to create a staging rule set.")], **kwargs) -> StagingRuleSet: # noqa: E501 ... @overload def create_staging_rule_set(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to create the staging rule set.")], create_staging_rule_set_request : Annotated[CreateStagingRuleSetRequest, Field(..., description="Request to create a staging rule set.")], async_req: Optional[bool]=True, **kwargs) -> StagingRuleSet: # noqa: E501 ...
[docs] @validate_arguments def create_staging_rule_set(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to create the staging rule set.")], create_staging_rule_set_request : Annotated[CreateStagingRuleSetRequest, Field(..., description="Request to create a staging rule set.")], async_req: Optional[bool]=None, **kwargs) -> Union[StagingRuleSet, Awaitable[StagingRuleSet]]: # noqa: E501 """[EXPERIMENTAL] CreateStagingRuleSet: Create a StagingRuleSet # noqa: E501 Create a new staging rule set. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.create_staging_rule_set(entity_type, create_staging_rule_set_request, async_req=True) >>> result = thread.get() :param entity_type: The entity type for which to create the staging rule set. (required) :type entity_type: str :param create_staging_rule_set_request: Request to create a staging rule set. (required) :type create_staging_rule_set_request: CreateStagingRuleSetRequest :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: StagingRuleSet """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the create_staging_rule_set_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.create_staging_rule_set_with_http_info(entity_type, create_staging_rule_set_request, **kwargs) # noqa: E501
[docs] @validate_arguments def create_staging_rule_set_with_http_info(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to create the staging rule set.")], create_staging_rule_set_request : Annotated[CreateStagingRuleSetRequest, Field(..., description="Request to create a staging rule set.")], **kwargs) -> ApiResponse: # noqa: E501 """[EXPERIMENTAL] CreateStagingRuleSet: Create a StagingRuleSet # noqa: E501 Create a new staging rule set. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.create_staging_rule_set_with_http_info(entity_type, create_staging_rule_set_request, async_req=True) >>> result = thread.get() :param entity_type: The entity type for which to create the staging rule set. (required) :type entity_type: str :param create_staging_rule_set_request: Request to create a staging rule set. (required) :type create_staging_rule_set_request: CreateStagingRuleSetRequest :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(StagingRuleSet, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'entity_type', 'create_staging_rule_set_request' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method create_staging_rule_set" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} if _params['entity_type']: _path_params['entityType'] = _params['entity_type'] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None if _params['create_staging_rule_set_request'] is not None: _body_params = _params['create_staging_rule_set_request'] # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # set the HTTP header `Content-Type` _content_types_list = _params.get('_content_type', self.api_client.select_header_content_type( ['application/json-patch+json', 'application/json', 'text/json', 'application/*+json'])) if _content_types_list: _header_params['Content-Type'] = _content_types_list # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '201': "StagingRuleSet", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/stagingrulesets/{entityType}', 'POST', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))
@overload async def delete_staging_rule_set(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to delete the staging rule set.")], **kwargs) -> DeletedEntityResponse: # noqa: E501 ... @overload def delete_staging_rule_set(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to delete the staging rule set.")], async_req: Optional[bool]=True, **kwargs) -> DeletedEntityResponse: # noqa: E501 ...
[docs] @validate_arguments def delete_staging_rule_set(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to delete the staging rule set.")], async_req: Optional[bool]=None, **kwargs) -> Union[DeletedEntityResponse, Awaitable[DeletedEntityResponse]]: # noqa: E501 """[EXPERIMENTAL] DeleteStagingRuleSet: Delete a StagingRuleSet # noqa: E501 Delete a staging rule set of a specific entity type. Deletion will be valid from the staging rule set's creation datetime. This means that the staging rule set will no longer exist at any effective datetime from the asAt datetime of deletion. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.delete_staging_rule_set(entity_type, async_req=True) >>> result = thread.get() :param entity_type: The entity type for which to delete the staging rule set. (required) :type entity_type: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: DeletedEntityResponse """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the delete_staging_rule_set_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.delete_staging_rule_set_with_http_info(entity_type, **kwargs) # noqa: E501
[docs] @validate_arguments def delete_staging_rule_set_with_http_info(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to delete the staging rule set.")], **kwargs) -> ApiResponse: # noqa: E501 """[EXPERIMENTAL] DeleteStagingRuleSet: Delete a StagingRuleSet # noqa: E501 Delete a staging rule set of a specific entity type. Deletion will be valid from the staging rule set's creation datetime. This means that the staging rule set will no longer exist at any effective datetime from the asAt datetime of deletion. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.delete_staging_rule_set_with_http_info(entity_type, async_req=True) >>> result = thread.get() :param entity_type: The entity type for which to delete the staging rule set. (required) :type entity_type: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(DeletedEntityResponse, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'entity_type' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method delete_staging_rule_set" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} if _params['entity_type']: _path_params['entityType'] = _params['entity_type'] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '200': "DeletedEntityResponse", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/stagingrulesets/{entityType}', 'DELETE', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))
@overload async def get_staging_rule_set(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to retrieve the staging rule set.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule set. Defaults to return the latest version of the staging rule set if not specified.")] = None, **kwargs) -> StagingRuleSet: # noqa: E501 ... @overload def get_staging_rule_set(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to retrieve the staging rule set.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule set. Defaults to return the latest version of the staging rule set if not specified.")] = None, async_req: Optional[bool]=True, **kwargs) -> StagingRuleSet: # noqa: E501 ...
[docs] @validate_arguments def get_staging_rule_set(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to retrieve the staging rule set.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule set. Defaults to return the latest version of the staging rule set if not specified.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[StagingRuleSet, Awaitable[StagingRuleSet]]: # noqa: E501 """[EXPERIMENTAL] GetStagingRuleSet: Get a StagingRuleSet # noqa: E501 Get the staging rule set for the given entity type at the specific asAt time. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.get_staging_rule_set(entity_type, as_at, async_req=True) >>> result = thread.get() :param entity_type: The entity type for which to retrieve the staging rule set. (required) :type entity_type: str :param as_at: The asAt datetime at which to retrieve the staging rule set. Defaults to return the latest version of the staging rule set if not specified. :type as_at: datetime :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: StagingRuleSet """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the get_staging_rule_set_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.get_staging_rule_set_with_http_info(entity_type, as_at, **kwargs) # noqa: E501
[docs] @validate_arguments def get_staging_rule_set_with_http_info(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to retrieve the staging rule set.")], as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule set. Defaults to return the latest version of the staging rule set if not specified.")] = None, **kwargs) -> ApiResponse: # noqa: E501 """[EXPERIMENTAL] GetStagingRuleSet: Get a StagingRuleSet # noqa: E501 Get the staging rule set for the given entity type at the specific asAt time. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.get_staging_rule_set_with_http_info(entity_type, as_at, async_req=True) >>> result = thread.get() :param entity_type: The entity type for which to retrieve the staging rule set. (required) :type entity_type: str :param as_at: The asAt datetime at which to retrieve the staging rule set. Defaults to return the latest version of the staging rule set if not specified. :type as_at: datetime :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(StagingRuleSet, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'entity_type', 'as_at' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method get_staging_rule_set" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} if _params['entity_type']: _path_params['entityType'] = _params['entity_type'] # process the query parameters _query_params = [] if _params.get('as_at') is not None: # noqa: E501 if isinstance(_params['as_at'], datetime): _query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format))) else: _query_params.append(('asAt', _params['as_at'])) # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '200': "StagingRuleSet", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/stagingrulesets/{entityType}', 'GET', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))
@overload async def list_staging_rule_sets(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule sets. Defaults to return the latest version of the staging rule sets if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing staging rule sets from a previous call to list staging rule sets. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\"")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid.")] = None, **kwargs) -> PagedResourceListOfStagingRuleSet: # noqa: E501 ... @overload def list_staging_rule_sets(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule sets. Defaults to return the latest version of the staging rule sets if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing staging rule sets from a previous call to list staging rule sets. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\"")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid.")] = None, async_req: Optional[bool]=True, **kwargs) -> PagedResourceListOfStagingRuleSet: # noqa: E501 ...
[docs] @validate_arguments def list_staging_rule_sets(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule sets. Defaults to return the latest version of the staging rule sets if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing staging rule sets from a previous call to list staging rule sets. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\"")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid.")] = None, async_req: Optional[bool]=None, **kwargs) -> Union[PagedResourceListOfStagingRuleSet, Awaitable[PagedResourceListOfStagingRuleSet]]: # noqa: E501 """[EXPERIMENTAL] ListStagingRuleSets: List StagingRuleSets # noqa: E501 List all the staging rule sets matching particular criteria. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.list_staging_rule_sets(as_at, page, sort_by, limit, filter, async_req=True) >>> result = thread.get() :param as_at: The asAt datetime at which to retrieve the staging rule sets. Defaults to return the latest version of the staging rule sets if not specified. :type as_at: datetime :param page: The pagination token to use to continue listing staging rule sets from a previous call to list staging rule sets. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request. :type page: str :param sort_by: A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\" :type sort_by: List[str] :param limit: When paginating, limit the number of returned results to this many. :type limit: int :param filter: Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid. :type filter: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: PagedResourceListOfStagingRuleSet """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the list_staging_rule_sets_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.list_staging_rule_sets_with_http_info(as_at, page, sort_by, limit, filter, **kwargs) # noqa: E501
[docs] @validate_arguments def list_staging_rule_sets_with_http_info(self, as_at : Annotated[Optional[datetime], Field(description="The asAt datetime at which to retrieve the staging rule sets. Defaults to return the latest version of the staging rule sets if not specified.")] = None, page : Annotated[Optional[constr(strict=True, max_length=500, min_length=1)], Field(description="The pagination token to use to continue listing staging rule sets from a previous call to list staging rule sets. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request.")] = None, sort_by : Annotated[Optional[conlist(StrictStr)], Field(description="A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\"")] = None, limit : Annotated[Optional[conint(strict=True, le=5000, ge=1)], Field(description="When paginating, limit the number of returned results to this many.")] = None, filter : Annotated[Optional[constr(strict=True, max_length=16384, min_length=0)], Field(description="Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid.")] = None, **kwargs) -> ApiResponse: # noqa: E501 """[EXPERIMENTAL] ListStagingRuleSets: List StagingRuleSets # noqa: E501 List all the staging rule sets matching particular criteria. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.list_staging_rule_sets_with_http_info(as_at, page, sort_by, limit, filter, async_req=True) >>> result = thread.get() :param as_at: The asAt datetime at which to retrieve the staging rule sets. Defaults to return the latest version of the staging rule sets if not specified. :type as_at: datetime :param page: The pagination token to use to continue listing staging rule sets from a previous call to list staging rule sets. This value is returned from the previous call. If a pagination token is provided the sortBy, filter, effectiveAt, and asAt fields must not have changed since the original request. :type page: str :param sort_by: A list of field names to sort by, each suffixed by \" ASC\" or \" DESC\" :type sort_by: List[str] :param limit: When paginating, limit the number of returned results to this many. :type limit: int :param filter: Expression to filter the result set. Read more about filtering results from LUSID here: https://support.lusid.com/filtering-results-from-lusid. :type filter: str :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(PagedResourceListOfStagingRuleSet, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'as_at', 'page', 'sort_by', 'limit', 'filter' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method list_staging_rule_sets" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} # process the query parameters _query_params = [] if _params.get('as_at') is not None: # noqa: E501 if isinstance(_params['as_at'], datetime): _query_params.append(('asAt', _params['as_at'].strftime(self.api_client.configuration.datetime_format))) else: _query_params.append(('asAt', _params['as_at'])) if _params.get('page') is not None: # noqa: E501 _query_params.append(('page', _params['page'])) if _params.get('sort_by') is not None: # noqa: E501 _query_params.append(('sortBy', _params['sort_by'])) _collection_formats['sortBy'] = 'multi' if _params.get('limit') is not None: # noqa: E501 _query_params.append(('limit', _params['limit'])) if _params.get('filter') is not None: # noqa: E501 _query_params.append(('filter', _params['filter'])) # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '200': "PagedResourceListOfStagingRuleSet", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/stagingrulesets', 'GET', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))
@overload async def update_staging_rule_set(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to update the staging rule set.")], update_staging_rule_set_request : Annotated[UpdateStagingRuleSetRequest, Field(..., description="Request to update a staging rule set.")], **kwargs) -> StagingRuleSet: # noqa: E501 ... @overload def update_staging_rule_set(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to update the staging rule set.")], update_staging_rule_set_request : Annotated[UpdateStagingRuleSetRequest, Field(..., description="Request to update a staging rule set.")], async_req: Optional[bool]=True, **kwargs) -> StagingRuleSet: # noqa: E501 ...
[docs] @validate_arguments def update_staging_rule_set(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to update the staging rule set.")], update_staging_rule_set_request : Annotated[UpdateStagingRuleSetRequest, Field(..., description="Request to update a staging rule set.")], async_req: Optional[bool]=None, **kwargs) -> Union[StagingRuleSet, Awaitable[StagingRuleSet]]: # noqa: E501 """[EXPERIMENTAL] UpdateStagingRuleSet: Update a StagingRuleSet # noqa: E501 Update an existing staging rule set. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.update_staging_rule_set(entity_type, update_staging_rule_set_request, async_req=True) >>> result = thread.get() :param entity_type: The entity type for which to update the staging rule set. (required) :type entity_type: str :param update_staging_rule_set_request: Request to update a staging rule set. (required) :type update_staging_rule_set_request: UpdateStagingRuleSetRequest :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: StagingRuleSet """ kwargs['_return_http_data_only'] = True if '_preload_content' in kwargs: message = "Error! Please call the update_staging_rule_set_with_http_info method with `_preload_content` instead and obtain raw data from ApiResponse.raw_data" # noqa: E501 raise ValueError(message) if async_req is not None: kwargs['async_req'] = async_req return self.update_staging_rule_set_with_http_info(entity_type, update_staging_rule_set_request, **kwargs) # noqa: E501
[docs] @validate_arguments def update_staging_rule_set_with_http_info(self, entity_type : Annotated[constr(strict=True, max_length=64, min_length=1), Field(..., description="The entity type for which to update the staging rule set.")], update_staging_rule_set_request : Annotated[UpdateStagingRuleSetRequest, Field(..., description="Request to update a staging rule set.")], **kwargs) -> ApiResponse: # noqa: E501 """[EXPERIMENTAL] UpdateStagingRuleSet: Update a StagingRuleSet # noqa: E501 Update an existing staging rule set. # noqa: E501 This method makes a synchronous HTTP request by default. To make an asynchronous HTTP request, please pass async_req=True >>> thread = api.update_staging_rule_set_with_http_info(entity_type, update_staging_rule_set_request, async_req=True) >>> result = thread.get() :param entity_type: The entity type for which to update the staging rule set. (required) :type entity_type: str :param update_staging_rule_set_request: Request to update a staging rule set. (required) :type update_staging_rule_set_request: UpdateStagingRuleSetRequest :param async_req: Whether to execute the request asynchronously. :type async_req: bool, optional :param _preload_content: if False, the ApiResponse.data will be set to none and raw_data will store the HTTP response body without reading/decoding. Default is True. :type _preload_content: bool, optional :param _return_http_data_only: response data instead of ApiResponse object with status code, headers, etc :type _return_http_data_only: bool, optional :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. :param _request_auth: set to override the auth_settings for an a single request; this effectively ignores the authentication in the spec for a single request. :type _request_auth: dict, optional :type _content_type: string, optional: force content-type for the request :return: Returns the result object. If the method is called asynchronously, returns the request thread. :rtype: tuple(StagingRuleSet, status_code(int), headers(HTTPHeaderDict)) """ _params = locals() _all_params = [ 'entity_type', 'update_staging_rule_set_request' ] _all_params.extend( [ 'async_req', '_return_http_data_only', '_preload_content', '_request_timeout', '_request_auth', '_content_type', '_headers' ] ) # validate the arguments for _key, _val in _params['kwargs'].items(): if _key not in _all_params: raise ApiTypeError( "Got an unexpected keyword argument '%s'" " to method update_staging_rule_set" % _key ) _params[_key] = _val del _params['kwargs'] _collection_formats = {} # process the path parameters _path_params = {} if _params['entity_type']: _path_params['entityType'] = _params['entity_type'] # process the query parameters _query_params = [] # process the header parameters _header_params = dict(_params.get('_headers', {})) # process the form parameters _form_params = [] _files = {} # process the body parameter _body_params = None if _params['update_staging_rule_set_request'] is not None: _body_params = _params['update_staging_rule_set_request'] # set the HTTP header `Accept` _header_params['Accept'] = self.api_client.select_header_accept( ['text/plain', 'application/json', 'text/json']) # noqa: E501 # set the HTTP header `Content-Type` _content_types_list = _params.get('_content_type', self.api_client.select_header_content_type( ['application/json-patch+json', 'application/json', 'text/json', 'application/*+json'])) if _content_types_list: _header_params['Content-Type'] = _content_types_list # authentication setting _auth_settings = ['oauth2'] # noqa: E501 _response_types_map = { '200': "StagingRuleSet", '400': "LusidValidationProblemDetails", } return self.api_client.call_api( '/api/stagingrulesets/{entityType}', 'PUT', _path_params, _query_params, _header_params, body=_body_params, post_params=_form_params, files=_files, response_types_map=_response_types_map, auth_settings=_auth_settings, async_req=_params.get('async_req'), _return_http_data_only=_params.get('_return_http_data_only'), # noqa: E501 _preload_content=_params.get('_preload_content', True), _request_timeout=_params.get('_request_timeout'), collection_formats=_collection_formats, _request_auth=_params.get('_request_auth'))