Макс

Родной город: Омск

Фото галерея: смотреть

Контакты: написать

О себе:

Интересы:

- программирование

- интернет

- психология

- менеджмент

- автоматизация

Погляди
Голосование

Нравиться ли вам блог

  Да
  Нет
  Я тут случайно

 

ГлавнаяКарта сайтаПечать страницы

Сервер-демон на Perl

программка, клиент - сервер
#! /usr/bin/perl
use Socket;
chdir ("c:client");
$headip="";#ip адрес сервера
$mynik="mranarki";

#получение ip-адреса сервера
open (fin, "config.txt");
chomp($headip=);
close (fin);
print "Running Client";
print "$headip";
#задание команды головному серверу
print "Command to head server (ENTER to skip):";
chomp ($cmd=);
if ($cmd ne "exit"){
#запрос ника клиента
print "Enter your nik:";
chomp($mynik=);
$cmd="none";
}

#создание сокета для связи с головным сервером
socket(SERVER,PF_INET,SOCK_STREAM,getprotobyname('tcp'));
$server_addr=inet_aton("$headip");
$paddr=sockaddr_in(3000,$server_addr);
#подключение к головному серверу
print "Connecting to $headip server...";
connect(SERVER,$paddr);
print "Connect to head server successfully";

#отправка команды серверу
send (SERVER,$cmd,0);
if ($cmd eq "exit"){

#завершение программы
close(SERVER);
exit;
}

#получение нового порта
recv(SERVER,$myport,4,0);
print "$myport";
#отключение от головного сервера
close(SERVER);

#подключение к серверу по новому порту
socket(SERVER1,PF_INET,SOCK_STREAM,getprotobyname('tcp'));
$server_addr=inet_aton("$headip");
$paddr=sockaddr_in($myport,$server_addr);

connect(SERVER1,$paddr) or die "Erorr connection";
print "Connect to server at new port successfully";

#отправка серверу своего ника
send (SERVER1,$mynik,0) or die "SError";

#ожидание команд от пользователя
$line="none";
while($line ne "exit"){
println(":>");
chomp($line=);

#определение типа команды
if ($line eq "exit"){

#завершение работы клиента
send(SERVER1,$line,0);
exit;
}
else
{
@message=split(/:/,$line);#выделение команды из сообщения пользователя
if (((($message[0] eq "date") || ($message[0] eq "time")) || ($message[0] eq "send")) || ($message[0] eq "list")){
send (SERVER1,$line,0);
$msg="";

#ожидание ответа от сервера
while($msg eq ""){
recv(SERVER1,$msg,99,0);
}
println($msg);
}
}
}

#функция вывода сообщений на экран
sub println{
if ($_[0] eq ":>"){
print STDOUT ":>";
}
else
{
print STDOUT "$_[0]";
}
}



cервер:
#! /usr/bin/perl
use Socket;
use Switch;
use IO::Socket;
chdir ("c:server");
$nextport=3010;

#сброс файлов-каналов
open (clt,">client_table.txt");#таблица клиентов
open (snd,">sender.txt");#файл-канал сообщений
print clt "";#очистка таблицы клиентов
close (clt);
print snd "";#очистка файла-канала сообщений
close (snd);

print "Running HEAD SERVER";
#создание сокета сервера
socket(SERVER,PF_INET,SOCK_STREAM,getprotobyname('tcp'));
setsockopt(SERVER,SOL_SOCKET,SO_REUSEADDR,1);
$serv_addr=sockaddr_in(3000,INADDR_ANY);
bind(SERVER,$serv_addr);
# Установить очередь для входящих соединений
listen(SERVER,SOMAXCONN);

logger(0);
logger(2,"running HEAD SERVER");

#ожидание подключения клиента
print "Server waiting to connect clients...";
while($clientaddr=accept(CLIENT,SERVER)){
#определение ip-адреса клиента
($clientport,$clientip)=sockaddr_in($clientaddr);
$clip = inet_ntoa($clientip);
print "Client $clip is connected";
logger(2,"Client $clip is connected");
CLIENT->autoflush(1);
#получение от клиента команды на выключение
$cmd="";
while ($cmd eq ""){
recv (CLIENT,$cmd,4,0);
}
if ($cmd eq "exit"){
close (CLIENT);
close (SERVER);
logger(1);
println ("Server stop");
exit; #завершение головной программы
}

#сообщение клиенту нового нового порта подключения
$nextport+=2;
send(CLIENT,$nextport,0);
close (CLIENT);

#ветвление процесса на два: головной сервер и порожденный сервер
$pif=fork();
if ($pif){
#родитель
print "Server waiting to connect clients...";
}
else
{
#потомок
child();
}
}

