Long-time reader, first-time poster. I'm working with the BigQuery Storage API Python client library, and I'm running into some trouble splitting out my readers using Python multiprocessing.
There is a note included in the documentation that says:
Because this client uses grpcio library, it is safe to share instances across threads. In multiprocessing scenarios, the best practice is to create client instances after the invocation of os.fork() by multiprocessing.Pool or multiprocessing.Process.
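My reading of that note is that each worker should construct its own client only after the pool has forked, i.e. something like this minimal sketch (not my actual script; the stream names here are just placeholders):

from google.cloud.bigquery_storage import BigQueryReadClient
import multiprocessing as mp

def worker(stream_name):
    # The client is constructed inside the worker, i.e. after mp.Pool has
    # forked, so no gRPC channel state crosses the os.fork() boundary.
    client = BigQueryReadClient()
    ...  # read the given stream with this client

if __name__ == '__main__':
    with mp.Pool(processes=2) as p:
        p.map(worker, ["stream-a", "stream-b"])  # placeholder stream names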
I think I'm doing this correctly...but I must not be.
Here is my code as it currently stands. The goal is to read a BQ table in multiple parallel streams, and then write the rows of data to individual CSV files. Once all of the CSV files are created I'll then do a simple cat command to combine them.
As a side note, this code actually works well for small BigQuery tables, but it fails with a segfault when trying to download large BQ tables.
import faulthandler
faulthandler.enable()
from google.cloud.bigquery_storage import BigQueryReadClient
from google.cloud.bigquery_storage import types
import multiprocessing as mp
import psutil
import os
import sys
import csv
from datetime import datetime


def extract_table(i):
    client_in = BigQueryReadClient()
    reader_in = client_in.read_rows(session.streams[i].name, timeout=10000)
    rows = reader_in.rows(session)
    csv_file = "/home/user/sas/" + table_name + "_" + str(i) + ".csv"
    print(f"Starting at time {datetime.now()} for file {csv_file}")
    try:
        with open(csv_file, 'w') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
            if i == 0:
                writer.writeheader()
            else:
                pass
            for data in rows:
                # print(data)
                writer.writerow(data)
    except IOError:
        print("I/O error")
    print(f"Finished at time {datetime.now()} for file {csv_file}")
    return
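
# Note: extract_table relies on the globals session, table_name, and
# csv_columns defined in the __main__ block below; the Pool workers inherit
# them via os.fork(), so only the client is created inside each worker.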

if __name__ == '__main__':
    # Get input args
    project_id = sys.argv[1]
    db_name = sys.argv[2]
    table_name = sys.argv[3]
    n = len(sys.argv[4])
    a = sys.argv[4][1:n - 1]
    csv_columns = a.replace("'", '').split(', ')
    output_type = sys.argv[5]  # csv or sas
    bucket_root = sys.argv[6]

    # The read session is created in this project. This project can be
    # different from that which contains the table.
    client = BigQueryReadClient()
    table = "projects/{}/datasets/{}/tables/{}".format(
        project_id, db_name, table_name
    )

    requested_session = types.ReadSession()
    requested_session.table = table
    # This API can also deliver data serialized in Apache Arrow format.
    # This example leverages Apache Avro.
    requested_session.data_format = types.DataFormat.AVRO
    # We limit the output columns to a subset of those allowed in the table
    requested_session.read_options.selected_fields = csv_columns

    ncpus = psutil.cpu_count(logical=False)
    if ncpus <= 2:
        ncpus_buffer = 2
    else:
        ncpus_buffer = ncpus - 2
    print(f"You have {ncpus} cores according to psutil. Using {ncpus_buffer} cores")

    parent = "projects/{}".format(project_id)
    session = client.create_read_session(
        parent=parent,
        read_session=requested_session,
        max_stream_count=ncpus_buffer,
    )
    print(f"There are {len(session.streams)} streams")
    num_streams = int(len(session.streams))

    with mp.Pool(processes=ncpus_buffer) as p:
        result = p.map(extract_table, list(range(0, num_streams)), chunksize=1)
The code is called with the following command style:
python /home/user/sas/bq_extract_2.py gc-project-id dataset table "['column1', 'column2']" csv 'path/to/gcs/bucket'
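For clarity, the bracketed column argument is parsed with the slice/replace/split near the top of the script, which works out to:

arg = "['column1', 'column2']"                 # sys.argv[4] as passed on the command line
a = arg[1:len(arg) - 1]                        # strip the surrounding [ and ]
csv_columns = a.replace("'", '').split(', ')   # drop the quotes, split on ", "
print(csv_columns)                             # ['column1', 'column2']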
Again, this works with small tables, and a couple of times I have gotten it to work on very large BQ tables that are in the 50-100 GB size range. However, most of the time the large tables fail with the following error:
There are 1000 streams
You have 2 cores according to psutil. Using 2 cores
Starting at time 2020-11-17 17:46:04.645398 for file /home/user/sas/diag_0.csv
Starting at time 2020-11-17 17:46:04.829381 for file /home/user/sas/diag_1.csv

Fatal Python error: Segmentation fault

Thread 0x00007f4293f94700 (most recent call first):
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/site-packages/grpc/_channel.py", line 1235 in channel_spin
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py", line 870 in run
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py", line 932 in _bootstrap_inner
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/threading.py", line 890 in _bootstrap

Thread 0x00007f42bc4c9740 (most recent call first):
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/csv.py", line 151 in _dict_to_list
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/csv.py", line 154 in writerow
  File "/home/user/sas/bq_extract_2.py", line 39 in extract_table
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py", line 48 in mapstar
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py", line 125 in worker
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/process.py", line 108 in run
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/process.py", line 315 in _bootstrap
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/popen_fork.py", line 75 in _launch
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/popen_fork.py", line 19 in __init__
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/context.py", line 277 in _Popen
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/process.py", line 121 in start
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py", line 326 in _repopulate_pool_static
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py", line 303 in _repopulate_pool
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/pool.py", line 212 in __init__
  File "/home/user/anaconda3/envs/sas-controller/lib/python3.8/multiprocessing/context.py", line 119 in Pool
  File "/home/user/sas/bq_extract_2.py", line 157 in <module>
Edit 1: Updated timeout on .read_rows to 10000 to allow large results to be read from BQ. Also changed the max_stream_count to equal the number of cores that will be used by the Pool. This seemed to help quite a bit in my testing, but I still get segfaults showing up in the console output when I run this as a startup script on Google Cloud Compute instances.
Edit 2: The more I look into this, the less it seems possible to use Python multiprocessing effectively with the Google BigQuery Storage API. Given the need to create read sessions after the invocation of os.fork(), there is no way that I can see to ensure that the individual processes are assigned the correct rows to read. Each session creates its own one-to-many relationship (one session to many streams) with the table it's attached to, and each session appears to break the table rows up across its streams slightly differently.
Take, for example, a table with 30 rows that we want to export with 3 processes, each processing a single stream of rows. Formatting might look weird on mobile.
                      os.fork()

Process 1             Process 2             Process 3
Session1              Session2              Session3
*Stream1 - 10 rows    Stream1 - 8 rows      Stream1 - 9 rows
Stream2 - 10 rows     *Stream2 - 12 rows    Stream2 - 11 rows
Stream3 - 10 rows     Stream3 - 10 rows     *Stream3 - 10 rows
In this example, each process reads its starred stream, so we end up with 32 output rows (10 + 12 + 10) because each session does not split the table across its streams in exactly the same way.
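In code, that scenario would look roughly like the hypothetical sketch below (not the script above): each forked worker builds its own client and its own read session, then reads only its i-th stream.

def extract_table_per_process(i):
    # Hypothetical sketch of the example above: every forked worker creates a
    # client AND its own read session. Each session may carve the table into
    # streams differently, so reading only stream i from each session can
    # miss or duplicate rows in the combined output.
    client_in = BigQueryReadClient()
    session_in = client_in.create_read_session(
        parent=parent,
        read_session=requested_session,
        max_stream_count=num_streams,
    )
    reader_in = client_in.read_rows(session_in.streams[i].name, timeout=10000)
    for row in reader_in.rows(session_in):
        ...  # write the row to this worker's CSV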
I tried using threading (code below) instead of processes and that worked because gRPC is thread safe.
import threading

# create read session here

# Then call the target worker function with one thread per worker
for i in range(0, num_streams):
    t = threading.Thread(target=extract_table, args=(i,))
    t.start()
However, the big problem with this is that using 8 threads takes just as long as using 1 thread, and the aggregate throughput across the threads appears to max out at ~5 MB/s no matter how many threads you use.
This is in contrast to using processes where the throughput appears to scale linearly as workers are added (I saw up to ~100 MB/s in some tests)...on the rare occasions that I was able to get it to work without a segfault interrupting things. That appeared to just be pure luck.
Using 1 thread:
Total time: ~ 3:11
Using 8 threads:
Total time: ~ 3:15
There is essentially no speed benefit to using multiple threads from what I can tell.
If anyone has any thoughts on anything I'm missing, please let me know! I would love to be able to get this to work. I really like the features of the BQ Storage API (row filters, column selection, no export limits), but we won't be able to use it until we can find a way to fan out the readers appropriately.