Embulk を使って CSV から MySQL へデータ投入してみた

今回は Embulk を使って、CSV から MySQL にデータを投入してみたいと思います。

Embulk とは

Embulk とはバルクデータローダーと呼ばれる、データ転送ツールです。

Fluentd を開発した Tresure Data によって開発され、Fluentd を知っている方は、Fluentd のバッチ版だと考えると理解しやすいと思います。

(Fluentd についてはこちらで記事にしています)

Fluentd のように入力・出力のプラグインは多数公開されており、例えば、MySQL の入力プラグインと、ElasticSearch の出力プラグインを組み合わせることによって、MySQL のデータを ElasticSearch に移したりなど、異なるデータストア間でのデータの交換が容易に行うことができます。

作業環境

※ Embulk の実行には Java が必須となるので、事前にインストールしておきましょう。

Embulk のインストール

GitHub に記載されている通りにコマンドを叩いていきます。

$ curl --create-dirs -o ~/.embulk/bin/embulk -L "http://dl.embulk.org/embulk-latest.jar"
$ chmod +x ~/.embulk/bin/embulk
$ echo 'export PATH="$HOME/.embulk/bin:$PATH"' >> ~/.bashrc
$ source ~/.bashrc

以上で完了となるので、バージョンを確認してみます。

$ embulk --version
embulk 0.8.13

seed.yml から設定ファイルを自動生成する

livedoor グルメの研究用データセットを使い、口コミのデータを MySQL に投入したいと思います。

まずは、口コミデータ CSV をダウンロードしてきます。

$ wget -O test.tar.gz https://github.com/livedoor/datasets/blob/master/ldgourmet.tar.gz?raw=true
$ tar xfvz test.tar.gz
areas.csv
categories.csv
prefs.csv
ratings.csv
rating_votes.csv
restaurants.csv
stations.csv

何個か CSV ファイルがありますが、この中から ratings.csv のみ使用していきます。

ratings.csv は次のような内容になっています。

id,restaurant_id,user_id,total,food,service,atmosphere,cost_performance,title,body,purpose,created_on
156445,310595,ee02f26a,5,0,0,0,0,,"...",0,"2006-10-07 05:06:09"
3842,10237,fcc21401,1,0,0,0,0,,"...",0,"2004-10-20 00:34:28"
144379,3334,06412af7,2,0,0,0,0,,"...",0,"2006-06-03 16:07:43"
...

Embulk は config.yml ファイルを元にデータの移行を行うのですが、ゼロから記述する場合、CSV ファイルの文字コード、改行コード、デリミタなどをいちいち指定していくのは大変なので、設定ファイルを自動生成してくれる embulk guess コマンドを使って、config.yml ファイルを作成したいと思います。

自動生成するにしても最低限の情報がなければいけないので、seed.yml ファイルに次のように記述します。

  • seed.yml
in:
  type: file
  path_prefix: "./ratings.csv"
out:
  type: stdout

出力ファイル名に config.yml を指定して embulk guess コマンドを実行します。

