[Moscow.pm] Обмен данными с потоком.

Victor Efimov victor на vsespb.ru
Чт Июн 5 11:24:22 PDT 2014


5 июня 2014 г., 22:12 пользователь Antonio Nikishaev <a на lelf.me> написал:
> Нахрена вам разделяемая память и костыли с трубами?
>
> Куча async. В конце всем ->join. Если надо по частям передавать данные то Coro::Channel к каждому
>

Ну а если воркеры вычисления делают, т.е. CPU потребляют, будет работать?

>
>
> On 5 Jun 2014, at 19:47, Харпалёв Иван <ivan.kharpalev на gmail.com> wrote:
>
>> Спасибо!
>> На  Go выглядит заманчиво, хотя и совершенно не понятно, как происходит распределение входа между воркерами.
>>
>> Классный пример с ForkEngine.
>> Вот только в доке IO::Pipe не описаны  функции autoflush и blocking. Зачем они вызываются в ForkEngine?? и зачем делается binmode на дескриптор?
>>
>> Передача через Pipe -- подходящее решение (прогон через paip почти не замедляет построчное копирование из файла в файл на Perl).
>>
>> А как же быть c получением из воркеров??
>> как-то так?
>> open my $input, "<", $in_file or die "Can not open file for read";
>> open my $output, ">", $out_file or die "Can not open file for write";
>> for (@workers) {
>> ⇥   my $wait_for_input  = AnyEvent->io (
>> ⇥   ⇥   fh => $_->{fromchild},
>> ⇥   ⇥   poll => 'r',
>> ⇥   ⇥   cb => sub {
>> ⇥   ⇥   ⇥   say $output readline ($_->{fromchild});
>> ⇥   ⇥   ⇥   $_->{c}--; #счётчик очереди в worker'е
>> ⇥   ⇥   }
>> ⇥   )
>> }
>> my $number=0;
>> while (<$input>) {
>> ⇥   while(1){
>> ⇥   ⇥   $number = ++$number  % $#workers;
>> ⇥   ⇥   my $worker = $workers[$number];
>> ⇥   ⇥   if ($worker->{c} < 10) {
>> ⇥   ⇥   ⇥   my $to = $worker->{tochild};
>> ⇥   ⇥   ⇥   say $to $_;
>> ⇥   ⇥   ⇥   $worker->{c}++;
>> ⇥   ⇥   }
>> ⇥   }
>> }
>>
>> Gearman -- штука крутая, но всё-таки хочется на Perl.
>> А как решить на Coro по-прежнему непонятно(
>> Вроде если есть симафоры, по они должны быть для разделяемых ресурсов и следовательно должно быть возможно изменять оду и ту же переменную из разных потоков...   Возможно ли вообще такое в Perl.  Хотя пайпы вроде норм решение для передачи и главная проблема в неблокирующем ожидании.
>>
>> Спасибо!
>>
>>
>>
>> 5 июня 2014 г., 17:16 пользователь Eugene Toropov <eugene.toropov на gmail.com> написал:
>> Спасибо, реально разыгрался аппетит :) пошел тырить печеньки у товарищей :)
>>
>> On Jun 5, 2014, at 5:14 PM, Alexander Lourier <aml на rulezz.ru> wrote:
>>
>>> Запилил за 10 минут. Такие задачки на Go решаются элементарно. А если учесть, что каждая горутина может выполняться на своём CPU и никаких GIL, то становится ням-ням как вкусно.
>>>
>>>
>>> 5 июня 2014 г., 15:10 пользователь Eugene Toropov <eugene.toropov на gmail.com> написал:
>>> Интересно, это уже готовое было или запилил за полчаса?
>>>
>>> Евгений
>>>
>>> On Jun 5, 2014, at 5:03 PM, Alexander Lourier <aml на rulezz.ru> wrote:
>>>
>>>> Минутка рекламы. Вот решение задачи на Go. Оно длинное, потому что я его обильно снабдил комментариями. Если лишнее убрать, всё будет выглядеть очень компактно и работать производительно.
>>>>
>>>> package
>>>>  main
>>>>
>>>>
>>>> import
>>>>  (
>>>>
>>>> "fmt"
>>>>
>>>>
>>>> "math/rand"
>>>>
>>>>
>>>> "time"
>>>>
>>>> )
>>>>
>>>>
>>>> const
>>>>  (
>>>>     numWorkers =
>>>> 10
>>>>
>>>> )
>>>>
>>>>
>>>> // task - это задание для воркера.
>>>> type task struct
>>>>  {
>>>>     value
>>>> int
>>>>
>>>>     output
>>>> chan
>>>>  result
>>>> }
>>>>
>>>>
>>>> // result - это результат обработки задания воркером.
>>>> type result struct
>>>>  {
>>>>     value
>>>> int
>>>>
>>>>     worker
>>>> int
>>>>
>>>> }
>>>>
>>>>
>>>> // worker берёт данные из канала input, обрабатывает их (умножает на 100) и кладёт в канал ответа,
>>>> // который прислан вместе с заданием.
>>>> func worker(workerNumber int, input chan
>>>>  task) {
>>>>
>>>> // Пока входной канал не закроют, читаем из него задание.
>>>>
>>>>
>>>> for task := range
>>>>  input {
>>>>
>>>> // Работаем в поте лица.
>>>>
>>>>             time.Sleep(time.Duration(rand.Intn(
>>>> 100
>>>> )) * time.Millisecond)
>>>>             task.output <- result{task.value *
>>>> 100
>>>> , workerNumber}
>>>>     }
>>>> }
>>>>
>>>>
>>>> // prepareInput готовит входные задания и кладёт их в два канала: в одну очередь
>>>> // задания для воркеров, в другую - каналы ответа.
>>>> func prepareInput(input chan task, output chan chan
>>>>  result) {
>>>>
>>>> for i := 0; i < 100
>>>> ; i++ {
>>>>
>>>> // Канал ответа буферизованный, чтобы воркер не ждал, когда его ответ считают,
>>>>
>>>>
>>>> // а сразу брался за следующее задание.
>>>>
>>>>             outputChan :=
>>>> make(chan result, 1
>>>> )
>>>>
>>>> // Тот факт, что задания кладутся в input и output в одном и том же порядке,
>>>>
>>>>
>>>> // гарантирует, что ответы будут упорядочены в том же порядке.
>>>>
>>>>             input <- task{i, outputChan}
>>>>             output <- outputChan
>>>>     }
>>>>
>>>> close
>>>> (input)
>>>>
>>>> close
>>>> (output)
>>>> }
>>>>
>>>>
>>>> func
>>>>  main() {
>>>>
>>>> // Каналы обязательно буферизованные (длина буфера = числу воркеров).
>>>>
>>>>     input :=
>>>> make(chan
>>>>  task, numWorkers)
>>>>     output :=
>>>> make(chan chan
>>>>  result, numWorkers)
>>>>
>>>>
>>>> // Запускаем готовилку входных данных.
>>>>
>>>>
>>>> go
>>>>  prepareInput(input, output)
>>>>
>>>>
>>>> // Запускаем воркеры.
>>>>
>>>>
>>>> for i := 0
>>>> ; i < numWorkers; i++ {
>>>>
>>>> go
>>>>  worker(i, input)
>>>>     }
>>>>
>>>>
>>>> // Читаем ответы в порядке, в каком нам нужно.
>>>>
>>>>
>>>> for res := range
>>>>  output {
>>>>             fmt.Printf(
>>>> "%+v\n"
>>>> , <-res)
>>>>     }
>>>> }
>>>>
>>>>
>>>>
>>>> 5 июня 2014 г., 13:46 пользователь Харпалёв Иван <ivan.kharpalev на gmail.com> написал:
>>>> Добрый день, могучий MoscowPM
>>>>
>>>> Опять про параллельную обработку.
>>>>
>>>> Хочется написать вот такую схему обработки ввода:
>>>> master создаёт work'ов,
>>>> читает порции из файла, раздаёт порции worker'ам
>>>> ждёт, пока worker обработает, получает ответ worker'a
>>>> пишет результат в файл.
>>>> Так же мастер буфереизует вывод, чтобы выход писался в правильном порядке.
>>>>
>>>> Самое туманное:
>>>> Как передавать данные от мастера к worker'у и Обратно?!!!!
>>>> Как ждать готовности?!!!
>>>> Должна ли такая схема (работа с диском из одного места) дать ускорение по сравнению с чтением/записью файла в каждом worker'е?
>>>>
>>>> смотрел на Coro, увидел Coro::Simaphore, Coro::Signal ... но не пойму:
>>>>   как сделать разделяемую память, (как быстро передавать данные между мастером и worker'ом внутри Perl)?
>>>>   как сделать неблокирующее ожидание готовности одного из worker'ов (при котором можно заниматься вводом-выводом)?
>>>>
>>>> Подскажите, на чём и как такое писать!!
>>>> Спасибо!
>>>>
>>>> Уважение
>>>> Иван Харпалев
>>>>
>
> --
> Moscow.pm mailing list
> moscow-pm на pm.org | http://moscow.pm.org


Подробная информация о списке рассылки Moscow-pm