Mercurial > hg > Others > Rakudo
view src/core.c/Awaiter.pm6 @ 0:c341f82e7ad7 default tip
Rakudo branch in cr.ie.u-ryukyu.ac.jp
author | Shinji KONO <kono@ie.u-ryukyu.ac.jp> |
---|---|
date | Thu, 26 Dec 2019 16:50:27 +0900 |
parents | |
children |
line wrap: on
line source
my role Awaiter { method await(Awaitable:D $a) { ... } method await-all(Iterable:D $i) { ... } } my class Awaiter::Blocking does Awaiter { method await(Awaitable:D $a) { my $handle := $a.get-await-handle; if $handle.already { $handle.success ?? $handle.result !! $handle.cause.rethrow } else { my $s = Semaphore.new(0); my $success; my $result; $handle.subscribe-awaiter(-> \success, \result { $success := success; $result := result; $s.release; }); $s.acquire; $success ?? $result !! $result.rethrow } } method await-all(Iterable:D \i) { # Collect results that are already available, and handles where the # results are not yet available together with the matching insertion # indices. my \results = nqp::list(); my \handles = nqp::list(); my \indices = nqp::list_i(); my int $insert = 0; my $saw-slip = False; for i -> $awaitable { unless nqp::istype($awaitable, Awaitable) { die "Can only specify Awaitable objects to await (got a $awaitable.^name())"; } unless nqp::isconcrete($awaitable) { die "Must specify a defined Awaitable to await (got an undefined $awaitable.^name())"; } my $handle := $awaitable.get-await-handle; if $handle.already { if $handle.success { my \result = $handle.result; nqp::bindpos(results, $insert, result); $saw-slip = True if nqp::istype(result, Slip); } else { $handle.cause.rethrow } } else { nqp::push(handles, $handle); nqp::push_i(indices, $insert); } ++$insert; } # See if we have anything that we need to really block on. If so, we # use a lock and condition variable to handle the blocking. The lock # protects writes into the array. my int $num-handles = nqp::elems(handles); if $num-handles { my $exception = Mu; my $l = Lock.new; my $ready = $l.condition(); my int $remaining = $num-handles; loop (my int $i = 0; $i < $num-handles; ++$i) { my $handle := nqp::atpos(handles, $i); my int $insert = nqp::atpos_i(indices, $i); $handle.subscribe-awaiter(-> \success, \result { $l.protect: { if success && $remaining { nqp::bindpos(results, $insert, result); $saw-slip = True if nqp::istype(result, Slip); --$remaining; $ready.signal unless $remaining; } elsif !nqp::isconcrete($exception) { $exception := result; $remaining = 0; $ready.signal; } } }); } # Block until remaining is 0 (need the loop to cope with suprious # wakeups). loop { $l.protect: { last if $remaining == 0; $ready.wait; } } # If we got an exception, throw it. $exception.rethrow if nqp::isconcrete($exception); } my \result-list = nqp::p6bindattrinvres(nqp::create(List), List, '$!reified', results); $saw-slip ?? result-list.map(-> \val { val }).List !! result-list } } PROCESS::<$AWAITER> := Awaiter::Blocking; # vim: ft=perl6 expandtab sw=4