読者です 読者をやめる 読者になる 読者になる

in the mythosil

Rhythm & Biology.

Perlで手軽に非同期TCPサーバ

yokohama.pmに行ってきて、またperl熱が上がってきたので、久しぶりにコード書いてみた。

AnyEvent::TCPServer

AnyEvent::Socketを使って非同期サーバ書こうとすると色々面倒くさかったりするので、もっと手軽に書けるモジュールを作ってみました。こんな感じで非同期のechoサーバを書けます。

use strict;                                                                                                                        
use warnings;
use 5.010;

use AnyEvent::TCPServer;

on_connect(sub {
  say "connection accepted";
});

on_read(line => sub {
  my ($handle, $line) = @_; 
  say "got line: $line";
  $handle->push_write("$line\n");
});

on_eof(sub {
  say "connection closed";
});

on_error(sub {
  my ($handle, $fatal, $message) = @_; 
  say "error: $message";
});

server->run(
  host => undef,
  port => 1986,
);

各イベントの処理を追加していって、最後にrunでサーバを起動します。
handleをdestroyするといった処理はモジュール側でやってしまうので書く必要はありません。

今後

今だと簡単な処理しか扱えないので、on_drainなどの処理を登録できるようにするのが当面の目標です。AnyEvent::Handleのイベントは一通り網羅できたほうがいいでしょう。

ソース

小さいモジュールだし、githubにリポジトリ作るほど作り込んでないので、モジュールのソース貼っておきます。

package AnyEvent::TCPServer;                                                                                                       
use strict;
use warnings;

use AnyEvent::Socket;
use AnyEvent::Handle;

sub import {
  my $class = shift;
  my $app = $class->new;

  $SIG{PIPE} = "IGNORE";

  no strict 'refs';
  my $caller = caller;
  push @{"${caller}::ISA"}, $class;

  no warnings 'redefine';
  *{"${caller}::new"} = *{"${caller}::server"} = sub { $app };
  *{"${caller}::on_connect"} = sub (@) { $app->register_on_connect(shift) };
  *{"${caller}::on_read"}    = sub (%) { $app->register_on_read(@_) };
  *{"${caller}::on_eof"}     = sub (@) { $app->register_on_eof(shift) };
  *{"${caller}::on_error"}   = sub (@) { $app->register_on_error(shift) };

  strict->import;
  warnings->import;
}

sub new {
  my $class = shift;
  bless {
    on_connect => undef,
    on_read    => undef,
    on_eof     => undef,
    on_error   => undef,
    handles    => +{},
  }, $class;
}

sub run {
  my ($self, %config) = @_;

  my $cv = AE::cv;
  tcp_server $config{host}, $config{port}, sub {
    my ($fh) = @_;

    my $handle; $handle = AnyEvent::Handle->new(
      fh => $fh,
      on_read => sub {
        $handle->push_read(%{$self->{on_read}}) if defined $self->{on_read};
      },
      on_eof => sub {
        $self->{on_eof}->(@_) if defined $self->{on_eof};
        $handle->destroy;
        delete $self->{handles}->{$fh};
      },
      on_error => sub {
        $self->{on_error}->(@_) if defined $self->{on_error};
        $handle->destroy;
        $cv->send;
      },
    );
    $self->{handles}->{$fh} = $handle;
    $self->{on_connect}->($handle) if defined $self->{on_connect};
  };
  $cv->recv;
}

sub register_on_connect {
  my ($self, $on_connect) = @_;
  $self->{on_connect} = $on_connect;
}

sub register_on_read {
  my ($self, %on_read) = @_;
  $self->{on_read} = \%on_read;
}

sub register_on_eof {
  my ($self, $on_eof) = @_;
  $self->{on_eof} = $on_eof;
}

sub register_on_error {
  my ($self, $on_error) = @_;
  $self->{on_error} = $on_error;
}

1;

アドバイス等頂けると幸いです。