Skip to content

Commit 242152b

Browse files
winlinvipsuzp1984
andauthored
SmartPtr: Use shared ptr in RTC TCP connection. v6.0.127 (ossrs#4083)
Fix issue ossrs#3784 --- Co-authored-by: Jacob Su <suzp1984@gmail.com>
1 parent 7b9c52b commit 242152b

10 files changed

Lines changed: 96 additions & 77 deletions

trunk/doc/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ The changelog for SRS.
77
<a name="v6-changes"></a>
88

99
## SRS 6.0 Changelog
10+
* v6.0, 2024-06-13, Merge [#4083](https://github.com/ossrs/srs/pull/4083): SmartPtr: Use shared ptr in RTC TCP connection. v6.0.127 (#4083)
1011
* v6.0, 2024-06-12, Merge [#4080](https://github.com/ossrs/srs/pull/4080): SmartPtr: Use shared ptr to manage GB objects. v6.0.126 (#4080)
1112
* v6.0, 2024-06-03, Merge [#4057](https://github.com/ossrs/srs/pull/4057): RTC: Support dropping h.264 SEI from NALUs. v6.0.125 (#4057)
1213
* v6.0, 2024-04-26, Merge [#4044](https://github.com/ossrs/srs/pull/4044): fix: correct SRS_ERRNO_MAP_HTTP duplicate error code. v6.0.124 (#4044)

trunk/src/app/srs_app_rtc_conn.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1924,6 +1924,7 @@ std::string SrsRtcConnection::desc()
19241924

19251925
void SrsRtcConnection::expire()
19261926
{
1927+
// TODO: FIXME: Should set session to expired and remove it by heartbeat checking. Should not remove it directly.
19271928
_srs_rtc_manager->remove(this);
19281929
}
19291930

trunk/src/app/srs_app_rtc_network.cpp

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -436,19 +436,19 @@ srs_error_t SrsRtcUdpNetwork::write(void* buf, size_t size, ssize_t* nwrite)
436436
return sendonly_skt_->sendto(buf, size, SRS_UTIME_NO_TIMEOUT);
437437
}
438438

439-
SrsRtcTcpNetwork::SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta)
439+
SrsRtcTcpNetwork::SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta) : owner_(new SrsRtcTcpConn())
440440
{
441441
conn_ = conn;
442442
delta_ = delta;
443443
sendonly_skt_ = NULL;
444444
transport_ = new SrsSecurityTransport(this);
445445
peer_port_ = 0;
446446
state_ = SrsRtcNetworkStateInit;
447-
owner_ = NULL;
448447
}
449448

450449
SrsRtcTcpNetwork::~SrsRtcTcpNetwork()
451450
{
451+
owner_->interrupt();
452452
srs_freep(transport_);
453453
}
454454

@@ -694,54 +694,72 @@ void SrsRtcTcpNetwork::dispose()
694694

695695
#define SRS_RTC_TCP_PACKET_MAX 1500
696696

697-
SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm)
697+
SrsRtcTcpConn::SrsRtcTcpConn()
698+
{
699+
wrapper_ = NULL;
700+
owner_coroutine_ = NULL;
701+
owner_cid_ = NULL;
702+
cid_ = _srs_context->get_id();
703+
704+
pkt_ = NULL;
705+
delta_ = NULL;
706+
skt_ = NULL;
707+
}
708+
709+
SrsRtcTcpConn::SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port) : SrsRtcTcpConn()
698710
{
699-
manager_ = cm;
700711
ip_ = cip;
701712
port_ = port;
702713
skt_ = skt;
703714
delta_ = new SrsNetworkDelta();
704715
delta_->set_io(skt_, skt_);
705-
trd_ = new SrsSTCoroutine("tcp", this, _srs_context->get_id());
706716
session_ = NULL;
707717
pkt_ = new char[SRS_RTC_TCP_PACKET_MAX];
708-
_srs_rtc_manager->subscribe(this);
709718
}
710719

711720
SrsRtcTcpConn::~SrsRtcTcpConn()
712721
{
713-
_srs_rtc_manager->unsubscribe(this);
714-
trd_->interrupt();
715-
srs_freep(trd_);
716-
717722
srs_freepa(pkt_);
718723
srs_freep(delta_);
719724
srs_freep(skt_);
720725
}
721726

