|
1 | 1 | import logging |
2 | 2 | import os |
3 | | -import pyarrow |
4 | | -import pytest |
5 | 3 | import random |
6 | 4 | import string |
7 | 5 | import time |
8 | 6 | import unittest |
9 | 7 |
|
| 8 | +import pandas as pd |
| 9 | +import pyarrow |
| 10 | +import pytest |
10 | 11 | from urllib3.exceptions import MaxRetryError, TimeoutError as Url3TimeoutError |
11 | 12 |
|
12 | 13 | from influxdb_client_3 import InfluxDBClient3, write_client_options, WriteOptions, \ |
@@ -48,6 +49,68 @@ def tearDown(self): |
48 | 49 | if self.client: |
49 | 50 | self.client.close() |
50 | 51 |
|
| 52 | + def test_write_dataframe(self): |
| 53 | + measurement = f'test{random_hex(3)}'.lower() |
| 54 | + df = pd.DataFrame({ |
| 55 | + 'time': pd.to_datetime(['2024-01-01', '2024-01-02']), |
| 56 | + 'city': ['London', 'Paris'], |
| 57 | + 'temperature': [15.0, 18.5] |
| 58 | + }) |
| 59 | + self.client.write_dataframe(df, measurement=measurement, timestamp_column='time', tags=['city']) |
| 60 | + self.client.flush() |
| 61 | + |
| 62 | + result = self.client.query(query=f"select * from {measurement}", mode="pandas") |
| 63 | + |
| 64 | + self.assertIsNotNone(result) |
| 65 | + self.assertEqual(2, len(result.get('city'))) |
| 66 | + self.assertEqual(2, len(result.get('temperature'))) |
| 67 | + |
| 68 | + def test_write_dataframe_with_batch(self): |
| 69 | + self.client = InfluxDBClient3(host=self.host, |
| 70 | + database=self.database, |
| 71 | + token=self.token, |
| 72 | + write_client_options=write_client_options( |
| 73 | + write_options=WriteOptions(batch_size=100) |
| 74 | + )) |
| 75 | + measurement = f'test{random_hex(3)}'.lower() |
| 76 | + df = pd.DataFrame({ |
| 77 | + 'time': pd.to_datetime(['2024-01-01', '2024-01-02']), |
| 78 | + 'city': ['London', 'Paris'], |
| 79 | + 'temperature': [15.0, 18.5] |
| 80 | + }) |
| 81 | + self.client.write_dataframe( |
| 82 | + df, |
| 83 | + measurement=measurement, |
| 84 | + timestamp_column='time', |
| 85 | + tags=['city'] |
| 86 | + ) |
| 87 | + self.client.flush() |
| 88 | + |
| 89 | + result = self.client.query(query=f"select * from {measurement}", mode="pandas") |
| 90 | + |
| 91 | + self.assertIsNotNone(result) |
| 92 | + self.assertEqual(2, len(result.get('city'))) |
| 93 | + self.assertEqual(2, len(result.get('temperature'))) |
| 94 | + |
| 95 | + def test_write_csv_file_with_batch(self): |
| 96 | + client = InfluxDBClient3(host=self.host, |
| 97 | + database=self.database, |
| 98 | + token=self.token, |
| 99 | + write_client_options=write_client_options( |
| 100 | + write_options=WriteOptions(batch_size=100) |
| 101 | + )) |
| 102 | + measurement = f'test{random_hex(3)}'.lower() |
| 103 | + client.write_file( |
| 104 | + measurement_name=measurement, |
| 105 | + file='tests/data/iot.csv', |
| 106 | + timestamp_column='time', tag_columns=["name"]) |
| 107 | + client.flush() |
| 108 | + |
| 109 | + result = client.query(query=f"select * from {measurement}", mode="pandas") |
| 110 | + self.assertIsNotNone(result) |
| 111 | + self.assertEqual(3, len(result.get('building'))) |
| 112 | + self.assertEqual(3, len(result.get('temperature'))) |
| 113 | + |
51 | 114 | def test_write_and_query(self): |
52 | 115 | test_id = time.time_ns() |
53 | 116 | self.client.write(f"integration_test_python,type=used value=123.0,test_id={test_id}i") |
|
0 commit comments