diff --git a/bigframes/core/global_session.py b/bigframes/core/global_session.py index 8732b55990..4698e4c4c5 100644 --- a/bigframes/core/global_session.py +++ b/bigframes/core/global_session.py @@ -110,8 +110,8 @@ def get_global_session(): _T = TypeVar("_T") -def with_default_session(func: Callable[..., _T], *args, **kwargs) -> _T: - return func(get_global_session(), *args, **kwargs) +def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T: + return func_(get_global_session(), *args, **kwargs) class _GlobalSessionContext: diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 9e7555431a..a7910127e4 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -668,6 +668,30 @@ def wrapper(func): return wrapper + def deploy_remote_function( + self, + func, + **kwargs, + ): + """Orchestrates the creation of a BigQuery remote function that deploys immediately. + + This method ensures that the remote function is created and available for + use in BigQuery as soon as this call is made. + + Args: + kwargs: + All arguments are passed directly to + :meth:`~bigframes.session.Session.remote_function`. Please see + its docstring for parameter details. + + Returns: + A wrapped remote function, usable in + :meth:`~bigframes.series.Series.apply`. + """ + # TODO(tswast): If we update remote_function to defer deployment, update + # this method to deploy immediately. + return self.remote_function(**kwargs)(func) + def udf( self, input_types: Union[None, type, Sequence[type]] = None, @@ -866,6 +890,32 @@ def wrapper(func): return wrapper + def deploy_udf( + self, + func, + **kwargs, + ): + """Orchestrates the creation of a BigQuery UDF that deploys immediately. + + This method ensures that the UDF is created and available for + use in BigQuery as soon as this call is made. + + Args: + func: + Function to deploy. + kwargs: + All arguments are passed directly to + :meth:`~bigframes.session.Session.udf`. Please see + its docstring for parameter details. + + Returns: + A wrapped Python user defined function, usable in + :meth:`~bigframes.series.Series.apply`. + """ + # TODO(tswast): If we update udf to defer deployment, update this method + # to deploy immediately. + return self.udf(**kwargs)(func) + def _convert_row_processor_sig( signature: inspect.Signature, diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index a9d1c31865..ed999e62c1 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -117,6 +117,22 @@ def remote_function( remote_function.__doc__ = inspect.getdoc(bigframes.session.Session.remote_function) +def deploy_remote_function( + func, + **kwargs, +): + return global_session.with_default_session( + bigframes.session.Session.deploy_remote_function, + func=func, + **kwargs, + ) + + +deploy_remote_function.__doc__ = inspect.getdoc( + bigframes.session.Session.deploy_remote_function +) + + def udf( *, input_types: Union[None, type, Sequence[type]] = None, @@ -140,6 +156,20 @@ def udf( udf.__doc__ = inspect.getdoc(bigframes.session.Session.udf) +def deploy_udf( + func, + **kwargs, +): + return global_session.with_default_session( + bigframes.session.Session.deploy_udf, + func=func, + **kwargs, + ) + + +deploy_udf.__doc__ = inspect.getdoc(bigframes.session.Session.deploy_udf) + + @typing.overload def to_datetime( arg: Union[ @@ -330,6 +360,8 @@ def reset_session(): clean_up_by_session_id, concat, cut, + deploy_remote_function, + deploy_udf, get_default_session_id, get_dummies, merge, diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index c06233bad3..13db6823c1 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -1343,6 +1343,40 @@ def _check_file_size(self, filepath: str): "for large files to avoid loading the file into local memory." ) + def deploy_remote_function( + self, + func, + **kwargs, + ): + """Orchestrates the creation of a BigQuery remote function that deploys immediately. + + This method ensures that the remote function is created and available for + use in BigQuery as soon as this call is made. + + Args: + func: + Function to deploy. + kwargs: + All arguments are passed directly to + :meth:`~bigframes.session.Session.remote_function`. Please see + its docstring for parameter details. + + Returns: + A wrapped remote function, usable in + :meth:`~bigframes.series.Series.apply`. + """ + return self._function_session.deploy_remote_function( + func, + # Session-provided arguments. + session=self, + bigquery_client=self._clients_provider.bqclient, + bigquery_connection_client=self._clients_provider.bqconnectionclient, + cloud_functions_client=self._clients_provider.cloudfunctionsclient, + resource_manager_client=self._clients_provider.resourcemanagerclient, + # User-provided arguments. + **kwargs, + ) + def remote_function( self, # Make sure that the input/output types, and dataset can be used @@ -1565,9 +1599,15 @@ def remote_function( `bigframes_remote_function` - The bigquery remote function capable of calling into `bigframes_cloud_function`. """ return self._function_session.remote_function( + # Session-provided arguments. + session=self, + bigquery_client=self._clients_provider.bqclient, + bigquery_connection_client=self._clients_provider.bqconnectionclient, + cloud_functions_client=self._clients_provider.cloudfunctionsclient, + resource_manager_client=self._clients_provider.resourcemanagerclient, + # User-provided arguments. input_types=input_types, output_type=output_type, - session=self, dataset=dataset, bigquery_connection=bigquery_connection, reuse=reuse, @@ -1585,6 +1625,37 @@ def remote_function( cloud_build_service_account=cloud_build_service_account, ) + def deploy_udf( + self, + func, + **kwargs, + ): + """Orchestrates the creation of a BigQuery UDF that deploys immediately. + + This method ensures that the UDF is created and available for + use in BigQuery as soon as this call is made. + + Args: + func: + Function to deploy. + kwargs: + All arguments are passed directly to + :meth:`~bigframes.session.Session.udf`. Please see + its docstring for parameter details. + + Returns: + A wrapped Python user defined function, usable in + :meth:`~bigframes.series.Series.apply`. + """ + return self._function_session.deploy_udf( + func, + # Session-provided arguments. + session=self, + bigquery_client=self._clients_provider.bqclient, + # User-provided arguments. + **kwargs, + ) + def udf( self, *, @@ -1726,9 +1797,12 @@ def udf( deployed for the user defined code. """ return self._function_session.udf( + # Session-provided arguments. + session=self, + bigquery_client=self._clients_provider.bqclient, + # User-provided arguments. input_types=input_types, output_type=output_type, - session=self, dataset=dataset, bigquery_connection=bigquery_connection, name=name, diff --git a/tests/unit/functions/test_remote_function.py b/tests/unit/functions/test_remote_function.py index 978281e5c9..ea09ac59d3 100644 --- a/tests/unit/functions/test_remote_function.py +++ b/tests/unit/functions/test_remote_function.py @@ -89,3 +89,57 @@ def function_without_return_annotation(myparam: int): match="'output_type' was not set .* missing a return type annotation", ): remote_function_decorator(function_without_return_annotation) + + +def test_deploy_remote_function(): + session = mocks.create_bigquery_session() + + def my_remote_func(x: int) -> int: + return x * 2 + + deployed = session.deploy_remote_function( + my_remote_func, cloud_function_service_account="test_sa@example.com" + ) + + # Test that the function would have been deployed somewhere. + assert deployed.bigframes_bigquery_function + + +def test_deploy_remote_function_with_name(): + session = mocks.create_bigquery_session() + + def my_remote_func(x: int) -> int: + return x * 2 + + deployed = session.deploy_remote_function( + my_remote_func, + name="my_custom_name", + cloud_function_service_account="test_sa@example.com", + ) + + # Test that the function would have been deployed somewhere. + assert "my_custom_name" in deployed.bigframes_bigquery_function + + +def test_deploy_udf(): + session = mocks.create_bigquery_session() + + def my_remote_func(x: int) -> int: + return x * 2 + + deployed = session.deploy_udf(my_remote_func) + + # Test that the function would have been deployed somewhere. + assert deployed.bigframes_bigquery_function + + +def test_deploy_udf_with_name(): + session = mocks.create_bigquery_session() + + def my_remote_func(x: int) -> int: + return x * 2 + + deployed = session.deploy_udf(my_remote_func, name="my_custom_name") + + # Test that the function would have been deployed somewhere. + assert "my_custom_name" in deployed.bigframes_bigquery_function