[Moscow.pm] Обмен данными с потоком.

Victor Efimov victor на vsespb.ru
Чт Июн 5 09:26:29 PDT 2014


5 июня 2014 г., 19:47 пользователь Харпалёв Иван
<ivan.kharpalev на gmail.com> написал:
> Спасибо!
> На  Go выглядит заманчиво, хотя и совершенно не понятно, как происходит
> распределение входа между воркерами.
>
> Классный пример с ForkEngine.
> Вот только в доке IO::Pipe не описаны  функции autoflush и blocking. Зачем
> они вызываются в ForkEngine?? и зачем делается binmode на дескриптор?

IO::Pipe наследник IO::Handle (видно в его коде, но не очевидно), там
они и описаны https://metacpan.org/pod/IO::Handle
blocking - в POSIX системах есть неблокирующий ввод вывод и
блокирующий, тут обязательно блокирующий
autoflush - чтобы не было буферизации
binmode пожалуй, просто на всякий случай.

>
> Передача через Pipe -- подходящее решение (прогон через paip почти не
> замедляет построчное копирование из файла в файл на Perl).
>
> А как же быть c получением из воркеров??

Воркеры - отдельный процессы, соотв. AE там не нужен. Они просто
читают из пайпов и пишут в пайпы, функциями
read, print,
единственное докция perl просит не смешивать буферизированный ввод
вывод и IO::Select (хотя так делают и часто багов нет, но иногда
есть), так что я использую
sysread, syswrite. Но если очень внимательно посмотреть документацию к
sysread/syswrite видно что они могут вернуть ошибку EINTR даже если
всё нормально и можно работать дальше, поэтому над ними нужен враппер
типа такого

https://github.com/vsespb/mt-aws-glacier/blob/421f9d04b96a4657d89eb7ef2bc66aee15b8cec3/lib/App/MtAws/Utils.pm#L269
https://github.com/vsespb/mt-aws-glacier/blob/421f9d04b96a4657d89eb7ef2bc66aee15b8cec3/lib/App/MtAws/Utils.pm#L296

вообще если хочется самому это всё делать нужно хорошо знать API вызовы unix

