chainerの畳み込みニューラルネットワークで10種類の画像を識別(CIFAR-10)
<システムバージョン>
ubuntu 14.04
python 3.5.1
chainer 1.7.0
CIFAR-10 and CIFAR-100 datasets
32x32pixelのカラー画像を10のクラスに分類します。
訓練画像が50000枚、テスト画像が10000枚です。
CIFAR-10のデータは、各画像サンプルがchannel(3チャンネル)、row(32ピクセル)、column(32ピクセル)のフラット形式3*32*32=3072次元ベクトルの形で格納されている。
Chainerでは画像を (Nsample, channel, height, width) の形式にする必要があるためreshape()して次元を変換している。
詳しくはここに書いてあります
まずは、データの読み込み。
データのファイルはこのように分かれているために、読み込みには工夫が必要。
import sys import pickle import numpy as np def unpickle(file): fp = open(file, 'rb') if sys.version_info.major == 2: data = pickle.load(fp) elif sys.version_info.major == 3: data = pickle.load(fp, encoding='latin-1') fp.close() return data X_train = None y_train = [] for i in range(1,6): data_dic = unpickle("cifar-10-batches-py/data_batch_{}".format(i)) if i == 1: X_train = data_dic['data'] else: X_train = np.vstack((X_train, data_dic['data'])) y_train += data_dic['labels'] test_data_dic = unpickle("cifar-10-batches-py/test_batch") X_test = test_data_dic['data'] X_test = X_test.reshape(len(X_test),3,32,32) y_test = np.array(test_data_dic['labels']) X_train = X_train.reshape((len(X_train),3, 32, 32)) y_train = np.array(y_train) X_train = X_train.astype(np.float32) X_test = X_test.astype(np.float32) X_train /= 255 X_test /= 255 y_train = y_train.astype(np.int32) y_test = y_test.astype(np.int32)
畳込み層のconv1=F.Convolution2D(3, 32, 3, pad=0)は、入力が3チャンネル(RGB)、出力の特徴マップが32チャンネル、フィルタ(カーネル)サイズが3x3、パディングサイズが0ということ。
モデルと順伝播を記述する。
今回は、
入力 → 畳み込み1 → 畳み込み2 → プーリング層 → 畳み込み4 → プーリング層 → 畳み込み5 → 畳み込み6 → プーリング層 → 全結合層 → 出力
という畳込みニューラルネットワークとなっている。
model = chainer.FunctionSet(conv1=F.Convolution2D(3, 32, 3, pad=1), conv2=F.Convolution2D(32, 32, 3, pad=1), conv3=F.Convolution2D(32, 32, 3, pad=1), conv4=F.Convolution2D(32, 32, 3, pad=1), conv5=F.Convolution2D(32, 32, 3, pad=1), conv6=F.Convolution2D(32, 32, 3, pad=1), l1=F.Linear(512, 512), l2=F.Linear(512, 10)) def forward(x_data, y_data, train=True): x, t = chainer.Variable(x_data), chainer.Variable(y_data) h = F.relu(model.conv1(x)) h = F.max_pooling_2d(F.relu(model.conv2(h)), 2) h = F.relu(model.conv3(h)) h = F.max_pooling_2d(F.relu(model.conv4(h)), 2) h = F.relu(model.conv5(h)) h = F.max_pooling_2d(F.relu(model.conv6(h)), 2) h = F.dropout(F.relu(model.l1(h)), train=train) y = model.l2(h) return F.softmax_cross_entropy(y, t), F.accuracy(y, t)
みんな使ってるAdamを使う。
optimizer = optimizers.Adam() optimizer.setup(model)
あとは訓練していくだけ!
エポック数40にしました。
for epoch in range(40): print("epoch", epoch+1) perm = np.random.permutation(N) sum_accuracy = 0 sum_loss = 0 for i in tqdm(range(0, N, batch_size)): X_batch = xp.asarray(X_train[perm[i:i+batch_size]]) y_batch = xp.asarray(y_train[perm[i:i+batch_size]]) optimizer.zero_grads() loss, acc = forward(X_batch, y_batch) loss.backward() optimizer.update() train_loss.append(loss.data) train_acc.append(acc.data) sum_loss += float(loss.data) * batch_size sum_accuracy += float(acc.data) * batch_size print("train mean loss={}, accuracy={}".format(sum_loss/N, sum_accuracy/N)) sum_accuracy = 0 sum_loss = 0 for i in tqdm(range(0, N_test, batch_size)): X_batch = xp.asarray(X_train[perm[i:i+batch_size]]) y_batch = xp.asarray(y_train[perm[i:i+batch_size]]) loss, acc = forward(X_batch, y_batch) test_loss.append(loss.data) test_acc.append(acc.data) sum_loss += float(loss.data) * batch_size sum_accuracy += float(acc.data) * batch_size print("test mean loss={}, accuracy={}".format(sum_loss/N_test, sum_accuracy/N_test))
結果
最高正答率:93.86%
結構すごいと思います。
あと改善するとしたら、畳込み層のフィルタ数・活性化関数の種類・前処理ですね。
Classification datasets results
このサイトによると最高の正解率は96.53%とのことですので、もう少し頑張ってみたいと思います。
前処理ほぼ何もしてないのでもう3%なら上がる気がします。
そのまえにプログラムがちゃんと合ってるのか不安だけど...(何たってchainer歴2日だから笑)
全体のコード
import sys import pickle import time import argparse import numpy as np from tqdm import tqdm import chainer from chainer import cuda, Function, gradient_check, Variable, optimizers, serializers, utils from chainer import Link, Chain, ChainList, FunctionSet import chainer.functions as F import chainer.links as L import matplotlib.pyplot as plt parser = argparse.ArgumentParser(description='Chainer example: CIFAR-10') parser.add_argument('--gpu', '-gpu', default=-1, type=int, help='GPU ID (negative value indicates CPU)') # GPUが使えるか確認 args = parser.parse_args() if args.gpu >= 0: cuda.check_cuda_available() xp = cuda.cupy if args.gpu >= 0 else np def unpickle(file): fp = open(file, 'rb') if sys.version_info.major == 2: data = pickle.load(fp) elif sys.version_info.major == 3: data = pickle.load(fp, encoding='latin-1') fp.close() return data X_train = None y_train = [] for i in range(1,6): data_dic = unpickle("cifar-10-batches-py/data_batch_{}".format(i)) if i == 1: X_train = data_dic['data'] else: X_train = np.vstack((X_train, data_dic['data'])) y_train += data_dic['labels'] test_data_dic = unpickle("cifar-10-batches-py/test_batch") X_test = test_data_dic['data'] X_test = X_test.reshape(len(X_test),3,32,32) y_test = np.array(test_data_dic['labels']) X_train = X_train.reshape((len(X_train),3, 32, 32)) y_train = np.array(y_train) X_train = X_train.astype(np.float32) X_test = X_test.astype(np.float32) X_train /= 255 X_test /= 255 y_train = y_train.astype(np.int32) y_test = y_test.astype(np.int32) # 畳み込み6層 model = chainer.FunctionSet(conv1=F.Convolution2D(3, 32, 3, pad=1), conv2=F.Convolution2D(32, 32, 3, pad=1), conv3=F.Convolution2D(32, 32, 3, pad=1), conv4=F.Convolution2D(32, 32, 3, pad=1), conv5=F.Convolution2D(32, 32, 3, pad=1), conv6=F.Convolution2D(32, 32, 3, pad=1), l1=F.Linear(512, 512), l2=F.Linear(512, 10)) # GPU使用のときはGPUにモデルを転送 if args.gpu >= 0: cuda.get_device(args.gpu).use() model.to_gpu() def forward(x_data, y_data, train=True): x, t = chainer.Variable(x_data), chainer.Variable(y_data) h = F.relu(model.conv1(x)) h = F.max_pooling_2d(F.relu(model.conv2(h)), 2) h = F.relu(model.conv3(h)) h = F.max_pooling_2d(F.relu(model.conv4(h)), 2) h = F.relu(model.conv5(h)) h = F.max_pooling_2d(F.relu(model.conv6(h)), 2) h = F.dropout(F.relu(model.l1(h)), train=train) y = model.l2(h) return F.softmax_cross_entropy(y, t), F.accuracy(y, t) optimizer = optimizers.Adam() optimizer.setup(model) train_loss = [] train_acc = [] test_loss = [] test_acc = [] N = 50000 N_test = 10000 batch_size = 100 start_time = time.clock() for epoch in range(40): print("epoch", epoch+1) perm = np.random.permutation(N) sum_accuracy = 0 sum_loss = 0 for i in tqdm(range(0, N, batch_size)): X_batch = xp.asarray(X_train[perm[i:i+batch_size]]) y_batch = xp.asarray(y_train[perm[i:i+batch_size]]) optimizer.zero_grads() loss, acc = forward(X_batch, y_batch) loss.backward() optimizer.update() train_loss.append(loss.data) train_acc.append(acc.data) sum_loss += float(loss.data) * batch_size sum_accuracy += float(acc.data) * batch_size print("train mean loss={}, accuracy={}".format(sum_loss/N, sum_accuracy/N)) sum_accuracy = 0 sum_loss = 0 for i in tqdm(range(0, N_test, batch_size)): X_batch = xp.asarray(X_train[perm[i:i+batch_size]]) y_batch = xp.asarray(y_train[perm[i:i+batch_size]]) loss, acc = forward(X_batch, y_batch) test_loss.append(loss.data) test_acc.append(acc.data) sum_loss += float(loss.data) * batch_size sum_accuracy += float(acc.data) * batch_size print("test mean loss={}, accuracy={}".format(sum_loss/N_test, sum_accuracy/N_test)) end_time = time.clock() print("total time : ", end_time - start_time)