diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml index 6301519a9a..10cf433a8b 100644 --- a/.github/.OwlBot.lock.yaml +++ b/.github/.OwlBot.lock.yaml @@ -1,4 +1,4 @@ -# Copyright 2024 Google LLC +# Copyright 2025 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,5 +13,5 @@ # limitations under the License. docker: image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest - digest: sha256:2ed982f884312e4883e01b5ab8af8b6935f0216a5a2d82928d273081fc3be562 -# created: 2024-11-12T12:09:45.821174897Z + digest: sha256:8ff1efe878e18bd82a0fb7b70bb86f77e7ab6901fed394440b6135db0ba8d84a +# created: 2025-01-09T12:01:16.422459506Z diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml index ce5137e58c..8659d83d82 100644 --- a/.github/workflows/unittest.yml +++ b/.github/workflows/unittest.yml @@ -5,7 +5,10 @@ on: name: unittest jobs: unit: - runs-on: ubuntu-latest + # TODO(https://github.com/googleapis/gapic-generator-python/issues/2303): use `ubuntu-latest` once this bug is fixed. + # Use ubuntu-22.04 until Python 3.7 is removed from the test matrix + # https://docs.github.com/en/actions/using-github-hosted-runners/using-github-hosted-runners/about-github-hosted-runners#standard-github-hosted-runners-for-public-repositories + runs-on: ubuntu-22.04 strategy: matrix: python: ['3.9', '3.10', '3.11', '3.12'] diff --git a/.kokoro/docker/docs/requirements.txt b/.kokoro/docker/docs/requirements.txt index 8bb0764594..f99a5c4aac 100644 --- a/.kokoro/docker/docs/requirements.txt +++ b/.kokoro/docker/docs/requirements.txt @@ -2,11 +2,11 @@ # This file is autogenerated by pip-compile with Python 3.10 # by the following command: # -# pip-compile --allow-unsafe --generate-hashes requirements.in +# pip-compile --allow-unsafe --generate-hashes synthtool/gcp/templates/python_library/.kokoro/docker/docs/requirements.in # -argcomplete==3.5.1 \ - --hash=sha256:1a1d148bdaa3e3b93454900163403df41448a248af01b6e849edc5ac08e6c363 \ - --hash=sha256:eb1ee355aa2557bd3d0145de7b06b2a45b0ce461e1e7813f5d066039ab4177b4 +argcomplete==3.5.2 \ + --hash=sha256:036d020d79048a5d525bc63880d7a4b8d1668566b8a76daf1144c0bbe0f63472 \ + --hash=sha256:23146ed7ac4403b70bd6026402468942ceba34a6732255b9edf5b7354f68a6bb # via nox colorlog==6.9.0 \ --hash=sha256:5906e71acd67cb07a71e779c47c4bcb45fb8c2993eebe9e5adcd6a6f1b283eff \ @@ -23,7 +23,7 @@ filelock==3.16.1 \ nox==2024.10.9 \ --hash=sha256:1d36f309a0a2a853e9bccb76bbef6bb118ba92fa92674d15604ca99adeb29eab \ --hash=sha256:7aa9dc8d1c27e9f45ab046ffd1c3b2c4f7c91755304769df231308849ebded95 - # via -r requirements.in + # via -r synthtool/gcp/templates/python_library/.kokoro/docker/docs/requirements.in packaging==24.2 \ --hash=sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759 \ --hash=sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f @@ -32,11 +32,41 @@ platformdirs==4.3.6 \ --hash=sha256:357fb2acbc885b0419afd3ce3ed34564c13c9b95c89360cd9563f73aa5e2b907 \ --hash=sha256:73e575e1408ab8103900836b97580d5307456908a03e92031bab39e4554cc3fb # via virtualenv -tomli==2.0.2 \ - --hash=sha256:2ebe24485c53d303f690b0ec092806a085f07af5a5aa1464f3931eec36caaa38 \ - --hash=sha256:d46d457a85337051c36524bc5349dd91b1877838e2979ac5ced3e710ed8a60ed +tomli==2.2.1 \ + --hash=sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6 \ + --hash=sha256:02abe224de6ae62c19f090f68da4e27b10af2b93213d36cf44e6e1c5abd19fdd \ + 
--hash=sha256:286f0ca2ffeeb5b9bd4fcc8d6c330534323ec51b2f52da063b11c502da16f30c \ + --hash=sha256:2d0f2fdd22b02c6d81637a3c95f8cd77f995846af7414c5c4b8d0545afa1bc4b \ + --hash=sha256:33580bccab0338d00994d7f16f4c4ec25b776af3ffaac1ed74e0b3fc95e885a8 \ + --hash=sha256:400e720fe168c0f8521520190686ef8ef033fb19fc493da09779e592861b78c6 \ + --hash=sha256:40741994320b232529c802f8bc86da4e1aa9f413db394617b9a256ae0f9a7f77 \ + --hash=sha256:465af0e0875402f1d226519c9904f37254b3045fc5084697cefb9bdde1ff99ff \ + --hash=sha256:4a8f6e44de52d5e6c657c9fe83b562f5f4256d8ebbfe4ff922c495620a7f6cea \ + --hash=sha256:4e340144ad7ae1533cb897d406382b4b6fede8890a03738ff1683af800d54192 \ + --hash=sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249 \ + --hash=sha256:6972ca9c9cc9f0acaa56a8ca1ff51e7af152a9f87fb64623e31d5c83700080ee \ + --hash=sha256:7fc04e92e1d624a4a63c76474610238576942d6b8950a2d7f908a340494e67e4 \ + --hash=sha256:889f80ef92701b9dbb224e49ec87c645ce5df3fa2cc548664eb8a25e03127a98 \ + --hash=sha256:8d57ca8095a641b8237d5b079147646153d22552f1c637fd3ba7f4b0b29167a8 \ + --hash=sha256:8dd28b3e155b80f4d54beb40a441d366adcfe740969820caf156c019fb5c7ec4 \ + --hash=sha256:9316dc65bed1684c9a98ee68759ceaed29d229e985297003e494aa825ebb0281 \ + --hash=sha256:a198f10c4d1b1375d7687bc25294306e551bf1abfa4eace6650070a5c1ae2744 \ + --hash=sha256:a38aa0308e754b0e3c67e344754dff64999ff9b513e691d0e786265c93583c69 \ + --hash=sha256:a92ef1a44547e894e2a17d24e7557a5e85a9e1d0048b0b5e7541f76c5032cb13 \ + --hash=sha256:ac065718db92ca818f8d6141b5f66369833d4a80a9d74435a268c52bdfa73140 \ + --hash=sha256:b82ebccc8c8a36f2094e969560a1b836758481f3dc360ce9a3277c65f374285e \ + --hash=sha256:c954d2250168d28797dd4e3ac5cf812a406cd5a92674ee4c8f123c889786aa8e \ + --hash=sha256:cb55c73c5f4408779d0cf3eef9f762b9c9f147a77de7b258bef0a5628adc85cc \ + --hash=sha256:cd45e1dc79c835ce60f7404ec8119f2eb06d38b1deba146f07ced3bbc44505ff \ + --hash=sha256:d3f5614314d758649ab2ab3a62d4f2004c825922f9e370b29416484086b264ec \ + --hash=sha256:d920f33822747519673ee656a4b6ac33e382eca9d331c87770faa3eef562aeb2 \ + --hash=sha256:db2b95f9de79181805df90bedc5a5ab4c165e6ec3fe99f970d0e302f384ad222 \ + --hash=sha256:e59e304978767a54663af13c07b3d1af22ddee3bb2fb0618ca1593e4f593a106 \ + --hash=sha256:e85e99945e688e32d5a35c1ff38ed0b3f41f43fad8df0bdf79f72b2ba7bc5272 \ + --hash=sha256:ece47d672db52ac607a3d9599a9d48dcb2f2f735c6c2d1f34130085bb12b112a \ + --hash=sha256:f4039b9cbc3048b2416cc57ab3bda989a6fcf9b36cf8937f01a6e731b64f80d7 # via nox -virtualenv==20.27.1 \ - --hash=sha256:142c6be10212543b32c6c45d3d3893dff89112cc588b7d0879ae5a1ec03a47ba \ - --hash=sha256:f11f1b8a29525562925f745563bfd48b189450f61fb34c4f9cc79dd5aa32a1f4 +virtualenv==20.28.0 \ + --hash=sha256:23eae1b4516ecd610481eda647f3a7c09aea295055337331bb4e6892ecce47b0 \ + --hash=sha256:2c9c3262bb8e7b87ea801d715fae4495e6032450c71d2309be9550e7364049aa # via nox diff --git a/CHANGELOG.md b/CHANGELOG.md index ff5ce11006..b4bec86e9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,35 @@ [1]: https://pypi.org/project/bigframes/#history +## [1.32.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.31.0...v1.32.0) (2025-01-13) + + +### Features + +* Add max_retries to TextEmbeddingGenerator and Claude3TextGenerator ([#1259](https://github.com/googleapis/python-bigquery-dataframes/issues/1259)) ([8077ff4](https://github.com/googleapis/python-bigquery-dataframes/commit/8077ff49426b103dc5a52eeb86a2c6a869c99825)) +* Bigframes.bigquery.parse_json 
([#1265](https://github.com/googleapis/python-bigquery-dataframes/issues/1265)) ([27bbd80](https://github.com/googleapis/python-bigquery-dataframes/commit/27bbd8085ccac175f113afbd6c94b52c034a3d97)) +* Support DataFrame.astype(dict) ([#1262](https://github.com/googleapis/python-bigquery-dataframes/issues/1262)) ([5934f8e](https://github.com/googleapis/python-bigquery-dataframes/commit/5934f8ee0a1c950a820d1911d73a46f6891a40bb)) + + +### Bug Fixes + +* Avoid global mutation in `BigQueryOptions.client_endpoints_override` ([#1280](https://github.com/googleapis/python-bigquery-dataframes/issues/1280)) ([788f6e9](https://github.com/googleapis/python-bigquery-dataframes/commit/788f6e94a1e80f0ba8741a53a05a467e7b18e902)) +* Fix erroneous window bounds removal during compilation ([#1163](https://github.com/googleapis/python-bigquery-dataframes/issues/1163)) ([f91756a](https://github.com/googleapis/python-bigquery-dataframes/commit/f91756a4413b10f1072c0ae96301fe854bb1ba4e)) + + +### Dependencies + +* Relax sqlglot upper bound ([#1278](https://github.com/googleapis/python-bigquery-dataframes/issues/1278)) ([c71ec09](https://github.com/googleapis/python-bigquery-dataframes/commit/c71ec093314409cd4c7a52a713dbd6164fbbd792)) + + +### Documentation + +* Add BQ Studio links that allow users to generate Jupyter notebooks in BQ Studio with GitHub contents ([#1266](https://github.com/googleapis/python-bigquery-dataframes/issues/1266)) ([58f13cb](https://github.com/googleapis/python-bigquery-dataframes/commit/58f13cb9ef8bac3222e5013d8ae77dd20f886e30)) +* Add snippet to evaluate the ARIMA_PLUS model in the Forecast a single time series with a univariate model tutorial ([#1267](https://github.com/googleapis/python-bigquery-dataframes/issues/1267)) ([3dcae2d](https://github.com/googleapis/python-bigquery-dataframes/commit/3dcae2dca45efdd4493cf3f367bf025ea291f4df)) +* Add snippet to see the ARIMA coefficients in the Forecast a single time series with a univariate model tutorial ([#1268](https://github.com/googleapis/python-bigquery-dataframes/issues/1268)) ([059a564](https://github.com/googleapis/python-bigquery-dataframes/commit/059a564095dfea0518982f13c8118d3807861ccf)) +* Update `bigframes.pandas.pandas` docstrings ([#1247](https://github.com/googleapis/python-bigquery-dataframes/issues/1247)) ([c4bffc3](https://github.com/googleapis/python-bigquery-dataframes/commit/c4bffc3e8ec630a362c94f9d269a66073a14ad04)) +* Use the 002 model for better scalability in text generation ([#1270](https://github.com/googleapis/python-bigquery-dataframes/issues/1270)) ([bb7a850](https://github.com/googleapis/python-bigquery-dataframes/commit/bb7a85005ebebfbcb0d2a4d5c4c27b354f38d3d1)) + ## [1.31.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.30.0...v1.31.0) (2025-01-05) @@ -97,6 +126,11 @@ * Update df.corr, df.cov to be used with more than 30 columns case. ([#1161](https://github.com/googleapis/python-bigquery-dataframes/issues/1161)) ([9dcf1aa](https://github.com/googleapis/python-bigquery-dataframes/commit/9dcf1aa918919704dcf4d12b05935b22fb502fc6)) +### Dependencies + +* Remove `ibis-framework` by vendoring a fork of the package to `bigframes_vendored`. 
([#1170](https://github.com/googleapis/python-bigquery-dataframes/pull/1170)) ([421d24d](https://github.com/googleapis/python-bigquery-dataframes/commit/421d24d6e61d557aa696fc701c08c84389f72ed2)) + + ### Documentation * Add a code sample using `bpd.options.bigquery.ordering_mode = "partial"` ([#909](https://github.com/googleapis/python-bigquery-dataframes/issues/909)) ([f80d705](https://github.com/googleapis/python-bigquery-dataframes/commit/f80d70503b80559a0b1fe64434383aa3e028bf9b)) diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index 052ad5d921..8fec253b24 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -25,7 +25,7 @@ import bigframes.constants import bigframes.enums -import bigframes.exceptions +import bigframes.exceptions as bfe SESSION_STARTED_MESSAGE = ( "Cannot change '{attribute}' once a session has started. " @@ -55,15 +55,12 @@ def _get_validated_location(value: Optional[str]) -> Optional[str]: bigframes.constants.ALL_BIGQUERY_LOCATIONS, key=lambda item: jellyfish.levenshtein_distance(location, item), ) - warnings.warn( - UNKNOWN_LOCATION_MESSAGE.format(location=location, possibility=possibility), - # There are many layers before we get to (possibly) the user's code: - # -> bpd.options.bigquery.location = "us-central-1" - # -> location.setter - # -> _get_validated_location - stacklevel=3, - category=bigframes.exceptions.UnknownLocationWarning, - ) + # There are many layers before we get to (possibly) the user's code: + # -> bpd.options.bigquery.location = "us-central-1" + # -> location.setter + # -> _get_validated_location + msg = UNKNOWN_LOCATION_MESSAGE.format(location=location, possibility=possibility) + warnings.warn(msg, stacklevel=3, category=bfe.UnknownLocationWarning) return value @@ -91,7 +88,7 @@ def __init__( skip_bq_connection_check: bool = False, *, ordering_mode: Literal["strict", "partial"] = "strict", - client_endpoints_override: dict = {}, + client_endpoints_override: Optional[dict] = None, ): self._credentials = credentials self._project = project @@ -104,6 +101,10 @@ def __init__( self._session_started = False # Determines the ordering strictness for the session. self._ordering_mode = _validate_ordering_mode(ordering_mode) + + if client_endpoints_override is None: + client_endpoints_override = {} + self._client_endpoints_override = client_endpoints_override @property @@ -271,10 +272,11 @@ def use_regional_endpoints(self, value: bool): ) if value: - warnings.warn( + msg = ( "Use of regional endpoints is a feature in preview and " "available only in selected regions and projects. " ) + warnings.warn(msg, category=bfe.PreviewWarning, stacklevel=2) self._use_regional_endpoints = value @@ -330,9 +332,12 @@ def client_endpoints_override(self) -> dict: @client_endpoints_override.setter def client_endpoints_override(self, value: dict): - warnings.warn( - "This is an advanced configuration option for directly setting endpoints. Incorrect use may lead to unexpected behavior or system instability. Proceed only if you fully understand its implications." + msg = ( + "This is an advanced configuration option for directly setting endpoints. " + "Incorrect use may lead to unexpected behavior or system instability. " + "Proceed only if you fully understand its implications." 
) + warnings.warn(msg) if self._session_started and self._client_endpoints_override != value: raise ValueError( diff --git a/bigframes/_config/experiment_options.py b/bigframes/_config/experiment_options.py index 6b79dcf748..69273aef1c 100644 --- a/bigframes/_config/experiment_options.py +++ b/bigframes/_config/experiment_options.py @@ -14,6 +14,8 @@ import warnings +import bigframes.exceptions as bfe + class ExperimentOptions: """ @@ -31,9 +33,11 @@ def semantic_operators(self) -> bool: @semantic_operators.setter def semantic_operators(self, value: bool): if value is True: - warnings.warn( - "Semantic operators are still under experiments, and are subject to change in the future." + msg = ( + "Semantic operators are still experimental and are subject " + "to change in the future." ) + warnings.warn(msg, category=bfe.PreviewWarning) self._semantic_operators = value @property @@ -43,7 +47,9 @@ def blob(self) -> bool: @blob.setter def blob(self, value: bool): if value is True: - warnings.warn( - "BigFrames Blob is still under experiments. It may not work and subject to change in the future." + msg = ( + "BigFrames Blob is still experimental. It may not work and is " + "subject to change in the future." ) + warnings.warn(msg, category=bfe.PreviewWarning) self._blob = value diff --git a/bigframes/bigquery/__init__.py b/bigframes/bigquery/__init__.py index a39914d6e7..ff52ae8d36 100644 --- a/bigframes/bigquery/__init__.py +++ b/bigframes/bigquery/__init__.py @@ -27,20 +27,27 @@ json_extract_array, json_extract_string_array, json_set, + parse_json, ) from bigframes.bigquery._operations.search import create_vector_index, vector_search from bigframes.bigquery._operations.struct import struct __all__ = [ + # approximate aggregate ops + "approx_top_count", + # array ops "array_length", "array_agg", "array_to_string", + # json ops "json_set", "json_extract", "json_extract_array", "json_extract_string_array", - "approx_top_count", - "struct", + "parse_json", + # search ops "create_vector_index", "vector_search", + # struct ops + "struct", ] diff --git a/bigframes/bigquery/_operations/json.py b/bigframes/bigquery/_operations/json.py index 843991807e..52b01d3ef7 100644 --- a/bigframes/bigquery/_operations/json.py +++ b/bigframes/bigquery/_operations/json.py @@ -23,6 +23,7 @@ from typing import Any, cast, Optional, Sequence, Tuple, Union +import bigframes.core.utils as utils import bigframes.dtypes import bigframes.operations as ops import bigframes.series as series @@ -30,6 +31,7 @@ from . import array +@utils.preview(name="The JSON-related API `json_set`") def json_set( input: series.Series, json_path_value_pairs: Sequence[Tuple[str, Any]], @@ -37,6 +39,10 @@ """Produces a new JSON value within a Series by inserting or replacing values at specified paths. + .. warning:: + The JSON-related API `json_set` is in preview. Its behavior may change in + future versions. + **Examples:** >>> import bigframes.pandas as bpd @@ -223,3 +229,37 @@ def json_extract_string_array( ), ) return array_series + + +@utils.preview(name="The JSON-related API `parse_json`") +def parse_json( + input: series.Series, +) -> series.Series: + """Converts a series with a JSON-formatted STRING value to a JSON value. + + .. warning:: + The JSON-related API `parse_json` is in preview. Its behavior may change in + future versions. 
+ + **Examples:** + + >>> import bigframes.pandas as bpd + >>> import bigframes.bigquery as bbq + >>> bpd.options.display.progress_bar = None + + >>> s = bpd.Series(['{"class": {"students": [{"id": 5}, {"id": 12}]}}']) + >>> s + 0 {"class": {"students": [{"id": 5}, {"id": 12}]}} + dtype: string + >>> bbq.parse_json(s) + 0 {"class":{"students":[{"id":5},{"id":12}]}} + dtype: large_string[pyarrow] + + Args: + input (bigframes.series.Series): + The Series containing JSON-formatted strings. + + Returns: + bigframes.series.Series: A new Series with the JSON value. + """ + return input._apply_unary_op(ops.ParseJSON()) diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py new file mode 100644 index 0000000000..4b3841252c --- /dev/null +++ b/bigframes/blob/_functions.py @@ -0,0 +1,130 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from dataclasses import dataclass +import inspect +from typing import Callable, Iterable + +import google.cloud.bigquery as bigquery + +import bigframes +import bigframes.session._io.bigquery as bf_io_bigquery + +_PYTHON_TO_BQ_TYPES = {int: "INT64", float: "FLOAT64", str: "STRING", bytes: "BYTES"} + + +@dataclass(frozen=True) +class FunctionDef: + """Definition of a Python UDF.""" + + func: Callable # function body + requirements: Iterable[str] # required packages + + +# TODO(garrettwu): migrate to bigframes UDF when it is available +class TransformFunction: + """Simple transform function class to deal with Python UDF.""" + + def __init__( + self, func_def: FunctionDef, session: bigframes.Session, connection: str + ): + self._func = func_def.func + self._requirements = func_def.requirements + self._session = session + self._connection = connection + + def _input_bq_signature(self): + sig = inspect.signature(self._func) + inputs = [] + for k, v in sig.parameters.items(): + inputs.append(f"{k} {_PYTHON_TO_BQ_TYPES[v.annotation]}") + return ", ".join(inputs) + + def _output_bq_type(self): + sig = inspect.signature(self._func) + return _PYTHON_TO_BQ_TYPES[sig.return_annotation] + + def _create_udf(self): + """Create Python UDF in BQ. Return name of the UDF.""" + udf_name = str(self._session._loader._storage_manager._random_table()) + + func_body = inspect.getsource(self._func) + func_name = self._func.__name__ + packages = str(list(self._requirements)) + + sql = f""" +CREATE OR REPLACE FUNCTION `{udf_name}`({self._input_bq_signature()}) +RETURNS {self._output_bq_type()} LANGUAGE python +WITH CONNECTION `{self._connection}` +OPTIONS (entry_point='{func_name}', runtime_version='python-3.11', packages={packages}) +AS r\"\"\" + + +{func_body} + + +\"\"\" + """ + + bf_io_bigquery.start_query_with_client( + self._session.bqclient, + sql, + job_config=bigquery.QueryJobConfig(), + metrics=self._session._metrics, + ) + + return udf_name + + def udf(self): + """Create and return the UDF object.""" + udf_name = self._create_udf() + return self._session.read_gbq_function(udf_name) + + +# Blur images. 
Takes ObjectRefRuntime as JSON string. Outputs ObjectRefRuntime JSON string. +def image_blur_func( + src_obj_ref_rt: str, dst_obj_ref_rt: str, ksize_x: int, ksize_y: int +) -> str: + import json + + import cv2 as cv # type: ignore + import numpy as np + import requests + + src_obj_ref_rt_json = json.loads(src_obj_ref_rt) + dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt) + + src_url = src_obj_ref_rt_json["access_urls"]["read_url"] + dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"] + + response = requests.get(src_url) + bts = response.content + + nparr = np.frombuffer(bts, np.uint8) + img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED) + img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y)) + bts = cv.imencode(".jpeg", img_blurred)[1].tobytes() + + requests.put( + url=dst_url, + data=bts, + headers={ + "Content-Type": "image/jpeg", + }, + ) + + return dst_obj_ref_rt + + +image_blur_def = FunctionDef(image_blur_func, ["opencv-python", "numpy", "requests"]) diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py index 5e3f6df355..a88e365dcd 100644 --- a/bigframes/core/__init__.py +++ b/bigframes/core/__init__.py @@ -39,6 +39,7 @@ import bigframes.core.utils from bigframes.core.window_spec import WindowSpec import bigframes.dtypes +import bigframes.exceptions as bfe import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops @@ -106,10 +107,11 @@ def from_table( if offsets_col and primary_key: raise ValueError("must set at most one of 'offests', 'primary_key'") if any(i.field_type == "JSON" for i in table.schema if i.name in schema.names): - warnings.warn( - "Interpreting JSON column(s) as StringDtype and pyarrow.large_string. This behavior may change in future versions.", - bigframes.exceptions.PreviewWarning, + msg = ( + "Interpreting JSON column(s) as pyarrow.large_string. " + "This behavior may change in future versions." ) + warnings.warn(msg, bfe.PreviewWarning) # define data source only for needed columns, this makes row-hashing cheaper table_def = nodes.GbqTable.from_table(table, columns=schema.names) @@ -228,10 +230,8 @@ def slice( self, start: Optional[int], stop: Optional[int], step: Optional[int] ) -> ArrayValue: if self.node.order_ambiguous and not (self.session._strictly_ordered): - warnings.warn( - "Window ordering may be ambiguous, this can cause unstable results.", - bigframes.exceptions.AmbiguousWindowWarning, - ) + msg = "Window ordering may be ambiguous, this can cause unstable results." + warnings.warn(msg, bfe.AmbiguousWindowWarning) return ArrayValue( nodes.SliceNode( self.node, @@ -252,10 +252,10 @@ def promote_offsets(self) -> Tuple[ArrayValue, str]: "Generating offsets not supported in partial ordering mode" ) else: - warnings.warn( - "Window ordering may be ambiguous, this can cause unstable results.", - bigframes.exceptions.AmbiguousWindowWarning, + msg = ( + "Window ordering may be ambiguous, this can cause unstable results." ) + warnings.warn(msg, category=bfe.AmbiguousWindowWarning) return ( ArrayValue( @@ -391,10 +391,8 @@ def project_window_op( "Generating offsets not supported in partial ordering mode" ) else: - warnings.warn( - "Window ordering may be ambiguous, this can cause unstable results.", - bigframes.exceptions.AmbiguousWindowWarning, - ) + msg = "Window ordering may be ambiguous, this can cause unstable results." 
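+                # AmbiguousWindowWarning: without a total ordering, the window result may differ from run to run.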
+ warnings.warn(msg, category=bfe.AmbiguousWindowWarning) output_name = self._gen_namespaced_uid() return ( diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index 785691edd6..a7f75e7264 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -86,9 +86,10 @@ def indicate_duplicates( # Discard this value if there are copies ANYWHERE window_spec = windows.unbound(grouping_keys=tuple(columns)) block, dummy = block.create_constant(1) + # sum a constant 1 to count copies; this works even with partial ordering block, val_count_col_id = block.apply_window_op( dummy, - agg_ops.count_op, + agg_ops.sum_op, window_spec=window_spec, ) block, duplicate_indicator = block.project_expr( diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index ca860612f8..522d1743ff 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -63,7 +63,7 @@ import bigframes.core.utils as utils import bigframes.core.window_spec as windows import bigframes.dtypes -import bigframes.exceptions +import bigframes.exceptions as bfe import bigframes.features import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops @@ -137,10 +137,8 @@ def __init__( ) if len(index_columns) == 0: - warnings.warn( - "Creating object with Null Index. Null Index is a preview feature.", - category=bigframes.exceptions.NullIndexPreviewWarning, - ) + msg = "Creating object with Null Index. Null Index is a preview feature." + warnings.warn(msg, category=bfe.NullIndexPreviewWarning) self._index_columns = tuple(index_columns) # Index labels don't need complicated hierarchical access so can store as tuple self._index_labels = ( @@ -616,13 +614,13 @@ def _materialize_local( " # Setting it to None will download all the data\n" f"{constants.FEEDBACK_LINK}" ) - - warnings.warn( + msg = ( f"The data size ({table_mb:.2f} MB) exceeds the maximum download limit of " - f"({max_download_size} MB). It will be downsampled to {max_download_size} MB for download." - "\nPlease refer to the documentation for configuring the downloading limit.", - UserWarning, + f"({max_download_size} MB). It will be downsampled to {max_download_size} " + "MB for download.\nPlease refer to the documentation for configuring " + "the downloading limit." 
) + warnings.warn(msg, category=UserWarning) total_rows = execute_result.total_rows # Remove downsampling config from subsequent invocations, as otherwise could result in many # iterations if downsampling undershoots diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py index 482c38ae3d..f97856efa5 100644 --- a/bigframes/core/compile/aggregate_compiler.py +++ b/bigframes/core/compile/aggregate_compiler.py @@ -479,6 +479,15 @@ def _( return _apply_window_if_present(column.dense_rank(), window) + 1 +@compile_unary_agg.register +def _( + op: agg_ops.RowNumberOp, + column: ibis_types.Column, + window=None, +) -> ibis_types.IntegerValue: + return _apply_window_if_present(ibis_api.row_number(), window) + + @compile_unary_agg.register def _(op: agg_ops.FirstOp, column: ibis_types.Column, window=None) -> ibis_types.Value: return _apply_window_if_present(column.first(), window) diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py index d4c814145b..f879eb3feb 100644 --- a/bigframes/core/compile/compiled.py +++ b/bigframes/core/compile/compiled.py @@ -1330,7 +1330,7 @@ def _ibis_window_from_spec( if require_total_order or isinstance(window_spec.bounds, RowsWindowBounds): # Some operators need an unambiguous ordering, so the table's total ordering is appended order_by = tuple([*order_by, *self._ibis_order]) - elif isinstance(window_spec.bounds, RowsWindowBounds): + elif require_total_order or isinstance(window_spec.bounds, RowsWindowBounds): # If window spec has following or preceding bounds, we need to apply an unambiguous ordering. order_by = tuple(self._ibis_order) else: diff --git a/bigframes/core/compile/ibis_types.py b/bigframes/core/compile/ibis_types.py index 544af69091..a6d3949bc0 100644 --- a/bigframes/core/compile/ibis_types.py +++ b/bigframes/core/compile/ibis_types.py @@ -32,6 +32,7 @@ import pyarrow as pa import bigframes.dtypes +import bigframes.exceptions as bfe # Type hints for Ibis data types supported by BigQuery DataFrame IbisDtype = Union[ @@ -305,10 +306,11 @@ def ibis_dtype_to_bigframes_dtype( # Temporary: Will eventually support an explicit json type instead of casting to string. if isinstance(ibis_dtype, ibis_dtypes.JSON): - warnings.warn( - "Interpreting JSON as string. This behavior may change in future versions.", - bigframes.exceptions.PreviewWarning, + msg = ( + "Interpreting JSON column(s) as pyarrow.large_string. This behavior may change " + "in future versions." ) + warnings.warn(msg, category=bfe.PreviewWarning) return bigframes.dtypes.JSON_DTYPE if ibis_dtype in IBIS_TO_BIGFRAMES: diff --git a/bigframes/core/compile/polars/__init__.py b/bigframes/core/compile/polars/__init__.py index e15f229faf..8c37e046ab 100644 --- a/bigframes/core/compile/polars/__init__.py +++ b/bigframes/core/compile/polars/__init__.py @@ -22,4 +22,5 @@ __all__ = ["PolarsCompiler"] except Exception: - warnings.warn("Polars compiler not available as polars is not installed.") + msg = "Polars compiler not available as polars is not installed." 
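+    # polars is an optional dependency; warn instead of failing the import when it is missing.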
+ warnings.warn(msg) diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py index d824009fec..d594cb3d68 100644 --- a/bigframes/core/compile/scalar_op_compiler.py +++ b/bigframes/core/compile/scalar_op_compiler.py @@ -20,7 +20,6 @@ import bigframes_vendored.constants as constants import bigframes_vendored.ibis.expr.api as ibis_api import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes -import bigframes_vendored.ibis.expr.operations as ibis_ops import bigframes_vendored.ibis.expr.operations.generic as ibis_generic import bigframes_vendored.ibis.expr.operations.udf as ibis_udf import bigframes_vendored.ibis.expr.types as ibis_types @@ -1181,13 +1180,13 @@ def json_set_op_impl(x: ibis_types.Value, y: ibis_types.Value, op: ops.JSONSet): ) else: # Enabling JSON type eliminates the need for less efficient string conversions. - return ibis_ops.ToJsonString( + return to_json_string( json_set( # type: ignore - json_obj=parse_json(x), + json_obj=parse_json(json_str=x), json_path=op.json_path, json_value=y, ) - ).to_expr() + ) @scalar_op_compiler.register_unary_op(ops.JSONExtract, pass_op=True) @@ -1210,6 +1209,16 @@ def json_extract_string_array_op_impl( return json_extract_string_array(json_obj=x, json_path=op.json_path) +@scalar_op_compiler.register_unary_op(ops.ParseJSON, pass_op=True) +def parse_json_op_impl(x: ibis_types.Value, op: ops.ParseJSON): + return parse_json(json_str=x) + + +@scalar_op_compiler.register_unary_op(ops.ToJSONString) +def to_json_string_op_impl(json_obj: ibis_types.Value): + return to_json_string(json_obj=json_obj) + + # Blob Ops @scalar_op_compiler.register_unary_op(ops.obj_fetch_metadata_op) def obj_fetch_metadata_op_impl(obj_ref: ibis_types.Value): @@ -1909,6 +1918,13 @@ def json_extract_string_array( # type: ignore[empty-body] """Extracts a JSON array and converts it to a SQL ARRAY of STRINGs.""" +@ibis_udf.scalar.builtin(name="to_json_string") +def to_json_string( # type: ignore[empty-body] + json_obj: ibis_dtypes.JSON, +) -> ibis_dtypes.String: + """Convert JSON to STRING.""" + + @ibis_udf.scalar.builtin(name="ML.DISTANCE") def vector_distance(vector1, vector2, type: str) -> ibis_dtypes.Float64: # type: ignore[empty-body] """Computes the distance between two vectors using specified type ("EUCLIDEAN", "MANHATTAN", or "COSINE")""" diff --git a/bigframes/core/global_session.py b/bigframes/core/global_session.py index e70cdad59e..8b32fee5b4 100644 --- a/bigframes/core/global_session.py +++ b/bigframes/core/global_session.py @@ -22,6 +22,7 @@ import google.auth.exceptions import bigframes._config +import bigframes.exceptions as bfe import bigframes.session _global_session: Optional[bigframes.session.Session] = None @@ -38,11 +39,11 @@ def _try_close_session(session: bigframes.session.Session): session_id = session.session_id location = session._location project_id = session._project - warnings.warn( + msg = ( f"Session cleanup failed for session with id: {session_id}, " - f"location: {location}, project: {project_id}", - category=bigframes.exceptions.CleanupFailedWarning, + f"location: {location}, project: {project_id}" ) + warnings.warn(msg, category=bfe.CleanupFailedWarning) traceback.print_tb(e.__traceback__) diff --git a/bigframes/core/indexers.py b/bigframes/core/indexers.py index 47ece70fb8..9c7fba8ec1 100644 --- a/bigframes/core/indexers.py +++ b/bigframes/core/indexers.py @@ -29,7 +29,7 @@ import bigframes.core.scalar import bigframes.dataframe import bigframes.dtypes -import bigframes.exceptions +import 
bigframes.exceptions as bfe import bigframes.operations as ops import bigframes.series @@ -407,11 +407,12 @@ def _struct_accessor_check_and_warn( return if not bigframes.dtypes.is_string_like(series.index.dtype): - warnings.warn( - "Are you trying to access struct fields? If so, please use Series.struct.field(...) method instead.", - category=bigframes.exceptions.BadIndexerKeyWarning, - stacklevel=7, # Stack depth from series.__getitem__ to here + msg = ( + "Are you trying to access struct fields? If so, please use Series.struct.field(...) " + "method instead." ) + # Stack depth from series.__getitem__ to here + warnings.warn(msg, stacklevel=7, category=bfe.BadIndexerKeyWarning) @typing.overload diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py index e684ac55a4..f9ca6cb5f0 100644 --- a/bigframes/core/utils.py +++ b/bigframes/core/utils.py @@ -11,14 +11,18 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import functools import re import typing from typing import Hashable, Iterable, List +import warnings import bigframes_vendored.pandas.io.common as vendored_pandas_io_common import pandas as pd import typing_extensions +import bigframes.exceptions as bfe + UNNAMED_COLUMN_ID = "bigframes_unnamed_column" UNNAMED_INDEX_ID = "bigframes_unnamed_index" @@ -164,3 +168,19 @@ def merge_column_labels( result_labels.append(col_label) return pd.Index(result_labels) + + +def preview(*, name: str): + """Decorate to warn of a preview API.""" + + def decorator(func): + msg = f"{name} is in preview. Its behavior may change in future versions." + + @functools.wraps(func) + def wrapper(*args, **kwargs): + warnings.warn(msg, category=bfe.PreviewWarning) + return func(*args, **kwargs) + + return wrapper + + return decorator diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index efbe56abf7..01e9bd6308 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -68,7 +68,7 @@ import bigframes.core.window import bigframes.core.window_spec as windows import bigframes.dtypes -import bigframes.exceptions +import bigframes.exceptions as bfe import bigframes.formatting_helpers as formatter import bigframes.operations as ops import bigframes.operations.aggregations @@ -367,14 +367,35 @@ def __iter__(self): def astype( self, - dtype: Union[bigframes.dtypes.DtypeString, bigframes.dtypes.Dtype], + dtype: Union[ + bigframes.dtypes.DtypeString, + bigframes.dtypes.Dtype, + dict[str, Union[bigframes.dtypes.DtypeString, bigframes.dtypes.Dtype]], + ], *, errors: Literal["raise", "null"] = "raise", ) -> DataFrame: if errors not in ["raise", "null"]: raise ValueError("Arg 'error' must be one of 'raise' or 'null'") - return self._apply_unary_op( - ops.AsTypeOp(to_type=dtype, safe=(errors == "null")) + + safe_cast = errors == "null" + + # Type strings check + if dtype in bigframes.dtypes.DTYPE_STRINGS: + return self._apply_unary_op(ops.AsTypeOp(dtype, safe_cast)) + + # Type instances check + if type(dtype) in bigframes.dtypes.DTYPES: + return self._apply_unary_op(ops.AsTypeOp(dtype, safe_cast)) + + if isinstance(dtype, dict): + result = self.copy() + for col, to_type in dtype.items(): + result[col] = result[col].astype(to_type) + return result + + raise TypeError( + f"Invalid type {type(dtype)} for dtype input. 
{constants.FEEDBACK_LINK}" ) def _to_sql_query( @@ -718,10 +739,23 @@ def _repr_html_(self) -> str: if opts.repr_mode == "deferred": return formatter.repr_query_job(self._compute_dry_run()) + df = self.copy() + if bigframes.options.experiments.blob: + import bigframes.bigquery as bbq + + blob_cols = [ + col + for col in df.columns + if df[col].dtype == bigframes.dtypes.OBJ_REF_DTYPE + ] + for col in blob_cols: + df[col] = df[col]._apply_unary_op(ops.ObjGetAccessUrl(mode="R")) + df[col] = bbq.json_extract(df[col], "$.access_urls.read_url") + # TODO(swast): pass max_columns and get the true column count back. Maybe # get 1 more column than we have requested so that pandas can add the # ... for us? - pandas_df, row_count, query_job = self._block.retrieve_repr_request_results( + pandas_df, row_count, query_job = df._block.retrieve_repr_request_results( max_results ) @@ -730,8 +764,31 @@ column_count = len(pandas_df.columns) with display_options.pandas_repr(opts): - # _repr_html_ stub is missing so mypy thinks it's a Series. Ignore mypy. - html_string = pandas_df._repr_html_() # type:ignore + # Allows previewing images in the DataFrame. The implementation changes the string repr as well, so that it doesn't truncate strings or escape HTML characters such as "<" and ">". We may need to implement a full-fledged repr module to better support types not in pandas. + if bigframes.options.experiments.blob: + + def url_to_image_html(url: str) -> str: + # url is a json string, which already contains double-quotes "" + return f"<img src={url}>" + + formatters = {blob_col: url_to_image_html for blob_col in blob_cols} + + # set max_colwidth so as not to truncate the image url + with pandas.option_context("display.max_colwidth", None): + max_rows = pandas.get_option("display.max_rows") + max_cols = pandas.get_option("display.max_columns") + show_dimensions = pandas.get_option("display.show_dimensions") + html_string = pandas_df.to_html( + escape=False, + notebook=True, + max_rows=max_rows, + max_cols=max_cols, + show_dimensions=show_dimensions, + formatters=formatters, # type: ignore + ) + else: + # _repr_html_ stub is missing so mypy thinks it's a Series. Ignore mypy. + html_string = pandas_df._repr_html_() # type:ignore html_string += f"[{row_count} rows x {column_count} columns in total]" return html_string @@ -1424,10 +1481,8 @@ def to_arrow( Returns: pyarrow.Table: A pyarrow Table with all rows and columns of this DataFrame. """ - warnings.warn( - "to_arrow is in preview. Types and unnamed / duplicate name columns may change in future.", - category=bigframes.exceptions.PreviewWarning, - ) + msg = "to_arrow is in preview. Types and unnamed / duplicate name columns may change in future." + warnings.warn(msg, category=bfe.PreviewWarning) pa_table, query_job = self._block.to_arrow(ordered=ordered) self._set_internal_query_job(query_job) @@ -3863,10 +3918,8 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame: def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): if utils.get_axis_number(axis) == 1: - warnings.warn( - "axis=1 scenario is in preview.", - category=bigframes.exceptions.PreviewWarning, - ) + msg = "axis=1 scenario is in preview." 
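+            # axis=1 is routed through a remote function applied row-wise (checked below).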
+ warnings.warn(msg, category=bfe.PreviewWarning) # Check if the function is a remote function if not hasattr(func, "bigframes_remote_function"): diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py index ff3e7a31fb..6e179225ea 100644 --- a/bigframes/dtypes.py +++ b/bigframes/dtypes.py @@ -36,6 +36,8 @@ pd.ArrowDtype, gpd.array.GeometryDtype, ] + +DTYPES = typing.get_args(Dtype) # Represents both column types (dtypes) and local-only types # None represents the type of a None scalar. ExpressionType = typing.Optional[Dtype] @@ -238,6 +240,8 @@ class SimpleDtypeInfo: "binary[pyarrow]", ] +DTYPE_STRINGS = typing.get_args(DtypeString) + BOOL_BIGFRAMES_TYPES = [BOOL_DTYPE] # Corresponds to the pandas concept of numeric type (such as when 'numeric_only' is specified in an operation) diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py index 84bf2e1fc9..662c32a6a6 100644 --- a/bigframes/functions/_remote_function_session.py +++ b/bigframes/functions/_remote_function_session.py @@ -300,7 +300,7 @@ def remote_function( https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings. """ # Some defaults may be used from the session if not provided otherwise - import bigframes.exceptions as bf_exceptions + import bigframes.exceptions as bfe import bigframes.pandas as bpd import bigframes.series as bf_series import bigframes.session @@ -445,11 +445,8 @@ def wrapper(func): (input_type := input_types[0]) == bf_series.Series or input_type == pandas.Series ): - warnings.warn( - "input_types=Series is in preview.", - stacklevel=1, - category=bf_exceptions.PreviewWarning, - ) + msg = "input_types=Series is in preview." + warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) # we will model the row as a json serialized string containing the data # and the metadata representing the row diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py index b72b8ce8da..9b68843a7d 100644 --- a/bigframes/functions/remote_function.py +++ b/bigframes/functions/remote_function.py @@ -32,6 +32,7 @@ import bigframes.core.compile.ibis_types import bigframes.dtypes +import bigframes.exceptions as bfe import bigframes.functions.remote_function_template from . import _remote_function_session as rf_session @@ -197,11 +198,11 @@ def func(*bigframes_args, **bigframes_kwargs): ) function_input_dtypes.append(input_dtype) if has_unknown_dtypes: - warnings.warn( - "The function has one or more missing input data types." - f" BigQuery DataFrames will assume default data type {bigframes.dtypes.DEFAULT_DTYPE} for them.", - category=bigframes.exceptions.UnknownDataTypeWarning, + msg = ( + "The function has one or more missing input data types. BigQuery DataFrames " + f"will assume default data type {bigframes.dtypes.DEFAULT_DTYPE} for them." ) + warnings.warn(msg, category=bfe.UnknownDataTypeWarning) func.input_dtypes = tuple(function_input_dtypes) # type: ignore func.output_dtype = bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( # type: ignore diff --git a/bigframes/ml/base.py b/bigframes/ml/base.py index 4058647adb..f06de99181 100644 --- a/bigframes/ml/base.py +++ b/bigframes/ml/base.py @@ -22,7 +22,8 @@ """ import abc -from typing import cast, Optional, TypeVar +from typing import Callable, cast, Mapping, Optional, TypeVar +import warnings import bigframes_vendored.sklearn.base @@ -77,6 +78,9 @@ def fit_transform(self, x_train: Union[DataFrame, Series], y_train: Union[DataFr ... 
""" + def __init__(self): + self._bqml_model: Optional[core.BqmlModel] = None + def __repr__(self): """Print the estimator's constructor with all non-default parameter values.""" @@ -95,9 +99,6 @@ def __repr__(self): class Predictor(BaseEstimator): """A BigQuery DataFrames ML Model base class that can be used to predict outputs.""" - def __init__(self): - self._bqml_model: Optional[core.BqmlModel] = None - @abc.abstractmethod def predict(self, X): pass @@ -213,12 +214,61 @@ def fit( return self._fit(X, y) +class RetriableRemotePredictor(BaseEstimator): + @property + @abc.abstractmethod + def _predict_func(self) -> Callable[[bpd.DataFrame, Mapping], bpd.DataFrame]: + pass + + @property + @abc.abstractmethod + def _status_col(self) -> str: + pass + + def _predict_and_retry( + self, X: bpd.DataFrame, options: Mapping, max_retries: int + ) -> bpd.DataFrame: + assert self._bqml_model is not None + + df_result = bpd.DataFrame(session=self._bqml_model.session) # placeholder + df_fail = X + for _ in range(max_retries + 1): + df = self._predict_func(df_fail, options) + + success = df[self._status_col].str.len() == 0 + df_succ = df[success] + df_fail = df[~success] + + if df_succ.empty: + if max_retries > 0: + msg = "Can't make any progress, stop retrying." + warnings.warn(msg, category=RuntimeWarning) + break + + df_result = ( + bpd.concat([df_result, df_succ]) if not df_result.empty else df_succ + ) + + if df_fail.empty: + break + + if not df_fail.empty: + msg = ( + f"Some predictions failed. Check column {self._status_col} for detailed " + "status. You may want to filter the failed rows and retry." + ) + warnings.warn(msg, category=RuntimeWarning) + + df_result = cast( + bpd.DataFrame, + bpd.concat([df_result, df_fail]) if not df_result.empty else df_fail, + ) + return df_result + + class BaseTransformer(BaseEstimator): """Transformer base class.""" - def __init__(self): - self._bqml_model: Optional[core.BqmlModel] = None - @abc.abstractmethod def _keys(self): pass diff --git a/bigframes/ml/llm.py b/bigframes/ml/llm.py index d42138b006..8d1df6e0b9 100644 --- a/bigframes/ml/llm.py +++ b/bigframes/ml/llm.py @@ -16,7 +16,7 @@ from __future__ import annotations -from typing import cast, Literal, Optional +from typing import Callable, cast, Literal, Mapping, Optional import warnings import bigframes_vendored.constants as constants @@ -180,12 +180,11 @@ def _create_bqml_model(self): ) if self.model_name not in _TEXT_GENERATOR_ENDPOINTS: - warnings.warn( - _MODEL_NOT_SUPPORTED_WARNING.format( - model_name=self.model_name, - known_models=", ".join(_TEXT_GENERATOR_ENDPOINTS), - ) + msg = _MODEL_NOT_SUPPORTED_WARNING.format( + model_name=self.model_name, + known_models=", ".join(_TEXT_GENERATOR_ENDPOINTS), ) + warnings.warn(msg) options = { "endpoint": self.model_name, @@ -360,10 +359,11 @@ def predict( df = self._bqml_model.generate_text(X, options) if (df[_ML_GENERATE_TEXT_STATUS] != "").any(): - warnings.warn( - f"Some predictions failed. Check column {_ML_GENERATE_TEXT_STATUS} for detailed status. You may want to filter the failed rows and retry.", - RuntimeWarning, + msg = ( + f"Some predictions failed. Check column {_ML_GENERATE_TEXT_STATUS} for " + "detailed status. You may want to filter the failed rows and retry." 
) + warnings.warn(msg, category=RuntimeWarning) return df @@ -513,12 +513,11 @@ def _create_bqml_model(self): ) if self.model_name not in _PALM2_EMBEDDING_GENERATOR_ENDPOINTS: - warnings.warn( - _MODEL_NOT_SUPPORTED_WARNING.format( - model_name=self.model_name, - known_models=", ".join(_PALM2_EMBEDDING_GENERATOR_ENDPOINTS), - ) + msg = _MODEL_NOT_SUPPORTED_WARNING.format( + model_name=self.model_name, + known_models=", ".join(_PALM2_EMBEDDING_GENERATOR_ENDPOINTS), ) + warnings.warn(msg) endpoint = ( self.model_name + "@" + self.version if self.version else self.model_name @@ -590,10 +589,11 @@ def predict(self, X: utils.ArrayType) -> bpd.DataFrame: ) if (df[_ML_EMBED_TEXT_STATUS] != "").any(): - warnings.warn( - f"Some predictions failed. Check column {_ML_EMBED_TEXT_STATUS} for detailed status. You may want to filter the failed rows and retry.", - RuntimeWarning, + msg = ( + f"Some predictions failed. Check column {_ML_EMBED_TEXT_STATUS} for " + "detailed status. You may want to filter the failed rows and retry." ) + warnings.warn(msg, category=RuntimeWarning) return df @@ -616,7 +616,7 @@ def to_gbq( @log_adapter.class_logger -class TextEmbeddingGenerator(base.BaseEstimator): +class TextEmbeddingGenerator(base.RetriableRemotePredictor): """Text embedding generator LLM model. Args: @@ -678,12 +678,11 @@ def _create_bqml_model(self): ) if self.model_name not in _TEXT_EMBEDDING_ENDPOINTS: - warnings.warn( - _MODEL_NOT_SUPPORTED_WARNING.format( - model_name=self.model_name, - known_models=", ".join(_TEXT_EMBEDDING_ENDPOINTS), - ) + msg = _MODEL_NOT_SUPPORTED_WARNING.format( + model_name=self.model_name, + known_models=", ".join(_TEXT_EMBEDDING_ENDPOINTS), ) + warnings.warn(msg) options = { "endpoint": self.model_name, @@ -715,18 +714,33 @@ def _from_bq( model._bqml_model = core.BqmlModel(session, bq_model) return model - def predict(self, X: utils.ArrayType) -> bpd.DataFrame: + @property + def _predict_func(self) -> Callable[[bpd.DataFrame, Mapping], bpd.DataFrame]: + return self._bqml_model.generate_embedding + + @property + def _status_col(self) -> str: + return _ML_GENERATE_EMBEDDING_STATUS + + def predict(self, X: utils.ArrayType, *, max_retries: int = 0) -> bpd.DataFrame: """Predict the result from input DataFrame. Args: X (bigframes.dataframe.DataFrame or bigframes.series.Series or pandas.core.frame.DataFrame or pandas.core.series.Series): Input DataFrame or Series, can contain one or more columns. If multiple columns are in the DataFrame, it must contain a "content" column for prediction. + max_retries (int, default 0): + Max number of retries if the prediction for any rows failed. Each try needs to make progress (i.e. has successfully predicted rows) to continue the retry. + Each retry will append newly succeeded rows. When the max retries are reached, the remaining rows (the ones without successful predictions) will be appended to the end of the result. + Returns: bigframes.dataframe.DataFrame: DataFrame of shape (n_samples, n_input_columns + n_prediction_columns). Returns predicted values. """ + if max_retries < 0: + raise ValueError( + f"max_retries must be larger than or equal to 0, but is {max_retries}." 
+ ) - # Params reference: https://cloud.google.com/vertex-ai/docs/generative-ai/learn/models (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session) if len(X.columns) == 1: @@ -738,15 +752,7 @@ def predict(self, X: utils.ArrayType) -> bpd.DataFrame: "flatten_json_output": True, } - df = self._bqml_model.generate_embedding(X, options) - - if (df[_ML_GENERATE_EMBEDDING_STATUS] != "").any(): - warnings.warn( - f"Some predictions failed. Check column {_ML_GENERATE_EMBEDDING_STATUS} for detailed status. You may want to filter the failed rows and retry.", - RuntimeWarning, - ) - - return df + return self._predict_and_retry(X, options=options, max_retries=max_retries) def to_gbq(self, model_name: str, replace: bool = False) -> TextEmbeddingGenerator: """Save the model to BigQuery. @@ -765,7 +771,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> TextEmbeddingGenerat @log_adapter.class_logger -class GeminiTextGenerator(base.BaseEstimator): +class GeminiTextGenerator(base.RetriableRemotePredictor): """Gemini text generator LLM model. Args: @@ -806,13 +812,15 @@ def __init__( max_iterations: int = 300, ): if model_name in _GEMINI_PREVIEW_ENDPOINTS: - warnings.warn( - f"""Model {model_name} is subject to the "Pre-GA Offerings Terms" in the General Service Terms section of the - Service Specific Terms(https://cloud.google.com/terms/service-terms#1). Pre-GA products and features are available "as is" - and might have limited support. For more information, see the launch stage descriptions - (https://cloud.google.com/products#product-launch-stages).""", - category=exceptions.PreviewWarning, + msg = ( + f'Model {model_name} is subject to the "Pre-GA Offerings Terms" in ' + "the General Service Terms section of the Service Specific Terms" + "(https://cloud.google.com/terms/service-terms#1). Pre-GA products and " + 'features are available "as is" and might have limited support. For ' + "more information, see the launch stage descriptions " + "(https://cloud.google.com/products#product-launch-stages)." 
) + warnings.warn(msg, category=exceptions.PreviewWarning) self.model_name = model_name self.session = session or bpd.get_global_session() self.max_iterations = max_iterations @@ -849,12 +857,11 @@ def _create_bqml_model(self): ) if self.model_name not in _GEMINI_ENDPOINTS: - warnings.warn( - _MODEL_NOT_SUPPORTED_WARNING.format( - model_name=self.model_name, - known_models=", ".join(_GEMINI_ENDPOINTS), - ) + msg = _MODEL_NOT_SUPPORTED_WARNING.format( + model_name=self.model_name, + known_models=", ".join(_GEMINI_ENDPOINTS), ) + warnings.warn(msg) options = {"endpoint": self.model_name} @@ -891,6 +898,14 @@ def _bqml_options(self) -> dict: } return options + @property + def _predict_func(self) -> Callable[[bpd.DataFrame, Mapping], bpd.DataFrame]: + return self._bqml_model.generate_text + + @property + def _status_col(self) -> str: + return _ML_GENERATE_TEXT_STATUS + def fit( self, X: utils.ArrayType, @@ -1028,41 +1043,7 @@ def predict( "ground_with_google_search": ground_with_google_search, } - df_result = bpd.DataFrame(session=self._bqml_model.session) # placeholder - df_fail = X - for _ in range(max_retries + 1): - df = self._bqml_model.generate_text(df_fail, options) - - success = df[_ML_GENERATE_TEXT_STATUS].str.len() == 0 - df_succ = df[success] - df_fail = df[~success] - - if df_succ.empty: - if max_retries > 0: - warnings.warn( - "Can't make any progress, stop retrying.", RuntimeWarning - ) - break - - df_result = ( - bpd.concat([df_result, df_succ]) if not df_result.empty else df_succ - ) - - if df_fail.empty: - break - - if not df_fail.empty: - warnings.warn( - f"Some predictions failed. Check column {_ML_GENERATE_TEXT_STATUS} for detailed status. You may want to filter the failed rows and retry.", - RuntimeWarning, - ) - - df_result = cast( - bpd.DataFrame, - bpd.concat([df_result, df_fail]) if not df_result.empty else df_fail, - ) - - return df_result + return self._predict_and_retry(X, options=options, max_retries=max_retries) def score( self, @@ -1144,7 +1125,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> GeminiTextGenerator: @log_adapter.class_logger -class Claude3TextGenerator(base.BaseEstimator): +class Claude3TextGenerator(base.RetriableRemotePredictor): """Claude3 text generator LLM model. Go to Google Cloud Console -> Vertex AI -> Model Garden page to enable the models before use. Must have the Consumer Procurement Entitlement Manager Identity and Access Management (IAM) role to enable the models. @@ -1223,13 +1204,11 @@ def _create_bqml_model(self): ) if self.model_name not in _CLAUDE_3_ENDPOINTS: - warnings.warn( - _MODEL_NOT_SUPPORTED_WARNING.format( - model_name=self.model_name, - known_models=", ".join(_CLAUDE_3_ENDPOINTS), - ) + msg = _MODEL_NOT_SUPPORTED_WARNING.format( + model_name=self.model_name, + known_models=", ".join(_CLAUDE_3_ENDPOINTS), ) - + warnings.warn(msg) options = { "endpoint": self.model_name, } @@ -1273,6 +1252,14 @@ def _bqml_options(self) -> dict: } return options + @property + def _predict_func(self) -> Callable[[bpd.DataFrame, Mapping], bpd.DataFrame]: + return self._bqml_model.generate_text + + @property + def _status_col(self) -> str: + return _ML_GENERATE_TEXT_STATUS + def predict( self, X: utils.ArrayType, @@ -1280,6 +1267,7 @@ max_output_tokens: int = 128, top_k: int = 40, top_p: float = 0.95, + max_retries: int = 0, ) -> bpd.DataFrame: """Predict the result from input DataFrame. @@ -1307,6 +1295,10 @@ Specify a lower value for less random responses and a higher value for more random responses. 
Default 0.95. Possible values [0.0, 1.0]. + max_retries (int, default 0): + Max number of retries if the prediction for any rows failed. Each try needs to make progress (i.e. has successfully predicted rows) to continue the retry. + Each retry will append newly succeeded rows. When the max retries are reached, the remaining rows (the ones without successful predictions) will be appended to the end of the result. + Returns: bigframes.dataframe.DataFrame: DataFrame of shape (n_samples, n_input_columns + n_prediction_columns). Returns predicted values. @@ -1324,6 +1316,11 @@ if top_p < 0.0 or top_p > 1.0: raise ValueError(f"top_p must be [0.0, 1.0], but is {top_p}.") + if max_retries < 0: + raise ValueError( + f"max_retries must be larger than or equal to 0, but is {max_retries}." + ) + (X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session) if len(X.columns) == 1: @@ -1338,15 +1335,7 @@ "flatten_json_output": True, } - df = self._bqml_model.generate_text(X, options) - - if (df[_ML_GENERATE_TEXT_STATUS] != "").any(): - warnings.warn( - f"Some predictions failed. Check column {_ML_GENERATE_TEXT_STATUS} for detailed status. You may want to filter the failed rows and retry.", - RuntimeWarning, - ) - - return df + return self._predict_and_retry(X, options=options, max_retries=max_retries) def to_gbq(self, model_name: str, replace: bool = False) -> Claude3TextGenerator: """Save the model to BigQuery. diff --git a/bigframes/ml/remote.py b/bigframes/ml/remote.py index bb614b9da5..f4f55ad34e 100644 --- a/bigframes/ml/remote.py +++ b/bigframes/ml/remote.py @@ -139,9 +139,10 @@ def predict( # unlike LLM models, the general remote model status is null for successful runs. if (df[_REMOTE_MODEL_STATUS].notna()).any(): - warnings.warn( - f"Some predictions failed. Check column {_REMOTE_MODEL_STATUS} for detailed status. You may want to filter the failed rows and retry.", - RuntimeWarning, + msg = ( + f"Some predictions failed. Check column {_REMOTE_MODEL_STATUS} for " + "detailed status. You may want to filter the failed rows and retry." ) + warnings.warn(msg, category=RuntimeWarning) return df diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index 2884d56551..37a40b7d01 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -740,6 +740,34 @@ def output_type(self, *input_types): ) +@dataclasses.dataclass(frozen=True) +class ParseJSON(UnaryOp): + name: typing.ClassVar[str] = "parse_json" + + def output_type(self, *input_types): + input_type = input_types[0] + if input_type != dtypes.STRING_DTYPE: + raise TypeError( + "Input type must be a valid JSON-formatted string type." + + f" Received type: {input_type}" + ) + return dtypes.JSON_DTYPE + + +@dataclasses.dataclass(frozen=True) +class ToJSONString(UnaryOp): + name: typing.ClassVar[str] = "to_json_string" + + def output_type(self, *input_types): + input_type = input_types[0] + if not dtypes.is_json_like(input_type): + raise TypeError( + "Input type must be a valid JSON object or JSON-formatted string type." 
+ + f" Received type: {input_type}" + ) + return dtypes.STRING_DTYPE + + ## Blob Ops @dataclasses.dataclass(frozen=True) class ObjGetAccessUrl(UnaryOp): diff --git a/bigframes/operations/_matplotlib/core.py b/bigframes/operations/_matplotlib/core.py index b7c926be99..9c68a2c5ca 100644 --- a/bigframes/operations/_matplotlib/core.py +++ b/bigframes/operations/_matplotlib/core.py @@ -70,11 +70,10 @@ def _compute_sample_data(self, data): if self._sampling_warning_msg is not None: total_n = data.shape[0] if sampling_n < total_n: - warnings.warn( - self._sampling_warning_msg.format( - sampling_n=sampling_n, total_n=total_n - ) + msg = self._sampling_warning_msg.format( + sampling_n=sampling_n, total_n=total_n ) + warnings.warn(msg) sampling_random_state = self.kwargs.pop( "sampling_random_state", DEFAULT_SAMPLING_STATE diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index 6b7f56d708..9de58fe5db 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -379,6 +379,19 @@ def skips_nulls(self): return True +# This should really by a NullaryWindowOp, but APIs don't support that yet. +@dataclasses.dataclass(frozen=True) +class RowNumberOp(UnaryWindowOp): + name: ClassVar[str] = "rownumber" + + @property + def skips_nulls(self): + return False + + def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: + return dtypes.INT_DTYPE + + @dataclasses.dataclass(frozen=True) class RankOp(UnaryWindowOp): name: ClassVar[str] = "rank" diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py index c074c72971..f78db2b6fc 100644 --- a/bigframes/operations/blob.py +++ b/bigframes/operations/blob.py @@ -14,9 +14,14 @@ from __future__ import annotations +import os +from typing import cast, Optional, Union + import IPython.display as ipy_display import requests +from bigframes import clients +import bigframes.dataframe from bigframes.operations import base import bigframes.operations as ops import bigframes.series @@ -66,3 +71,63 @@ def display(self, n: int = 3): read_url = str(read_url).strip('"') response = requests.get(read_url) ipy_display.display(ipy_display.Image(response.content)) + + def image_blur( + self, + ksize: tuple[int, int], + *, + dst: Union[str, bigframes.series.Series], + connection: Optional[str] = None, + ) -> bigframes.series.Series: + """Blurs images. + + .. note:: + BigFrames Blob is still under experiments. It may not work and subject to change in the future. + + Args: + ksize (tuple(int, int)): Kernel size. + dst (str or bigframes.series.Series): Destination GCS folder str or blob series. + connection (str or None, default None): BQ connection used for function internet transactions, and the output blob if "dst" is str. If None, uses default connection of the session. + + Returns: + BigFrames Blob Series + """ + import bigframes.blob._functions as blob_func + + connection = connection or self._block.session._bq_connection + connection = clients.resolve_full_bq_connection_name( + connection, + default_project=self._block.session._project, + default_location=self._block.session._location, + ) + + if isinstance(dst, str): + dst = os.path.join(dst, "") + src_uri = bigframes.series.Series(self._block).struct.explode()["uri"] + # Replace src folder with dst folder, keep the file names. 
+ dst_uri = src_uri.str.replace(r"^.*\/(.*)$", rf"{dst}\1", regex=True) + dst = cast( + bigframes.series.Series, dst_uri.str.to_blob(connection=connection) + ) + + image_blur_udf = blob_func.TransformFunction( + blob_func.image_blur_def, + session=self._block.session, + connection=connection, + ).udf() + + src_rt = bigframes.series.Series(self._block)._apply_unary_op( + ops.ObjGetAccessUrl(mode="R") + ) + dst_rt = dst._apply_unary_op(ops.ObjGetAccessUrl(mode="RW")) + + src_rt = src_rt._apply_unary_op(ops.ToJSONString()) + dst_rt = dst_rt._apply_unary_op(ops.ToJSONString()) + + df = src_rt.to_frame().join(dst_rt.to_frame(), how="outer") + df["ksize_x"], df["ksize_y"] = ksize + + res = df.apply(image_blur_udf, axis=1) + res.cache() # to execute the udf + + return dst diff --git a/bigframes/operations/semantics.py b/bigframes/operations/semantics.py index 6a537db4f3..a2bf18a41d 100644 --- a/bigframes/operations/semantics.py +++ b/bigframes/operations/semantics.py @@ -140,10 +140,11 @@ def agg( column = columns[0] if ground_with_google_search: - warnings.warn( + msg = ( "Enabling Grounding with Google Search may impact billing cost. See pricing " "details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models" ) + warnings.warn(msg) user_instruction = self._format_instruction(instruction, columns) @@ -370,10 +371,11 @@ def filter(self, instruction: str, model, ground_with_google_search: bool = Fals raise ValueError(f"Column {column} not found.") if ground_with_google_search: - warnings.warn( + msg = ( "Enabling Grounding with Google Search may impact billing cost. See pricing " "details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models" ) + warnings.warn(msg) self._confirm_operation(len(self._df)) @@ -468,10 +470,11 @@ def map( raise ValueError(f"Column {column} not found.") if ground_with_google_search: - warnings.warn( + msg = ( "Enabling Grounding with Google Search may impact billing cost. See pricing " "details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models" ) + warnings.warn(msg) self._confirm_operation(len(self._df)) @@ -569,10 +572,11 @@ def join( columns = self._parse_columns(instruction) if ground_with_google_search: - warnings.warn( + msg = ( "Enabling Grounding with Google Search may impact billing cost. See pricing " "details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models" ) + warnings.warn(msg) work_estimate = len(self._df) * len(other) self._confirm_operation(work_estimate) @@ -811,10 +815,11 @@ def top_k( ) if ground_with_google_search: - warnings.warn( + msg = ( "Enabling Grounding with Google Search may impact billing cost. See pricing " "details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models" ) + warnings.warn(msg) work_estimate = int(len(self._df) * (len(self._df) - 1) / 2) self._confirm_operation(work_estimate) diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py index 9c3c98ec55..395b573916 100644 --- a/bigframes/pandas/__init__.py +++ b/bigframes/pandas/__init__.py @@ -163,7 +163,8 @@ def get_default_session_id() -> str: the table id of all temporary tables created in the global session. Returns: - str, the default global session id, ex. 'sessiona1b2c' + str: + The default global session id, ex.
'sessiona1b2c' """ return get_global_session().session_id diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index edac7efa4b..d787f8e7f3 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -66,6 +66,7 @@ import bigframes.dataframe import bigframes.dtypes import bigframes.exceptions +import bigframes.exceptions as bfe import bigframes.functions._remote_function_session as bigframes_rf_session import bigframes.functions.remote_function as bigframes_rf import bigframes.session._io.bigquery as bf_io_bigquery @@ -150,25 +151,22 @@ def __init__( if context.location is None: self._location = "US" - warnings.warn( - f"No explicit location is set, so using location {self._location} for the session.", - # User's code - # -> get_global_session() - # -> connect() - # -> Session() - # - # Note: We could also have: - # User's code - # -> read_gbq() - # -> with_default_session() - # -> get_global_session() - # -> connect() - # -> Session() - # but we currently have no way to disambiguate these - # situations. - stacklevel=4, - category=bigframes.exceptions.DefaultLocationWarning, - ) + msg = f"No explicit location is set, so using location {self._location} for the session." + # User's code + # -> get_global_session() + # -> connect() + # -> Session() + # + # Note: We could also have: + # User's code + # -> read_gbq() + # -> with_default_session() + # -> get_global_session() + # -> connect() + # -> Session() + # but we currently have no way to disambiguate these + # situations. + warnings.warn(msg, stacklevel=4, category=bfe.DefaultLocationWarning) else: self._location = context.location @@ -236,10 +234,8 @@ def __init__( # Will expose as feature later, only False for internal testing self._strictly_ordered: bool = context.ordering_mode != "partial" if not self._strictly_ordered: - warnings.warn( - "Partial ordering mode is a preview feature and is subject to change.", - bigframes.exceptions.OrderingModePartialPreviewWarning, - ) + msg = "Partial ordering mode is a preview feature and is subject to change." + warnings.warn(msg, bfe.OrderingModePartialPreviewWarning) self._allow_ambiguity = not self._strictly_ordered self._default_index_type = ( @@ -604,11 +600,8 @@ def read_gbq_table_streaming( bigframes.streaming.dataframe.StreamingDataFrame: A StreamingDataFrame representing results of the table. """ - warnings.warn( - "The bigframes.streaming module is a preview feature, and subject to change.", - stacklevel=1, - category=bigframes.exceptions.PreviewWarning, - ) + msg = "The bigframes.streaming module is a preview feature, and subject to change." + warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) import bigframes.streaming.dataframe as streaming_dataframe @@ -1370,13 +1363,14 @@ def remote_function( `all`, `internal-only`, `internal-and-gclb`. See for more details https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings. Returns: - callable: A remote function object pointing to the cloud assets created - in the background to support the remote execution. The cloud assets can be - located through the following properties set in the object: + collections.abc.Callable: + A remote function object pointing to the cloud assets created + in the background to support the remote execution. The cloud assets can be + located through the following properties set in the object: - `bigframes_cloud_function` - The google cloud function deployed for the user defined code. 
+ `bigframes_cloud_function` - The google cloud function deployed for the user defined code. - `bigframes_remote_function` - The bigquery remote function capable of calling into `bigframes_cloud_function`. + `bigframes_remote_function` - The bigquery remote function capable of calling into `bigframes_cloud_function`. """ return self._remote_function_session.remote_function( input_types, @@ -1545,12 +1539,13 @@ def read_gbq_function( a pandas Series. Returns: - callable: A function object pointing to the BigQuery function read - from BigQuery. + collections.abc.Callable: + A function object pointing to the BigQuery function read + from BigQuery. - The object is similar to the one created by the `remote_function` - decorator, including the `bigframes_remote_function` property, but - not including the `bigframes_cloud_function` property. + The object is similar to the one created by the `remote_function` + decorator, including the `bigframes_remote_function` property, but + not including the `bigframes_cloud_function` property. """ return bigframes_rf.read_gbq_function( @@ -1590,7 +1585,7 @@ def _start_query_ml_ddl( job_config.destination_encryption_configuration = None return bf_io_bigquery.start_query_with_client( - self.bqclient, sql, job_config, metrics=self._metrics + self.bqclient, sql, job_config=job_config, metrics=self._metrics ) def _create_object_table(self, path: str, connection: str) -> str: diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py index b7706d34ca..6a5ba3f4c7 100644 --- a/bigframes/session/_io/bigquery/__init__.py +++ b/bigframes/session/_io/bigquery/__init__.py @@ -40,7 +40,7 @@ IO_ORDERING_ID = "bqdf_row_nums" -MAX_LABELS_COUNT = 64 +MAX_LABELS_COUNT = 64 - 8 _LIST_TABLES_LIMIT = 10000 # calls to bqclient.list_tables # will be limited to this many tables @@ -204,7 +204,12 @@ def format_option(key: str, value: Union[bool, str]) -> str: return f"{key}={repr(value)}" -def add_labels(job_config, api_name: Optional[str] = None): +def add_and_trim_labels(job_config, api_name: Optional[str] = None): + """ + Add additional labels to the job configuration and trim the total number of labels + to ensure they do not exceed the maximum limit allowed by BigQuery, which is 64 + labels per job. + """ api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run) job_config.labels = create_job_configs_labels( job_configs_labels=job_config.labels, @@ -217,7 +222,10 @@ def start_query_with_client( bq_client: bigquery.Client, sql: str, job_config: bigquery.job.QueryJobConfig, + location: Optional[str] = None, + project: Optional[str] = None, max_results: Optional[int] = None, + page_size: Optional[int] = None, timeout: Optional[float] = None, api_name: Optional[str] = None, metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, @@ -225,10 +233,17 @@ def start_query_with_client( """ Starts query job and waits for results. """ - add_labels(job_config, api_name=api_name) - try: - query_job = bq_client.query(sql, job_config=job_config, timeout=timeout) + # Note: Ensure no additional labels are added to job_config after this point, + # as `add_and_trim_labels` ensures the label count does not exceed 64. 
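+ # (MAX_LABELS_COUNT above is defined as 64 - 8, which presumably leaves
+ # headroom for any labels attached by other layers before the job is submitted.)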
+ add_and_trim_labels(job_config, api_name=api_name) + query_job = bq_client.query( + sql, + job_config=job_config, + location=location, + project=project, + timeout=timeout, + ) except google.api_core.exceptions.Forbidden as ex: if "Drive credentials" in ex.message: ex.message += CHECK_DRIVE_PERMISSIONS @@ -237,10 +252,15 @@ opts = bigframes.options.display if opts.progress_bar is not None and not query_job.configuration.dry_run: results_iterator = formatting_helpers.wait_for_query_job( - query_job, max_results=max_results, progress_bar=opts.progress_bar + query_job, + max_results=max_results, + progress_bar=opts.progress_bar, + page_size=page_size, ) else: - results_iterator = query_job.result(max_results=max_results) + results_iterator = query_job.result( + max_results=max_results, page_size=page_size + ) if metrics is not None: metrics.count_job_stats(query_job) @@ -304,11 +324,15 @@ def create_bq_dataset_reference( bigquery.DatasetReference: The constructed reference to the anonymous dataset. """ job_config = google.cloud.bigquery.QueryJobConfig() - add_labels(job_config, api_name=api_name) - query_job = bq_client.query( - "SELECT 1", location=location, project=project, job_config=job_config + + _, query_job = start_query_with_client( + bq_client, + "SELECT 1", + location=location, + job_config=job_config, + project=project, + api_name=api_name, ) - query_job.result() # blocks until finished # The anonymous dataset is used by BigQuery to write query results and # session tables. BigQuery DataFrames also writes temp tables directly diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py index 4044b7bf43..6114427570 100644 --- a/bigframes/session/_io/bigquery/read_gbq_table.py +++ b/bigframes/session/_io/bigquery/read_gbq_table.py @@ -33,6 +33,7 @@ import bigframes.core.compile.default_ordering import bigframes.core.sql import bigframes.dtypes +import bigframes.exceptions as bfe import bigframes.session._io.bigquery import bigframes.session.clients import bigframes.version @@ -59,21 +60,21 @@ def get_table_metadata( # Cache hit could be unexpected. See internal issue 329545805. # Raise a warning with more information about how to avoid the # problems with the cache. - warnings.warn( + msg = ( f"Reading cached table from {snapshot_timestamp} to avoid " "incompatibilities with previous reads of this table. To read " "the latest version, set `use_cache=False` or close the " "current session with Session.close() or " - "bigframes.pandas.close_session().", - # There are many layers before we get to (possibly) the user's code: - # pandas.read_gbq_table - # -> with_default_session - # -> Session.read_gbq_table - # -> _read_gbq_table - # -> _get_snapshot_sql_and_primary_key - # -> get_snapshot_datetime_and_table_metadata - stacklevel=7, + "bigframes.pandas.close_session()." ) + # There are many layers before we get to (possibly) the user's code: + # pandas.read_gbq_table + # -> with_default_session + # -> Session.read_gbq_table + # -> _read_gbq_table + # -> _get_snapshot_sql_and_primary_key + # -> get_snapshot_datetime_and_table_metadata + warnings.warn(msg, stacklevel=7) return cached_table table = bqclient.get_table(table_ref) @@ -104,13 +105,13 @@ def validate_table( # Only true tables support time travel elif table.table_type != "TABLE": if table.table_type == "MATERIALIZED_VIEW": - warnings.warn( + msg = ( "Materialized views do not support FOR SYSTEM_TIME AS OF queries. 
" "Attempting query without time travel. Be aware that as materialized views " "are updated periodically, modifications to the underlying data in the view may " - "result in errors or unexpected behavior.", - category=bigframes.exceptions.TimeTravelDisabledWarning, + "result in errors or unexpected behavior." ) + warnings.warn(msg, category=bfe.TimeTravelDisabledWarning) else: # table might support time travel, lets do a dry-run query with time travel snapshot_sql = bigframes.session._io.bigquery.to_query( @@ -142,13 +143,13 @@ def validate_table( snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True) ) if time_travel_not_found: - warnings.warn( + msg = ( "NotFound error when reading table with time travel." " Attempting query without time travel. Warning: Without" " time travel, modifications to the underlying table may" - " result in errors or unexpected behavior.", - category=bigframes.exceptions.TimeTravelDisabledWarning, + " result in errors or unexpected behavior." ) + warnings.warn(msg, category=bfe.TimeTravelDisabledWarning) return False @@ -263,15 +264,15 @@ def get_index_cols( # resource utilization because of the default sequential index. See # internal issue 335727141. if _is_table_clustered_or_partitioned(table) and not primary_keys: - warnings.warn( + msg = ( f"Table '{str(table.reference)}' is clustered and/or " "partitioned, but BigQuery DataFrames was not able to find a " "suitable index. To avoid this warning, set at least one of: " # TODO(b/338037499): Allow max_results to override this too, # once we make it more efficient. - "`index_col` or `filters`.", - category=bigframes.exceptions.DefaultIndexWarning, + "`index_col` or `filters`." ) + warnings.warn(msg, category=bfe.DefaultIndexWarning) # If there are primary keys defined, the query engine assumes these # columns are unique, even if the constraint is not enforced. We make @@ -296,21 +297,21 @@ def get_time_travel_datetime_and_table_metadata( # Cache hit could be unexpected. See internal issue 329545805. # Raise a warning with more information about how to avoid the # problems with the cache. - warnings.warn( + msg = ( f"Reading cached table from {snapshot_timestamp} to avoid " "incompatibilies with previous reads of this table. To read " "the latest version, set `use_cache=False` or close the " "current session with Session.close() or " - "bigframes.pandas.close_session().", - # There are many layers before we get to (possibly) the user's code: - # pandas.read_gbq_table - # -> with_default_session - # -> Session.read_gbq_table - # -> _read_gbq_table - # -> _get_snapshot_sql_and_primary_key - # -> get_snapshot_datetime_and_table_metadata - stacklevel=7, + "bigframes.pandas.close_session()." 
) + # There are many layers before we get to (possibly) the user's code: + # pandas.read_gbq_table + # -> with_default_session + # -> Session.read_gbq_table + # -> _read_gbq_table + # -> _get_snapshot_sql_and_primary_key + # -> get_snapshot_datetime_and_table_metadata + warnings.warn(msg, stacklevel=7) return cached_table # TODO(swast): It's possible that the table metadata is changed between now diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index 01476ed113..553c3fd6e6 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -48,7 +48,6 @@ import bigframes.core.schema import bigframes.core.tree_properties as tree_properties import bigframes.features -import bigframes.formatting_helpers as formatting_helpers import bigframes.session._io.bigquery as bq_io import bigframes.session.metrics import bigframes.session.planner @@ -347,10 +346,14 @@ def export_gcs( format=format, export_options=dict(export_options), ) - job_config = bigquery.QueryJobConfig() - bq_io.add_labels(job_config, api_name=f"dataframe-to_{format.lower()}") - export_job = self.bqclient.query(export_data_statement, job_config=job_config) - self._wait_on_job(export_job) + + bq_io.start_query_with_client( + self.bqclient, + export_data_statement, + job_config=bigquery.QueryJobConfig(), + api_name=f"dataframe-to_{format.lower()}", + metrics=self.metrics, + ) return query_job def dry_run( @@ -358,9 +361,7 @@ def dry_run( ) -> bigquery.QueryJob: sql = self.to_sql(array_value, ordered=ordered) job_config = bigquery.QueryJobConfig(dry_run=True) - bq_io.add_labels(job_config) query_job = self.bqclient.query(sql, job_config=job_config) - _ = query_job.result() return query_job def peek( @@ -373,7 +374,8 @@ def peek( """ plan = self.replace_cached_subtrees(array_value.node) if not tree_properties.can_fast_peek(plan): - warnings.warn("Peeking this value cannot be done efficiently.") + msg = "Peeking this value cannot be done efficiently." + warnings.warn(msg) sql = self.compiler.compile_peek(plan, n_rows) @@ -487,15 +489,19 @@ def _run_execute_query( if not self.strictly_ordered: job_config.labels["bigframes-mode"] = "unordered" - # Note: add_labels is global scope which may have unexpected effects - bq_io.add_labels(job_config, api_name=api_name) + # Note: add_and_trim_labels is global scope which may have unexpected effects + # Ensure no additional labels are added to job_config after this point, + # as `add_and_trim_labels` ensures the label count does not exceed 64. 
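+ # The query below goes through bq_io.start_query_with_client, which now owns
+ # progress-bar rendering and job metrics (replacing the removed _wait_on_job
+ # helper in this module).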
+ bq_io.add_and_trim_labels(job_config, api_name=api_name) try: - query_job = self.bqclient.query(sql, job_config=job_config) - return ( - self._wait_on_job( - query_job, max_results=max_results, page_size=page_size - ), - query_job, + return bq_io.start_query_with_client( + self.bqclient, + sql, + job_config=job_config, + api_name=api_name, + max_results=max_results, + page_size=page_size, + metrics=self.metrics, ) except google.api_core.exceptions.BadRequest as e: @@ -506,29 +512,6 @@ def _run_execute_query( else: raise - def _wait_on_job( - self, - query_job: bigquery.QueryJob, - page_size: Optional[int] = None, - max_results: Optional[int] = None, - ) -> bq_table.RowIterator: - opts = bigframes.options.display - if opts.progress_bar is not None and not query_job.configuration.dry_run: - results_iterator = formatting_helpers.wait_for_query_job( - query_job, - progress_bar=opts.progress_bar, - max_results=max_results, - page_size=page_size, - ) - else: - results_iterator = query_job.result( - max_results=max_results, page_size=page_size - ) - - if self.metrics is not None: - self.metrics.count_job_stats(query_job) - return results_iterator - def replace_cached_subtrees(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode: return nodes.top_down( node, lambda x: self._cached_executions.get(x, x), memoize=True diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index e7579b1138..ec922e286d 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -707,9 +707,9 @@ def _start_query( return bf_io_bigquery.start_query_with_client( self._bqclient, sql, - job_config, - max_results, - timeout, + job_config=job_config, + max_results=max_results, + timeout=timeout, api_name=api_name, ) diff --git a/bigframes/streaming/dataframe.py b/bigframes/streaming/dataframe.py index b83ae5d822..960da9f57c 100644 --- a/bigframes/streaming/dataframe.py +++ b/bigframes/streaming/dataframe.py @@ -26,6 +26,7 @@ import bigframes from bigframes import dataframe from bigframes.core import log_adapter +import bigframes.exceptions as bfe def _return_type_wrapper(method, cls): @@ -347,11 +348,8 @@ def _to_bigtable( For example, the job can be cancelled or its error status can be examined. """ - warnings.warn( - "The bigframes.streaming module is a preview feature, and subject to change.", - stacklevel=1, - category=bigframes.exceptions.PreviewWarning, - ) + msg = "The bigframes.streaming module is a preview feature, and subject to change." + warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) # get default client if not passed if session is None: @@ -462,11 +460,8 @@ def _to_pubsub( For example, the job can be cancelled or its error status can be examined. """ - warnings.warn( - "The bigframes.streaming module is a preview feature, and subject to change.", - stacklevel=1, - category=bigframes.exceptions.PreviewWarning, - ) + msg = "The bigframes.streaming module is a preview feature, and subject to change." + warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) # get default client if not passed if session is None: diff --git a/bigframes/version.py b/bigframes/version.py index 7b6d1f2153..0858c02c1e 100644 --- a/bigframes/version.py +++ b/bigframes/version.py @@ -12,4 +12,4 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-__version__ = "1.31.0" +__version__ = "1.32.0" diff --git a/notebooks/experimental/semantic_operators.ipynb b/notebooks/experimental/semantic_operators.ipynb index f9c7f67358..d3fec469b4 100644 --- a/notebooks/experimental/semantic_operators.ipynb +++ b/notebooks/experimental/semantic_operators.ipynb @@ -44,6 +44,12 @@ " View on GitHub\n", " \n", " \n", + " \n", + " \n", + " \"BQ\n", + " Open in BQ Studio\n", + " \n", + " \n", "" ] }, @@ -135,24 +141,24 @@ }, { "cell_type": "markdown", - "source": [ - "Specify your GCP project and location." - ], "metadata": { "id": "W8TPUvnsqxhv" - } + }, + "source": [ + "Specify your GCP project and location." + ] }, { "cell_type": "code", - "source": [ - "bpd.options.bigquery.project = 'YOUR_PROJECT_ID'\n", - "bpd.options.bigquery.location = 'US'" - ], + "execution_count": null, "metadata": { "id": "vCkraKOeqJFl" }, - "execution_count": null, - "outputs": [] + "outputs": [], + "source": [ + "bpd.options.bigquery.project = 'YOUR_PROJECT_ID'\n", + "bpd.options.bigquery.location = 'US'" + ] }, { "cell_type": "markdown", @@ -182,7 +188,7 @@ "source": [ "Create LLM instances. They will be passed in as parameters for each semantic operator.\n", "\n", - "This tutorial uses the \"gemini-1.5-flash-001\" model for text generation and \"text-embedding-005\" for embedding. While these are recommended, you can choose [other Vertex AI LLM models](https://cloud.google.com/vertex-ai/generative-ai/docs/learn/models) based on your needs and availability. Ensure you have [sufficient quota](https://cloud.google.com/vertex-ai/generative-ai/docs/quotas) for your chosen models and adjust it if necessary." + "This tutorial uses the \"gemini-1.5-flash-002\" model for text generation and \"text-embedding-005\" for embedding. While these are recommended, you can choose [other Vertex AI LLM models](https://cloud.google.com/vertex-ai/generative-ai/docs/learn/models) based on your needs and availability. Ensure you have [sufficient quota](https://cloud.google.com/vertex-ai/generative-ai/docs/quotas) for your chosen models and adjust it if necessary." ] }, { @@ -293,16 +299,7 @@ }, "outputs": [ { - "output_type": "execute_result", "data": { - "text/plain": [ - " country city\n", - "0 USA Seattle\n", - "1 Germany Berlin\n", - "2 Japan Kyoto\n", - "\n", - "[3 rows x 2 columns]" - ], "text/html": [ "
\n", "