@@ -2,7 +2,7 @@ use std::collections::VecDeque;
22use std:: sync:: Weak ;
33
44use super :: * ;
5- use crate :: rtp_transceiver:: create_stream_info;
5+ use crate :: rtp_transceiver:: { create_stream_info, PayloadType } ;
66use crate :: stats:: stats_collector:: StatsCollector ;
77use crate :: stats:: {
88 InboundRTPStats , OutboundRTPStats , RTCStatsType , RemoteInboundRTPStats , RemoteOutboundRTPStats ,
@@ -15,7 +15,6 @@ use arc_swap::ArcSwapOption;
1515use portable_atomic:: AtomicIsize ;
1616use smol_str:: SmolStr ;
1717use tokio:: time:: Instant ;
18- use util:: Unmarshal ;
1918
2019pub ( crate ) struct PeerConnectionInternal {
2120 /// a value containing the last known greater mid value
@@ -309,8 +308,12 @@ impl PeerConnectionInternal {
309308 }
310309 } ;
311310
312- let stream = match srtp_session. accept ( ) . await {
313- Ok ( stream) => stream,
311+ let ( stream, header) = match srtp_session. accept ( ) . await {
312+ Ok ( ( stream, Some ( header) ) ) => ( stream, header) ,
313+ Ok ( ( _, None ) ) => {
314+ log:: error!( "Accepting RTP session, without RTP header?" ) ;
315+ return ;
316+ }
314317 Err ( err) => {
315318 log:: warn!( "Failed to accept RTP {}" , err) ;
316319 return ;
@@ -338,16 +341,16 @@ impl PeerConnectionInternal {
338341 let pci = Arc :: clone ( & pci) ;
339342 tokio:: spawn ( async move {
340343 let ssrc = stream. get_ssrc ( ) ;
341-
342344 dtls_transport
343345 . store_simulcast_stream ( ssrc, Arc :: clone ( & stream) )
344346 . await ;
345347
346- if let Err ( err) = pci. handle_incoming_ssrc ( stream, ssrc) . await {
348+ if let Err ( err) = pci
349+ . handle_incoming_rtp_stream ( stream, header. payload_type )
350+ . await
351+ {
347352 log:: warn!(
348- "Incoming unhandled RTP ssrc({}), on_track will not be fired. {}" ,
349- ssrc,
350- err
353+ "Incoming unhandled RTP ssrc({ssrc}), on_track will not be fired. {err}"
351354 ) ;
352355 }
353356
@@ -370,17 +373,18 @@ impl PeerConnectionInternal {
370373 }
371374 } ;
372375
373- let stream = match srtcp_session. accept ( ) . await {
374- Ok ( stream) => stream,
376+ match srtcp_session. accept ( ) . await {
377+ Ok ( ( stream, _) ) => {
378+ let ssrc = stream. get_ssrc ( ) ;
379+ log:: warn!(
380+ "Incoming unhandled RTCP ssrc({ssrc}), on_track will not be fired"
381+ ) ;
382+ }
375383 Err ( err) => {
376- log:: warn!( "Failed to accept RTCP {}" , err ) ;
384+ log:: warn!( "Failed to accept RTCP {err}" ) ;
377385 return ;
378386 }
379387 } ;
380- log:: warn!(
381- "Incoming unhandled RTCP ssrc({}), on_track will not be fired" ,
382- stream. get_ssrc( )
383- ) ;
384388 }
385389 } ) ;
386390 }
@@ -1002,18 +1006,18 @@ impl PeerConnectionInternal {
10021006 Ok ( true )
10031007 }
10041008
1005- async fn handle_incoming_ssrc (
1009+ async fn handle_incoming_rtp_stream (
10061010 self : & Arc < Self > ,
10071011 rtp_stream : Arc < Stream > ,
1008- ssrc : SSRC ,
1012+ payload_type : PayloadType ,
10091013 ) -> Result < ( ) > {
1014+ let ssrc = rtp_stream. get_ssrc ( ) ;
10101015 let parsed = match self . remote_description ( ) . await . and_then ( |rd| rd. parsed ) {
10111016 Some ( r) => r,
10121017 None => return Err ( Error :: ErrPeerConnRemoteDescriptionNil ) ,
10131018 } ;
10141019 // If the remote SDP was only one media section the ssrc doesn't have to be explicitly declared
1015- let handled = self . handle_undeclared_ssrc ( ssrc, & parsed) . await ?;
1016- if handled {
1020+ if self . handle_undeclared_ssrc ( ssrc, & parsed) . await ? {
10171021 return Ok ( ( ) ) ;
10181022 }
10191023
@@ -1046,26 +1050,6 @@ impl PeerConnectionInternal {
10461050 } )
10471051 . await ;
10481052
1049- // Packets that we read as part of simulcast probing that we need to make available
1050- // if we do find a track later.
1051- let mut buffered_packets: VecDeque < ( rtp:: packet:: Packet , Attributes ) > = VecDeque :: default ( ) ;
1052-
1053- let mut buf = vec ! [ 0u8 ; self . setting_engine. get_receive_mtu( ) ] ;
1054- let n = rtp_stream. read ( & mut buf) . await ?;
1055- let mut b = & buf[ ..n] ;
1056-
1057- let ( mut mid, mut rid, mut rsid, payload_type) = handle_unknown_rtp_packet (
1058- b,
1059- mid_extension_id as u8 ,
1060- sid_extension_id as u8 ,
1061- rsid_extension_id as u8 ,
1062- ) ?;
1063-
1064- let packet = rtp:: packet:: Packet :: unmarshal ( & mut b) . unwrap ( ) ;
1065-
1066- // TODO: Can we have attributes on the first packets?
1067- buffered_packets. push_back ( ( packet, Attributes :: new ( ) ) ) ;
1068-
10691053 let params = self
10701054 . media_engine
10711055 . get_rtp_parameters_by_payload_type ( payload_type)
@@ -1089,21 +1073,24 @@ impl PeerConnectionInternal {
10891073 . streams_for_ssrc ( ssrc, & stream_info, & icpr)
10901074 . await ?;
10911075
1092- let a = Attributes :: new ( ) ;
1076+ // Packets that we read as part of simulcast probing that we need to make available
1077+ // if we do find a track later.
1078+ let mut buffered_packets: VecDeque < ( rtp:: packet:: Packet , Attributes ) > = VecDeque :: default ( ) ;
1079+ let mut buf = vec ! [ 0u8 ; self . setting_engine. get_receive_mtu( ) ] ;
1080+
10931081 for _ in 0 ..=SIMULCAST_PROBE_COUNT {
1082+ let ( pkt, a) = rtp_interceptor
1083+ . read ( & mut buf, & stream_info. attributes )
1084+ . await ?;
1085+ let ( mid, rid, rsid) = get_stream_mid_rid (
1086+ & pkt. header ,
1087+ mid_extension_id as u8 ,
1088+ sid_extension_id as u8 ,
1089+ rsid_extension_id as u8 ,
1090+ ) ?;
1091+ buffered_packets. push_back ( ( pkt, a. clone ( ) ) ) ;
1092+
10941093 if mid. is_empty ( ) || ( rid. is_empty ( ) && rsid. is_empty ( ) ) {
1095- let ( pkt, _) = rtp_interceptor. read ( & mut buf, & a) . await ?;
1096- let ( m, r, rs, _) = handle_unknown_rtp_packet (
1097- & buf[ ..n] ,
1098- mid_extension_id as u8 ,
1099- sid_extension_id as u8 ,
1100- rsid_extension_id as u8 ,
1101- ) ?;
1102- mid = m;
1103- rid = r;
1104- rsid = rs;
1105-
1106- buffered_packets. push_back ( ( pkt, a. clone ( ) ) ) ;
11071094 continue ;
11081095 }
11091096
@@ -1544,3 +1531,34 @@ fn capitalize(s: &str) -> String {
15441531
15451532 result
15461533}
1534+
1535+ fn get_stream_mid_rid (
1536+ header : & rtp:: header:: Header ,
1537+ mid_extension_id : u8 ,
1538+ sid_extension_id : u8 ,
1539+ rsid_extension_id : u8 ,
1540+ ) -> Result < ( String , String , String ) > {
1541+ if !header. extension {
1542+ return Ok ( ( String :: new ( ) , String :: new ( ) , String :: new ( ) ) ) ;
1543+ }
1544+
1545+ let mid = if let Some ( payload) = header. get_extension ( mid_extension_id) {
1546+ String :: from_utf8 ( payload. to_vec ( ) ) ?
1547+ } else {
1548+ String :: new ( )
1549+ } ;
1550+
1551+ let rid = if let Some ( payload) = header. get_extension ( sid_extension_id) {
1552+ String :: from_utf8 ( payload. to_vec ( ) ) ?
1553+ } else {
1554+ String :: new ( )
1555+ } ;
1556+
1557+ let srid = if let Some ( payload) = header. get_extension ( rsid_extension_id) {
1558+ String :: from_utf8 ( payload. to_vec ( ) ) ?
1559+ } else {
1560+ String :: new ( )
1561+ } ;
1562+
1563+ Ok ( ( mid, rid, srid) )
1564+ }
0 commit comments