読者です 読者をやめる 読者になる 読者になる

Deep Learning Tutorial

chainer and python

chainerの畳み込みニューラルネットワークで10種類の画像を識別(CIFAR-10)

<システムバージョン>
ubuntu 14.04
python 3.5.1
chainer 1.7.0

f:id:tsuruchan_0827:20160304130419p:plain
CIFAR-10 and CIFAR-100 datasets

32x32pixelのカラー画像を10のクラスに分類します。
訓練画像が50000枚、テスト画像が10000枚です。

CIFAR-10のデータは、各画像サンプルがchannel(3チャンネル)、row(32ピクセル)、column(32ピクセル)のフラット形式3*32*32=3072次元ベクトルの形で格納されている。
Chainerでは画像を (Nsample, channel, height, width) の形式にする必要があるためreshape()して次元を変換している。
詳しくはここに書いてあります

まずは、データの読み込み。
データのファイルはこのように分かれているために、読み込みには工夫が必要。
f:id:tsuruchan_0827:20160304130852p:plain

import sys
import pickle
import numpy as np

def unpickle(file):
    fp = open(file, 'rb')
    if sys.version_info.major == 2:
        data = pickle.load(fp)
    elif sys.version_info.major == 3:
        data = pickle.load(fp, encoding='latin-1')
    fp.close()

    return data

X_train = None
y_train = []

for i in range(1,6):
    data_dic = unpickle("cifar-10-batches-py/data_batch_{}".format(i))
    if i == 1:
        X_train = data_dic['data']
    else:
        X_train = np.vstack((X_train, data_dic['data']))
    y_train += data_dic['labels']

test_data_dic = unpickle("cifar-10-batches-py/test_batch")
X_test = test_data_dic['data']
X_test = X_test.reshape(len(X_test),3,32,32)
y_test = np.array(test_data_dic['labels'])
X_train = X_train.reshape((len(X_train),3, 32, 32))
y_train = np.array(y_train)
X_train = X_train.astype(np.float32)
X_test = X_test.astype(np.float32)
X_train /= 255
X_test /= 255
y_train = y_train.astype(np.int32)
y_test = y_test.astype(np.int32)

畳込み層のconv1=F.Convolution2D(3, 32, 3, pad=0)は、入力が3チャンネル(RGB)、出力の特徴マップが32チャンネル、フィルタ(カーネル)サイズが3x3、パディングサイズが0ということ。

モデルと順伝播を記述する。
今回は、

入力 → 畳み込み1 → 畳み込み2 → プーリング層 → 畳み込み4 → プーリング層 → 畳み込み5 → 畳み込み6 → プーリング層 → 全結合層 → 出力

という畳込みニューラルネットワークとなっている。

model = chainer.FunctionSet(conv1=F.Convolution2D(3, 32, 3, pad=1),
                            conv2=F.Convolution2D(32, 32, 3, pad=1),
                            conv3=F.Convolution2D(32, 32, 3, pad=1),
                            conv4=F.Convolution2D(32, 32, 3, pad=1),
                            conv5=F.Convolution2D(32, 32, 3, pad=1),
                            conv6=F.Convolution2D(32, 32, 3, pad=1),
                            l1=F.Linear(512, 512),
                            l2=F.Linear(512, 10))


def forward(x_data, y_data, train=True):
    x, t = chainer.Variable(x_data), chainer.Variable(y_data)
    h = F.relu(model.conv1(x))
    h = F.max_pooling_2d(F.relu(model.conv2(h)), 2)
    h = F.relu(model.conv3(h))
    h = F.max_pooling_2d(F.relu(model.conv4(h)), 2)
    h = F.relu(model.conv5(h))
    h = F.max_pooling_2d(F.relu(model.conv6(h)), 2)
    h = F.dropout(F.relu(model.l1(h)), train=train)
    y = model.l2(h)
    
    return F.softmax_cross_entropy(y, t), F.accuracy(y, t)

みんな使ってるAdamを使う。

optimizer = optimizers.Adam()
optimizer.setup(model)

あとは訓練していくだけ!
エポック数40にしました。

for epoch in range(40):
    print("epoch", epoch+1)
    
    perm = np.random.permutation(N)
    sum_accuracy = 0
    sum_loss = 0
    
    for i in tqdm(range(0, N, batch_size)):
        X_batch = xp.asarray(X_train[perm[i:i+batch_size]])
        y_batch = xp.asarray(y_train[perm[i:i+batch_size]])
        
        optimizer.zero_grads()
        loss, acc = forward(X_batch, y_batch)
        loss.backward()
        optimizer.update()

        train_loss.append(loss.data)
        train_acc.append(acc.data)
        sum_loss     += float(loss.data) * batch_size
        sum_accuracy += float(acc.data) * batch_size

    print("train mean loss={}, accuracy={}".format(sum_loss/N, sum_accuracy/N))

    sum_accuracy = 0
    sum_loss = 0
    for i in tqdm(range(0, N_test, batch_size)):
        X_batch = xp.asarray(X_train[perm[i:i+batch_size]])
        y_batch = xp.asarray(y_train[perm[i:i+batch_size]])
        
        loss, acc = forward(X_batch, y_batch)
       
        test_loss.append(loss.data)
        test_acc.append(acc.data)
        sum_loss     += float(loss.data) * batch_size
        sum_accuracy += float(acc.data) * batch_size
        
   
    print("test mean loss={}, accuracy={}".format(sum_loss/N_test, sum_accuracy/N_test))

結果

