diff --git a/.github/.OwlBot.lock.yaml b/.github/.OwlBot.lock.yaml
index 6301519a9a..10cf433a8b 100644
--- a/.github/.OwlBot.lock.yaml
+++ b/.github/.OwlBot.lock.yaml
@@ -1,4 +1,4 @@
-# Copyright 2024 Google LLC
+# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,5 +13,5 @@
# limitations under the License.
docker:
image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest
- digest: sha256:2ed982f884312e4883e01b5ab8af8b6935f0216a5a2d82928d273081fc3be562
-# created: 2024-11-12T12:09:45.821174897Z
+ digest: sha256:8ff1efe878e18bd82a0fb7b70bb86f77e7ab6901fed394440b6135db0ba8d84a
+# created: 2025-01-09T12:01:16.422459506Z
diff --git a/.github/workflows/unittest.yml b/.github/workflows/unittest.yml
index ce5137e58c..8659d83d82 100644
--- a/.github/workflows/unittest.yml
+++ b/.github/workflows/unittest.yml
@@ -5,7 +5,10 @@ on:
name: unittest
jobs:
unit:
- runs-on: ubuntu-latest
+ # TODO(https://github.com/googleapis/gapic-generator-python/issues/2303): use `ubuntu-latest` once this bug is fixed.
+      # Use ubuntu-22.04 until the older Python versions are removed from the test matrix
+ # https://docs.github.com/en/actions/using-github-hosted-runners/using-github-hosted-runners/about-github-hosted-runners#standard-github-hosted-runners-for-public-repositories
+ runs-on: ubuntu-22.04
strategy:
matrix:
python: ['3.9', '3.10', '3.11', '3.12']
diff --git a/.kokoro/docker/docs/requirements.txt b/.kokoro/docker/docs/requirements.txt
index 8bb0764594..f99a5c4aac 100644
--- a/.kokoro/docker/docs/requirements.txt
+++ b/.kokoro/docker/docs/requirements.txt
@@ -2,11 +2,11 @@
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
-# pip-compile --allow-unsafe --generate-hashes requirements.in
+# pip-compile --allow-unsafe --generate-hashes synthtool/gcp/templates/python_library/.kokoro/docker/docs/requirements.in
#
-argcomplete==3.5.1 \
- --hash=sha256:1a1d148bdaa3e3b93454900163403df41448a248af01b6e849edc5ac08e6c363 \
- --hash=sha256:eb1ee355aa2557bd3d0145de7b06b2a45b0ce461e1e7813f5d066039ab4177b4
+argcomplete==3.5.2 \
+ --hash=sha256:036d020d79048a5d525bc63880d7a4b8d1668566b8a76daf1144c0bbe0f63472 \
+ --hash=sha256:23146ed7ac4403b70bd6026402468942ceba34a6732255b9edf5b7354f68a6bb
# via nox
colorlog==6.9.0 \
--hash=sha256:5906e71acd67cb07a71e779c47c4bcb45fb8c2993eebe9e5adcd6a6f1b283eff \
@@ -23,7 +23,7 @@ filelock==3.16.1 \
nox==2024.10.9 \
--hash=sha256:1d36f309a0a2a853e9bccb76bbef6bb118ba92fa92674d15604ca99adeb29eab \
--hash=sha256:7aa9dc8d1c27e9f45ab046ffd1c3b2c4f7c91755304769df231308849ebded95
- # via -r requirements.in
+ # via -r synthtool/gcp/templates/python_library/.kokoro/docker/docs/requirements.in
packaging==24.2 \
--hash=sha256:09abb1bccd265c01f4a3aa3f7a7db064b36514d2cba19a2f694fe6150451a759 \
--hash=sha256:c228a6dc5e932d346bc5739379109d49e8853dd8223571c7c5b55260edc0b97f
@@ -32,11 +32,41 @@ platformdirs==4.3.6 \
--hash=sha256:357fb2acbc885b0419afd3ce3ed34564c13c9b95c89360cd9563f73aa5e2b907 \
--hash=sha256:73e575e1408ab8103900836b97580d5307456908a03e92031bab39e4554cc3fb
# via virtualenv
-tomli==2.0.2 \
- --hash=sha256:2ebe24485c53d303f690b0ec092806a085f07af5a5aa1464f3931eec36caaa38 \
- --hash=sha256:d46d457a85337051c36524bc5349dd91b1877838e2979ac5ced3e710ed8a60ed
+tomli==2.2.1 \
+ --hash=sha256:023aa114dd824ade0100497eb2318602af309e5a55595f76b626d6d9f3b7b0a6 \
+ --hash=sha256:02abe224de6ae62c19f090f68da4e27b10af2b93213d36cf44e6e1c5abd19fdd \
+ --hash=sha256:286f0ca2ffeeb5b9bd4fcc8d6c330534323ec51b2f52da063b11c502da16f30c \
+ --hash=sha256:2d0f2fdd22b02c6d81637a3c95f8cd77f995846af7414c5c4b8d0545afa1bc4b \
+ --hash=sha256:33580bccab0338d00994d7f16f4c4ec25b776af3ffaac1ed74e0b3fc95e885a8 \
+ --hash=sha256:400e720fe168c0f8521520190686ef8ef033fb19fc493da09779e592861b78c6 \
+ --hash=sha256:40741994320b232529c802f8bc86da4e1aa9f413db394617b9a256ae0f9a7f77 \
+ --hash=sha256:465af0e0875402f1d226519c9904f37254b3045fc5084697cefb9bdde1ff99ff \
+ --hash=sha256:4a8f6e44de52d5e6c657c9fe83b562f5f4256d8ebbfe4ff922c495620a7f6cea \
+ --hash=sha256:4e340144ad7ae1533cb897d406382b4b6fede8890a03738ff1683af800d54192 \
+ --hash=sha256:678e4fa69e4575eb77d103de3df8a895e1591b48e740211bd1067378c69e8249 \
+ --hash=sha256:6972ca9c9cc9f0acaa56a8ca1ff51e7af152a9f87fb64623e31d5c83700080ee \
+ --hash=sha256:7fc04e92e1d624a4a63c76474610238576942d6b8950a2d7f908a340494e67e4 \
+ --hash=sha256:889f80ef92701b9dbb224e49ec87c645ce5df3fa2cc548664eb8a25e03127a98 \
+ --hash=sha256:8d57ca8095a641b8237d5b079147646153d22552f1c637fd3ba7f4b0b29167a8 \
+ --hash=sha256:8dd28b3e155b80f4d54beb40a441d366adcfe740969820caf156c019fb5c7ec4 \
+ --hash=sha256:9316dc65bed1684c9a98ee68759ceaed29d229e985297003e494aa825ebb0281 \
+ --hash=sha256:a198f10c4d1b1375d7687bc25294306e551bf1abfa4eace6650070a5c1ae2744 \
+ --hash=sha256:a38aa0308e754b0e3c67e344754dff64999ff9b513e691d0e786265c93583c69 \
+ --hash=sha256:a92ef1a44547e894e2a17d24e7557a5e85a9e1d0048b0b5e7541f76c5032cb13 \
+ --hash=sha256:ac065718db92ca818f8d6141b5f66369833d4a80a9d74435a268c52bdfa73140 \
+ --hash=sha256:b82ebccc8c8a36f2094e969560a1b836758481f3dc360ce9a3277c65f374285e \
+ --hash=sha256:c954d2250168d28797dd4e3ac5cf812a406cd5a92674ee4c8f123c889786aa8e \
+ --hash=sha256:cb55c73c5f4408779d0cf3eef9f762b9c9f147a77de7b258bef0a5628adc85cc \
+ --hash=sha256:cd45e1dc79c835ce60f7404ec8119f2eb06d38b1deba146f07ced3bbc44505ff \
+ --hash=sha256:d3f5614314d758649ab2ab3a62d4f2004c825922f9e370b29416484086b264ec \
+ --hash=sha256:d920f33822747519673ee656a4b6ac33e382eca9d331c87770faa3eef562aeb2 \
+ --hash=sha256:db2b95f9de79181805df90bedc5a5ab4c165e6ec3fe99f970d0e302f384ad222 \
+ --hash=sha256:e59e304978767a54663af13c07b3d1af22ddee3bb2fb0618ca1593e4f593a106 \
+ --hash=sha256:e85e99945e688e32d5a35c1ff38ed0b3f41f43fad8df0bdf79f72b2ba7bc5272 \
+ --hash=sha256:ece47d672db52ac607a3d9599a9d48dcb2f2f735c6c2d1f34130085bb12b112a \
+ --hash=sha256:f4039b9cbc3048b2416cc57ab3bda989a6fcf9b36cf8937f01a6e731b64f80d7
# via nox
-virtualenv==20.27.1 \
- --hash=sha256:142c6be10212543b32c6c45d3d3893dff89112cc588b7d0879ae5a1ec03a47ba \
- --hash=sha256:f11f1b8a29525562925f745563bfd48b189450f61fb34c4f9cc79dd5aa32a1f4
+virtualenv==20.28.0 \
+ --hash=sha256:23eae1b4516ecd610481eda647f3a7c09aea295055337331bb4e6892ecce47b0 \
+ --hash=sha256:2c9c3262bb8e7b87ea801d715fae4495e6032450c71d2309be9550e7364049aa
# via nox
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ff5ce11006..b4bec86e9e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,6 +4,35 @@
[1]: https://pypi.org/project/bigframes/#history
+## [1.32.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.31.0...v1.32.0) (2025-01-13)
+
+
+### Features
+
+* Add max_retries to TextEmbeddingGenerator and Claude3TextGenerator ([#1259](https://github.com/googleapis/python-bigquery-dataframes/issues/1259)) ([8077ff4](https://github.com/googleapis/python-bigquery-dataframes/commit/8077ff49426b103dc5a52eeb86a2c6a869c99825))
+* Bigframes.bigquery.parse_json ([#1265](https://github.com/googleapis/python-bigquery-dataframes/issues/1265)) ([27bbd80](https://github.com/googleapis/python-bigquery-dataframes/commit/27bbd8085ccac175f113afbd6c94b52c034a3d97))
+* Support DataFrame.astype(dict) ([#1262](https://github.com/googleapis/python-bigquery-dataframes/issues/1262)) ([5934f8e](https://github.com/googleapis/python-bigquery-dataframes/commit/5934f8ee0a1c950a820d1911d73a46f6891a40bb))
+
+
+### Bug Fixes
+
+* Avoid global mutation in `BigQueryOptions.client_endpoints_override` ([#1280](https://github.com/googleapis/python-bigquery-dataframes/issues/1280)) ([788f6e9](https://github.com/googleapis/python-bigquery-dataframes/commit/788f6e94a1e80f0ba8741a53a05a467e7b18e902))
+* Fix erroneous window bounds removal during compilation ([#1163](https://github.com/googleapis/python-bigquery-dataframes/issues/1163)) ([f91756a](https://github.com/googleapis/python-bigquery-dataframes/commit/f91756a4413b10f1072c0ae96301fe854bb1ba4e))
+
+
+### Dependencies
+
+* Relax sqlglot upper bound ([#1278](https://github.com/googleapis/python-bigquery-dataframes/issues/1278)) ([c71ec09](https://github.com/googleapis/python-bigquery-dataframes/commit/c71ec093314409cd4c7a52a713dbd6164fbbd792))
+
+
+### Documentation
+
+* Add BQ Studio links that allow users to generate Jupyter notebooks in BQ Studio with GitHub contents ([#1266](https://github.com/googleapis/python-bigquery-dataframes/issues/1266)) ([58f13cb](https://github.com/googleapis/python-bigquery-dataframes/commit/58f13cb9ef8bac3222e5013d8ae77dd20f886e30))
+* Add snippet to evaluate ARIMA plus model in the Forecast a single time series with a univariate model tutorial ([#1267](https://github.com/googleapis/python-bigquery-dataframes/issues/1267)) ([3dcae2d](https://github.com/googleapis/python-bigquery-dataframes/commit/3dcae2dca45efdd4493cf3f367bf025ea291f4df))
+* Add snippet to see the ARIMA coefficients in the Forecast a single time series with a univariate model tutorial ([#1268](https://github.com/googleapis/python-bigquery-dataframes/issues/1268)) ([059a564](https://github.com/googleapis/python-bigquery-dataframes/commit/059a564095dfea0518982f13c8118d3807861ccf))
+* Update `bigframes.pandas.pandas` docstrings ([#1247](https://github.com/googleapis/python-bigquery-dataframes/issues/1247)) ([c4bffc3](https://github.com/googleapis/python-bigquery-dataframes/commit/c4bffc3e8ec630a362c94f9d269a66073a14ad04))
+* Use 002 model for better scalability in text generation ([#1270](https://github.com/googleapis/python-bigquery-dataframes/issues/1270)) ([bb7a850](https://github.com/googleapis/python-bigquery-dataframes/commit/bb7a85005ebebfbcb0d2a4d5c4c27b354f38d3d1))
+
## [1.31.0](https://github.com/googleapis/python-bigquery-dataframes/compare/v1.30.0...v1.31.0) (2025-01-05)
@@ -97,6 +126,11 @@
* Update df.corr, df.cov to be used with more than 30 columns case. ([#1161](https://github.com/googleapis/python-bigquery-dataframes/issues/1161)) ([9dcf1aa](https://github.com/googleapis/python-bigquery-dataframes/commit/9dcf1aa918919704dcf4d12b05935b22fb502fc6))
+### Dependencies
+
+* Remove `ibis-framework` by vendoring a fork of the package to `bigframes_vendored`. ([#1170](https://github.com/googleapis/python-bigquery-dataframes/pull/1170)) ([421d24d](https://github.com/googleapis/python-bigquery-dataframes/commit/421d24d6e61d557aa696fc701c08c84389f72ed2))
+
+
### Documentation
* Add a code sample using `bpd.options.bigquery.ordering_mode = "partial"` ([#909](https://github.com/googleapis/python-bigquery-dataframes/issues/909)) ([f80d705](https://github.com/googleapis/python-bigquery-dataframes/commit/f80d70503b80559a0b1fe64434383aa3e028bf9b))
diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py
index 052ad5d921..8fec253b24 100644
--- a/bigframes/_config/bigquery_options.py
+++ b/bigframes/_config/bigquery_options.py
@@ -25,7 +25,7 @@
import bigframes.constants
import bigframes.enums
-import bigframes.exceptions
+import bigframes.exceptions as bfe
SESSION_STARTED_MESSAGE = (
"Cannot change '{attribute}' once a session has started. "
@@ -55,15 +55,12 @@ def _get_validated_location(value: Optional[str]) -> Optional[str]:
bigframes.constants.ALL_BIGQUERY_LOCATIONS,
key=lambda item: jellyfish.levenshtein_distance(location, item),
)
- warnings.warn(
- UNKNOWN_LOCATION_MESSAGE.format(location=location, possibility=possibility),
- # There are many layers before we get to (possibly) the user's code:
- # -> bpd.options.bigquery.location = "us-central-1"
- # -> location.setter
- # -> _get_validated_location
- stacklevel=3,
- category=bigframes.exceptions.UnknownLocationWarning,
- )
+ # There are many layers before we get to (possibly) the user's code:
+ # -> bpd.options.bigquery.location = "us-central-1"
+ # -> location.setter
+ # -> _get_validated_location
+ msg = UNKNOWN_LOCATION_MESSAGE.format(location=location, possibility=possibility)
+ warnings.warn(msg, stacklevel=3, category=bfe.UnknownLocationWarning)
return value
@@ -91,7 +88,7 @@ def __init__(
skip_bq_connection_check: bool = False,
*,
ordering_mode: Literal["strict", "partial"] = "strict",
- client_endpoints_override: dict = {},
+ client_endpoints_override: Optional[dict] = None,
):
self._credentials = credentials
self._project = project
@@ -104,6 +101,10 @@ def __init__(
self._session_started = False
# Determines the ordering strictness for the session.
self._ordering_mode = _validate_ordering_mode(ordering_mode)
+
+ if client_endpoints_override is None:
+ client_endpoints_override = {}
+
self._client_endpoints_override = client_endpoints_override
@property
@@ -271,10 +272,11 @@ def use_regional_endpoints(self, value: bool):
)
if value:
- warnings.warn(
+ msg = (
"Use of regional endpoints is a feature in preview and "
"available only in selected regions and projects. "
)
+ warnings.warn(msg, category=bfe.PreviewWarning, stacklevel=2)
self._use_regional_endpoints = value
@@ -330,9 +332,12 @@ def client_endpoints_override(self) -> dict:
@client_endpoints_override.setter
def client_endpoints_override(self, value: dict):
- warnings.warn(
- "This is an advanced configuration option for directly setting endpoints. Incorrect use may lead to unexpected behavior or system instability. Proceed only if you fully understand its implications."
+ msg = (
+ "This is an advanced configuration option for directly setting endpoints. "
+ "Incorrect use may lead to unexpected behavior or system instability. "
+ "Proceed only if you fully understand its implications."
)
+ warnings.warn(msg)
if self._session_started and self._client_endpoints_override != value:
raise ValueError(
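A minimal, self-contained sketch of the pitfall the `client_endpoints_override` change above avoids (hypothetical class names, not the bigframes API): a `dict` default argument is evaluated once at function definition time, so every instance relying on the default shares, and can mutate, the same object.

```python
from typing import Optional

class Options:  # hypothetical, for illustration only
    def __init__(self, endpoints: dict = {}):  # one dict, shared by every call
        self.endpoints = endpoints

a, b = Options(), Options()
a.endpoints["bqclient"] = "https://example.invalid"
print(b.endpoints)  # {'bqclient': 'https://example.invalid'} -- state leaked into b

class FixedOptions:  # the pattern adopted in the diff: default to None
    def __init__(self, endpoints: Optional[dict] = None):
        self.endpoints = {} if endpoints is None else endpoints
```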
diff --git a/bigframes/_config/experiment_options.py b/bigframes/_config/experiment_options.py
index 6b79dcf748..69273aef1c 100644
--- a/bigframes/_config/experiment_options.py
+++ b/bigframes/_config/experiment_options.py
@@ -14,6 +14,8 @@
import warnings
+import bigframes.exceptions as bfe
+
class ExperimentOptions:
"""
@@ -31,9 +33,11 @@ def semantic_operators(self) -> bool:
@semantic_operators.setter
def semantic_operators(self, value: bool):
if value is True:
- warnings.warn(
- "Semantic operators are still under experiments, and are subject to change in the future."
+ msg = (
+                "Semantic operators are still experimental, and are subject "
+                "to change in the future."
)
+ warnings.warn(msg, category=bfe.PreviewWarning)
self._semantic_operators = value
@property
@@ -43,7 +47,9 @@ def blob(self) -> bool:
@blob.setter
def blob(self, value: bool):
if value is True:
- warnings.warn(
- "BigFrames Blob is still under experiments. It may not work and subject to change in the future."
+ msg = (
+                "BigFrames Blob is still experimental. It may not work and "
+                "is subject to change in the future."
)
+ warnings.warn(msg, category=bfe.PreviewWarning)
self._blob = value
diff --git a/bigframes/bigquery/__init__.py b/bigframes/bigquery/__init__.py
index a39914d6e7..ff52ae8d36 100644
--- a/bigframes/bigquery/__init__.py
+++ b/bigframes/bigquery/__init__.py
@@ -27,20 +27,27 @@
json_extract_array,
json_extract_string_array,
json_set,
+ parse_json,
)
from bigframes.bigquery._operations.search import create_vector_index, vector_search
from bigframes.bigquery._operations.struct import struct
__all__ = [
+ # approximate aggregate ops
+ "approx_top_count",
+ # array ops
"array_length",
"array_agg",
"array_to_string",
+ # json ops
"json_set",
"json_extract",
"json_extract_array",
"json_extract_string_array",
- "approx_top_count",
- "struct",
+ "parse_json",
+ # search ops
"create_vector_index",
"vector_search",
+ # struct ops
+ "struct",
]
diff --git a/bigframes/bigquery/_operations/json.py b/bigframes/bigquery/_operations/json.py
index 843991807e..52b01d3ef7 100644
--- a/bigframes/bigquery/_operations/json.py
+++ b/bigframes/bigquery/_operations/json.py
@@ -23,6 +23,7 @@
from typing import Any, cast, Optional, Sequence, Tuple, Union
+import bigframes.core.utils as utils
import bigframes.dtypes
import bigframes.operations as ops
import bigframes.series as series
@@ -30,6 +31,7 @@
from . import array
+@utils.preview(name="The JSON-related API `json_set`")
def json_set(
input: series.Series,
json_path_value_pairs: Sequence[Tuple[str, Any]],
@@ -37,6 +39,10 @@ def json_set(
"""Produces a new JSON value within a Series by inserting or replacing values at
specified paths.
+ .. warning::
+        The JSON-related API `json_set` is in preview. Its behavior may change in
+ future versions.
+
**Examples:**
>>> import bigframes.pandas as bpd
@@ -223,3 +229,37 @@ def json_extract_string_array(
),
)
return array_series
+
+
+@utils.preview(name="The JSON-related API `parse_json`")
+def parse_json(
+ input: series.Series,
+) -> series.Series:
+ """Converts a series with a JSON-formatted STRING value to a JSON value.
+
+ .. warning::
+ The JSON-related API `parse_json` is in preview. Its behavior may change in
+ future versions.
+
+ **Examples:**
+
+ >>> import bigframes.pandas as bpd
+ >>> import bigframes.bigquery as bbq
+ >>> bpd.options.display.progress_bar = None
+
+ >>> s = bpd.Series(['{"class": {"students": [{"id": 5}, {"id": 12}]}}'])
+ >>> s
+ 0 {"class": {"students": [{"id": 5}, {"id": 12}]}}
+ dtype: string
+ >>> bbq.parse_json(s)
+ 0 {"class":{"students":[{"id":5},{"id":12}]}}
+ dtype: large_string[pyarrow]
+
+ Args:
+ input (bigframes.series.Series):
+            The Series containing JSON-formatted strings.
+
+ Returns:
+ bigframes.series.Series: A new Series with the JSON value.
+ """
+ return input._apply_unary_op(ops.ParseJSON())
diff --git a/bigframes/blob/_functions.py b/bigframes/blob/_functions.py
new file mode 100644
index 0000000000..4b3841252c
--- /dev/null
+++ b/bigframes/blob/_functions.py
@@ -0,0 +1,130 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from dataclasses import dataclass
+import inspect
+from typing import Callable, Iterable
+
+import google.cloud.bigquery as bigquery
+
+import bigframes
+import bigframes.session._io.bigquery as bf_io_bigquery
+
+_PYTHON_TO_BQ_TYPES = {int: "INT64", float: "FLOAT64", str: "STRING", bytes: "BYTES"}
+
+
+@dataclass(frozen=True)
+class FunctionDef:
+ """Definition of a Python UDF."""
+
+ func: Callable # function body
+ requirements: Iterable[str] # required packages
+
+
+# TODO(garrettwu): migrate to bigframes UDF when it is available
+class TransformFunction:
+ """Simple transform function class to deal with Python UDF."""
+
+ def __init__(
+ self, func_def: FunctionDef, session: bigframes.Session, connection: str
+ ):
+ self._func = func_def.func
+ self._requirements = func_def.requirements
+ self._session = session
+ self._connection = connection
+
+ def _input_bq_signature(self):
+ sig = inspect.signature(self._func)
+ inputs = []
+ for k, v in sig.parameters.items():
+ inputs.append(f"{k} {_PYTHON_TO_BQ_TYPES[v.annotation]}")
+ return ", ".join(inputs)
+
+ def _output_bq_type(self):
+ sig = inspect.signature(self._func)
+ return _PYTHON_TO_BQ_TYPES[sig.return_annotation]
+
+ def _create_udf(self):
+ """Create Python UDF in BQ. Return name of the UDF."""
+ udf_name = str(self._session._loader._storage_manager._random_table())
+
+ func_body = inspect.getsource(self._func)
+ func_name = self._func.__name__
+ packages = str(list(self._requirements))
+
+ sql = f"""
+CREATE OR REPLACE FUNCTION `{udf_name}`({self._input_bq_signature()})
+RETURNS {self._output_bq_type()} LANGUAGE python
+WITH CONNECTION `{self._connection}`
+OPTIONS (entry_point='{func_name}', runtime_version='python-3.11', packages={packages})
+AS r\"\"\"
+
+
+{func_body}
+
+
+\"\"\"
+ """
+
+ bf_io_bigquery.start_query_with_client(
+ self._session.bqclient,
+ sql,
+ job_config=bigquery.QueryJobConfig(),
+ metrics=self._session._metrics,
+ )
+
+ return udf_name
+
+ def udf(self):
+ """Create and return the UDF object."""
+ udf_name = self._create_udf()
+ return self._session.read_gbq_function(udf_name)
+
+
+# Blur images. Takes ObjectRefRuntime as JSON string. Outputs ObjectRefRuntime JSON string.
+def image_blur_func(
+ src_obj_ref_rt: str, dst_obj_ref_rt: str, ksize_x: int, ksize_y: int
+) -> str:
+ import json
+
+ import cv2 as cv # type: ignore
+ import numpy as np
+ import requests
+
+ src_obj_ref_rt_json = json.loads(src_obj_ref_rt)
+ dst_obj_ref_rt_json = json.loads(dst_obj_ref_rt)
+
+ src_url = src_obj_ref_rt_json["access_urls"]["read_url"]
+ dst_url = dst_obj_ref_rt_json["access_urls"]["write_url"]
+
+ response = requests.get(src_url)
+ bts = response.content
+
+ nparr = np.frombuffer(bts, np.uint8)
+ img = cv.imdecode(nparr, cv.IMREAD_UNCHANGED)
+ img_blurred = cv.blur(img, ksize=(ksize_x, ksize_y))
+ bts = cv.imencode(".jpeg", img_blurred)[1].tobytes()
+
+ requests.put(
+ url=dst_url,
+ data=bts,
+ headers={
+ "Content-Type": "image/jpeg",
+ },
+ )
+
+ return dst_obj_ref_rt
+
+
+image_blur_def = FunctionDef(image_blur_func, ["opencv-python", "numpy", "requests"])
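A hedged sketch of how the pieces of this new module compose. `FunctionDef` and `TransformFunction` are the classes added above; the session and connection values are placeholders, not a tested recipe. The wrapper renders the function source into a `CREATE OR REPLACE FUNCTION ... LANGUAGE python` statement, runs it, and returns the deployed routine via `read_gbq_function`.

```python
from bigframes.blob._functions import FunctionDef, TransformFunction

def add_exclaim(text: str) -> str:
    return text + "!"

add_exclaim_def = FunctionDef(add_exclaim, requirements=[])

# With a live Session and a BigQuery connection (placeholder name below):
#
# transform = TransformFunction(
#     add_exclaim_def, session=session, connection="my-project.us.my-connection"
# )
# udf = transform.udf()  # deploys the UDF and returns a callable routine
```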
diff --git a/bigframes/core/__init__.py b/bigframes/core/__init__.py
index 5e3f6df355..a88e365dcd 100644
--- a/bigframes/core/__init__.py
+++ b/bigframes/core/__init__.py
@@ -39,6 +39,7 @@
import bigframes.core.utils
from bigframes.core.window_spec import WindowSpec
import bigframes.dtypes
+import bigframes.exceptions as bfe
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
@@ -106,10 +107,11 @@ def from_table(
if offsets_col and primary_key:
raise ValueError("must set at most one of 'offests', 'primary_key'")
if any(i.field_type == "JSON" for i in table.schema if i.name in schema.names):
- warnings.warn(
- "Interpreting JSON column(s) as StringDtype and pyarrow.large_string. This behavior may change in future versions.",
- bigframes.exceptions.PreviewWarning,
+ msg = (
+ "Interpreting JSON column(s) as pyarrow.large_string. "
+ "This behavior may change in future versions."
)
+ warnings.warn(msg, bfe.PreviewWarning)
# define data source only for needed columns, this makes row-hashing cheaper
table_def = nodes.GbqTable.from_table(table, columns=schema.names)
@@ -228,10 +230,8 @@ def slice(
self, start: Optional[int], stop: Optional[int], step: Optional[int]
) -> ArrayValue:
if self.node.order_ambiguous and not (self.session._strictly_ordered):
- warnings.warn(
- "Window ordering may be ambiguous, this can cause unstable results.",
- bigframes.exceptions.AmbiguousWindowWarning,
- )
+ msg = "Window ordering may be ambiguous, this can cause unstable results."
+ warnings.warn(msg, bfe.AmbiguousWindowWarning)
return ArrayValue(
nodes.SliceNode(
self.node,
@@ -252,10 +252,10 @@ def promote_offsets(self) -> Tuple[ArrayValue, str]:
"Generating offsets not supported in partial ordering mode"
)
else:
- warnings.warn(
- "Window ordering may be ambiguous, this can cause unstable results.",
- bigframes.exceptions.AmbiguousWindowWarning,
+ msg = (
+ "Window ordering may be ambiguous, this can cause unstable results."
)
+ warnings.warn(msg, category=bfe.AmbiguousWindowWarning)
return (
ArrayValue(
@@ -391,10 +391,8 @@ def project_window_op(
"Generating offsets not supported in partial ordering mode"
)
else:
- warnings.warn(
- "Window ordering may be ambiguous, this can cause unstable results.",
- bigframes.exceptions.AmbiguousWindowWarning,
- )
+ msg = "Window ordering may be ambiguous, this can cause unstable results."
+ warnings.warn(msg, category=bfe.AmbiguousWindowWarning)
output_name = self._gen_namespaced_uid()
return (
diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py
index 785691edd6..a7f75e7264 100644
--- a/bigframes/core/block_transforms.py
+++ b/bigframes/core/block_transforms.py
@@ -86,9 +86,10 @@ def indicate_duplicates(
# Discard this value if there are copies ANYWHERE
window_spec = windows.unbound(grouping_keys=tuple(columns))
block, dummy = block.create_constant(1)
+    # summing the constant-1 column is equivalent to a count and works even with partial ordering
block, val_count_col_id = block.apply_window_op(
dummy,
- agg_ops.count_op,
+ agg_ops.sum_op,
window_spec=window_spec,
)
block, duplicate_indicator = block.project_expr(
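For context on the `count_op` to `sum_op` swap above: summing a constant 1 over the group window yields the same per-group row count, without needing a window ordering. A minimal pandas analogue with hypothetical data:

```python
import pandas as pd

df = pd.DataFrame({"key": ["a", "a", "b"], "dummy": 1})
via_sum = df.groupby("key")["dummy"].transform("sum")      # 2, 2, 1
via_count = df.groupby("key")["dummy"].transform("count")  # identical values
assert via_sum.equals(via_count)
```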
diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py
index ca860612f8..522d1743ff 100644
--- a/bigframes/core/blocks.py
+++ b/bigframes/core/blocks.py
@@ -63,7 +63,7 @@
import bigframes.core.utils as utils
import bigframes.core.window_spec as windows
import bigframes.dtypes
-import bigframes.exceptions
+import bigframes.exceptions as bfe
import bigframes.features
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
@@ -137,10 +137,8 @@ def __init__(
)
if len(index_columns) == 0:
- warnings.warn(
- "Creating object with Null Index. Null Index is a preview feature.",
- category=bigframes.exceptions.NullIndexPreviewWarning,
- )
+ msg = "Creating object with Null Index. Null Index is a preview feature."
+ warnings.warn(msg, category=bfe.NullIndexPreviewWarning)
self._index_columns = tuple(index_columns)
# Index labels don't need complicated hierarchical access so can store as tuple
self._index_labels = (
@@ -616,13 +614,13 @@ def _materialize_local(
" # Setting it to None will download all the data\n"
f"{constants.FEEDBACK_LINK}"
)
-
- warnings.warn(
+ msg = (
f"The data size ({table_mb:.2f} MB) exceeds the maximum download limit of"
- f"({max_download_size} MB). It will be downsampled to {max_download_size} MB for download."
- "\nPlease refer to the documentation for configuring the downloading limit.",
- UserWarning,
+            f" ({max_download_size} MB). It will be downsampled to {max_download_size} "
+ "MB for download.\nPlease refer to the documentation for configuring "
+ "the downloading limit."
)
+ warnings.warn(msg, category=UserWarning)
total_rows = execute_result.total_rows
# Remove downsampling config from subsequent invocations, as otherwise could result in many
# iterations if downsampling undershoots
diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py
index 482c38ae3d..f97856efa5 100644
--- a/bigframes/core/compile/aggregate_compiler.py
+++ b/bigframes/core/compile/aggregate_compiler.py
@@ -479,6 +479,15 @@ def _(
return _apply_window_if_present(column.dense_rank(), window) + 1
+@compile_unary_agg.register
+def _(
+ op: agg_ops.RowNumberOp,
+ column: ibis_types.Column,
+ window=None,
+) -> ibis_types.IntegerValue:
+ return _apply_window_if_present(ibis_api.row_number(), window)
+
+
@compile_unary_agg.register
def _(op: agg_ops.FirstOp, column: ibis_types.Column, window=None) -> ibis_types.Value:
return _apply_window_if_present(column.first(), window)
diff --git a/bigframes/core/compile/compiled.py b/bigframes/core/compile/compiled.py
index d4c814145b..f879eb3feb 100644
--- a/bigframes/core/compile/compiled.py
+++ b/bigframes/core/compile/compiled.py
@@ -1330,7 +1330,7 @@ def _ibis_window_from_spec(
        if require_total_order:
# Some operators need an unambiguous ordering, so the table's total ordering is appended
order_by = tuple([*order_by, *self._ibis_order])
- elif isinstance(window_spec.bounds, RowsWindowBounds):
+ elif require_total_order or isinstance(window_spec.bounds, RowsWindowBounds):
# If window spec has following or preceding bounds, we need to apply an unambiguous ordering.
order_by = tuple(self._ibis_order)
else:
diff --git a/bigframes/core/compile/ibis_types.py b/bigframes/core/compile/ibis_types.py
index 544af69091..a6d3949bc0 100644
--- a/bigframes/core/compile/ibis_types.py
+++ b/bigframes/core/compile/ibis_types.py
@@ -32,6 +32,7 @@
import pyarrow as pa
import bigframes.dtypes
+import bigframes.exceptions as bfe
# Type hints for Ibis data types supported by BigQuery DataFrame
IbisDtype = Union[
@@ -305,10 +306,11 @@ def ibis_dtype_to_bigframes_dtype(
# Temporary: Will eventually support an explicit json type instead of casting to string.
if isinstance(ibis_dtype, ibis_dtypes.JSON):
- warnings.warn(
- "Interpreting JSON as string. This behavior may change in future versions.",
- bigframes.exceptions.PreviewWarning,
+ msg = (
+ "Interpreting JSON column(s) as pyarrow.large_string. This behavior may change "
+ "in future versions."
)
+ warnings.warn(msg, category=bfe.PreviewWarning)
return bigframes.dtypes.JSON_DTYPE
if ibis_dtype in IBIS_TO_BIGFRAMES:
diff --git a/bigframes/core/compile/polars/__init__.py b/bigframes/core/compile/polars/__init__.py
index e15f229faf..8c37e046ab 100644
--- a/bigframes/core/compile/polars/__init__.py
+++ b/bigframes/core/compile/polars/__init__.py
@@ -22,4 +22,5 @@
__all__ = ["PolarsCompiler"]
except Exception:
- warnings.warn("Polars compiler not available as polars is not installed.")
+ msg = "Polars compiler not available as polars is not installed."
+ warnings.warn(msg)
diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py
index d824009fec..d594cb3d68 100644
--- a/bigframes/core/compile/scalar_op_compiler.py
+++ b/bigframes/core/compile/scalar_op_compiler.py
@@ -20,7 +20,6 @@
import bigframes_vendored.constants as constants
import bigframes_vendored.ibis.expr.api as ibis_api
import bigframes_vendored.ibis.expr.datatypes as ibis_dtypes
-import bigframes_vendored.ibis.expr.operations as ibis_ops
import bigframes_vendored.ibis.expr.operations.generic as ibis_generic
import bigframes_vendored.ibis.expr.operations.udf as ibis_udf
import bigframes_vendored.ibis.expr.types as ibis_types
@@ -1181,13 +1180,13 @@ def json_set_op_impl(x: ibis_types.Value, y: ibis_types.Value, op: ops.JSONSet):
)
else:
# Enabling JSON type eliminates the need for less efficient string conversions.
- return ibis_ops.ToJsonString(
+ return to_json_string(
json_set( # type: ignore
- json_obj=parse_json(x),
+ json_obj=parse_json(json_str=x),
json_path=op.json_path,
json_value=y,
)
- ).to_expr()
+ )
@scalar_op_compiler.register_unary_op(ops.JSONExtract, pass_op=True)
@@ -1210,6 +1209,16 @@ def json_extract_string_array_op_impl(
return json_extract_string_array(json_obj=x, json_path=op.json_path)
+@scalar_op_compiler.register_unary_op(ops.ParseJSON, pass_op=True)
+def parse_json_op_impl(x: ibis_types.Value, op: ops.ParseJSON):
+ return parse_json(json_str=x)
+
+
+@scalar_op_compiler.register_unary_op(ops.ToJSONString)
+def to_json_string_op_impl(json_obj: ibis_types.Value):
+ return to_json_string(json_obj=json_obj)
+
+
# Blob Ops
@scalar_op_compiler.register_unary_op(ops.obj_fetch_metadata_op)
def obj_fetch_metadata_op_impl(obj_ref: ibis_types.Value):
@@ -1909,6 +1918,13 @@ def json_extract_string_array( # type: ignore[empty-body]
"""Extracts a JSON array and converts it to a SQL ARRAY of STRINGs."""
+@ibis_udf.scalar.builtin(name="to_json_string")
+def to_json_string( # type: ignore[empty-body]
+ json_obj: ibis_dtypes.JSON,
+) -> ibis_dtypes.String:
+ """Convert JSON to STRING."""
+
+
@ibis_udf.scalar.builtin(name="ML.DISTANCE")
def vector_distance(vector1, vector2, type: str) -> ibis_dtypes.Float64: # type: ignore[empty-body]
"""Computes the distance between two vectors using specified type ("EUCLIDEAN", "MANHATTAN", or "COSINE")"""
diff --git a/bigframes/core/global_session.py b/bigframes/core/global_session.py
index e70cdad59e..8b32fee5b4 100644
--- a/bigframes/core/global_session.py
+++ b/bigframes/core/global_session.py
@@ -22,6 +22,7 @@
import google.auth.exceptions
import bigframes._config
+import bigframes.exceptions as bfe
import bigframes.session
_global_session: Optional[bigframes.session.Session] = None
@@ -38,11 +39,11 @@ def _try_close_session(session: bigframes.session.Session):
session_id = session.session_id
location = session._location
project_id = session._project
- warnings.warn(
+ msg = (
f"Session cleanup failed for session with id: {session_id}, "
- f"location: {location}, project: {project_id}",
- category=bigframes.exceptions.CleanupFailedWarning,
+ f"location: {location}, project: {project_id}"
)
+ warnings.warn(msg, category=bfe.CleanupFailedWarning)
traceback.print_tb(e.__traceback__)
diff --git a/bigframes/core/indexers.py b/bigframes/core/indexers.py
index 47ece70fb8..9c7fba8ec1 100644
--- a/bigframes/core/indexers.py
+++ b/bigframes/core/indexers.py
@@ -29,7 +29,7 @@
import bigframes.core.scalar
import bigframes.dataframe
import bigframes.dtypes
-import bigframes.exceptions
+import bigframes.exceptions as bfe
import bigframes.operations as ops
import bigframes.series
@@ -407,11 +407,12 @@ def _struct_accessor_check_and_warn(
return
if not bigframes.dtypes.is_string_like(series.index.dtype):
- warnings.warn(
- "Are you trying to access struct fields? If so, please use Series.struct.field(...) method instead.",
- category=bigframes.exceptions.BadIndexerKeyWarning,
- stacklevel=7, # Stack depth from series.__getitem__ to here
+ msg = (
+ "Are you trying to access struct fields? If so, please use Series.struct.field(...) "
+ "method instead."
)
+ # Stack depth from series.__getitem__ to here
+ warnings.warn(msg, stacklevel=7, category=bfe.BadIndexerKeyWarning)
@typing.overload
diff --git a/bigframes/core/utils.py b/bigframes/core/utils.py
index e684ac55a4..f9ca6cb5f0 100644
--- a/bigframes/core/utils.py
+++ b/bigframes/core/utils.py
@@ -11,14 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import functools
import re
import typing
from typing import Hashable, Iterable, List
+import warnings
import bigframes_vendored.pandas.io.common as vendored_pandas_io_common
import pandas as pd
import typing_extensions
+import bigframes.exceptions as bfe
+
UNNAMED_COLUMN_ID = "bigframes_unnamed_column"
UNNAMED_INDEX_ID = "bigframes_unnamed_index"
@@ -164,3 +168,19 @@ def merge_column_labels(
result_labels.append(col_label)
return pd.Index(result_labels)
+
+
+def preview(*, name: str):
+    """Decorator to warn that an API is in preview."""
+
+ def decorator(func):
+ msg = f"{name} is in preview. Its behavior may change in future versions."
+
+ @functools.wraps(func)
+ def wrapper(*args, **kwargs):
+ warnings.warn(msg, category=bfe.PreviewWarning)
+ return func(*args, **kwargs)
+
+ return wrapper
+
+ return decorator
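A small usage sketch for the new decorator, assuming `preview` is imported from `bigframes.core.utils` (the decorated function name is hypothetical): each call to the wrapped function emits a `PreviewWarning` before delegating.

```python
import warnings

@preview(name="The hypothetical API `frobnicate`")
def frobnicate(x: int) -> int:
    return x + 1

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    assert frobnicate(1) == 2  # still returns the wrapped result

assert any("is in preview" in str(w.message) for w in caught)
```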
diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py
index efbe56abf7..01e9bd6308 100644
--- a/bigframes/dataframe.py
+++ b/bigframes/dataframe.py
@@ -68,7 +68,7 @@
import bigframes.core.window
import bigframes.core.window_spec as windows
import bigframes.dtypes
-import bigframes.exceptions
+import bigframes.exceptions as bfe
import bigframes.formatting_helpers as formatter
import bigframes.operations as ops
import bigframes.operations.aggregations
@@ -367,14 +367,35 @@ def __iter__(self):
def astype(
self,
- dtype: Union[bigframes.dtypes.DtypeString, bigframes.dtypes.Dtype],
+ dtype: Union[
+ bigframes.dtypes.DtypeString,
+ bigframes.dtypes.Dtype,
+ dict[str, Union[bigframes.dtypes.DtypeString, bigframes.dtypes.Dtype]],
+ ],
*,
errors: Literal["raise", "null"] = "raise",
) -> DataFrame:
if errors not in ["raise", "null"]:
raise ValueError("Arg 'error' must be one of 'raise' or 'null'")
- return self._apply_unary_op(
- ops.AsTypeOp(to_type=dtype, safe=(errors == "null"))
+
+ safe_cast = errors == "null"
+
+ # Type strings check
+ if dtype in bigframes.dtypes.DTYPE_STRINGS:
+ return self._apply_unary_op(ops.AsTypeOp(dtype, safe_cast))
+
+ # Type instances check
+ if type(dtype) in bigframes.dtypes.DTYPES:
+ return self._apply_unary_op(ops.AsTypeOp(dtype, safe_cast))
+
+ if isinstance(dtype, dict):
+ result = self.copy()
+ for col, to_type in dtype.items():
+ result[col] = result[col].astype(to_type)
+ return result
+
+ raise TypeError(
+ f"Invalid type {type(dtype)} for dtype input. {constants.FEEDBACK_LINK}"
)
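An illustrative call for the new `dict` form (data and dtype choices here are examples): each key is a column label and each value the target dtype for that column, mirroring pandas semantics. Columns not listed keep their dtype.

```python
import bigframes.pandas as bpd

df = bpd.DataFrame({"a": [1, 2], "b": [1.5, 2.5]})
converted = df.astype({"a": "Float64", "b": "string[pyarrow]"})
```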
def _to_sql_query(
@@ -718,10 +739,23 @@ def _repr_html_(self) -> str:
if opts.repr_mode == "deferred":
return formatter.repr_query_job(self._compute_dry_run())
+ df = self.copy()
+ if bigframes.options.experiments.blob:
+ import bigframes.bigquery as bbq
+
+ blob_cols = [
+ col
+ for col in df.columns
+ if df[col].dtype == bigframes.dtypes.OBJ_REF_DTYPE
+ ]
+ for col in blob_cols:
+ df[col] = df[col]._apply_unary_op(ops.ObjGetAccessUrl(mode="R"))
+ df[col] = bbq.json_extract(df[col], "$.access_urls.read_url")
+
# TODO(swast): pass max_columns and get the true column count back. Maybe
# get 1 more column than we have requested so that pandas can add the
# ... for us?
- pandas_df, row_count, query_job = self._block.retrieve_repr_request_results(
+ pandas_df, row_count, query_job = df._block.retrieve_repr_request_results(
max_results
)
@@ -730,8 +764,31 @@ def _repr_html_(self) -> str:
column_count = len(pandas_df.columns)
with display_options.pandas_repr(opts):
- # _repr_html_ stub is missing so mypy thinks it's a Series. Ignore mypy.
- html_string = pandas_df._repr_html_() # type:ignore
+            # Allows previewing images in the DataFrame. The implementation
+            # changes the string repr as well, so that it doesn't truncate
+            # strings or escape HTML characters such as "<" and ">". We may
+            # need a full-fledged repr module to better support types not in
+            # pandas.
+ if bigframes.options.experiments.blob:
+
+ def url_to_image_html(url: str) -> str:
+                    # url is a JSON string, which already contains double
+                    # quotes, so the src attribute is not quoted again here.
+                    return f"<img src={url}>"
+
+ formatters = {blob_col: url_to_image_html for blob_col in blob_cols}
+
+ # set max_colwidth so not to truncate the image url
+ with pandas.option_context("display.max_colwidth", None):
+ max_rows = pandas.get_option("display.max_rows")
+ max_cols = pandas.get_option("display.max_columns")
+ show_dimensions = pandas.get_option("display.show_dimensions")
+ html_string = pandas_df.to_html(
+ escape=False,
+ notebook=True,
+ max_rows=max_rows,
+ max_cols=max_cols,
+ show_dimensions=show_dimensions,
+ formatters=formatters, # type: ignore
+ )
+ else:
+ # _repr_html_ stub is missing so mypy thinks it's a Series. Ignore mypy.
+ html_string = pandas_df._repr_html_() # type:ignore
html_string += f"[{row_count} rows x {column_count} columns in total]"
return html_string
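The rendering path above relies on a standard pandas facility; a standalone sketch of the same technique with hypothetical data: `escape=False` plus a per-column formatter turns URL strings into `<img>` tags in the notebook output.

```python
import pandas as pd

df = pd.DataFrame({"image": ["https://example.com/cat.jpg"]})
html = df.to_html(
    escape=False,  # keep the raw <img> markup instead of escaping it
    formatters={"image": lambda url: f'<img src="{url}">'},
)
```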
@@ -1424,10 +1481,8 @@ def to_arrow(
Returns:
pyarrow.Table: A pyarrow Table with all rows and columns of this DataFrame.
"""
- warnings.warn(
- "to_arrow is in preview. Types and unnamed / duplicate name columns may change in future.",
- category=bigframes.exceptions.PreviewWarning,
- )
+        msg = "to_arrow is in preview. Types and unnamed / duplicate name columns may change in the future."
+ warnings.warn(msg, category=bfe.PreviewWarning)
pa_table, query_job = self._block.to_arrow(ordered=ordered)
self._set_internal_query_job(query_job)
@@ -3863,10 +3918,8 @@ def map(self, func, na_action: Optional[str] = None) -> DataFrame:
def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs):
if utils.get_axis_number(axis) == 1:
- warnings.warn(
- "axis=1 scenario is in preview.",
- category=bigframes.exceptions.PreviewWarning,
- )
+ msg = "axis=1 scenario is in preview."
+ warnings.warn(msg, category=bfe.PreviewWarning)
# Check if the function is a remote function
if not hasattr(func, "bigframes_remote_function"):
diff --git a/bigframes/dtypes.py b/bigframes/dtypes.py
index ff3e7a31fb..6e179225ea 100644
--- a/bigframes/dtypes.py
+++ b/bigframes/dtypes.py
@@ -36,6 +36,8 @@
pd.ArrowDtype,
gpd.array.GeometryDtype,
]
+
+DTYPES = typing.get_args(Dtype)
# Represents both column types (dtypes) and local-only types
# None represents the type of a None scalar.
ExpressionType = typing.Optional[Dtype]
@@ -238,6 +240,8 @@ class SimpleDtypeInfo:
"binary[pyarrow]",
]
+DTYPE_STRINGS = typing.get_args(DtypeString)
+
BOOL_BIGFRAMES_TYPES = [BOOL_DTYPE]
# Corresponds to the pandas concept of numeric type (such as when 'numeric_only' is specified in an operation)
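The two new module constants rely on `typing.get_args`, which flattens a `Union` or `Literal` alias into a runtime tuple usable for membership checks; a minimal sketch with a hypothetical alias:

```python
import typing

DemoDtypeString = typing.Literal["Int64", "Float64", "boolean"]

assert typing.get_args(DemoDtypeString) == ("Int64", "Float64", "boolean")
assert "Int64" in typing.get_args(DemoDtypeString)  # runtime membership check
```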
diff --git a/bigframes/functions/_remote_function_session.py b/bigframes/functions/_remote_function_session.py
index 84bf2e1fc9..662c32a6a6 100644
--- a/bigframes/functions/_remote_function_session.py
+++ b/bigframes/functions/_remote_function_session.py
@@ -300,7 +300,7 @@ def remote_function(
https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings.
"""
# Some defaults may be used from the session if not provided otherwise
- import bigframes.exceptions as bf_exceptions
+ import bigframes.exceptions as bfe
import bigframes.pandas as bpd
import bigframes.series as bf_series
import bigframes.session
@@ -445,11 +445,8 @@ def wrapper(func):
(input_type := input_types[0]) == bf_series.Series
or input_type == pandas.Series
):
- warnings.warn(
- "input_types=Series is in preview.",
- stacklevel=1,
- category=bf_exceptions.PreviewWarning,
- )
+ msg = "input_types=Series is in preview."
+ warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning)
# we will model the row as a json serialized string containing the data
# and the metadata representing the row
diff --git a/bigframes/functions/remote_function.py b/bigframes/functions/remote_function.py
index b72b8ce8da..9b68843a7d 100644
--- a/bigframes/functions/remote_function.py
+++ b/bigframes/functions/remote_function.py
@@ -32,6 +32,7 @@
import bigframes.core.compile.ibis_types
import bigframes.dtypes
+import bigframes.exceptions as bfe
import bigframes.functions.remote_function_template
from . import _remote_function_session as rf_session
@@ -197,11 +198,11 @@ def func(*bigframes_args, **bigframes_kwargs):
)
function_input_dtypes.append(input_dtype)
if has_unknown_dtypes:
- warnings.warn(
- "The function has one or more missing input data types."
- f" BigQuery DataFrames will assume default data type {bigframes.dtypes.DEFAULT_DTYPE} for them.",
- category=bigframes.exceptions.UnknownDataTypeWarning,
+ msg = (
+ "The function has one or more missing input data types. BigQuery DataFrames "
+ f"will assume default data type {bigframes.dtypes.DEFAULT_DTYPE} for them."
)
+ warnings.warn(msg, category=bfe.UnknownDataTypeWarning)
func.input_dtypes = tuple(function_input_dtypes) # type: ignore
func.output_dtype = bigframes.core.compile.ibis_types.ibis_dtype_to_bigframes_dtype( # type: ignore
diff --git a/bigframes/ml/base.py b/bigframes/ml/base.py
index 4058647adb..f06de99181 100644
--- a/bigframes/ml/base.py
+++ b/bigframes/ml/base.py
@@ -22,7 +22,8 @@
"""
import abc
-from typing import cast, Optional, TypeVar
+from typing import Callable, cast, Mapping, Optional, TypeVar
+import warnings
import bigframes_vendored.sklearn.base
@@ -77,6 +78,9 @@ def fit_transform(self, x_train: Union[DataFrame, Series], y_train: Union[DataFr
...
"""
+ def __init__(self):
+ self._bqml_model: Optional[core.BqmlModel] = None
+
def __repr__(self):
"""Print the estimator's constructor with all non-default parameter values."""
@@ -95,9 +99,6 @@ def __repr__(self):
class Predictor(BaseEstimator):
"""A BigQuery DataFrames ML Model base class that can be used to predict outputs."""
- def __init__(self):
- self._bqml_model: Optional[core.BqmlModel] = None
-
@abc.abstractmethod
def predict(self, X):
pass
@@ -213,12 +214,61 @@ def fit(
return self._fit(X, y)
+class RetriableRemotePredictor(BaseEstimator):
+ @property
+ @abc.abstractmethod
+ def _predict_func(self) -> Callable[[bpd.DataFrame, Mapping], bpd.DataFrame]:
+ pass
+
+ @property
+ @abc.abstractmethod
+ def _status_col(self) -> str:
+ pass
+
+ def _predict_and_retry(
+ self, X: bpd.DataFrame, options: Mapping, max_retries: int
+ ) -> bpd.DataFrame:
+ assert self._bqml_model is not None
+
+ df_result = bpd.DataFrame(session=self._bqml_model.session) # placeholder
+ df_fail = X
+ for _ in range(max_retries + 1):
+ df = self._predict_func(df_fail, options)
+
+ success = df[self._status_col].str.len() == 0
+ df_succ = df[success]
+ df_fail = df[~success]
+
+ if df_succ.empty:
+ if max_retries > 0:
+ msg = "Can't make any progress, stop retrying."
+ warnings.warn(msg, category=RuntimeWarning)
+ break
+
+ df_result = (
+ bpd.concat([df_result, df_succ]) if not df_result.empty else df_succ
+ )
+
+ if df_fail.empty:
+ break
+
+ if not df_fail.empty:
+ msg = (
+ f"Some predictions failed. Check column {self._status_col} for detailed "
+ "status. You may want to filter the failed rows and retry."
+ )
+ warnings.warn(msg, category=RuntimeWarning)
+
+ df_result = cast(
+ bpd.DataFrame,
+ bpd.concat([df_result, df_fail]) if not df_result.empty else df_fail,
+ )
+ return df_result
+
+
class BaseTransformer(BaseEstimator):
"""Transformer base class."""
- def __init__(self):
- self._bqml_model: Optional[core.BqmlModel] = None
-
@abc.abstractmethod
def _keys(self):
pass
diff --git a/bigframes/ml/llm.py b/bigframes/ml/llm.py
index d42138b006..8d1df6e0b9 100644
--- a/bigframes/ml/llm.py
+++ b/bigframes/ml/llm.py
@@ -16,7 +16,7 @@
from __future__ import annotations
-from typing import cast, Literal, Optional
+from typing import Callable, cast, Literal, Mapping, Optional
import warnings
import bigframes_vendored.constants as constants
@@ -180,12 +180,11 @@ def _create_bqml_model(self):
)
if self.model_name not in _TEXT_GENERATOR_ENDPOINTS:
- warnings.warn(
- _MODEL_NOT_SUPPORTED_WARNING.format(
- model_name=self.model_name,
- known_models=", ".join(_TEXT_GENERATOR_ENDPOINTS),
- )
+ msg = _MODEL_NOT_SUPPORTED_WARNING.format(
+ model_name=self.model_name,
+ known_models=", ".join(_TEXT_GENERATOR_ENDPOINTS),
)
+ warnings.warn(msg)
options = {
"endpoint": self.model_name,
@@ -360,10 +359,11 @@ def predict(
df = self._bqml_model.generate_text(X, options)
if (df[_ML_GENERATE_TEXT_STATUS] != "").any():
- warnings.warn(
- f"Some predictions failed. Check column {_ML_GENERATE_TEXT_STATUS} for detailed status. You may want to filter the failed rows and retry.",
- RuntimeWarning,
+ msg = (
+ f"Some predictions failed. Check column {_ML_GENERATE_TEXT_STATUS} for "
+ "detailed status. You may want to filter the failed rows and retry."
)
+ warnings.warn(msg, category=RuntimeWarning)
return df
@@ -513,12 +513,11 @@ def _create_bqml_model(self):
)
if self.model_name not in _PALM2_EMBEDDING_GENERATOR_ENDPOINTS:
- warnings.warn(
- _MODEL_NOT_SUPPORTED_WARNING.format(
- model_name=self.model_name,
- known_models=", ".join(_PALM2_EMBEDDING_GENERATOR_ENDPOINTS),
- )
+ msg = _MODEL_NOT_SUPPORTED_WARNING.format(
+ model_name=self.model_name,
+ known_models=", ".join(_PALM2_EMBEDDING_GENERATOR_ENDPOINTS),
)
+ warnings.warn(msg)
endpoint = (
self.model_name + "@" + self.version if self.version else self.model_name
@@ -590,10 +589,11 @@ def predict(self, X: utils.ArrayType) -> bpd.DataFrame:
)
if (df[_ML_EMBED_TEXT_STATUS] != "").any():
- warnings.warn(
- f"Some predictions failed. Check column {_ML_EMBED_TEXT_STATUS} for detailed status. You may want to filter the failed rows and retry.",
- RuntimeWarning,
+ msg = (
+ f"Some predictions failed. Check column {_ML_EMBED_TEXT_STATUS} for "
+ "detailed status. You may want to filter the failed rows and retry."
)
+ warnings.warn(msg, category=RuntimeWarning)
return df
@@ -616,7 +616,7 @@ def to_gbq(
@log_adapter.class_logger
-class TextEmbeddingGenerator(base.BaseEstimator):
+class TextEmbeddingGenerator(base.RetriableRemotePredictor):
"""Text embedding generator LLM model.
Args:
@@ -678,12 +678,11 @@ def _create_bqml_model(self):
)
if self.model_name not in _TEXT_EMBEDDING_ENDPOINTS:
- warnings.warn(
- _MODEL_NOT_SUPPORTED_WARNING.format(
- model_name=self.model_name,
- known_models=", ".join(_TEXT_EMBEDDING_ENDPOINTS),
- )
+ msg = _MODEL_NOT_SUPPORTED_WARNING.format(
+ model_name=self.model_name,
+ known_models=", ".join(_TEXT_EMBEDDING_ENDPOINTS),
)
+ warnings.warn(msg)
options = {
"endpoint": self.model_name,
@@ -715,18 +714,33 @@ def _from_bq(
model._bqml_model = core.BqmlModel(session, bq_model)
return model
- def predict(self, X: utils.ArrayType) -> bpd.DataFrame:
+ @property
+ def _predict_func(self) -> Callable[[bpd.DataFrame, Mapping], bpd.DataFrame]:
+ return self._bqml_model.generate_embedding
+
+ @property
+ def _status_col(self) -> str:
+ return _ML_GENERATE_EMBEDDING_STATUS
+
+ def predict(self, X: utils.ArrayType, *, max_retries: int = 0) -> bpd.DataFrame:
"""Predict the result from input DataFrame.
Args:
X (bigframes.dataframe.DataFrame or bigframes.series.Series or pandas.core.frame.DataFrame or pandas.core.series.Series):
Input DataFrame or Series, can contain one or more columns. If multiple columns are in the DataFrame, it must contain a "content" column for prediction.
+ max_retries (int, default 0):
+                Max number of retries if the prediction for any rows fails. Each attempt must make progress (i.e. successfully predict some rows) for the retries to continue.
+                Each retry appends the newly succeeded rows. When the max retries are reached, the remaining rows (the ones without successful predictions) are appended to the end of the result.
+
Returns:
bigframes.dataframe.DataFrame: DataFrame of shape (n_samples, n_input_columns + n_prediction_columns). Returns predicted values.
"""
+ if max_retries < 0:
+ raise ValueError(
+ f"max_retries must be larger than or equal to 0, but is {max_retries}."
+ )
- # Params reference: https://cloud.google.com/vertex-ai/docs/generative-ai/learn/models
(X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session)
if len(X.columns) == 1:
@@ -738,15 +752,7 @@ def predict(self, X: utils.ArrayType) -> bpd.DataFrame:
"flatten_json_output": True,
}
- df = self._bqml_model.generate_embedding(X, options)
-
- if (df[_ML_GENERATE_EMBEDDING_STATUS] != "").any():
- warnings.warn(
- f"Some predictions failed. Check column {_ML_GENERATE_EMBEDDING_STATUS} for detailed status. You may want to filter the failed rows and retry.",
- RuntimeWarning,
- )
-
- return df
+ return self._predict_and_retry(X, options=options, max_retries=max_retries)
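A hedged usage sketch of the new parameter (the data is a placeholder, and the call requires an active BigQuery session): rows whose status column is non-empty are re-sent up to `max_retries` times, and any still-failing rows are appended to the end of the result.

```python
import bigframes.pandas as bpd
from bigframes.ml import llm

model = llm.TextEmbeddingGenerator(model_name="text-embedding-004")
texts = bpd.DataFrame({"content": ["hello", "world"]})
# Failed rows are re-sent up to twice; rows that never succeed come last,
# with their ml_generate_embedding_status column populated.
embeddings = model.predict(texts, max_retries=2)
```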
def to_gbq(self, model_name: str, replace: bool = False) -> TextEmbeddingGenerator:
"""Save the model to BigQuery.
@@ -765,7 +771,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> TextEmbeddingGenerat
@log_adapter.class_logger
-class GeminiTextGenerator(base.BaseEstimator):
+class GeminiTextGenerator(base.RetriableRemotePredictor):
"""Gemini text generator LLM model.
Args:
@@ -806,13 +812,15 @@ def __init__(
max_iterations: int = 300,
):
if model_name in _GEMINI_PREVIEW_ENDPOINTS:
- warnings.warn(
- f"""Model {model_name} is subject to the "Pre-GA Offerings Terms" in the General Service Terms section of the
- Service Specific Terms(https://cloud.google.com/terms/service-terms#1). Pre-GA products and features are available "as is"
- and might have limited support. For more information, see the launch stage descriptions
- (https://cloud.google.com/products#product-launch-stages).""",
- category=exceptions.PreviewWarning,
+ msg = (
+ f'Model {model_name} is subject to the "Pre-GA Offerings Terms" in '
+ "the General Service Terms section of the Service Specific Terms"
+ "(https://cloud.google.com/terms/service-terms#1). Pre-GA products and "
+ 'features are available "as is" and might have limited support. For '
+ "more information, see the launch stage descriptions "
+ "(https://cloud.google.com/products#product-launch-stages)."
)
+ warnings.warn(msg, category=exceptions.PreviewWarning)
self.model_name = model_name
self.session = session or bpd.get_global_session()
self.max_iterations = max_iterations
@@ -849,12 +857,11 @@ def _create_bqml_model(self):
)
if self.model_name not in _GEMINI_ENDPOINTS:
- warnings.warn(
- _MODEL_NOT_SUPPORTED_WARNING.format(
- model_name=self.model_name,
- known_models=", ".join(_GEMINI_ENDPOINTS),
- )
+ msg = _MODEL_NOT_SUPPORTED_WARNING.format(
+ model_name=self.model_name,
+ known_models=", ".join(_GEMINI_ENDPOINTS),
)
+ warnings.warn(msg)
options = {"endpoint": self.model_name}
@@ -891,6 +898,14 @@ def _bqml_options(self) -> dict:
}
return options
+ @property
+ def _predict_func(self) -> Callable[[bpd.DataFrame, Mapping], bpd.DataFrame]:
+ return self._bqml_model.generate_text
+
+ @property
+ def _status_col(self) -> str:
+ return _ML_GENERATE_TEXT_STATUS
+
def fit(
self,
X: utils.ArrayType,
@@ -1028,41 +1043,7 @@ def predict(
"ground_with_google_search": ground_with_google_search,
}
- df_result = bpd.DataFrame(session=self._bqml_model.session) # placeholder
- df_fail = X
- for _ in range(max_retries + 1):
- df = self._bqml_model.generate_text(df_fail, options)
-
- success = df[_ML_GENERATE_TEXT_STATUS].str.len() == 0
- df_succ = df[success]
- df_fail = df[~success]
-
- if df_succ.empty:
- if max_retries > 0:
- warnings.warn(
- "Can't make any progress, stop retrying.", RuntimeWarning
- )
- break
-
- df_result = (
- bpd.concat([df_result, df_succ]) if not df_result.empty else df_succ
- )
-
- if df_fail.empty:
- break
-
- if not df_fail.empty:
- warnings.warn(
- f"Some predictions failed. Check column {_ML_GENERATE_TEXT_STATUS} for detailed status. You may want to filter the failed rows and retry.",
- RuntimeWarning,
- )
-
- df_result = cast(
- bpd.DataFrame,
- bpd.concat([df_result, df_fail]) if not df_result.empty else df_fail,
- )
-
- return df_result
+ return self._predict_and_retry(X, options=options, max_retries=max_retries)
def score(
self,
@@ -1144,7 +1125,7 @@ def to_gbq(self, model_name: str, replace: bool = False) -> GeminiTextGenerator:
@log_adapter.class_logger
-class Claude3TextGenerator(base.BaseEstimator):
+class Claude3TextGenerator(base.RetriableRemotePredictor):
"""Claude3 text generator LLM model.
    Go to Google Cloud Console -> Vertex AI -> Model Garden page to enable the models before use. Must have the Consumer Procurement Entitlement Manager Identity and Access Management (IAM) role to enable the models.
@@ -1223,13 +1204,11 @@ def _create_bqml_model(self):
)
if self.model_name not in _CLAUDE_3_ENDPOINTS:
- warnings.warn(
- _MODEL_NOT_SUPPORTED_WARNING.format(
- model_name=self.model_name,
- known_models=", ".join(_CLAUDE_3_ENDPOINTS),
- )
+ msg = _MODEL_NOT_SUPPORTED_WARNING.format(
+ model_name=self.model_name,
+ known_models=", ".join(_CLAUDE_3_ENDPOINTS),
)
-
+ warnings.warn(msg)
options = {
"endpoint": self.model_name,
}
@@ -1273,6 +1252,14 @@ def _bqml_options(self) -> dict:
}
return options
+ @property
+ def _predict_func(self) -> Callable[[bpd.DataFrame, Mapping], bpd.DataFrame]:
+ return self._bqml_model.generate_text
+
+ @property
+ def _status_col(self) -> str:
+ return _ML_GENERATE_TEXT_STATUS
+
def predict(
self,
X: utils.ArrayType,
@@ -1280,6 +1267,7 @@ def predict(
max_output_tokens: int = 128,
top_k: int = 40,
top_p: float = 0.95,
+ max_retries: int = 0,
) -> bpd.DataFrame:
"""Predict the result from input DataFrame.
@@ -1307,6 +1295,10 @@ def predict(
Specify a lower value for less random responses and a higher value for more random responses.
Default 0.95. Possible values [0.0, 1.0].
+ max_retries (int, default 0):
+                Max number of retries if the prediction for any rows fails. Each attempt must make progress (i.e. successfully predict some rows) for the retries to continue.
+                Each retry appends the newly succeeded rows. When the max retries are reached, the remaining rows (the ones without successful predictions) are appended to the end of the result.
+
Returns:
bigframes.dataframe.DataFrame: DataFrame of shape (n_samples, n_input_columns + n_prediction_columns). Returns predicted values.
@@ -1324,6 +1316,11 @@ def predict(
if top_p < 0.0 or top_p > 1.0:
raise ValueError(f"top_p must be [0.0, 1.0], but is {top_p}.")
+ if max_retries < 0:
+ raise ValueError(
+ f"max_retries must be larger than or equal to 0, but is {max_retries}."
+ )
+
(X,) = utils.batch_convert_to_dataframe(X, session=self._bqml_model.session)
if len(X.columns) == 1:
@@ -1338,15 +1335,7 @@ def predict(
"flatten_json_output": True,
}
- df = self._bqml_model.generate_text(X, options)
-
- if (df[_ML_GENERATE_TEXT_STATUS] != "").any():
- warnings.warn(
- f"Some predictions failed. Check column {_ML_GENERATE_TEXT_STATUS} for detailed status. You may want to filter the failed rows and retry.",
- RuntimeWarning,
- )
-
- return df
+ return self._predict_and_retry(X, options=options, max_retries=max_retries)
def to_gbq(self, model_name: str, replace: bool = False) -> Claude3TextGenerator:
"""Save the model to BigQuery.
diff --git a/bigframes/ml/remote.py b/bigframes/ml/remote.py
index bb614b9da5..f4f55ad34e 100644
--- a/bigframes/ml/remote.py
+++ b/bigframes/ml/remote.py
@@ -139,9 +139,10 @@ def predict(
# unlike LLM models, the general remote model status is null for successful runs.
if (df[_REMOTE_MODEL_STATUS].notna()).any():
- warnings.warn(
- f"Some predictions failed. Check column {_REMOTE_MODEL_STATUS} for detailed status. You may want to filter the failed rows and retry.",
- RuntimeWarning,
+ msg = (
+ f"Some predictions failed. Check column {_REMOTE_MODEL_STATUS} for "
+ "detailed status. You may want to filter the failed rows and retry."
)
+ warnings.warn(msg, category=RuntimeWarning)
return df
diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py
index 2884d56551..37a40b7d01 100644
--- a/bigframes/operations/__init__.py
+++ b/bigframes/operations/__init__.py
@@ -740,6 +740,34 @@ def output_type(self, *input_types):
)
+@dataclasses.dataclass(frozen=True)
+class ParseJSON(UnaryOp):
+ name: typing.ClassVar[str] = "parse_json"
+
+ def output_type(self, *input_types):
+ input_type = input_types[0]
+ if input_type != dtypes.STRING_DTYPE:
+ raise TypeError(
+ "Input type must be an valid JSON-formatted string type."
+ + f" Received type: {input_type}"
+ )
+ return dtypes.JSON_DTYPE
+
+
+@dataclasses.dataclass(frozen=True)
+class ToJSONString(UnaryOp):
+ name: typing.ClassVar[str] = "to_json_string"
+
+ def output_type(self, *input_types):
+ input_type = input_types[0]
+ if not dtypes.is_json_like(input_type):
+ raise TypeError(
+ "Input type must be an valid JSON object or JSON-formatted string type."
+ + f" Received type: {input_type}"
+ )
+ return dtypes.STRING_DTYPE
+
+
## Blob Ops
@dataclasses.dataclass(frozen=True)
class ObjGetAccessUrl(UnaryOp):
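A standalone sketch of the contract the two new ops enforce (this bypasses the real expression machinery and just exercises `output_type` directly):

```python
import bigframes.dtypes as dtypes
from bigframes.operations import ParseJSON, ToJSONString

parse = ParseJSON()
# STRING -> JSON
assert parse.output_type(dtypes.STRING_DTYPE) == dtypes.JSON_DTYPE

to_str = ToJSONString()
# JSON (or a JSON-formatted string) -> STRING
assert to_str.output_type(dtypes.JSON_DTYPE) == dtypes.STRING_DTYPE

try:
    parse.output_type(dtypes.INT_DTYPE)  # not a JSON-formatted string
except TypeError as e:
    print(e)
```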
diff --git a/bigframes/operations/_matplotlib/core.py b/bigframes/operations/_matplotlib/core.py
index b7c926be99..9c68a2c5ca 100644
--- a/bigframes/operations/_matplotlib/core.py
+++ b/bigframes/operations/_matplotlib/core.py
@@ -70,11 +70,10 @@ def _compute_sample_data(self, data):
if self._sampling_warning_msg is not None:
total_n = data.shape[0]
if sampling_n < total_n:
- warnings.warn(
- self._sampling_warning_msg.format(
- sampling_n=sampling_n, total_n=total_n
- )
+ msg = self._sampling_warning_msg.format(
+ sampling_n=sampling_n, total_n=total_n
)
+ warnings.warn(msg)
sampling_random_state = self.kwargs.pop(
"sampling_random_state", DEFAULT_SAMPLING_STATE
diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py
index 6b7f56d708..9de58fe5db 100644
--- a/bigframes/operations/aggregations.py
+++ b/bigframes/operations/aggregations.py
@@ -379,6 +379,19 @@ def skips_nulls(self):
return True
+# This should really be a NullaryWindowOp, but APIs don't support that yet.
+@dataclasses.dataclass(frozen=True)
+class RowNumberOp(UnaryWindowOp):
+ name: ClassVar[str] = "rownumber"
+
+ @property
+ def skips_nulls(self):
+ return False
+
+ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType:
+ return dtypes.INT_DTYPE
+
+
@dataclasses.dataclass(frozen=True)
class RankOp(UnaryWindowOp):
name: ClassVar[str] = "rank"
diff --git a/bigframes/operations/blob.py b/bigframes/operations/blob.py
index c074c72971..f78db2b6fc 100644
--- a/bigframes/operations/blob.py
+++ b/bigframes/operations/blob.py
@@ -14,9 +14,14 @@
from __future__ import annotations
+import os
+from typing import cast, Optional, Union
+
import IPython.display as ipy_display
import requests
+from bigframes import clients
+import bigframes.dataframe
from bigframes.operations import base
import bigframes.operations as ops
import bigframes.series
@@ -66,3 +71,63 @@ def display(self, n: int = 3):
read_url = str(read_url).strip('"')
response = requests.get(read_url)
ipy_display.display(ipy_display.Image(response.content))
+
+ def image_blur(
+ self,
+ ksize: tuple[int, int],
+ *,
+ dst: Union[str, bigframes.series.Series],
+ connection: Optional[str] = None,
+ ) -> bigframes.series.Series:
+ """Blurs images.
+
+ .. note::
+ BigFrames Blob is still experimental. It may not work as expected and is subject to change in the future.
+
+ Args:
+ ksize (tuple(int, int)): Kernel size.
+ dst (str or bigframes.series.Series): Destination GCS folder as a str, or a destination blob series.
+ connection (str or None, default None): BQ connection used for the function's internet transactions, and for the output blobs if "dst" is a str. If None, uses the session's default connection.
+
+ Returns:
+ BigFrames Blob Series
+ """
+ import bigframes.blob._functions as blob_func
+
+ connection = connection or self._block.session._bq_connection
+ connection = clients.resolve_full_bq_connection_name(
+ connection,
+ default_project=self._block.session._project,
+ default_location=self._block.session._location,
+ )
+
+ if isinstance(dst, str):
+ dst = os.path.join(dst, "")
+ src_uri = bigframes.series.Series(self._block).struct.explode()["uri"]
+ # Replace src folder with dst folder, keep the file names.
+ dst_uri = src_uri.str.replace(r"^.*\/(.*)$", rf"{dst}\1", regex=True)
+ dst = cast(
+ bigframes.series.Series, dst_uri.str.to_blob(connection=connection)
+ )
+
+ image_blur_udf = blob_func.TransformFunction(
+ blob_func.image_blur_def,
+ session=self._block.session,
+ connection=connection,
+ ).udf()
+
+ src_rt = bigframes.series.Series(self._block)._apply_unary_op(
+ ops.ObjGetAccessUrl(mode="R")
+ )
+ dst_rt = dst._apply_unary_op(ops.ObjGetAccessUrl(mode="RW"))
+
+ src_rt = src_rt._apply_unary_op(ops.ToJSONString())
+ dst_rt = dst_rt._apply_unary_op(ops.ToJSONString())
+
+ df = src_rt.to_frame().join(dst_rt.to_frame(), how="outer")
+ df["ksize_x"], df["ksize_y"] = ksize
+
+ res = df.apply(image_blur_udf, axis=1)
+ res.cache() # to execute the udf
+
+ return dst
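A hedged usage sketch for the new `image_blur` method (the bucket paths and connection name are hypothetical; the experimental gate and the `.blob` accessor are assumptions based on the preview Blob API):

```python
import bigframes.pandas as bpd

bpd.options.experiments.blob = True  # experimental gate (assumed)

# Hypothetical image URIs turned into a blob Series.
uris = bpd.Series(
    ["gs://my-bucket/images/a.png", "gs://my-bucket/images/b.png"]
)
blobs = uris.str.to_blob(connection="my-project.us.my-connection")

# Blur with a 5x5 kernel; source file names are kept and the results are
# written under the destination folder. Returns the destination blob Series.
blurred = blobs.blob.image_blur(
    (5, 5),
    dst="gs://my-bucket/images_blurred/",
    connection="my-project.us.my-connection",
)
```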
diff --git a/bigframes/operations/semantics.py b/bigframes/operations/semantics.py
index 6a537db4f3..a2bf18a41d 100644
--- a/bigframes/operations/semantics.py
+++ b/bigframes/operations/semantics.py
@@ -140,10 +140,11 @@ def agg(
column = columns[0]
if ground_with_google_search:
- warnings.warn(
+ msg = (
"Enables Grounding with Google Search may impact billing cost. See pricing "
"details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models"
)
+ warnings.warn(msg)
user_instruction = self._format_instruction(instruction, columns)
@@ -370,10 +371,11 @@ def filter(self, instruction: str, model, ground_with_google_search: bool = Fals
raise ValueError(f"Column {column} not found.")
if ground_with_google_search:
- warnings.warn(
+ msg = (
"Enables Grounding with Google Search may impact billing cost. See pricing "
"details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models"
)
+ warnings.warn(msg)
self._confirm_operation(len(self._df))
@@ -468,10 +470,11 @@ def map(
raise ValueError(f"Column {column} not found.")
if ground_with_google_search:
- warnings.warn(
+ msg = (
"Enables Grounding with Google Search may impact billing cost. See pricing "
"details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models"
)
+ warnings.warn(msg)
self._confirm_operation(len(self._df))
@@ -569,10 +572,11 @@ def join(
columns = self._parse_columns(instruction)
if ground_with_google_search:
- warnings.warn(
+ msg = (
"Enables Grounding with Google Search may impact billing cost. See pricing "
"details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models"
)
+ warnings.warn(msg)
work_estimate = len(self._df) * len(other)
self._confirm_operation(work_estimate)
@@ -811,10 +815,11 @@ def top_k(
)
if ground_with_google_search:
- warnings.warn(
+ msg = (
"Enables Grounding with Google Search may impact billing cost. See pricing "
"details: https://cloud.google.com/vertex-ai/generative-ai/pricing#google_models"
)
+ warnings.warn(msg)
work_estimate = int(len(self._df) * (len(self._df) - 1) / 2)
self._confirm_operation(work_estimate)
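For context, a sketch of the kind of call that triggers this warning (the semantic operators API is experimental; the instruction, column, and experimental gate shown here are illustrative):

```python
import bigframes.pandas as bpd
from bigframes.ml import llm

bpd.options.experiments.semantic_operators = True  # experimental gate (assumed)

df = bpd.DataFrame({"country": ["USA", "Germany"]})
model = llm.GeminiTextGenerator()

# Opting into grounding emits the billing-cost warning above.
result = df.semantics.map(
    "What is the capital of {country}?",
    output_column="capital",
    model=model,
    ground_with_google_search=True,
)
```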
diff --git a/bigframes/pandas/__init__.py b/bigframes/pandas/__init__.py
index 9c3c98ec55..395b573916 100644
--- a/bigframes/pandas/__init__.py
+++ b/bigframes/pandas/__init__.py
@@ -163,7 +163,8 @@ def get_default_session_id() -> str:
the table id of all temporary tables created in the global session.
Returns:
- str, the default global session id, ex. 'sessiona1b2c'
+ str:
+ The default global session id, e.g. 'sessiona1b2c'
"""
return get_global_session().session_id
diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index edac7efa4b..d787f8e7f3 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -66,6 +66,7 @@
import bigframes.dataframe
import bigframes.dtypes
import bigframes.exceptions
+import bigframes.exceptions as bfe
import bigframes.functions._remote_function_session as bigframes_rf_session
import bigframes.functions.remote_function as bigframes_rf
import bigframes.session._io.bigquery as bf_io_bigquery
@@ -150,25 +151,22 @@ def __init__(
if context.location is None:
self._location = "US"
- warnings.warn(
- f"No explicit location is set, so using location {self._location} for the session.",
- # User's code
- # -> get_global_session()
- # -> connect()
- # -> Session()
- #
- # Note: We could also have:
- # User's code
- # -> read_gbq()
- # -> with_default_session()
- # -> get_global_session()
- # -> connect()
- # -> Session()
- # but we currently have no way to disambiguate these
- # situations.
- stacklevel=4,
- category=bigframes.exceptions.DefaultLocationWarning,
- )
+ msg = f"No explicit location is set, so using location {self._location} for the session."
+ # User's code
+ # -> get_global_session()
+ # -> connect()
+ # -> Session()
+ #
+ # Note: We could also have:
+ # User's code
+ # -> read_gbq()
+ # -> with_default_session()
+ # -> get_global_session()
+ # -> connect()
+ # -> Session()
+ # but we currently have no way to disambiguate these
+ # situations.
+ warnings.warn(msg, stacklevel=4, category=bfe.DefaultLocationWarning)
else:
self._location = context.location
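For reference, a tiny standalone illustration of the `stacklevel` bookkeeping the comment above describes (generic Python, not BigFrames-specific): with `stacklevel=N`, the warning is attributed N frames up the call stack, so it points at the user's code rather than library internals.

```python
import warnings

def _session_init():   # frame 1: library internals
    warnings.warn("No explicit location set.", stacklevel=3)

def connect():         # frame 2: library entry point
    _session_init()

connect()              # frame 3: user code -- the warning is reported here
```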
@@ -236,10 +234,8 @@ def __init__(
# Will expose as feature later, only False for internal testing
self._strictly_ordered: bool = context.ordering_mode != "partial"
if not self._strictly_ordered:
- warnings.warn(
- "Partial ordering mode is a preview feature and is subject to change.",
- bigframes.exceptions.OrderingModePartialPreviewWarning,
- )
+ msg = "Partial ordering mode is a preview feature and is subject to change."
+ warnings.warn(msg, bfe.OrderingModePartialPreviewWarning)
self._allow_ambiguity = not self._strictly_ordered
self._default_index_type = (
@@ -604,11 +600,8 @@ def read_gbq_table_streaming(
bigframes.streaming.dataframe.StreamingDataFrame:
A StreamingDataFrame representing results of the table.
"""
- warnings.warn(
- "The bigframes.streaming module is a preview feature, and subject to change.",
- stacklevel=1,
- category=bigframes.exceptions.PreviewWarning,
- )
+ msg = "The bigframes.streaming module is a preview feature, and subject to change."
+ warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning)
import bigframes.streaming.dataframe as streaming_dataframe
@@ -1370,13 +1363,14 @@ def remote_function(
`all`, `internal-only`, `internal-and-gclb`. See for more details
https://cloud.google.com/functions/docs/networking/network-settings#ingress_settings.
Returns:
- callable: A remote function object pointing to the cloud assets created
- in the background to support the remote execution. The cloud assets can be
- located through the following properties set in the object:
+ collections.abc.Callable:
+ A remote function object pointing to the cloud assets created
+ in the background to support the remote execution. The cloud assets can be
+ located through the following properties set in the object:
- `bigframes_cloud_function` - The google cloud function deployed for the user defined code.
+ `bigframes_cloud_function` - The google cloud function deployed for the user defined code.
- `bigframes_remote_function` - The bigquery remote function capable of calling into `bigframes_cloud_function`.
+ `bigframes_remote_function` - The bigquery remote function capable of calling into `bigframes_cloud_function`.
"""
return self._remote_function_session.remote_function(
input_types,
@@ -1545,12 +1539,13 @@ def read_gbq_function(
a pandas Series.
Returns:
- callable: A function object pointing to the BigQuery function read
- from BigQuery.
+ collections.abc.Callable:
+ A function object pointing to the BigQuery function read
+ from BigQuery.
- The object is similar to the one created by the `remote_function`
- decorator, including the `bigframes_remote_function` property, but
- not including the `bigframes_cloud_function` property.
+ The object is similar to the one created by the `remote_function`
+ decorator, including the `bigframes_remote_function` property, but
+ not including the `bigframes_cloud_function` property.
"""
return bigframes_rf.read_gbq_function(
@@ -1590,7 +1585,7 @@ def _start_query_ml_ddl(
job_config.destination_encryption_configuration = None
return bf_io_bigquery.start_query_with_client(
- self.bqclient, sql, job_config, metrics=self._metrics
+ self.bqclient, sql, job_config=job_config, metrics=self._metrics
)
def _create_object_table(self, path: str, connection: str) -> str:
diff --git a/bigframes/session/_io/bigquery/__init__.py b/bigframes/session/_io/bigquery/__init__.py
index b7706d34ca..6a5ba3f4c7 100644
--- a/bigframes/session/_io/bigquery/__init__.py
+++ b/bigframes/session/_io/bigquery/__init__.py
@@ -40,7 +40,7 @@
IO_ORDERING_ID = "bqdf_row_nums"
-MAX_LABELS_COUNT = 64
+MAX_LABELS_COUNT = 64 - 8
_LIST_TABLES_LIMIT = 10000 # calls to bqclient.list_tables
# will be limited to this many tables
@@ -204,7 +204,12 @@ def format_option(key: str, value: Union[bool, str]) -> str:
return f"{key}={repr(value)}"
-def add_labels(job_config, api_name: Optional[str] = None):
+def add_and_trim_labels(job_config, api_name: Optional[str] = None):
+ """
+ Add additional labels to the job configuration and trim the total number of
+ labels to ensure it does not exceed BigQuery's limit of 64 labels per job.
+ """
api_methods = log_adapter.get_and_reset_api_methods(dry_run=job_config.dry_run)
job_config.labels = create_job_configs_labels(
job_configs_labels=job_config.labels,
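A minimal sketch of the trimming idea (the real helper is `create_job_configs_labels`; this standalone version only illustrates the cap, and the exact merge and eviction policy is an assumption):

```python
MAX_LABELS_COUNT = 64 - 8  # headroom below BigQuery's 64-label cap

def trim_labels(labels: dict, extra: dict) -> dict:
    """Merge labels, then keep at most MAX_LABELS_COUNT entries."""
    merged = {**(labels or {}), **extra}
    return dict(list(merged.items())[:MAX_LABELS_COUNT])

job_labels = trim_labels(
    {"bigframes-api": "read_gbq"},
    {"recent-bigframes-api-0": "dataframe-head"},
)
```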
@@ -217,7 +222,10 @@ def start_query_with_client(
bq_client: bigquery.Client,
sql: str,
job_config: bigquery.job.QueryJobConfig,
+ location: Optional[str] = None,
+ project: Optional[str] = None,
max_results: Optional[int] = None,
+ page_size: Optional[int] = None,
timeout: Optional[float] = None,
api_name: Optional[str] = None,
metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None,
@@ -225,10 +233,17 @@ def start_query_with_client(
"""
Starts query job and waits for results.
"""
- add_labels(job_config, api_name=api_name)
-
try:
- query_job = bq_client.query(sql, job_config=job_config, timeout=timeout)
+ # Note: Ensure no additional labels are added to job_config after this point,
+ # as `add_and_trim_labels` ensures the label count does not exceed 64.
+ add_and_trim_labels(job_config, api_name=api_name)
+ query_job = bq_client.query(
+ sql,
+ job_config=job_config,
+ location=location,
+ project=project,
+ timeout=timeout,
+ )
except google.api_core.exceptions.Forbidden as ex:
if "Drive credentials" in ex.message:
ex.message += CHECK_DRIVE_PERMISSIONS
@@ -237,10 +252,15 @@ def start_query_with_client(
opts = bigframes.options.display
if opts.progress_bar is not None and not query_job.configuration.dry_run:
results_iterator = formatting_helpers.wait_for_query_job(
- query_job, max_results=max_results, progress_bar=opts.progress_bar
+ query_job,
+ max_results=max_results,
+ progress_bar=opts.progress_bar,
+ page_size=page_size,
)
else:
- results_iterator = query_job.result(max_results=max_results)
+ results_iterator = query_job.result(
+ max_results=max_results, page_size=page_size
+ )
if metrics is not None:
metrics.count_job_stats(query_job)
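A sketch of a call against the widened signature (the client, SQL, and names are placeholders; the keyword-argument style follows the call sites updated in this diff):

```python
from google.cloud import bigquery

from bigframes.session._io.bigquery import start_query_with_client

bq_client = bigquery.Client()
results_iterator, query_job = start_query_with_client(
    bq_client,
    "SELECT * FROM `my-project.my_dataset.my_table`",
    job_config=bigquery.QueryJobConfig(),
    location="US",         # new: forwarded to bq_client.query
    project="my-project",  # new: forwarded to bq_client.query
    max_results=1000,
    page_size=100,         # new: forwarded to query_job.result
    api_name="read_gbq",
)
```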
@@ -304,11 +324,15 @@ def create_bq_dataset_reference(
bigquery.DatasetReference: The constructed reference to the anonymous dataset.
"""
job_config = google.cloud.bigquery.QueryJobConfig()
- add_labels(job_config, api_name=api_name)
- query_job = bq_client.query(
- "SELECT 1", location=location, project=project, job_config=job_config
+
+ _, query_job = start_query_with_client(
+ bq_client,
+ "SELECT 1",
+ location=location,
+ job_config=job_config,
+ project=project,
+ api_name=api_name,
)
- query_job.result() # blocks until finished
# The anonymous dataset is used by BigQuery to write query results and
# session tables. BigQuery DataFrames also writes temp tables directly
diff --git a/bigframes/session/_io/bigquery/read_gbq_table.py b/bigframes/session/_io/bigquery/read_gbq_table.py
index 4044b7bf43..6114427570 100644
--- a/bigframes/session/_io/bigquery/read_gbq_table.py
+++ b/bigframes/session/_io/bigquery/read_gbq_table.py
@@ -33,6 +33,7 @@
import bigframes.core.compile.default_ordering
import bigframes.core.sql
import bigframes.dtypes
+import bigframes.exceptions as bfe
import bigframes.session._io.bigquery
import bigframes.session.clients
import bigframes.version
@@ -59,21 +60,21 @@ def get_table_metadata(
# Cache hit could be unexpected. See internal issue 329545805.
# Raise a warning with more information about how to avoid the
# problems with the cache.
- warnings.warn(
+ msg = (
f"Reading cached table from {snapshot_timestamp} to avoid "
"incompatibilies with previous reads of this table. To read "
"the latest version, set `use_cache=False` or close the "
"current session with Session.close() or "
- "bigframes.pandas.close_session().",
- # There are many layers before we get to (possibly) the user's code:
- # pandas.read_gbq_table
- # -> with_default_session
- # -> Session.read_gbq_table
- # -> _read_gbq_table
- # -> _get_snapshot_sql_and_primary_key
- # -> get_snapshot_datetime_and_table_metadata
- stacklevel=7,
+ "bigframes.pandas.close_session()."
)
+ # There are many layers before we get to (possibly) the user's code:
+ # pandas.read_gbq_table
+ # -> with_default_session
+ # -> Session.read_gbq_table
+ # -> _read_gbq_table
+ # -> _get_snapshot_sql_and_primary_key
+ # -> get_snapshot_datetime_and_table_metadata
+ warnings.warn(msg, stacklevel=7)
return cached_table
table = bqclient.get_table(table_ref)
@@ -104,13 +105,13 @@ def validate_table(
# Only true tables support time travel
elif table.table_type != "TABLE":
if table.table_type == "MATERIALIZED_VIEW":
- warnings.warn(
+ msg = (
"Materialized views do not support FOR SYSTEM_TIME AS OF queries. "
"Attempting query without time travel. Be aware that as materialized views "
"are updated periodically, modifications to the underlying data in the view may "
- "result in errors or unexpected behavior.",
- category=bigframes.exceptions.TimeTravelDisabledWarning,
+ "result in errors or unexpected behavior."
)
+ warnings.warn(msg, category=bfe.TimeTravelDisabledWarning)
else:
# table might support time travel, lets do a dry-run query with time travel
snapshot_sql = bigframes.session._io.bigquery.to_query(
@@ -142,13 +143,13 @@ def validate_table(
snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True)
)
if time_travel_not_found:
- warnings.warn(
+ msg = (
"NotFound error when reading table with time travel."
" Attempting query without time travel. Warning: Without"
" time travel, modifications to the underlying table may"
- " result in errors or unexpected behavior.",
- category=bigframes.exceptions.TimeTravelDisabledWarning,
+ " result in errors or unexpected behavior."
)
+ warnings.warn(msg, category=bfe.TimeTravelDisabledWarning)
return False
@@ -263,15 +264,15 @@ def get_index_cols(
# resource utilization because of the default sequential index. See
# internal issue 335727141.
if _is_table_clustered_or_partitioned(table) and not primary_keys:
- warnings.warn(
+ msg = (
f"Table '{str(table.reference)}' is clustered and/or "
"partitioned, but BigQuery DataFrames was not able to find a "
"suitable index. To avoid this warning, set at least one of: "
# TODO(b/338037499): Allow max_results to override this too,
# once we make it more efficient.
- "`index_col` or `filters`.",
- category=bigframes.exceptions.DefaultIndexWarning,
+ "`index_col` or `filters`."
)
+ warnings.warn(msg, category=bfe.DefaultIndexWarning)
# If there are primary keys defined, the query engine assumes these
# columns are unique, even if the constraint is not enforced. We make
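As the warning suggests, callers can avoid the default sequential index on clustered or partitioned tables by pinning an index or bounding the scan (table and column names here are hypothetical):

```python
import bigframes.pandas as bpd

# Either pin an index column...
df = bpd.read_gbq("my-project.my_dataset.clustered_table", index_col="id")

# ...or supply filters to bound the scan.
df = bpd.read_gbq(
    "my-project.my_dataset.clustered_table",
    filters=[("event_date", ">=", "2024-01-01")],
)
```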
@@ -296,21 +297,21 @@ def get_time_travel_datetime_and_table_metadata(
# Cache hit could be unexpected. See internal issue 329545805.
# Raise a warning with more information about how to avoid the
# problems with the cache.
- warnings.warn(
+ msg = (
f"Reading cached table from {snapshot_timestamp} to avoid "
"incompatibilies with previous reads of this table. To read "
"the latest version, set `use_cache=False` or close the "
"current session with Session.close() or "
- "bigframes.pandas.close_session().",
- # There are many layers before we get to (possibly) the user's code:
- # pandas.read_gbq_table
- # -> with_default_session
- # -> Session.read_gbq_table
- # -> _read_gbq_table
- # -> _get_snapshot_sql_and_primary_key
- # -> get_snapshot_datetime_and_table_metadata
- stacklevel=7,
+ "bigframes.pandas.close_session()."
)
+ # There are many layers before we get to (possibly) the user's code:
+ # pandas.read_gbq_table
+ # -> with_default_session
+ # -> Session.read_gbq_table
+ # -> _read_gbq_table
+ # -> _get_snapshot_sql_and_primary_key
+ # -> get_snapshot_datetime_and_table_metadata
+ warnings.warn(msg, stacklevel=7)
return cached_table
# TODO(swast): It's possible that the table metadata is changed between now
diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py
index 01476ed113..553c3fd6e6 100644
--- a/bigframes/session/executor.py
+++ b/bigframes/session/executor.py
@@ -48,7 +48,6 @@
import bigframes.core.schema
import bigframes.core.tree_properties as tree_properties
import bigframes.features
-import bigframes.formatting_helpers as formatting_helpers
import bigframes.session._io.bigquery as bq_io
import bigframes.session.metrics
import bigframes.session.planner
@@ -347,10 +346,14 @@ def export_gcs(
format=format,
export_options=dict(export_options),
)
- job_config = bigquery.QueryJobConfig()
- bq_io.add_labels(job_config, api_name=f"dataframe-to_{format.lower()}")
- export_job = self.bqclient.query(export_data_statement, job_config=job_config)
- self._wait_on_job(export_job)
+
+ bq_io.start_query_with_client(
+ self.bqclient,
+ export_data_statement,
+ job_config=bigquery.QueryJobConfig(),
+ api_name=f"dataframe-to_{format.lower()}",
+ metrics=self.metrics,
+ )
return query_job
def dry_run(
@@ -358,9 +361,7 @@ def dry_run(
) -> bigquery.QueryJob:
sql = self.to_sql(array_value, ordered=ordered)
job_config = bigquery.QueryJobConfig(dry_run=True)
- bq_io.add_labels(job_config)
query_job = self.bqclient.query(sql, job_config=job_config)
- _ = query_job.result()
return query_job
def peek(
@@ -373,7 +374,8 @@ def peek(
"""
plan = self.replace_cached_subtrees(array_value.node)
if not tree_properties.can_fast_peek(plan):
- warnings.warn("Peeking this value cannot be done efficiently.")
+ msg = "Peeking this value cannot be done efficiently."
+ warnings.warn(msg)
sql = self.compiler.compile_peek(plan, n_rows)
@@ -487,15 +489,19 @@ def _run_execute_query(
if not self.strictly_ordered:
job_config.labels["bigframes-mode"] = "unordered"
- # Note: add_labels is global scope which may have unexpected effects
- bq_io.add_labels(job_config, api_name=api_name)
+ # Note: add_and_trim_labels has global side effects which may be unexpected
+ # Ensure no additional labels are added to job_config after this point,
+ # as `add_and_trim_labels` ensures the label count does not exceed 64.
+ bq_io.add_and_trim_labels(job_config, api_name=api_name)
try:
- query_job = self.bqclient.query(sql, job_config=job_config)
- return (
- self._wait_on_job(
- query_job, max_results=max_results, page_size=page_size
- ),
- query_job,
+ return bq_io.start_query_with_client(
+ self.bqclient,
+ sql,
+ job_config=job_config,
+ api_name=api_name,
+ max_results=max_results,
+ page_size=page_size,
+ metrics=self.metrics,
)
except google.api_core.exceptions.BadRequest as e:
@@ -506,29 +512,6 @@ def _run_execute_query(
else:
raise
- def _wait_on_job(
- self,
- query_job: bigquery.QueryJob,
- page_size: Optional[int] = None,
- max_results: Optional[int] = None,
- ) -> bq_table.RowIterator:
- opts = bigframes.options.display
- if opts.progress_bar is not None and not query_job.configuration.dry_run:
- results_iterator = formatting_helpers.wait_for_query_job(
- query_job,
- progress_bar=opts.progress_bar,
- max_results=max_results,
- page_size=page_size,
- )
- else:
- results_iterator = query_job.result(
- max_results=max_results, page_size=page_size
- )
-
- if self.metrics is not None:
- self.metrics.count_job_stats(query_job)
- return results_iterator
-
def replace_cached_subtrees(self, node: nodes.BigFrameNode) -> nodes.BigFrameNode:
return nodes.top_down(
node, lambda x: self._cached_executions.get(x, x), memoize=True
diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py
index e7579b1138..ec922e286d 100644
--- a/bigframes/session/loader.py
+++ b/bigframes/session/loader.py
@@ -707,9 +707,9 @@ def _start_query(
return bf_io_bigquery.start_query_with_client(
self._bqclient,
sql,
- job_config,
- max_results,
- timeout,
+ job_config=job_config,
+ max_results=max_results,
+ timeout=timeout,
api_name=api_name,
)
diff --git a/bigframes/streaming/dataframe.py b/bigframes/streaming/dataframe.py
index b83ae5d822..960da9f57c 100644
--- a/bigframes/streaming/dataframe.py
+++ b/bigframes/streaming/dataframe.py
@@ -26,6 +26,7 @@
import bigframes
from bigframes import dataframe
from bigframes.core import log_adapter
+import bigframes.exceptions as bfe
def _return_type_wrapper(method, cls):
@@ -347,11 +348,8 @@ def _to_bigtable(
For example, the job can be cancelled or its error status
can be examined.
"""
- warnings.warn(
- "The bigframes.streaming module is a preview feature, and subject to change.",
- stacklevel=1,
- category=bigframes.exceptions.PreviewWarning,
- )
+ msg = "The bigframes.streaming module is a preview feature, and subject to change."
+ warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning)
# get default client if not passed
if session is None:
@@ -462,11 +460,8 @@ def _to_pubsub(
For example, the job can be cancelled or its error status
can be examined.
"""
- warnings.warn(
- "The bigframes.streaming module is a preview feature, and subject to change.",
- stacklevel=1,
- category=bigframes.exceptions.PreviewWarning,
- )
+ msg = "The bigframes.streaming module is a preview feature, and subject to change."
+ warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning)
# get default client if not passed
if session is None:
diff --git a/bigframes/version.py b/bigframes/version.py
index 7b6d1f2153..0858c02c1e 100644
--- a/bigframes/version.py
+++ b/bigframes/version.py
@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-__version__ = "1.31.0"
+__version__ = "1.32.0"
diff --git a/notebooks/experimental/semantic_operators.ipynb b/notebooks/experimental/semantic_operators.ipynb
index f9c7f67358..d3fec469b4 100644
--- a/notebooks/experimental/semantic_operators.ipynb
+++ b/notebooks/experimental/semantic_operators.ipynb
@@ -44,6 +44,12 @@
" View on GitHub\n",
" \n",
" \n",
+ "