Skip to content

Commit ed1b2b1

Browse files
committed
Included .env file and #30 Modbus emulation
1 parent 32ec6b9 commit ed1b2b1

4 files changed

Lines changed: 174 additions & 21 deletions

File tree

.gitignore

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,6 @@ bin/
2424
## VSCode workspace file
2525
workspace*.code-*
2626

27-
# environment files (not example)
28-
.env
29-
!.env.example*
30-
!*.example
31-
3227
# Vendor folder
3328
/vendor/
3429

build/docker/Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ COPY go.sum .
1616
# download all the dependencies
1717
RUN go mod download
1818

19-
# install netcat
19+
# install netcat so we can ping the db until it is ready
2020
RUN apt-get update && apt-get install -y netcat
2121

2222
# Copy everything into the image
@@ -29,8 +29,8 @@ COPY . .
2929
# Disclaimer: if you comment this line, be 100% sure that the binary can be run on linux
3030
RUN make riotpot-builder
3131

32+
# give permissions to the entrypoint to run the file
3233
RUN chmod +x build/docker/entrypoint.sh
3334

3435
# Run RIoTPot
3536
ENTRYPOINT [ "./build/docker/entrypoint.sh" ]
36-
#CMD ["./riotpot"]

build/env/.env

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
AUTOD=false
2+
DB_HOST=postgres
3+
DB_USER=user
4+
DB_PASS=password
5+
DB_NAME=db
6+
DB_PORT=5432

pkg/plugin/modbusd/modbusd.go

Lines changed: 166 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ package main
22

33
import (
44
"fmt"
5+
"io"
56
"net"
67

8+
"github.com/riotpot/pkg/models"
79
"github.com/riotpot/pkg/services"
810
"github.com/riotpot/tools/errors"
911
"github.com/xiegeo/modbusone"
@@ -32,16 +34,23 @@ func Modbusd() services.Service {
3234
Protocol: "tcp",
3335
}
3436

37+
handler := handler()
38+
3539
return &Modbus{
3640
mx,
41+
handler,
3742
}
3843
}
3944

4045
type Modbus struct {
4146
services.MixinService
47+
handler modbusone.ProtocolHandler
4248
}
4349

