Switch signaling to redis pubsub

instead of relying on a global var
master
Daniel Berteaud 9 years ago
parent 0fcd630d82
commit 6396da021e
  1. 141
      vroom.pl

@ -8,8 +8,8 @@ use lib 'lib';
use Mojolicious::Lite; use Mojolicious::Lite;
use Mojolicious::Plugin::Mail; use Mojolicious::Plugin::Mail;
use Mojolicious::Plugin::Database; use Mojolicious::Plugin::Database;
use Mojolicious::Plugin::Redis;
use Mojolicious::Plugin::StaticCompressor; use Mojolicious::Plugin::StaticCompressor;
use Mojo::Redis2;
use Vroom::Constants; use Vroom::Constants;
use Vroom::Conf; use Vroom::Conf;
use Crypt::SaltedHash; use Crypt::SaltedHash;
@ -79,6 +79,8 @@ if ($excel){
# Global error check # Global error check
our $error = undef; our $error = undef;
our $listeners = {};
# Initialize localization # Initialize localization
plugin I18N => { plugin I18N => {
namespace => 'Vroom::I18N', namespace => 'Vroom::I18N',
@ -98,12 +100,6 @@ plugin database => {
} }
}; };
# Connect to redis
plugin redis =>{
serveur => $config->{'database.redis'},
helper => 'redis'
};
# Load mail plugin with its default values # Load mail plugin with its default values
plugin mail => { plugin mail => {
from => $config->{'email.from'}, from => $config->{'email.from'},
@ -186,6 +182,12 @@ helper check_db_version => sub {
return ($ver eq Vroom::Constants::DB_VERSION) ? '1' : '0'; return ($ver eq Vroom::Constants::DB_VERSION) ? '1' : '0';
}; };
# Helper to access redis objects
helper redis => sub {
my $self = shift;
state $redis = Mojo::Redis2->new(url => $config->{'database.redis'});
};
# Get optional features # Get optional features
helper get_opt_features => sub { helper get_opt_features => sub {
my $self = shift; my $self = shift;
@ -220,8 +222,9 @@ helper log_event => sub {
helper get_peers => sub { helper get_peers => sub {
my $self = shift; my $self = shift;
my $peers = {}; my $peers = {};
foreach my $peer ($self->redis->hkeys('peers')){ foreach my $id (@{$self->redis->hkeys('peers')}){
$peers->{$peer} = $self->get_peer($peer); my $peer = $self->get_peer($id);
$peers->{$id} = $peer if $peer;
} }
return $peers; return $peers;
}; };
@ -1136,8 +1139,8 @@ helper signal_broadcast_room => sub {
next if !$peers->{$data->{from}}->{room}; next if !$peers->{$data->{from}}->{room};
next if !$peers->{$peer}->{room}; next if !$peers->{$peer}->{room};
next if $peers->{$peer}->{room} ne $peers->{$data->{from}}->{room}; next if $peers->{$peer}->{room} ne $peers->{$data->{from}}->{room};
$peers->{$peer}->{socket}->send($data->{msg}); $self->redis->publish('signaling:peer:' . $peer, Mojo::JSON::to_json($data->{msg}));
} }
return 1; return 1;
}; };
@ -1235,7 +1238,7 @@ helper export_events_xlsx => sub {
helper disconnect_peer => sub { helper disconnect_peer => sub {
my $self = shift; my $self = shift;
my $id = shift; my $id = shift;
my $peers =$self->get_peers; my $peers = $self->get_peers;
return 0 if (!$id || !$peers->{$id}); return 0 if (!$id || !$peers->{$id});
if ($id && $peers->{$id} && $peers->{$id}->{room}){ if ($id && $peers->{$id} && $peers->{$id}->{room}){
$self->log_event({ $self->log_event({
@ -1245,13 +1248,13 @@ helper disconnect_peer => sub {
} }
$self->signal_broadcast_room({ $self->signal_broadcast_room({
from => $id, from => $id,
msg => Protocol::SocketIO::Message->new( msg => {
type => 'event', type => 'event',
data => { data => {
name => 'remove', name => 'remove',
args => [{ id => $id, type => 'video' }] args => [{ id => $id, type => 'video' }]
} }
) }
}); });
$self->update_room_last_activity($peers->{$id}->{room}); $self->update_room_last_activity($peers->{$id}->{room});
$self->del_peer($id); $self->del_peer($id);
@ -1275,30 +1278,38 @@ get '/socket.io/:ver' => sub {
websocket '/socket.io/:ver/websocket/:id' => sub { websocket '/socket.io/:ver/websocket/:id' => sub {
my $self = shift; my $self = shift;
my $id = $self->stash('id'); my $id = $self->stash('id');
my $loop = undef;
my $cb = undef;
$self->inactivity_timeout(65);
# the ID must match the one stored in our session # the ID must match the one stored in our session
if ($id ne $self->session('peer_id')){ if ($id ne $self->session('peer_id')){
$self->log_event({ $self->log_event({
event => 'peer_id_mismatch', event => 'peer_id_mismatch',
msg => 'Something is wrong, peer ID is ' . $id . ' but should be ' . $self->session('peer_id') msg => 'Something is wrong, peer ID is ' . $id . ' but should be ' . $self->session('peer_id')
}); });
return $self->send('Bad session id'); return $self->tx->send('Bad session id');
} }
my $key = $self->session('key'); my $key = $self->session('key');
my $new_peer = {};
# We create the peer in the global hash # Add the peer on redis
$new_peer->{socket} = $self->tx; $self->add_peer($id,
# And set the initial "last seen" flag {
$new_peer->{last} = time; last => time,
# Associate the unique ID and name id => $self->session('id'),
$new_peer->{id} = $self->session('id'); check_invitations => 1
$new_peer->{check_invitations} = 1; }
# Register the i18n stash, for localization will be available in the main IOLoop );
# Outside of Mojo controller
$new_peer->{i18n} = $self->stash->{i18n};
$self->add_peer($id, $new_peer); $self->redis->subscribe(['signaling:peer:' . $id]);
$cb = $self->redis->on(message => sub {
my ($redis, $message, $channel) = @_;
$self->tx->send(Protocol::SocketIO::Message->new->parse(Mojo::JSON::from_json($message)))
if ($channel eq 'signaling:peer:' . $id);
});
# When we recive a message, lets parse it as e Socket.IO one # When we recive a message, lets parse it as e Socket.IO one
$self->on('message' => sub { $self->on('message' => sub {
@ -1343,7 +1354,9 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
next if $peer eq $id; next if $peer eq $id;
next if !$peers->{$peer}->{room}; next if !$peers->{$peer}->{room};
next if $peers->{$peer}->{room} ne $room; next if $peers->{$peer}->{room} ne $room;
$others->{$peer} = $peers->{$peer}->{details}; $others->{$peer}->{screen} = ($peers->{$peer}->{details}->{screen} eq Mojo::JSON::true) ? \1 : \0;
$others->{$peer}->{video} = ($peers->{$peer}->{details}->{video} eq Mojo::JSON::true) ? \1 : \0;
$others->{$peer}->{audio} = ($peers->{$peer}->{details}->{audio} eq Mojo::JSON::true) ? \1 : \0;
} }
$peer->{details} = { $peer->{details} = {
screen => \0, screen => \0,
@ -1381,15 +1394,14 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
if ($to && if ($to &&
$peers->{$to} && $peers->{$to} &&
$peers->{$to}->{room} && $peers->{$to}->{room} &&
$peers->{$to}->{room} eq $peers->{$id}->{room} && $peers->{$to}->{room} eq $peers->{$id}->{room}){
$peers->{$to}->{socket}){ $self->redis->publish('signaling:peer:' . $to, Mojo::JSON::to_json($msg));
$self->send(Protocol::SocketIO::Message->new(%$msg));
} }
# No dest, multicast this to every members of the room # No dest, multicast this to every members of the room
else{ else{
$self->signal_broadcast_room({ $self->signal_broadcast_room({
from => $id, from => $id,
msg => Protocol::SocketIO::Message->new(%$msg) msg => $msg
}); });
} }
} }
@ -1402,13 +1414,13 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
$peer->{details}->{screen} = \0; $peer->{details}->{screen} = \0;
$self->signal_broadcast_room({ $self->signal_broadcast_room({
from => $id, from => $id,
msg => Protocol::SocketIO::Message->new( msg => {
type => 'event', type => 'event',
data => { data => {
name => 'remove', name => 'remove',
args => [{ id => $id, type => 'screen' }] args => [{ id => $id, type => 'screen' }]
} }
) }
}); });
} }
elsif ($msg->{data}->{name} =~ m/^leave|disconnect$/){ elsif ($msg->{data}->{name} =~ m/^leave|disconnect$/){
@ -1431,49 +1443,33 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
# Triggerred when a websocket connection ends # Triggerred when a websocket connection ends
$self->on(finish => sub { $self->on(finish => sub {
my $self = shift; my $self = shift;
$self->disconnect_peer($id); $self->disconnect_peer($id);
$self->del_peer($id); $self->redis->unsubscribe(['signaling:peer:' . $id], $cb)
if $cb;
Mojo::IOLoop->remove($loop) if $loop;
}); });
# This is just the end of the initial handshake, we indicate the client we're ready # Start a loop to send heartbeats every 3 sec
$self->send(Protocol::SocketIO::Message->new( type => 'connect' )); $loop = Mojo::IOLoop->recurring(3 => sub {
}; my $peer = $self->get_peer($id);
# Should we check invitations ?
# Send heartbeats to all websocket clients if ($peer->{check_invitations}) {
# Every 3 seconds my $invitations = $self->app->get_invitation_list($peer->{id});
Mojo::IOLoop->recurring( 3 => sub {
my $peers = app->get_peers;
foreach my $id (keys %{$peers}){
# This shouldn't happen, but better to log an error and fix it rather
# than looping indefinitly on a bogus entry if something went wrong
if (!$peers->{$id}->{socket}){
app->log->debug("Garbage found in peers (peer $id has no socket)\n");
app->del_peer($id);
}
# If we had no reply from this peer in the last 15 sec
# (5 heartbeat without response), we consider it dead and remove it
elsif ($peers->{$id}->{last} < time - 15){
app->log->debug("Peer $id didn't reply in 15 sec, disconnecting");
$peers->{$id}->{socket}->finish;
app->disconnect_peer($id);
}
elsif ($peers->{$id}->{check_invitations}) {
my $invitations = app->get_invitation_list($peers->{$id}->{id});
foreach my $invit (keys %{$invitations}){ foreach my $invit (keys %{$invitations}){
my $msg = ''; my $msg = '';
$msg .= sprintf($peers->{$id}->{i18n}->localize('INVITE_REPONSE_FROM_s'), $invitations->{$invit}->{email}) . "\n" ; $msg .= sprintf($self->l('INVITE_REPONSE_FROM_s'), $invitations->{$invit}->{email}) . "\n" ;
if ($invitations->{$invit}->{response} && $invitations->{$invit}->{response} eq 'later'){ if ($invitations->{$invit}->{response} && $invitations->{$invit}->{response} eq 'later'){
$msg .= $peers->{$id}->{i18n}->localize('HE_WILL_TRY_TO_JOIN_LATER'); $msg .= $self->l('HE_WILL_TRY_TO_JOIN_LATER');
} }
else{ else{
$msg .= $peers->{$id}->{i18n}->localize('HE_WONT_JOIN'); $msg .= $self->l('HE_WONT_JOIN');
} }
if ($invitations->{$invit}->{message} && $invitations->{$invit}->{message} ne ''){ if ($invitations->{$invit}->{message} && $invitations->{$invit}->{message} ne ''){
$msg .= "\n" . $peers->{$id}->{i18n}->localize('MESSAGE') . ":\n" . $invitations->{$invit}->{message} . "\n"; $msg .= "\n" . $self->l('MESSAGE') . ":\n" . $invitations->{$invit}->{message} . "\n";
} }
app->mark_invitation_processed($invitations->{$invit}->{token}); app->mark_invitation_processed($invitations->{$invit}->{token});
$peers->{$id}->{socket}->send( $self->send(
Protocol::SocketIO::Message->new( Protocol::SocketIO::Message->new(
type => 'event', type => 'event',
data => { data => {
@ -1485,13 +1481,16 @@ Mojo::IOLoop->recurring( 3 => sub {
) )
); );
} }
delete $peers->{$id}->{check_invitations}; delete $peer->{check_invitations};
app->add_peer($id, $peers->{$id}); $self->add_peer($id, $peer);
} }
# Send the heartbeat $self->send(Protocol::SocketIO::Message->new( type => 'heartbeat' ));
$peers->{$id}->{socket}->send(Protocol::SocketIO::Message->new( type => 'heartbeat' )); });
}
}); # This is just the end of the initial handshake, we indicate the client we're ready
$self->send(Protocol::SocketIO::Message->new( type => 'connect' ));
};
# Maintenance loop # Maintenance loop
# purge old stuff from the database # purge old stuff from the database

Loading…
Cancel
Save