NettyでWebSocketサーバーを実装する

December 29, 2010 - Scala

この記事は Scala Advent Calendar JP 2010 23 日目(12/29)です。

前日の @cooldaemon さんがScala Actor + NIOという、ものすごい記事を書いていらっしゃったのでこの流れの中で投稿するのが忍びないくらいです。
#まさか前日にNIOネタがくるとはぁぁぁ…

さて、今回はNettyを使ってWebSocketサーバーを実装してみました。
JavaではなくScalaで、です。
とはいっても、目的はJavaコードをScalaに直す練習も兼ねて、公式サンプルにあるJavaコードをScalaに書きなおしただけですが。
ご指摘などありましたら謹んでお受けいたします…

Netty

Nettyの概要はこのへんで。

開発環境

  • Scala-2.8.0
  • Netty-3.2.3.Final

Echoサーバー

サンプル的にEchoサーバーから書いてみます。

やること

  1. Echoハンドラを実装
  2. NioChannelFactoryにスレッドプールを登録
  3. BootstrapにNioChannelFactoryを登録
  4. PipelineFactoryにEchoハンドラを登録
  5. BootstrapにPipelineFactoryを登録
  6. InetSocketAddressに(ホストと)ポート番号を指定してBootstrapに登録してサービス開始

EchoServer.scala
まずは起動元から。

object EchoServer {
  @throws(classOf[Exception])
  def main(args:Array[String]) {
    // サーバーのセットアップ
    val bootstrap = new ServerBootstrap(
      new NioServerSocketChannelFactory(
        Executors.newCachedThreadPool(), // bossExecutor
        Executors.newCachedThreadPool()  // workerExecutor
      ))

    bootstrap.setPipelineFactory(
      // リクエストをそのまま返すハンドラを実装して登録
      new ChannelPipelineFactory() {
        def getPipeline() = Channels.pipeline(new EchoServerHandler())
      }
    )

    // 8080番で待ち受け開始
    bootstrap.bind(new InetSocketAddress(8080))
  }
}

で、ハンドラ側がこんなかんじ。
SimpleChannelUpstreamHandlerの

  • messageReceived(ctx:ChannelHandlerContext, e:MessageEvent)
  • def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent)

をオーバーライドして処理内容を定義すればOK。
何かを呼び出し元に返すには、"e.getChannel().write( {レスポンス} )"で。

class EchoServerHandler extends SimpleChannelUpstreamHandler {
  val logger = java.util.logging.Logger.getLogger("EchoServerHandler")
  val transferredBytes = new AtomicLong()

  // そのまま返す
  override def messageReceived(ctx:ChannelHandlerContext, e:MessageEvent) {
    transferredBytes.addAndGet(
      e.getMessage().asInstanceOf[ChannelBuffer].readableBytes())

    println("echo_server: message received: " + e.getMessage())
    // レスポンスを返す
    e.getChannel().write(e.getMessage())
  }

  // 例外発生時はここにくる
  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
    logger.log(Level.WARNING,
               "Unexpected exception from downstream.",
               e.getCause())

    e.getChannel().close()
  }
}

WebSocketサーバー

本題のWebSocketサーバーです。
Echoサーバーの応用で、WebSocket用のハンドラを作成して、Bootstrapに登録すれば良い訳です。

仕様

  • WebSocketServer: 起動元
  • WebSocketServerHandler: プロトコルハンドラ。肝の部分。
  • WebSocketServerPipelineFactory: パイプライン生成。
  • WebSocketServeIndexPage: WebSocketクライアント用HTMLの生成

実装

すべて書くと冗長なので要所だけ抜き出しました。
全ソースはGithubを見てください。

WebSocketServerIndexPage.scala
まずはクライアント側はこんな感じで、インプットフォームに入力した文字列をWebSocketサーバーに投げ、結果をdivに追記するだけです。

      var socket;
      if (window.WebSocket) {
        socket = new WebSocket( 'ws://localhost:8080/uppercase' );
        socket.onmessage = function(event) { log(event.data) }
        socket.onopen = function(event) { log('web socket opened') }
        socket.onclose = function(event) { log('web socket closed') }
      } else {
        alert('your browser does not supported web socket')
      }
      function log(message) {
        p = document.createElement('p');
        p.innerHTML = message
        document.getElementById('log').appendChild(p) ;
      }
      function send(message) {
        if (!window.WebSocket) { return; }
        if (socket.readyState == WebSocket.OPEN) {
          socket.send(message)
        } else {
          alert('the socket is not open.')
        }
      }

WebSocketServer.scala
サーバー起動オブジェクトです。
HTML/WebSocketを扱うハンドラを登録してポートで待ち受けます。

    // WebSocket用ハンドラを含むPipelineFactoryを登録
    bootstrap.setPipelineFactory(new WebSocketServerPipelineFactory())

    // 8080番で待ち受け開始
    bootstrap.bind(new InetSocketAddress("localhost", 8080))

WebSocketServerHandler.scala
実装するのは、Echoサーバーと同じく、messageReceived()とexceptionCaught()の2つ。
処理を分割しているのですこしだけメソッド多めです。
messageRecieved()で受信内容によって実際のハンドラをHttp/WebSocketのどちらかに切り替えます。

