如上图,PID 为28329进程运行程序时使用的 GPU 默认占满所有可用的显卡 GPU 和 G_MEM,但只在第一个 GPU 上进行计算。虽然调用了八块显卡资源,并且占用了所有 GPU 剩下的显存,但却只有一块真正在工作,大大浪费了资源,利用率低下。
解决方法很简单,TensorFlow 有一些参数可以简单控制显卡调用数量,可以通过在运行代码前添加参数,如:
CUDA_VISIBLE_DEVIES=0,4 python3 main.py 或者在脚本里头添加代码 import os os.environ('CUDA_VISIBLE_DEVICES')='0,1' |
调用第 1 块和第 5 块显卡的资源进行运算。
但是!指定调用了显卡的 GPU 还是会完全占用指定显卡的所有显存。如上图第二号显卡,已经有别的人使用了 1329 MB,但 28329 进程很霸道的把显卡剩下的 10625 MB显存全部划走,明明都用不上!
为了不占用显卡所有的显存,还可以在创建 sess 的时候,添加参数约束调用显卡的显存量,如:
config = tf.ConfigProto(allow_soft_placement=True) # 允许根据运行时需要分配GPU内存 config.gpu_options.allow_grouth = True # 占用90%显存(优先级比上面大) config.gpu_options.per_process_gpu_memory_fraction = 0.9 sess = tf.Session(config=config) |
可是!实际上这只是解决了单 GPU 调用问题,用了合理的资源去运行我的程序而已。如果我需要用多个 GPU 去加速程序;或者运行程序需要多个显卡的 G_MEM 才能满足的时候,为了让程序可以顺利的调用多个 GPU,就需要从代码层面解决。
与 TensorFlow 的多 GPU 调用问题相比,Caffe 的多 GPU 调用问题就很简单了。Caffe 的单 GPU 切换到多 GPU 根本不需要修改任何代码,只需要在编译选项中吧 USE_NCCL 打开就可以了,剩下的都是 Caffe 自动完成。
但 TensorFlow 不一样,需要对代码有一定程度的了解,并且使用 tf.device(“/gpu:0”) 之类的代码包起来,还要注意各种变量存放的地方,各种操心,如变量放哪,怎么平均梯度,这需要对训练的过程代码非常熟悉。
- “/cpu:0” :机器的 CPU
- “/device:GPU:0” :机器的 GPU 如果只有一个
- “/device:GPU:1” :机器的第二个 GPU
TensorFlow 自己也有一些多 GPU 的示例,tutorials/image/cifar10/cifar10_multi_gpu_train.py,但也仅仅是一个 DEMO 而已,没有解释含义,换成自己的模型训练算法代码就不一定能跑起来了。
利用 CUDA_VISIBLE_DEVICES 之类的方法确实可以解决一定程度的问题,但精细下来,还是会有不少的性能缺失。例如多个用户公用几块卡的时候,每个人分配不同的卡来做实验,或者每张卡上运行不同参数设置的程序(!!!),来获得线性加速比,以便在更短的时间内得到计算结果。这个时候,上面都方法就无能为力了,必须从代码层面解决了,就算可以用工程能力设置每个用户的调用量之类的方式约束,也只是会得到用户运行时因为各种资源不足而使得程序报错。(当然,貌似用集群的方法可以治一下。)
一个 MNIST 的例子:
import tensorflow as tf from tensorflow.contrib import slim import numpy as np from tensorflow.examples.tutorials.mnist import input_data mnist=input_data.read_data_sets("/tmp/mnist/",one_hot=True) # MNIST数据集存放路径 num_gpus=2 num_steps=200 learning_rate=0.001 batch_size=1024 display_step=10 num_input=784 num_classes=10 def conv_net(x,is_training): # "updates_collections": None is very import ,without will only get 0.10 batch_norm_params = {"is_training": is_training, "decay": 0.9, "updates_collections": None} #,'variables_collections': [ tf.GraphKeys.TRAINABLE_VARIABLES ] with slim.arg_scope([slim.conv2d, slim.fully_connected], activation_fn=tf.nn.relu, weights_initializer=tf.truncated_normal_initializer(mean=0.0, stddev=0.01), weights_regularizer=slim.l2_regularizer(0.0005), normalizer_fn=slim.batch_norm, normalizer_params=batch_norm_params): with tf.variable_scope("ConvNet",reuse=tf.AUTO_REUSE): x = tf.reshape(x, [-1, 28, 28, 1]) net = slim.conv2d(x, 6, [5,5], scope="conv_1") net = slim.max_pool2d(net, [2, 2],scope="pool_1") net = slim.conv2d(net, 12, [5,5], scope="conv_2") net = slim.max_pool2d(net, [2, 2], scope="pool_2") net = slim.flatten(net, scope="flatten") net = slim.fully_connected(net, 100, scope="fc") net = slim.dropout(net,is_training=is_training) net = slim.fully_connected(net, num_classes, scope="prob", activation_fn=None,normalizer_fn=None) return net def train_single(): X = tf.placeholder(tf.float32, [None, num_input]) Y = tf.placeholder(tf.float32, [None, num_classes]) logits=conv_net(X,True) loss=tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=Y,logits=logits)) opt=tf.train.AdamOptimizer(learning_rate) train_op=opt.minimize(loss) logits_test=conv_net(X,False) correct_prediction = tf.equal(tf.argmax(logits_test, 1), tf.argmax(Y, 1)) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) for step in range(1,num_steps+1): batch_x, batch_y = mnist.train.next_batch(batch_size) sess.run(train_op,feed_dict={X:batch_x,Y:batch_y}) if step%display_step==0 or step==1: loss_value,acc=sess.run([loss,accuracy],feed_dict={X:batch_x,Y:batch_y}) print("Step:" + str(step) + ":" + str(loss_value) + " " + str(acc)) print("Done") print("Testing Accuracy:",np.mean([sess.run(accuracy, feed_dict={X: mnist.test.images[i:i + batch_size], Y: mnist.test.labels[i:i + batch_size]}) for i in range(0, len(mnist.test.images), batch_size)])) if __name__ == "__main__": train_single() |
以上面的 MNIST 为例子,对其进行多 GPU 化改造。
多 GPU 并行可分为模型并行和数据并行两大类,我们经常用到的是数据并行的方式,而数据并行又可分为同步方式和异步方式,由于一般都会配置同样的显卡,因此这里用同步的方式,即将数据分给不同的卡,等所有的 GPU 都计算完梯度后进行平均,最后再更新梯度。
数据并行原理:模型参数保存在一个指定 GPU/CPU 上,模型参数的副本在不同 GPU 上,每次训练,提供 batch_size * gpu_num 数据,并等量拆分成多个 batch,分别送入不同的 GPU。
首先要改造的是数据读取部分,由于用到多块卡,每张卡要分到不同的数据,所以在获取 batch 的时候要把大小改为:
batch_x, batch_y = mnist.train.next_batch(batch_size * num_gpus) |
一次性取足够的数据保证每块卡都分到 batch_size 大小的数据。然后,对取到的数据进行切分。
_x = X[i*batch_size:(i+1)*batch_size] _y = Y[i*batch_size:(i+1)*batch_size] |
为了防止名字混乱,最好结合 name_scope 参数区分:
for i in range(2): with tf.device("/gpu:%d"%i): with tf.name_scope("tower_%d"%i): _x = X[i*batch_size:(i+1)*batch_size] _y = Y[i*batch_size:(i+1)*batch_size] logits = conv_net(_x,dropout,reuse_vars,True) |
import time import numpy as np import tensorflow as tf from tensorflow.contrib import slim from tensorflow.examples.tutorials.mnist import input_data mnist = input_data.read_data_sets("mnist/", one_hot=True) def get_available_gpus(): """ code from http://stackoverflow.com/questions/38559755/how-to-get-current-available-gpus-in-tensorflow """ from tensorflow.python.client import device_lib as _device_lib local_device_protos = _device_lib.list_local_devices() return [x.name for x in local_device_protos if x.device_type == 'GPU'] num_gpus = len(get_available_gpus()) print("Available GPU Number :"+str(num_gpus)) num_steps = 1000 learning_rate = 0.001 batch_size = 1000 display_step = 10 num_input = 784 num_classes = 10 def conv_net_with_layers(x,is_training,dropout = 0.75): with tf.variable_scope("ConvNet", reuse=tf.AUTO_REUSE): x = tf.reshape(x, [-1, 28, 28, 1]) x = tf.layers.conv2d(x, 12, 5, activation=tf.nn.relu) x = tf.layers.max_pooling2d(x, 2, 2) x = tf.layers.conv2d(x, 24, 3, activation=tf.nn.relu) x = tf.layers.max_pooling2d(x, 2, 2) x = tf.layers.flatten(x) x = tf.layers.dense(x, 100) x = tf.layers.dropout(x, rate=dropout, training=is_training) out = tf.layers.dense(x, 10) out = tf.nn.softmax(out) if not is_training else out return out def conv_net(x,is_training): # "updates_collections": None is very import ,without will only get 0.10 batch_norm_params = {"is_training": is_training, "decay": 0.9, "updates_collections": None} #,'variables_collections': [ tf.GraphKeys.TRAINABLE_VARIABLES ] with slim.arg_scope([slim.conv2d, slim.fully_connected], activation_fn=tf.nn.relu, weights_initializer=tf.truncated_normal_initializer(mean=0.0, stddev=0.01), weights_regularizer=slim.l2_regularizer(0.0005), normalizer_fn=slim.batch_norm, normalizer_params=batch_norm_params): with tf.variable_scope("ConvNet",reuse=tf.AUTO_REUSE): x = tf.reshape(x, [-1, 28, 28, 1]) net = slim.conv2d(x, 6, [5,5], scope="conv_1") net = slim.max_pool2d(net, [2, 2],scope="pool_1") net = slim.conv2d(net, 12, [5,5], scope="conv_2") net = slim.max_pool2d(net, [2, 2], scope="pool_2") net = slim.flatten(net, scope="flatten") net = slim.fully_connected(net, 100, scope="fc") net = slim.dropout(net,is_training=is_training) net = slim.fully_connected(net, num_classes, scope="prob", activation_fn=None,normalizer_fn=None) return net def average_gradients(tower_grads): average_grads = [] for grad_and_vars in zip(*tower_grads): grads = [] for g, _ in grad_and_vars: expend_g = tf.expand_dims(g, 0) grads.append(expend_g) grad = tf.concat(grads, 0) grad = tf.reduce_mean(grad, 0) v = grad_and_vars[0][1] grad_and_var = (grad, v) average_grads.append(grad_and_var) return average_grads PS_OPS = ['Variable', 'VariableV2', 'AutoReloadVariable'] def assign_to_device(device, ps_device='/cpu:0'): def _assign(op): node_def = op if isinstance(op, tf.NodeDef) else op.node_def if node_def.op in PS_OPS: return "/" + ps_device else: return device return _assign def train(): with tf.device("/cpu:0"): global_step=tf.train.get_or_create_global_step() tower_grads = [] X = tf.placeholder(tf.float32, [None, num_input]) Y = tf.placeholder(tf.float32, [None, num_classes]) opt = tf.train.AdamOptimizer(learning_rate) with tf.variable_scope(tf.get_variable_scope()): for i in range(num_gpus): with tf.device(assign_to_device('/gpu:{}'.format(i), ps_device='/cpu:0')): _x = X[i * batch_size:(i + 1) * batch_size] _y = Y[i * batch_size:(i + 1) * batch_size] logits = conv_net(_x, True) tf.get_variable_scope().reuse_variables() loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=_y, logits=logits)) grads = opt.compute_gradients(loss) tower_grads.append(grads) if i == 0: logits_test = conv_net(_x, False) correct_prediction = tf.equal(tf.argmax(logits_test, 1), tf.argmax(_y, 1)) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) grads = average_gradients(tower_grads) train_op = opt.apply_gradients(grads) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) for step in range(1, num_steps + 1): batch_x, batch_y = mnist.train.next_batch(batch_size * num_gpus) ts = time.time() sess.run(train_op, feed_dict={X: batch_x, Y: batch_y}) te = time.time() - ts if step % 10 == 0 or step == 1: loss_value, acc = sess.run([loss, accuracy], feed_dict={X: batch_x, Y: batch_y}) print("Step:" + str(step) + ":" + str(loss_value) + " " + str(acc)+", %i Examples/sec" % int(len(batch_x)/te)) print("Done") print("Testing Accuracy:", np.mean([sess.run(accuracy, feed_dict={X: mnist.test.images[i:i + batch_size], Y: mnist.test.labels[i:i + batch_size]}) for i in range(0, len(mnist.test.images), batch_size)])) def train_single(): X = tf.placeholder(tf.float32, [None, num_input]) Y = tf.placeholder(tf.float32, [None, num_classes]) logits=conv_net(X,True) loss=tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(labels=Y,logits=logits)) opt=tf.train.AdamOptimizer(learning_rate) train_op=opt.minimize(loss) logits_test=conv_net(X,False) correct_prediction = tf.equal(tf.argmax(logits_test, 1), tf.argmax(Y, 1)) accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32)) with tf.Session() as sess: sess.run(tf.global_variables_initializer()) for step in range(1,num_steps+1): batch_x, batch_y = mnist.train.next_batch(batch_size) sess.run(train_op,feed_dict={X:batch_x,Y:batch_y}) if step%display_step==0 or step==1: loss_value,acc=sess.run([loss,accuracy],feed_dict={X:batch_x,Y:batch_y}) print("Step:" + str(step) + ":" + str(loss_value) + " " + str(acc)) print("Done") print("Testing Accuracy:",np.mean([sess.run(accuracy, feed_dict={X: mnist.test.images[i:i + batch_size], Y: mnist.test.labels[i:i + batch_size]}) for i in range(0, len(mnist.test.images), batch_size)])) if __name__ == "__main__": #train_single() train() |
CUDA_VISIBLE_DEVICES=0,1 python3 train.py |
调用多 GPU 跑的程序竟然还比调用单 GPU 的时候跑的程序用的时间还要长!!!