diff --git a/vroom.pl b/vroom.pl index 9efc278..6b0bf52 100755 --- a/vroom.pl +++ b/vroom.pl @@ -79,9 +79,6 @@ if ($excel){ # Global error check our $error = undef; -# Global peers hash -our $peers = {}; - # Initialize localization plugin I18N => { namespace => 'Vroom::I18N', @@ -233,7 +230,11 @@ helper get_peers => sub { helper get_peer => sub { my $self = shift; my $peer = shift; - return Mojo::JSON::from_json($self->redis->hget('peers', $peer); + my $p = $self->redis->hget('peers', $peer); + if ($p){ + return Mojo::JSON::from_json($p); + } + return 0; }; # Store peers in redis @@ -517,24 +518,27 @@ helper set_peer_role => sub { my $self = shift; my $data = shift; # Check the peer exists and is already in the room - if (!$data->{peer_id} || - !$peers->{$data->{peer_id}}){ + if (!$data->{peer_id}){ return 0; } - $peers->{$data->{peer_id}}->{role} = $data->{role}; + my $peer = $self->get_peer($data->{peer_id}); + if (!$peer){ + return 0; + } + $peer->{role} = $data->{role}; $self->log_event({ event => 'peer_role', msg => "Peer " . $data->{peer_id} . " has now the " . - $data->{role} . " role in room " . $peers->{$data->{peer_id}}->{room} + $data->{role} . " role in room " . $peer->{room} }); - return 1; + return $self->add_peer($data->{peer_id}, $peer); }; # Return the role of a peer, take a peer object as arg ($data = { peer_id => XYZ }) helper get_peer_role => sub { my $self = shift; my $peer_id = shift; - return $peers->{$peer_id}->{role}; + return $self->get_peer($peer_id)->{role}; }; # Promote a peer to owner @@ -1109,7 +1113,8 @@ helper get_room_members => sub { my $room = shift; return 0 if (!$self->get_room_by_name($room)); my @p; - foreach my $peer (keys $peers){ + my $peers = $self->get_peers; + foreach my $peer (keys %$peers){ if ($peers->{$peer}->{room} && $peers->{$peer}->{room} eq $room){ push @p, $peer; @@ -1125,6 +1130,7 @@ helper signal_broadcast_room => sub { # Send a message to all members of the same room as the sender # except the sender himself + my $peers = $self->get_peers; foreach my $peer (keys %$peers){ next if $peer eq $data->{from}; next if !$peers->{$data->{from}}->{room}; @@ -1227,8 +1233,9 @@ helper export_events_xlsx => sub { # Disconnect a peer from the signaling channel helper disconnect_peer => sub { - my $self = shift; - my $id = shift; + my $self = shift; + my $id = shift; + my $peers =$self->get_peers; return 0 if (!$id || !$peers->{$id}); if ($id && $peers->{$id} && $peers->{$id}->{room}){ $self->log_event({ @@ -1247,7 +1254,7 @@ helper disconnect_peer => sub { ) }); $self->update_room_last_activity($peers->{$id}->{room}); - delete $peers->{$id}; + $self->del_peer($id); }; # Socket.IO handshake @@ -1278,29 +1285,34 @@ websocket '/socket.io/:ver/websocket/:id' => sub { } my $key = $self->session('key'); + my $new_peer = {}; # We create the peer in the global hash - $peers->{$id}->{socket} = $self->tx; + $new_peer->{socket} = $self->tx; # And set the initial "last seen" flag - $peers->{$id}->{last} = time; + $new_peer->{last} = time; # Associate the unique ID and name - $peers->{$id}->{id} = $self->session('id'); - $peers->{$id}->{check_invitations} = 1; + $new_peer->{id} = $self->session('id'); + $new_peer->{check_invitations} = 1; # Register the i18n stash, for localization will be available in the main IOLoop # Outside of Mojo controller - $peers->{$id}->{i18n} = $self->stash->{i18n}; + $new_peer->{i18n} = $self->stash->{i18n}; + + $self->add_peer($id, $new_peer); # When we recive a message, lets parse it as e Socket.IO one $self->on('message' => sub { - my $self = shift; - my $msg = Protocol::SocketIO::Message->new->parse(shift); + my $self = shift; + my $peer = $self->get_peer($id); + my $peers = $self->get_peers; + my $msg = Protocol::SocketIO::Message->new->parse(shift); if ($msg->type eq 'event'){ # Here's a client joining a room if ($msg->{data}->{name} eq 'join'){ my $room = $msg->{data}->{args}[0]; my $role = $self->get_key_role($key, $room); - $peers->{$id}->{role} = $role; + $peer->{role} = $role; # Is this peer allowed to join the room ? if (!$self->get_room_by_name($room) || !$role || @@ -1333,12 +1345,12 @@ websocket '/socket.io/:ver/websocket/:id' => sub { next if $peers->{$peer}->{room} ne $room; $others->{$peer} = $peers->{$peer}->{details}; } - $peers->{$id}->{details} = { + $peer->{details} = { screen => \0, video => \1, audio => \0 }; - $peers->{$id}->{room} = $room; + $peer->{room} = $room; # Lets send the list of peers in our ack message # Not sure why the null arg is needed, got it by looking at how it works with SignalMaster $self->send( @@ -1371,7 +1383,7 @@ websocket '/socket.io/:ver/websocket/:id' => sub { $peers->{$to}->{room} && $peers->{$to}->{room} eq $peers->{$id}->{room} && $peers->{$to}->{socket}){ - $peers->{$to}->{socket}->send(Protocol::SocketIO::Message->new(%$msg)); + $self->send(Protocol::SocketIO::Message->new(%$msg)); } # No dest, multicast this to every members of the room else{ @@ -1383,11 +1395,11 @@ websocket '/socket.io/:ver/websocket/:id' => sub { } # When a peer shares its screen elsif ($msg->{data}->{name} eq 'shareScreen'){ - $peers->{$id}->{details}->{screen} = \1; + $peer->{details}->{screen} = \1; } # Or unshares it elsif ($msg->{data}->{name} eq 'unshareScreen'){ - $peers->{$id}->{details}->{screen} = \0; + $peer->{details}->{screen} = \0; $self->signal_broadcast_room({ from => $id, msg => Protocol::SocketIO::Message->new( @@ -1400,7 +1412,7 @@ websocket '/socket.io/:ver/websocket/:id' => sub { }); } elsif ($msg->{data}->{name} =~ m/^leave|disconnect$/){ - $peers->{$id}->{socket}->{finish}; + $self->finish; } else{ $self->app->log->debug("Unhandled SocketIO message\n" . Dumper $msg); @@ -1408,19 +1420,20 @@ websocket '/socket.io/:ver/websocket/:id' => sub { } # Heartbeat reply, update timestamp elsif ($msg->type eq 'heartbeat'){ - $peers->{$id}->{last} = time; + $peer->{last} = time; # Update room last activity ~ every 40 heartbeats, so about every 2 minutes if ((int (rand 200)) <= 5){ - $self->update_room_last_activity($peers->{$id}->{room}); + $self->update_room_last_activity($peer->{room}); } } + $self->add_peer($id, $peer); }); # Triggerred when a websocket connection ends $self->on(finish => sub { my $self = shift; $self->disconnect_peer($id); - delete $peers->{$id}; + $self->del_peer($id); }); # This is just the end of the initial handshake, we indicate the client we're ready @@ -1430,12 +1443,13 @@ websocket '/socket.io/:ver/websocket/:id' => sub { # Send heartbeats to all websocket clients # Every 3 seconds Mojo::IOLoop->recurring( 3 => sub { + my $peers = app->get_peers; foreach my $id (keys %{$peers}){ # This shouldn't happen, but better to log an error and fix it rather # than looping indefinitly on a bogus entry if something went wrong if (!$peers->{$id}->{socket}){ app->log->debug("Garbage found in peers (peer $id has no socket)\n"); - delete $peers->{$id}; + app->del_peer($id); } # If we had no reply from this peer in the last 15 sec # (5 heartbeat without response), we consider it dead and remove it @@ -1470,11 +1484,12 @@ Mojo::IOLoop->recurring( 3 => sub { } ) ); - delete $peers->{$id}->{check_invitations}; } - # Send the heartbeat - $peers->{$id}->{socket}->send(Protocol::SocketIO::Message->new( type => 'heartbeat' )) + delete $peers->{$id}->{check_invitations}; + app->add_peer($id, $peers->{$id}); } + # Send the heartbeat + $peers->{$id}->{socket}->send(Protocol::SocketIO::Message->new( type => 'heartbeat' )); } }); @@ -1919,7 +1934,11 @@ any '/api' => sub { $self->app->log->info("Email invitation to join room " . $req->{param}->{room} . " sent to " . $addr); } # Mark the inviter as waiting for a reply - $peers->{$self->session('peer_id')}->{check_invitations} = 1; + if ($self->session('peer_id')){ + my $peer = $self->get_peer($self->session('peer_id')); + $peer->{check_invitations} = 1; + $self->add_peer($self->session('peer_id'), $peer); + } return $self->render( json => { msg => sprintf($self->l('INVITE_SENT_TO_s'), join("\n", @$rcpts)),