Skip to content

Commit 7d44011

Browse files
authored
feat: support automatic per-cell execution history filtering and isolated callbacks (#17144)
This change introduces scoped query tracking and event callback management for BigQuery DataFrames within interactive notebook environments (Jupyter/Colab).

Key Changes:

- Jupyter Cell Scoping: Resolves and carries the active IPython cell execution count (`cell_execution_count`) through `TableWidget`, `ExecutionSpec`, query executors, and final `JobMetadata`.
- Execution History Filtering: Adds `events`, `job_ids`, and `all_cells` parameters to `session.execution_history()`. When `all_cells=False`, it filters query logs down to the current active notebook cell.
- Scoped Callback Support: Adds a `callback` parameter to `_read_gbq_colab` that automatically subscribes to the query progress publisher during execution and automatically unsubscribes upon completion.
- Robustness Fixes:
  1. Instantiates expected schema/columns in `_ExecutionHistory` even when the dataframe is empty to prevent indexing errors.
  2. Converts custom option mappings to native Python dicts when assigning query labels to avoid validation errors in the underlying BigQuery client.
  3. Captures and propagates `query_id` in `BigQueryFinishedEvent`.

Verified at: [go/scrcast/NjQzOTAzMTUwMzA2MDk5MnwzZWQ2MTMzYS0xYg](http://goto.google.com/scrcast/NjQzOTAzMTUwMzA2MDk5MnwzZWQ2MTMzYS0xYg)

Colab notebook test: screen/7d6Yt3C28BUAKEH

Fixes #<513337964> 🦕
1 parent c7bb44c commit 7d44011

18 files changed

Lines changed: 424 additions & 56 deletions

File tree

packages/bigframes/bigframes/core/blocks.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,7 @@ def to_pandas_batches(
696696
page_size: Optional[int] = None,
697697
max_results: Optional[int] = None,
698698
allow_large_results: Optional[bool] = None,
699+
cell_execution_count: Optional[int] = None,
699700
) -> PandasBatches:
700701
"""Download results one message at a time.
701702
@@ -713,6 +714,7 @@ def to_pandas_batches(
713714
execution_spec.ExecutionSpec(
714715
promise_under_10gb=under_10gb,
715716
ordered=True,
717+
cell_execution_count=cell_execution_count,
716718
),
717719
)
718720
result_batches = execution_result.batches()

packages/bigframes/bigframes/core/events.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import datetime
2121
import threading
2222
import uuid
23-
from typing import Any, Callable, Literal, Set
23+
from typing import Any, Callable, Literal, Optional, Set
2424

2525
import google.cloud.bigquery._job_helpers
2626
import google.cloud.bigquery.job.query
@@ -127,8 +127,22 @@ class Event:
127127

128128
@dataclasses.dataclass(frozen=True)
129129
class EventEnvelope:
130+
"""An envelope that wraps an execution event with metadata and display options.
131+
132+
Attributes:
133+
event:
134+
The actual execution event details (e.g., ExecutionStarted, BigQuerySentEvent).
135+
progress_bar:
136+
Specifies the style of progress bar to display during execution.
137+
cell_execution_count:
138+
The 1-indexed IPython/Jupyter notebook cell execution number (e.g. the 'x' in 'In [x]').
139+
This is NOT a job count, but rather the sequential number of the cell execution in the
140+
current notebook session, used to group and filter execution history on a per-cell basis.
141+
"""
142+
130143
event: Event
131144
progress_bar: ProgressBarType = _DEFAULT
145+
cell_execution_count: Optional[int] = None
132146

133147

134148
@dataclasses.dataclass(frozen=True)

packages/bigframes/bigframes/core/global_session.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import threading
2020
import traceback
2121
import warnings
22-
from typing import TYPE_CHECKING, Callable, Optional, TypeVar
22+
from typing import TYPE_CHECKING, Callable, Iterable, Optional, TypeVar
2323

2424
import google.auth.exceptions
2525

@@ -124,12 +124,20 @@ def with_default_session(func_: Callable[..., _T], *args, **kwargs) -> _T:
124124
return func_(get_global_session(), *args, **kwargs)
125125

126126

127-
def execution_history() -> "bigframes.session._ExecutionHistory":
128-
import pandas # noqa: F401
129-
127+
def execution_history(
128+
*,
129+
events: Optional[Iterable[bigframes.core.events.Event]] = None,
130+
job_ids: Optional[Iterable[str]] = None,
131+
all_cells: bool = True,
132+
) -> "bigframes.session._ExecutionHistory":
130133
import bigframes.session
131134

132-
return with_default_session(bigframes.session.Session.execution_history)
135+
return with_default_session(
136+
bigframes.session.Session.execution_history,
137+
events=events,
138+
job_ids=job_ids,
139+
all_cells=all_cells,
140+
)
133141

134142

135143
class _GlobalSessionContext:

packages/bigframes/bigframes/core/utils.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,3 +249,16 @@ def timedelta_to_micros(
249249
) * 1_000_000 + timedelta.microseconds
250250

251251
raise TypeError(f"Unrecognized input type: {type(timedelta)}")
252+
253+
254+
def get_ipython_execution_count() -> typing.Optional[int]:
255+
"""Returns the current IPython cell execution count if running in a notebook, else None."""
256+
try:
257+
from IPython.core.interactiveshell import InteractiveShell
258+
259+
if InteractiveShell.initialized():
260+
ipy = InteractiveShell.instance()
261+
return getattr(ipy, "execution_count", None)
262+
except (ImportError, NameError):
263+
pass
264+
return None

packages/bigframes/bigframes/dataframe.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1755,6 +1755,7 @@ def to_pandas_batches(
17551755
max_results: Optional[int] = None,
17561756
*,
17571757
allow_large_results: Optional[bool] = None,
1758+
cell_execution_count: Optional[int] = None,
17581759
) -> blocks.PandasBatches:
17591760
"""Stream DataFrame results to an iterable of pandas DataFrame.
17601761
@@ -1807,6 +1808,7 @@ def to_pandas_batches(
18071808
page_size=page_size,
18081809
max_results=max_results,
18091810
allow_large_results=allow_large_results,
1811+
cell_execution_count=cell_execution_count,
18101812
)
18111813

18121814
def _to_pandas_batches(
@@ -1815,11 +1817,13 @@ def _to_pandas_batches(
18151817
max_results: Optional[int] = None,
18161818
*,
18171819
allow_large_results: Optional[bool] = None,
1820+
cell_execution_count: Optional[int] = None,
18181821
) -> blocks.PandasBatches:
18191822
return self._block.to_pandas_batches(
18201823
page_size=page_size,
18211824
max_results=max_results,
18221825
allow_large_results=allow_large_results,
1826+
cell_execution_count=cell_execution_count,
18231827
)
18241828

18251829
def _compute_dry_run(self) -> google.cloud.bigquery.job.QueryJob:

packages/bigframes/bigframes/display/anywidget.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ def __init__(self, dataframe: bigframes.dataframe.DataFrame):
9292

9393
self._dataframe = dataframe
9494

95+
from bigframes.core.utils import get_ipython_execution_count
96+
97+
self._cell_execution_count = get_ipython_execution_count()
98+
9599
super().__init__()
96100

97101
# Initialize attributes that might be needed by observers first
@@ -286,7 +290,10 @@ def _reset_batch_cache(self) -> None:
286290
def _reset_batches_for_new_page_size(self) -> None:
287291
"""Reset the batch iterator when page size changes."""
288292
with bigframes.option_context("display.progress_bar", None):
289-
self._batches = self._dataframe.to_pandas_batches(page_size=self.page_size)
293+
self._batches = self._dataframe.to_pandas_batches(
294+
page_size=self.page_size,
295+
cell_execution_count=self._cell_execution_count,
296+
)
290297

291298
self._reset_batch_cache()
292299

@@ -318,7 +325,8 @@ def _set_table_html(self) -> None:
318325
current_sort_state = _SortState(tuple(sort_columns), tuple(sort_ascending))
319326
if self._last_sort_state != current_sort_state:
320327
self._batches = df_to_display.to_pandas_batches(
321-
page_size=self.page_size
328+
page_size=self.page_size,
329+
cell_execution_count=self._cell_execution_count,
322330
)
323331
self._reset_batch_cache()
324332
self._last_sort_state = current_sort_state

packages/bigframes/bigframes/pandas/io/api.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -300,23 +300,26 @@ def _try_read_gbq_colab_sessionless_dry_run(
300300
def _read_gbq_colab( # type: ignore[overload-overlap]
301301
query_or_table: str,
302302
*,
303-
pyformat_args: Optional[Dict[str, Any]] = ...,
304-
dry_run: Literal[False] = ...,
303+
callback: Optional[Callable[[bigframes.core.events.EventEnvelope], None]] = None,
304+
pyformat_args: Optional[Dict[str, Any]] = None,
305+
dry_run: Literal[False] = False,
305306
) -> bigframes.dataframe.DataFrame: ...
306307

307308

308309
@overload
309310
def _read_gbq_colab(
310311
query_or_table: str,
311312
*,
312-
pyformat_args: Optional[Dict[str, Any]] = ...,
313-
dry_run: Literal[True] = ...,
313+
callback: Optional[Callable[[bigframes.core.events.EventEnvelope], None]] = None,
314+
pyformat_args: Optional[Dict[str, Any]] = None,
315+
dry_run: Literal[True],
314316
) -> pandas.Series: ...
315317

316318

317319
def _read_gbq_colab(
318320
query_or_table: str,
319321
*,
322+
callback: Optional[Callable[[bigframes.core.events.EventEnvelope], None]] = None,
320323
pyformat_args: Optional[Dict[str, Any]] = None,
321324
dry_run: bool = False,
322325
) -> bigframes.dataframe.DataFrame | pandas.Series:
@@ -328,6 +331,8 @@ def _read_gbq_colab(
328331
Args:
329332
query_or_table (str):
330333
SQL query or table ID (table ID not yet supported).
334+
callback (Optional[Callable[[bigframes.core.events.EventEnvelope], None]]):
335+
Callback to receive query execution events.
331336
pyformat_args (Optional[Dict[str, Any]]):
332337
Parameters to format into the query string.
333338
dry_run (bool):
@@ -379,6 +384,7 @@ def _read_gbq_colab(
379384
return global_session.with_default_session(
380385
bigframes.session.Session._read_gbq_colab,
381386
query_or_table,
387+
callback=callback,
382388
pyformat_args=pyformat_args,
383389
dry_run=dry_run,
384390
)

packages/bigframes/bigframes/series.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,7 @@ def to_pandas_batches(
760760
max_results: Optional[int] = None,
761761
*,
762762
allow_large_results: Optional[bool] = None,
763+
cell_execution_count: Optional[int] = None,
763764
) -> Iterable[pandas.Series]:
764765
"""Stream Series results to an iterable of pandas Series.
765766
@@ -812,6 +813,7 @@ def to_pandas_batches(
812813
page_size=page_size,
813814
max_results=max_results,
814815
allow_large_results=allow_large_results,
816+
cell_execution_count=cell_execution_count,
815817
)
816818
return map(lambda df: cast(pandas.Series, df.squeeze(1)), batches)
817819

0 commit comments

Comments
 (0)