[PerlChina] POE eXample - alarm_forwarder
Shan Leiguang
shanleiguang at gmail.com
Sun Aug 13 21:41:08 PDT 2006
alarm_collectors -> alarm_forwarder -> alarm_monitors
ac_101_001...
ac_101_002...
描述:一个告警监控程序的告警转发部分。有一组告警采集程序采集101系统的告警,
然后写到alarm/101目录下;alarm_forwarder在10001端口监听,并定期扫描alarm目录,
将告警文件的内容放到队列中,有客户端连接时将队列的内容发送给客户端。
*file1:conf/ports_mapping.conf*
#ALarm Ports Mapping Config
101 NMS-selfMonitor 10001
*file2:alarm_forwarder.pl*
#!C:\Perl\bin\perl.exe
#Forward alarms of virtual NEs to NFM or self-Monitor client
#By shanleiguang at he.chinamobile.com, 2006/08
#
# Events
#--------
#
#                                     +-------<-------+
#                                     |               |
# load_config->create_queues-+->alarm_scanning->update_queues
#                            |                        |
#                            |                    (queues)    +---<---+
#                            |                        |       |       |
#                            |                        +->client_send->+
#                            |                                 |
#                            |               |->accept_success-+->client_error
#                            +->start_listen-+  (client_wheels)
#                             (listen_wheels)|
#                                            |->accept_failure
# $_[HEAP]
#----------
# $_[HEAP]->{vnes}->{$vid}->{desc, port, queue, alarm_files, listen_wheel,
#                            client_wheels}
#
package virtualOMC;
use strict;
use warnings;
use Socket;
use IO::File;
use IO::File::Multi;
use Queue::Base;
use POSIX qw/strftime/;
use POE;
use POE::Driver::SysRW;
use POE::Filter::Stream;
use POE::Wheel::ReadWrite;
use POE::Wheel::SocketFactory;
use constant PACKAGE_VERSION => '0.1';
use constant SELF_NAME => 'alarmForwarder';
use constant SELF_VERSION => '0.2';
use constant MAX_LENGTH => 10000;
use constant SCANNING_TIMER => 20;
use constant SENDING_TIMER => 30;
#For debug
use Data::Dumper;
#Directory layout, relative to the current working directory:
#the forwarder's own log goes to $logdir, the ports-mapping config is read
#from $confdir and the alarm files are picked up below $alarmdir.
my ($logdir, $confdir, $alarmdir) = ('log', 'conf', 'alarm');
my $pmconfig = 'ports_mapping.conf';
#Log file named after the start date: YYYYMMDD.log
my $logfile = strftime('%Y%m%d', localtime) . '.log';
#Shared output handle used by every handler for status messages
my $output = IO::File::Multi->new();
#Map every POE event name to the sub that handles it, create the single
#session of this program and run the POE kernel; the script exits when
#run() returns.
my %state_handlers = (
    _start         => \&main_start,
    _stop          => \&main_stop,
    alarm_scanning => \&alarm_scanning,
    update_queues  => \&update_queues,
    start_listen   => \&start_listen,
    accept_success => \&accept_success,
    accept_failure => \&accept_failure,
    client_send    => \&client_send,
    client_error   => \&client_error,
);
POE::Session->create(inline_states => \%state_handlers);
POE::Kernel->run();
exit;
#Return the current local time as an "HH:MM:SS" string (used as log prefix)
sub ts {
    my @now = localtime;
    return strftime('%H:%M:%S', @now);
}
#'_start' event
#Open the output destinations ('>-', presumably standard output, plus the
#daily log file below $logdir in append mode), print the start-up banner
#and continue with load_config.
#Receives the standard POE handler arguments in @_ and passes them on.
#NOTE(review): the results of both open() calls are not checked here;
#confirm how IO::File::Multi reports a failed open before relying on it.
sub main_start {
    $output->open('>-');
    $output->open(">> $logdir/$logfile");
    $output->autoflush(1);

    my $rule = '=' x 60;
    #Banner on a single line: "<name><version> of <package><version>"
    my $banner = SELF_NAME.SELF_VERSION.' of '.__PACKAGE__.PACKAGE_VERSION;
    $output->print("\n$rule\n");
    $output->print("$banner\n");
    $output->print("$rule\n\n");

    #load_config reads $_[HEAP], so hand the POE arguments over explicitly
    load_config(@_);
}
#'_stop' event
#Close the shared output handle and drop the virtual-NE table from the heap.
sub main_stop {
    my $heap = $_[HEAP];
    $output->close();
    delete $heap->{vnes};
}
#Load ports-mapping config of virtual NEs
#Reads "$confdir/$pmconfig". Blank lines and lines starting with '#' are
#skipped; every other line has the form
#  <vid> <description> <port>
#and is stored as $_[HEAP]->{vnes}->{<vid>}->{desc} and ->{port}.
#Lines without a port column are reported and skipped.
#Continues with create_queues once the file has been read.
#Dies with a descriptive message if the config file cannot be opened.
sub load_config {
    my $heap        = $_[HEAP];
    my $config_path = "$confdir/$pmconfig";
    $output->print(ts().", load ports-mapping config\n");

    my $config_fh = IO::File->new($config_path, 'r')
        or die "cannot open ports-mapping config '$config_path': $!\n";
    while (my $line = <$config_fh>) {
        chomp $line;
        next if $line =~ m/^\s*$/ or $line =~ m/^\s*#/;
        #split ' ' ignores leading whitespace, which split /\s+/ would turn
        #into an empty first field
        my ($vid, $desc, $port) = split ' ', $line;
        if (not defined $port) {
            $output->print(ts().", skip malformed config line $.: '$line'\n");
            next;
        }
        $heap->{vnes}->{$vid}->{desc} = $desc;
        $heap->{vnes}->{$vid}->{port} = $port;
    }
    #print Dumper($heap->{vnes});
    $config_fh->close();

    #create_queues reads $_[KERNEL] and $_[HEAP]; pass the POE arguments on
    create_queues(@_);
}
#Create alarm queues
#Attach a fresh Queue::Base object to every virtual NE found in the heap,
#then post the 'alarm_scanning' and 'start_listen' events (in that order).
sub create_queues {
    my ($kernel, $heap) = @_[KERNEL, HEAP];
    $output->print(ts().", create alarm queues\n");
    for my $vid (keys %{ $heap->{vnes} }) {
        $heap->{vnes}->{$vid}->{queue} = Queue::Base->new();
    }
    #&dump_queues;
    $kernel->yield('alarm_scanning');
    $kernel->yield('start_listen');
}
#'alarm_scanning' event
#For every virtual NE, rebuild the sorted list of '*.log' file names found
#in "$alarmdir/<vid>" and store it (as an array reference) in
#$_[HEAP]->{vnes}->{<vid>}->{alarm_files}.
#A directory that cannot be opened is reported and leaves an empty list.
#Posts 'update_queues' when all directories have been scanned.
sub alarm_scanning {
    my ($kernel, $heap) = @_[KERNEL, HEAP];
    $output->print(ts().", scan alarm directory\n");
    foreach my $vid (keys %{ $heap->{vnes} }) {
        my $vne     = $heap->{vnes}->{$vid};
        my $vne_dir = "$alarmdir/$vid";
        #Start from an empty array ref; assigning "()" would store undef
        $vne->{alarm_files} = [];

        #Lexical directory handle, so nothing is shared between handlers
        my $dirh;
        if (not opendir($dirh, $vne_dir)) {
            $output->print(ts().", cannot open alarm directory '$vne_dir': $!\n");
            next;
        }
        my @logfiles = grep { m/\.log$/i } readdir($dirh);
        closedir($dirh);

        #The default sort is the same string comparison as { $a cmp $b }
        push @{ $vne->{alarm_files} }, sort @logfiles;
    }
    #&dump_alarmfiles;
    $kernel->yield('update_queues');
}
#'update_queues' event
#Append the lines of every alarm file listed in {alarm_files} to the queue
#of its virtual NE, trim the queue to at most MAX_LENGTH entries and delete
#the processed file. Schedules the next 'alarm_scanning' after
#SCANNING_TIMER seconds.
#Files that cannot be read are reported and left in place for a later scan.
#NOTE(review): a file that a collector is still writing to may be read
#partially and then deleted; confirm how collectors publish their files.
sub update_queues {
    my ($kernel, $heap) = @_[KERNEL, HEAP];
    $output->print(ts().", update alarm queues\n");
    foreach my $vid (keys %{ $heap->{vnes} }) {
        my $vne   = $heap->{vnes}->{$vid};
        my $queue = $vne->{queue};
        foreach my $alarmfile (@{ $vne->{alarm_files} || [] }) {
            my $alarm_path = "$alarmdir/$vid/$alarmfile";
            my $alarmfh    = IO::File->new($alarm_path, 'r');
            if (not defined $alarmfh) {
                $output->print(ts().", cannot read alarm file '$alarm_path': $!\n");
                next;
            }

            my @elems;
            while (my $line = <$alarmfh>) {
                #Do not let chomp's return value gate the push: it is 0 for
                #a final line without trailing newline, which would be lost
                chomp $line;
                push @elems, $line;
            }
            $alarmfh->close();
            $queue->add(@elems) if @elems;

            #Drop the surplus from the front of the queue when it grew
            #beyond MAX_LENGTH entries
            my $excess = $queue->size() - MAX_LENGTH;
            $queue->remove($excess) if $excess > 0;

            #A file that stays behind would be queued again on the next scan
            unlink($alarm_path)
                or $output->print(ts().", cannot delete alarm file '$alarm_path': $!\n");
        }
    }
    #&dump_queues;
    $kernel->delay('alarm_scanning' => SCANNING_TIMER);
}
#For debug
#Print the content of every alarm queue with Data::Dumper, or a note when
#a queue is empty. The queues keep their content.
#Expects the POE handler arguments in @_ (reads $_[HEAP]).
sub dump_queues {
    foreach my $vid (keys %{$_[HEAP]->{vnes}}) {
        my $queue = $_[HEAP]->{vnes}->{$vid}->{queue};
        if ($queue->empty()) {
            print "Queue of $vid is empty\n";
            next;
        }
        my @elems = $queue->remove($queue->size());
        #remove() takes the alarms out of the queue; put them back in the
        #same order so that dumping does not discard them
        $queue->add(@elems);
        print Dumper(\@elems);
    }
}
#For debug
#Dump the {alarm_files} entry of every virtual NE with Data::Dumper.
#Expects the POE handler arguments in @_ (reads $_[HEAP]).
sub dump_alarmfiles {
    for my $vne (values %{ $_[HEAP]->{vnes} }) {
        print Dumper($vne->{alarm_files});
    }
}
#'start_listen' event
#Create one listening SocketFactory wheel per virtual NE, bound to
#127.0.0.1 on the port configured for that NE, and keep it in
#$_[HEAP]->{vnes}->{<vid>}->{listen_wheel}. Accepted connections are
#reported through 'accept_success', failures through 'accept_failure'.
sub start_listen {
    my $heap = $_[HEAP];
    $output->print(ts().", create listening wheels\n");
    for my $vid (keys %{ $heap->{vnes} }) {
        my $vne = $heap->{vnes}->{$vid};
        my %listener_args = (
            BindAddress    => '127.0.0.1',
            BindPort       => $vne->{port},
            SocketDomain   => AF_INET,
            SocketType     => SOCK_STREAM,
            ListenQueue    => SOMAXCONN,
            SocketProtocol => 'tcp',
            Reuse          => 'on',
            SuccessEvent   => 'accept_success',
            FailureEvent   => 'accept_failure',
        );
        $vne->{listen_wheel} = POE::Wheel::SocketFactory->new(%listener_args);
    }
}
#'accept_success' event
#A listening wheel accepted a connection. Find the virtual NE owning that
#listening wheel, wrap the accepted socket in a ReadWrite wheel stored
#under {client_wheels}->{<wheel id>} and post 'client_send'.
#  ARG0 - accepted socket handle
#  ARG1 - packed peer address (converted with inet_ntoa)
#  ARG2 - peer port
#  ARG3 - ID of the listening wheel
sub accept_success {
    my ($kernel, $heap) = @_[KERNEL, HEAP];
    my $accepted_handle = $_[ARG0];
    my ($peer_addr, $peer_port, $listen_wid) = (inet_ntoa($_[ARG1]), $_[ARG2], $_[ARG3]);

    foreach my $vid (keys %{ $heap->{vnes} }) {
        my $vne = $heap->{vnes}->{$vid};
        #Skip NEs without a listening wheel instead of calling ID() on undef
        next unless defined $vne->{listen_wheel};
        next unless $vne->{listen_wheel}->ID() == $listen_wid;

        #The whole message belongs on one log line
        $output->print(ts().", vID:$vid connected from '$peer_addr:$peer_port'\n");
        my $client_wheel = POE::Wheel::ReadWrite->new(
            Handle     => $accepted_handle,
            Driver     => POE::Driver::SysRW->new(),
            Filter     => POE::Filter::Stream->new(),
            InputEvent => 'client_send',
            ErrorEvent => 'client_error',
        );
        #The heap holds the wheel; client_error deletes it from here again
        $vne->{client_wheels}->{ $client_wheel->ID() } = $client_wheel;
        $kernel->yield('client_send');
        last;
    }
}
#'accept_failure' event
#Report a failure of a listening wheel instead of ignoring it silently.
#  ARG0 - name of the operation that failed
#  ARG1 - numeric error code
#  ARG2 - error text
#  ARG3 - ID of the listening wheel that failed
sub accept_failure {
    my ($operation, $errnum, $errstr, $listen_wid) = @_[ARG0, ARG1, ARG2, ARG3];

    #Name the affected virtual NE when the wheel can be found in the heap
    my $owner = '';
    foreach my $vid (keys %{ $_[HEAP]->{vnes} }) {
        my $wheel = $_[HEAP]->{vnes}->{$vid}->{listen_wheel};
        next unless defined $wheel and $wheel->ID() == $listen_wid;
        $owner = " of vID:$vid";
        last;
    }
    $output->print(ts().", listen wheel '$listen_wid'$owner failed: "
        ."$operation error $errnum ($errstr)\n");
}
#'client_send' event
#Send the queued alarms of every virtual NE, one alarm per line, to all
#clients currently connected to that NE, then re-arm the event to fire
#again after SENDING_TIMER seconds.
#Alarms of an NE without connected clients stay in the queue (which
#update_queues keeps bounded) so they can be delivered once a client
#connects.
sub client_send {
    my ($kernel, $heap) = @_[KERNEL, HEAP];
    $output->print(ts().", send alarm queues\n");
    foreach my $vid (keys %{ $heap->{vnes} }) {
        my $vne     = $heap->{vnes}->{$vid};
        my $queue   = $vne->{queue};
        my @clients = values %{ $vne->{client_wheels} || {} };

        #Draining the queue with nobody to receive it would discard alarms
        next unless @clients;
        next unless $queue->size();

        my @lines = map { "$_\n" } $queue->remove($queue->size());
        $_->put(@lines) foreach @clients;
    }
    $kernel->delay('client_send' => SENDING_TIMER);
}
#'client_error' event
#A client wheel reported an error: log it and delete the wheel from the
#{client_wheels} table of the virtual NE it belongs to.
#  ARG3 - ID of the client wheel
sub client_error {
    my ($heap, $client_wid) = @_[HEAP, ARG3];
    foreach my $vid (keys %{ $heap->{vnes} }) {
        my $clients = $heap->{vnes}->{$vid}->{client_wheels};
        #exists on the fetched table avoids creating empty client_wheels
        #hashes for NEs that never had a client
        next unless defined $clients and exists $clients->{$client_wid};

        #The whole message belongs on one log line
        $output->print(ts().", client wheel '$client_wid' of vID:$vid closed\n");
        delete $clients->{$client_wid};
    }
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://mail.pm.org/pipermail/china-pm/attachments/20060814/a5bcab1f/attachment-0001.html
More information about the China-pm
mailing list