@@ -2,8 +2,10 @@ package main
22
33import (
44 "fmt"
5+ "io"
56 "net"
67
8+ "github.com/riotpot/pkg/models"
79 "github.com/riotpot/pkg/services"
810 "github.com/riotpot/tools/errors"
911 "github.com/xiegeo/modbusone"
@@ -32,16 +34,23 @@ func Modbusd() services.Service {
3234 Protocol : "tcp" ,
3335 }
3436
37+ handler := handler ()
38+
3539 return & Modbus {
3640 mx ,
41+ handler ,
3742 }
3843}
3944
4045type Modbus struct {
4146 services.MixinService
47+ handler modbusone.ProtocolHandler
4248}
4349
4450func (m * Modbus ) Run () (err error ) {
51+ // before running, migrate the model that we want to store
52+ m .Migrate (& models.Connection {})
53+
4554 // convert the port number to a string that we can use it in the server
4655 var port = fmt .Sprintf (":%d" , m .Port )
4756
@@ -52,35 +61,134 @@ func (m *Modbus) Run() (err error) {
5261 // create the channel for stopping the service
5362 m .StopCh = make (chan int , 1 )
5463
55- // serve the app
56- server := modbusone . NewTCPServer ( listener )
57- m .serve (server )
64+ // build a channel stack to receive connections to the service
65+ conn := make ( chan net. Conn )
66+ m .serve (conn , listener )
5867
5968 // update the status of the service
6069 m .Running <- true
6170
6271 // block here until we receive an stopping signal in the channel
6372 <- m .StopCh
6473
65- // close the server
66- defer server .Close ()
67-
6874 // Close the channel for stopping the service
6975 fmt .Print ("[x] Service stopped...\n " )
7076 close (m .StopCh )
7177
7278 return
7379}
7480
75- func (m * Modbus ) serve (server modbusone.ServerCloser ) {
81+ // Open the service and listen for connections
82+ // inspired on https://gist.github.com/paulsmith/775764#file-echo-go
83+ func (m * Modbus ) serve (ch chan net.Conn , listener net.Listener ) {
84+ // open an infinite loop to receive connections
7685 fmt .Printf ("[%s] Started listenning for connections in port %d\n " , Name , m .Port )
86+ for {
87+ // Accept the client connection
88+ client , err := listener .Accept ()
89+ if err != nil {
90+ return
91+ }
92+ defer client .Close ()
93+
94+ // push the client connection to the channel
95+ ch <- client
96+ }
97+ }
98+
99+ // Handle the pool of connections to the service
100+ func (m * Modbus ) handlePool (ch chan net.Conn ) {
101+ // open an infinite loop to handle the connections
102+ for {
103+ // while the `stop` channel remains empty, continue handling
104+ // new connections.
105+ select {
106+ case <- m .StopCh :
107+ // stop the pool
108+ fmt .Printf ("[x] Stopping %s service...\n " , m .Name )
109+ // update the status of the service
110+ m .Running <- false
111+ return
112+ case conn := <- ch :
113+ // use one goroutine per connection.
114+ go m .handleSession (conn )
115+ }
116+ }
117+ }
118+
119+ // Handle a connection made to the service
120+ // We are rewriting this function so we can interact with the connection loop
121+ // and rewrite some of the underlying function. Unfortunately, the loop
122+ // is deep and contains a goroutine in it as main handler of the session connections.
123+ func (m * Modbus ) handleSession (conn net.Conn ) {
124+ for {
125+ defer conn .Close ()
126+
127+ wec := func (conn net.Conn , bs []byte , req modbusone.PDU , err error ) {
128+ writeTCP (conn , bs , modbusone .ExceptionReplyPacket (req , modbusone .ToExceptionCode (err )))
129+ }
130+
131+ var rb []byte
132+ if modbusone .OverSizeSupport {
133+ rb = make (
134+ []byte ,
135+ modbusone .MBAPHeaderLength + modbusone .OverSizeMaxRTU + modbusone .TCPHeaderLength ,
136+ )
137+ } else {
138+ rb = make ([]byte , modbusone .MBAPHeaderLength + modbusone .MaxPDUSize )
139+ }
140+
141+ for {
142+ n , err := readTCP (conn , rb )
143+ if err != nil {
144+ return
145+ }
146+
147+ // load the payload from the packet i.e. everything after the header
148+ p := modbusone .PDU (rb [modbusone .MBAPHeaderLength :n ])
149+
150+ // save the request connection with the request payload in it.
151+ m .save (conn , p )
152+
153+ // validate the request, checks for errors on the code and the
154+ // length of the payload
155+ err = p .ValidateRequest ()
156+ if err != nil {
157+ m .save (conn , p )
158+ return
159+ }
160+
161+ fc := p .GetFunctionCode ()
162+
163+ // initialize the data. It will be filled with the information
164+ // in the payload.
165+ var data []byte
166+
167+ // Only two things can happen,
168+ // either read from the server or write to it.
169+ if fc .IsReadToServer () {
170+ data , err = m .handler .OnRead (p )
171+ if err != nil {
172+ wec (conn , rb , p , err )
173+ continue
174+ }
175+ writeTCP (conn , rb , p .MakeReadReply (data ))
176+ } else if fc .IsWriteToServer () {
177+ data , err = p .GetRequestValues ()
178+ if err != nil {
179+ wec (conn , rb , p , err )
180+ continue
181+ }
182+ err = m .handler .OnWrite (p , data )
183+ if err != nil {
184+ wec (conn , rb , p , err )
185+ continue
186+ }
187+ writeTCP (conn , rb , p .MakeWriteReply ())
188+ }
77189
78- // since we don't have to handle the connection as the server already
79- // has a handler, we just run the server in another goroutine
80- go func () {
81- err := server .Serve (m .handler ("server" ))
82- errors .Raise (err )
83- }()
190+ }
191+ }
84192}
85193
86194// Simple handler for the Modbus functions.
@@ -97,7 +205,7 @@ func (m *Modbus) serve(server modbusone.ServerCloser) {
97205// 16: multiple 6
98206// Generarly, we will either read a quantity of an unsigned int 16, or write
99207// booleans (0 or 1) plus the address.
100- func ( m * Modbus ) handler (name string ) modbusone.ProtocolHandler {
208+ func handler () modbusone.ProtocolHandler {
101209 return & modbusone.SimpleHandler {
102210
103211 // Discrete Inputs
@@ -151,3 +259,47 @@ func (m *Modbus) handler(name string) modbusone.ProtocolHandler {
151259 },
152260 }
153261}
262+
263+ // Copy pasted from https://github.com/xiegeo/modbusone/blob/797d647e237d97ab9d2bdad49bf42591ea7076f2/tcp_server.go#L19
264+ // both functions are just packet handlers that read the content and headers.
265+ // It is unnecessary to reimplement something already done, however, this function
266+ // are now exported but still necessary for our connection handler.
267+ func readTCP (r io.Reader , bs []byte ) (n int , err error ) {
268+ n , err = io .ReadFull (r , bs [:modbusone .TCPHeaderLength ])
269+ if err != nil {
270+ return n , err
271+ }
272+ if bs [2 ] != 0 || bs [3 ] != 0 {
273+ return n , fmt .Errorf ("MBAP protocol of %X %X is unknown" , bs [2 ], bs [3 ])
274+ }
275+ l := int (bs [4 ])* 256 + int (bs [5 ])
276+ if l <= 2 {
277+ return n , fmt .Errorf ("MBAP data length of %v is too short, bs:%x" , l , bs [:n ])
278+ }
279+ if len (bs ) < l + modbusone .TCPHeaderLength {
280+ return n , fmt .Errorf ("MBAP data length of %v is too long" , l )
281+ }
282+ n , err = io .ReadFull (r , bs [modbusone .TCPHeaderLength :l + modbusone .TCPHeaderLength ])
283+ return n + modbusone .TCPHeaderLength , err
284+ }
285+
286+ //writeTCP writes a PDU packet on TCP reusing the headers and buffer space in bs
287+ func writeTCP (w io.Writer , bs []byte , pdu modbusone.PDU ) (int , error ) {
288+ l := len (pdu ) + 1 //pdu + byte of slaveID
289+ bs [4 ] = byte (l / 256 )
290+ bs [5 ] = byte (l )
291+ copy (bs [modbusone .MBAPHeaderLength :], pdu )
292+ return w .Write (bs [:len (pdu )+ modbusone .MBAPHeaderLength ])
293+ }
294+
295+ func (m * Modbus ) save (conn net.Conn , payload []byte ) {
296+ connection := models .NewConnection ()
297+ connection .LocalAddress = conn .LocalAddr ().String ()
298+ connection .RemoteAddress = conn .RemoteAddr ().String ()
299+ connection .Protocol = "TCP"
300+ connection .Service = Name
301+ connection .Incoming = true
302+ connection .Payload = string (payload )
303+
304+ m .Store (connection )
305+ }
0 commit comments