[Moscow.pm] Обмен данными с потоком.
Antonio Nikishaev
a на lelf.me
Чт Июн 5 11:12:56 PDT 2014
Нахрена вам разделяемая память и костыли с трубами?
Куча async. В конце всем ->join. Если надо по частям передавать данные то Coro::Channel к каждому
On 5 Jun 2014, at 19:47, Харпалёв Иван <ivan.kharpalev at gmail.com> wrote:
> Спасибо!
> На Go выглядит заманчиво, хотя и совершенно не понятно, как происходит распределение входа между воркерами.
>
> Классный пример с ForkEngine.
> Вот только в доке IO::Pipe не описаны функции autoflush и blocking. Зачем они вызываются в ForkEngine?? и зачем делается binmode на дескриптор?
>
> Передача через Pipe -- подходящее решение (прогон через paip почти не замедляет построчное копирование из файла в файл на Perl).
>
> А как же быть c получением из воркеров??
> как-то так?
> open my $input, "<", $in_file or die "Can not open file for read";
> open my $output, ">", $out_file or die "Can not open file for write";
> for (@workers) {
> ⇥ my $wait_for_input = AnyEvent->io (
> ⇥ ⇥ fh => $_->{fromchild},
> ⇥ ⇥ poll => 'r',
> ⇥ ⇥ cb => sub {
> ⇥ ⇥ ⇥ say $output readline ($_->{fromchild});
> ⇥ ⇥ ⇥ $_->{c}--; #счётчик очереди в worker'е
> ⇥ ⇥ }
> ⇥ )
> }
> my $number=0;
> while (<$input>) {
> ⇥ while(1){
> ⇥ ⇥ $number = ++$number % $#workers;
> ⇥ ⇥ my $worker = $workers[$number];
> ⇥ ⇥ if ($worker->{c} < 10) {
> ⇥ ⇥ ⇥ my $to = $worker->{tochild};
> ⇥ ⇥ ⇥ say $to $_;
> ⇥ ⇥ ⇥ $worker->{c}++;
> ⇥ ⇥ }
> ⇥ }
> }
>
> Gearman -- штука крутая, но всё-таки хочется на Perl.
> А как решить на Coro по-прежнему непонятно(
> Вроде если есть симафоры, по они должны быть для разделяемых ресурсов и следовательно должно быть возможно изменять оду и ту же переменную из разных потоков... Возможно ли вообще такое в Perl. Хотя пайпы вроде норм решение для передачи и главная проблема в неблокирующем ожидании.
>
> Спасибо!
>
>
>
> 5 июня 2014 г., 17:16 пользователь Eugene Toropov <eugene.toropov at gmail.com> написал:
> Спасибо, реально разыгрался аппетит :) пошел тырить печеньки у товарищей :)
>
> On Jun 5, 2014, at 5:14 PM, Alexander Lourier <aml at rulezz.ru> wrote:
>
>> Запилил за 10 минут. Такие задачки на Go решаются элементарно. А если учесть, что каждая горутина может выполняться на своём CPU и никаких GIL, то становится ням-ням как вкусно.
>>
>>
>> 5 июня 2014 г., 15:10 пользователь Eugene Toropov <eugene.toropov at gmail.com> написал:
>> Интересно, это уже готовое было или запилил за полчаса?
>>
>> Евгений
>>
>> On Jun 5, 2014, at 5:03 PM, Alexander Lourier <aml at rulezz.ru> wrote:
>>
>>> Минутка рекламы. Вот решение задачи на Go. Оно длинное, потому что я его обильно снабдил комментариями. Если лишнее убрать, всё будет выглядеть очень компактно и работать производительно.
>>>
>>> package
>>> main
>>>
>>>
>>> import
>>> (
>>>
>>> "fmt"
>>>
>>>
>>> "math/rand"
>>>
>>>
>>> "time"
>>>
>>> )
>>>
>>>
>>> const
>>> (
>>> numWorkers =
>>> 10
>>>
>>> )
>>>
>>>
>>> // task - это задание для воркера.
>>> type task struct
>>> {
>>> value
>>> int
>>>
>>> output
>>> chan
>>> result
>>> }
>>>
>>>
>>> // result - это результат обработки задания воркером.
>>> type result struct
>>> {
>>> value
>>> int
>>>
>>> worker
>>> int
>>>
>>> }
>>>
>>>
>>> // worker берёт данные из канала input, обрабатывает их (умножает на 100) и кладёт в канал ответа,
>>> // который прислан вместе с заданием.
>>> func worker(workerNumber int, input chan
>>> task) {
>>>
>>> // Пока входной канал не закроют, читаем из него задание.
>>>
>>>
>>> for task := range
>>> input {
>>>
>>> // Работаем в поте лица.
>>>
>>> time.Sleep(time.Duration(rand.Intn(
>>> 100
>>> )) * time.Millisecond)
>>> task.output <- result{task.value *
>>> 100
>>> , workerNumber}
>>> }
>>> }
>>>
>>>
>>> // prepareInput готовит входные задания и кладёт их в два канала: в одну очередь
>>> // задания для воркеров, в другую - каналы ответа.
>>> func prepareInput(input chan task, output chan chan
>>> result) {
>>>
>>> for i := 0; i < 100
>>> ; i++ {
>>>
>>> // Канал ответа буферизованный, чтобы воркер не ждал, когда его ответ считают,
>>>
>>>
>>> // а сразу брался за следующее задание.
>>>
>>> outputChan :=
>>> make(chan result, 1
>>> )
>>>
>>> // Тот факт, что задания кладутся в input и output в одном и том же порядке,
>>>
>>>
>>> // гарантирует, что ответы будут упорядочены в том же порядке.
>>>
>>> input <- task{i, outputChan}
>>> output <- outputChan
>>> }
>>>
>>> close
>>> (input)
>>>
>>> close
>>> (output)
>>> }
>>>
>>>
>>> func
>>> main() {
>>>
>>> // Каналы обязательно буферизованные (длина буфера = числу воркеров).
>>>
>>> input :=
>>> make(chan
>>> task, numWorkers)
>>> output :=
>>> make(chan chan
>>> result, numWorkers)
>>>
>>>
>>> // Запускаем готовилку входных данных.
>>>
>>>
>>> go
>>> prepareInput(input, output)
>>>
>>>
>>> // Запускаем воркеры.
>>>
>>>
>>> for i := 0
>>> ; i < numWorkers; i++ {
>>>
>>> go
>>> worker(i, input)
>>> }
>>>
>>>
>>> // Читаем ответы в порядке, в каком нам нужно.
>>>
>>>
>>> for res := range
>>> output {
>>> fmt.Printf(
>>> "%+v\n"
>>> , <-res)
>>> }
>>> }
>>>
>>>
>>>
>>> 5 июня 2014 г., 13:46 пользователь Харпалёв Иван <ivan.kharpalev at gmail.com> написал:
>>> Добрый день, могучий MoscowPM
>>>
>>> Опять про параллельную обработку.
>>>
>>> Хочется написать вот такую схему обработки ввода:
>>> master создаёт work'ов,
>>> читает порции из файла, раздаёт порции worker'ам
>>> ждёт, пока worker обработает, получает ответ worker'a
>>> пишет результат в файл.
>>> Так же мастер буфереизует вывод, чтобы выход писался в правильном порядке.
>>>
>>> Самое туманное:
>>> Как передавать данные от мастера к worker'у и Обратно?!!!!
>>> Как ждать готовности?!!!
>>> Должна ли такая схема (работа с диском из одного места) дать ускорение по сравнению с чтением/записью файла в каждом worker'е?
>>>
>>> смотрел на Coro, увидел Coro::Simaphore, Coro::Signal ... но не пойму:
>>> как сделать разделяемую память, (как быстро передавать данные между мастером и worker'ом внутри Perl)?
>>> как сделать неблокирующее ожидание готовности одного из worker'ов (при котором можно заниматься вводом-выводом)?
>>>
>>> Подскажите, на чём и как такое писать!!
>>> Спасибо!
>>>
>>> Уважение
>>> Иван Харпалев
>>>
Подробная информация о списке рассылки Moscow-pm