[Moscow.pm] Обмен данными с потоком.

Vladimir Melnichenko melnichenkovv на gmail.com
Чт Июн 5 20:35:45 PDT 2014


>Ну а если воркеры вычисления делают, т.е. CPU потребляют, будет работать?

Отвечу за Антона - будет, медленней точно не станет.
Оборачиваем "воркер" в async - в цикле эти асинки складываем в массив,
Делаем join и радуемся массиву значений.
Создавать под это процессы это перебор.


5 июня 2014 г., 22:24 пользователь Victor Efimov <victor на vsespb.ru> написал:

> 5 июня 2014 г., 22:12 пользователь Antonio Nikishaev <a на lelf.me> написал:
> > Нахрена вам разделяемая память и костыли с трубами?
> >
> > Куча async. В конце всем ->join. Если надо по частям передавать данные
> то Coro::Channel к каждому
> >
>
> Ну а если воркеры вычисления делают, т.е. CPU потребляют, будет работать?
>
> >
> >
> > On 5 Jun 2014, at 19:47, Харпалёв Иван <ivan.kharpalev на gmail.com> wrote:
> >
> >> Спасибо!
> >> На  Go выглядит заманчиво, хотя и совершенно не понятно, как происходит
> распределение входа между воркерами.
> >>
> >> Классный пример с ForkEngine.
> >> Вот только в доке IO::Pipe не описаны  функции autoflush и blocking.
> Зачем они вызываются в ForkEngine?? и зачем делается binmode на дескриптор?
> >>
> >> Передача через Pipe -- подходящее решение (прогон через paip почти не
> замедляет построчное копирование из файла в файл на Perl).
> >>
> >> А как же быть c получением из воркеров??
> >> как-то так?
> >> open my $input, "<", $in_file or die "Can not open file for read";
> >> open my $output, ">", $out_file or die "Can not open file for write";
> >> for (@workers) {
> >> ⇥   my $wait_for_input  = AnyEvent->io (
> >> ⇥   ⇥   fh => $_->{fromchild},
> >> ⇥   ⇥   poll => 'r',
> >> ⇥   ⇥   cb => sub {
> >> ⇥   ⇥   ⇥   say $output readline ($_->{fromchild});
> >> ⇥   ⇥   ⇥   $_->{c}--; #счётчик очереди в worker'е
> >> ⇥   ⇥   }
> >> ⇥   )
> >> }
> >> my $number=0;
> >> while (<$input>) {
> >> ⇥   while(1){
> >> ⇥   ⇥   $number = ++$number  % $#workers;
> >> ⇥   ⇥   my $worker = $workers[$number];
> >> ⇥   ⇥   if ($worker->{c} < 10) {
> >> ⇥   ⇥   ⇥   my $to = $worker->{tochild};
> >> ⇥   ⇥   ⇥   say $to $_;
> >> ⇥   ⇥   ⇥   $worker->{c}++;
> >> ⇥   ⇥   }
> >> ⇥   }
> >> }
> >>
> >> Gearman -- штука крутая, но всё-таки хочется на Perl.
> >> А как решить на Coro по-прежнему непонятно(
> >> Вроде если есть симафоры, по они должны быть для разделяемых ресурсов и
> следовательно должно быть возможно изменять оду и ту же переменную из
> разных потоков...   Возможно ли вообще такое в Perl.  Хотя пайпы вроде норм
> решение для передачи и главная проблема в неблокирующем ожидании.
> >>
> >> Спасибо!
> >>
> >>
> >>
> >> 5 июня 2014 г., 17:16 пользователь Eugene Toropov <
> eugene.toropov на gmail.com> написал:
> >> Спасибо, реально разыгрался аппетит :) пошел тырить печеньки у
> товарищей :)
> >>
> >> On Jun 5, 2014, at 5:14 PM, Alexander Lourier <aml на rulezz.ru> wrote:
> >>
> >>> Запилил за 10 минут. Такие задачки на Go решаются элементарно. А если
> учесть, что каждая горутина может выполняться на своём CPU и никаких GIL,
> то становится ням-ням как вкусно.
> >>>
> >>>
> >>> 5 июня 2014 г., 15:10 пользователь Eugene Toropov <
> eugene.toropov на gmail.com> написал:
> >>> Интересно, это уже готовое было или запилил за полчаса?
> >>>
> >>> Евгений
> >>>
> >>> On Jun 5, 2014, at 5:03 PM, Alexander Lourier <aml на rulezz.ru> wrote:
> >>>
> >>>> Минутка рекламы. Вот решение задачи на Go. Оно длинное, потому что я
> его обильно снабдил комментариями. Если лишнее убрать, всё будет выглядеть
> очень компактно и работать производительно.
> >>>>
> >>>> package
> >>>>  main
> >>>>
> >>>>
> >>>> import
> >>>>  (
> >>>>
> >>>> "fmt"
> >>>>
> >>>>
> >>>> "math/rand"
> >>>>
> >>>>
> >>>> "time"
> >>>>
> >>>> )
> >>>>
> >>>>
> >>>> const
> >>>>  (
> >>>>     numWorkers =
> >>>> 10
> >>>>
> >>>> )
> >>>>
> >>>>
> >>>> // task - это задание для воркера.
> >>>> type task struct
> >>>>  {
> >>>>     value
> >>>> int
> >>>>
> >>>>     output
> >>>> chan
> >>>>  result
> >>>> }
> >>>>
> >>>>
> >>>> // result - это результат обработки задания воркером.
> >>>> type result struct
> >>>>  {
> >>>>     value
> >>>> int
> >>>>
> >>>>     worker
> >>>> int
> >>>>
> >>>> }
> >>>>
> >>>>
> >>>> // worker берёт данные из канала input, обрабатывает их (умножает на
> 100) и кладёт в канал ответа,
> >>>> // который прислан вместе с заданием.
> >>>> func worker(workerNumber int, input chan
> >>>>  task) {
> >>>>
> >>>> // Пока входной канал не закроют, читаем из него задание.
> >>>>
> >>>>
> >>>> for task := range
> >>>>  input {
> >>>>
> >>>> // Работаем в поте лица.
> >>>>
> >>>>             time.Sleep(time.Duration(rand.Intn(
> >>>> 100
> >>>> )) * time.Millisecond)
> >>>>             task.output <- result{task.value *
> >>>> 100
> >>>> , workerNumber}
> >>>>     }
> >>>> }
> >>>>
> >>>>
> >>>> // prepareInput готовит входные задания и кладёт их в два канала: в
> одну очередь
> >>>> // задания для воркеров, в другую - каналы ответа.
> >>>> func prepareInput(input chan task, output chan chan
> >>>>  result) {
> >>>>
> >>>> for i := 0; i < 100
> >>>> ; i++ {
> >>>>
> >>>> // Канал ответа буферизованный, чтобы воркер не ждал, когда его ответ
> считают,
> >>>>
> >>>>
> >>>> // а сразу брался за следующее задание.
> >>>>
> >>>>             outputChan :=
> >>>> make(chan result, 1
> >>>> )
> >>>>
> >>>> // Тот факт, что задания кладутся в input и output в одном и том же
> порядке,
> >>>>
> >>>>
> >>>> // гарантирует, что ответы будут упорядочены в том же порядке.
> >>>>
> >>>>             input <- task{i, outputChan}
> >>>>             output <- outputChan
> >>>>     }
> >>>>
> >>>> close
> >>>> (input)
> >>>>
> >>>> close
> >>>> (output)
> >>>> }
> >>>>
> >>>>
> >>>> func
> >>>>  main() {
> >>>>
> >>>> // Каналы обязательно буферизованные (длина буфера = числу воркеров).
> >>>>
> >>>>     input :=
> >>>> make(chan
> >>>>  task, numWorkers)
> >>>>     output :=
> >>>> make(chan chan
> >>>>  result, numWorkers)
> >>>>
> >>>>
> >>>> // Запускаем готовилку входных данных.
> >>>>
> >>>>
> >>>> go
> >>>>  prepareInput(input, output)
> >>>>
> >>>>
> >>>> // Запускаем воркеры.
> >>>>
> >>>>
> >>>> for i := 0
> >>>> ; i < numWorkers; i++ {
> >>>>
> >>>> go
> >>>>  worker(i, input)
> >>>>     }
> >>>>
> >>>>
> >>>> // Читаем ответы в порядке, в каком нам нужно.
> >>>>
> >>>>
> >>>> for res := range
> >>>>  output {
> >>>>             fmt.Printf(
> >>>> "%+v\n"
> >>>> , <-res)
> >>>>     }
> >>>> }
> >>>>
> >>>>
> >>>>
> >>>> 5 июня 2014 г., 13:46 пользователь Харпалёв Иван <
> ivan.kharpalev на gmail.com> написал:
> >>>> Добрый день, могучий MoscowPM
> >>>>
> >>>> Опять про параллельную обработку.
> >>>>
> >>>> Хочется написать вот такую схему обработки ввода:
> >>>> master создаёт work'ов,
> >>>> читает порции из файла, раздаёт порции worker'ам
> >>>> ждёт, пока worker обработает, получает ответ worker'a
> >>>> пишет результат в файл.
> >>>> Так же мастер буфереизует вывод, чтобы выход писался в правильном
> порядке.
> >>>>
> >>>> Самое туманное:
> >>>> Как передавать данные от мастера к worker'у и Обратно?!!!!
> >>>> Как ждать готовности?!!!
> >>>> Должна ли такая схема (работа с диском из одного места) дать
> ускорение по сравнению с чтением/записью файла в каждом worker'е?
> >>>>
> >>>> смотрел на Coro, увидел Coro::Simaphore, Coro::Signal ... но не пойму:
> >>>>   как сделать разделяемую память, (как быстро передавать данные между
> мастером и worker'ом внутри Perl)?
> >>>>   как сделать неблокирующее ожидание готовности одного из worker'ов
> (при котором можно заниматься вводом-выводом)?
> >>>>
> >>>> Подскажите, на чём и как такое писать!!
> >>>> Спасибо!
> >>>>
> >>>> Уважение
> >>>> Иван Харпалев
> >>>>
> >
> > --
> > Moscow.pm mailing list
> > moscow-pm на pm.org | http://moscow.pm.org
> --
> Moscow.pm mailing list
> moscow-pm на pm.org | http://moscow.pm.org
>
----------- следущая часть -----------
Вложение в формате HTML было извлечено…
URL: <http://mail.pm.org/pipermail/moscow-pm/attachments/20140606/9e6827f7/attachment-0001.html>


Подробная информация о списке рассылки Moscow-pm