727+
void SrsRtcTcpConn::setup_owner(SrsSharedResource<SrsRtcTcpConn>* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid)
728+
{
729+
wrapper_ = wrapper;
730+
owner_coroutine_ = owner_coroutine;
731+
owner_cid_ = owner_cid;
732+
}
733+
722734
ISrsKbpsDelta* SrsRtcTcpConn::delta()
723735
{
724736
return delta_;
725737
}
726738

739+
void SrsRtcTcpConn::interrupt()
740+
{
741+
session_ = NULL;
742+
if (owner_coroutine_) owner_coroutine_->interrupt();
743+
}
744+
727745
std::string SrsRtcTcpConn::desc()
728746
{
729747
return "Tcp";
730748
}
731749

732750
const SrsContextId& SrsRtcTcpConn::get_id()
733751
{
734-
return trd_->cid();
752+
return cid_;
735753
}
736754

737755
std::string SrsRtcTcpConn::remote_ip()
738756
{
739757
return ip_;
740758
}
741759

742-
srs_error_t SrsRtcTcpConn::start()
760+
void SrsRtcTcpConn::on_executor_done(ISrsInterruptable* executor)
743761
{
744-
return trd_->start();
762+
owner_coroutine_ = NULL;
745763
}
746764

747765
srs_error_t SrsRtcTcpConn::cycle()
@@ -752,15 +770,10 @@ srs_error_t SrsRtcTcpConn::cycle()
752770
SrsStatistic::instance()->on_disconnect(get_id().c_str(), err);
753771
SrsStatistic::instance()->kbps_add_delta(get_id().c_str(), delta_);
754772

755-
// TODO: FIXME: Should manage RTC TCP connection by _srs_rtc_manager.
756-
// Because we use manager to manage this object, not the http connection object, so we must remove it here.
757-
manager_->remove(this);
758-
759-
// TODO: FIXME: When TCP connection(transport) closed, should notify session to dispose, should not free them simultaneously.
760773
// Only remove session when network is established, because client might use other UDP network.
761774
if(session_ && session_->tcp()->is_establelished()) {
762775
session_->tcp()->set_state(SrsRtcNetworkStateClosed);
763-
_srs_rtc_manager->remove(session_);
776+
session_->expire();
764777
}
765778

