Skip to content
This repository was archived by the owner on Nov 12, 2025. It is now read-only.

400 Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations.  #717

@slice-amandata

Description

@slice-amandata

400 Errors found while processing rows. Please refer to the row_errors field for details. The list may not be complete because of the size limitations.

Entity: projects/ardent-quarter-361905/datasets/event_prod/tables/event_356_copy/streams/Cig2ZDU3ODc0NC0wMDAwLTJhZWItYTczZS1kNGY1NDdmNzM0NTg6czEy

`
# Append protobuf-serialized rows to a BigQuery table through the Storage
# Write API, using an explicitly created COMMITTED write stream.
#
# NOTE(review): `data`, `bigquery_write_client`, `writer`, `event_356_pb2`,
# and the google-cloud-bigquery-storage types (WriteStream,
# CreateWriteStreamRequest, AppendRowsRequest, ProtoSchema, ProtoRows,
# descriptor_pb2) are assumed to be defined/imported elsewhere in the module.
# The original paste mixed top-level and 8-space indentation; it is
# normalized to a flat script here.

project_id = os.getenv("PROJECT_ID")
dataset_id = 'event_prod'
table_id = 'event_356_copy'
parent = f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"

# Create a COMMITTED stream: rows become visible as soon as the server
# acknowledges each append.
write_stream = WriteStream()
write_stream.type_ = WriteStream.Type.COMMITTED
write_stream_request = CreateWriteStreamRequest()
write_stream_request.write_stream = write_stream
write_stream_request.parent = parent
write_stream = bigquery_write_client.create_write_stream(write_stream_request)

# Serialize each input row into an Event_356 protobuf message.
serialized_rows = []
for row in [data]:
    message = event_356_pb2.Event_356()
    for field_name, value in row.items():
        if field_name == 'createdAta':
            # Timestamp conversion deliberately skipped for now (the
            # strptime/epoch-microseconds version was commented out upstream).
            pass
        elif field_name in ['firstTimeFilter']:
            # String flag -> bool in a single expression.
            setattr(message, field_name, value == 'True')
        elif field_name.startswith('u_'):
            # Repeated field: extend once with the whole list.
            # BUG FIX: the original looped `for x in value:
            # repeated_field.extend(x)`, which extends with each element's
            # *items* (a string gets split into characters) and raises
            # TypeError on non-iterable elements — a likely source of the
            # reported row_errors.
            repeated_field = getattr(message, field_name, None)
            if repeated_field is not None:
                repeated_field.extend(value)
        else:
            setattr(message, field_name, value)
    serialized_rows.append(message.SerializeToString())

# Template request: carries the stream name and writer schema so that
# subsequent AppendRowsRequests only need the row payloads.
stream_name = write_stream.name
request_template = AppendRowsRequest()
request_template.write_stream = stream_name

proto_schema = ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
event_356_pb2.Event_356.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data

# Some stream types support an unbounded number of requests. Construct an
# AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(bigquery_write_client, request_template)

request = AppendRowsRequest()
proto_rows = ProtoRows()
proto_rows.serialized_rows = serialized_rows
proto_data = AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data

res = append_rows_stream.send(request)
print(res.result())  # blocks until the server acknowledges the append
append_rows_stream.close()
print("done")

Metadata

Metadata

Assignees

No one assigned

    Labels

    api: bigquerystorage — Issues related to the googleapis/python-bigquery-storage API.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions