Vengineerの妄想(準備期間)

人生は短いけど、長いです。人生を楽しみましょう!

Distributed TensorFlowは、GRPCを使っている


最近、Googleが公開しているいろいろなインフラに興味を持ち調べています。
今調べているのは、GRPCGoogleのProtocol Buffersを利用したRPCです。


こんなサンプルTake a REST with HTTP/2, Protobufs, and Swaggerも見つけた。


GRPCのTutorialをコンパイル、実行してどんな感じかは分かったのだが、
より具体的にどのようなものに使われているかを知りたかった。

そしたら、Distributed TensorFlowでは、GRPCが使われているということに。
TensorFlowおよびDistributed TensorFlowについては、
何といっても中の人が解説しているDistributed TensorFlowの話が詳しいですね。


ソースコードを眺めていたのですが、全体像が分からなくて困りましたが、(qiita-image-store.s3.amazonaws.com/0/38290/36d49fff-e82a-0320-8a9a-8cf67fd377f6.png)の図を見て、あー。
そんな感じなのかと。
(この図のURLからQiitaはAmazon AWSで動いている?)

左の図にあるように、
クライアントがマスターに対して、セッションを投げるのですね。
マスターはワーカーを起動する。
そのワーカーの中で複数のGPUを利用するというもの。
右の図では、ワーカーが複数有り、
各ワーカーが複数のGPUを利用する分散システム。

で、Distributed TensorFlowのソースコードのprotobufの部分を覗いてみたら、
この中のtensorflow_server.protoがTensorFlowクラスタクラスタ内のサーバーで使うプロトコルのようですね。
引用
// EXAMPLES
// --------
//
// 1. A single-process cluster, containing "/job:local/task:0".
//
//    Cluster:
//      job { name: 'local' tasks { key: 0 value: 'localhost:2222' } }
//
//    Server:
//      cluster { $CLUSTER } job_name: 'local' task_index: 0
//
// 2. A two-process cluster, containing "/job:local/task:{0,1}".
//
//    Cluster:
//      job { name: 'local' tasks { key: 0 value: 'localhost:2222' }
//                          tasks { key: 1 value: 'localhost:2223' } }
//
//    Servers:
//      cluster { $CLUSTER } job_name: 'local' task_index: 0
//      cluster { $CLUSTER } job_name: 'local' task_index: 1
//
// 3. A two-job cluster, containing "/job:worker/task:{0,1,2}" and
//    "/job:ps/task:{0,1}".
//
//    Cluster:
//      job { name: 'worker' tasks { key: 0 value: 'worker1:2222' }
//                           tasks { key: 1 value: 'worker2:2222' }
//                           tasks { key: 2 value: 'worker3:2222' } }
//      job { name: 'ps'     tasks { key: 0 value: 'ps0:2222' }
//                           tasks { key: 1 value: 'ps1:2222' } }
//
//    Servers:
//      cluster { $CLUSTER } job_name: 'worker' task_index: 0
//      cluster { $CLUSTER } job_name: 'worker' task_index: 1
//      cluster { $CLUSTER } job_name: 'worker' task_index: 2
//      cluster { $CLUSTER } job_name: 'ps'     task_index: 0
//      cluster { $CLUSTER } job_name: 'ps'     task_index: 1

これを見ると、local, worker, psの3つがある模様。
ここに説明があった
引用
Job
A job comprises a list of "tasks", which typically serve a common purpose. For example, a job named `ps` (for "parameter server") typically hosts nodes that store and update variables; while a job named `worker` typically hosts stateless nodes that perform compute-intensive tasks. The tasks in a job typically run on different machines.

Distributed TensorFlowを試してみるも参考になりました。

今日は、もう少し深く探っていきます。

Session managementにセッションのコンフィグレーション・オプションには、ConfigProtoを使っていると。
引用
sess = tf.Session(config=tf.ConfigProto(log_device_placement=True))
のように、ConfigProtoの引数log_device_placement=Trueとすると、ログが有効になる。

Worker Serviceでは、ローカルデバイスでTensorFlowを実行するときのサービスの定義。
Master Serviceはdistributed TensorFlowを実行するときのサービスの定義。

Using GPUsによると、GPUが付いていると優先的にGPUを使う。
明示的にCPUを使いたいときは、
引用
# Creates a graph.
with tf.device('/cpu:0'):
  a = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[2, 3], name='a')
  b = tf.constant([1.0, 2.0, 3.0, 4.0, 5.0, 6.0], shape=[3, 2], name='b')
  c = tf.matmul(a, b)
のように、deviceで指定する。

おっと、GRPCではなく、protobufを調べていた。
GRPCはここだった。

core/distributed_runtime/rpc/grpc_tensorflow_server.cc
  => NewServer (core/distributed_runtime/rpc/grpc_server_lib.cc)
    => TensorFlowrServer (core/distributed_runtime/rpc/grpc_server_lib.cc)
      => ->Start

        => NewGrpcMasterService関数  (rpc/grpc_master_service.cc)
          => GrpcMasterServiceクラス   (rpc/grpc_master_service.cc)
            => 別スレッドで、->HandleRPCsLoop関数 (rpc/grpc_master_service.cc)
              => (core/protobuf/master.proto)
                 ENQUEUE_REQUEST(CreateSession);  => CreateSessionHandler
                 ENQUEUE_REQUEST(ExtendSession);  => ExtendSession
                 for (int i = 0; i < 100; ++i) {
                   ENQUEUE_REQUEST(RunStep);      => RunStepSession
                 }
                 ENQUEUE_REQUEST(CloseSession);   => CloseSession
                 ENQUEUE_REQUEST(ListDevices);    => ListDevicesSession
                 ENQUEUE_REQUEST(Reset);          => ResetSession

        => NewGrpcWorkerService関数  (rpc/grpc_worker_service.cc)
          => GrpcWorkerServiceクラス   (rpc/grpc_worker_service.cc)
            => 別スレッドで、->HandleRPCsLoop関数 (rpc/grpc_master_service.cc)
              => (core/protobuf/worker.proto)
                 ENQUEUE_REQUEST(GetStatus);       => GetStatusHandler       
                 ENQUEUE_REQUEST(CleanupAll);      => CleanupAllHandler 
                 ENQUEUE_REQUEST(RegisterGraph);   => RegisterHandler 
                 ENQUEUE_REQUEST(DeregisterGraph); => DeregisterGraphHandler 
                 ENQUEUE_REQUEST(RecvTensor);      => RecvTensorHandler 
                 ENQUEUE_REQUEST(RunGraph);        => RunGraphHandler 
                   => GraphMgr::ExecuteAsync (core/distributed_runtime/graph_mgr.cc)
                     => RunASync (core/common_runtime/executor.c)
                 ENQUEUE_REQUEST(CleanupGraph);    => CleanupGraghHandler 
                 ENQUEUE_REQUEST(Logging);         => LoggingHandler 
                 ENQUEUE_REQUEST(Tracing);         => TracingHandler