Skip to content

Commit 56e2b13

Browse files
fix: write data frame and write file with batch (#194)
1 parent 1b71b6a commit 56e2b13

File tree

4 files changed

+73
-9
lines changed

4 files changed

+73
-9
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,10 @@
22

33
## 0.18.0 [unreleased]
44

5+
### Bug Fixes
6+
7+
1. [#194](https://github.com/InfluxCommunity/influxdb3-python/pull/194): Fix `InfluxDBClient3.write_file()` and `InfluxDBClient3.write_dataframe()` fail with batching mode.
8+
59
## 0.17.0 [2026-01-08]
610

711
### Features

influxdb_client_3/write_client/client/write_api.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -391,12 +391,9 @@ def write(self, bucket: str, org: str = None,
391391

392392
_async_req = True if self._write_options.write_type == WriteType.asynchronous else False
393393

394-
# Filter out serializer-specific kwargs before passing to _post_write
395-
http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS}
396-
397394
def write_payload(payload):
398395
final_string = b'\n'.join(payload[1])
399-
return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **http_kwargs)
396+
return self._post_write(_async_req, bucket, org, final_string, payload[0], no_sync, **kwargs)
400397

401398
results = list(map(write_payload, payloads.items()))
402399
if not _async_req:
@@ -586,11 +583,13 @@ def _retry_callback_delegate(exception):
586583
return _BatchResponse(data=batch_item)
587584

588585
def _post_write(self, _async_req, bucket, org, body, precision, no_sync, **kwargs):
586+
# Filter out serializer-specific kwargs before passing to _post_write
587+
http_kwargs = {k: v for k, v in kwargs.items() if k not in SERIALIZER_KWARGS}
589588
return self._write_service.post_write(org=org, bucket=bucket, body=body, precision=precision,
590589
no_sync=no_sync,
591590
async_req=_async_req,
592591
content_type="text/plain; charset=utf-8",
593-
**kwargs)
592+
**http_kwargs)
594593

595594
def _to_response(self, data: _BatchItem, delay: timedelta):
596595

tests/test_influxdb_client_3_integration.py

Lines changed: 65 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import logging
22
import os
3-
import pyarrow
4-
import pytest
53
import random
64
import string
75
import time
86
import unittest
97

8+
import pandas as pd
9+
import pyarrow
10+
import pytest
1011
from urllib3.exceptions import MaxRetryError, TimeoutError as Url3TimeoutError
1112

1213
from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, \
@@ -48,6 +49,68 @@ def tearDown(self):
4849
if self.client:
4950
self.client.close()
5051

52+
def test_write_dataframe(self):
53+
measurement = f'test{random_hex(3)}'.lower()
54+
df = pd.DataFrame({
55+
'time': pd.to_datetime(['2024-01-01', '2024-01-02']),
56+
'city': ['London', 'Paris'],
57+
'temperature': [15.0, 18.5]
58+
})
59+
self.client.write_dataframe(df, measurement=measurement, timestamp_column='time', tags=['city'])
60+
self.client.flush()
61+
62+
result = self.client.query(query=f"select * from {measurement}", mode="pandas")
63+
64+
self.assertIsNotNone(result)
65+
self.assertEqual(2, len(result.get('city')))
66+
self.assertEqual(2, len(result.get('temperature')))
67+
68+
def test_write_dataframe_with_batch(self):
69+
self.client = InfluxDBClient3(host=self.host,
70+
database=self.database,
71+
token=self.token,
72+
write_client_options=write_client_options(
73+
write_options=WriteOptions(batch_size=100)
74+
))
75+
measurement = f'test{random_hex(3)}'.lower()
76+
df = pd.DataFrame({
77+
'time': pd.to_datetime(['2024-01-01', '2024-01-02']),
78+
'city': ['London', 'Paris'],
79+
'temperature': [15.0, 18.5]
80+
})
81+
self.client.write_dataframe(
82+
df,
83+
measurement=measurement,
84+
timestamp_column='time',
85+
tags=['city']
86+
)
87+
self.client.flush()
88+
89+
result = self.client.query(query=f"select * from {measurement}", mode="pandas")
90+
91+
self.assertIsNotNone(result)
92+
self.assertEqual(2, len(result.get('city')))
93+
self.assertEqual(2, len(result.get('temperature')))
94+
95+
def test_write_csv_file_with_batch(self):
96+
client = InfluxDBClient3(host=self.host,
97+
database=self.database,
98+
token=self.token,
99+
write_client_options=write_client_options(
100+
write_options=WriteOptions(batch_size=100)
101+
))
102+
measurement = f'test{random_hex(3)}'.lower()
103+
client.write_file(
104+
measurement_name=measurement,
105+
file='tests/data/iot.csv',
106+
timestamp_column='time', tag_columns=["name"])
107+
client.flush()
108+
109+
result = client.query(query=f"select * from {measurement}", mode="pandas")
110+
self.assertIsNotNone(result)
111+
self.assertEqual(3, len(result.get('building')))
112+
self.assertEqual(3, len(result.get('temperature')))
113+
51114
def test_write_and_query(self):
52115
test_id = time.time_ns()
53116
self.client.write(f"integration_test_python,type=used value=123.0,test_id={test_id}i")

tests/test_polars.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,4 @@ def test_write_polars_batching(self):
9696
async_req=ANY,
9797
content_type=ANY,
9898
urlopen_kw=ANY,
99-
data_frame_measurement_name='measurement',
100-
data_frame_timestamp_column='time',
10199
body=b'measurement temperature=22.4 1722470400000000000\nmeasurement temperature=21.8 1722474000000000000')

0 commit comments

Comments
 (0)