Python CSV完全攻略【読み書き・pandas操作を徹底マスター】

 

CSV(Comma-Separated Values)は、表形式データを扱う最も一般的なファイル形式です。Excel、データベース、Web アプリケーションなど幅広い場面で使用されます。本記事では、Python でのCSV操作を標準ライブラリから pandas まで、実用的なサンプルコードとともに徹底解説します。

CSVとは

CSV は、カンマ区切りでデータを格納するテキストファイル形式です。主な特徴:

  • シンプル: 人間が読みやすい構造
  • 互換性: 多くのアプリケーションでサポート
  • 軽量: テキストベースで容量が小さい
  • 構造化: 表形式データを効率的に表現
  • 標準化: RFC 4180 で仕様が定義

CSVファイルの基本構造

名前,年齢,職業,給与
田中太郎,30,エンジニア,500000
佐藤花子,25,デザイナー,450000
山田次郎,35,マネージャー,600000

標準csvモジュールでの読み込み

基本的な読み込み

import csv

with open('data.csv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file)
    for row in reader:
        print(row)  # ['田中太郎', '30', 'エンジニア', '500000']

ヘッダー付きCSVの読み込み

import csv

with open('employees.csv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file)
    header = next(reader)  # ヘッダー行を取得
    print(f"カラム: {header}")
    
    for row in reader:
        print(dict(zip(header, row)))

DictReaderを使った読み込み

import csv

with open('employees.csv', 'r', encoding='utf-8') as file:
    reader = csv.DictReader(file)
    for row in reader:
        print(f"{row['名前']}: {row['年齢']}歳, {row['職業']}")

標準csvモジュールでの書き込み

基本的な書き込み

import csv

data = [
    ['名前', '年齢', '職業'],
    ['田中', '30', 'エンジニア'],
    ['佐藤', '25', 'デザイナー']
]

with open('output.csv', 'w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerows(data)

DictWriterを使った書き込み

import csv

employees = [
    {'名前': '田中', '年齢': 30, '職業': 'エンジニア'},
    {'名前': '佐藤', '年齢': 25, '職業': 'デザイナー'},
    {'名前': '山田', '年齢': 35, '職業': 'マネージャー'}
]

with open('employees_dict.csv', 'w', newline='', encoding='utf-8') as file:
    fieldnames = ['名前', '年齢', '職業']
    writer = csv.DictWriter(file, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(employees)

CSVへの追記

import csv

new_employee = ['鈴木', '28', 'デベロッパー']

with open('employees.csv', 'a', newline='', encoding='utf-8') as file:
    writer = csv.writer(file)
    writer.writerow(new_employee)

pandasでのCSV操作

pandasでの読み込み

import pandas as pd

# 基本的な読み込み
df = pd.read_csv('data.csv', encoding='utf-8')
print(df.head())  # 最初の5行を表示

# 特定列のみ読み込み
df = pd.read_csv('data.csv', usecols=['名前', '年齢'])
print(df)

読み込みオプションの活用

import pandas as pd

# 高度な読み込みオプション
df = pd.read_csv(
    'sales_data.csv',
    encoding='utf-8',
    skiprows=1,           # 最初の1行をスキップ
    nrows=1000,          # 最初の1000行のみ読み込み
    na_values=['N/A', ''], # 欠損値の指定
    dtype={'ID': str}     # データ型指定
)
print(df.info())

pandasでの書き込み

import pandas as pd

# データフレーム作成
data = {
    '商品名': ['商品A', '商品B', '商品C'],
    '価格': [1000, 1500, 2000],
    '在庫': [50, 30, 20]
}
df = pd.DataFrame(data)

# CSV書き込み
df.to_csv('products.csv', index=False, encoding='utf-8')

# 特定条件でフィルタして書き込み
expensive_products = df[df['価格'] > 1200]
expensive_products.to_csv('expensive.csv', index=False, encoding='utf-8')

CSV形式の設定

区切り文字とクォート文字

import csv

# セミコロン区切りのCSV
with open('semicolon.csv', 'w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file, delimiter=';')
    writer.writerow(['名前', '年齢', '都市'])
    writer.writerow(['田中', '30', '東京'])

# タブ区切り(TSV)
with open('data.tsv', 'w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file, delimiter='\t')
    writer.writerow(['項目1', '項目2', '項目3'])
    writer.writerow(['値1', '値2', '値3'])

クォートの制御

import csv

data = [['名前', '説明'], ['商品A', 'これは"特別な"商品です'], ['商品B', 'カンマ,を含む説明']]

# すべてクォートする
with open('quoted.csv', 'w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file, quoting=csv.QUOTE_ALL)
    writer.writerows(data)

# 必要時のみクォート
with open('minimal_quote.csv', 'w', newline='', encoding='utf-8') as file:
    writer = csv.writer(file, quoting=csv.QUOTE_MINIMAL)
    writer.writerows(data)

データ型とバリデーション

データ型変換

import csv

def convert_row_types(row):
    """行データの型変換"""
    return {
        '名前': row[0],
        '年齢': int(row[1]) if row[1].isdigit() else 0,
        '給与': float(row[2]) if row[2].replace('.', '').isdigit() else 0.0
    }

with open('employees.csv', 'r', encoding='utf-8') as file:
    reader = csv.reader(file)
    header = next(reader)  # ヘッダースキップ
    
    employees = []
    for row in reader:
        try:
            employee = convert_row_types(row)
            employees.append(employee)
        except (ValueError, IndexError):
            print(f"無効な行をスキップ: {row}")

print(f"有効な従業員データ: {len(employees)}件")

データバリデーション

import csv
import re

def validate_employee_data(row):
    """従業員データのバリデーション"""
    errors = []
    
    # 名前の検証
    if not row.get('名前') or len(row['名前']) < 2:
        errors.append("名前は2文字以上である必要があります")
    
    # 年齢の検証
    try:
        age = int(row.get('年齢', 0))
        if not 18 <= age <= 65:
            errors.append("年齢は18-65歳の範囲である必要があります")
    except ValueError:
        errors.append("年齢は数値である必要があります")
    
    # メールアドレスの検証
    email = row.get('email', '')
    if email and not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
        errors.append("有効なメールアドレスではありません")
    
    return len(errors) == 0, errors

# 使用例
test_data = {'名前': '田中', '年齢': '30', 'email': 'tanaka@example.com'}
is_valid, errors = validate_employee_data(test_data)
print(f"バリデーション結果: {'成功' if is_valid else '失敗'}")
if errors:
    for error in errors:
        print(f"  - {error}")

大容量CSVファイルの処理

チャンク処理

import pandas as pd

def process_large_csv(filename, chunk_size=10000):
    """大容量CSVファイルのチャンク処理"""
    total_rows = 0
    
    for chunk in pd.read_csv(filename, chunksize=chunk_size):
        # チャンクごとの処理
        processed = chunk[chunk['価格'] > 1000]  # 例:価格フィルタ
        
        # 結果の保存(追記モード)
        processed.to_csv('filtered_output.csv', 
                        mode='a', 
                        header=(total_rows == 0),
                        index=False)
        
        total_rows += len(chunk)
        print(f"処理済み: {total_rows}行")
    
    return total_rows

# 使用例
# total = process_large_csv('huge_file.csv', chunk_size=5000)

メモリ効率的な処理

import csv

def memory_efficient_csv_processing(filename):
    """メモリ効率的なCSV処理"""
    stats = {'total': 0, 'valid': 0, 'invalid': 0}
    
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.DictReader(file)
        
        for row in reader:
            stats['total'] += 1
            
            try:
                # データ処理(例:価格の数値変換)
                price = float(row.get('価格', 0))
                if price > 0:
                    stats['valid'] += 1
                    # 有効なデータの処理
                else:
                    stats['invalid'] += 1
            except ValueError:
                stats['invalid'] += 1
            
            # 進捗表示
            if stats['total'] % 10000 == 0:
                print(f"処理済み: {stats['total']}行")
    
    return stats

# 使用例
# result = memory_efficient_csv_processing('large_data.csv')
# print(f"統計: {result}")

CSV データの分析と集計

基本統計の計算

import pandas as pd

# CSVからデータ分析
df = pd.read_csv('sales_data.csv')

# 基本統計
print("=== 基本統計 ===")
print(f"総売上: {df['売上'].sum():,}円")
print(f"平均売上: {df['売上'].mean():.0f}円")
print(f"最大売上: {df['売上'].max():,}円")
print(f"売上データ数: {len(df)}件")

# カテゴリ別集計
category_sales = df.groupby('カテゴリ')['売上'].sum()
print("\n=== カテゴリ別売上 ===")
print(category_sales)

データのフィルタリングと抽出

import pandas as pd

df = pd.read_csv('employees.csv')

# 条件フィルタ
high_earners = df[df['給与'] > 500000]
engineers = df[df['職業'] == 'エンジニア']
senior_engineers = df[(df['職業'] == 'エンジニア') & (df['年齢'] > 30)]

# 結果をCSVに保存
high_earners.to_csv('high_earners.csv', index=False, encoding='utf-8')
print(f"高給取り: {len(high_earners)}人")
print(f"エンジニア: {len(engineers)}人")
print(f"シニアエンジニア: {len(senior_engineers)}人")

データクリーニング

欠損値の処理

import pandas as pd

df = pd.read_csv('messy_data.csv')

# 欠損値の確認
print("=== 欠損値の状況 ===")
print(df.isnull().sum())

# 欠損値の処理
df_cleaned = df.copy()
df_cleaned['年齢'].fillna(df['年齢'].median(), inplace=True)  # 中央値で補完
df_cleaned['職業'].fillna('未設定', inplace=True)              # 固定値で補完
df_cleaned.dropna(subset=['名前'], inplace=True)              # 名前が空の行を削除

# クリーニング済みデータを保存
df_cleaned.to_csv('cleaned_data.csv', index=False, encoding='utf-8')
print(f"クリーニング前: {len(df)}行 → クリーニング後: {len(df_cleaned)}行")

重複データの除去

import pandas as pd

df = pd.read_csv('data_with_duplicates.csv')

# 重複行の確認
duplicates = df.duplicated()
print(f"重複行数: {duplicates.sum()}行")

# 重複を除去(最初の行を残す)
df_unique = df.drop_duplicates()

# 特定列での重複除去
df_unique_by_name = df.drop_duplicates(subset=['名前'])

# 結果を保存
df_unique.to_csv('unique_data.csv', index=False, encoding='utf-8')
print(f"重複除去前: {len(df)}行 → 重複除去後: {len(df_unique)}行")

CSVファイルの結合

複数CSVファイルの結合

import pandas as pd
import glob

def merge_csv_files(pattern, output_file):
    """複数のCSVファイルを結合"""
    csv_files = glob.glob(pattern)
    dataframes = []
    
    for file in csv_files:
        df = pd.read_csv(file)
        df['source_file'] = file  # ソースファイル名を追加
        dataframes.append(df)
    
    # 縦方向に結合
    merged_df = pd.concat(dataframes, ignore_index=True)
    merged_df.to_csv(output_file, index=False, encoding='utf-8')
    
    print(f"結合完了: {len(csv_files)}ファイル → {output_file}")
    return merged_df

# 使用例
# merged = merge_csv_files('sales_*.csv', 'all_sales.csv')

異なる構造のCSVを結合

import pandas as pd

def smart_csv_merge(files, output_file):
    """異なる列構造のCSVを安全に結合"""
    dataframes = []
    
    for file in files:
        df = pd.read_csv(file)
        print(f"{file}: {df.shape[0]}行, {df.shape[1]}列")
        dataframes.append(df)
    
    # 外部結合で全ての列を保持
    merged = pd.concat(dataframes, ignore_index=True, sort=False)
    
    # 欠損値の確認
    print("\n=== 結合後の欠損値 ===")
    print(merged.isnull().sum())
    
    merged.to_csv(output_file, index=False, encoding='utf-8')
    return merged

# 使用例
# files = ['sales_2023.csv', 'sales_2024.csv']
# result = smart_csv_merge(files, 'combined_sales.csv')

エラーハンドリング

堅牢なCSV読み込み

import csv
import pandas as pd

def robust_csv_read(filename, use_pandas=False):
    """エラーに強いCSV読み込み"""
    try:
        if use_pandas:
            # pandasでの読み込み
            df = pd.read_csv(filename, encoding='utf-8')
            return df, None
        else:
            # 標準csvモジュールでの読み込み
            data = []
            with open(filename, 'r', encoding='utf-8') as file:
                reader = csv.DictReader(file)
                for row_num, row in enumerate(reader, 1):
                    try:
                        data.append(dict(row))
                    except Exception as e:
                        print(f"行{row_num}でエラー: {e}")
            return data, None
    
    except FileNotFoundError:
        return None, "ファイルが見つかりません"
    except pd.errors.EmptyDataError:
        return None, "空のCSVファイルです"
    except UnicodeDecodeError:
        return None, "文字エンコーディングエラー"
    except Exception as e:
        return None, f"予期しないエラー: {e}"

# 使用例
data, error = robust_csv_read('data.csv', use_pandas=True)
if error:
    print(f"エラー: {error}")
else:
    print(f"読み込み成功: {len(data)}行")

文字エンコーディング自動検出

import chardet
import pandas as pd

def detect_encoding_and_read(filename):
    """文字エンコーディングを自動検出してCSV読み込み"""
    # エンコーディング検出
    with open(filename, 'rb') as file:
        raw_data = file.read(10000)  # 最初の10KB
        encoding = chardet.detect(raw_data)['encoding']
    
    print(f"検出されたエンコーディング: {encoding}")
    
    try:
        df = pd.read_csv(filename, encoding=encoding)
        return df, encoding
    except UnicodeDecodeError:
        # フォールバック: utf-8 with error handling
        df = pd.read_csv(filename, encoding='utf-8', errors='ignore')
        return df, 'utf-8 (with errors ignored)'

# 使用例
# df, encoding = detect_encoding_and_read('unknown_encoding.csv')
# print(f"使用エンコーディング: {encoding}")

実用的なCSV処理ツール

CSVファイル情報の取得

import csv
import os
from collections import Counter

def analyze_csv_file(filename):
    """CSVファイルの詳細分析"""
    if not os.path.exists(filename):
        return None
    
    analysis = {
        'file_size': os.path.getsize(filename),
        'row_count': 0,
        'column_count': 0,
        'columns': [],
        'sample_data': []
    }
    
    with open(filename, 'r', encoding='utf-8') as file:
        reader = csv.reader(file)
        
        # ヘッダー処理
        try:
            header = next(reader)
            analysis['columns'] = header
            analysis['column_count'] = len(header)
        except StopIteration:
            return analysis
        
        # データ行の処理
        for i, row in enumerate(reader):
            analysis['row_count'] += 1
            if i < 3:  # 最初の3行をサンプルとして保存
                analysis['sample_data'].append(row)
    
    return analysis

# 使用例
info = analyze_csv_file('data.csv')
if info:
    print(f"ファイルサイズ: {info['file_size']:,} bytes")
    print(f"行数: {info['row_count']:,}")
    print(f"列数: {info['column_count']}")
    print(f"カラム: {info['columns']}")

CSVフォーマットバリデーター

import csv
import io

def validate_csv_format(filename):
    """CSVファイルの形式検証"""
    issues = []
    
    try:
        with open(filename, 'r', encoding='utf-8') as file:
            # CSVフォーマットの基本チェック
            sample = file.read(1024)
            file.seek(0)
            
            # 区切り文字の推定
            sniffer = csv.Sniffer()
            dialect = sniffer.sniff(sample)
            
            reader = csv.reader(file, dialect)
            rows = list(reader)
            
            if not rows:
                issues.append("空のファイルです")
                return issues
            
            # 列数の一貫性チェック
            column_counts = [len(row) for row in rows]
            if len(set(column_counts)) > 1:
                issues.append(f"行ごとに列数が異なります: {set(column_counts)}")
            
            # ヘッダーの重複チェック
            if len(rows) > 0:
                header = rows[0]
                if len(header) != len(set(header)):
                    issues.append("ヘッダーに重複があります")
            
            print(f"検出された区切り文字: '{dialect.delimiter}'")
            print(f"クォート文字: '{dialect.quotechar}'")
    
    except Exception as e:
        issues.append(f"ファイル読み込みエラー: {e}")
    
    return issues

# 使用例
problems = validate_csv_format('data.csv')
if problems:
    print("検出された問題:")
    for problem in problems:
        print(f"  - {problem}")
else:
    print("CSVフォーマットに問題はありません")

まとめ

Python での CSV 操作は、標準 csv モジュールと pandas を適切に使い分けることで、様々な要件に対応できます。データの規模と処理内容に応じて最適な手法を選択することが重要です。

手法の使い分け:

  • 小〜中規模データ: 標準csvモジュール
  • 大規模データ・複雑な分析: pandas
  • メモリ制約がある場合: チャンク処理
  • リアルタイム処理: 標準csvモジュール

重要なポイント:

  • エンコーディングの適切な指定(utf-8推奨)
  • エラーハンドリングによる堅牢な処理
  • データバリデーションで品質保証
  • メモリ効率を考慮した大容量ファイル処理
  • 型変換で適切なデータ型に変換

実用的なベストプラクティス:

  • with文を使った安全なファイル操作
  • DictReader/DictWriterでの可読性向上
  • チャンク処理による大容量ファイル対応
  • 文字エンコーディング自動検出
  • データクリーニングとバリデーション

本記事のサンプルコードを参考に、あなたのプロジェクトに最適なCSV処理を実装してください。適切なライブラリ選択とエラーハンドリングにより、実用的なデータ処理システムを構築できます。

参考文献

  • Python公式csvドキュメント
  • pandas公式ドキュメント
  • RFC 4180 – CSV仕様
  • Python Data Analysis Library

■プロンプトだけでオリジナルアプリを開発・公開してみた!!

■AI時代の第一歩!「AI駆動開発コース」はじめました!

テックジム東京本校で先行開始。

■テックジム東京本校

「武田塾」のプログラミング版といえば「テックジム」。
講義動画なし、教科書なし。「進捗管理とコーチング」で効率学習。
より早く、より安く、しかも対面型のプログラミングスクールです。

<短期講習>5日で5万円の「Pythonミニキャンプ」開催中。

<月1開催>放送作家による映像ディレクター養成講座

<オンライン無料>ゼロから始めるPython爆速講座