スレッドを楽に扱いたい

前にPerlでスレッドを使ってみたが,例えば同時実行数などを楽に制御してみたいと思った.
TheSchwartz的な何かが欲しい.
調べてみると,Thread::Poolがあった.
これはこれでいいんだけれど,

  • Threadじゃなくてthreadsで実装したい(なんか,今後はithreadsの方が主流らしい?)
  • キューに溜まったタスクが0になれば,joinしたい

と思い,適当に書いてみた.
相変わらず冗長なプログラムだなぁ.

参考資料:

package Mst::threads::Pool;
use threads;
use threads::shared;
use Thread::Queue;
use Thread::Semaphore;

my $KILL_SIGNAL :shared;

sub new {
    my $class = shift;
    my $param = {@_};

    # init
    $param->{concurrency} ||= 3;
    $KILL_SIGNAL = 0;

    my $pool = bless $param, $class;
    $pool->{queue} = Thread::Queue->new;
    $pool->{semaphore} = Thread::Semaphore->new( $param->{concurrency} );

    $pool->{mngr} = threads->new(
	\&thr_mngr,
	$pool->{queue},
	$pool->{semaphore},
	$param->{concurrency},
	);
    
    return $pool;
}

sub thr_mngr {
    my( $queue, $semaphore, $concurrency) = @_;
    while( $queue->pending || !$KILL_SIGNAL){
	if( my $thr_params = $queue->dequeue_nb ){
	    $semaphore->down;
	    threads->new(
		# task
		sub {
		    my $semaphore = shift;
		    my $code = shift;
		    my @a = @_;
		    eval { &$code(@a); };
		    if($@){
			warn $@;
		    }
		    $semaphore->up;
		},
		$semaphore,
		shift @$thr_params,
		@$thr_params,
		);
	}
    }
}

sub enqueue {
    my $self = shift;
    my $subr = shift;

    if( !$KILL_SIGNAL){
	$self->{queue}->enqueue( &shared_clone( [ $subr, @_ ] ));
    }
    elsif( $self->{debug} ){
	warn q{can't create new thread};
    }
}

sub join {
    my $self = shift;
    $KILL_SIGNAL = 1;
    $self->{mngr}->join();
    foreach my $thr (threads->list) {
	if ($thr->tid && !threads::equal($thr, threads->self)) {
	    $thr->join;
	}
    }
}

1;

使用例:

#!/usr/bin/perl
use lib '../lib';
use Mst::threads::Pool;

my $pool = new Mst::threads::Pool(
    concurrency => 3,
    );

foreach(0..9){
    $pool->enqueue("main::test",$_);
}
$pool->join;

sub test {
    my($num) = @_;
    foreach(0..1){
	warn sprintf("count:%2d\n",$num);
	sleep 1;
    }
    print qq{=== exit: $num\n};
}

exit;

結果:

count: 1
count: 2
count: 0
count: 2
count: 0
count: 1
=== exit: 2
=== exit: 0
=== exit: 1
count: 3
count: 4
count: 5
count: 3
count: 4
count: 5
=== exit: 3
=== exit: 5
=== exit: 4
count: 6
count: 7
count: 8
count: 6
count: 7
count: 8
=== exit: 6
count: 9
=== exit: 7
=== exit: 8
count: 9
=== exit: 9

おk.成功.