#функция реализации порожденного процесса
sub child{
close(CLIENT);
close(SERVER);

#создание сервера на новом порту
socket(SERVER1,PF_INET,SOCK_STREAM,getprotobyname('tcp'));
setsockopt(SERVER1,SOL_SOCKET,SO_REUSEADDR,1);
$serv_addr1=sockaddr_in($nextport,INADDR_ANY);
bind(SERVER1,$serv_addr1) or die "Erorr";
# Установить очередь для входящих соединений
listen(SERVER1,SOMAXCONN) or die "LError";
print "accepting...";
#ожидание подключения к серверу по новому порту
while($cln=accept(CLNT,SERVER1)){

#прием ника от клиента
$line="none";
while ($nik eq ""){
recv(CLNT,$nik,8,0);
#print "$nik";
}

#регистрация ника клиента в таблице клиентов
open (clt,">>client_table.txt");
print clt "$nik";
close (clt);
$msg="none";

#ожидание команд от клиента
$line="none";
while($line eq "none"){
recvmsg();#проверка файла-канала на наличие сообщений этому процессу

recv (CLNT,$msg,99,0);#получение данных от клиента

#выделение команды из сообщение от клиента
@message=split(/:/,$msg);
$msg1=$message[0];

#определение типа запроса
SWITCH:{

#запрос даты
if ($message[0] eq "date"){
#println("first");
($day,$mon,$year)=(localtime(time))[3,4,5];
$sendmsg=$day." ".$mon." ".$year;
send(CLNT,$sendmsg,0) or die "SError";
last SWITCH;
}

#запрос времени
if ($message[0] eq "time"){
($sec,$min,$hour)=(localtime(time))[0,1,2];
$sendmsg=$hour.":".$min.":".$sec;
send(CLNT,$sendmsg,0);
last SWITCH;
}

#запрос на отправку сообщения другому клиенту
if ($message[0] eq "send"){
open (clt,">sender.txt") or die "OError";
print clt "";
print clt "$message[1]:$nik:$message[2]";
close (clt);
#println("sender");
send (CLNT, "message send",0);
last SWITCH;
}

#запрос на получение списка клиентов
if ($message[0] eq "list"){
#println("query list users");
open (ct,"client_table.txt");
$list="list of users:";
while (){
chomp;
$list=$list.$_."";
}
close (ct);
send (CLNT,$list,0);
last SWITCH;
}

#запрос на завершение работы пользователя
if ($message[0] eq "exit"){
$line="exit";
println("Child deadly");
}
}
}

#завершение работы процесса
close (CLNT);
exit;
}
}

#вывод на экран сообщений
sub println{
print STDOUT "$_[0]";
}

#проверка файла-канала на наличие сообщений
sub recvmsg{
open (clt, "sender.txt");
$msg2=;
close(clt);
if ($msg2 ne ""){
#определение адресата сообщения
@message1=split(/:/,$msg2);
if ($message1[0] eq $nik){
$sendmessage="skip commandreceive message from $message1[1]: ".$message1[2];
send (CLNT,$sendmessage,0);
open (clt, ">sender.txt");
print clt "";
close (clt);
}
}
}

#функция ведения лог файла
#0 - открытие лог файла
#1 - закрытие лог файла
#2 - добавление данных в лог
sub logger{
($sec,$min,$hour,$day,$mon,$year)=(localtime(time))[0,1,2,3,4,5];
SWITCH:{

#запись на открытие лог файла
if ($_[0] eq "0")
{
open(LOG,">>log.txt");
printf LOG "$day.$mon.$year - Open log****************";
last SWITCH;
}

#запись на закрытие лог файла
if ($_[0] eq "1")
{
printf LOG "$hour:$min:$sec - Close log*************************";
close(LOG);
last SWITCH;
}

#запись сообщение в логе
if ($_[0] eq "2")
{
printf LOG "$hour:$min:$sec - $_[1]";last SWITCH;
}
}
}

-----------------------------------------------------------------------

Сервер-демон на языке perl прослушивающий tcp сокет

После прочтения данной статьи Вы узнаете, как создавать серверы-демоны, что такое Интернет сокеты, процессы-зомби и сигналы.
Демоны

Для начала необходимо отличать понятия обычного процесса, процесса – демона и системного процесса. Все сразу видно по таблице процессов. Запустите команду :

# ps aux
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
root 1 0.1 0.1 2060 620 ? Ss 08:27 0:00 init [5]
root 2 0.0 0.0 0 0 ? S< 08:27 0:00 [migration/0]
root 5222 0.0 0.2 4532 1412 pts/1 Ss 08:30 0:00 -bash
root 5274 0.0 0.1 4244 928 pts/1 R+ 08:33 0:00 ps aux

Системные процессы - те у которых TTY = ? и VSZ = 0
Демоны – те у которых TTY = ? и VSZ ≠ 0
Пользовательские процессы – те у которых TTY = pts/1(к примеру) и VSZ ≠ 0

Если при запуске Вашей программы Вы не добились TTY = ? и VSZ ≠ 0, то Ваш процесс не является демоном.
Процессы демоны обычно запускаются при загрузке системы и завершаются при завершении работы системы.

Для создания демона необходимы следующие действия - сначала завершить родительский процесс:

$pid = fork();
exit() if $pid;

Затем разорвать связь с управляющим терминалом и создать связь с новым терминалом при помощи команды:

POSIX::setsid();
Сокеты

Описанный в данной статье сервер будет поддерживать Интернет сокет, поэтому дадим определения сокетам.
Сокеты являются "конечными пунктами" в процессе обмена данными.
Обмен данными через сокеты может осуществляться на одном компьютере или через Интернет.
Существуют два самых распространенных типа сокетов: потоковые и датаграмные. Потоковые сокеты обеспечивают двусторонние, последовательные и надежные коммуникации; они похожи на каналы (pipes). Датаграммные сокеты не обеспечивают последовательную, надежную доставку, но они гарантируют, что в процессе чтения сохранятся границы сообщений. (Описание сокетов взято из книги "PERL: Библиотека программиста" Т. Кристиансен, Н. Торкингтон)
Сокеты также делятся по областям(domain): сокеты Интернета и сокеты UNIX. Интернет сокет содержит в себе две составляющие: хост (IP-адрес в определенном формате) и номер порта. UNIX сокеты представляют собой файлы (пример сокета Unix применяется в сервере mysqld socket=/var/lib/mysql/mysql.sock ).

Для создания Интернет сокета на сервере с портом 23 необходима следующая команда:
$server = new IO::Socket::INET(LocalPort => 23,
TYPE => SOCK_STREAM,
Reuse => 1,
Listen => 10);

