スレッドを楽に扱いたい
前にPerlでスレッドを使ってみたが,例えば同時実行数などを楽に制御してみたいと思った.
TheSchwartz的な何かが欲しい.
調べてみると,Thread::Poolがあった.
これはこれでいいんだけれど,
- Threadじゃなくてthreadsで実装したい(なんか,今後はithreadsの方が主流らしい?)
- キューに溜まったタスクが0になれば,joinしたい
と思い,適当に書いてみた.
相変わらず冗長なプログラムだなぁ.
参考資料:
package Mst::threads::Pool; use threads; use threads::shared; use Thread::Queue; use Thread::Semaphore; my $KILL_SIGNAL :shared; sub new { my $class = shift; my $param = {@_}; # init $param->{concurrency} ||= 3; $KILL_SIGNAL = 0; my $pool = bless $param, $class; $pool->{queue} = Thread::Queue->new; $pool->{semaphore} = Thread::Semaphore->new( $param->{concurrency} ); $pool->{mngr} = threads->new( \&thr_mngr, $pool->{queue}, $pool->{semaphore}, $param->{concurrency}, ); return $pool; } sub thr_mngr { my( $queue, $semaphore, $concurrency) = @_; while( $queue->pending || !$KILL_SIGNAL){ if( my $thr_params = $queue->dequeue_nb ){ $semaphore->down; threads->new( # task sub { my $semaphore = shift; my $code = shift; my @a = @_; eval { &$code(@a); }; if($@){ warn $@; } $semaphore->up; }, $semaphore, shift @$thr_params, @$thr_params, ); } } } sub enqueue { my $self = shift; my $subr = shift; if( !$KILL_SIGNAL){ $self->{queue}->enqueue( &shared_clone( [ $subr, @_ ] )); } elsif( $self->{debug} ){ warn q{can't create new thread}; } } sub join { my $self = shift; $KILL_SIGNAL = 1; $self->{mngr}->join(); foreach my $thr (threads->list) { if ($thr->tid && !threads::equal($thr, threads->self)) { $thr->join; } } } 1;
使用例:
#!/usr/bin/perl use lib '../lib'; use Mst::threads::Pool; my $pool = new Mst::threads::Pool( concurrency => 3, ); foreach(0..9){ $pool->enqueue("main::test",$_); } $pool->join; sub test { my($num) = @_; foreach(0..1){ warn sprintf("count:%2d\n",$num); sleep 1; } print qq{=== exit: $num\n}; } exit;
結果:
count: 1
count: 2
count: 0
count: 2
count: 0
count: 1
=== exit: 2
=== exit: 0
=== exit: 1
count: 3
count: 4
count: 5
count: 3
count: 4
count: 5
=== exit: 3
=== exit: 5
=== exit: 4
count: 6
count: 7
count: 8
count: 6
count: 7
count: 8
=== exit: 6
count: 9
=== exit: 7
=== exit: 8
count: 9
=== exit: 9
おk.成功.