From 6396da021efc99df2979065a8a342e93789bbc72 Mon Sep 17 00:00:00 2001 From: Daniel Berteaud Date: Thu, 6 Aug 2015 16:40:13 +0200 Subject: [PATCH] Switch signaling to redis pubsub instead of relying on a global var --- vroom.pl | 141 +++++++++++++++++++++++++++++++-------------------------------- 1 file changed, 70 insertions(+), 71 deletions(-) diff --git a/vroom.pl b/vroom.pl index 6b0bf52..3c75f71 100755 --- a/vroom.pl +++ b/vroom.pl @@ -8,8 +8,8 @@ use lib 'lib'; use Mojolicious::Lite; use Mojolicious::Plugin::Mail; use Mojolicious::Plugin::Database; -use Mojolicious::Plugin::Redis; use Mojolicious::Plugin::StaticCompressor; +use Mojo::Redis2; use Vroom::Constants; use Vroom::Conf; use Crypt::SaltedHash; @@ -79,6 +79,8 @@ if ($excel){ # Global error check our $error = undef; +our $listeners = {}; + # Initialize localization plugin I18N => { namespace => 'Vroom::I18N', @@ -98,12 +100,6 @@ plugin database => { } }; -# Connect to redis -plugin redis =>{ - serveur => $config->{'database.redis'}, - helper => 'redis' -}; - # Load mail plugin with its default values plugin mail => { from => $config->{'email.from'}, @@ -186,6 +182,12 @@ helper check_db_version => sub { return ($ver eq Vroom::Constants::DB_VERSION) ? '1' : '0'; }; +# Helper to access redis objects +helper redis => sub { + my $self = shift; + state $redis = Mojo::Redis2->new(url => $config->{'database.redis'}); +}; + # Get optional features helper get_opt_features => sub { my $self = shift; @@ -220,8 +222,9 @@ helper log_event => sub { helper get_peers => sub { my $self = shift; my $peers = {}; - foreach my $peer ($self->redis->hkeys('peers')){ - $peers->{$peer} = $self->get_peer($peer); + foreach my $id (@{$self->redis->hkeys('peers')}){ + my $peer = $self->get_peer($id); + $peers->{$id} = $peer if $peer; } return $peers; }; @@ -1136,8 +1139,8 @@ helper signal_broadcast_room => sub { next if !$peers->{$data->{from}}->{room}; next if !$peers->{$peer}->{room}; next if $peers->{$peer}->{room} ne $peers->{$data->{from}}->{room}; - $peers->{$peer}->{socket}->send($data->{msg}); - } + $self->redis->publish('signaling:peer:' . $peer, Mojo::JSON::to_json($data->{msg})); + } return 1; }; @@ -1235,7 +1238,7 @@ helper export_events_xlsx => sub { helper disconnect_peer => sub { my $self = shift; my $id = shift; - my $peers =$self->get_peers; + my $peers = $self->get_peers; return 0 if (!$id || !$peers->{$id}); if ($id && $peers->{$id} && $peers->{$id}->{room}){ $self->log_event({ @@ -1245,13 +1248,13 @@ helper disconnect_peer => sub { } $self->signal_broadcast_room({ from => $id, - msg => Protocol::SocketIO::Message->new( + msg => { type => 'event', data => { name => 'remove', args => [{ id => $id, type => 'video' }] } - ) + } }); $self->update_room_last_activity($peers->{$id}->{room}); $self->del_peer($id); @@ -1275,30 +1278,38 @@ get '/socket.io/:ver' => sub { websocket '/socket.io/:ver/websocket/:id' => sub { my $self = shift; my $id = $self->stash('id'); + my $loop = undef; + my $cb = undef; + + $self->inactivity_timeout(65); + # the ID must match the one stored in our session if ($id ne $self->session('peer_id')){ $self->log_event({ event => 'peer_id_mismatch', msg => 'Something is wrong, peer ID is ' . $id . ' but should be ' . $self->session('peer_id') }); - return $self->send('Bad session id'); + return $self->tx->send('Bad session id'); } my $key = $self->session('key'); - my $new_peer = {}; - # We create the peer in the global hash - $new_peer->{socket} = $self->tx; - # And set the initial "last seen" flag - $new_peer->{last} = time; - # Associate the unique ID and name - $new_peer->{id} = $self->session('id'); - $new_peer->{check_invitations} = 1; - # Register the i18n stash, for localization will be available in the main IOLoop - # Outside of Mojo controller - $new_peer->{i18n} = $self->stash->{i18n}; + # Add the peer on redis + $self->add_peer($id, + { + last => time, + id => $self->session('id'), + check_invitations => 1 + } + ); - $self->add_peer($id, $new_peer); + $self->redis->subscribe(['signaling:peer:' . $id]); + + $cb = $self->redis->on(message => sub { + my ($redis, $message, $channel) = @_; + $self->tx->send(Protocol::SocketIO::Message->new->parse(Mojo::JSON::from_json($message))) + if ($channel eq 'signaling:peer:' . $id); + }); # When we recive a message, lets parse it as e Socket.IO one $self->on('message' => sub { @@ -1343,7 +1354,9 @@ websocket '/socket.io/:ver/websocket/:id' => sub { next if $peer eq $id; next if !$peers->{$peer}->{room}; next if $peers->{$peer}->{room} ne $room; - $others->{$peer} = $peers->{$peer}->{details}; + $others->{$peer}->{screen} = ($peers->{$peer}->{details}->{screen} eq Mojo::JSON::true) ? \1 : \0; + $others->{$peer}->{video} = ($peers->{$peer}->{details}->{video} eq Mojo::JSON::true) ? \1 : \0; + $others->{$peer}->{audio} = ($peers->{$peer}->{details}->{audio} eq Mojo::JSON::true) ? \1 : \0; } $peer->{details} = { screen => \0, @@ -1381,15 +1394,14 @@ websocket '/socket.io/:ver/websocket/:id' => sub { if ($to && $peers->{$to} && $peers->{$to}->{room} && - $peers->{$to}->{room} eq $peers->{$id}->{room} && - $peers->{$to}->{socket}){ - $self->send(Protocol::SocketIO::Message->new(%$msg)); + $peers->{$to}->{room} eq $peers->{$id}->{room}){ + $self->redis->publish('signaling:peer:' . $to, Mojo::JSON::to_json($msg)); } # No dest, multicast this to every members of the room else{ $self->signal_broadcast_room({ from => $id, - msg => Protocol::SocketIO::Message->new(%$msg) + msg => $msg }); } } @@ -1402,13 +1414,13 @@ websocket '/socket.io/:ver/websocket/:id' => sub { $peer->{details}->{screen} = \0; $self->signal_broadcast_room({ from => $id, - msg => Protocol::SocketIO::Message->new( + msg => { type => 'event', data => { name => 'remove', args => [{ id => $id, type => 'screen' }] } - ) + } }); } elsif ($msg->{data}->{name} =~ m/^leave|disconnect$/){ @@ -1431,49 +1443,33 @@ websocket '/socket.io/:ver/websocket/:id' => sub { # Triggerred when a websocket connection ends $self->on(finish => sub { - my $self = shift; + my $self = shift; $self->disconnect_peer($id); - $self->del_peer($id); + $self->redis->unsubscribe(['signaling:peer:' . $id], $cb) + if $cb; + Mojo::IOLoop->remove($loop) if $loop; }); - # This is just the end of the initial handshake, we indicate the client we're ready - $self->send(Protocol::SocketIO::Message->new( type => 'connect' )); -}; - -# Send heartbeats to all websocket clients -# Every 3 seconds -Mojo::IOLoop->recurring( 3 => sub { - my $peers = app->get_peers; - foreach my $id (keys %{$peers}){ - # This shouldn't happen, but better to log an error and fix it rather - # than looping indefinitly on a bogus entry if something went wrong - if (!$peers->{$id}->{socket}){ - app->log->debug("Garbage found in peers (peer $id has no socket)\n"); - app->del_peer($id); - } - # If we had no reply from this peer in the last 15 sec - # (5 heartbeat without response), we consider it dead and remove it - elsif ($peers->{$id}->{last} < time - 15){ - app->log->debug("Peer $id didn't reply in 15 sec, disconnecting"); - $peers->{$id}->{socket}->finish; - app->disconnect_peer($id); - } - elsif ($peers->{$id}->{check_invitations}) { - my $invitations = app->get_invitation_list($peers->{$id}->{id}); + # Start a loop to send heartbeats every 3 sec + $loop = Mojo::IOLoop->recurring(3 => sub { + my $peer = $self->get_peer($id); + # Should we check invitations ? + if ($peer->{check_invitations}) { + my $invitations = $self->app->get_invitation_list($peer->{id}); foreach my $invit (keys %{$invitations}){ my $msg = ''; - $msg .= sprintf($peers->{$id}->{i18n}->localize('INVITE_REPONSE_FROM_s'), $invitations->{$invit}->{email}) . "\n" ; + $msg .= sprintf($self->l('INVITE_REPONSE_FROM_s'), $invitations->{$invit}->{email}) . "\n" ; if ($invitations->{$invit}->{response} && $invitations->{$invit}->{response} eq 'later'){ - $msg .= $peers->{$id}->{i18n}->localize('HE_WILL_TRY_TO_JOIN_LATER'); + $msg .= $self->l('HE_WILL_TRY_TO_JOIN_LATER'); } else{ - $msg .= $peers->{$id}->{i18n}->localize('HE_WONT_JOIN'); + $msg .= $self->l('HE_WONT_JOIN'); } if ($invitations->{$invit}->{message} && $invitations->{$invit}->{message} ne ''){ - $msg .= "\n" . $peers->{$id}->{i18n}->localize('MESSAGE') . ":\n" . $invitations->{$invit}->{message} . "\n"; + $msg .= "\n" . $self->l('MESSAGE') . ":\n" . $invitations->{$invit}->{message} . "\n"; } app->mark_invitation_processed($invitations->{$invit}->{token}); - $peers->{$id}->{socket}->send( + $self->send( Protocol::SocketIO::Message->new( type => 'event', data => { @@ -1485,13 +1481,16 @@ Mojo::IOLoop->recurring( 3 => sub { ) ); } - delete $peers->{$id}->{check_invitations}; - app->add_peer($id, $peers->{$id}); + delete $peer->{check_invitations}; + $self->add_peer($id, $peer); } - # Send the heartbeat - $peers->{$id}->{socket}->send(Protocol::SocketIO::Message->new( type => 'heartbeat' )); - } -}); + $self->send(Protocol::SocketIO::Message->new( type => 'heartbeat' )); + }); + + # This is just the end of the initial handshake, we indicate the client we're ready + $self->send(Protocol::SocketIO::Message->new( type => 'connect' )); + +}; # Maintenance loop # purge old stuff from the database