Такой командой создается потоковый сокет, который будет слушать 23 порт, с 10 подключениями в очереди , и с возможностью использования того же адреса после перезапуска сервера.
Серверы с ветвлением

Для обслуживания запросов от нескольких клиентов необходим сервер с ветвлением. Для этого при каждом входящем подключении от клиента необходимо делать копию родительского процесса (сервера) и "обслуживать" клиента непосредственно ответвленным (скопированным, дочерним) процессом. Делается это при помощи команды fork. Функция fork создает клон текущего процесса. В родительский процесс она возвращает $pid порожденного процесса, а в дочернем процессе возвращаемое значение равно нулю.
Поэтому для того чтобы создать самый простой сервер с ветвлением необходимо сделать следующее:
while($client = $server->accept()) {
defined(my $child_pid=fork()) or die "Can't fork new child $!";
###Родительский процесс идет в конец ###
###и ждет следующего подключения ###
next if $child_pid;
###Дочернему процессу копия сокета не нужна ###
if($child_pid == 0) {
close($server);
}
###Здесь идет обработка клиентского запроса, ###
###выполнение всех необходимых команд ###
....
exit;### В конце завершаем порожденный процесс ###
}
continue {
close($client); ### Не нужно родительскому процессу ###
}
Процессы-зомби

В сервере с ветвлением при завершении порождаемого процесса (exit) и не завершении родителя появляются процессы-зомби. Процесс-зомби - дочерний процесс в Unix системе, завершивший свое выполнение, но еще присутствующий в таблице процессов. Зомби можно узнать в списке процессов (выводимых утилитой ps) по флагу «Z» в колонке STAT.
Если родительский процесс игнорирует обработчик $SIG{CHLD}, то зомби остаются до завершения родителя. Необходимо добавить функцию отслеживания сигнала $SIG{CHLD} :
sub REAPER {
while ((my $waitedpid = waitpid(-1,WNOHANG)) > 0) { }
$SIG{CHLD} = &REAPER;
}

И перед разветвлением вызвать обработчик

$SIG{CHLD} = &REAPER;
defined(my $child_pid=fork()) or die "Can't fork new child $!";

Тогда наши наши уже не нужные отработавшие процессы будут корректно завершаться.
Сигналы

%SIG - это хэш ссылок на обработчики сигналов ( ссылки на функции).
Сигнал $SIG{INT} обычно возникает при нажатии Ctrl+C и требует, чтобы процесс завершил свою работу.
Сигнал $SIG{TERM} посылается командой kill при отсутствии явно заданного имени сигнала.
К примеру для обработки сигналов $SIG{INT} и $SIG{TERM} можно написать следующую функцию:
sub signal_handler{
$time_to_die = 1;
close($server);
}
$SIG{INT}= $SIG{TERM} = &signal_handler;

Также сервер должен обрабатывать сигнал HUP - который посылается процессу при при разрыве связи (hang-up) на управляющем терминале, либо когда программа должна перезапуститься или заново перечитать свою конфигурацию. В нашем случае когда сервер должен перечитать список разрешенных команд.
Напишем обработчик сигнала HUP следующим образом:
$SIG{HUP} = &rereading_config;
sub rereading_config{
@def_commands=();
open(FILECONF,$conf_name) or die "Can't open config file ";
while(){
chomp;
push(@def_commands, $_);
}
close(FILECONF);
}

Вызвать сигнал HUP для процесса сервера можно так:

kill -s HUP номер процесса
Краткое описание работы telnet сервера-демона

Наш сервер будет работать на 23 порту (слушать 23 порт). Он будет обрабатывать все входящие соединения на этом порту. Если пользователь будет посылать команду, указанную в конфигурационном файле, то сервер будет выполнять её, если же команда будет не из разрешенного списка, то сервер будет её пропускать и переходить к следующей команде. Для того чтобы обновить список разрешенных команд, необходимо серверу послать сигнал HUP.
Пример работы

Запускаем демона на сервере:
[root@server ~]# ./simple-telnetd.pl

Проверяем, что 23 порт прослушивается и ожидает соединения с клиентом:
[root@server ~]# netstat -an|grep LISTEN
tcp 0 0 0.0.0.0:23 0.0.0.0:* LISTEN

Теперь с машины клиента соединяемся по telnet на серевер и вводим команды, которые указаны во входных настройках (/etc/simple-telnetd.conf):

[root@client ~]# telnet 192.168.254.40
Trying 192.168.254.40...
Connected to localhost (192.168.254.40).
Escape character is '^]'.
Command :uname -a
Linux redhat2.ascon.ru 2.6.18-92.el5 #1 SMP Tue Apr 29 13:16:12 EDT 2008 i686 i686 i386 GNU/Linux
Command :uname
Linux
Command :who
root pts/1 2010-01-11 08:30 (192.168.254.1)
Command :

Видим, что программа реагирует корректно. Если ввести не корректную команду, то будет предложено ввести следующую команду.
Если в это время на сервере вывести список установленных соединений, то увидим соединение с клиентом:

[root@server ~]# netstat -an|grep EST
tcp 0 0 192.168.254.40:23 192.168.254.30:49598 ESTABLISHED

Если в это время на клиенте вывести список установленных соединений, то увидим соединение с сервером:

[root@client ~]# netstat -an|grep EST
tcp 0 0 192.168.254.30:49598 192.168.254.40:23 ESTABLISHED

После выхода клиентом из клиентской программы telnet соединение на обоих концах разорвется. Если вызывать клиентскую программу telnet несколько раз, то будут образовываться сразу несколько соединений с сервером, это реализуется с помощью распараллеливания процессов ( копирования самого себя с помощью fork).
Текст программы с комментариями
#!/usr/bin/perl -w
####################
#Sokunova Mariya
#01.12.2009
#Simple Telnetd
####################
###Подключение всех необходимых модулей###
use strict;
use POSIX;
use POSIX ":sys_wait_h";
use IO::Socket;
use IO::Handle;

