
Long time reader, first time poster. I'm working with the BigQuery Storage API Python client library, and I'm running into some trouble splitting out my readers using Python multiprocessing.

There is a note included in the documentation that says:

Because this client uses grpcio library, it is safe to share instances across threads. In multiprocessing scenarios, the best practice is to create client instances after the invocation of os.fork() by multiprocessing.Pool or multiprocessing.Process.
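My reading of that note is that each worker should construct its own client (and therefore its own gRPC channel) only after the fork, roughly like this (a minimal sketch; the worker function and its body are placeholders):

from google.cloud.bigquery_storage import BigQueryReadClient
import multiprocessing as mp

def worker(i):
    # The client (and its underlying gRPC channel) is created inside the
    # child process, i.e. after multiprocessing.Pool has forked
    client = BigQueryReadClient()
    # ... use client.read_rows(...) on stream i here ...

if __name__ == '__main__':
    with mp.Pool(processes=2) as pool:
        pool.map(worker, range(2))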

I think I'm doing this correctly...but I must not be.

Here is my code as it currently stands. The goal is to read a BQ table in multiple parallel streams, and then write the rows of data to individual CSV files. Once all of the CSV files are created I'll then do a simple cat command to combine them.

As a side note, this code actually works well for small BigQuery tables, but it fails with a segfault when trying to download large BQ tables.

import faulthandler
faulthandler.enable()
from google.cloud.bigquery_storage import BigQueryReadClient
from google.cloud.bigquery_storage import types
import multiprocessing as mp
import psutil
import os
import sys
import csv
from datetime import datetime


def extract_table(i):

    # session, table_name, and csv_columns are module-level globals that each
    # forked worker inherits from the parent process
    client_in = BigQueryReadClient()
    reader_in = client_in.read_rows(session.streams[i].name, timeout=10000)

    rows = reader_in.rows(session)

    csv_file = "/home/user/sas/" + table_name + "_" + str(i) + ".csv"
    print(f"Starting at time {datetime.now()} for file {csv_file}")

    try:
        with open(csv_file, 'w') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
            if i == 0:
                writer.writeheader()
            for data in rows:
                # print(data)
                writer.writerow(data)
    except IOError:
        print("I/O error")

    print(f"Finished at time {datetime.now()} for file {csv_file}")
    return


if __name__ == '__main__':
    # Get input args
    project_id = sys.argv[1]
    db_name = sys.argv[2]
    table_name = sys.argv[3]

    # sys.argv[4] arrives as a bracketed, quoted list of column names,
    # e.g. "['column1', 'column2']"; strip the brackets and quotes
    n = len(sys.argv[4])
    a = sys.argv[4][1:n - 1]
    csv_columns = a.replace("'", '').split(', ')

    output_type = sys.argv[5]  # csv or sas
    bucket_root = sys.argv[6]

    # The read session is created in this project. This project can be
    # different from that which contains the table.
    client = BigQueryReadClient()

    table = "projects/{}/datasets/{}/tables/{}".format(
        project_id, db_name, table_name
    )

    requested_session = types.ReadSession()
    requested_session.table = table
    
    # This API can also deliver data serialized in Apache Arrow format.
    # This example leverages Apache Avro.
    requested_session.data_format = types.DataFormat.AVRO

    # We limit the output columns to a subset of those allowed in the table
    requested_session.read_options.selected_fields = csv_columns
    
    ncpus = psutil.cpu_count(logical=False)

    if ncpus <= 2:
        ncpus_buffer = 2
    else:
        ncpus_buffer = ncpus - 2

    print(f"You have {ncpus} cores according to psutil. Using {ncpus_buffer} cores")

    parent = "projects/{}".format(project_id)
    session = client.create_read_session(
        parent=parent,
        read_session=requested_session,
        max_stream_count=ncpus_buffer,
    )

    print(f"There are {len(session.streams)} streams")

    num_streams = int(len(session.streams))

    with mp.Pool(processes=ncpus_buffer) as p:
        result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)

The script is invoked like this:

python /home/user/sas/bq_extract_2.py gc-project-id dataset table "['column1', 'column2']" csv 'path/to/gcs/bucket'

Again, this works with small tables, and a couple of times I have gotten it to work on very large BQ tables that are in the 50-100 GB size range. However, most of the time the large tables fail with the following error:

There are 1000 streams
You have 2 cores according to psutil. Using 2 cores
Starting at time 2020-11-17 17:46:04.645398 for file /home/user/sas/diag_0.csv

Starting at time 2020-11-17 17:46:04.829381 for file /home/user/sas/diag_1.csv

Fatal Python error: Segmentation fault

Thread 0x00007f4293f94700 (most recent call first):
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/site-packages/grpc/_channel.py", line 1235 in channel_spin
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py", line 870 in run
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x00007f42bc4c9740 (most recent call first):
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/csv.py", line 151 in _dict_to_list
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/csv.py", line 154 in writerow
  File "/home/user/sas/bq_extract_2.py", line 39 in extract_table
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py", line 48 in mapstar
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py", line 125 in worker
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/process.py", line 108 in run
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/process.py", line 315 in _bootstrap
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/popen_fork.py", line 75 in _launch
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/popen_fork.py", line 19 in __init__
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/context.py", line 277 in _Popen
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/process.py", line 121 in start
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py", line 326 in _repopulate_pool_static
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py", line 303 in _repopulate_pool
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py", line 212 in __init__
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/context.py", line 119 in Pool
  File "/home/user/sas/bq_extract_2.py", line 157 in <module>

Edit 1: Updated timeout on .read_rows to 10000 to allow large results to be read from BQ. Also changed the max_stream_count to equal the number of cores that will be used by the Pool. This seemed to help quite a bit in my testing, but I still get segfaults showing up in the console output when I run this as a startup script on Google Cloud Compute instances.
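For reference, the two changed lines (already reflected in the listing above) are:

reader_in = client_in.read_rows(session.streams[i].name, timeout=10000)

session = client.create_read_session(
    parent=parent,
    read_session=requested_session,
    max_stream_count=ncpus_buffer,  # one stream per Pool worker
)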

Edit 2: The more I look into this, the more it doesn't seem possible to effectively use Python multiprocessing with the Google BigQuery Storage API. Given the need to create read sessions after the invocation of os.fork(), there is no way that I can see to ensure the individual processes will be assigned the correct number of rows to read. Each session creates its own one-to-many (one session to many streams) relationship with the BQ table it's attached to, and each session appears to break up the table rows across its streams slightly differently.

Take, for example, a table with 30 rows that we want to export with 3 processes, each processing a single stream of rows. Formatting might look weird on mobile.

                       os.fork()

Process 1              Process 2              Process 3
Session1               Session2               Session3
*Stream1 - 10 rows     Stream1 - 8 rows       Stream1 - 9 rows
Stream2 - 10 rows      *Stream2 - 12 rows     Stream2 - 11 rows
Stream3 - 10 rows      Stream3 - 10 rows      *Stream3 - 10 rows

In this example, we end up with 32 output rows because each session does not define its streams in exactly the same way.
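In code, the per-process-session pattern I'm describing looks roughly like this (a sketch; extract_stream and num_workers are placeholder names, and parent/requested_session are the same objects built in the main script above):

def extract_stream(i):
    # Each forked worker builds its own client AND its own read session.
    # Stream i of this session does not cover the same rows as stream i of the
    # sessions created in the other workers, so rows get duplicated or dropped
    # when the per-stream CSVs are concatenated.
    client = BigQueryReadClient()
    session = client.create_read_session(
        parent=parent,
        read_session=requested_session,
        max_stream_count=num_workers,
    )
    reader = client.read_rows(session.streams[i].name)
    for row in reader.rows(session):
        ...  # write row to this worker's CSV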

I tried using threading (code below) instead of processes, and that worked because gRPC is thread-safe.

import threading

# create read session here (same as in the script above)

# Then call the target worker function with one thread per worker
for i in range(0, num_streams):
    t = threading.Thread(target=extract_table, args=(i,))
    t.start()
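For completeness, the same pattern with the thread handles collected and joined, so the main thread waits for all readers to finish (just a sketch):

threads = []
for i in range(0, num_streams):
    t = threading.Thread(target=extract_table, args=(i,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()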

However, the big problem with this is that using 8 threads takes just as long as using 1 thread, and aggregate throughput across the threads appears to max out at ~5 MB/s no matter how many threads you use.

This is in contrast to using processes where the throughput appears to scale linearly as workers are added (I saw up to ~100 MB/s in some tests)...on the rare occasions that I was able to get it to work without a segfault interrupting things. That appeared to just be pure luck.

Using 1 thread:

Total time: ~ 3:11

Using 8 threads:

Total time: ~ 3:15

There is essentially no speed benefit to using multiple threads from what I can tell.

If anyone has any thoughts on anything I'm missing please let me know! I would love to be able to get this to work. I really like the features of the BQ Storage API (row filters, column selection, no export limits), but we won't be able to use it until we can find a way to fan out the readers appropriately.

  • Looks like the segfault is happening in the CSV module, not the BigQuery Storage API client. Do you happen to know if there is something special about the shape of the row that is causing this segfault to happen? Commented Nov 17, 2020 at 21:20
  • Hi Tim, good question. I'll have to take a look, but from what I can tell the BQ API is just returning a stream of dictionary data that is being converted by the CSV module into data rows. I made one edit to the code (above) that limits the # of streams to equal the number of cores on the machine. This seems to have helped, but I still get segfaults showing up on the console when I run this as a startup script on Google Cloud Compute. Commented Nov 18, 2020 at 16:03
  • @TimSwast: I don't see anything wrong with the data. I tried removing the csv code to see if it was causing the issue, but I still get segfaults. It looks like you may work for Google? If so, does anyone on the BQ team happen to have an example of how to use multiprocessing.Pool to read multiple streams? I would assume they do since it's noted as a use case right in the docs...but without an example. googleapis.dev/python/bigquerystorage/latest/index.html Commented Nov 18, 2020 at 20:53
  • When you remove the CSV code, where is the segfault happening? Also, have you tried using Arrow instead of Avro? It's possible this is a bug in the fastavro library. Commented Nov 18, 2020 at 23:42
  • 1
  • @TimSwast I had that thought as well, but handling that dynamically across all the tables would likely turn into a pain. Let me see if I can write up a script that demonstrates the error using public BQ data. It's frustratingly unpredictable. Sometimes the segfault happens, and sometimes it doesn't. Once I get a cleaned up script I'll share it via GitHub. Commented Nov 20, 2020 at 21:10

1 Answer


There is a known issue with grpcio (used by the google-cloud-bigquery-storage library) and multiprocessing. Per this code example, it is recommended that "worker subprocesses be forked before any gRPC servers start up".

Since your workload is mostly I/O bound, the global interpreter lock shouldn't be the main performance bottleneck. I recommend using threads to distribute the work, as is done in the google-cloud-bigquery library.

Replace:

with mp.Pool(processes=ncpus_buffer) as p:
    result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)

With:

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=num_streams) as p:
    # chunksize is accepted but ignored by ThreadPoolExecutor
    result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)

3 Comments

  • Thanks for the suggestion. I've unfortunately already tried threads, and they do not seem to offer any speed improvement over a non-multi-threaded implementation. I tried the concurrent.futures.ThreadPoolExecutor code and it showed the same results. I've only been able to demonstrate linear speed improvements with processes during the rare instances that they actually work. I agree with your assessment that this is I/O bound, so I'm not sure why threading wouldn't improve throughput.
  • Per github.com/grpc/grpc/blob/master/doc/fork_support.md, you can try creating the process pool before constructing any client objects. This should allow the process to fork before any background gRPC-related threads are started.
  • Yep, I gave that a shot following this example: gist.github.com/tseaver/95c8f54416c4a5093b4ad2d4755ab819 The problem is that forking processes before grpc leads to a new session being created for each process. A big test I just ran resulted in 8,562,767,649 output rows when it should have been 8,549,457,649. I may have to resort to the individual filters for each table to evenly break up the rows unless we can think of some other solution.
