package Test2::Workflow::Runner; use strict; use warnings; our $VERSION = '1.302206'; use Test2::API(); use Test2::Todo(); use Test2::AsyncSubtest(); use Test2::Util qw/get_tid CAN_REALLY_FORK/; use Scalar::Util qw/blessed/; use Time::HiRes qw/sleep/; use List::Util qw/shuffle min/; use Carp qw/confess/; use Test2::Util::HashBase qw{ stack no_fork no_threads max slots pid tid rand subtests filter }; use overload( 'fallback' => 1, '&{}' => sub { my $self = shift; sub { @_ = ($self); goto &run; } }, ); sub init { my $self = shift; $self->{+STACK} = []; $self->{+SUBTESTS} = []; $self->{+PID} = $$; $self->{+TID} = get_tid(); $self->{+NO_FORK} ||= $ENV{T2_WORKFLOW_NO_FORK} || !CAN_REALLY_FORK(); my $can_thread = Test2::AsyncSubtest->CAN_REALLY_THREAD(); my $should_thread = ($ENV{T2_WORKFLOW_USE_THREADS} || $ENV{T2_DO_THREAD_TESTS}) && !$ENV{T2_WORKFLOW_NO_THREADS}; $self->{+NO_THREADS} ||= !($can_thread && $should_thread); $self->{+RAND} = 1 unless defined $self->{+RAND}; my @max = grep {defined $_} $self->{+MAX}, $ENV{T2_WORKFLOW_ASYNC}; my $max = @max ? min(@max) : 3; $self->{+MAX} = $max; $self->{+SLOTS} = [] if $max; unless(defined($self->{+FILTER})) { if (my $raw = $ENV{T2_WORKFLOW}) { my ($file, $line, $name); if ($raw =~ m/^(.*)\s+(\d+)$/) { ($file, $line) = ($1, $2); } elsif($raw =~ m/^(\d+)$/) { $line = $1; } else { $name = $raw; } $self->{+FILTER} = { file => $file, line => $line, name => $name, }; } } if (my $task = delete $self->{task}) { $self->push_task($task); } } sub is_local { my $self = shift; return 0 unless $self->{+PID} == $$; return 0 unless $self->{+TID} == get_tid(); return 1; } sub send_event { my $self = shift; my ($type, %params) = @_; my $class; if ($type =~ m/\+(.*)$/) { $class = $1; } else { $class = "Test2::Event::$type"; } my $hub = Test2::API::test2_stack()->top(); my $e = $class->new( trace => Test2::Util::Trace->new( frame => [caller(0)], buffered => $hub->buffered, nested => $hub->nested, hid => $hub->hid, huuid => $hub->uuid, #cid => $self->{+CID}, #uuid => $self->{+UUID}, ), %params, ); $hub->send($e); } sub current_subtest { my $self = shift; my $stack = $self->{+STACK} or return undef; for my $state (reverse @$stack) { next unless $state->{subtest}; return $state->{subtest}; } return undef; } sub run { my $self = shift; my $stack = $self->stack; my $c = 0; while (@$stack) { $self->cull; my $state = $stack->[-1]; my $task = $state->{task}; unless($state->{started}++) { my $skip = $task->skip; my $filter; if (my $f = $self->{+FILTER}) { my $in_var = grep { $_->{filter_satisfied} } @$stack; $filter = $task->filter($f) unless $in_var; $state->{filter_satisfied} = 1 if $filter->{satisfied}; } $skip ||= $filter->{skip} if $filter; if ($skip) { $state->{ended}++; $self->send_event( 'Skip', reason => $skip || $filter, name => $task->name, pass => 1, effective_pass => 1, ); pop @$stack; next; } if ($task->flat) { my $st = $self->current_subtest; my $hub = $st ? $st->hub : Test2::API::test2_stack->top; $state->{todo} = Test2::Todo->new(reason => $task->todo, hub => $hub) if $task->todo; $hub->send($_) for @{$task->events}; } else { my $st = Test2::AsyncSubtest->new( name => $task->name, frame => $task->frame, ); $state->{subtest} = $st; $state->{todo} = Test2::Todo->new(reason => $task->todo, hub => $st->hub) if $task->todo; for my $e (@{$task->events}) { my $hub = $st->hub; $e->trace->{buffered} = $hub->buffered; $e->trace->{nested} = $hub->nested; $e->trace->{hid} = $hub->hid; $e->trace->{huuid} = $hub->uuid; $hub->send($e); } my $slot = $self->isolate($state); # if we forked/threaded then this state has ended here. if (defined($slot)) { push @{$self->{+SUBTESTS}} => [$st, $task] unless $st->finished; $state->{subtest} = undef; $state->{ended} = 1; } } } if ($state->{ended}) { $state->{todo}->end() if $state->{todo}; $state->{subtest}->stop() if $state->{subtest}; return if $state->{in_thread}; if(my $guard = delete $state->{in_fork}) { $state->{subtest}->detach; $guard->dismiss; exit 0; } pop @$stack; next; } if($state->{subtest} && !$state->{subtest_started}++) { push @{$self->{+SUBTESTS}} => [$state->{subtest}, $task]; $state->{subtest}->start(); } if ($task->isa('Test2::Workflow::Task::Action')) { $state->{PID} = $$; my $ok = eval { $task->code->($self); 1 }; unless ($state->{PID} == $$) { print STDERR "Task '" . $task->name . "' started in pid $state->{PID}, but ended in pid $$, did you forget to exit after forking?\n"; exit 255; } $task->exception($@) unless $ok; $state->{ended} = 1; next; } if (!$state->{stage} || $state->{stage} eq 'BEFORE') { $state->{before} = (defined $state->{before}) ? $state->{before} : 0; if (my $add = $task->before->[$state->{before}++]) { if ($add->around) { $state->{PID} = $$; my $ok = eval { $add->code->($self); 1 }; my $err = $@; my $complete = $state->{stage} && $state->{stage} eq 'AFTER'; unless ($state->{PID} == $$) { print STDERR "Task '" . $task->name . "' started in pid $state->{PID}, but ended in pid $$, did you forget to exit after forking?\n"; exit 255; } unless($ok && $complete) { $state->{ended} = 1; $state->{stage} = 'AFTER'; $task->exception($ok ? "'around' task failed to continue into the workflow chain.\n" : $err); } } else { $self->push_task($add); } } else { $state->{stage} = 'VARIANT'; } } elsif ($state->{stage} eq 'VARIANT') { if (my $v = $task->variant) { $self->push_task($v); } $state->{stage} = 'PRIMARY'; } elsif ($state->{stage} eq 'PRIMARY') { unless (defined $state->{order}) { my $rand = defined($task->rand) ? $task->rand : $self->rand; $state->{order} = [0 .. scalar(@{$task->primary}) - 1]; @{$state->{order}} = shuffle(@{$state->{order}}) if $rand; } my $num = shift @{$state->{order}}; if (defined $num) { $self->push_task($task->primary->[$num]); } else { $state->{stage} = 'AFTER'; } } elsif ($state->{stage} eq 'AFTER') { $state->{after} = (defined $state->{after}) ? $state->{after} : 0; if (my $add = $task->after->[$state->{after}++]) { return if $add->around; $self->push_task($add); } else { $state->{ended} = 1; } } } $self->finish; } sub push_task { my $self = shift; my ($task) = @_; confess "No Task!" unless $task; confess "Bad Task ($task)!" unless blessed($task) && $task->isa('Test2::Workflow::Task'); if ($task->isa('Test2::Workflow::Build')) { confess "Can only push a Build instance when initializing the stack" if @{$self->{+STACK}}; $task = $task->compile(); } push @{$self->{+STACK}} => { task => $task, name => $task->name, }; } sub add_mock { my $self = shift; my ($mock) = @_; my $stack = $self->{+STACK}; confess "Nothing on the stack!" unless $stack && @$stack; my ($state) = grep { !$_->{task}->scaffold} reverse @$stack; push @{$state->{mocks}} => $mock; } sub isolate { my $self = shift; my ($state) = @_; return if $state->{task}->skip; my $iso = $state->{task}->iso; my $async = $state->{task}->async; # No need to isolate return undef unless $iso || $async; # Cannot isolate unless($self->{+MAX} && $self->is_local) { # async does not NEED to be isolated return undef unless $iso; } # Wait for a slot, if max is set to 0 then we will not find a slot, instead # we use '0'. We need to return a defined value to let the stack know that # the task has ended. my $slot = 0; while($self->{+MAX} && $self->is_local) { $self->cull; for my $s (1 .. $self->{+MAX}) { my $st = $self->{+SLOTS}->[$s]; next if $st && !$st->finished; $self->{+SLOTS}->[$s] = undef; $slot = $s; last; } last if $slot; sleep(0.02); } my $st = $state->{subtest} or confess "Cannot isolate a task without a subtest"; if (!$self->no_fork) { my $out = $st->fork; if (blessed($out)) { $state->{in_fork} = $out; # drop back out to complete the task. return undef; } else { $self->send_event( 'Note', message => "Forked PID $out to run: " . $state->{task}->name, ); $state->{pid} = $out; } } elsif (!$self->no_threads) { $state->{in_thread} = 1; my $thr = $st->run_thread(\&run, $self); $state->{thread} = $thr; delete $state->{in_thread}; $self->send_event( 'Note', message => "Started Thread-ID " . $thr->tid . " to run: " . $state->{task}->name, ); } else { $st->finish(skip => "No isolation method available"); return 0; } if($slot) { $self->{+SLOTS}->[$slot] = $st; } else { $st->finish; } return $slot; } sub cull { my $self = shift; my $subtests = delete $self->{+SUBTESTS} || return; my @new; # Cull subtests in reverse order, Nested subtests end before their parents. for my $set (reverse @$subtests) { my ($st, $task) = @$set; next if $st->finished; if (!$st->active && $st->ready) { $st->finish(); next; } # Use unshift to preserve order. unshift @new => $set; } $self->{+SUBTESTS} = \@new; return; } sub finish { my $self = shift; while(@{$self->{+SUBTESTS}}) { $self->cull; sleep(0.02) if @{$self->{+SUBTESTS}}; } } 1; __END__ =pod =encoding UTF-8 =head1 NAME Test2::Workflow::Runner - Runs the workflows. =head1 SOURCE The source code repository for Test2-Workflow can be found at F. =head1 MAINTAINERS =over 4 =item Chad Granum Eexodist@cpan.orgE =back =head1 AUTHORS =over 4 =item Chad Granum Eexodist@cpan.orgE =back =head1 COPYRIGHT Copyright Chad Granum Eexodist7@gmail.comE. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself. See F =cut