###Создаем процесс-демон###
my $pid= fork();
exit() if $pid;
die "Couldn't fork: $! " unless defined($pid);
###Создаем связь с новым терминалом###
POSIX::setsid() or die "Can't start a new session $!";
###Переменная - бесконечное время жизни сервера###
my $time_to_die =0;
###Переменная - интернет-сокет или сервер###
my $server;
###Функция обработчик сигналов INT и TERM###
###Она срабатывает перед этими сигналами###
sub signal_handler{
$time_to_die = 1;
close($server);
}
$SIG{INT}= $SIG{TERM} = &signal_handler;
###Файл конфигурации с набором команд, которые обрабатывает наш сервер###
my $conf_name="/etc/simple-telnetd.conf";
###Массив где хранится список этих команд ###
my @def_commands;
###Функция обработчик сигнала HUP перечитывает конфигурационный файл###
###и обновляет массив @def_commands###
$SIG{HUP} = &rereading_config;
sub rereading_config{
@def_commands=();
open(FILECONF,$conf_name) or die "Can't open config file ";
while(){
chomp;
push(@def_commands, $_);
}
close(FILECONF);
}

###Функция обработчик сигнала CHLD - для уборки процессов зомби ###
sub REAPER {
while ((my $waitedpid = waitpid(-1,WNOHANG)) > 0) { }
$SIG{CHLD} = &REAPER;
}

###Заполняем массив разрешенных команд при старте сервера###
rereading_config();
###Создаем интернет сокет на порту 23###
my $server_port=23;
$server= new IO::Socket::INET(LocalPort => $server_port,
TYPE => SOCK_STREAM,
Reuse => 1,
Listen => 10)
or die "Couldn't be a tcp server on port $server_port: $@ ";
###Сервер работает до бесконечности пока его не вырубит Term ###
until($time_to_die){

my $client;
###Обрабатываем входящие подключения
while($client = $server->accept()){
###Включаем обработку зомби###
$SIG{CHLD} = &REAPER;
###Тот который постучался, отделяем в отдельный процесс###
defined(my $child_pid=fork()) or die "Can't fork new child $!";
###Родительский процесс идет в конец и ждет следующего подключения###
next if $child_pid;
###Дочернему процессу копия сокета не нужна, её закрываем###
if($child_pid == 0) {
close($server);
}
###Очистка буфера###
$client->autoflush(1);
my $is_def_command=0;
print $client "Command :";

###Считываем комады от клиента построчно###
while(<$client>){
###Если строка пустая переходим в конец блока###
next unless /S/;
###Запоминаем полную введенную строку, к примеру df -h ###
my $full_enter_str = $_;
chomp($full_enter_str);
###Переменная – имя команды, к примеру df###
my $enter_command="";
###Переменная – набор параметров, к примеру -h###
my $enter_params="";
###Разбиваем введенную строку на имя команды и параметры###
###########################################################
if($full_enter_str =~ /(w+)(s+)(.*)(s*)/){
$enter_command = $1;
$enter_params = $3;
}
elsif($full_enter_str =~ /(w+)/){
$enter_command = $1;
$enter_params = "";
}
else {
$enter_command = "";
$enter_params = "";
}

###Сравнение имени команды с набором разрешенных команд ###
###Просматриваем разрешенные команды в конфигурационном файле ###
foreach (@def_commands) {
if($enter_command eq $_) { $is_def_command=1;}
}


###Если команда разрешена — выполняем её###
###########################################
if($is_def_command){
my @lines = qx($enter_command $enter_params);
foreach (@lines){
print $client $_;
}
}

}
continue {
print $client "Command :";
$is_def_command=0;

}
exit;
}
continue {
close($client);
}

}

стырино http://www.netunix.ru/index.php/menuperl/articletelnetd.html

--------------------------------------------------------------------------------

Perl и GUI. Работа с потоками
Я затрону весьма наболевшую тему, Perl + GUI + потоки.
Наболевшую, потому что попытка заставить работать ваше приложение с потоками может закончиться неудачей. Программа «виснет», «сегфолитится», Вы обратитесь к документации, увидете там, что библиотека не thread-safe. Потраченное время было коту под хвост?

Хинт: создать потоки до вызова Tkx::MainLoop, так как MainLoop() запускает свой цикл событий и блокирует выполнение кода. Было бы все так просто! Переписали Вы код с этим условием, а она все равно виснет…

Что же делать? Выход есть.
Нужно использовать модель Boss/Workers (Начальник и Работники) и очереди сообщений (Queue).

Цель: написать приложение с GUI и использовать многопоточность.
Давайте, рассмотрим задачу «на пальцах», представим все в виде абстрактной модели.

Есть склад. Вы приходите к начальнику (boss),
— Привет, соберите мне вот этот списочек…
— Окей, сейчас раскидаю задание по частям, работники (workers) все сделают.

Кладовщики задания берут из стопки (причем берут по порядку их поступления).

Подобную очередь реализует пакет Thread::Queue.

Мы будем использовать несколько методов
— enqueue — положить задание
— dequeue, dequeue_nb — взять задание

Разница между dequeue и dequeue_nb в том, что последний неблокирующийся.

Другими словами, когда мы вызываем dequeue, мы ждем пока задание не появится, и только тогда его получаем. А во втором случае, если задания нет — то возвращается undef.

while( defined( my $item = $queue->dequeue() ) ) {
# выполняем какие-либо действия.
}


Кладовщики собрали весь необходимый товар, теперь его заберет грузчик, и Вам принесет.


Теперь приступим к реализации (упрощенный вариант).

