Some comment in the signaling part

master
Daniel Berteaud 10 years ago
parent 45806953a5
commit fd4d464a51
  1. 41
      vroom.pl

@ -1060,19 +1060,24 @@ get '/socket.io/:ver' => sub {
websocket '/socket.io/:ver/websocket/:id' => sub { websocket '/socket.io/:ver/websocket/:id' => sub {
my $self = shift; my $self = shift;
my $id = $self->stash('id'); my $id = $self->stash('id');
# the ID must match the one stored in our session
if ($id ne $self->session('peer_id') || !$self->session('name')){ if ($id ne $self->session('peer_id') || !$self->session('name')){
$self->app->log->debug('Sometyhing is wrong, peer ID is ' . $id . ' but should be ' . $self->session('peer_id')); $self->app->log->debug('Sometyhing is wrong, peer ID is ' . $id . ' but should be ' . $self->session('peer_id'));
return $self->send('Bad session'); return $self->send('Bad session');
} }
# We create the peer in the global hash
$peers->{$id}->{socket} = $self->tx; $peers->{$id}->{socket} = $self->tx;
# And set the initial "last seen" flag
$peers->{$id}->{last} = time; $peers->{$id}->{last} = time;
# When we recive a message, lets parse it as e Socket.IO one
$self->on('message' => sub { $self->on('message' => sub {
my $self = shift; my $self = shift;
my $msg = Protocol::SocketIO::Message->new->parse(shift); my $msg = Protocol::SocketIO::Message->new->parse(shift);
if ($msg->type eq 'event'){ if ($msg->type eq 'event'){
# Here's a client joining a room
if ($msg->{data}->{name} eq 'join'){ if ($msg->{data}->{name} eq 'join'){
my $room = $msg->{data}->{args}[0]; my $room = $msg->{data}->{args}[0];
# Is this peer allowed to join the room ? # Is this peer allowed to join the room ?
@ -1085,6 +1090,8 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
$self->finish; $self->finish;
return; return;
} }
# We build a list of peers, except this new one so we can send it
# to the new peer, and he'll be able to contact all those peers
my $others = {}; my $others = {};
foreach my $peer (keys %$peers){ foreach my $peer (keys %$peers){
next if ($peer eq $id); next if ($peer eq $id);
@ -1097,7 +1104,8 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
}; };
$peers->{$id}->{room} = $room; $peers->{$id}->{room} = $room;
$self->app->log->debug("Client id " . $id . " joined room " . $room); $self->app->log->debug("Client id " . $id . " joined room " . $room);
# $self->app->log->debug(Dumper($others)); # Lets send the list of peers in our ack message
# Not sure why the null arg is needed, got it by looking at how it works with SignalMaster
$self->send( $self->send(
Protocol::SocketIO::Message->new( Protocol::SocketIO::Message->new(
type => 'ack', type => 'ack',
@ -1111,23 +1119,27 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
) )
); );
} }
# We have a message from a peer
elsif ($msg->{data}->{name} eq 'message'){ elsif ($msg->{data}->{name} eq 'message'){
# We check the room this peer is in
my $room = $peers->{$id}->{room}; my $room = $peers->{$id}->{room};
$self->app->log->debug("Message received from " . $id); $self->app->log->debug("Signaling message received from " . $id);
# Forward this message to all other members of the same room
foreach my $peer (keys %$peers){ foreach my $peer (keys %$peers){
next if ($peer eq $id); next if ($peer eq $id);
next if $peers->{$peer}->{room} ne $room; next if $peers->{$peer}->{room} ne $room;
#$self->app->log->debug("Relaying message from " . $id . " to " . $peer); # Just set who's sending it
$msg->{data}->{args}[0]->{from} = $id; $msg->{data}->{args}[0]->{from} = $id;
#$self->app->log->debug(Dumper($msg));
$peers->{$peer}->{socket}->send( $peers->{$peer}->{socket}->send(
Protocol::SocketIO::Message->new(%$msg) Protocol::SocketIO::Message->new(%$msg)
); );
} }
} }
# When a peer share its screen
elsif ($msg->{data}->{name} eq 'shareScreen'){ elsif ($msg->{data}->{name} eq 'shareScreen'){
$peers->{$id}->{details}->{screen} = \1; $peers->{$id}->{details}->{screen} = \1;
} }
# Or unshare it
elsif ($msg->{data}->{name} eq 'unshareScreen'){ elsif ($msg->{data}->{name} eq 'unshareScreen'){
$peers->{$id}->{details}->{screen} = \0; $peers->{$id}->{details}->{screen} = \0;
foreach my $peer (keys %$peers){ foreach my $peer (keys %$peers){
@ -1151,19 +1163,20 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
} }
} }
} }
elsif ($msg->type eq 'heartbeat'){
# Heartbeat reply, update timestamp # Heartbeat reply, update timestamp
elsif ($msg->type eq 'heartbeat'){
$peers->{$id}->{last} = time; $peers->{$id}->{last} = time;
} }
}); });
# Triggerred when a websocket connection ends
$self->on(finish => sub { $self->on(finish => sub {
my ($self, $code, $reason) = @_; my ($self, $code, $reason) = @_;
$self->app->log->debug("Client id " . $id . " closed websocket connection"); $self->app->log->debug("Client id " . $id . " closed websocket connection");
foreach my $peer (keys %$peers){ foreach my $peer (keys %$peers){
next if ($peer eq $id); next if ($peer eq $id);
next if $peers->{$peer}->{room} ne $peers->{$id}->{room}; next if $peers->{$peer}->{room} ne $peers->{$id}->{room};
$self->app->log->debug("Relaying message from " . $id . " to " . $peer); $self->app->log->debug("Notifying $peer that $id leaved");
$peers->{$peer}->{socket}->send( $peers->{$peer}->{socket}->send(
Protocol::SocketIO::Message->new( Protocol::SocketIO::Message->new(
type => 'event', type => 'event',
@ -1179,22 +1192,26 @@ websocket '/socket.io/:ver/websocket/:id' => sub {
) )
); );
} }
# Remove this peer from our global hash
delete $peers->{$id}; delete $peers->{$id};
}); });
# This is just the end of the initial handshake, we indicate the client we're ready
$self->send(Protocol::SocketIO::Message->new( type => 'connect' )); $self->send(Protocol::SocketIO::Message->new( type => 'connect' ));
}; };
# Send heartbeats to all websocket clients # Send heartbeats to all websocket clients
Mojo::IOLoop->recurring( 2 => sub { # Every 3 seconds
Mojo::IOLoop->recurring( 3 => sub {
foreach my $peer ( keys %$peers ) { foreach my $peer ( keys %$peers ) {
if ($peers->{$peer}->{last} < time - 10){ # If we had no reply from this peer in the last 15 sec
app->log->debug("Peer $peer didn't reply in 10 sec, disconnecting"); # (5 heartbeat without response), we consider it dead and remove it
if ($peers->{$peer}->{last} < time - 15){
app->log->debug("Peer $peer didn't reply in 15 sec, disconnecting");
$peers->{$peer}->{socket}->finish; $peers->{$peer}->{socket}->finish;
} }
else { else {
$peers->{$peer}->{socket}->send( $peers->{$peer}->{socket}->send(Protocol::SocketIO::Message->new( type => 'heartbeat' ));
Protocol::SocketIO::Message->new( type => 'heartbeat' )
);
} }
} }
}); });

Loading…
Cancel
Save