[Moscow.pm] Обмен данными с потоком.

Antonio Nikishaev a на lelf.me
Чт Июн 5 11:12:56 PDT 2014


Нахрена вам разделяемая память и костыли с трубами?

Куча async. В конце всем ->join. Если надо по частям передавать данные то Coro::Channel к каждому



On 5 Jun 2014, at 19:47, Харпалёв Иван <ivan.kharpalev at gmail.com> wrote:

> Спасибо!
> На  Go выглядит заманчиво, хотя и совершенно не понятно, как происходит распределение входа между воркерами.
> 
> Классный пример с ForkEngine.
> Вот только в доке IO::Pipe не описаны  функции autoflush и blocking. Зачем они вызываются в ForkEngine?? и зачем делается binmode на дескриптор?
> 
> Передача через Pipe -- подходящее решение (прогон через paip почти не замедляет построчное копирование из файла в файл на Perl).
> 
> А как же быть c получением из воркеров??
> как-то так?
> open my $input, "<", $in_file or die "Can not open file for read";
> open my $output, ">", $out_file or die "Can not open file for write";
> for (@workers) {
> ⇥   my $wait_for_input  = AnyEvent->io (
> ⇥   ⇥   fh => $_->{fromchild},
> ⇥   ⇥   poll => 'r',
> ⇥   ⇥   cb => sub {
> ⇥   ⇥   ⇥   say $output readline ($_->{fromchild});
> ⇥   ⇥   ⇥   $_->{c}--; #счётчик очереди в worker'е
> ⇥   ⇥   }
> ⇥   )
> }
> my $number=0;
> while (<$input>) {
> ⇥   while(1){
> ⇥   ⇥   $number = ++$number  % $#workers;
> ⇥   ⇥   my $worker = $workers[$number];
> ⇥   ⇥   if ($worker->{c} < 10) {
> ⇥   ⇥   ⇥   my $to = $worker->{tochild};
> ⇥   ⇥   ⇥   say $to $_;
> ⇥   ⇥   ⇥   $worker->{c}++;
> ⇥   ⇥   }
> ⇥   }
> }
> 
> Gearman -- штука крутая, но всё-таки хочется на Perl.
> А как решить на Coro по-прежнему непонятно(
> Вроде если есть симафоры, по они должны быть для разделяемых ресурсов и следовательно должно быть возможно изменять оду и ту же переменную из разных потоков...   Возможно ли вообще такое в Perl.  Хотя пайпы вроде норм решение для передачи и главная проблема в неблокирующем ожидании.
> 
> Спасибо!
> 
> 
> 
> 5 июня 2014 г., 17:16 пользователь Eugene Toropov <eugene.toropov at gmail.com> написал:
> Спасибо, реально разыгрался аппетит :) пошел тырить печеньки у товарищей :)
> 
> On Jun 5, 2014, at 5:14 PM, Alexander Lourier <aml at rulezz.ru> wrote:
> 
>> Запилил за 10 минут. Такие задачки на Go решаются элементарно. А если учесть, что каждая горутина может выполняться на своём CPU и никаких GIL, то становится ням-ням как вкусно.
>> 
>> 
>> 5 июня 2014 г., 15:10 пользователь Eugene Toropov <eugene.toropov at gmail.com> написал:
>> Интересно, это уже готовое было или запилил за полчаса?
>> 
>> Евгений
>> 
>> On Jun 5, 2014, at 5:03 PM, Alexander Lourier <aml at rulezz.ru> wrote:
>> 
>>> Минутка рекламы. Вот решение задачи на Go. Оно длинное, потому что я его обильно снабдил комментариями. Если лишнее убрать, всё будет выглядеть очень компактно и работать производительно.
>>> 
>>> package
>>>  main
>>> 
>>> 
>>> import
>>>  (
>>> 	
>>> "fmt"
>>> 
>>> 	
>>> "math/rand"
>>> 
>>> 	
>>> "time"
>>> 
>>> )
>>> 
>>> 
>>> const
>>>  (
>>> 	numWorkers = 
>>> 10
>>> 
>>> )
>>> 
>>> 
>>> // task - это задание для воркера.
>>> type task struct
>>>  {
>>> 	value  
>>> int
>>> 
>>> 	output 
>>> chan
>>>  result
>>> }
>>> 
>>> 
>>> // result - это результат обработки задания воркером.
>>> type result struct
>>>  {
>>> 	value  
>>> int
>>> 
>>> 	worker 
>>> int
>>> 
>>> }
>>> 
>>> 
>>> // worker берёт данные из канала input, обрабатывает их (умножает на 100) и кладёт в канал ответа,
>>> // который прислан вместе с заданием.
>>> func worker(workerNumber int, input chan
>>>  task) {
>>> 	
>>> // Пока входной канал не закроют, читаем из него задание.
>>> 
>>> 	
>>> for task := range
>>>  input {
>>> 		
>>> // Работаем в поте лица.
>>> 
>>> 		time.Sleep(time.Duration(rand.Intn(
>>> 100
>>> )) * time.Millisecond)
>>> 		task.output <- result{task.value * 
>>> 100
>>> , workerNumber}
>>> 	}
>>> }
>>> 
>>> 
>>> // prepareInput готовит входные задания и кладёт их в два канала: в одну очередь
>>> // задания для воркеров, в другую - каналы ответа.
>>> func prepareInput(input chan task, output chan chan
>>>  result) {
>>> 	
>>> for i := 0; i < 100
>>> ; i++ {
>>> 		
>>> // Канал ответа буферизованный, чтобы воркер не ждал, когда его ответ считают,
>>> 
>>> 		
>>> // а сразу брался за следующее задание.
>>> 
>>> 		outputChan := 
>>> make(chan result, 1
>>> )
>>> 		
>>> // Тот факт, что задания кладутся в input и output в одном и том же порядке,
>>> 
>>> 		
>>> // гарантирует, что ответы будут упорядочены в том же порядке.
>>> 
>>> 		input <- task{i, outputChan}
>>> 		output <- outputChan
>>> 	}
>>> 	
>>> close
>>> (input)
>>> 	
>>> close
>>> (output)
>>> }
>>> 
>>> 
>>> func
>>>  main() {
>>> 	
>>> // Каналы обязательно буферизованные (длина буфера = числу воркеров).
>>> 
>>> 	input := 
>>> make(chan
>>>  task, numWorkers)
>>> 	output := 
>>> make(chan chan
>>>  result, numWorkers)
>>> 
>>> 	
>>> // Запускаем готовилку входных данных.
>>> 
>>> 	
>>> go
>>>  prepareInput(input, output)
>>> 
>>> 	
>>> // Запускаем воркеры.
>>> 
>>> 	
>>> for i := 0
>>> ; i < numWorkers; i++ {
>>> 		
>>> go
>>>  worker(i, input)
>>> 	}
>>> 
>>> 	
>>> // Читаем ответы в порядке, в каком нам нужно.
>>> 
>>> 	
>>> for res := range
>>>  output {
>>> 		fmt.Printf(
>>> "%+v\n"
>>> , <-res)
>>> 	}
>>> }
>>> 
>>> 
>>> 
>>> 5 июня 2014 г., 13:46 пользователь Харпалёв Иван <ivan.kharpalev at gmail.com> написал:
>>> Добрый день, могучий MoscowPM
>>> 
>>> Опять про параллельную обработку.
>>> 
>>> Хочется написать вот такую схему обработки ввода: 
>>> master создаёт work'ов, 
>>> читает порции из файла, раздаёт порции worker'ам
>>> ждёт, пока worker обработает, получает ответ worker'a 
>>> пишет результат в файл.
>>> Так же мастер буфереизует вывод, чтобы выход писался в правильном порядке.
>>> 
>>> Самое туманное:
>>> Как передавать данные от мастера к worker'у и Обратно?!!!!
>>> Как ждать готовности?!!!
>>> Должна ли такая схема (работа с диском из одного места) дать ускорение по сравнению с чтением/записью файла в каждом worker'е?
>>> 
>>> смотрел на Coro, увидел Coro::Simaphore, Coro::Signal ... но не пойму:
>>>   как сделать разделяемую память, (как быстро передавать данные между мастером и worker'ом внутри Perl)?
>>>   как сделать неблокирующее ожидание готовности одного из worker'ов (при котором можно заниматься вводом-выводом)?
>>> 
>>> Подскажите, на чём и как такое писать!!
>>> Спасибо!
>>> 
>>> Уважение
>>> Иван Харпалев
>>> 



Подробная информация о списке рассылки Moscow-pm