class WebSocketServerHandler extends SimpleChannelUpstreamHandler {
  val WEBSOCKET_PATH = "/uppercase"

  /**
   * メッセージ受信時の処理
   *   メッセージの内容によってWebSocketかHttpかハンドラを切り替える
   */
  @throws(classOf[Exception])
  override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
    val msg:Object = e.getMessage()
    msg match {
      case frame: WebSocketFrame => {
          handleWebSocketFrame(ctx, frame)
        }
      case req: HttpRequest => {
          handleHttpRequest(ctx, req)
        }
    }
  }
  /**
   * HTTPリクエスト時の処理
   */
  @throws(classOf[Exception])
  def handleHttpRequest(ctx: ChannelHandlerContext, req: HttpRequest) {
    // DEBUG
    println("handleHttpRequest: " + Thread.currentThread().getName())

    // GETリクエスト以外は処理しない
    // "/"にきたらWebSocketクライアント用ページを送信
    // "/uppercase"にきたらリクエスト文字列を大文字に変換して返す
    if (req.getMethod() != GET) {
      sendHttpResponse(
        ctx,
        req,
        new DefaultHttpResponse(HTTP_1_1, FORBIDDEN))
    } else if (req.getUri().equalsIgnoreCase("/")) {

      // (中略) デフォルトHTMLの送信

    } else if (req.getUri().equalsIgnoreCase(WEBSOCKET_PATH) &&
               Values.UPGRADE.equalsIgnoreCase(req.getHeader(Names.CONNECTION)) &&
               Values.WEBSOCKET.equalsIgnoreCase(req.getHeader(Names.UPGRADE))) {

      // (中略) WebSocket接続処理

      // ハンドラをHTTPからWebSocketに切り替えて、
      // send the handshake response
      val p = ctx.getChannel().getPipeline()
      p.remove("aggregator")
      p.replace("decoder", "wsdecoder", new WebSocketFrameDecoder())

      ctx.getChannel().write(res)

      p.replace("encoder", "wsencoder", new WebSocketFrameEncoder())
    } else {
      sendHttpResponse(
        ctx, req, new DefaultHttpResponse(HTTP_1_1, FORBIDDEN))
    }
  }
  /**
   * WebSocketリクエスト時の処理
   */
  @throws(classOf[Exception])
  def handleWebSocketFrame(ctx: ChannelHandlerContext, frame: WebSocketFrame) {
    // 大文字に変換するして、WebSocketFrameにのせてレスポンスを返す。
    ctx.getChannel().write(
      new DefaultWebSocketFrame(
        frame.getTextData().toUpperCase))
  }
  /**
   * HTTPレスポンスの送信
   */
  @throws(classOf[Exception])
  def sendHttpResponse(ctx: ChannelHandlerContext, req: HttpRequest, res: HttpResponse) {
    // ステータスコードが200じゃなければエラーページの表示
    if (res.getStatus().getCode() != 200) {
      res.setContent(
        ChannelBuffers.copiedBuffer(
          res.getStatus().toString(), CharsetUtil.UTF_8))
      HttpHeaders.setContentLength(res, res.getContent().readableBytes())
    }

    // keep-aliveでなければ接続を閉じる
    val f = ctx.getChannel().write(res)
    if (!HttpHeaders.isKeepAlive(req) || res.getStatus().getCode() != 200) {
      f.addListener(ChannelFutureListener.CLOSE)
    }
  }
  /**
   * 例外発生時の処理
   */
  @throws(classOf[Exception])
  override def exceptionCaught(ctx: ChannelHandlerContext, e: ExceptionEvent) {
    println("server: exception caught: ")
    e.getCause().printStackTrace()
    e.getChannel().close()
  }
  /**
   * WebSocket接続情報
   */
  def getWebSocketLocation(req: HttpRequest) =
    "ws://" + req.getHeader(HttpHeaders.Names.HOST) + WEBSOCKET_PATH
}

WebSocketServerPipelineFactory.scala
パイプラインへの登録を定義します。
リクエストはDecoderを通り、ハンドラで処理され、Encoderを通って返される、という流れです。
ここでは初期状態としてHttp用の設定になっていますが、一旦WebSocket通信開始のリクエストを受け取ると、そのあとはWebSocketのDecoder/Encoderに切り替わります。

<pre class="brush:scala">
class WebSocketServerPipelineFactory extends ChannelPipelineFactory{
  @throws(classOf[Exception])
  def getPipeline(): ChannelPipeline = {
    val pipeline = Channels.pipeline()

    pipeline.addLast("decoder"    , new HttpRequestDecoder())
    pipeline.addLast("aggregator" , new HttpChunkAggregator(65536))
    pipeline.addLast("encoder"    , new HttpResponseEncoder())
    pipeline.addLast("handler"    , new WebSocketServerHandler())

    pipeline
  }
}

実際、ブラウザでひらいてみると、Webフォームに入力されている「hello, world」が大文字に変換されてフォーム下部に追記されていきます。
まぁWebSocketサーバーを実装するだけなら、jetty7を使ったほうがシンプルに早くかけると思います。
Memcacheプロトコルを話すサービスをつくる場合とかには便利ですね。(messagepack-rpcでもnetty使っているとか。)

参考:

以上、お粗末様でした。。。

# あれ?Scalaあんまり関係ない??