← ブログ一覧へ戻る ← Back to blog index

/blog/rust-csv-etl-lab

CSV整形からETLへ From CSV cleanup to ETL

仮想顧客データのCSV整形からETLの形になるまでの過程まとめ。 A summary of the process from cleaning sample customer CSV data to shaping it into an ETL workflow.

  • Rust
  • ETL
  • CSV
  • DataEngineering
  • Polars

データ形式が汚れた状態からスタート

仮想のデータの形式の汚れ方を調べ、以下の問題を把握。

  • 前後空白
  • 大文字小文字の揺れ
  • 日付表記の混在
  • 電話番号フォーマットのばらつき
  • 国や地域の表記揺れ
  • total_spend のカンマや通貨記号
  • 真偽値の揺れ
  • emailphone の入れ替わり
  • 重複や埋め込みヘッダ

元のデータはまだ正規化できる状態だから、parse error で不正を全部弾かずに、品質ルールを設けて篩にかけることとする。

成果物が説明できること

成果物はcleaned CSVだけでなく、次の問いにログで説明できることもセットになる。

  • 何行読めたか
  • 何行スキップしたか
  • 失敗が多いのはどの列か
  • 不正値は空にしたか、失敗として残したか
  • 埋め込みヘッダや malformed row をどう処理したか

問いに答えるため、まずColumnenumとして導入し、列を定義する。

pub enum Column {
    CustomerId = 0,
    FullName = 1,
    Email = 2,
    Phone = 3,
}

この小さな抽象化がCSV 処理のoff-by-one(列や行が一つずれること)の防止に効果的。

列ごとにモジュール分割

各列ごとに専用モジュールへと振り分ける。

pub fn process(column: Column, raw: &str) -> FieldResult {
    match column {
        Column::CustomerId => customer_id::process(raw),
        Column::FullName => full_name::process(raw),
        Column::Email => email::process(raw),
        Column::Phone => phone::process(raw),
    }
}

これにより、「CSV 全体を処理するコード」から「列ごとの正規化規則」が切り出される。例えば、メールアドレス列と電話番号列は、同じ文字列整形でも違うモジュールで処理される。

メールアドレス列では、構造的な最低限の検査を行う。

if parts.next().is_some() || local.is_empty() || domain.is_empty() {
    return FieldResult::failure(compact, "email must contain exactly one @");
}

電話番号列では、全角や記号を落としてから digit count で妥当性を見る。

let digits = out.strip_prefix('+').unwrap_or(&out);
if digits.len() < 7 || digits.len() > 15 {
    return FieldResult::failure(out, "phone number must have 7 to 15 digits");
}

品質ルールは、汎用 validator ではなく、列ごとの局所ルールとして置くほうが保守しやすい。ETL の変更が列単位で起きやすいため。

FieldResult による best effort と説明責任

FieldResult は、この実験の中心。

pub struct FieldResult {
    pub value: String,
    pub disposition: FieldDisposition,
    pub reason: Option<String>,
}

重要なのは、「失敗したら行全体を落とす」設計ではなく、次の状態を明示する点である。

  • Success
  • Empty
  • Failed

失敗時でも value を保持するため、issue log に raw input と cleaned fallback の両方を記録可能。

これにより、次の二つを両立。

  • できるところまで cleaned output を出す
  • どこで何が怪しかったかを後から点検できる

きれいな CSV だけを出すなら不要な層である。しかし、後続調査を前提にするなら必要な構造。

埋め込みヘッダと malformed row の可視化

整形処理では parse_input の結果を SkippedMalformedData に分けて受け取る。さらに is_header_row で埋め込みヘッダを検知し、issue log に記録。

if is_header_row(&row.fields) {
    report.record_row_skipped(
        row.line_number,
        &row.raw,
        "embedded header row inside the dataset",
    );
    continue;
}

重要なのは、埋め込みヘッダを無視するだけでなく、「なぜスキップしたか」を残す点である。データパイプラインでは、処理が落ちなかったことより、「何を無視したか」が後で問われる。

issue log の分離

実装では、整形済み CSV と issue log を分離して出力。

const OUTPUT_PATH: &str = "formatted.csv";
const ISSUE_LOG_PATH: &str = "issues.csv";

さらに summary を標準出力へ出し、rows_writtenrows_with_failuresmalformed_rows、列別統計まで確認可能。これにより、実行一回を「成果物」と「診断情報」のセットとして扱える。

この発想は、運用設計にも接続可能。文書側ではさらに、次の拡張を想定。

  • run_id ごとの出力管理
  • 機械可読な summary ファイル
  • S3 上の入出力契約
  • EventBridge と CloudWatch Logs

この段階の実装は、ローカル練習ではなく、運用へ持ち込む ETL コアの素地である。

Polars は比較軸

比較用の Polars 実験はまだ最小構成。DataFrame ベースの方向を触った段階である。現状で主役にするには薄いが、比較軸としては有効。

今回の ETL は列ごとのルールが細かく、FieldResult のような説明責任も必要。そのため初期段階では、DataFrame ベースの一括変換より、列モジュールを明示的に持つ Rust 実装のほうが読みやすい。

一方で、将来の再評価余地。

  • 集計中心の処理が増える
  • 列ルールがもっと宣言的に書ける
  • 変換ログを別に持てる

この条件が揃うなら Polars を再評価。今すぐ置き換える相手ではなく、「どこまで hand-written ETL を維持するか」を考える比較対象である。

要点

ETLの育て方は「変換ロジックを書くこと」だけではない。「変換の説明責任を構造として持たせること」が重要。整理すると次の通り。

段階課題焦点
1生の顧客 CSVどこが汚れているかを観察する
2列定義の導入列を静的に定義する
3列ごとの処理分割列ごとの正規化責務を分ける
4実行結果の記録成功・空・失敗を計測可能にする
5運用設計への拡張ローカル処理を運用単位に拡張する
Image preview