4450
func (m *Modbus) Run() (err error) {
51+
// before running, migrate the model that we want to store
52+
m.Migrate(&models.Connection{})
53+
4554
// convert the port number to a string that we can use it in the server
4655
var port = fmt.Sprintf(":%d", m.Port)
4756

@@ -52,35 +61,134 @@ func (m *Modbus) Run() (err error) {
5261
// create the channel for stopping the service
5362
m.StopCh = make(chan int, 1)
5463

55-
// serve the app
56-
server := modbusone.NewTCPServer(listener)
57-
m.serve(server)
64+
// build a channel stack to receive connections to the service
65+
conn := make(chan net.Conn)
66+
m.serve(conn, listener)
5867

5968
// update the status of the service
6069
m.Running <- true
6170

6271
// block here until we receive an stopping signal in the channel
6372
<-m.StopCh
6473

65-
// close the server
66-
defer server.Close()
67-
6874
// Close the channel for stopping the service
6975
fmt.Print("[x] Service stopped...\n")
7076
close(m.StopCh)
7177

7278
return
7379
}
7480

75-
func (m *Modbus) serve(server modbusone.ServerCloser) {
81+
// Open the service and listen for connections
82+
// inspired on https://gist.github.com/paulsmith/775764#file-echo-go
83+
func (m *Modbus) serve(ch chan net.Conn, listener net.Listener) {
84+
// open an infinite loop to receive connections
7685
fmt.Printf("[%s] Started listenning for connections in port %d\n", Name, m.Port)
86+
for {
87+
// Accept the client connection
88+
client, err := listener.Accept()
89+
if err != nil {
90+
return
91+
}
92+
defer client.Close()
93+
94+
// push the client connection to the channel
95+
ch <- client
96+
}
97+
}
98+
99+
// Handle the pool of connections to the service
100+
func (m *Modbus) handlePool(ch chan net.Conn) {
101+
// open an infinite loop to handle the connections
102+
for {
103+
// while the `stop` channel remains empty, continue handling
104+
// new connections.
105+
select {
106+
case <-m.StopCh:
107+
// stop the pool
108+
fmt.Printf("[x] Stopping %s service...\n", m.Name)
109+
// update the status of the service
110+
m.Running <- false
111+
return
112+
case conn := <-ch:
113+
// use one goroutine per connection.
114+
go m.handleSession(conn)
115+
}
116+
}
117+
}
118+
119+
// Handle a connection made to the service
120+
// We are rewriting this function so we can interact with the connection loop
121+
// and rewrite some of the underlying function. Unfortunately, the loop
122+
// is deep and contains a goroutine in it as main handler of the session connections.
123+
func (m *Modbus) handleSession(conn net.Conn) {
124+
for {
125+
defer conn.Close()
126+
127+
wec := func(conn net.Conn, bs []byte, req modbusone.PDU, err error) {
128+
writeTCP(conn, bs, modbusone.ExceptionReplyPacket(req, modbusone.ToExceptionCode(err)))
129+
}
130+
131+
var rb []byte
132+
if modbusone.OverSizeSupport {
133+
rb = make(
134+
[]byte,
135+
modbusone.MBAPHeaderLength+modbusone.OverSizeMaxRTU+modbusone.TCPHeaderLength,
136+
)
137+
} else {
138+
rb = make([]byte, modbusone.MBAPHeaderLength+modbusone.MaxPDUSize)
139+
}
140+
141+
for {
142+
n, err := readTCP(conn, rb)
143+
if err != nil {
144+
return
145+
}
146+
147+
// load the payload from the packet i.e. everything after the header
148+
p := modbusone.PDU(rb[modbusone.MBAPHeaderLength:n])
149+
150+
// save the request connection with the request payload in it.
151+
m.save(conn, p)
152+
153+
// validate the request, checks for errors on the code and the
154+
// length of the payload
155+
err = p.ValidateRequest()
156+
if err != nil {
157+
m.save(conn, p)
158+
return
159+
}
160+
161+
fc := p.GetFunctionCode()
162+
163+
// initialize the data. It will be filled with the information
164+
// in the payload.
165+
var data []byte
166+
167+
// Only two things can happen,
168+
// either read from the server or write to it.
169+
if fc.IsReadToServer() {
170+
data, err = m.handler.OnRead(p)
171+
if err != nil {
172+
wec(conn, rb, p, err)
173+
continue
174+
}
175+
writeTCP(conn, rb, p.MakeReadReply(data))
176+
} else if fc.IsWriteToServer() {
177+
data, err = p.GetRequestValues()
178+
if err != nil {
179+
wec(conn, rb, p, err)
180+
continue
181+
}
182+
err = m.handler.OnWrite(p, data)
183+
if err != nil {
184+
wec(conn, rb, p, err)
185+
continue
186+
}
187+
writeTCP(conn, rb, p.MakeWriteReply())
188+
}
77189

78-
// since we don't have to handle the connection as the server already
79-
// has a handler, we just run the server in another goroutine
80-
go func() {
81-
err := server.Serve(m.handler("server"))
82-
errors.Raise(err)
83-
}()
190+
}
191+
}
84192
}
85193

86194
// Simple handler for the Modbus functions.
@@ -97,7 +205,7 @@ func (m *Modbus) serve(server modbusone.ServerCloser) {
97205
// 16: multiple 6
98206
// Generarly, we will either read a quantity of an unsigned int 16, or write
99207
// booleans (0 or 1) plus the address.
100-
func (m *Modbus) handler(name string) modbusone.ProtocolHandler {
208+
func handler() modbusone.ProtocolHandler {
101209
return &modbusone.SimpleHandler{
102210

103211
// Discrete Inputs
@@ -151,3 +259,47 @@ func (m *Modbus) handler(name string) modbusone.ProtocolHandler {
151259
},
152260
}
153261
}
262+
263+
// Copy pasted from https://github.com/xiegeo/modbusone/blob/797d647e237d97ab9d2bdad49bf42591ea7076f2/tcp_server.go#L19
264+
// both functions are just packet handlers that read the content and headers.
265+
// It is unnecessary to reimplement something already done, however, this function
266+
// are now exported but still necessary for our connection handler.
267+
func readTCP(r io.Reader, bs []byte) (n int, err error) {
268+
n, err = io.ReadFull(r, bs[:modbusone.TCPHeaderLength])
269+
if err != nil {
270+
return n, err
271+
}
272+
if bs[2] != 0 || bs[3] != 0 {
273+
return n, fmt.Errorf("MBAP protocol of %X %X is unknown", bs[2], bs[3])
274+
}
275+
l := int(bs[4])*256 + int(bs[5])
276+
if l <= 2 {
277+
return n, fmt.Errorf("MBAP data length of %v is too short, bs:%x", l, bs[:n])
278+
}
279+
if len(bs) < l+modbusone.TCPHeaderLength {
280+
return n, fmt.Errorf("MBAP data length of %v is too long", l)
281+
}
282+
n, err = io.ReadFull(r, bs[modbusone.TCPHeaderLength:l+modbusone.TCPHeaderLength])
283+
return n + modbusone.TCPHeaderLength, err
284+
}
285+
286+
//writeTCP writes a PDU packet on TCP reusing the headers and buffer space in bs
287+
func writeTCP(w io.Writer, bs []byte, pdu modbusone.PDU) (int, error) {
288+
l := len(pdu) + 1 //pdu + byte of slaveID
289+
bs[4] = byte(l / 256)
290+
bs[5] = byte(l)
291+
copy(bs[modbusone.MBAPHeaderLength:], pdu)
292+
return w.Write(bs[:len(pdu)+modbusone.MBAPHeaderLength])
293+
}
294+
295+
func (m *Modbus) save(conn net.Conn, payload []byte) {
296+
connection := models.NewConnection()
297+
connection.LocalAddress = conn.LocalAddr().String()
298+
connection.RemoteAddress = conn.RemoteAddr().String()
299+
connection.Protocol = "TCP"
300+
connection.Service = Name
301+
connection.Incoming = true
302+
connection.Payload = string(payload)
303+
304+
m.Store(connection)
305+
}

0 commit comments

Comments
 (0)