Remove the global $peers hashref

master
Daniel Berteaud 9 years ago
parent 16ad1b61f2
commit 513cf87136
  1. 91
      vroom.pl

@ -79,9 +79,6 @@ if ($excel){
# Global error check # Global error check
our $error = undef; our $error = undef;
# Global peers hash
our $peers = {};
# Initialize localization # Initialize localization
plugin I18N => { plugin I18N => {
namespace => 'Vroom::I18N', namespace => 'Vroom::I18N',
@ -233,7 +230,11 @@ helper get_peers => sub {
helper get_peer => sub { helper get_peer => sub {
my $self = shift; my $self = shift;
my $peer = shift; my $peer = shift;
return Mojo::JSON::from_json($self->redis->hget('peers', $peer); my $p = $self->redis->hget('peers', $peer);
if ($p){
return Mojo::JSON::from_json($p);
}
return 0;
}; };
# Store peers in redis # Store peers in redis
@ -517,24 +518,27 @@ helper set_peer_role => sub {
my $self = shift; my $self = shift;
my $data = shift; my $data = shift;
# Check the peer exists and is already in the room # Check the peer exists and is already in the room
if (!$data->{peer_id} || if (!$data->{peer_id}){
!$peers->{$data->{peer_id}}){
return 0; return 0;
} }
$peers->{$data->{peer_id}}->{role} = $data->{role}; my $peer = $self->get_peer($data->{peer_id});
if (!$peer){
return 0;
}
$peer->{role} = $data->{role};
$self->log_event({ $self->log_event({
event => 'peer_role', event => 'peer_role',
msg => "Peer " . $data->{peer_id} . " has now the " . msg => "Peer " . $data->{peer_id} . " has now the " .
$data->{role} . " role in room " . $peers->{$data->{peer_id}}->{room} $data->{role} . " role in room " . $peer->{room}
}); });
return 1; return $self->add_peer($data->{peer_id}, $peer);
}; };
# Return the role of a peer, take a peer object as arg ($data = { peer_id => XYZ }) # Return the role of a peer, take a peer object as arg ($data = { peer_id => XYZ })
helper get_peer_role => sub { helper get_peer_role => sub {
my $self = shift; my $self = shift;
my $peer_id = shift; my $peer_id = shift;
return $peers->{$peer_id}->{role}; return $self->get_peer($peer_id)->{role};
}; };
# Promote a peer to owner # Promote a peer to owner
@ -1109,7 +1113,8 @@ helper get_room_members => sub {
my $room = shift; my $room = shift;
return 0 if (!$self->get_room_by_name($room)); return 0 if (!$self->get_room_by_name($room));
my @p; my @p;
foreach my $peer (keys $peers){ my $peers = $self->get_peers;
foreach my $peer (keys %$peers){
if ($peers->{$peer}->{room} && if ($peers->{$peer}->{room} &&
$peers->{$peer}->{room} eq $room){ $peers->{$peer}->{room} eq $room){
push @p, $peer; push @p, $peer;
@ -1125,6 +1130,7 @@ helper signal_broadcast_room => sub {
# Send a message to all members of the same room as the sender # Send a message to all members of the same room as the sender
# except the sender himself # except the sender himself
my $peers = $self->get_peers;
foreach my $peer (keys %$peers){ foreach my $peer (keys %$peers){
next if $peer eq $data->{from}; next if $peer eq $data->{from};
next if !$peers->{$data->{from}}->{room}; next if !$peers->{$data->{from}}->{room};
@ -1227,8 +1233,9 @@ helper export_events_xlsx => sub {
# Disconnect a peer from the signaling channel # Disconnect a peer from the signaling channel
helper disconnect_peer => sub { helper disconnect_peer => sub {
my $self = shift; my $self = shift;
my $id = shift; my $id = shift;
my $peers =$self->get_peers;
return 0 if (!$id || !$peers->{$id}); return 0 if (!$id || !$peers->{$id});
if ($id && $peers->{$id} && $peers->{$id}->{room}){ if ($id && $peers->{$id} && $peers->{$id}->{room}){
$self->log_event({ $self->log_event({
@ -1247,7 +1254,7 @@ helper disconnect_peer => sub {
) )
}); });
$self->update_room_last_activity($peers->{$id}->{room}); $self->update_room_last_activity($peers->{$id}->{room});
delete $peers->{$id}; $self->del_peer($id);
}; };
# Socket.IO handshake # Socket.IO handshake
@ -1278,29 +1285,34 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
} }
my $key = $self->session('key'); my $key = $self->session('key');
my $new_peer = {};
# We create the peer in the global hash # We create the peer in the global hash
$peers->{$id}->{socket} = $self->tx; $new_peer->{socket} = $self->tx;
# And set the initial "last seen" flag # And set the initial "last seen" flag
$peers->{$id}->{last} = time; $new_peer->{last} = time;
# Associate the unique ID and name # Associate the unique ID and name
$peers->{$id}->{id} = $self->session('id'); $new_peer->{id} = $self->session('id');
$peers->{$id}->{check_invitations} = 1; $new_peer->{check_invitations} = 1;
# Register the i18n stash, for localization will be available in the main IOLoop # Register the i18n stash, for localization will be available in the main IOLoop
# Outside of Mojo controller # Outside of Mojo controller
$peers->{$id}->{i18n} = $self->stash->{i18n}; $new_peer->{i18n} = $self->stash->{i18n};
$self->add_peer($id, $new_peer);
# When we recive a message, lets parse it as e Socket.IO one # When we recive a message, lets parse it as e Socket.IO one
$self->on('message' => sub { $self->on('message' => sub {
my $self = shift; my $self = shift;
my $msg = Protocol::SocketIO::Message->new->parse(shift); my $peer = $self->get_peer($id);
my $peers = $self->get_peers;
my $msg = Protocol::SocketIO::Message->new->parse(shift);
if ($msg->type eq 'event'){ if ($msg->type eq 'event'){
# Here's a client joining a room # Here's a client joining a room
if ($msg->{data}->{name} eq 'join'){ if ($msg->{data}->{name} eq 'join'){
my $room = $msg->{data}->{args}[0]; my $room = $msg->{data}->{args}[0];
my $role = $self->get_key_role($key, $room); my $role = $self->get_key_role($key, $room);
$peers->{$id}->{role} = $role; $peer->{role} = $role;
# Is this peer allowed to join the room ? # Is this peer allowed to join the room ?
if (!$self->get_room_by_name($room) || if (!$self->get_room_by_name($room) ||
!$role || !$role ||
@ -1333,12 +1345,12 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
next if $peers->{$peer}->{room} ne $room; next if $peers->{$peer}->{room} ne $room;
$others->{$peer} = $peers->{$peer}->{details}; $others->{$peer} = $peers->{$peer}->{details};
} }
$peers->{$id}->{details} = { $peer->{details} = {
screen => \0, screen => \0,
video => \1, video => \1,
audio => \0 audio => \0
}; };
$peers->{$id}->{room} = $room; $peer->{room} = $room;
# Lets send the list of peers in our ack message # Lets send the list of peers in our ack message
# Not sure why the null arg is needed, got it by looking at how it works with SignalMaster # Not sure why the null arg is needed, got it by looking at how it works with SignalMaster
$self->send( $self->send(
@ -1371,7 +1383,7 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
$peers->{$to}->{room} && $peers->{$to}->{room} &&
$peers->{$to}->{room} eq $peers->{$id}->{room} && $peers->{$to}->{room} eq $peers->{$id}->{room} &&
$peers->{$to}->{socket}){ $peers->{$to}->{socket}){
$peers->{$to}->{socket}->send(Protocol::SocketIO::Message->new(%$msg)); $self->send(Protocol::SocketIO::Message->new(%$msg));
} }
# No dest, multicast this to every members of the room # No dest, multicast this to every members of the room
else{ else{
@ -1383,11 +1395,11 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
} }
# When a peer shares its screen # When a peer shares its screen
elsif ($msg->{data}->{name} eq 'shareScreen'){ elsif ($msg->{data}->{name} eq 'shareScreen'){
$peers->{$id}->{details}->{screen} = \1; $peer->{details}->{screen} = \1;
} }
# Or unshares it # Or unshares it
elsif ($msg->{data}->{name} eq 'unshareScreen'){ elsif ($msg->{data}->{name} eq 'unshareScreen'){
$peers->{$id}->{details}->{screen} = \0; $peer->{details}->{screen} = \0;
$self->signal_broadcast_room({ $self->signal_broadcast_room({
from => $id, from => $id,
msg => Protocol::SocketIO::Message->new( msg => Protocol::SocketIO::Message->new(
@ -1400,7 +1412,7 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
}); });
} }
elsif ($msg->{data}->{name} =~ m/^leave|disconnect$/){ elsif ($msg->{data}->{name} =~ m/^leave|disconnect$/){
$peers->{$id}->{socket}->{finish}; $self->finish;
} }
else{ else{
$self->app->log->debug("Unhandled SocketIO message\n" . Dumper $msg); $self->app->log->debug("Unhandled SocketIO message\n" . Dumper $msg);
@ -1408,19 +1420,20 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
} }
# Heartbeat reply, update timestamp # Heartbeat reply, update timestamp
elsif ($msg->type eq 'heartbeat'){ elsif ($msg->type eq 'heartbeat'){
$peers->{$id}->{last} = time; $peer->{last} = time;
# Update room last activity ~ every 40 heartbeats, so about every 2 minutes # Update room last activity ~ every 40 heartbeats, so about every 2 minutes
if ((int (rand 200)) <= 5){ if ((int (rand 200)) <= 5){
$self->update_room_last_activity($peers->{$id}->{room}); $self->update_room_last_activity($peer->{room});
} }
} }
$self->add_peer($id, $peer);
}); });
# Triggerred when a websocket connection ends # Triggerred when a websocket connection ends
$self->on(finish => sub { $self->on(finish => sub {
my $self = shift; my $self = shift;
$self->disconnect_peer($id); $self->disconnect_peer($id);
delete $peers->{$id}; $self->del_peer($id);
}); });
# This is just the end of the initial handshake, we indicate the client we're ready # This is just the end of the initial handshake, we indicate the client we're ready
@ -1430,12 +1443,13 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
# Send heartbeats to all websocket clients # Send heartbeats to all websocket clients
# Every 3 seconds # Every 3 seconds
Mojo::IOLoop->recurring( 3 => sub { Mojo::IOLoop->recurring( 3 => sub {
my $peers = app->get_peers;
foreach my $id (keys %{$peers}){ foreach my $id (keys %{$peers}){
# This shouldn't happen, but better to log an error and fix it rather # This shouldn't happen, but better to log an error and fix it rather
# than looping indefinitly on a bogus entry if something went wrong # than looping indefinitly on a bogus entry if something went wrong
if (!$peers->{$id}->{socket}){ if (!$peers->{$id}->{socket}){
app->log->debug("Garbage found in peers (peer $id has no socket)\n"); app->log->debug("Garbage found in peers (peer $id has no socket)\n");
delete $peers->{$id}; app->del_peer($id);
} }
# If we had no reply from this peer in the last 15 sec # If we had no reply from this peer in the last 15 sec
# (5 heartbeat without response), we consider it dead and remove it # (5 heartbeat without response), we consider it dead and remove it
@ -1470,11 +1484,12 @@ Mojo::IOLoop->recurring( 3 => sub {
} }
) )
); );
delete $peers->{$id}->{check_invitations};
} }
# Send the heartbeat delete $peers->{$id}->{check_invitations};
$peers->{$id}->{socket}->send(Protocol::SocketIO::Message->new( type => 'heartbeat' )) app->add_peer($id, $peers->{$id});
} }
# Send the heartbeat
$peers->{$id}->{socket}->send(Protocol::SocketIO::Message->new( type => 'heartbeat' ));
} }
}); });
@ -1919,7 +1934,11 @@ any '/api' => sub {
$self->app->log->info("Email invitation to join room " . $req->{param}->{room} . " sent to " . $addr); $self->app->log->info("Email invitation to join room " . $req->{param}->{room} . " sent to " . $addr);
} }
# Mark the inviter as waiting for a reply # Mark the inviter as waiting for a reply
$peers->{$self->session('peer_id')}->{check_invitations} = 1; if ($self->session('peer_id')){
my $peer = $self->get_peer($self->session('peer_id'));
$peer->{check_invitations} = 1;
$self->add_peer($self->session('peer_id'), $peer);
}
return $self->render( return $self->render(
json => { json => {
msg => sprintf($self->l('INVITE_SENT_TO_s'), join("\n", @$rcpts)), msg => sprintf($self->l('INVITE_SENT_TO_s'), join("\n", @$rcpts)),

Loading…
Cancel
Save