[Moscow.pm] Обмен данными с потоком.
Харпалёв Иван
ivan.kharpalev на gmail.com
Чт Июн 5 08:47:29 PDT 2014
Спасибо!
На Go выглядит заманчиво, хотя и совершенно не понятно, как происходит
распределение входа между воркерами.
Классный пример с *ForkEngine.*
Вот только в доке IO::Pipe не описаны функции autoflush и blocking. Зачем
они вызываются в ForkEngine?? и зачем делается binmode на дескриптор?
Передача через Pipe -- подходящее решение (прогон через paip почти не
замедляет построчное копирование из файла в файл на Perl).
А как же быть c получением из воркеров??
как-то так?
open my $input, "<", $in_file or die "Can not open file for read";
open my $output, ">", $out_file or die "Can not open file for write";
for (@workers) {
⇥ my $wait_for_input = AnyEvent->io (
⇥ ⇥ fh => $_->{fromchild},
⇥ ⇥ poll => 'r',
⇥ ⇥ cb => sub {
⇥ ⇥ ⇥ say $output readline ($_->{fromchild});
⇥ ⇥ ⇥ $_->{c}--; #счётчик очереди в worker'е
⇥ ⇥ }
⇥ )
}
my $number=0;
while (<$input>) {
⇥ while(1){
⇥ ⇥ $number = ++$number % $#workers;
⇥ ⇥ my $worker = $workers[$number];
⇥ ⇥ if ($worker->{c} < 10) {
⇥ ⇥ ⇥ my $to = $worker->{tochild};
⇥ ⇥ ⇥ say $to $_;
⇥ ⇥ ⇥ $worker->{c}++;
⇥ ⇥ }
⇥ }
}
Gearman -- штука крутая, но всё-таки хочется на Perl.
А как решить на Coro по-прежнему непонятно(
Вроде если есть симафоры, по они должны быть для разделяемых ресурсов и
следовательно должно быть возможно изменять оду и ту же переменную из
разных потоков... Возможно ли вообще такое в Perl. Хотя пайпы вроде норм
решение для передачи и главная проблема в неблокирующем ожидании.
Спасибо!
5 июня 2014 г., 17:16 пользователь Eugene Toropov <eugene.toropov на gmail.com>
написал:
> Спасибо, реально разыгрался аппетит :) пошел тырить печеньки у товарищей :)
>
> On Jun 5, 2014, at 5:14 PM, Alexander Lourier <aml на rulezz.ru> wrote:
>
> Запилил за 10 минут. Такие задачки на Go решаются элементарно. А если
> учесть, что каждая горутина может выполняться на своём CPU и никаких GIL,
> то становится ням-ням как вкусно.
>
>
> 5 июня 2014 г., 15:10 пользователь Eugene Toropov <
> eugene.toropov на gmail.com> написал:
>
>> Интересно, это уже готовое было или запилил за полчаса?
>>
>> Евгений
>>
>> On Jun 5, 2014, at 5:03 PM, Alexander Lourier <aml на rulezz.ru> wrote:
>>
>> Минутка рекламы. Вот решение задачи на Go. Оно длинное, потому что я его
>> обильно снабдил комментариями. Если лишнее убрать, всё будет выглядеть
>> очень компактно и работать производительно.
>>
>>
>> package main
>> import (
>> "fmt"
>> "math/rand"
>> "time"
>> )
>> const (
>> numWorkers = 10
>> )
>> // task - это задание для воркера.type task struct {
>> value int
>> output chan result
>> }
>> // result - это результат обработки задания воркером.type result struct {
>> value int
>> worker int
>> }
>> // worker берёт данные из канала input, обрабатывает их (умножает на 100) и кладёт в канал ответа,// который прислан вместе с заданием.func worker(workerNumber int, input chan task) {
>> // Пока входной канал не закроют, читаем из него задание.
>> for task := range input {
>> // Работаем в поте лица.
>> time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
>> task.output <- result{task.value * 100, workerNumber}
>> }
>> }
>> // prepareInput готовит входные задания и кладёт их в два канала: в одну очередь// задания для воркеров, в другую - каналы ответа.func prepareInput(input chan task, output chan chan result) {
>> for i := 0; i < 100; i++ {
>> // Канал ответа буферизованный, чтобы воркер не ждал, когда его ответ считают,
>> // а сразу брался за следующее задание.
>> outputChan := make(chan result, 1)
>> // Тот факт, что задания кладутся в input и output в одном и том же порядке,
>> // гарантирует, что ответы будут упорядочены в том же порядке.
>> input <- task{i, outputChan}
>> output <- outputChan
>> }
>> close(input)
>> close(output)
>> }
>> func main() {
>> // Каналы обязательно буферизованные (длина буфера = числу воркеров).
>> input := make(chan task, numWorkers)
>> output := make(chan chan result, numWorkers)
>>
>> // Запускаем готовилку входных данных.
>> go prepareInput(input, output)
>>
>> // Запускаем воркеры.
>> for i := 0; i < numWorkers; i++ {
>> go worker(i, input)
>> }
>>
>> // Читаем ответы в порядке, в каком нам нужно.
>> for res := range output {
>> fmt.Printf("%+v\n", <-res)
>> }
>> }
>>
>>
>>
>> 5 июня 2014 г., 13:46 пользователь Харпалёв Иван <
>> ivan.kharpalev на gmail.com> написал:
>>
>>> Добрый день, могучий MoscowPM
>>>
>>> Опять про параллельную обработку.
>>>
>>> Хочется написать вот такую схему обработки ввода:
>>> master создаёт work'ов,
>>> читает порции из файла, раздаёт порции worker'ам
>>> ждёт, пока worker обработает, получает ответ worker'a
>>> пишет результат в файл.
>>> Так же мастер буфереизует вывод, чтобы выход писался в правильном
>>> порядке.
>>>
>>> Самое туманное:
>>> Как передавать данные от мастера к worker'у и Обратно?!!!!
>>> Как ждать готовности?!!!
>>> Должна ли такая схема (работа с диском из одного места) дать ускорение
>>> по сравнению с чтением/записью файла в каждом worker'е?
>>>
>>> смотрел на Coro, увидел Coro::Simaphore, Coro::Signal ... но не пойму:
>>> как сделать разделяемую память, (как быстро передавать данные между
>>> мастером и worker'ом внутри Perl)?
>>> как сделать неблокирующее ожидание готовности одного из worker'ов (при
>>> котором можно заниматься вводом-выводом)?
>>>
>>> Подскажите, на чём и как такое писать!!
>>> Спасибо!
>>>
>>> Уважение
>>> Иван Харпалев
>>>
>>>
>>>
>>> --
>>> Moscow.pm mailing list
>>> moscow-pm на pm.org | http://moscow.pm.org
>>>
>>>
>> --
>> Moscow.pm mailing list
>> moscow-pm на pm.org | http://moscow.pm.org
>>
>>
>>
>> --
>> Moscow.pm mailing list
>> moscow-pm на pm.org | http://moscow.pm.org
>>
>>
> --
> Moscow.pm mailing list
> moscow-pm на pm.org | http://moscow.pm.org
>
>
>
> --
> Moscow.pm mailing list
> moscow-pm на pm.org | http://moscow.pm.org
>
>
----------- следущая часть -----------
Вложение в формате HTML было извлечено…
URL: <http://mail.pm.org/pipermail/moscow-pm/attachments/20140605/7a91630c/attachment-0001.html>
Подробная информация о списке рассылки Moscow-pm