[PerlChina] POE eXample - alarm_forwarder

Shan Leiguang shanleiguang at gmail.com
Sun Aug 13 21:41:08 PDT 2006


alarm_collectors -> alarm_forwarder -> alarm_monitors
ac_101_001...
ac_101_002...

描述:一个告警监控程序的告警转发部分,有一组告警采集程序采集101系统的告警

然后写到alarm/101的目录下,alarm_forwarder在10001端口监听,并定期扫描alarm目录,

将日志内容放到队列中,有客户端连接时将队列的内容发送给客户端。

*file1:conf/ports_mapping.conf*

#Alarm Ports Mapping Config

101 NMS-selfMonitor 10001
*file2:alarm_forwarder.pl*

#!C:\Perl\bin\perl.exe
#Forward alarms of virtual NEs to NFM or self-Monitor client
#By shanleiguang at he.chinamobile.com, 2006/08
#
# Events
#--------
#
#                                     +------<------+
#                                     |             |                 +---<----+
# load_config->create_queues-+->alarm_scanning->update_queues         |        |
#                (queues)    |                                 |->client_send->+
#                            |               |->accept_success-+->client_error
#                            +->start_listen-+ (client_wheels)
#                             (listen_wheels)|
#                                            |->accept_failure
# $_[HEAP]
#----------
# $_[HEAP]->{vnes}->{$vid}->{desc, port, queue, alarm_files, listen_wheel,
#                            client_wheels}
#
package virtualOMC;

use strict;
use warnings;

use Socket;
use IO::File;
use IO::File::Multi;
use Queue::Base;
use POSIX qw/strftime/;

use POE;
use POE::Driver::SysRW;
use POE::Filter::Stream;
use POE::Wheel::ReadWrite;
use POE::Wheel::SocketFactory;

use constant PACKAGE_VERSION => '0.1';
use constant SELF_NAME       => 'alarmForwarder';
use constant SELF_VERSION    => '0.2';
use constant MAX_LENGTH      => 10000;
use constant SCANNING_TIMER  => 20;
use constant SENDING_TIMER   => 30;

#For debug
use Data::Dumper;

# Directory layout (relative to the working directory) and the shared output.
my $logdir   = 'log';                  # receives the forwarder's own log file
my $confdir  = 'conf';                 # holds the ports-mapping config
my $alarmdir = 'alarm';                # scanned as "$alarmdir/<vid>" for *.log files
my $pmconfig = 'ports_mapping.conf';
my $logfile  = strftime('%Y%m%d', localtime) . '.log';   # computed once at load time
my $output   = IO::File::Multi->new;   # every log print in this file goes through it

# Event name => handler table for the single forwarder session.
my %handler_for = (
    # session lifecycle
    _start => \&main_start,
    _stop  => \&main_stop,

    # alarm collection
    alarm_scanning => \&alarm_scanning,
    update_queues  => \&update_queues,

    # network side
    start_listen   => \&start_listen,
    accept_success => \&accept_success,
    accept_failure => \&accept_failure,
    client_send    => \&client_send,
    client_error   => \&client_error,
);

POE::Session->create(inline_states => \%handler_for);

# Enter the POE event loop.
POE::Kernel->run();

exit;

#Get timestamp
# Returns the current local time formatted as HH:MM:SS.
sub ts {
    my @now = localtime;
    return strftime('%H:%M:%S', @now);
}

#'_start' event
# Opens the shared output on '>-' and on the dated log file under $logdir,
# prints the start-up banner, then loads the ports-mapping configuration.
sub main_start {
    $output->open('>-');
    $output->open(">> $logdir/$logfile");
    $output->autoflush(1);

    $output->print("\n".('=' x 60)."\n");
    # The banner is one line; the literal must not contain a line break.
    $output->print(SELF_NAME.SELF_VERSION.' of '.__PACKAGE__.PACKAGE_VERSION."\n");
    $output->print(('=' x 60)."\n\n");

    # Pass the POE argument list on explicitly rather than relying on the
    # implicit @_ sharing of a bare '&load_config;'.
    load_config(@_);
}