Task -> Tk -> Boss -> Worker -> Result



#!/usr/bin/perl
use strict;

use Tkx; # тулкит

use threads; # работа с потоками
use Thread::Queue; # реализует очередь

# создаем очереди
my $queue_tk = Thread::Queue->new(); # получаем задания из Tk
my $queue_job = Thread::Queue->new(); # отправляем работникам
my $queue_box = Thread::Queue->new(); # результаты

# босс
sub thread_boss {
my $self = threads->self();
my $tid = $self->tid();

while( defined( my $item = $queue_tk->dequeue() ) ) {
print STDERR "Boss($tid) has received the task from Tk: $item ";

# отправляем задание на обработку работнику
$queue_job->enqueue( $item );
}

$queue_job->enqueue( undef );
}

# работник(и)
sub thread_worker {
my $self = threads->self();
my $tid = $self->tid();

while( defined( my $job = $queue_job->dequeue() ) ) {
print STDERR "Worker($tid) has received task from Boss: $job ";

# выполняем какую-нибудь работу...
print STDERR "Worker($tid) has finished the task ";

# скидываем все в одну коробку ;)
$queue_box->enqueue( "processed: $job" );
}

$queue_box->enqueue( undef );
}

# создаем потоки
my $boss = threads->new( &thread_boss );
my $worker = threads->new( &thread_worker );


# Создаем UI
my $main_window = Tkx::widget->new( '.' );
my $frame = $main_window->new_ttk__frame( -padding => q/10 10 10 10/ );
$frame->g_grid();

my $label = $frame->new_ttk__label( -text => 'waiting' );
$label->g_grid( -row => 0, -column => 0, -columnspan => 2 );

# поле для ввода
my $entry_data = 'enter data here';
my $entry = $frame->new_ttk__entry( -textvariable => $entry_data );

my $button = $frame->new_ttk__button(
-text => 'Send to Boss',
-command => sub {
$queue_tk->enqueue( $entry_data );
},
);

$entry->g_grid( -row => 1, -column => 0 );
$button->g_grid( -row => 1, -column => 1 );

# обработчик события WM_DELETE_WINDOW
sub on_destroy {
my $mw = shift;

# отсылаем очереди undef, что завершит потоки
$queue_tk->enqueue( undef );
$queue_box->enqueue( 'finish' );

# Destroy
# или Tkx::destroy( '.' )
$mw->g_destroy();
}

$main_window->g_wm_protocol( 'WM_DELETE_WINDOW', [&on_destroy, $main_window] );

# обрабатываем результат
sub monitor {
my $status_lbl = shift;
my $result = $queue_box->dequeue_nb;

if( $result ne 'finish' ) {
if( defined $result ) {
$label->configure( -text => "job completed: ".scalar(localtime));
}

Tkx::after( 1000, [&monitor, $label]);
}

}

# запускаем мониторинг
Tkx::after( 100, [&monitor, $label] );

# открепляем потоки
# иначе при завершении программы, у нас будут предупреждения
# Perl exited with active threads:
# 2 running and unjoined
# 0 finished and unjoined
# 0 running and detached
$boss->detach();
$worker->detach();

Tkx::MainLoop();


Если Вы планируете писать многопоточную программу для работы с сетью, базами данных, то я думаю что вместо стандартных потоков, гораздо правильней будет использовать POE (событийная машина, non-blocking sockets).

-----------------------------------------------------------------------

Потоки в Perl
threads - Perl interpreter-based threads

КРАТКИЙ ОБЗОР

use threads ('yield',
'stack_size' => 64*4096,
'exit' => 'threads_only',
'stringify');

sub start_thread {
my @args = @_;
print('Thread started: ', join(' ', @args), " ");
}

my $thr = threads->create('start_thread', 'argument');
$thr->join();


threads->create(sub { print("I am a thread "); })->join();


my $thr2 = async { foreach (@files) { ... } };
$thr2->join();
if (my $err = $thr2->error()) {
warn("Thread error: $err ");
}


Вызов потока в списковом контексте(неявно), т.е. он может возвращать список

my ($thr) = threads->create(sub { return (qw/a b c/); });

или указать контекст списка явно

my $thr = threads->create({'context' => 'list'},
sub { return (qw/a b c/); });
my @results = $thr->join();


$thr->detach();


Получаем объект потока

$thr = threads->self();
$thr = threads->object($tid);

Получаем ID потока

$tid = threads->tid();
$tid = $thr->tid();
$tid = "$thr";

Даём возможность запуститься другим потокам

threads->yield();
yield();


Список зависимых(non-detached) потоков

my @threads = threads->list();
my $thread_count = threads->list();

my @running = threads->list(threads::running);
my @joinable = threads->list(threads::joinable);

Тестируем объекты потока

if ($thr1 == $thr2) {
...
}

Организация размера стека потока

$stack_size = threads->get_stack_size();
$old_size = threads->set_stack_size(32*4096);

Создание потока с заданым контекстом и размером стека

my $thr = threads->create({ 'context' => 'list',
'stack_size' => 32*4096,
'exit' => 'thread_only' },
&foo);

Получение контекста потока

my $wantarray = $thr->wantarray();

Проверка состояния потока

if ($thr->is_running()) {
sleep(1);
}
if ($thr->is_joinable()) {
$thr->join();
}

Посылаем сигнал потоку

$thr->kill('SIGUSR1');

Завершение работы потока

threads->exit();


ОПИСАНИЕ

Perl 5.6 содержит одну штуку), называемую интерпретатор потоков(interpreter threads). Интерпретатор потоков отличается 5005threads (модели потоков в Perl 5.005), т.к. создаёт новый Perl интерпретатор на отдельный поток, и не разделяет данные и структуры для совместного использования потоками по умолчанию.
До Perl 5.8, это использовалось для для эмуляции fork() в Windows.

