This repository was archived by the owner on Nov 12, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 45
This repository was archived by the owner on Nov 12, 2025. It is now read-only.
400 Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations. #717
Copy link
Copy link
Closed
Labels
api: bigquerystorage — Issues related to the googleapis/python-bigquery-storage API.
Description
400 Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations.
Entity: projects/ardent-quarter-361905/datasets/event_prod/tables/event_356_copy/streams/Cig2ZDU3ODc0NC0wMDAwLTJhZWItYTczZS1kNGY1NDdmNzM0NTg6czEy
`
project_id = os.getenv("PROJECT_ID")
dataset_id = 'event_prod'
table_id = 'event_356_copy'
parent = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
write_stream = WriteStream()
write_stream.type_ = WriteStream.Type.COMMITTED
write_stream_request = CreateWriteStreamRequest()
write_stream_request.write_stream = write_stream
write_stream_request.parent = parent
write_stream = bigquery_write_client.create_write_stream(write_stream_request)
serialized_rows = []
file = [{ 'field1' : "1",'field2' : "2",'field3' : "3" }]
print(data)
for row in [data]:
print(row,type(row))
message = event_356_pb2.Event_356()
for field_name, value in row.items():
print(field_name)
if field_name == 'createdAta':
# timestamp_format = '%Y-%m-%dT%H:%M:%S.%fZ'
# corrected_value = value.strip("'") # Remove single quotes
# timestamp_datetime = datetime.strptime(corrected_value, timestamp_format)
# # Convert datetime to Epoch timestamp in microseconds
# timestamp_microseconds = int(timestamp_datetime.timestamp() * 1e6)
# setattr(message, field_name, timestamp_microseconds)
# # pass
pass
else:
print(type(value))
if field_name in ['firstTimeFilter' ] :
if value == 'True' :
setattr(message, field_name, True)
else :
setattr(message, field_name, False)
elif field_name.startswith('u_'):
# message[field_name] = []
repeated_field = getattr(message, field_name, None)
for x in value :
if repeated_field is not None:
repeated_field.extend(x)
else :
setattr(message, field_name, value)
print("eror")
# message = sample_pb2.SampleData(
# field1=row.get('field1', "0"),
# field2=row.get('field2', "0"),
# field3=row.get('field3', "0"),
# )
serialized_rows.append(message.SerializeToString())
print("p2")
stream_name = write_stream.name
request_template = AppendRowsRequest()
request_template.write_stream = stream_name
proto_schema = ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
event_356_pb2.Event_356.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Some stream types support an unbounded number of requests. Construct an
# AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(bigquery_write_client, request_template)
print("p3")
request = AppendRowsRequest()
proto_rows = ProtoRows()
proto_rows.serialized_rows = serialized_rows
proto_data = AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
res = append_rows_stream.send(request)
# logger.info(f"Result {res.result()}")
print(res.result())
append_rows_stream.close()
print("done")`
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
api: bigquerystorage — Issues related to the googleapis/python-bigquery-storage API.