|
1 | 1 | package center |
2 | 2 |
|
3 | 3 | import ( |
4 | | - "io" |
5 | 4 | "sync/atomic" |
6 | 5 | "time" |
7 | 6 |
|
8 | 7 | tea "charm.land/bubbletea/v2" |
9 | 8 |
|
10 | | - appPty "github.com/andyrewlee/amux/internal/pty" |
11 | | - "github.com/andyrewlee/amux/internal/safego" |
| 9 | + "github.com/andyrewlee/amux/internal/ui/common" |
12 | 10 | ) |
13 | 11 |
|
14 | 12 | func (m *Model) flushTiming(tab *Tab, active bool) (time.Duration, time.Duration) { |
@@ -100,172 +98,23 @@ func (m *Model) busyPTYTabCount(now time.Time) int { |
100 | 98 | } |
101 | 99 |
|
102 | 100 | func (m *Model) forwardPTYMsgs(msgCh <-chan tea.Msg) { |
103 | | - for msg := range msgCh { |
104 | | - if msg == nil { |
105 | | - continue |
106 | | - } |
107 | | - out, ok := msg.(PTYOutput) |
108 | | - if !ok { |
109 | | - if m.msgSink != nil { |
110 | | - m.msgSink(msg) |
111 | | - } |
112 | | - continue |
113 | | - } |
114 | | - |
115 | | - merged := out |
116 | | - for { |
117 | | - select { |
118 | | - case next, ok := <-msgCh: |
119 | | - if !ok { |
120 | | - if m.msgSink != nil && len(merged.Data) > 0 { |
121 | | - m.msgSink(merged) |
122 | | - } |
123 | | - return |
124 | | - } |
125 | | - if next == nil { |
126 | | - continue |
127 | | - } |
128 | | - if nextOut, ok := next.(PTYOutput); ok && |
129 | | - nextOut.WorkspaceID == merged.WorkspaceID && |
130 | | - nextOut.TabID == merged.TabID { |
131 | | - merged.Data = append(merged.Data, nextOut.Data...) |
132 | | - if len(merged.Data) >= ptyMaxPendingBytes { |
133 | | - if m.msgSink != nil && len(merged.Data) > 0 { |
134 | | - m.msgSink(merged) |
135 | | - } |
136 | | - merged.Data = nil |
137 | | - } |
138 | | - continue |
139 | | - } |
140 | | - if m.msgSink != nil && len(merged.Data) > 0 { |
141 | | - m.msgSink(merged) |
142 | | - } |
143 | | - if m.msgSink != nil { |
144 | | - m.msgSink(next) |
145 | | - } |
146 | | - goto nextMsg |
147 | | - default: |
148 | | - if m.msgSink != nil && len(merged.Data) > 0 { |
149 | | - m.msgSink(merged) |
150 | | - } |
151 | | - goto nextMsg |
152 | | - } |
153 | | - } |
154 | | - nextMsg: |
155 | | - } |
156 | | -} |
157 | | - |
158 | | -func runPTYReader(term *appPty.Terminal, msgCh chan tea.Msg, cancel <-chan struct{}, wtID string, tabID TabID, heartbeat *int64) { |
159 | | - // Ensure msgCh is always closed even if we panic, so forwardPTYMsgs doesn't block forever. |
160 | | - // The inner recover() catches double-close panics from existing close(msgCh) calls. |
161 | | - defer func() { |
162 | | - defer func() { _ = recover() }() |
163 | | - close(msgCh) |
164 | | - }() |
165 | | - |
166 | | - if term == nil { |
167 | | - return |
168 | | - } |
169 | | - beat := func() { |
170 | | - if heartbeat != nil { |
171 | | - atomic.StoreInt64(heartbeat, time.Now().UnixNano()) |
172 | | - } |
173 | | - } |
174 | | - beat() |
175 | | - |
176 | | - dataCh := make(chan []byte, ptyReadQueueSize) |
177 | | - errCh := make(chan error, 1) |
178 | | - |
179 | | - safego.Go("center.pty_read_loop", func() { |
180 | | - buf := make([]byte, ptyReadBufferSize) |
181 | | - for { |
182 | | - n, err := term.Read(buf) |
183 | | - if err != nil { |
184 | | - select { |
185 | | - case errCh <- err: |
186 | | - default: |
187 | | - } |
188 | | - close(dataCh) |
189 | | - return |
190 | | - } |
191 | | - if n == 0 { |
192 | | - continue |
193 | | - } |
194 | | - beat() |
195 | | - chunk := make([]byte, n) |
196 | | - copy(chunk, buf[:n]) |
197 | | - select { |
198 | | - case dataCh <- chunk: |
199 | | - case <-cancel: |
200 | | - return |
| 101 | + common.ForwardPTYMsgs(msgCh, m.msgSink, common.OutputMerger{ |
| 102 | + ExtractData: func(msg tea.Msg) ([]byte, bool) { |
| 103 | + if out, ok := msg.(PTYOutput); ok { |
| 104 | + return out.Data, true |
201 | 105 | } |
202 | | - } |
| 106 | + return nil, false |
| 107 | + }, |
| 108 | + CanMerge: func(cur, next tea.Msg) bool { |
| 109 | + c, _ := cur.(PTYOutput) |
| 110 | + n, _ := next.(PTYOutput) |
| 111 | + return c.WorkspaceID == n.WorkspaceID && c.TabID == n.TabID |
| 112 | + }, |
| 113 | + Build: func(first tea.Msg, data []byte) tea.Msg { |
| 114 | + out, _ := first.(PTYOutput) |
| 115 | + out.Data = data |
| 116 | + return out |
| 117 | + }, |
| 118 | + MaxPending: ptyMaxPendingBytes, |
203 | 119 | }) |
204 | | - |
205 | | - ticker := time.NewTicker(ptyFrameInterval) |
206 | | - defer ticker.Stop() |
207 | | - |
208 | | - var pending []byte |
209 | | - var stoppedErr error |
210 | | - |
211 | | - for { |
212 | | - select { |
213 | | - case <-cancel: |
214 | | - close(msgCh) |
215 | | - return |
216 | | - case err := <-errCh: |
217 | | - beat() |
218 | | - stoppedErr = err |
219 | | - case data, ok := <-dataCh: |
220 | | - beat() |
221 | | - if !ok { |
222 | | - if len(pending) > 0 { |
223 | | - if !sendPTYMsg(msgCh, cancel, PTYOutput{WorkspaceID: wtID, TabID: tabID, Data: pending}) { |
224 | | - close(msgCh) |
225 | | - return |
226 | | - } |
227 | | - } |
228 | | - if stoppedErr == nil { |
229 | | - stoppedErr = io.EOF |
230 | | - } |
231 | | - sendPTYMsg(msgCh, cancel, PTYStopped{WorkspaceID: wtID, TabID: tabID, Err: stoppedErr}) |
232 | | - close(msgCh) |
233 | | - return |
234 | | - } |
235 | | - pending = append(pending, data...) |
236 | | - if len(pending) >= ptyMaxPendingBytes { |
237 | | - if !sendPTYMsg(msgCh, cancel, PTYOutput{WorkspaceID: wtID, TabID: tabID, Data: pending}) { |
238 | | - close(msgCh) |
239 | | - return |
240 | | - } |
241 | | - pending = nil |
242 | | - } |
243 | | - case <-ticker.C: |
244 | | - beat() |
245 | | - if len(pending) > 0 { |
246 | | - if !sendPTYMsg(msgCh, cancel, PTYOutput{WorkspaceID: wtID, TabID: tabID, Data: pending}) { |
247 | | - close(msgCh) |
248 | | - return |
249 | | - } |
250 | | - pending = nil |
251 | | - } |
252 | | - if stoppedErr != nil { |
253 | | - sendPTYMsg(msgCh, cancel, PTYStopped{WorkspaceID: wtID, TabID: tabID, Err: stoppedErr}) |
254 | | - close(msgCh) |
255 | | - return |
256 | | - } |
257 | | - } |
258 | | - } |
259 | | -} |
260 | | - |
261 | | -func sendPTYMsg(msgCh chan tea.Msg, cancel <-chan struct{}, msg tea.Msg) bool { |
262 | | - if msgCh == nil { |
263 | | - return false |
264 | | - } |
265 | | - select { |
266 | | - case <-cancel: |
267 | | - return false |
268 | | - case msgCh <- msg: |
269 | | - return true |
270 | | - } |
271 | 120 | } |
0 commit comments