> как-то так?
> open my $input, "<", $in_file or die "Can not open file for read";
> open my $output, ">", $out_file or die "Can not open file for write";
> for (@workers) {
> ⇥   my $wait_for_input  = AnyEvent->io (
> ⇥   ⇥   fh => $_->{fromchild},
> ⇥   ⇥   poll => 'r',
> ⇥   ⇥   cb => sub {
> ⇥   ⇥   ⇥   say $output readline ($_->{fromchild});
> ⇥   ⇥   ⇥   $_->{c}--; #счётчик очереди в worker'е
> ⇥   ⇥   }
> ⇥   )
> }
> my $number=0;
> while (<$input>) {
> ⇥   while(1){
> ⇥   ⇥   $number = ++$number  % $#workers;
> ⇥   ⇥   my $worker = $workers[$number];
> ⇥   ⇥   if ($worker->{c} < 10) {
> ⇥   ⇥   ⇥   my $to = $worker->{tochild};
> ⇥   ⇥   ⇥   say $to $_;
> ⇥   ⇥   ⇥   $worker->{c}++;
> ⇥   ⇥   }
> ⇥   }
> }
>
> Gearman -- штука крутая, но всё-таки хочется на Perl.
> А как решить на Coro по-прежнему непонятно(
> Вроде если есть симафоры, по они должны быть для разделяемых ресурсов и
> следовательно должно быть возможно изменять оду и ту же переменную из разных
> потоков...   Возможно ли вообще такое в Perl.  Хотя пайпы вроде норм решение
> для передачи и главная проблема в неблокирующем ожидании.
>
> Спасибо!
>
>
>
> 5 июня 2014 г., 17:16 пользователь Eugene Toropov <eugene.toropov на gmail.com>
> написал:
>
>> Спасибо, реально разыгрался аппетит :) пошел тырить печеньки у товарищей
>> :)
>>
>> On Jun 5, 2014, at 5:14 PM, Alexander Lourier <aml на rulezz.ru> wrote:
>>
>> Запилил за 10 минут. Такие задачки на Go решаются элементарно. А если
>> учесть, что каждая горутина может выполняться на своём CPU и никаких GIL, то
>> становится ням-ням как вкусно.
>>
>>
>> 5 июня 2014 г., 15:10 пользователь Eugene Toropov
>> <eugene.toropov на gmail.com> написал:
>>>
>>> Интересно, это уже готовое было или запилил за полчаса?
>>>
>>> Евгений
>>>
>>> On Jun 5, 2014, at 5:03 PM, Alexander Lourier <aml на rulezz.ru> wrote:
>>>
>>> Минутка рекламы. Вот решение задачи на Go. Оно длинное, потому что я его
>>> обильно снабдил комментариями. Если лишнее убрать, всё будет выглядеть очень
>>> компактно и работать производительно.
>>>
>>>
>>> package main
>>>
>>> import (
>>> "fmt"
>>> "math/rand"
>>> "time"
>>> )
>>>
>>> const (
>>> numWorkers = 10
>>> )
>>>
>>> // task - это задание для воркера.
>>> type task struct {
>>> value  int
>>> output chan result
>>> }
>>>
>>> // result - это результат обработки задания воркером.
>>> type result struct {
>>> value  int
>>> worker int
>>> }
>>>
>>> // worker берёт данные из канала input, обрабатывает их (умножает на 100)
>>> и кладёт в канал ответа,
>>> // который прислан вместе с заданием.
>>> func worker(workerNumber int, input chan task) {
>>> // Пока входной канал не закроют, читаем из него задание.
>>> for task := range input {
>>> // Работаем в поте лица.
>>> time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
>>> task.output <- result{task.value * 100, workerNumber}
>>> }
>>> }
>>>
>>> // prepareInput готовит входные задания и кладёт их в два канала: в одну
>>> очередь
>>> // задания для воркеров, в другую - каналы ответа.
>>> func prepareInput(input chan task, output chan chan result) {
>>> for i := 0; i < 100; i++ {
>>> // Канал ответа буферизованный, чтобы воркер не ждал, когда его ответ
>>> считают,
>>> // а сразу брался за следующее задание.
>>> outputChan := make(chan result, 1)
>>> // Тот факт, что задания кладутся в input и output в одном и том же
>>> порядке,
>>> // гарантирует, что ответы будут упорядочены в том же порядке.
>>> input <- task{i, outputChan}
>>> output <- outputChan
>>> }
>>> close(input)
>>> close(output)
>>> }
>>>
>>> func main() {
>>> // Каналы обязательно буферизованные (длина буфера = числу воркеров).
>>> input := make(chan task, numWorkers)
>>> output := make(chan chan result, numWorkers)
>>>
>>> // Запускаем готовилку входных данных.
>>> go prepareInput(input, output)
>>>
>>> // Запускаем воркеры.
>>> for i := 0; i < numWorkers; i++ {
>>> go worker(i, input)
>>> }
>>>
>>> // Читаем ответы в порядке, в каком нам нужно.
>>> for res := range output {
>>> fmt.Printf("%+v\n", <-res)
>>> }
>>> }
>>>
>>>
>>>
>>> 5 июня 2014 г., 13:46 пользователь Харпалёв Иван
>>> <ivan.kharpalev на gmail.com> написал:
>>>>
>>>> Добрый день, могучий MoscowPM
>>>>
>>>> Опять про параллельную обработку.
>>>>
>>>> Хочется написать вот такую схему обработки ввода:
>>>> master создаёт work'ов,
>>>> читает порции из файла, раздаёт порции worker'ам
>>>> ждёт, пока worker обработает, получает ответ worker'a
>>>> пишет результат в файл.
>>>> Так же мастер буфереизует вывод, чтобы выход писался в правильном
>>>> порядке.
>>>>
>>>> Самое туманное:
>>>> Как передавать данные от мастера к worker'у и Обратно?!!!!
>>>> Как ждать готовности?!!!
>>>> Должна ли такая схема (работа с диском из одного места) дать ускорение
>>>> по сравнению с чтением/записью файла в каждом worker'е?
>>>>
>>>> смотрел на Coro, увидел Coro::Simaphore, Coro::Signal ... но не пойму:
>>>>   как сделать разделяемую память, (как быстро передавать данные между
>>>> мастером и worker'ом внутри Perl)?
>>>>   как сделать неблокирующее ожидание готовности одного из worker'ов (при
>>>> котором можно заниматься вводом-выводом)?
>>>>
>>>> Подскажите, на чём и как такое писать!!
>>>> Спасибо!
>>>>
>>>> Уважение
>>>> Иван Харпалев
>>>>
>>>>
>>>>
>>>> --
>>>> Moscow.pm mailing list
>>>> moscow-pm на pm.org | http://moscow.pm.org
>>>>
>>>
>>> --
>>> Moscow.pm mailing list
>>> moscow-pm на pm.org | http://moscow.pm.org
>>>
>>>
>>>
>>> --
>>> Moscow.pm mailing list
>>> moscow-pm на pm.org | http://moscow.pm.org
>>>
>>
>> --
>> Moscow.pm mailing list
>> moscow-pm на pm.org | http://moscow.pm.org
>>
>>
>>
>> --
>> Moscow.pm mailing list
>> moscow-pm на pm.org | http://moscow.pm.org
>>
>
>
> --
> Moscow.pm mailing list
> moscow-pm на pm.org | http://moscow.pm.org
>


Подробная информация о списке рассылки Moscow-pm