$ embulk guess seed.yml -o config.yml
2016-09-05 16:30:57.386 +0000: Embulk v0.8.13
2016-09-05 16:30:58.470 +0000 [INFO] (0001:guess): Listing local files at directory '.' filtering filename by prefix 'ratings.csv'
2016-09-05 16:30:58.474 +0000 [INFO] (0001:guess): Loading files [ratings.csv]
2016-09-05 16:30:58.620 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/gzip from a load path
2016-09-05 16:30:58.630 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/bzip2 from a load path
2016-09-05 16:30:58.645 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/json from a load path
2016-09-05 16:30:58.653 +0000 [INFO] (0001:guess): Loaded plugin embulk/guess/csv from a load path
in:
  type: file
  path_prefix: ./ratings.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: restaurant_id, type: long}
    - {name: user_id, type: string}
    - {name: total, type: long}
    - {name: food, type: long}
    - {name: service, type: long}
    - {name: atmosphere, type: long}
    - {name: cost_performance, type: long}
    - {name: title, type: string}
    - {name: body, type: string}
    - {name: purpose, type: long}
    - {name: created_on, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
out: {type: stdout}
Created 'config.yml' file.

これで次のような config.yml が生成されました。

  • config.yml
in:
  type: file
  path_prefix: ./ratings.csv
  parser:
    charset: UTF-8
    newline: CRLF
    type: csv
    delimiter: ','
    quote: '"'
    escape: '"'
    trim_if_not_quoted: false
    skip_header_lines: 1
    allow_extra_columns: false
    allow_optional_columns: false
    columns:
    - {name: id, type: long}
    - {name: restaurant_id, type: long}
    - {name: user_id, type: string}
    - {name: total, type: long}
    - {name: food, type: long}
    - {name: service, type: long}
    - {name: atmosphere, type: long}
    - {name: cost_performance, type: long}
    - {name: title, type: string}
    - {name: body, type: string}
    - {name: purpose, type: long}
    - {name: created_on, type: timestamp, format: '%Y-%m-%d %H:%M:%S'}
out: {type: stdout}

embulk-filter-column プラグイン

このまま MySQL にデータを取り込んでも良いのですが、今回は必要なカラムを絞り、id, restaurant_id, user_id, total, created_on の5つのみのデータを扱うことにしたいと思います。

カラムの絞り込みには embulk-filter-column という便利なプラグインがあるので、これを使ってみたいと思います。

まずは embulk gem install コマンドでインストールします。

$ embulk gem install embulk-filter-column
2016-09-05 16:37:50.601 +0000: Embulk v0.8.13
Fetching: embulk-filter-column-0.5.4.gem (100%)
Successfully installed embulk-filter-column-0.5.4
1 gem installed

インストールが終わったら config.ymlinout の間に、次のように必要なカラムのみ記述します。

  • config.yml
in:
(略)
filters:
  - type: column
    columns:
      - {name: 'id'}
      - {name: 'restaurant_id'}
      - {name: 'user_id'}
      - {name: 'total'}
      - {name: 'created_on'}
out: {type: stdout}

MySQL にデータを取り込む

いよいよ MySQL にデータを取り込んでみたいと思います。

embulk-filter-column プラグインの時と同じように、今度は MySQL の出力用のプラグインembulk-output-mysql をインストールします。

$ embulk gem install embulk-output-mysql
2016-09-05 16:50:54.513 +0000: Embulk v0.8.13
Fetching: embulk-output-mysql-0.6.3.gem (100%)
Successfully installed embulk-output-mysql-0.6.3
1 gem installed

プラグインのインストールが完了したら config.ymloutembulk-output-mysql プラグイン用の設定を記述していきます。

  • config.yml
out:
  type: mysql
  host: localhost
  user: root
  password: uhsd!Aid;3Zt
  database: testdb
  table: ratings
  mode: replace

mode という部分は insert も選択することができ、Embulk を繰り返した時に、insert だと追記、replace だと置き換えになります。今回は replace を選択しました。

データベースは事前に作成しておく必要がありますが、テーブルやスキーマの作成は Embulk 側で自動で作成してくれるので、事前に作成する必要はありません。

それでは embulk run コマンドでデータの取り込みを開始したいと思います。

$ embulk run config.yml

20万件以上のデータなので、少し時間がかかると思います。

※ 私の環境では完了するまで 5 分ほどかかりました。

データの取り込みが完了したので、MySQL のデータを確認してます。

mysql> select count(*) from ratings;
+----------+
| count(*) |
+----------+
|   205805 |
+----------+
1 row in set (0.04 sec)

mysql> select * from ratings limit 20;
+--------+---------------+----------+-------+---------------------+
| id     | restaurant_id | user_id  | total | created_on          |
+--------+---------------+----------+-------+---------------------+
| 156445 |        310595 | ee02f26a |     5 | 2006-10-07 05:06:09 |
|   3842 |         10237 | fcc21401 |     1 | 2004-10-20 00:34:28 |
| 144379 |          3334 | 06412af7 |     2 | 2006-06-03 16:07:43 |
| 144377 |         15163 | 06412af7 |     5 | 2006-06-03 15:14:45 |
|  75967 |           567 | 4ceec99d |     3 | 2004-12-01 23:12:29 |
| 104898 |          1026 | 4ceec99d |     5 | 2005-01-04 03:57:02 |
|  86073 |          1058 | 4ceec99d |     5 | 2004-11-09 00:34:17 |
|  13968 |          2569 | 4ceec99d |     3 | 2004-09-22 23:29:57 |
|  97833 |          3309 | 4ceec99d |     4 | 2005-05-28 23:17:16 |
|  13991 |          3648 | 4ceec99d |     4 | 2004-09-27 11:14:50 |
|  69284 |          4226 | 4ceec99d |     5 | 2004-10-31 16:32:43 |
|   5658 |          4659 | 4ceec99d |     4 | 2004-09-22 23:04:12 |
|  89226 |          5075 | 4ceec99d |     5 | 2005-03-21 22:56:45 |
|  15001 |          5099 | 4ceec99d |     3 | 2004-09-24 11:42:10 |
| 116898 |          5219 | 4ceec99d |     5 | 2005-09-20 00:53:40 |
|  21991 |          5449 | 4ceec99d |     5 | 2004-11-10 00:13:09 |
|  68974 |          6033 | 4ceec99d |     4 | 2004-11-03 23:45:02 |
|  15434 |          6899 | 4ceec99d |     5 | 2004-09-22 22:29:51 |
| 116890 |          8638 | 4ceec99d |     4 | 2005-09-20 00:34:25 |
|  13997 |          8895 | 4ceec99d |     5 | 2004-09-27 10:54:32 |
+--------+---------------+----------+-------+---------------------+
20 rows in set (0.00 sec)

mysql> select total, count(*) from ratings group by total;
+-------+----------+
| total | count(*) |
+-------+----------+
|     0 |     7159 |
|     1 |     4975 |
|     2 |    14565 |
|     3 |    64743 |
|     4 |    79593 |
|     5 |    34770 |
+-------+----------+
6 rows in set (0.10 sec)

まとめ

Embulk を使って CSV から MySQL へデータを投入してみました。

もう一度 Embulk を使うことによって、今度は MySQL から ElasticSearch などにデータを投入することもできるので、いろいろな使い方を考えていきたいと思います。

データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化]

データ分析基盤構築入門[Fluentd、Elasticsearch、Kibanaによるログ収集と可視化]