f:id:tsuruchan_0827:20160304155959p:plain
最高正答率:93.86%
結構すごいと思います。
あと改善するとしたら、畳込み層のフィルタ数・活性化関数の種類・前処理ですね。
Classification datasets results
このサイトによると最高の正解率は96.53%とのことですので、もう少し頑張ってみたいと思います。
前処理ほぼ何もしてないのでもう3%なら上がる気がします。
そのまえにプログラムがちゃんと合ってるのか不安だけど...(何たってchainer歴2日だから笑)

ここまでこれた方なら以下の本が理解できると思うので是非読んでみてください!

全体のコード

import sys
import pickle
import time
import argparse
import numpy as np
from tqdm import tqdm
import chainer
from chainer import cuda, Function, gradient_check, Variable, optimizers, serializers, utils
from chainer import Link, Chain, ChainList, FunctionSet
import chainer.functions as F
import chainer.links as L
import matplotlib.pyplot as plt


parser = argparse.ArgumentParser(description='Chainer example: CIFAR-10')
parser.add_argument('--gpu', '-gpu', default=-1, type=int,
                    help='GPU ID (negative value indicates CPU)')


# GPUが使えるか確認
args = parser.parse_args()
if args.gpu >= 0:
    cuda.check_cuda_available()
xp = cuda.cupy if args.gpu >= 0 else np

def unpickle(file):
    fp = open(file, 'rb')
    if sys.version_info.major == 2:
        data = pickle.load(fp)
    elif sys.version_info.major == 3:
        data = pickle.load(fp, encoding='latin-1')
    fp.close()

    return data

X_train = None
y_train = []

for i in range(1,6):
    data_dic = unpickle("cifar-10-batches-py/data_batch_{}".format(i))
    if i == 1:
        X_train = data_dic['data']
    else:
        X_train = np.vstack((X_train, data_dic['data']))
    y_train += data_dic['labels']


test_data_dic = unpickle("cifar-10-batches-py/test_batch")
X_test = test_data_dic['data']
X_test = X_test.reshape(len(X_test),3,32,32)
y_test = np.array(test_data_dic['labels'])
X_train = X_train.reshape((len(X_train),3, 32, 32))
y_train = np.array(y_train)
X_train = X_train.astype(np.float32)
X_test = X_test.astype(np.float32)
X_train /= 255
X_test /= 255
y_train = y_train.astype(np.int32)
y_test = y_test.astype(np.int32)

# 畳み込み6層
model = chainer.FunctionSet(conv1=F.Convolution2D(3, 32, 3, pad=1),
                            conv2=F.Convolution2D(32, 32, 3, pad=1),
                            conv3=F.Convolution2D(32, 32, 3, pad=1),
                            conv4=F.Convolution2D(32, 32, 3, pad=1),
                            conv5=F.Convolution2D(32, 32, 3, pad=1),
                            conv6=F.Convolution2D(32, 32, 3, pad=1),
                            l1=F.Linear(512, 512),
                            l2=F.Linear(512, 10))

# GPU使用のときはGPUにモデルを転送
if args.gpu >= 0:
    cuda.get_device(args.gpu).use()
    model.to_gpu()

def forward(x_data, y_data, train=True):
    x, t = chainer.Variable(x_data), chainer.Variable(y_data)
    h = F.relu(model.conv1(x))
    h = F.max_pooling_2d(F.relu(model.conv2(h)), 2)
    h = F.relu(model.conv3(h))
    h = F.max_pooling_2d(F.relu(model.conv4(h)), 2)
    h = F.relu(model.conv5(h))
    h = F.max_pooling_2d(F.relu(model.conv6(h)), 2)
    h = F.dropout(F.relu(model.l1(h)), train=train)
    y = model.l2(h)
    
    return F.softmax_cross_entropy(y, t), F.accuracy(y, t)


optimizer = optimizers.Adam()
optimizer.setup(model)

train_loss = []
train_acc  = []
test_loss = []
test_acc  = []
N = 50000
N_test = 10000
batch_size = 100

start_time = time.clock()
for epoch in range(40):
    print("epoch", epoch+1)
    
    perm = np.random.permutation(N)
    sum_accuracy = 0
    sum_loss = 0
    
    for i in tqdm(range(0, N, batch_size)):
        X_batch = xp.asarray(X_train[perm[i:i+batch_size]])
        y_batch = xp.asarray(y_train[perm[i:i+batch_size]])
        
        optimizer.zero_grads()
        loss, acc = forward(X_batch, y_batch)
        loss.backward()
        optimizer.update()

        train_loss.append(loss.data)
        train_acc.append(acc.data)
        sum_loss     += float(loss.data) * batch_size
        sum_accuracy += float(acc.data) * batch_size

    print("train mean loss={}, accuracy={}".format(sum_loss/N, sum_accuracy/N))

    sum_accuracy = 0
    sum_loss = 0
    for i in tqdm(range(0, N_test, batch_size)):
        X_batch = xp.asarray(X_train[perm[i:i+batch_size]])
        y_batch = xp.asarray(y_train[perm[i:i+batch_size]])
        
        loss, acc = forward(X_batch, y_batch)
       
        test_loss.append(loss.data)
        test_acc.append(acc.data)
        sum_loss     += float(loss.data) * batch_size
        sum_accuracy += float(acc.data) * batch_size
        
   
    print("test mean loss={}, accuracy={}".format(sum_loss/N_test, sum_accuracy/N_test))

end_time = time.clock()
print("total time : ", end_time - start_time)