Потоковое API основано на старом Thread.pm API. Очень важно отметить, что переменные не разделяются между потоками, все переменные ,по умолчанию, являются локальными для каждого потока. Для использования разделяемых переменных необходимо использовать threads::shared:

use threads;
use threads::shared;

Также важно отметить, что вы должны включить в скрипт директиву use threads заблаговременно, насколько это возможно, а также то, что нельзя использовать потоковую модель внутри команд eval "", do, require,или use. В частности, если вы собираетесь использовать разделяемые потоками переменные с помощью threads::shared, вы обязаны указать оператор use threads до использования threads::shared. (если этого не сделать - получите предупреждение)

$thr = threads->create(FUNCTION, ARGS)
Это создаст новый поток, который начнёт выполнение с указанной точки входа функции. Список ARGS содержит параметры, которые будут переданы в функцию. Результат выполнения: соответствующий объект-поток или undef в случае неудачи.

FUNCTION может быть именем функции, анонимной подпрограммой или ссылкой на код.

my $thr = threads->create('func_name', ...);
# или
my $thr = threads->create(sub { ... }, ...);
# или
my $thr = threads->create(&func, ...);

Метод ->new() является псевдонимом для метода ->create().

$thr->join()
Этот метод будет ожидать завершения выполнения соответствующего потока. Когда поток завершит выполнение, ->join() вернёт значение(я), которые возвращает функция потока(FUNCTION).
Значение контекста (void, скаляр или список), которое будет возвращаться методу ->join() определяется во время создания потока.

# Создание потока в списковом контексте(неявно)
my ($thr1) = threads->create(sub {
my @results = qw(a b c);
return (@results);
});
# явно
my $thr1 = threads->create({'context' => 'list'},
sub {
my @results = qw(a b c);
return (@results);
});
# Вернуть результат в контексте списка из потока
my @res1 = $thr1->join();

# Создание потока в скалярном контексте(неявно)
my $thr2 = threads->create(sub {
my $result = 42;
return ($result);
});
# Вернуть результат в скалярном из потока
my $res2 = $thr2->join();

# Создание потока в void контексте(неявно)
my $thr3 = threads->create({'void' => 1},
sub { print("Hello, world "); });
# Join в контексте void (нет возвращаемого значения)
$thr3->join();

Если программа завершается до того как все потоки успели стать "присоединёнными"(joined) или "обособленными"(detached), будет выдано предупреждение. Вызов ->join() или ->detach() для уже завершившего работу потока приведёт к ошибке.

$thr->detach()
Делает поток "разъединённым"(unjoinable), что ведёт за собой сброс(отвержение) любых возможных возвращаемых значений. При завершении работы программы, все потоки такого вида, которые ещё выполняются, будут молча уничтожены.

threads->detach()
Метод класса, который позволяет потоку сделать самого себя "обособленным".

threads->self()
Метод класса, который позволяет потоку получить свой собственный объект-поток.

$thr->tid()
Возвращает ID потока. ID потока - уникальное целочисленное значение, с значением для главного потока программы 0, и увеличивающееся на 1 для каждого создаваемого потока.

threads->tid()
Метод класса, позволяющий потоку получить свой собственный ID.

"$thr"
Если добавить опцию импорта stringify в декларирование use threads, можно будет использовать объект потока как строку или в строковом контексте(н-р, как ключ в хэше) и отображаемым результатом будет ID потока:

use threads qw(stringify);
my $thr = threads->create(...);
print("Thread $thr started... "); # Результат: Thread 1 started...

threads->object($tid)
Возвращает объект-поток для активного потока, связанного с указанным ID. Если нет потока, связанного с TID, если поток является "присоединёнными"(joined) или "обособленными"(detached), если не определён TID или значение TID равно undef, то метод возвращает undef.

threads->yield()
Этот метод "рекомендует" операционной системе разрешить передачу процессорного времени данного потока другим потокам. То, что может произойти на самом деле при вызове метода, очень сильно зависит от низкоуровневой реализации многопоточности.
Необходимо указать use threads qw(yield), и затем можно использовать yield() в программе.

threads->list()
threads->list(threads::all)
threads->list(threads::running)
threads->list(threads::joinable)
Данныей вызов без аргументов(или используя threads::all) в списковом контексте, вернёт список всех объектов-потоков, которые не являются "присоединёнными"(joined) или "обособленными"(detached). В скалярном контексте будет возвращено их количество.
С аргументом true (используя threads::running), будет возвращён список объектов-потоков, которые не являются "присоединёнными"(joined) или "обособленными"(detached), которые всё ещё выполняются.
С аргументом false используя threads::joinable), будет возвращён список объектов-потоков, которые не являются "присоединёнными"(joined) или "обособленными"(detached), которые закончили выполнение(и которые не заблокированы методом ->join()).

$thr1->equal($thr2)
Проверяет являются ли одинаковыми 2 потока или нет. Этот метод перегружен для более естественной формы сравнения:

if ($thr1 == $thr2) {
print("Threads are the same ");
}
#или
if ($thr1 != $thr2) {
print("Threads differ ");
}

(Сравнение потоков основывается на их идентификаторах(ID).)

async BLOCK;
async создаёт поток для немедленного выполнения блока. BLOCK обрабатывается как анонимная процедура, и поэтому должен иметь точку с запятой после закрывающей скобки. Как и threads->create(), async возвращает объект-поток.

$thr->error()
Потоки выполняются в вычисляемом(eval) контексте. Этот метод возвращает undef при нормальном завершении потока, иначе он вернёт значение $@ связанное со статусом выполнения потока.

