-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexample3.js
More file actions
92 lines (79 loc) · 2.65 KB
/
example3.js
File metadata and controls
92 lines (79 loc) · 2.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
const { RaftNode, FsLog, TimeoutLog } = require('./src/index.js')
const { XxHashEncoder } = require('./src/index.js')
const sodium = require('libsodium-wrappers')
const { tcpServer, tcpClient } = require('./src/index.js')
/**
 * Fatal-error handler shared by the whole script.
 *
 * Writes the error to stderr with an `error` label and then terminates the
 * process with exit code 1.
 *
 * @param {*} err - The error (or rejection reason) to report.
 */
function errCb(err) {
  const label = 'error';
  console.error(label, err);
  process.exit(1);
}
/**
 * Builds a fresh options object backed by its own private BigInt counter.
 *
 * Returned members:
 * - `apply(bufs, seq)`: for every entry of `bufs`, yields the incremented
 *   counter when the entry is truthy and `null` otherwise; `seq` is unused.
 * - `read()`: returns the current counter value.
 * - `group`: the fixed string `'name'`.
 * - `groupFn(nodes)`: true when `nodes` has at least two elements.
 *
 * @returns {{apply: Function, read: Function, group: string, groupFn: Function}}
 */
function opts() {
  let applied = 0n;

  return {
    apply: (bufs, seq) =>
      bufs.reduce((results, entry) => {
        if (entry) {
          applied += 1n;
          results.push(applied);
        } else {
          results.push(null);
        }
        return results;
      }, []),
    read: () => applied,
    group: 'name',
    groupFn: (nodes) => !(nodes.length < 2),
  };
}
/**
 * Creates one RaftNode addressed by `id` and starts the TCP server that
 * delivers incoming messages to it.
 *
 * @param {*} key - Passed unchanged to both tcpClient and tcpServer.
 * @param {string} id - This node's address as `host:port`; the port part is
 *   the port the server is started on.
 * @param {string[]} ids - Forwarded to RaftNode as its second argument.
 * @returns {Promise<object>} Resolves, once tcpServer resolves, with the
 *   RaftNode; it additionally carries `clients` (address -> connection
 *   promise) and `srv` (the value tcpServer resolved with).
 */
function node(key, id, ids) {
  // Outbound connections, opened lazily on first send and keyed by `host:port`.
  const clients = {};

  const send = (to, msg) => {
    let client = clients[to];
    if (!client) {
      const [host, port] = to.split(':');
      client = Promise.resolve(tcpClient(key, host, Number.parseInt(port, 10)));
      clients[to] = client;
      // Do not keep a failed connection attempt cached: otherwise every later
      // send to this peer would reuse the same rejected promise and never
      // reconnect. The caller of this send still sees the rejection below.
      client.catch(() => {
        if (clients[to] === client) {
          delete clients[to];
        }
      });
    }
    return client.then((sock) => sock.write(msg));
  };

  const encoder = new XxHashEncoder(true);
  const fsLog = new FsLog('/tmp/', `node${id}`, { encoder });
  const log = new TimeoutLog(fsLog, { default: 1_000 });

  // NOTE(review): `opts` is handed over as the function itself, not `opts()`.
  // Confirm RaftNode expects a factory here; if it expects the options object
  // this should be `opts()`. Left unchanged to preserve current behavior.
  const raftNode = new RaftNode(id, ids, send, log, opts);

  const port = Number.parseInt(id.split(':')[1], 10);
  const msgCb = (sock, msg) => raftNode.onReceive(msg.from, msg);

  return tcpServer(key, port, msgCb, errCb).then((srv) => {
    // Expose transport handles on the node so the caller can tear them down.
    raftNode.clients = clients;
    raftNode.srv = srv;
    return raftNode;
  });
}
/**
 * Runs the demo end to end.
 *
 * Steps:
 * 1. Waits for libsodium and derives a 32-byte key from the string 'secret'.
 * 2. Creates three nodes on 127.0.0.1 ports 9001-9003, deletes their logs,
 *    opens them and waits on `awaitLeader(1)` for each.
 * 3. Every 500 ms a producer queues three 1024-byte buffers; every 500 ms a
 *    consumer drains the queue into `leader.appendBatch` and logs the
 *    resolved `[seq, count]` pair.
 * 4. After 10 seconds both timers are stopped, all nodes are closed, client
 *    connections are destroyed and the servers are closed.
 *
 * @returns {Promise<void>} Resolves once the timers are scheduled.
 * @throws {Error} If no node reports state 'leader' after the leader wait.
 */
async function main() {
  console.log('boot');
  await sodium.ready;
  const key = sodium.crypto_generichash(32, sodium.from_string('secret'));

  const ids = Array.from({ length: 3 }, (_, idx) => `127.0.0.1:${9000 + idx + 1}`);
  const nodes = await Promise.all(ids.map((id) => node(key, id, ids)));
  await Promise.all(nodes.map((raftNode) => raftNode.log.del()));
  await Promise.all(nodes.map((raftNode) => raftNode.open()));
  await Promise.all(nodes.map((raftNode) => raftNode.awaitLeader(1)));
  console.log('ready');

  // Resolve the leader before any timer starts so a missing leader fails
  // here with a clear message instead of as a TypeError inside a timer tick.
  const leader = nodes.find((raftNode) => raftNode.state === 'leader');
  if (!leader) {
    throw new Error("no node reports state 'leader'");
  }

  const buf = Buffer.from('a'.repeat(1024), 'utf8');
  let bufs = [];
  const producer = setInterval(() => {
    bufs.push(buf, buf, buf);
  }, 500);

  const consumer = setInterval(() => {
    // Swap in a fresh queue so entries produced during the append are kept
    // for the next tick.
    const batch = [...bufs];
    bufs = [];
    leader
      .appendBatch(batch)
      .then(([seq, count]) => {
        console.log(seq, count);
      })
      .catch(errCb);
  }, 500);

  const end = () => {
    clearInterval(producer);
    clearInterval(consumer);
    Promise.all(nodes.map((raftNode) => raftNode.close()))
      .then(() => {
        const connections = nodes.flatMap((raftNode) => Object.values(raftNode.clients));
        connections.forEach((client) => {
          client
            .then((conn) => conn.destroy())
            // A connection that never opened has nothing to destroy; report
            // it and keep shutting down rather than leaving the rejection
            // unhandled.
            .catch((err) => console.error('error', err));
        });
        nodes.forEach((raftNode) => raftNode.srv.close());
      })
      .catch(errCb);
  };
  setTimeout(end, 10_000);
}
// Script entry point: start the demo and hand any rejection from main() to
// errCb, which logs it and exits the process with code 1.
main().catch(errCb)