#!/usr/bin/perl
$ENV{PERL_RL}=" o=0";
$SIG{PIPE} = 'IGNORE';
package settings;
use constant FD => 0;
use constant SOCKET => 1;
use constant USE_SOCKETPAIR => 0;
package arguments;
sub initialize_terminal();
sub fetch_arguments(@);
sub fetch_line($);
sub restore_last_argument();
sub restore_last_command();
sub get_next_command();
sub get_next_argument();
sub read_buffer_data($);
sub delete_buffer($);
sub get_data_buffer();
sub get_next_stuff($);
sub get_parameters();
sub skip_command_separator();
sub terminate_arguments_parsing();
sub args_file($);
sub machines_file($);
sub machine($);
sub gateway($);
sub localhost($);
sub not_root();
sub print_defaults();
sub print_package($);
sub print_version();
sub print_version_quit();
sub init();
sub parse_options();
sub check_separators_integrity();
sub begin_group();
sub end_group();
sub dynamic($);
package command;
sub new (%);
sub run ($);
sub cleanup();
sub my_shutdown($$);
sub close($);
sub output($$);
sub event_msg($);
package communicator;
sub create_channel();
sub init();
sub cleanup();
sub get_connections($);
sub get_root();
sub process_messages($$);
sub process_message($$);
sub process_command_output($$);
sub process_pipe_output($$);
sub run ();
sub terminate ();
sub add_descriptors($);
sub remove_descriptor($$);
sub remove_cobidule($);
sub no_pending_connectors();
sub add_connector($$);
sub connector_initialized($);
sub remove_from_set($);
sub remove_source($);
sub remove_interpreter($);
sub remove_sink($);
sub remove_control($);
sub remove_local_command($);
sub remove_pipe($);
sub post_write($$@);
sub post_close($);
sub post_termination($$$);
sub cleanup_pending_stuff($);
sub write_pending_stuff($);
sub flush_pending_stuff();
sub close_cobidule_read($);
sub assign_next_target($);
sub get_target_set($$);
package connector;
sub new (%);
sub read_data ($);
sub send_parameters();
sub get_message ();
sub send_message($);
sub route_file_part($$);
sub send_file($$$);
sub pack();
sub unpack($);
sub cancel();
package diagnostic;
sub print_info($$$);
sub system ();
sub debug ($);
sub error ($);
sub warning ($);
sub special ($);
package general;
sub init();
sub apply_gateway_restrictions();
sub expand($%);
sub open_file($);
sub load_file($);
sub load_taktuk_code();
sub load_taktuk_package($);
sub print_help();
sub print_help_quit();
sub decode_host_set($);
sub first_in_host($);
sub increment_host_vector($$);
sub host_vector_to_value($$);
sub decode_exclusion_set($);
sub in_exclusion_set($$);
sub add_peer($$@);
sub print($);
sub print_tree($$);
sub decode_set($$);
sub encode_set(@);
package handlers;
sub register_handler($$);
sub replace_handler($$);
sub get_handler($);
sub handler_blocked($$$);
sub arguments($$$);
sub broadcast($$$);
sub empty($$$);
sub eof($$$);
sub execute_timeout($);
sub execute($$$);
sub file_finalize($);
sub file($$$);
sub forward_up($$$);
sub get($$$);
sub get_info($$$);
sub input($$$);
sub kill($$$);
sub message($$$);
sub numbered($$$);
sub output($$$);
sub option($$$);
sub options($$$);
sub pipe($$$);
sub position($$$);
sub put($$$);
sub quit($$$);
sub ready($$$);
sub recv_timeout($);
sub reduce_count($$$$);
sub reduce_tree($$$$);
sub reduce($$$);
sub reduce_status_analysis($);
sub reduce_result($$$);
sub check_ongoing_reduces($);
sub resign($$$);
sub send_to($$$);
sub spread($$$);
sub steal($$$);
sub synchronize($$$);
sub taktuk_code($$$);
sub taktuk_perl($$$);
sub update_failed($$$);
sub wait_message($$$);
sub work($$$);
sub init();
package main;
sub is_one_of($@);
sub get_attributes();
sub translate();
sub terminate_interpreter();
sub fork_taktuk_interpreter();
sub handle_message($);
sub process_commands();
package option;
sub clone($);
sub register($$$$@);
sub set($$);
sub unset($);
sub get_config();
sub push_context();
sub pop_context();
sub apply_option($$);
use constant COMMAND_LINE => 0;
use constant MESSAGE => 1;
package TakTuk::Select;
sub new();
sub add($);
sub remove($);
sub select($$$$);
sub handles();
package TakTuk;
sub no_flush($);
sub unpack($);
sub pack($);
sub decode($);
sub encode($$);
sub syswrite($$);
sub read_data($);
sub get_message($);
sub find_sequence($$);
sub flush_buffer($);
sub error_msg($);
sub send(%);
sub recv(%);
sub wait_message(@);
sub get($);
BEGIN {
die "FATAL : Perl interpreter too old ($]), Taktuk require Perl >= 5.6.1\n"
if ($] < 5.006001);
}
our $VERSION = "3.7";
our $RELEASE = sprintf "%d", q$Rev: 512 $ =~ /(\d+)/g;
our $read_size = 32768;
our $write_size = 32768;
our $error = undef;
our $action="A";
our $eof="D";
our $taktuk_perl="E";
our $gateway="F";
our $get="G";
our $invalid="H";
our $info="I";
our $option="N";
our $timeout="O";
our $put="P";
our $reduce_result="Q";
our $reduce="R";
our $spread="S";
our $taktuk_code="T";
our $update_failed="U";
our $options="V";
our $wait_message="W";
our $resign="X";
our $arguments="a";
our $broadcast="b";
our $down="d";
our $execute="e";
our $file="f";
our $get_info="g";
our $input="i";
our $kill="k";
our $message="m";
our $numbered="n";
our $output="o";
our $position="p";
our $quit="q";
our $ready="r";
our $steal="s";
our $send_to="t";
our $forward_up="u";
our $work="w";
our $synchronize="x";
our $pipe="z";
our $reduce_count = 'c';
our $reduce_tree = 't';
use constant TAKTUK_READY => 0;
use constant TAKTUK_NUMBERED => 1;
use constant TAKTUK_TERMINATED => 2;
use constant CONNECTION_FAILED => 3;
use constant CONNECTION_INITIALIZED => 4;
use constant CONNECTION_LOST => 5;
use constant COMMAND_STARTED => 6;
use constant COMMAND_FAILED => 7;
use constant COMMAND_TERMINATED => 8;
use constant UPDATE_FAILED => 9;
use constant PIPE_STARTED => 10;
use constant PIPE_FAILED => 11;
use constant PIPE_TERMINATED => 12;
use constant FILE_RECEPTION_STARTED =>13;
use constant FILE_RECEPTION_FAILED =>14;
use constant FILE_RECEPTION_TERMINATED =>15;
use constant FILE_SEND_FAILED =>16;
use constant INVALID_TARGET => 17;
use constant NO_TARGET => 18;
use constant MESSAGE_DELIVERED => 19;
use constant INVALID_DESTINATION => 20;
use constant UNAVAILABLE_DESTINATION => 21;
package scheduler;
sub begin_group();
sub end_group();
sub set_limit($);
sub deploy_connector($);
sub add_connector($);
sub connector_initialized($);
sub connector_failed($);
sub at_limit($);
sub schedule();
sub is_idle();
sub send_work_to($);
sub dispatch_work($);
sub theft_handler($$);
sub send_steal_request($);
sub resign();
package stats_buffer;
sub new($$);
sub is_empty($);
sub add($$);
sub average($);
sub min($);
sub max($);
package synchronizer;
sub check_ready_state();
sub set_not_ready();
sub initialization_complete($);
sub initialization_failed($);
sub initialization_timeout($);
sub set_not_numbered();
sub block_until_event($@);
sub dispatch_event($);
sub add_pending_message($$$);
sub get_state($);
sub setup_synchronization();
package timer;
sub current_time();
sub register($$);
sub check_timeouts();
sub unregister();
sub gettime($);
sub print($);
package general;
use strict; use bytes;
our $has_hostname = eval("use Sys::Hostname;1")?1:0;
our $connector_command;
our $connector_timeout;
our $connector_send_files;
our $login_name;
our $host;
our $root=1;
our $taktuk_command;
our $perl_interpreter;
our $path_value;
our $self_propagate;
our $taktuk_code = undef;
our $taktuk_package = undef;
our $template;
our $redirect;
our $current_position = 0;
our $position = 0;
our $numbering_update = 0;
our %taktuk = (gateway=>0, rank=>-1, count=>-1,
child_min=>-1, child_max=>-1, father=>-1);
sub init() {
if ($has_hostname) {
$host = hostname();
chomp($host);
} else {
$host = "localhost";
}
}
sub apply_gateway_restrictions() {
if ($taktuk{gateway}) {
foreach my $message ($TakTuk::execute,
$TakTuk::eof,
$TakTuk::get,
$TakTuk::input,
$TakTuk::kill,
$TakTuk::message,
$TakTuk::put,
$TakTuk::taktuk_perl) {
handlers::replace_handler($message, \&handlers::empty);
}
}
}
sub expand($%) {
my $string = shift;
my $variables = "";
my %values = @_;
foreach my $key (keys(%values)) {
$variables .= "$key=$values{$key} &&";
}
diagnostic::debug("Name $string");
$string = `$variables echo $string`
if $string =~ m/[\$\`]/;
chomp($string);
diagnostic::debug("Expanded to $string");
return $string;
}
sub open_file($) {
my $filename = shift;
my $fd = undef;
$filename = expand($filename);
if ($filename and ($filename ne "-")) {
if (not open $fd, $filename) {
diagnostic::system;
diagnostic::error("Trying to open $filename");
$fd = undef;
} else {
binmode($fd) or diagnostic::system;
}
} else {
if ($main::interactive) {
diagnostic::error("Cannot load STDIN in interactive mode");
} elsif (not $general::root) {
diagnostic::error("Cannot load STDIN on a non root node");
} else {
$fd = \*STDIN;
}
}
return $fd;
}
sub load_file($) {
my $filename = shift;
my $fd = undef;
my $new_data;
my $end = 0;
my $stuff="";
my $result;
$fd = open_file($filename);
return "" if (not defined($fd));
while (not $end) {
$result = sysread($fd, $new_data, $TakTuk::read_size);
(diagnostic::system and $result = 0) if not defined($result);
if ($result > 0) {
$stuff = $stuff.$new_data;
} else {
$end = 1;
}
} 
if ($fd != \*STDIN) {
close $fd or diagnostic::system;
}
return $stuff;
}
sub load_taktuk_code() {
if (not defined($taktuk_code)) {
my $filename = $0;
my $fd = undef;
my $new_data;
my $end = 0;
my $result;
if (not -r $filename) {
$filename = `which $0`;
}
if ($filename) {
if (not $fd = open_file($filename)) {
diagnostic::error("Trying to open $filename");
return "";
}
} else {
diagnostic::error("Cannot find own taktuk executable");
return "";
}
$taktuk_code = "";
while (not $end) {
$result = sysread($fd, $new_data, $TakTuk::read_size);
(diagnostic::system and $result = 0) if not defined($result);
if ($result > 0) {
$taktuk_code = $taktuk_code.$new_data;
} else {
$end = 1;
}
} 
close $fd or diagnostic::system;
}
}
sub load_taktuk_package($) {
my $package_name = shift;
my $state = 0;
my $last_pos;
if (not defined($taktuk_package)) {
load_taktuk_code;
$taktuk_package = "";
my $last_pos = 0;
while ($taktuk_code =~ /(?=package\s+([^;\s]*)\s*;\s*$)/gmi) {
my $pos = pos($taktuk_code);
$taktuk_package .= substr($taktuk_code,$last_pos, $pos - $last_pos)
if $state;
if ($1 eq $package_name) {
$state = 1;
} else {
$state = 0;
}
$last_pos = $pos;
}
$taktuk_package .= substr($taktuk_code, $last_pos) if $state;
$taktuk_package .= "1;\n";
}
}
sub print_help() {
communicator::get_root->output('info', <<END_HELP);
Usage :
taktuk [ options ] [ commands ]
If one or more commands are given, TakTuk will execute them sequentially.
Otherwise, TakTuk enter interactive mode, waiting for commands.
Commands are (braces can be replaced by any delimiter):
<set> command : sends the command to all the peers of the set
(see TakTuk manual for set specification syntax)
broadcast command : broadcasts TakTuk command to all the remote peers
not including the node initiating the broadcast
downcast command : spreads TakTuk command to all the children of the
node initiating the command (not including itself)
exec <param> [ command ] : executes the given shell command on the local node
using optional execution parameters
get [ src ] [ dest ] : copies some file from destination node(s)
help : prints this help
input [ data ] : sends input to all commands being executed locally
input data [ data ] : same as input [ data ]
input line [ data ] : same as input but adds a newline at the end of data
input file [ filename ] : sends the content of a file as input
input pipe [ filename ] : sends the content of a pipe-like file as input
input close : closes stdin of all commands being executed locally
kill <signal> : sends a signal to local commands processes groups
(signal default to 15 if not given)
network : prints the TakTuk deployment tree
network state : same as network
network cancel : cancels all ongoing connections
network renumber : recomputes TakTuk logical numbering
network update : updates TakTuk logical numbering if possible
option name [ value ] : changes some TakTuk option on destination node(s)
option [ line ] : gives TakTuk options line to destination node(s)
put [ src ] [ dest ] : copies some local file to destination node(s)
quit : quit TakTuk and shutdown its logical network
synchronize command : forces the following command to wait for deployment,
numbering and preceding commands before executing
taktuk_perl [ args ] : forks a perl interpreter on the local node aware of
the taktuk package and communication routines (see
manual for a description of the taktuk package which
contains point-to-point communication routines).
Similar in principle to 'exec perl args'.
WARNING: due to limited parser, you have to give
args (even if empty) and to use '--' to explicitely
terminate switches if any
version : prints TakTuk version
Deployment options:
-b, --begin-group : starts a new deployment group
-c, --connector command : defines the connector commands used for following
remote machines
-d, --dynamic limit : turns dynamic mode on (work stealing) for all the
following remote machines specifications.
Uses limit as a maximal arity (0 = no limit).
A negative value for limit turns dynamic mode off.
Warning, currently it is a bad idea to use several
-d options in the same deployment group (the
resulting arity is not specified)
-e, --end-group : ends a deployment group
-f, --machines-file name : name is the name of a file that contains remote
machines names (equivalent to several -m opions)
-l, --login name : sets the login name for the following hosts
-m, --machine name -[ -] : name is the name of a remote host to be deployed
with its optional arguments between -[ and -]
-s, --self-propagate : propagate the TakTuk executable through connectors
(eliminates the need for a TakTuk installation on
remote machines)
-z, --dont-self-propagate: turns self propagation off
-F, --args-file name : name is the name of a file that contains options
-G, --gateway name : name is the name of a forward only remote host to be
deployed. Arguments can be given as with -m
-I, --perl-interpreter : the command to use as a perl interpreter (for
auto propagation mode and taktuk_perl command)
-L, --localhost name : sets the name of localhost as viewed by TakTuk
-S, --send-files files : send files at connection for further connections
-T, --taktuk-command com : com is the name of the TakTuk command that should
be used for the following hosts
-V, --path-value list : changes the PATH value on remote hosts
Command line parsing options:
-C, --command-separator : changes the set of characters considered as command
separators set separators
-E, --escape-character : defines an escape character that protects any
character following character from TakTuk interpretation
-O, --option-separator : changes the set of characters considered as option
separators set separators
I/O options:
-o, --output-template : set an output template specification for one of the
name=specification output streams (see man for details about
templates). When giving only a name (without
specification) this disables the stream
-R, --output-redirect : sets an output redirection for one of the output
name=filename streams (see man for details about output streams)
Performance tuning options:
-g, --time-granularity n : sets to n the maximal interval between timeouts
checks (usually checks are made more often)
-n, --no-numbering : disable taktuk nodes numbering, speed up deployment
-t, --timeout time : sets the timeout for connectors (0 = no timeout)
-u, --cache-limit n : sets a limit n to the size of pending write data.
A negative value for n means no limit
-w, --window number : sets initial window size to number (pipeline width)
-W, --window-adaptation : sets the windows adaptation scheme to number
number (0: no adaptation, 1: experimental algorithm)
Miscellaneous options:
-M, --my : makes the next option local (not inherited)
-h, --help : prints this help
-i, --interactive : forces interactive mode afer command line parsing
-v, --version : prints TakTuk version and exits
-P, --print-defaults : prints default settings
Environment variables defined by TakTuk for commands execution (see man for
details about variables that change TakTuk settings):
TAKTUK_COUNT : total number of successfully deployed nodes
TAKTUK_HOSTNAME : hostname of the local node as given to TakTuk
TAKTUK_PIDS : pids list of commands executed by the local TakTuk
TAKTUK_POSITION : host position on the command line
TAKTUK_RANK : logical number of local node (in [1..TAKTUK_COUNT])
END_HELP
}
sub print_help_quit() {
print_help;
$main::terminate = 1;
}
sub decode_host_set($) {
my $host_set = shift;
my $full_set = [];
$host_set = "" if not defined($host_set);
while (length($host_set)) {
$host_set =~ m/^((?:[^[,]*(?:\[[^]]*\])?)*)(?:,(.*))?$/o;
my $host_entry = $1;
$host_set = defined($2)?$2:"";
if (length($host_entry)) {
my $set;
$set = [];
diagnostic::debug("Host entry $host_entry");
while ($host_entry =~ s/^([^[]*)\[([^]]*)\](.*)$/$3/o) {
my ($prefix, $set_spec) = ($1,$2);
my $values;
$values = [];
push @$set, $prefix;
foreach my $range (split /,/, $set_spec) {
$range =~ m/^([a-zA-Z]*\d*)(?:-([a-zA-Z]*\d*))?$/o;
my ($min, $max) = ($1,$2);
$max = $min if not defined($max);
if (not defined($min) or (length($min) > length($max)) or
((length($min) == length($max)) and ($min gt $max))) {
diagnostic::error("Error in set spec");
exit 1;
} else {
push @$values, $min, $max;
}
}
push @$set, $values;
}
push @$set, $host_entry;
push @$full_set, $set if scalar(@$set);
}
}
return $full_set;
}
sub first_in_host($) {
my $host_spec = shift;
my $range_mode = 0;
my $result = [];
foreach my $part (@$host_spec) {
if ($range_mode) {
push @$result, 0, $part->[0];
}
$range_mode = not $range_mode;
}
return $result;
}
sub increment_host_vector($$) {
my $vector = shift;
my $vector_index = $#$vector - 1;
my $host = shift;
my $host_index = $#$host - 1;
my $carry = 1;
while ($carry and ($vector_index >= 0)) {
my $position = $vector->[$vector_index];
my $value = $vector->[$vector_index+1];
my $range = $host->[$host_index];
if ($value eq $range->[$position+1]) {
$position += 2;
if ($position < $#$range) {
$vector->[$vector_index] = $position;
$vector->[$vector_index+1] = $range->[$position];
$carry = 0;
} else {
$vector->[$vector_index] = 0;
$vector->[$vector_index+1] = $range->[0];
}
} else {
$value = "".$value;
$value++;
$vector->[$vector_index+1] = $value;
$carry = 0;
}
$vector_index -= 2;
$host_index -= 2;
}
return $carry;
}
sub host_vector_to_value($$) {
my $vector = shift;
my $vector_index = 1;
my $host = shift;
my $range_mode = 0;
my $result = "";
foreach my $part (@$host) {
if ($range_mode) {
$result .= $vector->[$vector_index];
$vector_index += 2;
} else {
$result .= $part;
}
$range_mode = not $range_mode;
}
return $result;
}
sub decode_exclusion_set($) {
my $set = decode_host_set(shift);
my $hash = {};
foreach my $host_spec (@$set) {
my $index = first_in_host($host_spec);
my $carry;
do {
my $name = host_vector_to_value($index, $host_spec);
$hash->{$name} = 1;
$carry = increment_host_vector($index, $host_spec);
}
while (not $carry);
}
return $hash;
}
sub in_exclusion_set($$) {
my $value = shift;
my $set = shift;
return exists($set->{$value});
}
sub add_peer($$@) {
my $actual_command;
my $peer_name = shift;
my $is_gateway = shift;
my ($command_args, $message_args) = option::get_config;
if ($self_propagate) {
$actual_command = "echo $connector::connector_functional_string;".
"exec $perl_interpreter -- - -r ";
} else {
$actual_command = "exec $taktuk_command -r ";
}
$actual_command .= "-n " if $taktuk{rank} > -1;
$actual_command = $actual_command.$command_args." ";
if (defined($path_value)) {
$actual_command = 'exec /bin/sh -c "PATH='.$path_value.
';export PATH;'.$actual_command.'"';
}
foreach my $element (@_) {
$message_args = $message_args.TakTuk::pack($element);
}
$peer_name = expand($peer_name);
my ($set, $exclusion) = split '/', $peer_name;
$set = decode_host_set($set);
$exclusion = decode_exclusion_set($exclusion);
foreach my $host_spec (@$set) {
my $index = first_in_host($host_spec);
my $carry;
do {
my $value = host_vector_to_value($index, $host_spec);
if (not in_exclusion_set($value, $exclusion)) {
$current_position++;
my $position_prefix = $position?$position.".":"";
my $connector=connector::new(
login => defined($login_name)?$login_name:"",
peer => $value,
taktuk => $actual_command." -L '$value'",
arguments => $message_args,
position => $position_prefix.$current_position,
group => $scheduler::current_group,
gateway => $is_gateway,
propagate => $self_propagate,
files => $connector_send_files,
timeout => $connector_timeout);
diagnostic::debug("New external connector command to $value:".
" $connector->{line}");
scheduler::add_connector($connector);
}
$carry = increment_host_vector($index, $host_spec);
}
while (not $carry);
}
}
sub print($) {
my $message=shift;
TakTuk::syswrite(\*STDOUT,$message) or diagnostic::system;
}
sub print_tree($$) {
my $prefix = shift;
my $data = shift;
communicator::get_root->output('info',$prefix.(shift @$data)."\n");
foreach my $subtree (@$data) {
print_tree($prefix." ",$subtree);
}
}
sub decode_set($$) {
my $string = shift;
my $upper_bound = shift;
my @result = ();
my $interval;
my $i;
($interval,$string) = split /\//,$string,2;
while (length($interval)) {
my ($min, $plus, $max) = ($interval =~ m/^(\d+)(?:(\+)|-(\d+))?$/);
my @replacement = ();
my $state = 0;
my $length = 0;
return () if not defined($min);
$i = $#result;
if (not defined($max)) {
if (defined($plus)) {
$max = $upper_bound;
} else {
$max = $min;
}
}
return () if ($min !~ /^\d+$/o) or ($max !~ /^\d+$/o) or
($min > $max);
while (($i > -1) and ($max < $result[$i])) {
$i--;
$state = 1 - $state;
}
unshift(@replacement, $max) if not $state;
while (($i > -1) and ($min <= $result[$i])) {
$i--;
$state = 1 - $state;
$length++;
}
unshift(@replacement, $min) if not $state;
$i++;
splice @result, $i, $length, @replacement;
if (defined($string)) {
($interval,$string) = split /\//,$string,2;
} else {
$interval = "";
}
}
for ($i=1; $i<$#result; $i+=2) {
splice @result, $i, 2
while (($i<$#result) and ($result[$i]+1 == $result[$i+1]));
}
return @result;
}
sub encode_set(@) {
my @set = @_;
my $result = "";
while (scalar(@set)) {
my $min = shift @set;
my $max = shift @set;
$result .= "/" if ($result);
if (defined($max)) {
if ($min eq $max) {
$result .= "$min";
} else {
$result .= "$min-$max";
}
} else {
$result .= "$min+";
}
}
return $result;
}
package option;
use strict; use bytes;
our $local_option=0;
our %handler;
our %long_name;
our %short_name;
our %type;
our %transmission_mode;
our $current_value = {};
our @current_config;
our $current_config_outdated = 1;
our @contexts = ( {} );
sub clone($) {
my $scalar = shift;
my $ref = ref($scalar);
my $return_value;
if ($ref eq 'HASH') {
$return_value = {};
foreach my $key (keys(%$scalar)) {
$return_value->{$key} = clone($scalar->{$key});
}
} else {
$return_value = $scalar;
}
return $return_value;
}
sub register($$$$@) {
my $short = shift;
my $long = shift;
my $opt_type = shift;
my $opt_handler = shift;
$long_name{$short} = $long;
$short_name{$long} = $short;
$handler{$short} = $opt_handler;
$type{$short} = $opt_type;
if (scalar(@_)) {
if (ref($opt_handler) eq 'CODE') {
&$opt_handler(shift);
} else {
$$opt_handler = shift;
$contexts[0]->{$short} = clone($$opt_handler);
}
if (scalar(@_)) {
$transmission_mode{$short} = shift;
}
}
}
sub set($$) {
my $name = shift;
my $value = shift;
my $key = undef;
my $current_type;
if ($type{$name} =~ m/^h(.)$/) {
$current_type = $1;
if ($value =~ m/^([^=]+)(?:=(.+))?$/) {
$key = $1;
$value = $2;
} else {
diagnostic::warning("Option $name needs a well formed key=value ".
"pair ($value doesn't match)");
$value = "";
}
} else {
$current_type = $type{$name};
}
if (($current_type eq "i") and defined($value) and ($value !~ /^-?\d+$/o)) {
diagnostic::warning("Option $name needs a numeric argument ".
"($value doesn't match)");
} elsif (($current_type eq "f") and defined($value) and ($value !~
/^(?:\d+(?:\.\d*)?|\.\d+)$/o)) {
diagnostic::warning("Option $name needs a floating point argument ".
"($value doesn't match)");
} else {
my $destination = $handler{$name};
if (defined($key)) {
if (ref($destination) eq 'CODE') {
diagnostic::error("NOT EXPECTED: Found option $name with ".
"code handler and hash value (key=$key)");
&$destination($key, $value);
} else {
diagnostic::debug("Found option $name with scalar ".
"handler and hash value (key=$key)");
$$destination->{$key} = $value;
}
} else {
if (ref($destination) eq 'CODE') {
diagnostic::debug("Found option $name with code handler and ".
"scalar value ". $value);
&$destination($value);
} else {
diagnostic::debug("Found option $name with scalar ".
"handler and scalar value ". $value);
$$destination = $value;
}
}
if (not $local_option) {
if (exists($transmission_mode{$name})) {
if ($key) {
$current_value->{$name} = {}
if not exists $current_value->{$name};
$current_value->{$name}->{$key} = $value;
} else {
$current_value->{$name}=$value;
}
$current_config_outdated = 1;
}
} else {
$local_option = 0;
}
}
}
sub unset($) {
my $name = shift;
my $ref = $handler{$name};
$current_config_outdated = 1;
if (exists($contexts[$#contexts]->{$name}) and ($#contexts > 0)) {
$current_value->{$name} = clone($contexts[$#contexts]->{$name});
$$ref = clone($current_value->{$name});
} else {
delete $current_value->{$name};
$$ref = clone($contexts[0]->{$name});
}
}
sub get_config() {
if ($current_config_outdated) {
my $command_args = "";
my $message_args = "";
foreach my $key (keys(%$current_value)) {
if (exists($transmission_mode{$key})) {
if ($transmission_mode{$key} == COMMAND_LINE) {
if (ref($current_value->{$key}) eq 'HASH') {
foreach my $field (keys(%{$current_value->{$key}})) {
$command_args.=" -$key$field";
$command_args.="=$current_value->{$key}->{$field}"
if defined($current_value->{$key}->{$field});
}
} else {
$command_args.=" -$key";
$command_args.="$current_value->{$key}"
if ($type{$key});
}
} elsif ($transmission_mode{$key} == MESSAGE) {
if (ref($current_value->{$key}) eq 'HASH') {
foreach my $field (keys(%{$current_value->{$key}})) {
$message_args.=TakTuk::pack("-$key");
$message_args.=TakTuk::pack("$field".
(defined($current_value->{$key}->{$field})?
"=$current_value->{$key}->{$field}":""));
}
} else {
$message_args.=TakTuk::pack("-$key");
$message_args.=TakTuk::pack($current_value->{$key})
if ($type{$key});
}
} else {
diagnostic::warning("Internal bug in get_config");
}
}
}
diagnostic::debug("Config : $command_args | ".$message_args."\n");
@current_config = ($command_args, $message_args);
$current_config_outdated = 0;
}
return @current_config;
}
sub push_context() {
my $save = {};
$save = clone($current_value);
push @contexts, $save;
}
sub pop_context() {
my $restored_context = pop @contexts;
foreach my $key (keys(%{$contexts[0]})) {
if (exists($restored_context->{$key})) {
apply_option($key, $restored_context->{$key});
} else {
if (exists($current_value->{$key})) {
apply_option($key, $contexts[0]->{$key});
}
}
}
$current_value = $restored_context;
}
sub apply_option($$) {
my $key = shift;
my $value = shift;
my $ref = $handler{$key};
if (ref($value) eq 'HASH') {
$$ref = clone($value);
} else {
set($key, $value);
}
}
package TakTuk::Select;
use strict; use bytes;
use constant EDESIS=>1;
use constant EDESNF=>2;
use constant EDESNS=>3;
our @select_errors = (
"Descriptor already in set",
"Didn't found descriptor to remove",
"Descriptor not in set");
sub error_msg($) {
my $error = shift;
$error--;
if ($error <= $#select_errors) {
return $select_errors[$error];
} else {
return "Unknown error";
}
}
sub new () {
my $class = shift;
my $data = { vector=>'', handles=>[] };
bless $data, $class;
return $data;
}
sub add ($) {
my $self = shift;
my $handle = shift;
if (vec($self->{vector}, fileno($handle), 1) == 0) {
vec($self->{vector}, fileno($handle), 1) = 1;
push @{$self->{handles}}, $handle;
return 0;
} else {
return EDESIS;
}
}
sub remove ($) {
my $self = shift;
my $handle = shift;
my $i=0;
my $handles_list = $self->{handles};
if (vec($self->{vector}, fileno($handle), 1) == 1) {
vec($self->{vector}, fileno($handle), 1) = 0;
while (($i <= $#$handles_list) and ($handles_list->[$i] != $handle)) {
$i++;
}
if ($i <= $#$handles_list) {
splice @$handles_list, $i, 1;
return 0;
} else {
return EDESNF;
}
} else {
return EDESNS;
}
}
sub select ($$$$) {
my $read_select = shift;
my $write_select = shift;
my $except_select = shift;
my $timeout = shift;
my $rin = defined($read_select)?$read_select->{vector}:undef;
my $win = defined($write_select)?$write_select->{vector}:undef;
my $ein = defined($except_select)?$except_select->{vector}:undef;
my $rout;
my $wout;
my $eout;
my ($nfound, $timeleft) =
CORE::select($rout=$rin, $wout=$win, $eout=$ein, $timeout);
if ($nfound == -1) {
return ();
} elsif ($nfound == 0) {
return ([],[],[]);
} else {
my $read_set = [];
my $write_set = [];
my $except_set = [];
my $handle;
foreach $handle (@{$read_select->{handles}}) {
push(@$read_set, $handle) if (vec($rout,fileno($handle),1) == 1);
}
foreach $handle (@{$write_select->{handles}}) {
push(@$write_set, $handle) if (vec($wout,fileno($handle),1) == 1);
}
foreach $handle (@{$except_select->{handles}}) {
push(@$except_set, $handle) if (vec($eout,fileno($handle),1) == 1);
}
return ($read_set, $write_set, $except_set);
}
}
sub handles () {
my $self=shift;
return @{$self->{handles}};
}
package arguments;
use strict; use bytes;
our $has_readline = eval("use Term::ReadLine;1")?1:0;
our $terminal = undef;
our $command_separator;
our $option_separator;
our $escape;
use constant NONE => 0;
use constant BUFFER => 1;
use constant LINE => 2;
use constant ARGUMENT => 3;
our $arguments_ended = 1;
our $again=0;
our @arguments = ();
our $current_argument = undef;
our $current_command = undef;
our $current_command_separators = undef;
our $current_stuff = undef;
our $current_separators = undef;
our $current_separators_prefix = undef;
our $is_first_argument = 0;
our $unescaped_stuff = undef;
our $options_ended=0;
sub initialize_terminal() {
if ($has_readline) {
$terminal = Term::ReadLine->new("TakTuk", \*STDIN, \*STDOUT);
}
}
sub fetch_arguments(@) {
if (scalar(@_)) {
unshift @arguments, @_;
$arguments_ended = 0;
$options_ended = 0;
}
}
sub fetch_line($) {
my $line = shift;
unshift @arguments, \$line;
$arguments_ended = 0;
$options_ended = 0;
}
sub restore_last_argument() {
if ($again) {
diagnostic::warning("Cannot restore an ungeted argument");
} else {
$again = 1;
}
}
sub restore_last_command() {
if ($current_stuff) {
$current_stuff = $current_command.$current_command_separators.
$current_stuff;
} else {
$current_stuff = $current_command;
$current_separators = $current_command_separators.$current_separators;
}
restore_last_argument if not $again;
}
sub get_next_command() {
get_next_stuff($option_separator.$command_separator);
if ($current_stuff =~
m/^(.*?)([$option_separator.$command_separator]+)(.*)$/g) {
$current_command = $1;
$current_command_separators = $2;
$current_stuff = $3;
restore_last_argument if length($current_stuff);
} else {
$current_command = $current_stuff;
$current_command_separators = $current_separators;
$current_stuff = "";
$current_separators = "";
}
if ($current_command_separators =~ m/[$command_separator]/) {
$current_stuff = $current_command_separators.$current_stuff;
$current_command_separators = "";
restore_last_argument if not $again;
}
diagnostic::debug("Command found : [$current_command]");
return $current_command;
}
sub get_next_argument() {
get_next_stuff($option_separator);
$current_argument = $current_stuff;
diagnostic::debug("Argument found : [$current_argument]");
return $current_stuff;
}
our %buffer;
our %end;
sub read_buffer_data($) {
my $argument = shift;
my $old_pos = undef;
my $new_data;
my $result;
if (defined($terminal) and ($argument == \*STDIN)) {
$new_data = $terminal->readline("");
if (defined($new_data)) {
$new_data .= "\n";
$result = length($new_data);
} else {
$result = 0;
}
} else {
$result = sysread($argument,$new_data,$TakTuk::read_size);
}
$old_pos = pos($buffer{$argument});
(diagnostic::system and $result = 0) if not defined($result);
if ($result) {
$buffer{$argument} .= $new_data if $result;
pos($buffer{$argument}) = $old_pos;
} else {
$end{$argument} = 1;
CORE::close $argument if ($argument != \*STDIN);
}
return $result;
}
sub delete_buffer($) {
my $argument = shift;
diagnostic::warning("Bug") if not exists($end{$argument});
delete $buffer{$argument};
delete $end{$argument};
}
sub get_data_buffer() {
my $data = undef;
my $data_type = NONE;
my $args_remaining = 1;
$args_remaining = 0 if not scalar(@arguments);
while (not defined($data) and $args_remaining) {
my $argument = $arguments[0];
if (ref($argument) eq 'GLOB') {
if ($end{$argument}) {
delete_buffer($argument);
shift @arguments;
$args_remaining = 0 if not scalar(@arguments);
} else {
$buffer{$argument} = "" if not exists($buffer{$argument});
read_buffer_data($argument) if not length($buffer{$argument});
if (length($buffer{$argument})) {
$data = \$buffer{$argument};
$data_type = BUFFER;
}
}
} elsif (ref($argument) eq 'SCALAR') {
if (length($$argument)) {
$data = $argument;
$data_type = LINE;
} else {
shift @arguments;
$args_remaining = 0 if not scalar(@arguments);
}
} else {
$data = \$arguments[0];
$data_type = ARGUMENT;
}
}
return ($data, $data_type);
}
sub get_next_stuff($) {
my $separator = shift;
my $done = 0;
my $data = 1;
my $data_type;
if ($again) {
$again = 0;
} else {
$current_stuff = "";
$current_separators = "";
$current_separators_prefix = "";
my $data_prefix = "";
while (not $done and defined($data)) {
($data, $data_type) = get_data_buffer;
if (defined($data)) {
if ($data_type == ARGUMENT) {
diagnostic::debug("Got argument");
$current_stuff = $$data;
shift @arguments;
$is_first_argument = 1;
$done = 1;
} else {
diagnostic::debug("Got buffer");
$$data = $data_prefix.$$data if $data_prefix;
$data_prefix = "";
my $regexp;
my $escape_found = 1;
if (defined($escape)) {
$regexp = qr/([$separator$escape])/;
} else {
$regexp = qr/([$separator])/;
}
while ($escape_found) {
$escape_found = 0;
if ($$data =~ m/$regexp/g) {
my $character = $1;
if (defined($escape) and ($character eq $escape)) {
if (pos($$data) >= length($$data)) {
$data_prefix =
substr $$data,length($$data)-1,1;
$current_stuff .=
substr $$data,0,length($$data)-1;
$$data = "";
} else {
$escape_found = 1;
pos($$data) = pos($$data) + 1;
}
} else {
my $position = pos($$data);
$current_separators = $character;
if ($$data =~ m/\G([$separator]+)/g) {
$current_separators .= $1;
$position = defined(pos($$data))?
pos($$data):length($$data);
} else {
pos($$data) = $position;
}
$current_stuff .= substr $$data, 0,
$position-length($current_separators);
$$data = substr $$data, $position;
if (length($current_stuff)) {
$is_first_argument = 0;
$done = 1;
} else {
$current_separators_prefix .=
$current_separators;
}
}
} else {
$current_stuff .= $$data;
$$data = "";
$done = 1 if $data_type == LINE;
}
}
}
}
}
$unescaped_stuff = $current_stuff;
if (defined($escape)) {
$current_stuff =~ s/$escape(.)/$1/g;
}
$arguments_ended = 1 if not length($current_stuff);
}
}
sub get_parameters() {
my $left_brace = undef;
my $right_brace;
my $parameters = "";
my $end_expression;
get_next_stuff($option_separator);
if (length($current_stuff) != 1) {
if (length($current_stuff)) {
diagnostic::error("TakTuk has changed: now commands parameters ".
"should be separated from braces in $current_stuff");
} else {
diagnostic::error("Missing brace in parameters");
}
return undef;
}
$left_brace = $current_stuff;
$right_brace = $left_brace;
if ($left_brace =~ m/[({[]/) {
$right_brace =~ tr/({[/)}]/;
}
my $separator = $option_separator.$command_separator;
my $last_separators = "";
my $done = 0;
get_next_stuff($separator);
$current_separators_prefix =~ s/^[^$command_separator]+//;
while (not $done) {
if ($arguments_ended) {
diagnostic::error("Missing closing brace or separator before ".
"closing brace in $parameters");
return undef;
} elsif (length($parameters) and ($unescaped_stuff eq $right_brace)) {
my $remaining = $last_separators.$current_separators_prefix;
$remaining =~ s/[^$command_separator]+$//;
$parameters .= $remaining;
if ($current_separators =~ /[$command_separator]/) {
$current_stuff = $current_separators;
$current_separators = "";
restore_last_argument;
}
$done = 1;
} elsif (length($parameters) and $is_first_argument and
($unescaped_stuff =~ s/^$right_brace([$separator].*)$/$1/)) {
$current_stuff = substr $current_stuff,1;
restore_last_argument if ($current_stuff =~ /[^$option_separator]/);
$done = 1;
} else {
$parameters .= $last_separators.$current_separators_prefix.
$current_stuff;
$last_separators = $current_separators;
$last_separators = " " if not length($last_separators);
get_next_stuff($separator);
}
}
diagnostic::debug("Parameters found : [$parameters]");
return $parameters;
}
sub skip_command_separator() {
my $done = 0;
my $data;
my $data_type = NONE;
if ($again) {
$data = \$current_stuff;
} else {
($data, $data_type) = get_data_buffer;
}
while (not $done and not $arguments_ended) {
if (defined($data)) {
if ($$data =~ s/^[$option_separator]*[$command_separator]+[$option_separator]*//) {
$done = 1;
shift @arguments if (not length($$data) and
($data_type == ARGUMENT));
} else {
if ($$data =~ m/^[$option_separator]*$/) {
$$data = "";
shift @arguments if ($data_type == ARGUMENT);
($data, $data_type) = get_data_buffer;
} else {
diagnostic::warning("Missing command separator");
$done = 1;
}
}
} else {
$arguments_ended = 1;
}
}
$again = 0 if not length($current_stuff);
}
sub terminate_arguments_parsing() {
while (scalar(@arguments)) {
my $argument = shift @arguments;
CORE::close $argument if ref($argument) eq 'GLOB';
}
$arguments_ended = 1;
}
sub args_file($) {
my $file = shift;
my $file_handle = undef;
if (not ($file_handle = general::open_file($file))) {
$main::terminate = 1;
} else {
fetch_arguments($file_handle);
}
}
sub dont_self_propagate() {
if ($general::self_propagate) {
option::set("s", 0);
}
}
sub machines_file($) {
my $file = shift;
my $file_handle = undef;
if (not ($file_handle = general::open_file($file))) {
$main::terminate = 1;
} else {
while (my $line = <$file_handle>) {
my ($peer_name, $comment) = split /\s+/,$line,2;
general::add_peer($peer_name, 0) if $peer_name;
}
CORE::close $file_handle if $file_handle != \*STDIN;
}
}
my $is_gateway = 0;
sub machine($) {
my $peer = shift;
my @peer_arguments=();
get_next_argument;
if ($current_argument eq "-[") {
my $count = 1;
while ($count and !$arguments_ended) {
get_next_argument;
$count-- if ($current_argument eq "-]");
$count++ if ($current_argument eq "-[");
push @peer_arguments, $current_argument;
}
if ($count) {
diagnostic::error("Invalid -[ and -] imbrication");
exit(1);
} else {
pop @peer_arguments;
}
} else {
restore_last_argument;
}
general::add_peer($peer, $is_gateway, @peer_arguments);
$is_gateway = 0;
}
sub gateway($) {
$is_gateway = 1;
machine(shift);
}
sub localhost($) {
my $hostname = shift;
$general::host = $hostname;
$ENV{TAKTUK_HOSTNAME} = $hostname;
}
sub not_root() {
if ($general::root) {
my $connector = connector::new(
write =>\*STDIN,
read =>\*STDOUT,
type =>settings::FD);
$connector->{remove_handler} = \&communicator::remove_source;
communicator::add_connector($connector, 'sources');
$general::root = 0;
}
}
sub print_defaults() {
foreach my $key (keys(%long_name)) {
if (not (ref($handler{$key}) eq 'CODE')) {
my $name = $long_name{$key};
if ($type{$key} =~ m/^h.$/) {
foreach my $field (keys(%${$handler{$key}})) {
my $line = uc("TAKTUK_".$name."_".$field);
my $value = defined(${$handler{$key}}->{$field})?
${$handler{$key}}->{$field}:"";
$line =~ tr/-/_/;
communicator::get_root->output('info',
$line."=".$value."\n");
}
} else {
my $line = uc("TAKTUK_".$name);
my $value = defined(${$handler{$key}})?
${$handler{$key}}:"";
$line =~ tr/-/_/;
communicator::get_root->output('info', $line."=".$value."\n");
}
}
}
$main::terminate = 1;
} 
sub print_package($) {
my $package_name = shift;
general::load_taktuk_package($package_name);
communicator::get_root->output('info',$general::taktuk_package);
$main::terminate = 1;
}
sub print_version() {
communicator::get_root->output('info',
"TakTuk version $TakTuk::VERSION release $TakTuk::RELEASE\n");
}
sub print_version_quit() {
print_version;
$main::terminate = 1;
}
sub print_k() {
my $source="A-Za-z0-9 ,.;:?!";
my $dest="M-Za-z0-9 ,.;:?!A-L";
my $line = <<END;
XmFo0zru563qFp'03mzsq4Fp6FP3Fe530zsFJ
Uzs3qpuqz54FJFDwsFp'03mzsq4GF:wsFpqF46o3qFqzF106p3q
Ym5q3uqxFJF6zqFnm44uzqFqzFo6u73qFym44urF(oqxxqFpqF7053qFs3mzp-yq3q)
Yqxmzsq,FA..sFpqF46o3qFmF6zF1q6Fp'qm6Fotm6pqFpqFymzuq3qFmFxqFpu4406p3q
505mxqyqz5HFRmu5q4Fom3myqxu4q3FxqF4u301F3q46x5mz5FmFrq6F7urHFgzqFr0u4FxmFn0zzq
5quz5qF0n5qz6qGF45011q,FxmFom3myqxu4m5u0zFqzF Fmv065mz5F6zF1q6Fp'qm6
n06uxxmz5qHFb3qxq7q,Fxq4F,q45q4FpqF;GBwsFpq4F03mzsq4Fq5F3q4q37q,Fxq4HFbqxq,F( 
o0y13u4FxmF1m35uqFnxmzotq)F5065q4Fxq4F03mzsq4GFo061q,Fxq4FqzF1q5u54Fy03oqm69Fq5
mv065q,Fxq4Fm6Fom3myqxHFO0y1xq5q,FxqFyqxmzsqFm7qoFxqF3q45qFp6F46o3qFq5Fxq4
,q45q4Fruzqyqz5Fqyuzoq4HFb035q,FmFqn6xxu5u0zF16u4Frmu5q4Fyuv05q3FBFmFDtFmFrq6
53q4Fp069GFv6426'mF0n5qzu3F6zqFo0z4u45mzoqF4u361q64qHFUxFzqF3q45qF1x64F26'm
qy105q3F(m55qz5u0zFmFxmFo0z4u45mzoqFJFxqFyqxmzsqFq1mu44u5FqzF3qr30upu44mz5)H
END
eval "\$line =~ tr/$dest/$source/";
communicator::get_root->output('info', $line);
$main::terminate = 1;
}
sub init() {
option::register("B","worksteal-behavior","hs", \$scheduler::worksteal, {
initial => 1,
growth => '$last_given * 2',
limit => '$available / 2'
}, option::MESSAGE);
option::register("C","command-separator","s",
\$arguments::command_separator,',;\n',option::MESSAGE);
option::register("D","debug","hi",\$diagnostic::package_level, { default=>2 },option::COMMAND_LINE);
option::register("E","escape-character","s",\$arguments::escape,undef,
option::MESSAGE);
option::register("F","args-file","s",\&args_file);
option::register("G","gateway","s",\&gateway);
option::register("I","perl-interpreter","s",\$general::perl_interpreter,
"perl",option::MESSAGE);
option::register("L","localhost","s",\&localhost);
option::register("M","my","",\$option::local_option);
option::register("O","option-separator","s",
\$arguments::option_separator,'\s',option::MESSAGE);
option::register("P","print-defaults","",\&print_defaults);
option::register("R","output-redirect","hs",\$general::redirect, { "default" => ">&=1", "taktuk" => ">&=2" },
option::MESSAGE);
option::register("S","send-files","s",\$general::connector_send_files,
"", option::MESSAGE);
option::register("T","taktuk-command","s",\$general::taktuk_command,
$0,option::MESSAGE);
option::register("V","path-value","s",\$general::path_value, undef,
option::MESSAGE);
option::register("W","window-adaptation","i",\$scheduler::window_adaptation,
0,option::MESSAGE);
option::register("b","begin-group","",\&begin_group);
option::register("c","connector","s",\$general::connector_command,
"ssh -o StrictHostKeyChecking=no -o BatchMode=yes", 
option::MESSAGE);
option::register("d","dynamic","i",\&dynamic,0,option::MESSAGE);
option::register("e","end-group","",\&end_group);
option::register("f","machines-file","s",\&machines_file);
option::register("g","time-granularity","f",\$communicator::select_timeout,
1, option::MESSAGE);
option::register("h","help","",\&general::print_help_quit);
option::register("i","interactive","",\$main::forced_interactive);
option::register("k","cook","",\&arguments::print_k);
option::register("l","login","s",\$general::login_name, undef,
option::MESSAGE);
option::register("m","machine","s",\&machine);
option::register("n","no-numbering","",\$main::no_numbering,0,
option::COMMAND_LINE);
option::register("o","output-template","hs",\$general::template, {
default => '"$host-$rank: $command ($pid): $type > $line\n"',
connector => '"$host: $peer ($pid): $type > $line\n"',
info => '"$line$eol"',
state => '($line == TakTuk::CONNECTION_FAILED)?'.
'"$host: $peer ($pid): $type > ".'.
'event_msg($line)."\n":undef',
status => '"$host-$rank: $command ($pid): $type > ".
"Exited with status $line\n"',
taktuk => '"[ TAKTUK $level_name ] $host (PID $pid) Line ".
"$line_number ($package) Release $release\n$line\n"',
message => '"$host-$rank: message from $from: $line\n"'
}, option::MESSAGE);
option::register("p","print-package","s",\&print_package);
option::register("r","not-root","",\&not_root);
option::register("s","self-propagate","",\$general::self_propagate,
0,option::MESSAGE);
option::register("t","timeout","f",\$general::connector_timeout,0,
option::MESSAGE);
option::register("u","cache-limit","i",\$communicator::cache_limit,
-1, option::MESSAGE);
option::register("v","version","",\&print_version_quit);
option::register("w","window","i",\$scheduler::window, 10, option::MESSAGE);
option::register("z","dont-self-propagate","",\&dont_self_propagate);
foreach my $full_name (keys(%ENV)) {
my $variable;
if ($full_name =~ m/^TAKTUK_(MY_)?(.+)$/) {
$option::local_option = $1?1:0;
$variable = $2;
} else {
undef($variable);
}
if (defined($variable)) {
my $prefix;
my $qualifier;
$variable = lc($variable);
$variable =~ m/^(.*?)(?:_([^_]+))?$/;
($prefix, $qualifier) = ($1, $2);
$prefix =~ tr/_/-/;
if (defined($qualifier)) {
if (exists($short_name{$prefix})) {
my $short = $short_name{$prefix};
option::set($short, "$qualifier=$ENV{$full_name}");
} else {
$prefix .= "-$qualifier";
$qualifier = undef;
}
}
if (not defined($qualifier)) {
if (exists($short_name{$prefix})) {
my $short = $short_name{$prefix};
option::set($short, $ENV{$full_name});
} else {
diagnostic::warning("Unknown setting $full_name");
}
}
}
$option::local_option = 0;
}
}
sub parse_options() {
get_next_argument();
while (not $options_ended) {
if ($current_argument =~ s/^-//o) {
my @names;
if ($current_argument =~ s/^-(.*)$//o) {
my $name = $1;
if ($name) {
my $short = $short_name{$name};
if (not defined($short)) {
diagnostic::warning("Unknown long option $name");
general::print_help_quit if ($general::root);
} else {
$current_argument = $short;
}
} else {
$options_ended = 1;
}
}
while ($current_argument) {
$current_argument =~ s/^(.)//o;
my $name = $1;
if (exists($type{$name})) {
if ($type{$name}) {
get_next_argument if not length $current_argument;
if ($arguments_ended) {
diagnostic::error("Missing argument")
} else {
option::set($name, $current_argument);
}
$current_argument = "";
} else {
option::set($name, 1);
}
} else {
diagnostic::warning("Unknown short option $name");
general::print_help_quit if ($general::root);
}
} 
get_next_argument;
} else {
if (($current_argument =~ m/^\s*$/) and not $arguments_ended) {
get_next_argument;
} else {
$options_ended=1;
restore_last_argument if ($current_argument !~ m/^\s*$/);
}
}
}
check_separators_integrity();
}
sub check_separators_integrity() {
diagnostic::warning("Options and command separators intersect")
if ($option_separator =~ m/[$command_separator]/) or
($command_separator =~ m/[$option_separator]/);
my $chaine =
"-/0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ";
diagnostic::warning("Options or command separators contain one of :".
"-/1-9a-zA-Z")
if ($chaine =~ m/[$command_separator]/) or
($chaine =~ m/[$option_separator]/);
}
sub begin_group() {
scheduler::begin_group;
option::push_context;
}
sub end_group() {
option::pop_context;
scheduler::end_group;
}
sub dynamic($) {
scheduler::set_limit(shift);
}
package diagnostic;
use strict; use bytes;
our $package_level;
our $depth = 0;
our $level="";
our $level_name="";
our $package="";
our $filename="";
our $line_number="";
our $special_enabled=0;
sub print_info($$$) {
$level = shift;
$level_name = shift;
($package, $filename, $line_number) = caller(1);
my $level_ok = defined($package_level->{$package})?
$package_level->{$package}<=$level:
$package_level->{default}<=$level;
$depth++;
if ($level_ok and ($depth < 3)) {
communicator::get_root->output('taktuk',shift);
}
$depth--;
}
sub system () {
print_info(3,"ERROR : SYSTEM", "Error $!");
}
sub debug ($) {
print_info(1,"DEBUG", shift);
}
sub error ($) {
print_info(3,"ERROR", shift);
}
sub warning ($) {
print_info(2,"WARNING", shift);
}
sub special ($) {
print_info(3, "SPECIAL", shift) if $special_enabled;
}
package TakTuk;
use strict; use bytes;
our %buffer;
sub no_flush($) {
my $new_fd = shift;
binmode($new_fd);
my $old_fd=select($new_fd);
$|=1;
select($old_fd);
}
sub unpack($) {
my $buffer = shift;
if (length($buffer) >= 4) {
my $size;
($size) = CORE::unpack("N",$buffer);
if (length($buffer) >= $size+4) {
return (substr($buffer, 4, $size), substr($buffer, $size+4));
} else {
return (undef, $buffer);
}
} else {
return (undef, $buffer);
}
}
sub pack($) {
my $full_message = shift;
my $size = length($full_message);
return CORE::pack("N",$size).$full_message;
}
sub decode($) {
my $message = shift;
my $message_code = substr($message, 0, 1);
my $body = substr($message, 1);
return ($message_code, $body);
}
sub encode($$) {
my $message = shift;
my $body = shift;
return ($message).($body);
}
sub syswrite ($$) {
my $unrecoverable = 0;
my $write_fd = shift;
my $full_message = shift;
my $result;
my $total_expected = length($full_message);
my $call_expected = $write_size;
my $offset = 0;
while ($total_expected and not $unrecoverable) {
$call_expected = $total_expected if $call_expected > $total_expected;
$result =
CORE::syswrite($write_fd, $full_message, $call_expected, $offset);
if ($result) {
$total_expected -= $result;
$offset += $result;
} else {
if ($!{EAGAIN}) {
} else {
print STDERR "Unrecoverable write error\n";
$unrecoverable = 1;
}
}
}
if ($unrecoverable) {
return undef;
} else {
return 1;
}
}
sub read_data ($) {
my $descriptor = shift;
my $new_data;
my $result = sysread($descriptor, $new_data, $read_size);
return undef if not defined($result);
if ($result and exists($buffer{$descriptor})) {
$buffer{$descriptor} .= $new_data;
} else {
$buffer{$descriptor} = $new_data;
}
return $result;
}
sub get_message ($) {
my $descriptor = shift;
if (exists($buffer{$descriptor})) {
my ($message, $new_buffer) = TakTuk::unpack($buffer{$descriptor});
if (defined($new_buffer)) {
$buffer{$descriptor} = $new_buffer;
} else {
delete($buffer{$descriptor});
}
if (defined($message)) {
return $message;
} else {
return "";
}
} else {
return "";
}
}
sub find_sequence($$) {
my $descriptor = shift;
my $sequence = shift;
my $found = undef;
if (exists($buffer{$descriptor})) {
my $position;
$position = index($buffer{$descriptor},"\n");
while (($position >= 0) and not defined($found)) {
my $string;
$string = substr($buffer{$descriptor}, 0, $position);
$buffer{$descriptor} = substr($buffer{$descriptor}, $position+1);
if ($string =~ m/($sequence)/) {
$found = $1;
} else {
$position = index($buffer{$descriptor},"\n");
}
}
}
return defined($found)?$found:"";
}
sub flush_buffer($) {
my $descriptor = shift;
if (exists($buffer{$descriptor})) {
my $result = $buffer{$descriptor};
delete($buffer{$descriptor});
return $result;
} else {
return "";
}
}
our $control_channel_read;
our $control_channel_write;
if ($ENV{TAKTUK_CONTROL_READ}) {
open($control_channel_read, "<&=". $ENV{TAKTUK_CONTROL_READ})
or print("Error opening taktuk control channel : $!\n");
binmode($control_channel_read);
}
if ($ENV{TAKTUK_CONTROL_WRITE}) {
open($control_channel_write, ">&=". $ENV{TAKTUK_CONTROL_WRITE})
or print("Error opening taktuk control channel : $!\n");
no_flush($control_channel_write);
}
use constant ESWRIT=>1;
use constant EFCLSD=>2;
use constant ESREAD=>3;
use constant EARGTO=>4;
use constant EARGBD=>5;
use constant ETMOUT=>6;
use constant EINVST=>7;
use constant EINVAL=>8;
use constant ENOERR=>9;
our @taktuk_errors = (
'"TakTuk::syswrite failed, system message : $!"',
'"TakTuk engine closed the communication channel"',
'"sysread error, system message : $!"',
'"field \"to\" not defined"',
'"field \"body\" not defined"',
'"timeouted"',
'"invalid destination set specification"',
'"invalid field required in get"',
'"no error"'
);
sub error_msg($) {
my $error = shift;
$error--;
if ($error <= $#taktuk_errors) {
return eval($taktuk_errors[$error]);
} else {
return "Unknown error";
}
}
sub send(%) {
my %argument = @_;
my $from = $ENV{TAKTUK_RANK};
my $target;
if (not exists($argument{to})) {
$error=EARGTO;
return undef;
}
my $to = $argument{to};
if (not exists($argument{body})) {
$error=EARGBD;
return undef;
}
my $body = $argument{body};
if (not exists($argument{target})) {
$target = "any";
} else {
$target = $argument{target};
}
my $full_message = TakTuk::encode($TakTuk::send_to,
TakTuk::pack($to).
TakTuk::encode($TakTuk::message,
TakTuk::pack($target).
TakTuk::pack($from).
$body));
my $result = TakTuk::syswrite($control_channel_write,
TakTuk::pack($full_message));
$error=ESWRIT if not $result;
return $result?$result:undef;
}
sub recv(%) {
my %argument = @_;
my $result;
my $message;
if (exists($argument{timeout})) {
$message = TakTuk::encode($TakTuk::wait_message, $argument{timeout});
} else {
$message = $TakTuk::wait_message;
}
$result = TakTuk::syswrite($control_channel_write,TakTuk::pack($message));
if (not $result) {
$error=ESWRIT;
return ();
}
my $message_code;
($message_code,$message) = wait_message($TakTuk::timeout,$TakTuk::message);
if (defined($message_code)) {
my $from;
if ($message_code eq $TakTuk::timeout) {
$error=ETMOUT;
return ();
}
($from, $message) = TakTuk::unpack($message);
return ($from, $message);
} else {
return ();
}
}
our @messages;
sub wait_message(@) {
my @codes = @_;
my ($code, $body);
my $result = 1;
my $message;
for (my $i=0; $i<$#messages; $i+=2) {
foreach my $message_code (@codes) {
if ($messages[$i] eq $message_code) {
($code, $body) = ($messages[$i], $messages[$i+1]);
splice @messages, $i, 2;
return ($code, $body);
}
}
}
while ($result) {
$message = get_message($control_channel_read);
while ($message) {
($code, $body) = TakTuk::decode($message);
foreach my $message_code (@codes) {
return ($code, $body) if ($message_code eq $code);
}
push @messages, $code, $body;
$message = get_message($control_channel_read);
}
$result = read_data($control_channel_read);
}
if (defined($result)) {
$error=EFCLSD;
} else {
$error=ESREAD;
}
return ();
}
sub get($) {
my $result;
my $message;
$message = TakTuk::encode($TakTuk::get_info, shift);
$result = TakTuk::syswrite($control_channel_write,TakTuk::pack($message));
if (not $result) {
$error=ESWRIT;
return -1;
}
my $message_code;
($message_code,$message) = wait_message($TakTuk::info,$TakTuk::invalid);
if (defined($message_code)) {
if ($message_code eq $TakTuk::invalid) {
$error=EINVAL;
return -1;
} else {
$error=ENOERR;
}
return $message;
} else {
return -1;
}
}
package command;
use strict; use bytes;
use Fcntl;
sub new (%) {
my $data = { @_ };
if (exists($data->{line}) and
(exists($data->{read}) or exists($data->{write}) or
exists($data->{error}) or exists($data->{type}))) {
diagnostic::error("Command built from both a command line and fds");
}
bless($data);
return $data;
}
sub run ($) {
my $self=shift;
my $command=shift;
my ($father_read,$father_write,$father_error);
my ($child_read,$child_write,$child_error);
undef($father_read);
undef($father_write);
undef($child_read);
undef($child_write);
if (settings::USE_SOCKETPAIR) {
if (not socketpair($father_read, $child_read, AF_UNIX, SOCK_STREAM,
PF_UNSPEC)) {
diagnostic::system;
return 0;
}
$father_write = $father_read;
$child_write = $child_read;
$self->{type} = settings::SOCKET;
} else {
if (not pipe($child_read, $father_read)) {
diagnostic::system;
return 0;
}
if (not pipe($father_write, $child_write)) {
diagnostic::system;
return 0;
}
$self->{type} = settings::FD;
}
undef($father_error);
undef($child_error);
if (not pipe($father_error,$child_error)) {
diagnostic::system;
return 0;
}
my ($taktuk_side, $command_side);
if ($command) {
($taktuk_side, $command_side) = communicator::create_channel;
if (defined($command_side)) {
$taktuk_side->{pending_messages} = 0;
$taktuk_side->{remove_handler} = \&communicator::remove_control;
communicator::add_connector($taktuk_side, 'control');
$self->{control} = $taktuk_side;
} else {
diagnostic::error("Degradated mode (no control channel)");
}
}
$self->{start_date} = timer::current_time;
my $pid;
$pid = fork;
if (not defined($pid)) {
diagnostic::system;
return 0;
}
$self->{pid} = $pid;
$communicator::c_from_pid{$pid} = $self;
if ($pid) {
diagnostic::debug("New child with pid $pid created".
" to execute $self->{line}");
CORE::close($child_read) or diagnostic::system;
(CORE::close($child_write) or diagnostic::system)
if not settings::USE_SOCKETPAIR;
CORE::close($child_error) or diagnostic::system;
$self->{read} = $father_read;
$self->{write} = $father_write;
$self->{error} = $father_error;
return $self;
} else {
setpgrp($$,$$) or diagnostic::system;
if ($command) {
$ENV{TAKTUK_CONTROL_READ} = fileno($command_side->{write});
$ENV{TAKTUK_CONTROL_WRITE} = fileno($command_side->{read});
fcntl($command_side->{read}, F_SETFD, 0) or die $!;
fcntl($command_side->{write}, F_SETFD, 0) or die $!;
}
CORE::close($father_read) or die $!;
(CORE::close($father_write) or die $!)
if not settings::USE_SOCKETPAIR;
CORE::close($father_error) or die $!;
open(STDIN,"<&".fileno($child_read)) or die $!;
open(STDOUT,">&".fileno($child_write)) or die $!;
open(STDERR,">&".fileno($child_error)) or die $!;
CORE::close($child_read) or die $!;
CORE::close($child_error) or die $!;
exec($self->{line}) or die "Exec failed : $!";
}
}
sub cleanup() {
my $self = shift;
foreach my $part ('read','write','error') {
if (exists $self->{$part}) {
$self->close($part);
}
}
}
sub my_shutdown($$) {
my $fd = shift;
my $how = shift;
if (not shutdown($fd,$how)) {
diagnostic::system if not $!{ENOTCONN};
}
}
sub close($) {
my $self=shift;
my $part=shift;
if ($part eq 'error') {
if (exists($self->{error})) {
TakTuk::flush_buffer($self->{error});
CORE::close($self->{error}) or diagnostic::system;
delete $self->{error};
} else {
diagnostic::debug("Cannot close error : does not exists");
}
} else {
if (exists($self->{type}) and exists($self->{$part})) {
if ($self->{type} == settings::FD) {
TakTuk::flush_buffer($self->{write}) if $part eq 'write';
CORE::close($self->{$part}) or diagnostic::system;
delete $self->{$part};
} elsif ($self->{type} == settings::SOCKET) {
if ($part eq 'read') {
my_shutdown($self->{read},1);
delete $self->{read};
} elsif ($part eq 'write') {
TakTuk::flush_buffer($self->{write});
my_shutdown($self->{write},0);
delete $self->{write};
} else {
diagnostic::warning("Invalid part for socket close");
}
} else {
diagnostic::warning("Unknown command type : BUG");
}
} else {
diagnostic::warning("Closing a non existant command type or part")
unless (exists($self->{type}) and ($part eq 'read'));
} 
}
}
sub output($$) {
my $self = shift;
my $type = shift;
my $message = shift;
my $template;
my $fd;
if (exists($general::template->{$type})) {
$template = $general::template->{$type};
} else {
$template = $general::template->{default};
}
if (exists($general::redirect->{$type})) {
$fd = $general::redirect->{$type};
} else {
$fd = $general::redirect->{default};
}
if (defined($template)) {
my $command = exists($self->{line})?$self->{line}:"";
my $count = $general::taktuk{count};
my $filename = $diagnostic::filename;
my $host = $general::host;
my $level = $diagnostic::level;
my $level_name = $diagnostic::level_name;
my $line_number = $diagnostic::line_number;
my $package = $diagnostic::package;
my $peer = exists($self->{peer})?$self->{peer}:"";
my $peer_position = exists($self->{position})?$self->{position}:"";
my $pid = exists($self->{pid})?$self->{pid}:$$;
my $position = $general::position;
my $rank = $general::taktuk{rank};
my $release = $TakTuk::RELEASE;
my $start_date = exists($self->{start_date})?$self->{start_date}:"";
my $stop_date = exists($self->{stop_date})?$self->{stop_date}:"";
my $reply_date = exists($self->{reply_date})?$self->{reply_date}:"";
my $init_date = exists($self->{init_date})?$self->{init_date}:"";
my $from = $handlers::from;
my $line;
my $eol;
my $to_be_sent = "";
pos($message) = 0;
while ($message =~ /\G([^\n]*)(\n|$)/go) {
$line = $1;
$eol = $2;
if (length($line) or length($eol)) {
my $result = eval("no strict 'vars'; $template");
$to_be_sent .= $result if defined($result) and length($result);
}
}
if (length($to_be_sent)) {
communicator::process_message($self,
TakTuk::encode($TakTuk::forward_up,
TakTuk::encode($TakTuk::output,
TakTuk::pack($fd).$to_be_sent)));
}
}
}
our @event_messages = (
"TakTuk is ready",
"TakTuk is numbered",
"TakTuk is terminated",
"Connection failed",
"Connection initialized",
"Connection lost",
"Command started",
"Command failed",
"Command terminated",
"Numbering update failed",
"Pipe input started",
"Pipe input failed",
"Pipe input terminated",
"File reception started",
"File reception failed",
"File reception terminated",
"File send failed",
"Invalid target",
"No target",
"Message delivered",
"Invalid destination",
"Destination not available anymore"
);
sub event_msg($) {
my $event = shift;
if (($event < 0) or ($event > $#event_messages)) {
return "Unknown event code";
} else {
return $event_messages[$event];
}
}
package connector;
use strict; use bytes;
use File::Basename;
our @ISA=qw(command);
our $init_string="Taktuk initialization, version ";
our $connector_functional_string="Taktuk connector functional";
our $pre_load=1;
our $simple_connector=2;
our $initialized=3;
sub new (%) {
my $data=command::new(@_,
data_handler=>\&communicator::process_messages);
return 0 if not $data;
$data->{last_given} = 0;
$data->{count} = 0;
if (exists($data->{peer})) {
if ($data->{propagate}) {
$data->{state}=$pre_load;
} else {
$data->{state}=$simple_connector;
}
my $command_line = $general::connector_command;
$command_line .= " -l $data->{login}" if ($data->{login});
$data->{line} = "$command_line $data->{peer} '$data->{taktuk}'";
} else {
$data->{state} = $initialized;
}
bless($data);
return $data;
}
sub read_data ($) {
my $self = shift;
my $descriptor = shift;
my $result = TakTuk::read_data($descriptor);
(diagnostic::system and $result = 0) if not defined($result);
my $write = undef;
my $error = undef;
$write = $self->{write} if exists $self->{write};
$error = $self->{error} if exists $self->{error};
if ($write and ($descriptor == $write)) {
if (($result >0) and not $self->{timeouted}) {
if ($self->{state} < $initialized) {
my $argument = 0;
$self->{reply_date} = timer::current_time
unless exists($self->{reply_date});
if ($self->{state} == $pre_load) {
if (length(TakTuk::find_sequence($descriptor,
".*$connector_functional_string"))) {
$argument = 1;
}
} elsif ($self->{state} == $simple_connector) {
my $sequence = TakTuk::find_sequence($descriptor,
".*$init_string"."[0-9]+[.0-9]*");
if (length($sequence)) {
$argument = $sequence;
$argument =~ s/.* (.*)$/$1/o;
}
} else {
diagnostic::warning("Wrong connector state : ".
$self->{state});
}
if ($argument > 0) {
my $connector_input = $self->{read};
if ($self->{state} < $simple_connector) {
general::load_taktuk_code;
communicator::post_write($connector_input,
$general::taktuk_code) or diagnostic::system;
} else {
diagnostic::warning("Protocol versions do not match")
if $argument != $TakTuk::RELEASE;
}
$self->{state} += 1;
diagnostic::debug("Connector $self->{line} promoted to ".
"state ".$self->{state});
synchronizer::initialization_complete($self)
if ($self->{state} == $initialized);
}
}
}
} elsif ($error and ($descriptor == $error)) {
if ($result >0) {
my $new_data = TakTuk::flush_buffer($descriptor);
$self->output('connector', $new_data);
} 
} else {
diagnostic::warning("Unknown descriptor");
}
return $result;
}
sub send_parameters() {
my $self = shift;
if ($self->{propagate}) {
general::load_taktuk_code;
$self->send_message(TakTuk::encode($TakTuk::taktuk_code,
$general::taktuk_code));
}
$self->send_message(TakTuk::encode($TakTuk::position,"$self->{position}"));
$self->send_message($TakTuk::gateway) if $self->{gateway};
if (length($self->{files})) {
foreach my $transfer (split /,/,$self->{files}) {
my ($source, $destination) = split /:/, $transfer;
$self->send_file("", $source, $destination);
}
} 
my $arguments = $self->{arguments};
diagnostic::debug("Arguments : ".$arguments);
$self->send_message(TakTuk::encode($TakTuk::arguments, $arguments));
}
sub get_message () {
my $self = shift;
if ($self->{state} >= $initialized) {
if (exists($self->{write})) {
return TakTuk::get_message($self->{write});
} else {
return "";
}
} else {
return "";
}
}
sub send_message($) {
my $self = shift;
my $write_fd = $self->{read};
my $message = shift;
my $full_message = TakTuk::pack($message);
if (not communicator::post_write($write_fd, $full_message)) {
diagnostic::error("Error in send message [$full_message], error : $!");
}
}
sub route_file_part($$) {
my $self = shift;
my $prefix = shift;
my $message = shift;
if ($prefix) {
$message = TakTuk::encode($prefix, $message);
communicator::process_message($self, $message);
} else {
$self->send_message($message);
}
}
sub send_file($$$) {
my $self = shift;
my $prefix = shift;
my $source = shift;
my $destination = shift;
my $permissions;
my $file = undef;
my $result;
my $type;
$source = general::expand($source, position=>$general::position,
host=>$general::host,
rank=>$general::taktuk{rank});
if (-d $source) {
$type = 1;
$result = open($file,"cd $source && tar c . |");
} else {
$type = 0;
$result = open($file, $source);
}
if ($result) {
my $read_result;
my $buffer;
my $message;
binmode($file) or diagnostic::system;
$permissions=(stat($source))[2] & 07777;
$message = TakTuk::pack($general::position).
TakTuk::pack($general::host).
TakTuk::pack($general::taktuk{rank}).
TakTuk::pack($type).
TakTuk::pack($permissions).
TakTuk::pack(basename($source)).
$destination;
$message = TakTuk::encode($TakTuk::file, $message);
$self->route_file_part($prefix, $message);
while ($read_result = sysread($file, $buffer, $TakTuk::read_size/2)) {
$message = TakTuk::encode($TakTuk::file,
TakTuk::pack($general::position).$buffer);
$self->route_file_part($prefix, $message);
}
diagnostic::system if not defined($read_result);
$message = TakTuk::encode($TakTuk::file,
TakTuk::pack($general::position));
$self->route_file_part($prefix, $message);
CORE::close($file) or diagnostic::system;
} else {
diagnostic::system;
$self->output('state',TakTuk::FILE_SEND_FAILED);
}
}
our @pack_fields = ('login', 'peer', 'taktuk', 'arguments', 'position',
'gateway', 'propagate', 'files', 'timeout');
sub pack() {
my $self = shift;
my $result = "";
foreach my $field (@pack_fields) {
$result .= TakTuk::pack($self->{$field});
}
return $result;
}
sub unpack($) {
my $buffer = shift;
my %data = ();
foreach my $field (@pack_fields) {
($data{$field}, $buffer) = TakTuk::unpack($buffer);
}
my $connector = connector::new(%data);
return ($connector, $buffer);
}
sub cancel() {
my $connector = shift;
CORE::kill 9, -$connector->{pid};
}
package communicator;
use strict; use bytes;
use POSIX ":sys_wait_h";
use Errno;
use Fcntl;
our $sinks_number = 0;
our $initialized_sinks_number = 0;
our $target_number = 0;
our $select;
our $pending_writes_select;
our $select_timeout;
our $connections;
our $end = 0;
our %c_from_fd;
our %c_from_target;
our %c_from_pid;
our %pending_writes;
our $pending_writes_number = 0;
our $pending_size = 0;
our $cache_limit;
our %pending_termination_handler;
our %pending_termination_argument;
our $default_root = connector::new(
read => \*STDOUT,
type => settings::FD);
sub create_channel() {
my $communicator_read = undef;
my $communicator_write = undef;
my $control_read = undef;
my $control_write = undef;
my $type = undef;
if (settings::USE_SOCKETPAIR) {
if (not socketpair($communicator_read, $control_read,
AF_UNIX, SOCK_STREAM, PF_UNSPEC)) {
diagnostic::system;
return ();
}
$communicator_write = $communicator_read;
$control_write = $control_read;
$type = settings::SOCKET;
} else {
if (not pipe($communicator_read, $control_write) or
not pipe($control_read, $communicator_write)) {
diagnostic::system;
return ();
}
$type = settings::FD;
}
TakTuk::no_flush($communicator_write);
TakTuk::no_flush($control_write);
my $communicator_connector = connector::new('read' =>$communicator_write,
'write' =>$communicator_read,
'type' =>$type);
my $control_connector = connector::new('read' =>$control_write,
'write' =>$control_read,
'type' =>$type);
return ($communicator_connector, $control_connector);
} 
sub init() {
$select = TakTuk::Select->new() or diagnostic::error("Select creation");
$pending_writes_select =
TakTuk::Select->new() or diagnostic::error("Select creation");
$connections = {};
$connections->{sources} = [];
$connections->{sinks} = [];
$connections->{control} = [];
$connections->{interpreter} = [];
$connections->{local_commands} = [];
$connections->{pipe} = [];
$ENV{TAKTUK_HOSTNAME} = $general::host;
}
sub cleanup() {
foreach my $connector
(@{$connections->{sinks}}, @{$connections->{control}}) {
$connector->cleanup;
}
}
sub get_connections($) {
my $set_name = shift;
my $list = $connections->{$set_name};
return @$list;
}
sub get_root() {
my $sources = $connections->{sources};
if (defined($sources) and scalar(@$sources)) {
return $sources->[0];
} else {
return $default_root;
}
}
sub process_message($$) {
my $connector = shift;
my ($message, $body) = TakTuk::decode(shift);
my $function = handlers::get_handler($message);
if (defined($function)) {
&$function($message, $connector, $body);
} else {
diagnostic::warning("Unknown message : $message (body: $body)");
}
}
sub process_messages($$) {
my $connector = shift;
my $descriptor = shift;
my $result = $connector->read_data($descriptor);
my $message = $connector->get_message;
while ($message) {
process_message($connector, $message);
$message = $connector->get_message;
}
return $result;
}
sub process_command_output($$) {
my $command = shift;
my $descriptor = shift;
my $buffer;
my $read_result;
$read_result = sysread($descriptor, $buffer, $TakTuk::read_size);
(diagnostic::system and $read_result = 0) if not defined($read_result);
if ($read_result > 0) {
my $type;
if (exists($command->{write}) and ($descriptor == $command->{write})) {
$type = "output";
} else {
$type = "error";
}
$command->output($type, $buffer);
}
return $read_result;
}
sub process_pipe_output($$) {
my $command = shift;
my $descriptor = shift;
my $buffer;
my $read_result;
if (not $end) {
$read_result = sysread($descriptor, $buffer, $TakTuk::read_size);
(diagnostic::system and $read_result = 0) if not defined($read_result);
if ($read_result > 0) {
process_message($command, TakTuk::encode($command->{message},
$buffer));
}
return $read_result;
} else {
return 1;
}
}
sub run() {
my @select_result;
my $sinks = $connections->{sinks};
my $local_commands = $connections->{local_commands};
while ((not $end) or scalar(@$sinks) or scalar(@$local_commands) or
$pending_writes_number) {
if (($pending_size > $cache_limit) and ($cache_limit >= 0)) {
@select_result = TakTuk::Select::select(
undef,$pending_writes_select,undef,$select_timeout);
} else {
@select_result = TakTuk::Select::select(
$select,$pending_writes_select,undef,$select_timeout);
}
if (scalar(@select_result)) {
my ($read_set, $write_set, $exception_set) = @select_result;
while (scalar(@$read_set)) {
my $descriptor = shift @$read_set;
my $cobidule = $c_from_fd{$descriptor};
if (exists($cobidule->{data_handler})) {
my $handler = $cobidule->{data_handler};
my $result = &$handler($cobidule, $descriptor);
remove_descriptor($cobidule, $descriptor) if (not $result);
} else {
diagnostic::error("Bug : connector has no data handler");
}
}
while (scalar(@$write_set)) {
my $descriptor = shift @$write_set;
write_pending_stuff($descriptor);
}
if (scalar(@$exception_set)) {
diagnostic::warning("Unexpected exceptional fds : ".
join(' ', @$exception_set));
}
} else {
if ($!{EBADF}) {
my $error_msg="Selected an invalid descriptor ($!)... Very ".
"bad\nRegistered handles = ";
foreach my $handle ($select->handles) {
$error_msg.="$handle (".fileno($handle).") "
if defined(fileno($handle));
}
diagnostic::error($error_msg);
exit 1;
} elsif ($!{EINTR}) {
diagnostic::warning("Select exited because of a signal");
} elsif ($!{EINVAL}) {
diagnostic::error("Invalid time limit for select");
} else {
diagnostic::error("Unknown error");
}
}
timer::check_timeouts;
my $pid;
while (($pid = waitpid -1, WNOHANG) > 0) {
diagnostic::debug("Reaping $pid");
if (exists($c_from_pid{$pid})) {
my $cobidule = $c_from_pid{$pid};
$cobidule->{status} = $?;
$cobidule->{stop_date} = timer::current_time;
remove_cobidule($cobidule);
delete $c_from_pid{$cobidule};
}
}
diagnostic::error("Waiting child, $!") unless defined($pid);
}
}
sub terminate () {
$end = 1;
}
sub add_descriptors($) {
my $command = shift;
my $error;
if (exists($command->{write})) {
$error = $select->add($command->{write});
diagnostic::warning(TakTuk::Select::error_msg($error)) if $error;
$c_from_fd{$command->{write}} = $command;
binmode($command->{write}) or diagnostic::system;
}
if (exists($command->{error})) {
$error = $select->add($command->{error});
diagnostic::warning(TakTuk::Select::error_msg($error)) if $error;
$c_from_fd{$command->{error}} = $command;
binmode($command->{error}) or diagnostic::system;
}
}
sub remove_descriptor($$) {
my $cobidule = shift;
my $descriptor = shift;
my $type;
diagnostic::debug("Removing $cobidule / $descriptor");
my $write = undef;
my $error = undef;
$write = $cobidule->{write} if exists $cobidule->{write};
$error = $cobidule->{error} if exists $cobidule->{error};
if (defined($descriptor)) {
if (defined($write) and ($write == $descriptor)) {
$type = 'write';
} elsif (defined($error) and ($error == $descriptor)) {
$type = 'error';
} else {
diagnostic::error("Invalid descriptor, serious BUG !");
}
if (exists $c_from_fd{$descriptor}) {
my $error = $select->remove($descriptor);
diagnostic::warning(TakTuk::Select::error_msg($error)) if $error;
delete $c_from_fd{$descriptor};
$cobidule->close($type);
remove_cobidule($cobidule);
} else {
diagnostic::warning("Descriptor not present in hash");
}
} else {
diagnostic::warning("Should not be called with undefined descriptor");
}
}
sub remove_cobidule($) {
my $cobidule = shift;
if ((exists($cobidule->{status}) or not exists($cobidule->{pid})) and
not exists($cobidule->{write}) and not exists($cobidule->{error})) {
diagnostic::debug("Removing cobidule $cobidule");
if (exists($cobidule->{read})) {
my $read = $cobidule->{read};
cleanup_pending_stuff($read)
if exists($pending_writes{$read});
$cobidule->close('read');
}
my $found = remove_from_set($cobidule);
diagnostic::warning("Connector to remove not found!") unless $found;
if (exists($cobidule->{remove_handler})) {
my $handler = $cobidule->{remove_handler};
&$handler($cobidule);
}
}
}
sub no_pending_connectors() {
diagnostic::debug("Connectors situation, sinks_number : $sinks_number, ".
"initialized : $initialized_sinks_number");
dagnostic::warning("More initialized than sinks")
if ($sinks_number<$initialized_sinks_number);
return $sinks_number == $initialized_sinks_number;
}
sub add_connector($$) {
my $connector = shift;
my $set_name = shift;
my $set = $connections->{$set_name};
diagnostic::debug("Adding connector: ".join(' ',%$connector));
if (exists($connector->{read})) {
TakTuk::no_flush($connector->{read});
fcntl($connector->{read}, F_SETFL, O_NONBLOCK) or diagnostic::system;
}
$connector->{set} = $set_name;
add_descriptors($connector);
push(@$set,$connector);
$sinks_number++ if $set_name eq 'sinks';
}
sub connector_initialized($) {
my $connector = shift;
$initialized_sinks_number++;
}
sub remove_from_set($) {
my $cobidule = shift;
my $set_name = $cobidule->{set};
my $set = $connections->{$set_name};
my $i = 0;
$i++ while (($i <= $#$set) and ($set->[$i] != $cobidule));
if ($i <= $#$set) {
splice @$set, $i, 1;
diagnostic::debug("Cobidule $cobidule, ".
($cobidule->{line}?$cobidule->{line}." ":"").
($cobidule->{pid}?$cobidule->{pid}." ":"").
"deleted from set $set_name at position $i");
return 1;
} else {
return 0;
}
}
sub remove_source($) {
my $connector = shift;
if (not scalar(@{$connections->{sources}})) {
exit 1;
}
} 
sub remove_interpreter($) {
my $connector = shift;
if ($general::root) {
process_message(get_root, $main::quit_message) unless $end;
} else {
diagnostic::warning("Lost interpreter connector in child (bug) !");
}
} 
sub remove_sink($) {
my $connector = shift;
$sinks_number--;
if ($connector->{state} >= $connector::initialized) {
$connector->output('state', TakTuk::CONNECTION_LOST)
unless $communicator::end;
$initialized_sinks_number--;
} else {
synchronizer::initialization_failed($connector);
}
handlers::check_ongoing_reduces($connector);
} 
sub remove_control($) {
my $connector = shift;
delete($c_from_target{$connector});
} 
sub remove_local_command($) {
my $command = shift;
if (exists($command->{timers})) {
my $timers = $command->{timers};
foreach my $timer (@$timers) {
$timer->unregister if not $timer->{elapsed};
}
}
if (exists($command->{status})) {
$command->output('status', $command->{status})
} else {
diagnostic::warning("Some command terminated without status");
}
$command->output('state', TakTuk::COMMAND_TERMINATED);
}
sub remove_pipe($) {
my $command = shift;
communicator::get_root->output('state', TakTuk::PIPE_TERMINATED);
}
sub post_write($$@) {
my $file = shift;
my $data = shift;
my $length = scalar(@_)?shift:length($data);
my $offset = scalar(@_)?shift:0;
my $result = 1;
(diagnostic::warning("Null length write") and return) if not $length;
diagnostic::warning("Null file") if not $file;
if (not exists($pending_writes{$file})) {
$result = CORE::syswrite($file, $data, $length, $offset);
if ($result) {
$length -= $result;
$offset += $result;
}
}
if ((defined($result) and $length) or $!{EAGAIN}) {
if (not exists($pending_writes{$file})) {
$pending_writes{$file} = [];
my $error = $pending_writes_select->add($file);
diagnostic::warning(TakTuk::Select::error_msg($error)) if $error;
}
my $list = $pending_writes{$file};
push @$list, [ $file, $data, $length, $offset ];
$pending_writes_number++;
$pending_size += $length;
return 1;
} else {
diagnostic::system unless defined($result);
return $result;
}
}
sub post_close($) {
my $cobidule = shift;
if (exists($cobidule->{read})) {
post_termination($cobidule->{read}, \&close_cobidule_read, $cobidule);
}
}
sub post_termination($$$) {
my $descriptor = shift;
my $handler = shift;
my $handler_arg = shift;
if (exists($pending_writes{$descriptor})) {
$pending_termination_handler{$descriptor} = $handler;
$pending_termination_argument{$descriptor} = $handler_arg;
} else {
&$handler($handler_arg);
}
}
sub cleanup_pending_stuff($) {
my $descriptor = shift;
if (exists($pending_writes{$descriptor})) {
foreach my $length (@{$pending_writes{$descriptor}}) {
$pending_size -= $length->[2];
}
$pending_writes_number -= scalar(@{$pending_writes{$descriptor}});
$error = $pending_writes_select->remove($descriptor);
diagnostic::warning(TakTuk::Select::error_msg($error)) if $error;
delete($pending_writes{$descriptor});
if (exists($pending_termination_handler{$descriptor})) {
my $handler = $pending_termination_handler{$descriptor};
my $argument = $pending_termination_argument{$descriptor};
&$handler($argument);
delete($pending_termination_handler{$descriptor});
delete($pending_termination_argument{$descriptor});
}
}
}
sub write_pending_stuff($) {
my $descriptor = shift;
if (exists($pending_writes{$descriptor})) {
my $list = $pending_writes{$descriptor};
my $chunk = shift @$list;
my ($file, $data, $length, $offset) = @$chunk;
my $result;
$result = CORE::syswrite($file, $data, $length, $offset);
if ($result) {
$length -= $result;
$pending_size -= $result;
$offset += $result;
}
if ((defined($result) and $length) or $!{EAGAIN}) {
$chunk->[2] = $length;
$chunk->[3] = $offset;
unshift @$list, $chunk;
} else {
diagnostic::system unless defined($result);
$pending_writes_number--;
}
if (not scalar(@$list)) {
cleanup_pending_stuff($descriptor);
}
}
}
sub flush_pending_stuff() {
while (scalar(keys(%pending_writes))) {
foreach my $descriptor (keys(%pending_writes)) {
write_pending_stuff($descriptor);
}
}
}
sub close_cobidule_read($) {
my $cobidule = shift;
if (exists($cobidule->{read})) {
$cobidule->close('read');
}
}
sub assign_next_target($) {
my $command = shift;
while (exists($c_from_target{$target_number})) {
$target_number++;
}
$command->{target} = $target_number;
$c_from_target{$target_number} = $command;
$target_number++;
}
sub get_target_set($$) {
my $target = shift;
my $set = shift;
my @result = ();
my @connections = get_connections($set);
if ($target eq "all") {
return @connections;
} else {
my @target_list = general::decode_set($target, $target_number - 1);
while (scalar(@target_list)) {
my $min = shift @target_list;
my $max = shift @target_list;
for (my $i=$min; $i <= $max; $i++) {
if (exists ($c_from_target{$i})) {
push @result, $c_from_target{$i};
} else {
diagnostic::warning("Invalid target $i");
communicator::get_root->output('state',
TakTuk::INVALID_TARGET);
}
}
}
if (not scalar(@result)) {
diagnostic::warning("No target");
communicator::get_root->output('state', TakTuk::NO_TARGET);
}
return @result;
}
}
package stats_buffer;
use strict; use bytes;
sub new($$) {
my $type = shift;
my $depth = shift;
my $data = { depth=>$depth, data=>[] };
bless ($data, $type);
return $data;
}
sub is_empty($) {
my $self = shift;
return not scalar(@{$self->{data}});
}
sub add($$) {
my $self = shift;
my $value = shift;
my $data = $self->{data};
push @$data, $value;
if ($#$data > $self->{depth}) {
shift @$data;
}
}
sub average($) {
my $self = shift;
my $average;
my $data = $self->{data};
if (scalar(@$data)) {
$average = 0;
foreach my $value (@$data) {
$average += $value;
}
return $average / scalar(@$data);
} else {
return undef;
}
}
sub min($) {
my $self = shift;
my $data = $self->{data};
my $min;
if (scalar(@$data)) {
$min = $self->{data}[0];
foreach my $value (@$data) {
$min = $value if $min > $value;
}
return $min;
} else {
return undef;
}
}
sub max($) {
my $self = shift;
my $data = $self->{data};
my $max;
if (scalar(@$data)) {
$max = $self->{data}[0];
foreach my $value (@$data) {
$max = $value if $max < $value;
}
return $max;
} else {
return undef;
}
}
package scheduler;
use strict; use bytes;
our $window;
our $window_adaptation;
our $undeployed_connectors = 0;
our $current_window = 0;
our $steal_request_sent = 0;
our $worksteal;
our @waiting_thieves = ();
our %stats = (connections=>stats_buffer->new(5), shifts=>stats_buffer->new(5));
our $last_time = undef;
our $threshold_low = 0.1;
our $threshold_high = 0.2;
our $max_increase = 1.5;
our @group = ( { static => [], dynamic => [], limit => 0, arity => 0 } );
our $current_group = 0;
our $last_group = 0;
our @groups_stack = ();
sub begin_group() {
my $limit = $group[$current_group]->{limit};
push @groups_stack, $current_group;
push @group, { static => [], dynamic => [], limit => $limit, arity => 0 };
$current_group = $#group;
}
sub end_group() {
$current_group = pop @groups_stack;
}
sub set_limit($) {
$group[$current_group]->{limit} = shift;
}
sub deploy_connector($) {
my $connector = shift;
if ($connector->run(0)) {
my $timer = timer->register($connector->{timeout},
\&synchronizer::initialization_timeout);
$connector->{timer} = $timer;
$timer->{connector} = $connector;
$connector->{remove_handler} = \&communicator::remove_sink;
communicator::add_connector($connector, 'sinks');
diagnostic::debug("Connector added : ".$connector->{line});
$group[$connector->{group}]->{arity}++;
$current_window++;
synchronizer::set_not_ready();
} else {
diagnostic::warning("Giving up connection to $connector->{peer}");
}
}
sub add_connector($) {
my $connector = shift;
my $number = $connector->{group};
my $static = $group[$number]->{static};
my $dynamic = $group[$number]->{dynamic};
my $arity = $group[$number]->{arity};
my $limit = $group[$number]->{limit};
if (($current_window < $window) and not at_limit($group[$number])) {
deploy_connector($connector);
} else {
if ($limit < 0) {
push @$static, $connector;
} else {
push @$dynamic, $connector;
}
$undeployed_connectors++;
}
}
sub connector_initialized($) {
my $connector = shift;
my $time = $connector->{init_date} - $connector->{start_date};
if ($window_adaptation) {
my $threshold;
my $current_time;
my $shift = 0;
$stats{connections}->add($time);
$threshold = $stats{connections}->min?
$time/$stats{connections}->min - 1:0;
$current_time = timer::current_time;
if ($last_time) {
$shift = $current_time - $last_time;
$stats{shifts}->add($shift);
}
$last_time = $current_time;
if ($threshold) {
my $adapt = 0;
my $old_window = $window;
if (($threshold < $threshold_low) and $stats{shifts}->average) {
my $max_window;
$max_window = floor($window * $max_increase);
$window = floor($stats{connections}->min / 
$stats{shifts}->average);
$window = $max_window if $window > $max_window;
} elsif ($threshold > $threshold_high) {
$window -= floor($threshold*$window);
}
$window = 1 if $window < 1;
diagnostic::debug("Window gone from $old_window to $window");
}
}
$current_window--;
schedule;
}
sub connector_failed($) {
my $connector = shift;
$current_window--;
$group[$connector->{group}]->{arity}--;
schedule;
}
sub at_limit($) {
my $group = shift;
if (($group->{limit} > 0) and ($group->{arity} >= $group->{limit})) {
return 1;
} else {
return 0;
}
}
sub schedule() {
my $deployed = 1;
my $new_group = $last_group + 1;
$new_group %= scalar(@group);
while ($undeployed_connectors
and ($deployed or ($new_group != $last_group))
and ($current_window < $window)) {
my $group = $group[$new_group];
my $connector = undef;
if (scalar(@{$group->{static}})) {
$connector = shift @{$group->{static}};
} elsif (scalar(@{$group->{dynamic}}) and not at_limit($group)) {
$connector = shift @{$group->{dynamic}};
}
if (defined($connector)) {
deploy_connector($connector);
$undeployed_connectors--;
$last_group = $new_group;
$deployed = 1;
} else {
$deployed = 0;
}
$new_group++;
$new_group %= scalar(@group);
}
$last_group = $new_group;
if (not $general::root and not $steal_request_sent and
not at_limit($group[0]) and
($undeployed_connectors == 0) and ($current_window < $window)) {
send_steal_request("");
diagnostic::debug("Steal request sent");
} else {
diagnostic::debug("Nothing done in scheduler");
}
}
sub is_idle() {
return $undeployed_connectors == 0;
}
sub send_work_to($) {
my $thief = shift;
my $group = $group[$thief->{group}];
my $work_to_give = scalar(@{$group->{dynamic}});
my $given=0;
diagnostic::debug("Theft in group $thief->{group}, ".
"dynamic connectors : ".scalar(@{$group->{dynamic}}).
", work to give : $work_to_give"); 
if ($work_to_give) {
my $available = scalar(@{$group->{dynamic}});
my $last_given = $thief->{last_given};
my $limit = eval($worksteal->{limit});
my $to_give = 0;
my $workpack="";
if ($last_given) {
$to_give = eval($worksteal->{growth});
} else {
$to_give = eval($worksteal->{initial});
}
$to_give = $limit if $to_give > $limit;
for ($given=0; $given < $to_give; $given++) {
my $connector_to_deploy = shift @{$group->{dynamic}};
$workpack = $workpack.$connector_to_deploy->pack();
$undeployed_connectors--;
}
$thief->send_message(TakTuk::encode($TakTuk::work, $workpack));
diagnostic::debug("Work sent : ".$workpack);
}
return $given;
} 
sub dispatch_work($) {
my $workpack = shift;
my $connector;
while ($workpack) {
($connector, $workpack) = connector::unpack($workpack);
$connector->{group} = 0;
diagnostic::debug("Unpacked connector: $connector");
push @{$group[0]->{dynamic}}, $connector;
$undeployed_connectors++;
}
$steal_request_sent = 0;
schedule;
my $satisfied = 1;
while (scalar(@waiting_thieves) and $satisfied) {
my $thief = shift @waiting_thieves;
$satisfied = send_work_to($thief);
unshift @waiting_thieves, $thief unless $satisfied;
}
send_steal_request("") unless $satisfied;
}
sub theft_handler($$) {
my $connector = shift;
my $parameters = shift;
my $given = send_work_to($connector);
push @waiting_thieves, $connector unless $given or $connector->{group};
if (scalar(@waiting_thieves) and not $general::root and
not $steal_request_sent) {
send_steal_request($parameters);
diagnostic::debug("Forwarded steal request, parameters : $parameters");
} elsif (scalar(@waiting_thieves)) {
if ($general::root) {
diagnostic::debug("Cannot satisfy steal request : no more dynamic ".
"work");
} elsif ($steal_request_sent) {
diagnostic::debug("Request already sent");
}
}
}
sub send_steal_request($) {
my $parameters = shift;
communicator::get_root->send_message(
TakTuk::encode($TakTuk::steal, $parameters));
$steal_request_sent = 1;
}
sub resign() {
foreach my $group (@group) {
$group->{static} = [];
$group->{dynamic} = [];
}
$undeployed_connectors = 0;
}
package timer;
use strict; use bytes;
our $has_timehires = eval("use Time::HiRes;1")?1:0;
our @registered_timers;
sub current_time() {
if ($has_timehires) {
my ($seconds, $micro) = Time::HiRes::gettimeofday();
return $seconds + $micro/1000000;
} else {
return time;
}
}
sub register($$) {
my $type = shift;
my $timeout = shift;
my $handler = shift;
my $current = current_time;
my $data = { 'handler'=>$handler,
'birth'=>$current,
'elapsed'=>0 };
bless($data, $type);
if ($timeout) {
$data->{timeout} = $current+$timeout;
my $i=$#registered_timers;
$i-- while (($i >= 0) and
($registered_timers[$i]->{timeout} > $data->{timeout}));
splice @registered_timers, $i+1, 0, $data;
} else {
$data->{timeout} = 0;
}
diagnostic::debug("Registered new timer $data, ".$data->print.
", list = @registered_timers");
return $data;
}
sub check_timeouts() {
my $current = current_time;
while (scalar(@registered_timers) and
($registered_timers[0]->{timeout} <= $current)) {
my $timer = shift @registered_timers;
$timer->{elapsed} = 1;
my $handler = $timer->{handler};
&$handler($timer);
diagnostic::debug("Timeout handled for $timer, ".$timer->print);
}
}
sub unregister() {
my $timer = shift;
if ($timer->{timeout}) {
my $i=0;
$i++ while (($i <= $#registered_timers) and
($registered_timers[$i] != $timer));
if ($i <= $#registered_timers) {
splice @registered_timers, $i, 1;
diagnostic::debug("Unregistered timer $timer, remaining ".
"@registered_timers equals ".
scalar(@registered_timers));
} else {
diagnostic::warning("Unregistering didn't found timer $timer");
}
}
}
sub gettime($) {
my $current = current_time;
my $timer = shift;
return $current - $timer->{birth};
}
sub print($) {
my $current = current_time;
my $timer = shift;
return "Timer created at $timer->{birth} timeouting at ".
"$timer->{timeout} current is $current";
}
package synchronizer;
use strict; use bytes;
our @states = (1, 1);
our $father_is_ready=0;
our %blocked;
our %event_waited;
sub check_ready_state() {
if (!$states[TakTuk::TAKTUK_READY]) {
if ($father_is_ready && scheduler::is_idle &&
communicator::no_pending_connectors) {
foreach my $connector (communicator::get_connections('sinks')) {
$connector->send_message($TakTuk::ready);
}
dispatch_event(TakTuk::TAKTUK_READY);
diagnostic::debug("I'm ready");
} else {
diagnostic::debug("I'm not ready : father_is_ready = ".
"$father_is_ready, is_idle = ".
scheduler::is_idle.", no_pending_connector = ".
communicator::no_pending_connectors);
}
}
}
sub set_not_ready() {
if ($states[TakTuk::TAKTUK_READY]) {
$states[TakTuk::TAKTUK_READY] = 0;
block_until_event(TakTuk::TAKTUK_READY, $TakTuk::broadcast,
$TakTuk::spread);
}
}
sub initialization_complete($) {
my $connector = shift;
diagnostic::debug("Connector $connector->{line} initialized");
$connector->{init_date} = timer::current_time;
$connector->{timer}->unregister;
delete $connector->{timer};
communicator::connector_initialized($connector);
scheduler::connector_initialized($connector);
$connector->send_parameters;
$connector->output('state', TakTuk::CONNECTION_INITIALIZED);
check_ready_state;
}
sub initialization_failed($) {
my $connector = shift;
if (exists($connector->{timer})) {
$connector->{timer}->unregister;
delete $connector->{timer};
}
$connector->output('state', TakTuk::CONNECTION_FAILED);
scheduler::connector_failed($connector);
check_ready_state;
}
sub initialization_timeout($) {
my $timer = shift;
my $connector = $timer->{connector};
delete $connector->{timer};
$connector->{timeouted} = 1;
if ($connector->{state} >= $connector::initialized) {
diagnostic::warning("Bug, timeouted an initialized connector");
} else {
if (exists($connector->{read})) {
communicator::cleanup_pending_stuff($connector->{read});
}
$connector->output('connector', "timeouted");
$connector->cancel;
}
}
sub set_not_numbered() {
if ($states[TakTuk::TAKTUK_NUMBERED]) {
$states[TakTuk::TAKTUK_NUMBERED] = 0;
block_until_event(TakTuk::TAKTUK_NUMBERED, $TakTuk::execute,
$TakTuk::eof,
$TakTuk::get,
$TakTuk::input,
$TakTuk::kill,
$TakTuk::message,
$TakTuk::put,
$TakTuk::send_to,
$TakTuk::synchronize,
$TakTuk::quit,
$TakTuk::taktuk_perl);
}
}
sub block_until_event($@) {
my $event = shift;
my $handlers;
my $pending;
if (exists($blocked{$event})) {
$handlers = $blocked{$event}->{handlers};
$pending = $blocked{$event}->{pending};
} else {
$handlers = {};
$pending = [];
$blocked{$event} = { handlers=>$handlers, pending=>$pending };
}
foreach my $message (@_) {
if (exists($event_waited{$message})) {
diagnostic::warning("Multiple blocking for $message");
} else {
$handlers->{$message} = handlers::get_handler($message);
handlers::replace_handler($message, \&handlers::handler_blocked);
$event_waited{$message} = $event;
}
}
}
sub dispatch_event($) {
my $event = shift;
my $handlers = $blocked{$event}->{handlers};
my $pending_list = $blocked{$event}->{pending};
if (not $states[$event]) {
communicator::get_root->output('state', $event);
$states[$event] = 1;
foreach my $message (keys(%$handlers)) {
handlers::replace_handler($message, $handlers->{$message});
delete $event_waited{$message};
}
general::apply_gateway_restrictions;
while (scalar(@$pending_list)) {
my $message = shift @$pending_list;
my $connector = shift @$pending_list;
my $body = shift @$pending_list;
my $function = handlers::get_handler($message);
&$function($message, $connector, $body);
diagnostic::debug("Processing $message with body $body");
}
delete $blocked{$event};
} else {
diagnostic::warning("Not in proper state to dispatch $event event");
}
}
sub add_pending_message($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my $event = $event_waited{$message};
my $pending_list = $blocked{$event}->{pending};
push @$pending_list, $message, $connector, $body;
}
sub get_state($) {
return $states[shift];
}
sub setup_synchronization() {
if ($main::no_numbering) {
if (get_state(TakTuk::TAKTUK_READY)) {
diagnostic::warning("Cannot synchronize an unnumbered ready ".
"TakTuk");
} else {
block_until_event(TakTuk::TAKTUK_READY, $TakTuk::quit,
$TakTuk::synchronize);
}
} else {
set_not_numbered;
}
}
package handlers;
use strict; use bytes;
use Fcntl;
our %handlers;
sub register_handler($$) {
my $message = shift;
if (defined($handlers{$message})) {
diagnostic::error("Handler already defined for $message");
} else {
$handlers{$message} = shift;
}
}
sub replace_handler($$) {
my $message = shift;
if (defined($handlers{$message})) {
$handlers{$message} = shift;
} else {
diagnostic::error("Handler not defined for $message");
}
}
sub get_handler($) {
my $message = shift;
return $handlers{$message};
}
sub handler_blocked($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
synchronizer::add_pending_message($message, $connector, $body);
}
sub arguments($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my $element;
my @arguments;
while ($body) {
($element, $body) = TakTuk::unpack($body);
push @arguments, $element;
}
arguments::fetch_arguments(@arguments);
arguments::parse_options;
main::process_commands;
scheduler::schedule;
}
sub broadcast($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
foreach my $other_connector (communicator::get_connections('sources'),
communicator::get_connections('sinks')) {
if ($other_connector != $connector) {
diagnostic::debug("Spreading message to $other_connector");
$other_connector->send_message(TakTuk::encode(
$TakTuk::spread, $body));
}
}
}
sub down($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my $root_connector = communicator::get_root;
communicator::process_message($root_connector, $body);
}
sub empty($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
}
sub eof($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my $target;
my @target_set;
$target = $body;
@target_set = communicator::get_target_set($target, 'local_commands');
foreach my $command (@target_set) {
communicator::post_close($command);
diagnostic::debug("Posted close inputs for command $command");
}
}
sub update_failed($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
communicator::get_root->output('state', TakTuk::UPDATE_FAILED)
if ($general::taktuk{rank} < 0);
synchronizer::dispatch_event(TakTuk::TAKTUK_NUMBERED)
}
sub execute_timeout($) {
my $timer = shift;
my $message_list = $timer->{messages};
diagnostic::debug("Timeout triggered for $timer->{command}->{line}");
$ENV{TAKTUK_PID} = $timer->{command}->{pid};
if (not length($message_list)) {
CORE::kill 15, -$timer->{command}->{pid};
}
while (length($message_list)) {
my ($message, $action);
($message, $message_list) = TakTuk::unpack($message_list);
($action, $message) = TakTuk::decode($message);
if ($action eq $TakTuk::action) {
communicator::process_message($timer->{command}->{control},
$message);
} elsif ($action eq $TakTuk::kill) {
CORE::kill $message, -$timer->{command}->{pid};
} else {
diagnostic::error("Internal error : unknown timeout action");
}
}
}
sub execute($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my ($attributes, $line) = TakTuk::unpack($body);
my $timers = [];
my $command = command::new(line=>$line,
data_handler=>\&communicator::process_command_output,
timers=>$timers);
my $result;
while (length($attributes)) {
my $timeout_pack;
my $timeout;
($timeout_pack, $attributes) = TakTuk::unpack($attributes);
diagnostic::error("Internal error") if not defined $timeout_pack;
($timeout, $timeout_pack) = TakTuk::unpack($timeout_pack);
diagnostic::error("Internal error") if not defined $timeout;
my $timer = timer->register($timeout, \&execute_timeout);
$timer->{command} = $command;
$timer->{connector} = $connector;
$timer->{messages} = $timeout_pack;
push @$timers, $timer;
}
$ENV{TAKTUK_PIDS} = join(' ', map("$_->{pid}",
communicator::get_connections('local_commands')));
communicator::assign_next_target($command);
$ENV{TAKTUK_TARGET} = $command->{target};
if ($result = $command->run(1)) {
$command->{remove_handler} = \&communicator::remove_local_command;
communicator::add_connector($command, 'local_commands');
$command->output('state', TakTuk::COMMAND_STARTED);
} else {
diagnostic::warning("Giving up command $line");
$command->output('state', TakTuk::COMMAND_FAILED);
}
return $result;
}
our %filename;
our %file_perms;
our %file;
sub file_finalize($) {
my $position = shift;
if (exists($file{$position})) {
CORE::close($file{$position}) or diagnostic::system;
chmod $file_perms{$position}, $filename{$position}
or diagnostic::system;
}
}
sub file($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my $position;
($position, $body) = TakTuk::unpack($body);
if (not exists($file{$position})) {
my ($host, $rank, $type, $permissions, $source, $destination);
($host, $body) = TakTuk::unpack($body);
($rank, $body) = TakTuk::unpack($body);
($type, $body) = TakTuk::unpack($body);
($permissions, $body) = TakTuk::unpack($body);
($source, $body) = TakTuk::unpack($body);
$destination = $body;
my $result = undef;
$destination = general::expand($destination,
position=>$position,
host=>$host,
rank=>$rank);
if ($type) {
if (-d $destination) {
if (-d "$destination/$source" or mkdir "$destination/$source") {
$filename{$position} = "$destination/$source";
} else {
diagnostic::system;
}
} elsif (mkdir $destination) {
$filename{$position} = $destination;
} else { 
diagnostic::system;
}
if (exists($filename{$position})) {
$result = open($file{$position},
"| cd $filename{$position} && tar x");
}
} else {
if (-d $destination) {
$filename{$position} = "$destination/$source";
} else {
$filename{$position} = $destination;
}
$result = open($file{$position}, ">", $filename{$position});
}
if (defined($result)) {
binmode($file{$position}) or diagnostic::system;
$file_perms{$position} = $permissions;
fcntl($file{$position}, F_SETFL, O_NONBLOCK) or diagnostic::system;
$connector->output('state',TakTuk::FILE_RECEPTION_STARTED);
} else {
diagnostic::system;
$file{$position} = undef;
$connector->output('state',TakTuk::FILE_RECEPTION_FAILED);
}
} else {
if (not length($body)) {
if (defined($file{$position})) {
communicator::post_termination($file{$position},
\&file_finalize, $position);
$connector->output('state',
TakTuk::FILE_RECEPTION_TERMINATED);
} else {
diagnostic::warning("End file message but no descritor");
}
delete($file{$position}) if exists($file{$position});
delete($file_perms{$position}) if exists($file_perms{$position});
delete($filename{$position}) if exists($filename{$position});
} else {
if (defined($file{$position})) {
communicator::post_write($file{$position},$body) or
diagnostic::system;
} else {
diagnostic::warning("Undefined file descriptor for data");
}
}
}
}
sub forward_up($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
if ($general::root) {
communicator::process_message($connector, $body);
} else {
foreach my $other_connector (communicator::get_connections('sources')) {
$other_connector->send_message(
TakTuk::encode($TakTuk::forward_up, $body));
}
}
}
sub gateway($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
$general::taktuk{gateway} = 1;
}
sub get($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my $put_prefix;
my $get_prefix;
($get_prefix, $body) = TakTuk::unpack($body);
$put_prefix=TakTuk::encode($TakTuk::send_to,
TakTuk::pack($general::taktuk{rank}));
communicator::process_message($connector, TakTuk::encode($get_prefix,
TakTuk::encode($TakTuk::put, TakTuk::pack($put_prefix).$body)));
}
sub get_info($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
if (exists($general::taktuk{$body})) {
$connector->send_message(TakTuk::encode($TakTuk::info,
$general::taktuk{$body}));
} else {
$connector->send_message($TakTuk::invalid);
}
}
sub input($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my $result;
my $target;
my @target_set;
($target, $body) = TakTuk::unpack($body);
@target_set = communicator::get_target_set($target, 'local_commands');
foreach my $command (@target_set) {
if (exists($command->{read})) {
$result = communicator::post_write($command->{read}, $body);
} else {
$result = 1;
}
if (not $result) {
if ($!{EPIPE}) {
diagnostic::debug("Broken pipe when sending input");
} else {
diagnostic::system;
}
}
}
}
sub kill($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my $target;
my @target_set;
($target, $body) = TakTuk::unpack($body);
@target_set = communicator::get_target_set($target, 'local_commands');
foreach my $command (@target_set) {
CORE::kill $body, -$command->{pid};
}
}
our $from;
our $to;
our @messages_pool;
our @pending_recv;
sub message($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my $target;
my @target_set = ();
($target, $body) = TakTuk::unpack($body);
diagnostic::debug("In message, target is $target, body is $body");
if ($target ne "output") {
if ($target eq "any") {
if (scalar(@pending_recv)) {
push @target_set, shift @pending_recv;
} else {
push @messages_pool, TakTuk::encode($message,
TakTuk::pack($target).$body);
} 
} else {
@target_set = communicator::get_target_set($target, 'control');
}
foreach my $control (@target_set) {
$control->send_message(TakTuk::encode($message, $body));
$control->{pending_messages}++;
if (($control->{pending_messages} == 0) and
exists($control->{timer})) {
$control->{timer}->unregister
if not $control->{timer}->{elapsed};
delete $control->{timer};
}
}
} else {
my $root = communicator::get_root;
($from, $body) = TakTuk::unpack($body);
$root->output('message', $body);
}
}
sub numbered($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my ($father_rank, $rank, $count, $update) = split / /,$body;
my @connections = communicator::get_connections('sinks');
my $current = $rank;
diagnostic::debug("Numbering from $rank, count = $count");
if ($general::taktuk{gateway}) {
$general::taktuk{rank} = -1;
$ENV{TAKTUK_RANK} = -1;
} else {
$general::taktuk{rank} = $rank;
$ENV{TAKTUK_RANK} = $rank;
$rank = $father_rank;
$current++;
}
$general::taktuk{count} = $count;
$ENV{TAKTUK_COUNT} = $count;
$general::taktuk{father} = $father_rank;
$ENV{TAKTUK_FATHER} = $father_rank;
$general::taktuk{child_min} = $current;
for (my $i=0; $i<=$#connections; $i++) {
my $other_connector = $connections[$i];
if ($other_connector != $connector) {
my $min = $current;
my $max = $current + $other_connector->{count} - 1;
if ($update and exists($other_connector->{min})
and (($min != $other_connector->{min})
or (($max != $other_connector->{max})
and ($i != $#connections)))) {
communicator::get_root->output('state',
TakTuk::UPDATE_FAILED);
$min = $other_connector->{min};
$max = $other_connector->{max};
$other_connector->send_message(TakTuk::encode($TakTuk::spread,
$TakTuk::update_failed));
} else {
$other_connector->send_message(TakTuk::encode(
$TakTuk::numbered,"$rank $min $count $update"));
$other_connector->{min} = $min;
$other_connector->{max} = $max;
}
$current = $max+1;
} else {
diagnostic::warning("Bug : numbered coming from a sink");
}
}
$general::taktuk{child_max} = $current-1;
if ($general::taktuk{child_min} > $general::taktuk{child_max}) {
$general::taktuk{child_min} = -1;
$general::taktuk{child_max} = -1;
}
$ENV{TAKTUK_CHILD_MIN} = $general::taktuk{child_min};
$ENV{TAKTUK_CHILD_MAX} = $general::taktuk{child_max};
synchronizer::dispatch_event(TakTuk::TAKTUK_NUMBERED);
}
our %filehandles;
sub output($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
if ($general::root) {
my ($fd, $remaining) = TakTuk::unpack($body);
if (not exists($filehandles{$fd})) {
open ($filehandles{$fd}, $fd) or diagnostic::system;
}
communicator::post_write($filehandles{$fd}, $remaining)
or diagnostic::system;
} else {
diagnostic::warning("Output message received on non root node");
}
}
sub option($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my ($name, $value) = split //,$body,2;
diagnostic::debug("Setting $name to $value in $general::taktuk{rank}");
option::set($name, $value);
}
sub options($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
diagnostic::debug("Fetching [ $body ] as options line");
arguments::fetch_line($body);
arguments::parse_options();
}
sub pipe($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my ($filename, $prefix) = TakTuk::unpack($body);
my $fd = general::open_file($filename) or diagnostic::system;
if ($fd) {
my $command = command::new(write=>$fd, type=>settings::FD,
data_handler=>\&communicator::process_pipe_output,
message=>$prefix);
$command->{remove_handler} = \&communicator::remove_pipe;
communicator::add_connector($command, 'pipe');
communicator::get_root->output('state', TakTuk::PIPE_STARTED);
} else {
communicator::get_root->output('state', TakTuk::PIPE_FAILED);
}
}
sub position($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
($general::position) = $body;
$ENV{TAKTUK_POSITION} = $body;
}
sub put($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my $prefix;
my $source;
my $destination;
($prefix, $body) = TakTuk::unpack($body);
($source, $body) = TakTuk::unpack($body);
($destination, $body) = $body;
diagnostic::debug("Calling send_file with $prefix, $source, ".
"$destination from $general::taktuk{rank}");
$connector->send_file($prefix, $source, $destination);
}
sub quit($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
if (not $communicator::end) {
handlers::eof($TakTuk::eof, $connector, "all");
communicator::terminate;
foreach my $other_connector (communicator::get_connections('sinks')) {
diagnostic::debug("Sending quit message to $other_connector");
$other_connector->send_message($TakTuk::quit);
}
}
}
sub ready($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
diagnostic::debug("Ready received");
$synchronizer::father_is_ready = 1;
synchronizer::check_ready_state;
}
sub recv_timeout($) {
my $timer = shift;
my $i=0;
$timer->{connector}->send_message($TakTuk::timeout);
$timer->{connector}->{pending_messages}++;
while ($i<=$#pending_recv) {
if ($pending_recv[$i] == $timer->{connector}) {
splice @pending_recv, $i, 1;
}
$i++;
}
}
our %reduce_result;
our %reduce_remaining;
our %reduce_connector;
our @reduce_pending;
use constant REDUCE_INITIALIZATION => -1;
use constant REDUCE_FINALIZATION => -2;
our %reduce_handler = ($TakTuk::reduce_count => \&reduce_count,
$TakTuk::reduce_tree => \&reduce_tree);
sub reduce_count ($$$$) {
my $connector = shift;
my $rank = shift;
my $new_value = shift;
my $data = shift;
if ($rank == REDUCE_INITIALIZATION) {
$general::numbering_update = $new_value?1:0;
synchronizer::set_not_numbered;
if ($general::taktuk{gateway}) {
return 0;
} else {
return 1;
}
} elsif ($rank == REDUCE_FINALIZATION) {
if ($general::root) {
$data--;
numbered($TakTuk::numbered, communicator::get_root,
"-1 0 $data $general::numbering_update");
return undef;
} else {
return $data;
}
} else {
$connector->{count} = $new_value;
return $data + $new_value; 
}
}
sub reduce_tree($$$$) {
my $source_connector = shift;
my $rank = shift;
my $new_value = shift;
my $data = shift;
my $result;
if ($rank == REDUCE_INITIALIZATION) {
my $ready = $synchronizer::states[TakTuk::TAKTUK_READY] ?
"ready":"busy";
my $rank = ($general::taktuk{rank} == -1) ? "X":$general::taktuk{rank};
$result = "['$general::host ($rank, $ready)'";
my @sinks = communicator::get_connections('sinks');
foreach my $connector (@sinks) {
if ($connector->{state} < $connector::initialized) {
my $number = $connector->{$TakTuk::reduce_tree};
$reduce_result{$TakTuk::reduce_tree}->[$number] =
", ['connecting $connector->{peer}']";
delete $connector->{$TakTuk::reduce_tree};
$reduce_remaining{$TakTuk::reduce_tree}--;
} else {
$connector->send_message(TakTuk::encode($TakTuk::reduce,
$TakTuk::reduce_tree));
}
}
} elsif ($rank == REDUCE_FINALIZATION) {
if ($general::root) {
eval('$new_value = '.$data."]");
general::print_tree("",$new_value);
$result = undef;
} else {
$result = $data."]";
}
} else {
$result = defined($new_value) ? $data.",".$new_value : $data;
}
return $result;
}
sub reduce($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my @connectors = (communicator::get_connections('sources'),
communicator::get_connections('sinks'));
my $i;
diagnostic::debug("Reduce in $general::taktuk{rank}");
my ($type,$value) = TakTuk::decode($body);
if (not exists($reduce_connector{$type})) {
diagnostic::error("Handler not defined for reduce $type")
if not exists($reduce_handler{$type});
$reduce_connector{$type} = [];
$reduce_connector{$type}->[0] = $connector;
$i = 0;
foreach my $target (@connectors) {
if ($target != $connector) {
$i++;
$target->{$type} = $i;
$reduce_connector{$type}->[$i] = $target;
}
}
$reduce_remaining{$type} = $i;
my $handler = $reduce_handler{$type};
$reduce_result{$type} = [];
$reduce_result{$type}->[0] =
&$handler($connector, REDUCE_INITIALIZATION, $value, undef);
} else {
push @reduce_pending, [ $message, $connector, $body ];
}
reduce_status_analysis($type);
}
sub reduce_status_analysis($) {
my $type = shift;
if ($reduce_remaining{$type} == 0) {
my $data = $reduce_result{$type};
my $handler = $reduce_handler{$type};
my $connector = $reduce_connector{$type};
my $result = $data->[0];
for (my $i=1; $i<=$#$data; $i++) {
$result = &$handler($connector->[$i], $i, $data->[$i], $result);
}
$result = &$handler($connector->[0], REDUCE_FINALIZATION,
undef, $result);
if (defined($result)) {
my $reply_connector = $connector->[0];
$reply_connector->send_message(TakTuk::encode(
$TakTuk::reduce_result, TakTuk::encode($type, $result)));
}
delete $reduce_connector{$type};
delete $reduce_remaining{$type};
delete $reduce_result{$type};
if (scalar(@reduce_pending)) {
my $next = pop @reduce_pending;
reduce($next->[0], $next->[1], $next->[2]);
}
}
}
sub reduce_result($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
diagnostic::debug("Reduce result from $general::taktuk{rank}");
my ($type,$value) = TakTuk::decode($body);
$reduce_result{$type}->[$connector->{$type}] = $value;
$reduce_remaining{$type}--;
delete $connector->{$type};
reduce_status_analysis($type);
}
sub check_ongoing_reduces($) {
my $connector = shift;
foreach my $type (keys(%reduce_connector)) {
if (exists($connector->{$type})) {
$reduce_remaining{$type}--;
reduce_status_analysis($type);
}
}
}
sub resign($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
scheduler::resign;
foreach my $other_connector (communicator::get_connections('sources'),
communicator::get_connections('sinks')) {
if ($other_connector != $connector) {
if ($other_connector->{state} < $connector::initialized) {
$other_connector->output('connector',"canceled");
$other_connector->cancel;
} else {
$other_connector->send_message($TakTuk::resign);
}
}
}
}
sub send_to($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my ($to, $remaining) = TakTuk::unpack($body);
diagnostic::debug("Message to $to and I'm $general::taktuk{rank}");
my $new_message;
my @destination_list = general::decode_set($to, $general::taktuk{count});
if (not scalar(@destination_list)) {
diagnostic::error("Invalid set specification : $to");
communicator::get_root->output('state', TakTuk::INVALID_DESTINATION);
}
my @upward = ();
my @downward = ();
my $delivery = 0;
my $min = shift @destination_list;
my $max = shift @destination_list;
my $local_min;
if ($general::taktuk{gateway}) {
$local_min = $general::taktuk{child_min};
} else {
$local_min = $general::taktuk{rank};
}
while (defined($min) and ($min < $local_min)) {
if ($max >= $local_min) {
push @upward, $min, $local_min-1;
unshift @destination_list, $local_min, $max;
} else {
push @upward, $min, $max;
}
$min = shift @destination_list;
$max = shift @destination_list;
}
if (defined($min) and ($min == $general::taktuk{rank})) {
$delivery = 1;
if ($max == $general::taktuk{rank}) {
$min = shift @destination_list;
$max = shift @destination_list;
} else {
$min++;
}
}
my $i=0;
my @sinks = communicator::get_connections('sinks');
while (defined($min) and ($min <= $general::taktuk{child_max})) {
if ($max > $general::taktuk{child_max}) {
push @upward, $general::taktuk{child_max}+1, $max;
$max = $general::taktuk{child_max};
}
while (($i <= $#sinks) and ($min > $sinks[$i]->{max})) {
if (scalar(@downward)) {
$new_message = TakTuk::encode($message,
TakTuk::pack(general::encode_set(@downward)).$remaining);
$sinks[$i]->send_message($new_message);
@downward = ();
}
$i++;
}
if (($i <= $#sinks) and ($min < $sinks[$i]->{min}) and
($max >= $sinks[$i]->{min})) {
unshift @destination_list, $sinks[$i]->{min}, $max;
$max = $sinks[$i]->{min}-1;
}
if (($i > $#sinks) or ($min < $sinks[$i]->{min})) {
diagnostic::warning("Send problem, ".
(($min != $max)?"$min-$max":"$min")." not available anymore");
communicator::get_root->output('state',
TakTuk::UNAVAILABLE_DESTINATION);
}
else {
if ($max > $sinks[$i]->{max}) {
unshift @destination_list, $sinks[$i]->{max}+1, $max;
$max = $sinks[$i]->{max};
}
push @downward, $min, $max;
}
$min = shift @destination_list;
$max = shift @destination_list;
}
if (scalar(@downward)) {
$new_message = TakTuk::encode($message,
TakTuk::pack(general::encode_set(@downward)).$remaining);
$sinks[$i]->send_message($new_message);
}
while (defined($min)) {
push @upward, $min, $max;
$min = shift @destination_list;
$max = shift @destination_list;
}
if (scalar(@upward)) {
if ($general::root) {
diagnostic::warning("Send problem, ".general::encode_set(@upward).
" is(are) invalid destination(s)");
} else {
$new_message = TakTuk::encode($message,
TakTuk::pack(general::encode_set(@upward)).$remaining);
communicator::get_root->send_message($new_message);
}
}
if ($delivery) {
diagnostic::debug("Delivered message");
communicator::get_root->output('state', TakTuk::MESSAGE_DELIVERED);
communicator::process_message($connector, $remaining);
}
}
sub spread($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
broadcast($message, $connector, $body);
diagnostic::debug("Handling message locally");
communicator::process_message($connector, $body);
}
sub steal($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
diagnostic::debug("Steal request : $body, connector : $connector");
scheduler::theft_handler($connector, $body);
synchronizer::check_ready_state;
}
sub synchronize($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
communicator::process_message($connector, $body);
}
sub taktuk_code($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
diagnostic::debug("Taktuk code received");
$general::taktuk_code = $body;
}
sub taktuk_perl($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
my ($options, $arguments, $filename, $attributes);
($attributes, $body) = TakTuk::unpack($body);
if ($body =~ /(?:^|\s)--(?:$|\s)/) {
($options, $arguments) = split /(?:^|\s)--(?:$|\s)/,$body,2;
$arguments = "" if not defined($arguments);
} else {
($options, $arguments) = ("", $body);
}
($filename, $arguments) = split /\s/, $arguments, 2;
$filename = "" if not defined($filename);
$arguments = "" if not defined($arguments);
diagnostic::debug("Taktuk perl execution, options [$options], filename ".
"[$filename], arguments [$arguments]");
my $command = execute($TakTuk::execute, $connector,
TakTuk::pack($attributes).
"$general::perl_interpreter $options -- - $arguments");
if ($command) {
$command->{line} = "taktuk_perl";
$command->{line} .= " $body" if $body;
general::load_taktuk_package('TakTuk');
communicator::post_write($command->{read}, "{")
or diagnostic::system;
communicator::post_write($command->{read}, $general::taktuk_package)
or diagnostic::system;
communicator::post_write($command->{read}, "}package main;")
or diagnostic::system;
if ($filename and ($filename ne "-")) {
$filename = qx{echo "$filename"};
chomp($filename);
communicator::post_write($command->{read},
general::load_file($filename)) or diagnostic::system;
communicator::post_close($command);
}
}
}
sub wait_message($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
$connector->{pending_messages}--;
if ($connector->{pending_messages} < 0) {
push @pending_recv, $connector;
if (scalar(@messages_pool)) {
my ($code, $remaining) = TakTuk::decode(shift @messages_pool);
message($code, undef, $remaining);
}
}
if (($connector->{pending_messages} < 0) and $body and ($body > 0)) {
my $timer = timer->register($body,\&recv_timeout);
$timer->{connector} = $connector;
$connector->{timer} = $timer;
}
}
sub work($$$) {
my $message = shift;
my $connector = shift;
my $body = shift;
diagnostic::debug("Work received : $body");
scheduler::dispatch_work($body);
synchronizer::check_ready_state;
}
sub init() {
handlers::register_handler($TakTuk::arguments, \&arguments);
handlers::register_handler($TakTuk::broadcast, \&broadcast);
handlers::register_handler($TakTuk::down, \&down);
handlers::register_handler($TakTuk::eof, \&eof);
handlers::register_handler($TakTuk::execute, \&execute);
handlers::register_handler($TakTuk::file, \&file);
handlers::register_handler($TakTuk::forward_up, \&forward_up);
handlers::register_handler($TakTuk::gateway, \&gateway);
handlers::register_handler($TakTuk::get, \&get);
handlers::register_handler($TakTuk::get_info, \&get_info);
handlers::register_handler($TakTuk::input, \&input);
handlers::register_handler($TakTuk::kill, \&kill);
handlers::register_handler($TakTuk::message, \&message);
handlers::register_handler($TakTuk::numbered, \&numbered);
handlers::register_handler($TakTuk::output, \&output);
handlers::register_handler($TakTuk::option, \&option);
handlers::register_handler($TakTuk::options, \&options);
handlers::register_handler($TakTuk::pipe, \&pipe);
handlers::register_handler($TakTuk::position, \&position);
handlers::register_handler($TakTuk::put, \&put);
handlers::register_handler($TakTuk::quit, \&quit);
handlers::register_handler($TakTuk::ready, \&ready);
handlers::register_handler($TakTuk::reduce, \&reduce);
handlers::register_handler($TakTuk::reduce_result, \&reduce_result);
handlers::register_handler($TakTuk::resign, \&resign);
handlers::register_handler($TakTuk::send_to, \&send_to);
handlers::register_handler($TakTuk::spread, \&spread);
handlers::register_handler($TakTuk::steal, \&steal);
handlers::register_handler($TakTuk::synchronize, \&synchronize);
handlers::register_handler($TakTuk::taktuk_code, \&taktuk_code);
handlers::register_handler($TakTuk::taktuk_perl, \&taktuk_perl);
handlers::register_handler($TakTuk::update_failed, \&update_failed);
handlers::register_handler($TakTuk::wait_message, \&wait_message);
handlers::register_handler($TakTuk::work, \&work);
}
package main;
use strict; use bytes;
our $taktuk_interpreter = undef;
our $in_interpreter = 1;
our $interactive = 0;
our $forced_interactive = 0;
our $no_numbering = 0;
our $terminate = 0;
our $quit_message = $TakTuk::quit;
our $taktuk_side;
our $interpreter_side;
sub is_one_of($@) {
my $prefix = shift;
my $found = undef;
my $number_found = 0;
foreach my $fullname (@_) {
if ($fullname =~ m/^$prefix/) {
$found = $fullname;
$number_found++;
}
}
$prefix=$found if ($number_found == 1);
return $prefix;
}
sub get_attributes() {
my $attributes = "";
my $timeout_pack = "";
my $argument = arguments::get_next_argument;
while ($argument =~ m/^[a-z]+$/o) {
$argument = is_one_of($argument, qw(timeout action kill));
if ($argument eq "timeout") {
my $value = arguments::get_next_argument;
if (length($timeout_pack)) {
$attributes .= TakTuk::pack($timeout_pack);
$timeout_pack = "";
}
$timeout_pack = TakTuk::pack($value);
} elsif ($argument eq "action") {
my ($message, $type, $data) = translate;
if (length($timeout_pack)) {
if (not $type) {
$timeout_pack .= TakTuk::pack(
TakTuk::encode($TakTuk::action, $message))
if defined($message);
} else {
diagnostic::warning("Invalid attribute action");
}
} else {
diagnostic::warning("Action requires timeout specification");
}
} elsif ($argument eq "kill") {
my $value = arguments::get_next_argument;
if ($value =~ m/^\d+$/) {
if (length($timeout_pack)) {
$timeout_pack .= TakTuk::pack(TakTuk::encode($TakTuk::kill,
$value));
} else {
diagnostic::warning("Kill requires timeout specification");
}
} else {
diagnostic::error("Invalid signal number for kill");
}
} else {
diagnostic::warning("Unknown option $argument for exec");
}
$argument = arguments::get_next_argument;
} 
$attributes .= TakTuk::pack($timeout_pack) if length($timeout_pack);
arguments::restore_last_argument;
diagnostic::debug("Exec attributes : $attributes");
return $attributes;
}
sub translate() {
my $message = undef;
my $type = "";
my $data = undef;
my @commands = qw(broadcast downcast exec get help input kill message
network option put synchronize taktuk_perl version quit);
my $found="";
my $number_found=0;
my $command;
$command = arguments::get_next_command;
if (not $arguments_ended and ($command =~ m/^[a-z]+$/o)) {
$command = is_one_of($command, @commands);
}
if ($arguments_ended) {
} elsif ($command =~ m/^\s*$/o) {
} elsif ($command =~ /^\d/) {
($message, $type, $data) = translate();
$message = TakTuk::encode($TakTuk::send_to,
TakTuk::pack($command).$message) if (defined($message));
} elsif ($command eq "broadcast") {
($message, $type, $data) = translate();
$message = TakTuk::encode($TakTuk::broadcast, $message)
if (defined($message));
} elsif ($command eq "downcast") {
($message, $type, $data) = translate();
$message = TakTuk::encode($TakTuk::down, TakTuk::encode(
$TakTuk::broadcast, $message)) if (defined($message));
} elsif ($command eq "exec") {
my $attributes = get_attributes;
my $parameters = arguments::get_parameters;
if (defined($parameters)) {
$message = TakTuk::encode($TakTuk::execute,
TakTuk::pack($attributes).$parameters);
} else {
$terminate = 1;
}
} elsif ($command eq "get") {
my $source = arguments::get_parameters;
my $destination = arguments::get_parameters;
if (defined($source) and defined($destination)) {
$message = "";
$type = $TakTuk::get;
$data = [ $source, $destination ];
} else {
$terminate = 1;
}
} elsif ($command eq "help") {
general::print_help;
} elsif (($command eq "input") or ($command eq "message")) {
my @keywords;
my $message_code;
my $target;
my $name = arguments::get_next_command;
if (defined($name) and ($name =~ m/^[a-z]+$/)) {
$name = is_one_of($name, "target");
}
if ($name eq "target") {
$target = arguments::get_next_command;
} else {
$target = ($command eq "input") ? "all" : "any";
arguments::restore_last_command;
}
if ($command eq "input") {
@keywords = qw(close data file line pipe);
$message_code = TakTuk::encode($TakTuk::input,
TakTuk::pack($target));
} else {
@keywords = qw(data file line pipe);
$message_code = TakTuk::encode($TakTuk::message,
TakTuk::pack($target).TakTuk::pack(0));
}
$name = arguments::get_next_command;
if (defined($name) and ($name =~ m/^[a-z]+$/)) {
$name = is_one_of($name, @keywords);
} else {
arguments::restore_last_command;
$name = "data";
}
if (($name eq "close") and ($command eq "input")) {
$message = TakTuk::encode($TakTuk::eof, $target);
} else {
my $parameters = arguments::get_parameters;
if (defined($parameters)) {
if ($name eq "data") {
$message = TakTuk::encode($message_code, $parameters);
} elsif ($name eq "line") {
$message = TakTuk::encode($message_code,$parameters."\n");
} elsif ($name eq "file") {
$message = "";
$type = $TakTuk::file;
$data = general::open_file($parameters);
$message = $message_code if $data;
} elsif ($name eq "pipe") {
$message = "";
$type = $TakTuk::pipe;
$data = $parameters;
$message = $message_code;
} else {
diagnostic::warning("Unknown input type, ignoring command");
}
} else {
$terminate = 1;
}
}
} elsif ($command eq "kill") {
my $target;
my $name = arguments::get_next_command;
if (defined($name) and ($name =~ m/^[a-z]+$/)) {
$name = is_one_of($name, "target");
}
if ($name eq "target") {
$target = arguments::get_next_command;
} else {
$target = "all";
arguments::restore_last_command;
}
my $parameter = arguments::get_next_command;
if ($parameter) {
$message = TakTuk::encode($TakTuk::kill,
TakTuk::pack($target).$parameter);
} else {
$message = TakTuk::encode($TakTuk::kill, TakTuk::pack($target).15);
}
} elsif ($command eq "network") {
my $subcommand = arguments::get_next_command;
if (defined($subcommand) and ($subcommand =~ m/^[a-z]+$/)) {
$subcommand = is_one_of($subcommand,
qw(cancel state renumber update));
}
if ($subcommand eq "cancel") {
$message = $TakTuk::resign;
} elsif (($subcommand eq "state") or ($subcommand eq "")) {
if ($general::root) {
$message =
TakTuk::encode($TakTuk::reduce, $TakTuk::reduce_tree);
} else {
diagnostic::warning("Cannot print tree from non root node");
}
} elsif ($subcommand eq "renumber") {
if (($general::root) and !$no_numbering) {
$message = TakTuk::encode($TakTuk::spread,
TakTuk::encode($TakTuk::reduce, $TakTuk::reduce_count));
} else {
diagnostic::warning("Cannot renumber from non root node");
}
} elsif ($subcommand eq "update") {
if (($general::root) and !$no_numbering) {
$message = TakTuk::encode($TakTuk::spread,
TakTuk::encode($TakTuk::reduce, $TakTuk::reduce_count.1));
} else {
diagnostic::warning("Cannot update from non root node");
}
} else {
diagnostic::warning("Unknwon network command $subcommand");
}
} elsif ($command eq "option") {
my $name = arguments::get_next_argument;
if (defined($name) and ($name =~ m/^[-a-z]+$/)) {
my $parameters = arguments::get_parameters;
if (defined($parameters)) {
$name = $option::short_name{$name} if (length($name) > 1);
$message = TakTuk::encode($TakTuk::option, $name.$parameters);
} else {
$terminate = 1;
}
} else {
arguments::restore_last_argument;
my $parameters = arguments::get_parameters;
if (defined($parameters)) {
$message = TakTuk::encode($TakTuk::options, $parameters);
} else {
$terminate = 1;
}
}
} elsif ($command eq "put") {
my $source = arguments::get_parameters;
my $destination = arguments::get_parameters;
if (defined($source) and defined($destination)) {
$message = "";
$type = $TakTuk::put;
$data = [ $source, $destination ];
} else {
$terminate = 1;
}
} elsif ($command eq "quit") {
$message = $quit_message;
} elsif ($command eq "synchronize") {
($message, $type, $data) = translate();
$message = TakTuk::encode($TakTuk::synchronize, $message)
if (defined($message));
} elsif ($command eq "taktuk_perl") {
my $attributes = get_attributes;
my $parameters = arguments::get_parameters;
if (defined($parameters)) {
$message = TakTuk::encode($TakTuk::taktuk_perl,
TakTuk::pack($attributes).$parameters);
} else {
$terminate = 1;
}
} elsif ($command eq "version") {
arguments::print_version;
} else {
diagnostic::error("Unknown command : $command");
}
return ($message, $type, $data);
}
sub terminate_interpreter() {
exit 0;
}
sub fork_taktuk_interpreter() {
($interpreter_side, $taktuk_side) = communicator::create_channel;
$taktuk_interpreter = fork();
if (not defined($taktuk_interpreter)) {
diagnostic::system;
diagnostic::error("FATAL : cannot continue without forked interpreter");
exit 1;
}
if (not $taktuk_interpreter) {
$SIG{INT} = \&terminate_interpreter;
if ($arguments::arguments_ended or $forced_interactive) {
diagnostic::debug("Interactive mode");
if ($arguments::arguments_ended) {
arguments::fetch_arguments(\*STDIN);
} else {
$interactive = 1;
}
}
$taktuk_side->cleanup;
communicator::cleanup;
arguments::initialize_terminal;
process_commands();
$interpreter_side->cleanup;
exit 0;
} else {
$interpreter_side->cleanup;
$taktuk_side->{remove_handler} = \&communicator::remove_interpreter;
communicator::add_connector($taktuk_side, 'interpreter');
}
}
sub handle_message($) {
my $message = shift;
if (defined($taktuk_interpreter)) {
$interpreter_side->send_message($message);
communicator::flush_pending_stuff;
} else {
communicator::process_message(undef, $message);
}
}
sub process_commands() {
my $message;
my $type;
my $data;
handle_message(TakTuk::encode($TakTuk::spread,
TakTuk::encode($TakTuk::reduce, $TakTuk::reduce_count)))
if (($general::root) and !$no_numbering);
while (not $arguments::arguments_ended and not $terminate) {
($message, $type, $data) = translate();
arguments::skip_command_separator unless $arguments::arguments_ended;
if (defined($message)) {
if ($type eq $TakTuk::file) {
my $result;
my $partial_message;
my $buffer;
$result = sysread($data, $buffer, $TakTuk::read_size/2);
while ($result) {
$partial_message = TakTuk::encode($message, $buffer);
diagnostic::debug("Message to send : $partial_message");
handle_message($partial_message);
$result = sysread($data, $buffer, $TakTuk::read_size/2);
}
diagnostic::system if not defined($result);
CORE::close($data) if ($data != \*STDIN);
} elsif ($type eq $TakTuk::pipe) {
my $new_message = TakTuk::encode($TakTuk::pipe,
TakTuk::pack($data).$message);
handle_message($new_message);
} elsif (($type eq $TakTuk::put) or ($type eq $TakTuk::get)) {
if ($message) {
my $new_message = TakTuk::encode($type, TakTuk::pack(
$message).TakTuk::pack($data->[0]).$data->[1]);
handle_message($new_message);
} else {
diagnostic::warning("get/put require a destination ".
"modifier");
}
} else {
diagnostic::debug("Message to send : $message");
handle_message($message);
}
}
if ($arguments::arguments_ended and $interactive) {
arguments::fetch_arguments(\*STDIN);
$interactive = 0;
}
}
diagnostic::debug("End of process commands");
diagnostic::debug("Args ended : $arguments::arguments_ended ".
"Terminate : $terminate");
}
TakTuk::no_flush(\*STDOUT);
general::init;
handlers::init;
arguments::init;
communicator::init;
synchronizer::set_not_ready;
arguments::fetch_arguments(@ARGV);
arguments::parse_options;
synchronizer::setup_synchronization;
if ($general::root) {
$synchronizer::father_is_ready = 1;
fork_taktuk_interpreter;
} else {
general::print($connector::init_string.$TakTuk::RELEASE."\n");
}
$in_interpreter = 0;
synchronizer::check_ready_state;
scheduler::schedule;
communicator::run;
diagnostic::debug("End of the taktuk code");
communicator::get_root()->output('state', TakTuk::TAKTUK_TERMINATED);
if ($general::root) {
kill 'INT', $taktuk_interpreter;
waitpid $taktuk_interpreter, 0;
}
exit 0;
__END__