$thr->_handle()
Этот private метод возвращает расположение в памяти внутренней структуры, связанной с оъектом потока. Для Win32, это указатель на значение HANDLE, возвращаемое CreateThread ( HANDLE *); для других платформ, это указатель на структуру pthread_t, которая используется в вызове pthread_create( pthread_t *).
Этот метод не следует использовать для программирования потоков, его основная цель обеспечивать другие (XS-based) модули потоков возможностями доступа и манипулирования низкоуровневых структур, необходимых для реализации потоков в Perl.
threads->_handle()
Метод класса, позволяющий потоку получить его дескриптор.

ЗАВЕРШЕНИЕ ПОТОКА

Нормальный способ завершения потока вызов return() из функции с присвоением возвращаемого значения.

threads->exit()
Если необходимо, поток может выйти в любое время путём вызова threads->exit(). Это вынуждает поток вернуть undef в скалярном контексте, или пустой список в контексте списка.
При вызове из главного потока результат будет такой же как и при вызове exit(0).

threads->exit(status)
При вызове из потока, метод аналогичен threads->exit() ( статус завершения игнорируется).
При вызове из главного потока, аналогичен exit(status).

die()
Вызов die() в потоке указывает на ненормальное завершение потока. В первую очередь будет вызван любой дескриптор $SIG{__DIE__} и затем поток завершит работу с предупреждающим сообщением, которое будет содержать аргументы переданные при вызове die().

exit(status)
Вызов exit() внутри потока приводит к завершению всего приложения. По этой причине использование exit() внутри кода потока, или модулях, которые предполагается использовать в многопоточных приложениях крайне нежелательно.
Если же существует реальная необходимость в exit(), то рассмотрите один из следующих вариантов:

threads->exit() if threads->can('exit'); # потоко дружелюбно )
exit(status);


use threads 'exit' => 'threads_only'
Это глобально переопределяет поведение по умолчанию для exit() внутри потока, и фактически определяет поведение метода таким же как при вызове threads->exit(). Другими словами, с такими установками, вызов exit() приведёт именно к завершению работы самого потока.
Из-за того, что такое объявление имеет глобальный характер, не стоит использовать его в модулях и т.п.
Данная установка не влияет на работу главного потока.

threads->create({'exit' => 'thread_only'}, ...)
Таким образом можно переопределить поведение по умолчанию для exit() внутри только вновь создаваемых потоков.

$thr->set_thread_exit_only(boolean)
Этим вызовом можно изменить поведение потока при завершение, после его создания. Если аргумент true - вызов exit() приведёт только к завершению потока, а если false, вызов exit() прекратит работу всего приложения.
На главный поток это действие не распространяется.

threads->set_thread_exit_only(boolean)
Метод класса для использования внутри потока, который изменяет поведение при вызове exit().
На главный поток это действие не распространяется.

СОСТОЯНИЕ ПОТОКА
Для определения состояния потока используются следующие методы:

$thr->is_running()
Возвращает true если поток всё ещё выполняется(т.е., если функция потока не завершена).

$thr->is_joinable()
Возвращает true если поток завершил выполнение, и не поток не является "обособленными"(detached) и еще не был "присоединёнными"(joined). Другими словами, поток готов стать "присоединённым"(joined), и вызов $thr->join() не будет блокироваться.

$thr->is_detached()
Возвращает true если поток стал "обособленными"(detached)

threads->is_detached()
Метод класса, позволяющий потоку определить, является ли он "обособленными"(detached).

КОНТЕКСТ ПОТОКА
Также как и при работе с процедурами, тип значения которое возвращается функцией потока может определяться контекстом потока: список, скаляр, void(не возвращает значений). Контекст потока определяется во время его создания. Это необходимо, так как контекст доступен для функции потока(entry point function) через вызов wantarray().Поток имеет возможность определить тип возвращаемого значения, которое будет возвращено при вызове ->join().

Явный контекст
Поскольку создание и присоединение потоков могут происходить в разных контекстах, может быть весьма желательным определение контекста потока явным образом для функции потока. Это можно осуществить вызовом ->create() с передачей в качестве первого аргумента ссылки на хэш:

my $thr = threads->create({'context' => 'list'}, &foo);
...
my @results = $thr->join();

В примере выше, объект потока возвращается в родительский поток в скалярном контексте, а функция потока foo будет вызвана в контексте списка(массива) и таким образом родительский поток может принять список(массив) при вызове ->join(). (заметим, что массив и список синонимы)

Подобным образом, если вам нужен поток, который не возвращает значений(контекст void) можно создать его так:

my $thr = threads->create({'context' => 'void'}, &foo);
...
$thr->join();

Тип контекста может также использоваться в виде ключа хэша со значением true:

threads->create({'scalar' => 1}, &foo);
...
my ($thr) = threads->list();
my $result = $thr->join();


Неявный контекст
Если контекст не указан явно, он будет подразумеваться согласно контексту вызова ->create():

# Создание потока в контексте списка
my ($thr) = threads->create(...);

# Создание потока в контексте скаляра
my $thr = threads->create(...);

# Создание потока в контексте void
threads->create(...);


$thr->wantarray()
Возвращает контекст таким же образом как и функция wantarray().

threads->wantarray()
Класс метода: возвращает текущий контекст потока. Возвращает тоже значение, что и wantarray(), вызванная внутри функции потока.