766779
// For HTTP-API timeout, we think it's done successfully,
@@ -801,13 +814,18 @@ srs_error_t SrsRtcTcpConn::do_cycle()
801814
{
802815
srs_error_t err = srs_success;
803816

817+
// Update all context id to cid of session.
818+
_srs_context->set_id(cid_);
819+
owner_cid_->set_cid(cid_);
820+
804821
if((err = handshake()) != srs_success) {
805822
return srs_error_wrap(err, "process rtc tcp pkt");
806823
}
807824

808825
// TODO: FIXME: Handle all bytes of TCP Connection.
809826
while(true) {
810-
if((err = trd_->pull()) != srs_success) {
827+
if (!owner_coroutine_) return err;
828+
if ((err = owner_coroutine_->pull()) != srs_success) {
811829
return srs_error_wrap(err, "rtc tcp conn");
812830
}
813831

@@ -859,11 +877,11 @@ srs_error_t SrsRtcTcpConn::handshake()
859877

860878
// Should support only one TCP candidate.
861879
SrsRtcTcpNetwork* network = dynamic_cast<SrsRtcTcpNetwork*>(session->tcp());
862-
if (!network->owner()) {
863-
network->set_owner(this);
880+
if (network->owner().get() != this) {
881+
network->set_owner(*wrapper_);
864882
session_ = session;
865883
}
866-
if (network->owner() != this) {
884+
if (network->owner().get() != this) {
867885
return srs_error_new(ERROR_RTC_TCP_UNIQUE, "only support one network");
868886
}
869887

@@ -939,21 +957,3 @@ srs_error_t SrsRtcTcpConn::on_tcp_pkt(char* pkt, int nb_pkt)
939957
return srs_error_new(ERROR_RTC_UDP, "unknown packet");
940958
}
941959

942-
void SrsRtcTcpConn::on_before_dispose(ISrsResource* c)
943-
{
944-
if (!session_) return;
945-
946-
SrsRtcConnection* conn = dynamic_cast<SrsRtcConnection*>(c);
947-
if(conn == session_) {
948-
session_ = NULL;
949-
// the related rtc connection will be disposed
950-
srs_trace("RTC: tcp conn diposing, because of rtc connection");
951-
trd_->interrupt();
952-
}
953-
}
954-
955-
void SrsRtcTcpConn::on_disposing(ISrsResource* c)
956-
{
957-
return;
958-
}
959-

trunk/src/app/srs_app_rtc_network.hpp

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ class SrsRtcTcpNetwork: public ISrsRtcNetwork
180180
private:
181181
// The DTLS transport over this network.
182182
ISrsRtcTransport* transport_;
183-
SrsRtcTcpConn* owner_;
183+
SrsSharedResource<SrsRtcTcpConn> owner_;
184184
private:
185185
std::string peer_ip_;
186186
int peer_port_;
@@ -189,8 +189,8 @@ class SrsRtcTcpNetwork: public ISrsRtcNetwork
189189
SrsRtcTcpNetwork(SrsRtcConnection* conn, SrsEphemeralDelta* delta);
190190
virtual ~SrsRtcTcpNetwork();
191191
public:
192-
void set_owner(SrsRtcTcpConn* v) { owner_ = v; }
193-
SrsRtcTcpConn* owner() { return owner_; }
192+
void set_owner(SrsSharedResource<SrsRtcTcpConn> v) { owner_ = v; }
193+
SrsSharedResource<SrsRtcTcpConn> owner() { return owner_; }
194194
void update_sendonly_socket(ISrsProtocolReadWriter* skt);
195195
//ISrsRtcNetwork
196196
public:
@@ -232,13 +232,9 @@ class SrsRtcTcpNetwork: public ISrsRtcNetwork
232232
};
233233

234234
// For WebRTC over TCP.
235-
class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCoroutineHandler, public ISrsDisposingHandler
235+
class SrsRtcTcpConn : public ISrsConnection, public ISrsCoroutineHandler, public ISrsExecutorHandler
236236
{
237237
private:
238-
// The manager object to manage the connection.
239-
ISrsResourceManager* manager_;
240-
// Use a coroutine to serve the TCP connection.
241-
SrsCoroutine* trd_;
242238
// The ip and port of client.
243239
std::string ip_;
244240
int port_;
@@ -249,21 +245,34 @@ class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCo
249245
ISrsProtocolReadWriter* skt_;
250246
// Packet cache.
251247
char* pkt_;
252-
public:
253-
SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port, ISrsResourceManager* cm);
248+
private:
249+
// The shared resource which own this object, we should never free it because it's managed by shared ptr.
250+
SrsSharedResource<SrsRtcTcpConn>* wrapper_;
251+
// The owner coroutine, allow user to interrupt the loop.
252+
ISrsInterruptable* owner_coroutine_;
253+
ISrsContextIdSetter* owner_cid_;
254+
SrsContextId cid_;
255+
public:
256+
SrsRtcTcpConn();
257+
SrsRtcTcpConn(ISrsProtocolReadWriter* skt, std::string cip, int port);
254258
virtual ~SrsRtcTcpConn();
259+
public:
260+
// Setup the owner, the wrapper is the shared ptr, the interruptable object is the coroutine, and the cid is the context id.
261+
void setup_owner(SrsSharedResource<SrsRtcTcpConn>* wrapper, ISrsInterruptable* owner_coroutine, ISrsContextIdSetter* owner_cid);
255262
public:
256263
ISrsKbpsDelta* delta();
264+
// Interrupt transport by session.
265+
void interrupt();
257266
// Interface ISrsResource.
258267
public:
259268
virtual std::string desc();
260269
virtual const SrsContextId& get_id();
261270
// Interface ISrsConnection.
262271
public:
263272
virtual std::string remote_ip();
264-
// Interface ISrsStartable
273+
// Interface ISrsExecutorHandler
265274
public:
266-
virtual srs_error_t start();
275+
virtual void on_executor_done(ISrsInterruptable* executor);
267276
// Interface ISrsCoroutineHandler
268277
public:
269278
virtual srs_error_t cycle();
@@ -273,10 +282,6 @@ class SrsRtcTcpConn : public ISrsConnection, public ISrsStartable, public ISrsCo
273282
srs_error_t read_packet(char* pkt, int* nb_pkt);
274283
srs_error_t on_stun(char* pkt, int nb_pkt);
275284
srs_error_t on_tcp_pkt(char* pkt, int nb_pkt);
276-
// Interface of ISrsDisposingHandler
277-
public:
278-
virtual void on_before_dispose(ISrsResource* c);
279-
virtual void on_disposing(ISrsResource* c);
280285
};
281286

282287
#endif

trunk/src/app/srs_app_rtc_source.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ const int kVideoSamplerate = 90000;
5959

6060
using namespace std;
6161

62+
#ifdef SRS_FFMPEG_FIT
6263
// The RTP payload max size, reserved some paddings for SRTP as such:
6364
// kRtpPacketSize = kRtpMaxPayloadSize + paddings
6465
// For example, if kRtpPacketSize is 1500, recommend to set kRtpMaxPayloadSize to 1400,
@@ -68,6 +69,7 @@ using namespace std;
6869
// so we set kRtpMaxPayloadSize = 1200.
6970
// see @doc https://groups.google.com/g/discuss-webrtc/c/gH5ysR3SoZI
7071
const int kRtpMaxPayloadSize = kRtpPacketSize - 300;
72+
#endif
7173

7274
// TODO: Add this function into SrsRtpMux class.
7375
srs_error_t aac_raw_append_adts_header(SrsSharedPtrMessage* shared_audio, SrsFormat* format, char** pbuf, int* pnn_buf)

trunk/src/app/srs_app_server.cpp

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ using namespace std;
3939
#include <srs_app_conn.hpp>
4040
#ifdef SRS_RTC
4141
#include <srs_app_rtc_network.hpp>
42+
#include <srs_app_rtc_server.hpp>
4243
#endif
4344
#ifdef SRS_GB28181
4445
#include <srs_app_gb28181.hpp>
@@ -1193,8 +1194,7 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener* listener, srs_netfd_t& stf
11931194
if (nn == 10 && b[0] == 0 && b[2] == 0 && b[3] == 1 && b[1] - b[5] == 20
11941195
&& b[6] == 0x21 && b[7] == 0x12 && b[8] == 0xa4 && b[9] == 0x42
11951196
) {
1196-
// TODO: FIXME: Should manage this connection by _srs_rtc_manager
1197-
resource = new SrsRtcTcpConn(io, ip, port, this);
1197+
resource = new SrsRtcTcpConn(io, ip, port);
11981198
} else {
11991199
resource = new SrsHttpxConn(listener == http_listener_, this, io, http_server, ip, port);
12001200
}
@@ -1213,8 +1213,7 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener* listener, srs_netfd_t& stf
12131213
resource = new SrsHttpxConn(is_https, this, new SrsTcpConnection(stfd2), http_server, ip, port);
12141214
#ifdef SRS_RTC
12151215
} else if (listener == webrtc_listener_) {
1216-
// TODO: FIXME: Should manage this connection by _srs_rtc_manager
1217-
resource = new SrsRtcTcpConn(new SrsTcpConnection(stfd2), ip, port, this);
1216+
resource = new SrsRtcTcpConn(new SrsTcpConnection(stfd2), ip, port);
12181217
#endif
12191218
} else if (listener == exporter_listener_) {
12201219
// TODO: FIXME: Maybe should support https metrics.
@@ -1227,11 +1226,28 @@ srs_error_t SrsServer::do_on_tcp_client(ISrsListener* listener, srs_netfd_t& stf
12271226
}
12281227
}
12291228

1229+
#ifdef SRS_RTC
1230+
// For RTC TCP connection, use resource executor to manage the resource.
1231+
SrsRtcTcpConn* raw_conn = dynamic_cast<SrsRtcTcpConn*>(resource);
1232+
if (raw_conn) {
1233+
SrsSharedResource<SrsRtcTcpConn>* conn = new SrsSharedResource<SrsRtcTcpConn>(raw_conn);
1234+
SrsExecutorCoroutine* executor = new SrsExecutorCoroutine(_srs_rtc_manager, conn, raw_conn, raw_conn);
1235+
raw_conn->setup_owner(conn, executor, executor);
1236+
if ((err = executor->start()) != srs_success) {
1237+
srs_freep(executor);
1238+
return srs_error_wrap(err, "start executor");
1239+
}
1240+
return err;
1241+
}
1242+
#endif
1243+
12301244
// Use connection manager to manage all the resources.
1245+
srs_assert(resource);
12311246
conn_manager->add(resource);
12321247

12331248
// If connection is a resource to start, start a coroutine to handle it.
12341249
ISrsStartable* conn = dynamic_cast<ISrsStartable*>(resource);
1250+
srs_assert(conn);
12351251
if ((err = conn->start()) != srs_success) {
12361252
return srs_error_wrap(err, "start conn coroutine");
12371253
}

0 commit comments

Comments
 (0)