package Sprocket; use strict; use warnings; our $VERSION = '0.01'; use Carp qw( croak ); use Sprocket::Common; use Sprocket::Connection; use Sprocket::Session; use Sprocket::AIO; use Scalar::Util qw( weaken blessed ); use POE; use overload '""' => sub { shift->as_string(); }; # weak list of all sprocket components our @COMPONENTS; # events sent to process_plugins sub EVENT_NAME() { 0 } sub SERVER() { 1 } sub CONNECTION() { 2 } BEGIN { eval "use BSD::Resource"; if ( $@ ) { eval "sub HAS_BSD_RESOURCE() { 0 }"; } else { eval "sub HAS_BSD_RESOURCE() { 1 }"; } } sub import { my $self = shift; my @modules = @_; unshift( @modules, 'Common' ); @modules = map { 'Sprocket::'.$_ } @modules; # XXX does this work right, TESTME push( @modules, 'POE' ); my $package = caller(); my @failed; foreach my $module ( @modules ) { my $code = "package $package; use $module;"; eval( $code ); if ( $@ ) { warn $@; push( @failed, $module ); } } @failed and croak "could not import (" . join( ' ', @failed ) . ")"; } our @base_states = qw( _start _default signals _shutdown _log events_received events_ready exception process_plugins sig_child time_out_check cleanup ); sub spawn { my ( $class, $self, @states ) = @_; Sprocket::Session->create( # options => { trace => 1 }, object_states => [ $self => [ @base_states, @states ] ], ); return $self; } sub as_string { __PACKAGE__; } sub new { my $class = shift; croak "$class requires an even number of parameters" if @_ % 2; my %opts = &adjust_params; $opts{alias} = 'sprocket' unless defined( $opts{alias} ) and length( $opts{alias} ); $opts{time_out} = defined $opts{time_out} ? $opts{time_out} : 30; $opts{listen_address} ||= '0.0.0.0'; $opts{log_level} = 4 unless( defined $opts{log_level} ); my $type = delete $opts{_type}; # local / remote my $self = bless( { name => $opts{name}, opts => \%opts, heaps => {}, connections => 0, plugins => {}, plugin_pri => [], time_out => 10, # time_out checker type => $type, }, ref $class || $class ); die 'ListenPort not set, please a port to listen to' if ( $self->isa( 'Sprocket::Server' ) && !defined( $opts{listen_port} ) ); if ( $opts{max_connections} ) { if ( HAS_BSD_RESOURCE ) { my $ret = setrlimit( RLIMIT_NOFILE, $opts{max_connections}, $opts{max_connections} ); unless ( defined $ret && $ret ) { if ( $> == 0 ) { #warn "Unable to set max connections limit"; $self->_log(v => 1, msg => 'Unable to set max connections limit'); } else { #warn "Need to be root to increase max connections"; $self->_log(v => 1, msg => 'Need to be root to increase max connections'); } } } else { $self->_log(v => 1, msg => 'Need BSD::Resource installed to increase max connections'); } } push( @COMPONENTS, $self ); weaken( $COMPONENTS[ -1 ] ); return $self; } sub _start { my ( $self, $kernel ) = @_[OBJECT, KERNEL]; Sprocket::AIO->new( parent_id => $self->{session_id} = $_[ SESSION ]->ID() ); if ( $self->{opts}->{plugins} ) { foreach my $t ( @{ $self->{opts}->{plugins} } ) { $t = adjust_params($t); $self->add_plugin( $t->{plugin}, $t->{priority} || 0 ); } } if ( my $ev = delete $self->{opts}->{event_manager} ) { eval "use $ev->{module}"; if ($@) { $self->_log(v => 1, msg => "Error loading $ev->{module} : $@"); $self->shutdown_all(); return; } unless ( $ev->{options} && ref( $ev->{options} ) eq 'ARRAY' ) { $ev->{options} = []; } $self->{event_manager} = "$ev->{module}"->new( @{$ev->{options}}, parent_id => $self->{session_id} ); } $self->{aio} = Sprocket::AIO::HAS_AIO(); $self->{time_out_id} = $kernel->alarm_set( time_out_check => time() + $self->{time_out} ) if ( $self->{time_out} ); $kernel->sig( DIE => 'exception' ) if ( $self->{opts}->{use_exception_handler} ); $kernel->sig( TSTP => 'signals' ); $kernel->yield( '_startup' ); } sub _default { my ( $self, $con, $cmd ) = @_[OBJECT, HEAP, ARG0]; return if ( $cmd =~ m/^_(child|parent)/ ); return $self->process_plugins( [ $cmd, $self, $con, @_[ ARG1 .. $#_ ] ] ) if ( blessed( $con ) ); $self->_log(v => 1, msg => "_default called, no handler for event $cmd [$con] (the connection for this event may be gone)"); } sub signals { my ( $self, $signal_name ) = @_[OBJECT, ARG0]; $self->_log(v => 1, msg => "Client caught SIG$signal_name"); # to stop ctrl-c / INT if ($signal_name eq 'INT') { #$_[KERNEL]->sig_handled(); } elsif ( $signal_name eq 'TSTP' ) { local $SIG{TSTP} = 'DEFAULT'; kill( TSTP => $$ ); $_[KERNEL]->sig_handled(); } return 0; } sub sig_child { $_[KERNEL]->sig_handled(); } sub new_connection { my $self = shift; my $con = Sprocket::Connection->new( parent_id => $self->{session_id}, @_ ); $con->event_manager( $self->{event_manager}->{alias} ) if ( $self->{event_manager} ); $self->{heaps}->{ $con->ID } = $con; $self->{connections}++; return $con; } # gets a connection obj from any component sub get_connection { my ( $self, $id ) = @_; if ( my $con = $self->{heaps}->{$id} ) { return $con; } foreach ( @COMPONENTS ) { next unless ( defined ); if ( my $con = $_->{heaps}->{$id} ) { return $con; } } return undef; } sub _log { my ( $self, %o ); if ( ref $_[ KERNEL ] ) { ( $self, %o ) = @_[ OBJECT, ARG0 .. $#_ ]; # $o{l}++; } else { ( $self, %o ) = @_; } return unless ( $o{v} <= $self->{opts}->{log_level} ); my $con = $self->{heap}; my $sender = ( $con ) ? ( $con->peer_addr ? $con->peer_addr : '' )."(".$con->ID.")" : "?"; my $l = $o{l} ? $o{l}+1 : 1; my $caller = $o{call} ? $o{call} : ( caller($l) )[3] || '?'; $caller =~ s/^POE::Component/PoCo/o; print STDERR '['.localtime()."][$self->{connections}][$caller][$sender] $o{msg}\n"; } sub cleanup { my ( $self, $con_id ) = @_[ OBJECT, ARG0 ]; my $con = $self->{heaps}->{$con_id}; return unless ( $con ); $self->process_plugins( [ $self->{type}.'_disconnected', $self, $con, 0 ] ) unless ( defined $con->error ); $self->cleanup_connection( $con ); } sub cleanup_connection { my ( $self, $con ) = @_; return unless( $con ); my $wheel = $con->{wheel}; if ( $wheel ) { $wheel->shutdown_input(); $wheel->shutdown_output(); } delete $con->{wheel}; $self->{connections}--; delete $self->{heaps}->{ $con->ID }; return undef; } sub shutdown_all { foreach my $comp (@COMPONENTS) { next unless ( defined $comp ); $comp->shutdown(); } } sub shutdown { my $self = shift; $poe_kernel->call( $self->{session_id} => '_shutdown' ); } sub _shutdown { my ( $self, $kernel ) = @_[ OBJECT, KERNEL ]; foreach ( values %{$self->{heaps}} ) { $_->close( 1 ); # force $self->cleanup_connection( $_ ); } $self->{heaps} = {}; foreach ( keys %{$self->{listeners}} ) { $kernel->refcount_decrement( $_, __PACKAGE__ ); } $kernel->sig( INT => undef ); $kernel->sig( TSTP => undef ); $kernel->alarm_remove_all(); $kernel->alias_remove( $self->{opts}->{alias} ) if ( $self->{opts}->{alias} ); # XXX remove plugins one by one? delete @{$self}{qw( wheel sf )}; # last component, shutdown aio my $count = 0; for my $i ( 0 .. $#COMPONENTS ) { next unless defined $COMPONENTS[ $i ]; if ( "$COMPONENTS[ $i ]" eq "$self" ) { splice( @COMPONENTS, $i, 1 ); next; } $count++; } if ( $count == 0 && $self->{aio} ) { Sprocket::AIO->new()->shutdown(); } return undef; } # TODO class:accessor::fast sub name { shift->{name}; } sub events_received { my ( $kernel, $self ) = @_[ KERNEL, OBJECT ]; $self->process_plugins( [ 'events_received', $self, @_[ HEAP, ARG0 .. $#_ ] ] ); } sub events_ready { my ( $kernel, $self ) = @_[ KERNEL, OBJECT ]; $self->process_plugins( [ 'events_ready', $self, @_[ HEAP, ARG0 .. $#_ ] ] ); } sub exception { my ($kernel, $self, $con, $sig, $error) = @_[KERNEL, OBJECT, HEAP, ARG0, ARG1]; $self->_log(v => 1, l => 1, msg => "plugin exception handled: ($sig) : " .join(' | ',map { $_.':'.$error->{$_} } keys %$error ) ); # doesn't work? if ( blessed( $con ) && $con->isa( 'Sprocket::Connection' ) ) { $con->close( 1 ); } $kernel->sig_handled(); } sub time_out_check { my ($kernel, $self) = @_[KERNEL, OBJECT]; my $time = time(); $self->{time_out_id} = $kernel->alarm_set( time_out_check => $time + $self->{time_out} ); foreach my $con ( values %{$self->{heaps}} ) { if ( my $timeout = $con->time_out() ) { # warn "$con timeout is $con->{time_out} ".( $con->active_time() + $timeout ). " < $time"; if ( ( $con->active_time() + $timeout ) < $time ) { $self->process_plugins( [ 'time_out', $self, $con, $time ] ); } } } } sub add_plugin { my $self = shift; my $t = $self->{plugins}; my ( $plugin, $pri ) = @_; my $name; if ( $plugin->can( 'name' ) ) { $name = $plugin->name(); } else { $name = "$plugin"; } warn "WARNING : Overwriting existing plugin '$name' (You have two plugins with the same name)" if ( exists( $t->{ $name } ) ); $pri ||= 0; my $found = 0; foreach ( keys %$t ) { $pri == $t->{$_}->{priority} && $found++; } if ( $found ) { warn "WARNING: You have defined more than one plugin with the same priority, was this intended? plugin: $name pri: $pri"; } $t->{ $name } = { plugin => $plugin, priority => $pri, }; $plugin->parent_id( $self->{session_id} ); $plugin->add_plugin( $self, $pri ) if ( $plugin->can( 'add_plugin' ) ); # recalc plugin order @{ $self->{plugin_pri} } = sort { $t->{ $a }->{priority} <=> $t->{ $b }->{priority} } keys %$t; return 1; } sub remove_plugin { my $self = shift; my $tr = shift; # TODO remove by name or obj my $t = $self->{plugins}; my $plugin = delete $t->{ $tr }; return 0 unless ( $plugin ); $plugin->{plugin}->remove_plugin( $self, $plugin->{priority} ) if ( $plugin->{plugin}->can( 'remove_plugin' ) ); # recalc plugin_pri @{ $self->{plugin_pri} } = sort { $t->{ $a }->{priority} <=> $t->{ $b }->{priority} } keys %$t; return 1; } sub process_plugins { my ( $self, $args, $i ) = $_[ KERNEL ] ? @_[ OBJECT, ARG0, ARG1 ] : @_; return unless ( @{ $self->{plugin_pri} } ); my $con = $args->[ CONNECTION ]; $con->state( $args->[ EVENT_NAME ] ); if ( my $t = $con->plugin() ) { return $self->{plugins}->{ $t }->{plugin}->handle_event( @$args ); } else { $i ||= 0; if ( $#{ $self->{plugin_pri} } >= $i ) { return if ( $self->{plugins}->{ $self->{plugin_pri}->[ $i ] }->{plugin}->handle_event( @$args ) ); } $i++; # avoid a post return if ( $#{ $self->{plugin_pri} } < $i ); } # XXX call? #$poe_kernel->call( $self->{session_id} => process_plugins => $args => $i ); $poe_kernel->yield( process_plugins => $args => $i ); } sub forward_plugin { my $self = shift; my $plug_name = shift; unless( exists( $self->{plugins}->{ $plug_name } ) ) { $self->_log( v => 4, msg => 'plugin not loaded: '.$plug_name ); return 0; } # XXX my $con = $self->{heap}; $con->plugin( $plug_name ); return $self->process_plugins( [ $con->state, $self, $con, @_ ] ); } 1; __END__ =pod =head1 NAME Sprocket - A pluggable POE based Client / Server Library =head1 SYNOPSIS See examples =head1 ABSTRACT Sprocket is an POE based client server library that uses plugins similar to POE Components. =head1 DESCRIPTION Sprocket uses a single session for each object/component created to increase speed and reduce the memory footprint of your apps. Sprocket is used in the Perl version of Cometd L =head1 NOTES Sprocket is fully compatable with other POE Compoents. Apps are normally written as Sprocket plugins and paired with a L or L. =head1 SEE ALSO L, L, L, L, L =head1 AUTHOR David Davis Exantus@cpan.orgE =head1 RATING Please rate this module. L =head1 COPYRIGHT AND LICENSE Copyright 2006-2007 by David Davis Artistic License =cut