РАЗМЕР СТЕКА ПОТОКА
Размер стека для отдельного потока очень сильно различается на разных платформах, и почти всегда далёк от реальных нужд приложения. На Win32, размер стека явно устанавливается в значение по умолчанию равное 16МB; на большинстве других платформ, используются системные значения по умолчанию, которые, напротив, могут быть значительно больше чем это необходимо.
Тщательная настройка размеров стека, необходимого вашему приложению, позволяет значительно сократить количество потребляемой памяти и увеличить число одновременно работающих потоков
Обратите внимание, что на Windows, степень детализации разбиения адресного пространства составляет 64 KB, по этой причине, установка размера стека меньше этого значения для Win32 Perl не приведёт ни к какой экономии памяти.

threads->get_stack_size();
Возвращает текущее значение по умолчанию размера стека для потока. Значение по умолчанию равно нулю, что означает, что используются системный размер стека по умолчанию.

$size = $thr->get_stack_size();
Возвращает размер стека для конкретного потока. Значение 0 говорит о использовании системного значения размера стека.

$old_size = threads->set_stack_size($new_size);
Устанавливает новый размер стека по умолчанию для потока и возвращает предыдущее значение размера стека.
Некоторые платформы определяют минимальный размер стека и попытка установить размер стека меньше этого значение вызовет предупреждающее сообщение, а размер стека будет установлен в минимум для данной платформы.
Некоторые платформы Linux имеют ограничения на максимальный размер стека и установка размера стека больше этого значения приведёт к ошибке при создании потока.
При необходимости, $new_size может быть округлён до размеров страницы памяти (обычно 4096 или 8192).
Потоки, которые создаются после установки размера стека будут вызывать или pthread_attr_setstacksize() (для платформ pthreads), или задавать размер стека привызове CreateThread() (для Win32 Perl).
(Разумеется, этот вызов никак не влияет на уже имеющиеся потоки.)

use threads ('stack_size' => VALUE);
Устанавливает значение стека по умолчанию при запуске приложения

$ENV{'PERL5_ITHREADS_STACK_SIZE'}
Значение по умолчанию для размера стека может быть установлено при запуске приложения, путём изменения переменной окружения PERL5_ITHREADS_STACK_SIZE:

PERL5_ITHREADS_STACK_SIZE=1048576
export PERL5_ITHREADS_STACK_SIZE
perl -e'use threads; print(threads->get_stack_size(), " ")'

Это значение переопределяет любой параметр переданный в use threads. Его первостепенная задача разрешить установку размера стека для приложений использующих потоки.

threads->create({'stack_size' => VALUE}, FUNCTION, ARGS)
Для определения индивидуального размера стека для потока, вызывайте ->create() с сылкой на хэш в качестве первого аргумента:

my $thr = threads->create({'stack_size' => 32*4096}, &foo, @args);


$thr2 = $thr1->create(FUNCTION, ARGS)
Такой вызов создаёт новый поток ($thr2), который наследует размер стека у существующего потока ($thr1). Это равносильно следующему:

my $stack_size = $thr1->get_stack_size();
my $thr2 = threads->create({'stack_size' => $stack_size}, FUNCTION, ARGS);


СИГНАЛЫ и ПОТОКИ
При использовании безопасных сигналов(safe signals), сигналы могут посылаться и действовать в соответствии с индивидуальными потоками.

$thr->kill('SIG...');
Посылает определённый сигнал потоку. Имя сигнала и (положительный) номер сигнала эквивалентны относительно их поддержки в команде kill(). Н-р, 'SIGTERM', 'TERM' и(зависит от ОС) 15 являются корректными аргументами для ->kill().

Возвращает объект потока с возможностью формирования метода цепочкой:

$thr->kill('SIG...')->join();

Обработчики сигналов должны быть установлены в потоках для тех сигналов, на которые предполагается обрабатывать. Вот пример для отмены потока:

use threads;
sub thr_func
{
# Обработчик сигнала аннулирования потока
$SIG{'KILL'} = sub { threads->exit(); };
...
}
# Создаем поток
my $thr = threads->create('thr_func');
...
# Посылаем сигнал на уничтожение потоку и затем отделяяем его, т.о. он очистится автоматически
$thr->kill('KILL')->detach();

Вот ещё простейший пример, иллюстрирующий использование сигналов в связке с семафором для обеспечения примитивных возможностей приостановки(suspend) и возобновления(resume) работы потока:

use threads;
use Thread::Semaphore;
sub thr_func
{
my $sema = shift;
# Обработчик сигнала 'suspend/resume'
$SIG{'STOP'} = sub {
$sema->down(); # Поток приостанавливается
$sema->up(); # Поток возобновляется
};

...
}
# Создаем семафор и передаем его в поток
my $sema = Thread::Semaphore->new();
my $thr = threads->create('thr_func', $sema);
# Приостановка потока
$sema->down();
$thr->kill('STOP');
...
# Разрешаем продолжить работу
$sema->up();

ПРЕДОСТЕРЕЖЕНИЕ: Возможности работы с сигналами и потоками предоставляемые этим модулем на самом деле не посылают сигналы через ОС. Происходит эмуляция сигналов на уровне интерпретатора Perl, устроенная таким образом, что обработчики сигналов вызываются в нужных потоках. Н-р, посылая $thr->kill('STOP') мы не приостанавливаем поток на самом деле(или весь процесс), но благодаря этому воздействию вызывается обработчик $SIG{'STOP'} потока(что и было показано выше).

И таким образом, сигналы, которые нельзя было бы использовать для передачи в команду kill() (н-р, kill('KILL', $$)), можно без проблем использовать в методе ->kill().

Помимо этого, посыламый потоку сигнал не может повредить выполнению текущей операции(то, что поток обрабатывает в данный момент): сигнал будет обработан только по завершении текущей операции. Н-р, если поток подвис в ожидании вводавывода, то послав ему сигнал мы не добьёмся того, что ожидание вводавывода будет прервано и сигнал будет немедленно обработан.
Сигналы, посылаемые завершённым потокам игнорируются.