#'_stop' event
# Session teardown: close the shared output and drop the per-NE state
# (queues, file lists and wheels) kept on the heap.
sub main_stop {
    my $heap = $_[HEAP];

    $output->close();
    delete $heap->{vnes};
}

#Load ports-mapping config of virtual NEs
# Reads "$confdir/$pmconfig". Each data line holds three whitespace-separated
# fields: "<vid> <desc> <port>". Blank lines and '#' comment lines (indented
# or not) are skipped. Stores desc and port under $_[HEAP]->{vnes}->{$vid},
# then calls create_queues with the same POE arguments.
# Dies with a descriptive message when the config file cannot be opened.
sub load_config {
    $output->print(ts().", load ports-mapping config\n");

    my $config_path = "$confdir/$pmconfig";
    my $config_fh   = IO::File->new($config_path, 'r')
        or die "Cannot open ports-mapping config '$config_path': $!\n";

    # A named line variable keeps the global $_ untouched.
    while (my $line = <$config_fh>) {
        chomp $line;
        next if $line =~ m/^\s*(?:#|$)/;

        # split ' ' ignores leading whitespace, so an indented entry still
        # yields the vid as its first field.
        my ($vid, $desc, $port) = split ' ', $line;

        # Without a port there is nothing to listen on; skip the line
        # instead of creating a half-filled entry.
        unless (defined $port) {
            $output->print(ts().", skip malformed config line $.: $line\n");
            next;
        }

        $_[HEAP]->{vnes}->{$vid}->{desc} = $desc;
        $_[HEAP]->{vnes}->{$vid}->{port} = $port;
    }

    #print Dumper($_[HEAP]->{vnes});
    $config_fh->close();
    create_queues(@_);
}

#Create alarm queues
# Gives every configured virtual NE a fresh Queue::Base, then posts the
# 'alarm_scanning' and 'start_listen' events to this session.
sub create_queues {
    $output->print(ts().", create alarm queues\n");

    for my $vid (keys %{ $_[HEAP]->{vnes} }) {
        $_[HEAP]->{vnes}->{$vid}->{queue} = Queue::Base->new;
    }

    #&dump_queues;
    my $kernel = $_[KERNEL];
    $kernel->yield('alarm_scanning');
    $kernel->yield('start_listen');
}

#'alarm_scanning' event
# For every virtual NE, rebuilds {alarm_files} with the names of the *.log
# files (case-insensitive) found in "$alarmdir/$vid", sorted by name, then
# posts 'update_queues'. A directory that cannot be opened is logged and
# leaves that NE with an empty file list for this cycle.
sub alarm_scanning {
    $output->print(ts().", scan alarm directory\n");

    foreach my $vid (keys %{$_[HEAP]->{vnes}}) {
        my $vne_dir = "$alarmdir/$vid";

        # Start each scan from a real empty array ref (assigning '()' to a
        # scalar slot would store undef).
        $_[HEAP]->{vnes}->{$vid}->{alarm_files} = [];

        # Lexical dirhandle; report the failure and keep scanning other NEs.
        my $dir_handle;
        unless (opendir($dir_handle, $vne_dir)) {
            $output->print(ts().", cannot open alarm directory '$vne_dir': $!\n");
            next;
        }

        my @log_files = sort { $a cmp $b }
                        grep { m/\.log$/i }
                        readdir($dir_handle);
        closedir($dir_handle);

        push @{$_[HEAP]->{vnes}->{$vid}->{alarm_files}}, @log_files;
    }

    #&dump_alarmfiles;
    $_[KERNEL]->yield('update_queues');
}

#'update_queues' event
# Appends every line of each file listed in {alarm_files} to the NE's queue,
# trims the queue from the front so it holds at most MAX_LENGTH entries,
# deletes the processed file, and finally re-arms 'alarm_scanning' after
# SCANNING_TIMER seconds.
sub update_queues {
    $output->print(ts().", update alarm queues\n");

    foreach my $vid (keys %{$_[HEAP]->{vnes}}) {
        my $queue = $_[HEAP]->{vnes}->{$vid}->{queue};

        foreach my $alarmfile (@{$_[HEAP]->{vnes}->{$vid}->{alarm_files}}) {
            my $alarm_path = "$alarmdir/$vid/$alarmfile";
            my $alarmfh    = IO::File->new($alarm_path, 'r');

            # Leave an unreadable file in place so a later scan can retry it.
            unless ($alarmfh) {
                $output->print(ts().", cannot open alarm file '$alarm_path': $!\n");
                next;
            }

            # Push every line unconditionally: testing chomp's return value
            # would drop a final line that has no trailing newline.
            my @elems;
            while (my $line = <$alarmfh>) {
                chomp $line;
                push @elems, $line;
            }
            $alarmfh->close();

            $queue->add(@elems);

            # Discard the oldest entries once the cap is exceeded.
            my $total_length = $queue->size();
            $queue->remove($total_length - MAX_LENGTH)
                if($total_length > MAX_LENGTH);

            # A file that survives here is re-read on the next scan, so make
            # the failure visible.
            unlink($alarm_path)
                or $output->print(ts().", cannot remove alarm file '$alarm_path': $!\n");
        }
    }

    #&dump_queues;
    $_[KERNEL]->delay('alarm_scanning' => SCANNING_TIMER);
}

#For debug
# Prints the content of every NE queue with Data::Dumper, or a short notice
# when a queue is empty. The queues are left as they were found.
sub dump_queues {
    foreach my $vid (keys %{$_[HEAP]->{vnes}}) {
        my $queue = $_[HEAP]->{vnes}->{$vid}->{queue};

        if ($queue->empty()) {
            print "Queue of $vid is empty\n";
            next;
        }

        my @elems = $queue->remove($queue->size());

        print Dumper(\@elems);

        # remove() takes the elements out of the queue; put them back in the
        # same order so a debug dump does not lose pending alarms.
        $queue->add(@elems);
    }
}

#For debug
# Prints, via Data::Dumper, the {alarm_files} entry of every virtual NE.
sub dump_alarmfiles {
    for my $vid (keys %{ $_[HEAP]->{vnes} }) {
        my $alarm_files = $_[HEAP]->{vnes}->{$vid}->{alarm_files};
        print Dumper($alarm_files);
    }
}

#'start_listen' event
# Creates one listening POE::Wheel::SocketFactory per virtual NE, bound to
# 127.0.0.1 on the NE's configured port, and stores it as {listen_wheel}.
# The wheels report through 'accept_success' and 'accept_failure'.
sub start_listen {
    $output->print(ts().", create listening wheels\n");

    for my $vid (keys %{ $_[HEAP]->{vnes} }) {
        my $vne = $_[HEAP]->{vnes}->{$vid};

        my %listener_args = (
            BindAddress    => '127.0.0.1',
            BindPort       => $vne->{port},
            SocketDomain   => AF_INET,
            SocketType     => SOCK_STREAM,
            ListenQueue    => SOMAXCONN,
            SocketProtocol => 'tcp',
            Reuse          => 'on',

            SuccessEvent   => 'accept_success',
            FailureEvent   => 'accept_failure',
        );

        $vne->{listen_wheel} = POE::Wheel::SocketFactory->new(%listener_args);
    }
}

#'accept_success' event
# Handles a new connection reported by a listening wheel.
#   ARG0 - accepted socket handle
#   ARG1 - packed peer address (decoded with inet_ntoa)
#   ARG2 - peer port
#   ARG3 - ID of the listening wheel that accepted the connection
# Finds the virtual NE owning that listening wheel, wraps the socket in a
# ReadWrite wheel stored under {client_wheels}->{<wheel id>}, and posts
# 'client_send'.
sub accept_success {
    my $accepted_handle = $_[ARG0];
    my ($peer_addr, $peer_port, $listen_wid)
        = (inet_ntoa($_[ARG1]), $_[ARG2], $_[ARG3]);

    foreach my $vid (keys %{$_[HEAP]->{vnes}}) {
        my $vne          = $_[HEAP]->{vnes}->{$vid};
        my $listen_wheel = $vne->{listen_wheel};

        # Skip NEs without a listening wheel instead of calling ID() on undef.
        next unless (defined($listen_wheel)
                     && $listen_wheel->ID() == $listen_wid);

        # One log line per connection; the literal must not contain a
        # line break.
        $output->print(ts().", vID:$vid connected from '$peer_addr:$peer_port'\n");

        my $client_wheel = POE::Wheel::ReadWrite->new(
            Handle     => $accepted_handle,
            Driver     => POE::Driver::SysRW->new(),
            Filter     => POE::Filter::Stream->new(),

            InputEvent => 'client_send',
            ErrorEvent => 'client_error',
        );

        # Keep a reference to the wheel under its ID so client_error can
        # find and delete it later.
        $vne->{client_wheels}->{ $client_wheel->ID() } = $client_wheel;
        $_[KERNEL]->yield('client_send');

        last;
    }
}

#'accept_failure' event
# Reports a failure raised by a listening SocketFactory wheel instead of
# silently ignoring it.
#   ARG0 - name of the operation that failed
#   ARG1 - numeric error value
#   ARG2 - error string
#   ARG3 - ID of the wheel that failed
sub accept_failure {
    my ($operation, $errnum, $errstr, $listen_wid) = @_[ARG0 .. ARG3];

    $output->print(ts().", listen wheel '$listen_wid' failed in $operation: ($errnum) $errstr\n");
}

#'client_send' event
# Flushes each NE's alarm queue to all of that NE's connected clients, one
# newline-terminated line per queued alarm, then re-arms itself after
# SENDING_TIMER seconds. Queues of NEs without a connected client are left
# untouched so their alarms are not lost.
sub client_send {
    $output->print(ts().", send alarm queues\n");

    foreach my $vid (keys %{$_[HEAP]->{vnes}}) {
        my $vne     = $_[HEAP]->{vnes}->{$vid};
        my $queue   = $vne->{queue};
        my @clients = values %{ $vne->{client_wheels} || {} };

        # Draining the queue with nobody attached would discard the alarms.
        next unless @clients;
        next unless $queue->size();

        my @lines = map { "$_\n" } $queue->remove($queue->size());

        foreach my $client_wheel (@clients) {
            $client_wheel->put(@lines);
        }
    }

    $_[KERNEL]->delay('client_send' => SENDING_TIMER);
}

#'client_error' event
# Called when a client ReadWrite wheel reports an error; ARG3 carries the
# wheel ID. Logs the closure and deletes the wheel from the owning NE's
# {client_wheels}, which releases the reference this file holds to it.
sub client_error {
    my $client_wid = $_[ARG3];

    foreach my $vid (keys %{$_[HEAP]->{vnes}}) {
        my $client_wheels = $_[HEAP]->{vnes}->{$vid}->{client_wheels};

        # Looking at a copy of the slot avoids creating an empty
        # client_wheels hash for NEs that never had a client.
        next unless ($client_wheels && exists $client_wheels->{$client_wid});

        # One log line per closure; the literal must not contain a line break.
        $output->print(ts().", client wheel '$client_wid' of vID:$vid closed\n");
        delete $client_wheels->{$client_wid};

        # accept_success registers a wheel under exactly one NE.
        last;
    }
}
-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://mail.pm.org/pipermail/china-pm/attachments/20060814/a5bcab1f/attachment-0001.html 


More information about the China-pm mailing list