@@ -436,19 +436,19 @@ srs_error_t SrsRtcUdpNetwork::write(void* buf, size_t size, ssize_t* nwrite)
436436 return sendonly_skt_->sendto (buf, size, SRS_UTIME_NO_TIMEOUT);
437437}
438438
439- SrsRtcTcpNetwork::SrsRtcTcpNetwork (SrsRtcConnection* conn, SrsEphemeralDelta* delta)
439+ SrsRtcTcpNetwork::SrsRtcTcpNetwork (SrsRtcConnection* conn, SrsEphemeralDelta* delta) : owner_( new SrsRtcTcpConn())
440440{
441441 conn_ = conn;
442442 delta_ = delta;
443443 sendonly_skt_ = NULL ;
444444 transport_ = new SrsSecurityTransport (this );
445445 peer_port_ = 0 ;
446446 state_ = SrsRtcNetworkStateInit;
447- owner_ = NULL ;
448447}
449448
450449SrsRtcTcpNetwork::~SrsRtcTcpNetwork ()
451450{
451+ owner_->interrupt ();
452452 srs_freep (transport_);
453453}
454454
@@ -694,54 +694,72 @@ void SrsRtcTcpNetwork::dispose()
694694
695695#define SRS_RTC_TCP_PACKET_MAX 1500
696696
697- SrsRtcTcpConn::SrsRtcTcpConn (ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm)
697+ SrsRtcTcpConn::SrsRtcTcpConn ()
698+ {
699+ wrapper_ = NULL ;
700+ owner_coroutine_ = NULL ;
701+ owner_cid_ = NULL ;
702+ cid_ = _srs_context->get_id ();
703+
704+ pkt_ = NULL ;
705+ delta_ = NULL ;
706+ skt_ = NULL ;
707+ }
708+
709+ SrsRtcTcpConn::SrsRtcTcpConn (ISrsProtocolReadWriter* skt, std::string cip, int port) : SrsRtcTcpConn()
698710{
699- manager_ = cm;
700711 ip_ = cip;
701712 port_ = port;
702713 skt_ = skt;
703714 delta_ = new SrsNetworkDelta ();
704715 delta_->set_io (skt_, skt_);
705- trd_ = new SrsSTCoroutine (" tcp" , this , _srs_context->get_id ());
706716 session_ = NULL ;
707717 pkt_ = new char [SRS_RTC_TCP_PACKET_MAX];
708- _srs_rtc_manager->subscribe (this );
709718}
710719
711720SrsRtcTcpConn::~SrsRtcTcpConn ()
712721{
713- _srs_rtc_manager->unsubscribe (this );
714- trd_->interrupt ();
715- srs_freep (trd_);
716-
717722 srs_freepa (pkt_);
718723 srs_freep (delta_);
719724 srs_freep (skt_);
720725}
721726
727+ void SrsRtcTcpConn::setup_owner (SrsSharedResource<SrsRtcTcpConn>* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid)
728+ {
729+ wrapper_ = wrapper;
730+ owner_coroutine_ = owner_coroutine;
731+ owner_cid_ = owner_cid;
732+ }
733+
722734ISrsKbpsDelta* SrsRtcTcpConn::delta ()
723735{
724736 return delta_;
725737}
726738
739+ void SrsRtcTcpConn::interrupt ()
740+ {
741+ session_ = NULL ;
742+ if (owner_coroutine_) owner_coroutine_->interrupt ();
743+ }
744+
727745std::string SrsRtcTcpConn::desc ()
728746{
729747 return " Tcp" ;
730748}
731749
732750const SrsContextId& SrsRtcTcpConn::get_id ()
733751{
734- return trd_-> cid () ;
752+ return cid_ ;
735753}
736754
737755std::string SrsRtcTcpConn::remote_ip ()
738756{
739757 return ip_;
740758}
741759
742- srs_error_t SrsRtcTcpConn::start ( )
760+ void SrsRtcTcpConn::on_executor_done (ISrsInterruptable* executor )
743761{
744- return trd_-> start () ;
762+ owner_coroutine_ = NULL ;
745763}
746764
747765srs_error_t SrsRtcTcpConn::cycle ()
@@ -752,15 +770,10 @@ srs_error_t SrsRtcTcpConn::cycle()
752770 SrsStatistic::instance ()->on_disconnect (get_id ().c_str (), err);
753771 SrsStatistic::instance ()->kbps_add_delta (get_id ().c_str (), delta_);
754772
755- // TODO: FIXME: Should manage RTC TCP connection by _srs_rtc_manager.
756- // Because we use manager to manage this object, not the http connection object, so we must remove it here.
757- manager_->remove (this );
758-
759- // TODO: FIXME: When TCP connection(transport) closed, should notify session to dispose, should not free them simultaneously.
760773 // Only remove session when network is established, because client might use other UDP network.
761774 if (session_ && session_->tcp ()->is_establelished ()) {
762775 session_->tcp ()->set_state (SrsRtcNetworkStateClosed);
763- _srs_rtc_manager-> remove (session_ );
776+ session_-> expire ( );
764777 }
765778
766779 // For HTTP-API timeout, we think it's done successfully,
@@ -801,13 +814,18 @@ srs_error_t SrsRtcTcpConn::do_cycle()
801814{
802815 srs_error_t err = srs_success;
803816
817+ // Update all context id to cid of session.
818+ _srs_context->set_id (cid_);
819+ owner_cid_->set_cid (cid_);
820+
804821 if ((err = handshake ()) != srs_success) {
805822 return srs_error_wrap (err, " process rtc tcp pkt" );
806823 }
807824
808825 // TODO: FIXME: Handle all bytes of TCP Connection.
809826 while (true ) {
810- if ((err = trd_->pull ()) != srs_success) {
827+ if (!owner_coroutine_) return err;
828+ if ((err = owner_coroutine_->pull ()) != srs_success) {
811829 return srs_error_wrap (err, " rtc tcp conn" );
812830 }
813831
@@ -859,11 +877,11 @@ srs_error_t SrsRtcTcpConn::handshake()
859877
860878 // Should support only one TCP candidate.
861879 SrsRtcTcpNetwork* network = dynamic_cast <SrsRtcTcpNetwork*>(session->tcp ());
862- if (! network->owner ()) {
863- network->set_owner (this );
880+ if (network->owner (). get () != this ) {
881+ network->set_owner (*wrapper_ );
864882 session_ = session;
865883 }
866- if (network->owner () != this ) {
884+ if (network->owner (). get () != this ) {
867885 return srs_error_new (ERROR_RTC_TCP_UNIQUE, " only support one network" );
868886 }
869887
@@ -939,21 +957,3 @@ srs_error_t SrsRtcTcpConn::on_tcp_pkt(char* pkt, int nb_pkt)
939957 return srs_error_new (ERROR_RTC_UDP, " unknown packet" );
940958}
941959
942- void SrsRtcTcpConn::on_before_dispose (ISrsResource* c)
943- {
944- if (!session_) return ;
945-
946- SrsRtcConnection* conn = dynamic_cast <SrsRtcConnection*>(c);
947- if (conn == session_) {
948- session_ = NULL ;
949- // the related rtc connection will be disposed
950- srs_trace (" RTC: tcp conn diposing, because of rtc connection" );
951- trd_->interrupt ();
952- }
953- }
954-
955- void SrsRtcTcpConn::on_disposing (ISrsResource* c)
956- {
957- return ;
958- }
959-
0 commit comments