かとじゅん:まだ、あまり具体的なことは話せないのですが、ChatWork社での役割は次のプロダクトの開発で、現行の開発には関わっていないです。言うまでもなくScalaは使っていますが、そこそこ大きいサービスになってきたこともあって問題解決の手法も変わってきました。なので、メッセージ基盤として、ストレージはHBaseを使いながら、アプリケーションはAkkaのActorベースでやっています。前職ではFinagleを利用していたのですが、他と違ってAkkaの良いところはErlang/OTP由来のSupervisionがあるところです。Actorヒエラルキーの下位層のActorが障害を起こしても、上の監督者としてのActor(Supervisor)がそれらを管理することでリカバリもしやすくなる利点があります。耐障害性という面では魅力的な機構だと思っています。
-- レジリエンスのような話でしょうか。
かとじゅん: レジリエンスは回復力とか治癒力などという意味ですね。先ほども簡単に説明したように、下位層のActorで起きた障害はSuprvisorに通知され、必要に応じて再起動・停止・さらに親のSupervisorにエスカレートすることができます。例えば、子アクターでネットワークやディスクなどのIO例外が発生した場合は、起きた例外をSupervisorに通知し、Supervisorが子アクターに再起動を命じます。そうすると子アクターは破棄され、再び起動(リトライ)できるようになります。これは、障害が起きた部分を正常な部分から隔離するという可用性に優れた考え方で、let-it-crashとも呼ばれています。
無論、要件に応じて、無限にリトライしないようにもできますし、BackoffSupervisorを利用することで指数関数的にリトライまでの待ち時間を調整し、タイトなリトライループを回避して、過度な負荷がDBやネットワークなどにかからないようにすることもできます。
蛇足ですが、この際、アクターのインスタンスは入れ替わりますが、その参照であるActorRefは同じ値を示します。これはなかなかすごいことですが、プログラミング言語の標準機能で同様の実装をするのは骨が折れますが、Actor Systemの恩恵にあずかることで僕らエンジニアは本題に集中できるようになります。
今は実戦でやりきるだけのノウハウを溜めているところで、自らフィードバックできれば良いかなと思っています。話が少しそれましたが、僕の役割は次の事業の基盤になるような開発をすることです。
-- 普段は開発もこなしつつまとめ役のようなポジションでお仕事されているのですか。
かとじゅん:マネージャーやリーダーなど、チームをまとめる立場の人は別にいます。基本的にはチームで作業していて、その中でも僕は毎日Akkaと戯れているという感じです(笑)日々、Actorヒエラルキーをどう設計するかといったような会話をチームメンバーとしています。
ここから話が少し変わってしまいますが、個人的にもAkka Streamsが好きなので、個人活動としてRedisクライアントを題材に「車輪の再発明」をしてます。
Akka Streamsを知らない人に簡単にまず前提をお話すると、Akka Streamsではストリームの上流でデータを提供するSourceと、下流でデータを消費するSinkというAPIが提供されています。例えば、Sourceは、Source#singleで単一の値を取ったり、Source#applyでコレクションを取ったり、Source#fromFutureでFutureのインスタンスを取ったりできます。また、SinkはSink#foreachであったり、Sink#reduceなどがあります。これらを結合すると、ストリームとしての実行可能なRunnableGraphというオブジェクトが手に入ります。そして、RunnableGraph#runメソッドを実行するとストリームを実行できますが、SourceやSinkが実行中に内部で持つマテリアライズド・バリュー(以下 MVと略す)という値を取得することができます。
あまり内部がどのような仕組みで動いているか細かく説明しませんが、たとえば、数列を合計するようなストリーム処理は以下のようになります。
implicit val system = ActorSystem("myActorSystem")
implicit val actorMaterializer = ActorMaterializer()
// コレクションとして値を持つSource。MVはNotUsed。つまり使わない。
val source: Source[Int, NotUsed] = Source(1 to 10)
// ストリームデータをReduceするSink。MVはFuture[Int]
val sink: Sink[Int, Future[Int]] = Sink.reduce[Int](_ + _)
// 実行可能なグラフを生成。Keep.rightはストリームの右側にあるSinkが実行時に持つMVを取得することを意味する。
// Keep.leftはSourceのMVだがNotUsedなので取得しても意味がない。
val runnableGraph: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
// 実行可能なグラフを実行すると指定したMVのインスタンスが取得できる。
val future: Future[Int] = runnableGraph.run()
// MVからSinkが得た計算結果を得る。
val value = Await.result(future, Duration.Inf)
println(value)
また、以下のようにActorをSourceとして利用するストリームも作れます。少し難しく見えますが、実行すると前述と同じ計算結果になります。ストリームのデータサイズが固定できない場合や、アクターをインターフェイスにしたい場合などに役に立ちます。
// グラフを構築・実行し、Source.actorRefのMVであるActorRefとSinkのMVであるFutureを取得する。
val (actorRef, future) = Source.actorRef[Int](Int.MaxValue, OverflowStrategy.fail)
.toMat(Sink.reduce[Int](_ + _))(Keep.both).run()
// SourceとしてのActorRefを使ってストリームにデータを流す
for {n <- 1 to 10} {
actorRef ! n
}
// ストリームの完了を通知する。
actorRef ! Status.Success(1)
// SinkのMVから結果を取得する。
val value = Await.result(future, Duration.Inf)
println(value)
さらに、ちょっとコードが長いですが、以下のように、このグラフをアクター内部で構築・実行することも可能です(サンプルコードなので、Supervisionは考慮していません)。 Akka StreamのTcpにはoutgoingConnectionというFlowを返すAPIがあって、これを使うと非同期・ノンブロッキングI/Oができるので、これを使ってRedisクライアントを作って遊んでいます。
ちなみに、Flowは、Source, Sinkに並ぶコンポーネントで、上流から受け取った値に何かを行って下流に流すことができます。また、FlowをSourceに結合するとSourceに、Sinkに結合するとSinkになる特徴を持っています。なので、Source -> Flow -> Sinkのようなグラフを作ることができます。以下の例では、リクエストとしてByteStringを流すとレスポンスとしてByteStringが返ってくる Source -> Flow -> Sink なグラフを作っています。
class ClientActor(remoteAddress: InetSocketAddress) extends Actor {
// requestIdとsenderを紐づける
private def putSender(requestId: Long, sender: ActorRef): Unit = ???
// requestIdからsenderを取得する
private def getSender(requestId: Long): ActorRef = ???
// Source#actorRef
private val sourceActorRef: Source[(Long, ByteString), ActorRef] =
Source.actorRef[(Long, ByteString)](Int.MaxValue, OverflowStrategy.fail)
// RedisにTCPで接続するフロー
private val tcpFlow: Flow[ByteString, ByteString, Future[OutgoingConnection]] =
Tcp().outgoingConnection(remoteAddress)
// Sink#foreach
private val sinkForeach: Sink[(ByteString, Long), Future[Done]] =
Sink.foreach[(ByteString, Long)](responseWithRequestId => self ! responseWithRequestId)
// 内部でtcpFlowを使うが、(requestId, request) -> (response, requestId)にするFlow
private val connectionFlow: Flow[(Long, ByteString), (ByteString, Long), NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
val requestFlow = b.add(Flow[(Long, ByteString)].map {
case (requestId, request) => (request.concat(ByteString("\r\n")), requestId)
})
val unzip = b.add(Unzip[ByteString, Long]())
val zip = b.add(Zip[ByteString, Long]())
requestFlow.out ~> unzip.in
unzip.out0 ~> tcpFlow ~> zip.in0 // request -> response
unzip.out1 ~> zip.in1 // requestId をそのまま引き継ぐ
FlowShape(requestFlow.in, zip.out)
})
// ストリームの構築・実行
private val internalClientRef: ActorRef = sourceActorRef
.via(connectionFlow)
.toMat(sinkForeach)(Keep.left)
.run()
override def receive: Receive = {
case (requestId: Long, request: String) =>
putSender(requestId, sender())
internalClientRef ! (requestId, ByteString.fromString(request))
case (response: ByteString, requestId: Long) =>
getSender(requestId) ! (requestId, response.toString())
}
}
-- その素振りは面白いですね。
これは、Akka Streamsの勉強をするために作っているので、実用では既に存在するOSSなどを使えば良いと思います。それなりに使い込まないと自分で理解できないので、大事かなと思ってひたすらAkka StreamsとActorを使いこんでいます。あとは、最近あまり参加できていないのですが、、、Akka in Actionの読書会を開いてもらったり、Reactive Messaging Patterns with the Actor Modelsの読書会に参加したりしています。Akka StreamsのActorMaterializer(RunnableGraphをActor上で実行可能にするオブジェクト)の詳細な説明や、コードを読むときの糸口になるようなヒントが書いてあるので、見ておくとコードも書きやすくなるかなと思います。
というか、ほとんど仕事の話をしていませんね(笑)。詳しくはまた時期をみてということで。ChatWork社では常に新しいことをやっています、って感じです。
-- ズバリ、楽しいですか。
かとじゅん:これで楽しくないと言ったら怒られてしまいますね(笑)実際楽しくやらせてもらっています。Scala化するという宣言をしてから暫くリリース物が出ていませんが、結果を出すまでは絶対にやらないといけないので最後までやり通したいなと思います。