<div><font face="courier new,monospace" size="1"></font> </div>
<div><font face="Courier New" size="2">alarm_collectors -> alarm_forwarder -> alarm_monitors</font></div>
<div><font face="Courier New" size="1">ac_101_001...</font></div>
<div><font face="Courier New" size="1">ac_101_002...</font></div>
<div><font face="Courier New" size="1"></font> </div>
<div><font face="Courier New" size="2">描述:一个告警监控程序的告警转发部分,有一组告警采集程序采集101系统的告警</font></div>
<div><font face="Courier New" size="2"></font> </div>
<div><font face="Courier New" size="2">然后写到alarm/101的目录下,alarm_forwarder在10001端口监听,并定期扫描alarm目录,</font></div>
<div><font face="Courier New" size="2"></font> </div>
<div><font face="Courier New" size="2">将日志内容放到队列中,有客户端连接时将队列的内容发送给客户端。</font></div>
<div><font face="courier new,monospace" size="1"></font> </div>
<div><font face="Courier New" size="2"><strong>file1:conf/ports_mapping.conf</strong></font></div>
<div><font face="Courier New" size="1">
<p>#Alarm Ports Mapping Config</p>
<p>101 NMS-selfMonitor 10001</p></font></div>
<div><font face="Courier New" size="2"><strong>file2:alarm_forwarder.pl</strong></font></div>
<div><font face="Courier New" size="1"></font> </div>
<div><font face="courier new,monospace" size="1">#!C:\Perl\bin\perl.exe<br>#Forward alarms of virtual NEs to NFM or self-Monitor client<br>#By </font><a href="mailto:shanleiguang@he.chinamobile.com"><font face="courier new,monospace" size="1">
shanleiguang@he.chinamobile.com</font></a><font face="courier new,monospace" size="1">, 2006/08<br>#<br># Events<br>#--------<br>#<br># +------<------+<br># | | +---<----+
<br># load_config->create_queues-+->alarm_scanning->update_queues | |<br># (queues) | |->client_send->+<br># | |->accept_success-+->client_error
<br># +->start_listen-+ (client_wheels)<br># (listen_wheels)|<br># |->accept_failure<br># $_[HEAP]<br>#----------<br>
# $_[HEAP]->{vnes}->{$vid}->{desc, port, queue, alarm_files, listen_wheel, client_wheels}<br>#<br>package virtualOMC;</font></div>
<p><font face="courier new,monospace" size="1">use strict;<br>use warnings;</font></p>
<p><font face="courier new,monospace" size="1">use Socket;<br>use IO::File;<br>use IO::File::Multi;<br>use Queue::Base;<br>use POSIX qw/strftime/;</font></p>
<p><font face="courier new,monospace" size="1">use POE;<br>use POE::Driver::SysRW;<br>use POE::Filter::Stream;<br>use POE::Wheel::ReadWrite;<br>use POE::Wheel::SocketFactory;</font></p>
<p><font face="courier new,monospace" size="1">#Name and version strings printed in the startup banner by main_start<br>use constant PACKAGE_VERSION => '0.1';<br>use constant SELF_NAME => 'alarmForwarder';<br>use constant SELF_VERSION => '0.2';<br>#Upper bound on the number of entries kept in one alarm queue (enforced in update_queues)<br>use constant MAX_LENGTH => 10000;
<br>#Delay handed to POE's delay() before the next 'alarm_scanning' event<br>use constant SCANNING_TIMER => 20;<br>#Delay handed to POE's delay() before the next 'client_send' event<br>use constant SENDING_TIMER => 30;</font></p>
<p><font face="courier new,monospace" size="1">#For debug<br>use Data::Dumper;</font></p>
<p><font face="courier new,monospace" size="1">#Directories (relative to the working directory) and file names used below<br>
my $logdir = 'log';<br>
my $confdir = 'conf';<br>
my $alarmdir = 'alarm';<br>
my $pmconfig = 'ports_mapping.conf';<br>
#The log file name is derived once, at startup, from the local date<br>
my $logfile = strftime("%Y%m%d", localtime) . '.log';<br>
#Shared output handle; main_start opens its targets, every sub prints through it<br>
my $output = IO::File::Multi->new();</font></p>
<p><font face="courier new,monospace" size="1">#One POE session; each event name is handled by the sub of the same name<br>
POE::Session->create(<br>
    inline_states => {<br>
        _start => \&main_start,<br>
        _stop => \&main_stop,<br>
        alarm_scanning => \&alarm_scanning,<br>
        update_queues => \&update_queues,<br>
        start_listen => \&start_listen,<br>
        accept_success => \&accept_success,<br>
        accept_failure => \&accept_failure,<br>
        client_send => \&client_send,<br>
        client_error => \&client_error,<br>
    },<br>
);</font></p>
<p><font face="courier new,monospace" size="1">#Run the event loop; run() returns once the session has nothing left to do<br>
POE::Kernel->run();</font></p>
<p><font face="courier new,monospace" size="1">exit;</font></p>
<p><font face="courier new,monospace" size="1">#Get timestamp: the current local time formatted as HH:MM:SS<br>
sub ts {<br>
    my @now = localtime;<br>
    return strftime("%H:%M:%S", @now);<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#'_start' event: open the output targets, print the startup banner, then<br>
#carry on with load_config<br>
sub main_start {<br>
    #'>-' is standard output; the second target appends to the daily log file<br>
    $output->open('>-');<br>
    $output->open(">> $logdir/$logfile");<br>
    $output->autoflush(1);</font></p>
<p><font face="courier new,monospace" size="1">    my $rule = '=' x 60;<br>
    my $title = SELF_NAME.SELF_VERSION.' of '.__PACKAGE__.PACKAGE_VERSION;<br>
    $output->print("\n$rule\n");<br>
    $output->print("$title\n");<br>
    $output->print("$rule\n\n");</font></p>
<p><font face="courier new,monospace" size="1">    #load_config reads $_[HEAP], so it is given this handler's argument list<br>
    load_config(@_);<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#'_stop' event: close the output handle and drop the per-NE state kept in the heap<br>
sub main_stop {<br>
    my $heap = $_[HEAP];<br>
    $output->close();<br>
    delete $heap->{vnes};<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#Load ports-mapping config of virtual NEs<br>
#Every non-blank, non-comment line of $confdir/$pmconfig is read as<br>
#"vid desc port" (whitespace separated) and stored in<br>
#$_[HEAP]->{vnes}->{$vid}->{desc, port}.<br>
#Dies when the file cannot be opened; a line without a numeric port is logged<br>
#and skipped. Continues with create_queues when the file has been read.<br>
sub load_config {<br>
    my $heap = $_[HEAP];<br>
    my $config_path = "$confdir/$pmconfig";<br>
    $output->print(ts().", load ports-mapping config\n");</font></p>
<p><font face="courier new,monospace" size="1">    my $config_fh = IO::File->new($config_path, "r");<br>
    if (not defined $config_fh) {<br>
        #Nothing can be forwarded without the mapping, so stop with a clear reason<br>
        my $reason = "$!";<br>
        $output->print(ts().", cannot open '$config_path': $reason\n");<br>
        die "cannot open '$config_path': $reason\n";<br>
    }</font></p>
<p><font face="courier new,monospace" size="1">    while (defined(my $line = <$config_fh>)) {<br>
        chomp $line;<br>
        next if $line =~ m/^\s*$/ or $line =~ m/^\s*#/;<br>
        #split ' ' skips leading whitespace, so an indented line still yields the vid first<br>
        my ($vid, $desc, $port) = split ' ', $line;<br>
        if (not defined $port or $port !~ m/^\d+$/) {<br>
            $output->print(ts().", skip malformed config line '$line'\n");<br>
            next;<br>
        }<br>
        $heap->{vnes}->{$vid}->{desc} = $desc;<br>
        $heap->{vnes}->{$vid}->{port} = $port;<br>
    }</font></p>
<p><font face="courier new,monospace" size="1">    #print Dumper($heap->{vnes});<br>
    $config_fh->close();<br>
    #create_queues reads $_[HEAP] and $_[KERNEL], so it is given the same argument list<br>
    create_queues(@_);<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#Create alarm queues: one Queue::Base object per configured virtual NE, then<br>
#post the first 'alarm_scanning' and 'start_listen' events<br>
sub create_queues {<br>
    my ($kernel, $heap) = @_[KERNEL, HEAP];<br>
    $output->print(ts().", create alarm queues\n");<br>
    foreach my $vid (keys %{$heap->{vnes}}) {<br>
        $heap->{vnes}->{$vid}->{queue} = Queue::Base->new();<br>
    }</font></p>
<p><font face="courier new,monospace" size="1">    #&dump_queues;<br>
    $kernel->yield('alarm_scanning');<br>
    $kernel->yield('start_listen');<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#'alarm_scanning' event: for every virtual NE, rebuild the list of *.log file<br>
#names found in $alarmdir/$vid (sorted by name), then post 'update_queues'.<br>
#A directory that cannot be opened is logged and leaves that NE's list empty.<br>
sub alarm_scanning {<br>
    my ($kernel, $heap) = @_[KERNEL, HEAP];<br>
    $output->print(ts().", scan alarm directory\n");</font></p>
<p><font face="courier new,monospace" size="1">    foreach my $vid (keys %{$heap->{vnes}}) {<br>
        my $vne = $heap->{vnes}->{$vid};<br>
        my $dir = "$alarmdir/$vid";<br>
        #The list is rebuilt from scratch on every scan<br>
        $vne->{alarm_files} = [];</font></p>
<p><font face="courier new,monospace" size="1">        #Lexical handle: no package-global bareword shared between calls<br>
        my $dirh;<br>
        if (not opendir($dirh, $dir)) {<br>
            $output->print(ts().", cannot open alarm directory '$dir': $!\n");<br>
            next;<br>
        }<br>
        my @logs = grep { m/\.log$/i } readdir($dirh);<br>
        closedir($dirh);<br>
        push @{$vne->{alarm_files}}, sort { $a cmp $b } @logs;<br>
    }</font></p>
<p><font face="courier new,monospace" size="1">    #&dump_alarmfiles;<br>
    $kernel->yield('update_queues');<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#'update_queues' event: append every line of each scanned alarm file to the<br>
#queue of its virtual NE, cut the queue back to MAX_LENGTH entries, delete the<br>
#file, and finally schedule the next 'alarm_scanning' after SCANNING_TIMER.<br>
#A file that cannot be opened is logged and left in place for a later scan.<br>
sub update_queues {<br>
    my ($kernel, $heap) = @_[KERNEL, HEAP];<br>
    $output->print(ts().", update alarm queues\n");</font></p>
<p><font face="courier new,monospace" size="1">    foreach my $vid (keys %{$heap->{vnes}}) {<br>
        my $vne = $heap->{vnes}->{$vid};<br>
        my $queue = $vne->{queue};</font></p>
<p><font face="courier new,monospace" size="1">        foreach my $alarmfile (@{$vne->{alarm_files}}) {<br>
            my $path = "$alarmdir/$vid/$alarmfile";<br>
            my $alarmfh = IO::File->new($path, "r");<br>
            if (not defined $alarmfh) {<br>
                $output->print(ts().", cannot open alarm file '$path': $!\n");<br>
                next;<br>
            }</font></p>
<p><font face="courier new,monospace" size="1">            my @elems;<br>
            while (defined(my $line = <$alarmfh>)) {<br>
                #chomp's return value must not gate the push: it is 0 for a final<br>
                #line that has no trailing newline, and that line is still an alarm<br>
                chomp $line;<br>
                push @elems, $line;<br>
            }<br>
            $alarmfh->close();</font></p>
<p><font face="courier new,monospace" size="1">            $queue->add(@elems) if @elems;<br>
            my $total_length = $queue->size();<br>
            $queue->remove($total_length - MAX_LENGTH) if $total_length > MAX_LENGTH;</font></p>
<p><font face="courier new,monospace" size="1">            #A file that survives here is read again by the next scan, so report it<br>
            unlink($path)<br>
                or $output->print(ts().", cannot delete alarm file '$path': $!\n");<br>
        }<br>
    }</font></p>
<p><font face="courier new,monospace" size="1">    #&dump_queues;<br>
    $kernel->delay('alarm_scanning' => SCANNING_TIMER);<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#For debug: print the content of every alarm queue with Data::Dumper.<br>
#The elements are read with remove(), so they are added back afterwards;<br>
#otherwise dumping a queue would also empty it.<br>
sub dump_queues {<br>
    foreach my $vid (keys %{$_[HEAP]->{vnes}}) {<br>
        my $queue = $_[HEAP]->{vnes}->{$vid}->{queue};</font></p>
<p><font face="courier new,monospace" size="1">        if ($queue->empty()) {<br>
            print "Queue of $vid is empty\n";<br>
            next;<br>
        }</font></p>
<p><font face="courier new,monospace" size="1">        my @elems = $queue->remove($queue->size());</font></p>
<p><font face="courier new,monospace" size="1">        print Dumper(\@elems);<br>
        $queue->add(@elems);<br>
    }<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#For debug: print the scanned alarm file list of every virtual NE with Data::Dumper<br>
sub dump_alarmfiles {<br>
    my $heap = $_[HEAP];<br>
    foreach my $vid (keys %{$heap->{vnes}}) {<br>
        my $files = $heap->{vnes}->{$vid}->{alarm_files};<br>
        print Dumper($files);<br>
    }<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#'start_listen' event: create one listening POE::Wheel::SocketFactory per<br>
#virtual NE, bound to 127.0.0.1 on the port configured for that NE, and keep<br>
#it in the heap as {listen_wheel}. Connections are reported through<br>
#'accept_success', failures through 'accept_failure'.<br>
sub start_listen {<br>
    my $heap = $_[HEAP];<br>
    $output->print(ts().", create listening wheels\n");</font></p>
<p><font face="courier new,monospace" size="1">    foreach my $vid (keys %{$heap->{vnes}}) {<br>
        my $vne = $heap->{vnes}->{$vid};<br>
        $vne->{listen_wheel} = POE::Wheel::SocketFactory->new(<br>
            #Plain address literal: no markup and no whitespace inside the quotes<br>
            BindAddress => '127.0.0.1',<br>
            BindPort => $vne->{port},<br>
            SocketDomain => AF_INET,<br>
            SocketType => SOCK_STREAM,<br>
            ListenQueue => SOMAXCONN,<br>
            SocketProtocol => 'tcp',<br>
            Reuse => 'on',</font></p>
<p><font face="courier new,monospace" size="1">            SuccessEvent => 'accept_success',<br>
            FailureEvent => 'accept_failure',<br>
        );<br>
    }<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#'accept_success' event: a listening wheel accepted a connection.<br>
#ARG0 is the accepted handle, ARG1/ARG2 the peer address and port, ARG3 the ID<br>
#of the listening wheel. The NE owning that wheel gets a new ReadWrite wheel<br>
#for the client, stored under {client_wheels}->{wheel ID}, and 'client_send'<br>
#is posted.<br>
sub accept_success {<br>
    my ($kernel, $heap, $accepted_handle) = @_[KERNEL, HEAP, ARG0];<br>
    my $peer_addr = inet_ntoa($_[ARG1]);<br>
    my ($peer_port, $listen_wid) = @_[ARG2, ARG3];</font></p>
<p><font face="courier new,monospace" size="1">    foreach my $vid (keys %{$heap->{vnes}}) {<br>
        my $vne = $heap->{vnes}->{$vid};<br>
        #Only the NE whose listening wheel reported the connection is of interest<br>
        next if $vne->{listen_wheel}->ID() != $listen_wid;</font></p>
<p><font face="courier new,monospace" size="1">        $output->print(ts().", vID:$vid connected from '$peer_addr:$peer_port'\n");</font></p>
<p><font face="courier new,monospace" size="1">        my $client_wheel = POE::Wheel::ReadWrite->new(<br>
            Handle => $accepted_handle,<br>
            Driver => POE::Driver::SysRW->new(),<br>
            Filter => POE::Filter::Stream->new(),<br>
            InputEvent => 'client_send',<br>
            ErrorEvent => 'client_error',<br>
        );</font></p>
<p><font face="courier new,monospace" size="1">        $vne->{client_wheels}->{$client_wheel->ID()} = $client_wheel;<br>
        $kernel->yield('client_send');<br>
        last;<br>
    }<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#'accept_failure' event: log a failure reported by a listening wheel instead<br>
#of dropping it silently. POE::Wheel::SocketFactory passes the name of the<br>
#failed operation, the numeric error, the error text and the wheel ID in<br>
#ARG0..ARG3. The wheel itself is left untouched.<br>
sub accept_failure {<br>
    my ($operation, $errnum, $errstr, $listen_wid) = @_[ARG0, ARG1, ARG2, ARG3];<br>
    $output->print(ts().", listen wheel '$listen_wid' failed: $operation error $errnum ($errstr)\n");<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#'client_send' event: for every virtual NE that has at least one connected<br>
#client, take all queued alarms and write each one, newline-terminated, to<br>
#every client wheel of that NE. The event re-arms itself after SENDING_TIMER.<br>
sub client_send {<br>
    my ($kernel, $heap) = @_[KERNEL, HEAP];<br>
    $output->print(ts().", send alarm queues\n");</font></p>
<p><font face="courier new,monospace" size="1">    foreach my $vid (keys %{$heap->{vnes}}) {<br>
        my $vne = $heap->{vnes}->{$vid};<br>
        my $queue = $vne->{queue};<br>
        my @client_wids = keys %{$vne->{client_wheels} || {}};</font></p>
<p><font face="courier new,monospace" size="1">        #Leave the alarms queued while nobody is connected: removing them with<br>
        #no receiver would discard them (update_queues still caps the queue size)<br>
        next unless @client_wids;<br>
        next unless $queue->size();</font></p>
<p><font face="courier new,monospace" size="1">        my @elems = $queue->remove($queue->size());<br>
        foreach my $client_wid (@client_wids) {<br>
            my $wheel = $vne->{client_wheels}->{$client_wid};<br>
            $wheel->put("$_\n") foreach (@elems);<br>
        }<br>
    }</font></p>
<p><font face="courier new,monospace" size="1">    $kernel->delay('client_send' => SENDING_TIMER);<br>
}</font></p>
<p><font face="courier new,monospace" size="1">#'client_error' event: a client wheel reported an error; ARG3 carries its<br>
#wheel ID. The wheel is logged as closed and removed from whichever NE holds it.<br>
sub client_error {<br>
    my $heap = $_[HEAP];<br>
    my $client_wid = $_[ARG3];</font></p>
<p><font face="courier new,monospace" size="1">    foreach my $vid (keys %{$heap->{vnes}}) {<br>
        my $vne = $heap->{vnes}->{$vid};<br>
        next if not defined $vne->{client_wheels}->{$client_wid};<br>
        $output->print(ts().", client wheel '$client_wid' of vID:$vid closed\n");<br>
        delete $vne->{client_wheels}->{$client_wid};<br>
    }<br>
}</font></p>