データ形式が汚れた状態からスタート
仮想のデータの形式の汚れ方を調べ、以下の問題を把握。
- 前後空白
- 大文字小文字の揺れ
- 日付表記の混在
- 電話番号フォーマットのばらつき
- 国や地域の表記揺れ
total_spendのカンマや通貨記号- 真偽値の揺れ
emailとphoneの入れ替わり- 重複や埋め込みヘッダ
元のデータはまだ正規化できる状態だから、parse error で不正を全部弾かずに、品質ルールを設けて篩にかけることとする。
成果物が説明できること
成果物はcleaned CSVだけでなく、次の問いにログで説明できることもセットになる。
- 何行読めたか
- 何行スキップしたか
- 失敗が多いのはどの列か
- 不正値は空にしたか、失敗として残したか
- 埋め込みヘッダや malformed row をどう処理したか
問いに答えるため、まずColumn をenumとして導入し、列を定義する。
pub enum Column {
CustomerId = 0,
FullName = 1,
Email = 2,
Phone = 3,
}
この小さな抽象化がCSV 処理のoff-by-one(列や行が一つずれること)の防止に効果的。
列ごとにモジュール分割
各列ごとに専用モジュールへと振り分ける。
pub fn process(column: Column, raw: &str) -> FieldResult {
match column {
Column::CustomerId => customer_id::process(raw),
Column::FullName => full_name::process(raw),
Column::Email => email::process(raw),
Column::Phone => phone::process(raw),
}
}
これにより、「CSV 全体を処理するコード」から「列ごとの正規化規則」が切り出される。例えば、メールアドレス列と電話番号列は、同じ文字列整形でも違うモジュールで処理される。
メールアドレス列では、構造的な最低限の検査を行う。
if parts.next().is_some() || local.is_empty() || domain.is_empty() {
return FieldResult::failure(compact, "email must contain exactly one @");
}
電話番号列では、全角や記号を落としてから digit count で妥当性を見る。
let digits = out.strip_prefix('+').unwrap_or(&out);
if digits.len() < 7 || digits.len() > 15 {
return FieldResult::failure(out, "phone number must have 7 to 15 digits");
}
品質ルールは、汎用 validator ではなく、列ごとの局所ルールとして置くほうが保守しやすい。ETL の変更が列単位で起きやすいため。
FieldResult による best effort と説明責任
FieldResult は、この実験の中心。
pub struct FieldResult {
pub value: String,
pub disposition: FieldDisposition,
pub reason: Option<String>,
}
重要なのは、「失敗したら行全体を落とす」設計ではなく、次の状態を明示する点である。
SuccessEmptyFailed
失敗時でも value を保持するため、issue log に raw input と cleaned fallback の両方を記録可能。
これにより、次の二つを両立。
- できるところまで cleaned output を出す
- どこで何が怪しかったかを後から点検できる
きれいな CSV だけを出すなら不要な層である。しかし、後続調査を前提にするなら必要な構造。
埋め込みヘッダと malformed row の可視化
整形処理では parse_input の結果を Skipped、Malformed、Data に分けて受け取る。さらに is_header_row で埋め込みヘッダを検知し、issue log に記録。
if is_header_row(&row.fields) {
report.record_row_skipped(
row.line_number,
&row.raw,
"embedded header row inside the dataset",
);
continue;
}
重要なのは、埋め込みヘッダを無視するだけでなく、「なぜスキップしたか」を残す点である。データパイプラインでは、処理が落ちなかったことより、「何を無視したか」が後で問われる。
issue log の分離
実装では、整形済み CSV と issue log を分離して出力。
const OUTPUT_PATH: &str = "formatted.csv";
const ISSUE_LOG_PATH: &str = "issues.csv";
さらに summary を標準出力へ出し、rows_written、rows_with_failures、malformed_rows、列別統計まで確認可能。これにより、実行一回を「成果物」と「診断情報」のセットとして扱える。
この発想は、運用設計にも接続可能。文書側ではさらに、次の拡張を想定。
run_idごとの出力管理- 機械可読な summary ファイル
- S3 上の入出力契約
- EventBridge と CloudWatch Logs
この段階の実装は、ローカル練習ではなく、運用へ持ち込む ETL コアの素地である。
Polars は比較軸
比較用の Polars 実験はまだ最小構成。DataFrame ベースの方向を触った段階である。現状で主役にするには薄いが、比較軸としては有効。
今回の ETL は列ごとのルールが細かく、FieldResult のような説明責任も必要。そのため初期段階では、DataFrame ベースの一括変換より、列モジュールを明示的に持つ Rust 実装のほうが読みやすい。
一方で、将来の再評価余地。
- 集計中心の処理が増える
- 列ルールがもっと宣言的に書ける
- 変換ログを別に持てる
この条件が揃うなら Polars を再評価。今すぐ置き換える相手ではなく、「どこまで hand-written ETL を維持するか」を考える比較対象である。
要点
ETLの育て方は「変換ロジックを書くこと」だけではない。「変換の説明責任を構造として持たせること」が重要。整理すると次の通り。
| 段階 | 課題 | 焦点 |
|---|---|---|
| 1 | 生の顧客 CSV | どこが汚れているかを観察する |
| 2 | 列定義の導入 | 列を静的に定義する |
| 3 | 列ごとの処理分割 | 列ごとの正規化責務を分ける |
| 4 | 実行結果の記録 | 成功・空・失敗を計測可能にする |
| 5 | 運用設計への拡張 | ローカル処理を運用単位に拡張する |